diff --git a/cmd/admin-handlers-users.go b/cmd/admin-handlers-users.go
index 31f6f852b..b7b76e0c2 100644
--- a/cmd/admin-handlers-users.go
+++ b/cmd/admin-handlers-users.go
@@ -1217,9 +1217,9 @@ func (a adminAPIHandlers) AccountInfoHandler(w http.ResponseWriter, r *http.Requ
 	}
 
 	bucketStorageCache.InitOnce(10*time.Second,
-		cachevalue.Opts{ReturnLastGood: true, NoWait: true},
-		func() (DataUsageInfo, error) {
-			ctx, done := context.WithTimeout(context.Background(), 2*time.Second)
+		cachevalue.Opts{ReturnLastGood: true},
+		func(ctx context.Context) (DataUsageInfo, error) {
+			ctx, done := context.WithTimeout(ctx, 2*time.Second)
 			defer done()
 
 			return loadDataUsageFromBackend(ctx, objectAPI)
diff --git a/cmd/bucket-quota.go b/cmd/bucket-quota.go
index 294a1ba60..b287fe6e3 100644
--- a/cmd/bucket-quota.go
+++ b/cmd/bucket-quota.go
@@ -49,8 +49,8 @@ var bucketStorageCache = cachevalue.New[DataUsageInfo]()
 func (sys *BucketQuotaSys) Init(objAPI ObjectLayer) {
 	bucketStorageCache.InitOnce(10*time.Second,
 		cachevalue.Opts{ReturnLastGood: true, NoWait: true},
-		func() (DataUsageInfo, error) {
-			ctx, done := context.WithTimeout(context.Background(), 2*time.Second)
+		func(ctx context.Context) (DataUsageInfo, error) {
+			ctx, done := context.WithTimeout(ctx, 2*time.Second)
 			defer done()
 
 			return loadDataUsageFromBackend(ctx, objAPI)
@@ -59,8 +59,8 @@ func (sys *BucketQuotaSys) Init(objAPI ObjectLayer) {
 }
 
 // GetBucketUsageInfo return bucket usage info for a given bucket
-func (sys *BucketQuotaSys) GetBucketUsageInfo(bucket string) (BucketUsageInfo, error) {
-	dui, err := bucketStorageCache.Get()
+func (sys *BucketQuotaSys) GetBucketUsageInfo(ctx context.Context, bucket string) (BucketUsageInfo, error) {
+	dui, err := bucketStorageCache.GetWithCtx(ctx)
 	timedout := OperationTimedOut{}
 	if err != nil && !errors.Is(err, context.DeadlineExceeded) && !errors.As(err, &timedout) {
 		if len(dui.BucketsUsage) > 0 {
@@ -118,7 +118,7 @@ func (sys *BucketQuotaSys) enforceQuotaHard(ctx context.Context, bucket string,
 		return BucketQuotaExceeded{Bucket: bucket}
 	}
 
-	bui, err := sys.GetBucketUsageInfo(bucket)
+	bui, err := sys.GetBucketUsageInfo(ctx, bucket)
 	if err != nil {
 		return err
 	}
diff --git a/cmd/data-usage.go b/cmd/data-usage.go
index 339ac16a2..51227e106 100644
--- a/cmd/data-usage.go
+++ b/cmd/data-usage.go
@@ -79,12 +79,12 @@ func loadPrefixUsageFromBackend(ctx context.Context, objAPI ObjectLayer, bucket
 	prefixUsageCache.InitOnce(30*time.Second,
 		// No need to fail upon Update() error, fallback to old value.
 		cachevalue.Opts{ReturnLastGood: true, NoWait: true},
-		func() (map[string]uint64, error) {
+		func(ctx context.Context) (map[string]uint64, error) {
 			m := make(map[string]uint64)
 			for _, pool := range z.serverPools {
 				for _, er := range pool.sets {
 					// Load bucket usage prefixes
-					ctx, done := context.WithTimeout(context.Background(), 2*time.Second)
+					ctx, done := context.WithTimeout(ctx, 2*time.Second)
 					ok := cache.load(ctx, er, bucket+slashSeparator+dataUsageCacheName) == nil
 					done()
 					if ok {
@@ -107,7 +107,7 @@ func loadPrefixUsageFromBackend(ctx context.Context, objAPI ObjectLayer, bucket
 		},
 	)
 
-	return prefixUsageCache.Get()
+	return prefixUsageCache.GetWithCtx(ctx)
 }
 
 func loadDataUsageFromBackend(ctx context.Context, objAPI ObjectLayer) (DataUsageInfo, error) {
diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go
index 1d2192d0e..6616618c9 100644
--- a/cmd/erasure-server-pool.go
+++ b/cmd/erasure-server-pool.go
@@ -1962,8 +1962,8 @@ func (z *erasureServerPools) ListBuckets(ctx context.Context, opts BucketOptions
 	if opts.Cached {
 		listBucketsCache.InitOnce(time.Second,
 			cachevalue.Opts{ReturnLastGood: true, NoWait: true},
-			func() ([]BucketInfo, error) {
-				ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+			func(ctx context.Context) ([]BucketInfo, error) {
+				ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
 				defer cancel()
 
 				buckets, err = z.s3Peer.ListBuckets(ctx, opts)
@@ -1980,7 +1980,7 @@ func (z *erasureServerPools) ListBuckets(ctx context.Context, opts BucketOptions
 			},
 		)
 
-		return listBucketsCache.Get()
+		return listBucketsCache.GetWithCtx(ctx)
 	}
 
 	buckets, err = z.s3Peer.ListBuckets(ctx, opts)
diff --git a/cmd/metrics-v2.go b/cmd/metrics-v2.go
index 93ea1f6e9..b03860af3 100644
--- a/cmd/metrics-v2.go
+++ b/cmd/metrics-v2.go
@@ -361,7 +361,7 @@ type MetricsGroupOpts struct {
 func (g *MetricsGroupV2) RegisterRead(read func(context.Context) []MetricV2) {
 	g.metricsCache = cachevalue.NewFromFunc(g.cacheInterval,
 		cachevalue.Opts{ReturnLastGood: true},
-		func() ([]MetricV2, error) {
+		func(ctx context.Context) ([]MetricV2, error) {
 			if g.metricsGroupOpts.dependGlobalObjectAPI {
 				objLayer := newObjectLayerFn()
 				// Service not initialized yet
diff --git a/cmd/metrics-v3-cache.go b/cmd/metrics-v3-cache.go
index ac40681a8..1e8856a78 100644
--- a/cmd/metrics-v3-cache.go
+++ b/cmd/metrics-v3-cache.go
@@ -18,6 +18,7 @@ package cmd
 
 import (
+	"context"
 	"sync"
 	"time"
 
@@ -57,7 +58,7 @@ type nodesOnline struct {
 }
 
 func newNodesUpDownCache() *cachevalue.Cache[nodesOnline] {
-	loadNodesUpDown := func() (v nodesOnline, err error) {
+	loadNodesUpDown := func(ctx context.Context) (v nodesOnline, err error) {
 		v.Online, v.Offline = globalNotificationSys.GetPeerOnlineCount()
 		return
 	}
@@ -84,7 +85,7 @@ type storageMetrics struct {
 }
 
 func newDataUsageInfoCache() *cachevalue.Cache[DataUsageInfo] {
-	loadDataUsage := func() (u DataUsageInfo, err error) {
+	loadDataUsage := func(ctx context.Context) (u DataUsageInfo, err error) {
 		objLayer := newObjectLayerFn()
 		if objLayer == nil {
 			return
@@ -100,7 +101,7 @@ func newDataUsageInfoCache() *cachevalue.Cache[DataUsageInfo] {
 }
 
 func newESetHealthResultCache() *cachevalue.Cache[HealthResult] {
-	loadHealth := func() (r HealthResult, err error) {
+	loadHealth := func(ctx context.Context) (r HealthResult, err error) {
 		objLayer := newObjectLayerFn()
 		if objLayer == nil {
 			return
@@ -157,7 +158,7 @@ func newDriveMetricsCache() *cachevalue.Cache[storageMetrics] {
 		prevDriveIOStatsRefreshedAt time.Time
 	)
 
-	loadDriveMetrics := func() (v storageMetrics, err error) {
+	loadDriveMetrics := func(ctx context.Context) (v storageMetrics, err error) {
 		objLayer := newObjectLayerFn()
 		if objLayer == nil {
 			return
@@ -203,7 +204,7 @@ func newDriveMetricsCache() *cachevalue.Cache[storageMetrics] {
 }
 
 func newCPUMetricsCache() *cachevalue.Cache[madmin.CPUMetrics] {
-	loadCPUMetrics := func() (v madmin.CPUMetrics, err error) {
+	loadCPUMetrics := func(ctx context.Context) (v madmin.CPUMetrics, err error) {
 		var types madmin.MetricType = madmin.MetricsCPU
 
 		m := collectLocalMetrics(types, collectMetricsOpts{
@@ -228,7 +229,7 @@ func newCPUMetricsCache() *cachevalue.Cache[madmin.CPUMetrics] {
 }
 
 func newMemoryMetricsCache() *cachevalue.Cache[madmin.MemInfo] {
-	loadMemoryMetrics := func() (v madmin.MemInfo, err error) {
+	loadMemoryMetrics := func(ctx context.Context) (v madmin.MemInfo, err error) {
 		var types madmin.MetricType = madmin.MetricsMem
 
 		m := collectLocalMetrics(types, collectMetricsOpts{
@@ -253,7 +254,7 @@ func newMemoryMetricsCache() *cachevalue.Cache[madmin.MemInfo] {
 }
 
 func newClusterStorageInfoCache() *cachevalue.Cache[storageMetrics] {
-	loadStorageInfo := func() (v storageMetrics, err error) {
+	loadStorageInfo := func(ctx context.Context) (v storageMetrics, err error) {
 		objLayer := newObjectLayerFn()
 		if objLayer == nil {
 			return storageMetrics{}, nil
diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go
index a744d772e..4b869b9c0 100644
--- a/cmd/peer-rest-client.go
+++ b/cmd/peer-rest-client.go
@@ -116,7 +116,7 @@ func (client *peerRESTClient) call(method string, values url.Values, body io.Rea
 // permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints()
 // after verifying format.json
 func (client *peerRESTClient) callWithContext(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) {
-	if client == nil || !client.IsOnline() {
+	if client == nil {
 		return nil, errPeerNotReachable
 	}
 
@@ -129,6 +129,10 @@ func (client *peerRESTClient) callWithContext(ctx context.Context, method string
 		return respBody, nil
 	}
 
+	if xnet.IsNetworkOrHostDown(err, true) {
+		return nil, errPeerNotReachable
+	}
+
 	return nil, err
 }
 
@@ -139,7 +143,11 @@ func (client *peerRESTClient) String() string {
 
 // IsOnline returns true if the peer client is online.
 func (client *peerRESTClient) IsOnline() bool {
-	return client.restClient.IsOnline()
+	conn := client.gridConn()
+	if conn == nil {
+		return false
+	}
+	return client.restClient.IsOnline() || conn.State() == grid.StateConnected
 }
 
 // Close - marks the client as closed.
diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go
index 294300774..f53e35d74 100644
--- a/cmd/storage-rest-client.go
+++ b/cmd/storage-rest-client.go
@@ -191,8 +191,13 @@ func (client *storageRESTClient) String() string {
 	return client.endpoint.String()
 }
 
-// IsOnline - returns whether RPC client failed to connect or not.
+// IsOnline - returns whether client failed to connect or not.
 func (client *storageRESTClient) IsOnline() bool {
+	return client.restClient.IsOnline() || client.IsOnlineWS()
+}
+
+// IsOnlineWS - returns whether websocket client failed to connect or not.
+func (client *storageRESTClient) IsOnlineWS() bool {
 	return client.gridConn.State() == grid.StateConnected
 }
 
@@ -254,7 +259,7 @@ func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageC
 }
 
 func (client *storageRESTClient) GetDiskID() (string, error) {
-	if !client.IsOnline() {
+	if !client.IsOnlineWS() {
 		// make sure to check if the disk is offline, since the underlying
 		// value is cached we should attempt to invalidate it if such calls
 		// were attempted. This can lead to false success under certain conditions
@@ -275,7 +280,7 @@ func (client *storageRESTClient) SetDiskID(id string) {
 }
 
 func (client *storageRESTClient) DiskInfo(ctx context.Context, opts DiskInfoOptions) (info DiskInfo, err error) {
-	if !client.IsOnline() {
+	if !client.IsOnlineWS() {
 		// make sure to check if the disk is offline, since the underlying
 		// value is cached we should attempt to invalidate it if such calls
 		// were attempted. This can lead to false success under certain conditions
@@ -302,10 +307,9 @@ func (client *storageRESTClient) DiskInfo(ctx context.Context, opts DiskInfoOpti
 		return info, nil
 	}
 	// In all other cases cache the value upto 1sec.
-	client.diskInfoCache.InitOnce(time.Second,
-		cachevalue.Opts{CacheError: true},
-		func() (info DiskInfo, err error) {
-			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	client.diskInfoCache.InitOnce(time.Second, cachevalue.Opts{},
+		func(ctx context.Context) (info DiskInfo, err error) {
+			ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
 			defer cancel()
 
 			nopts := DiskInfoOptions{DiskID: *client.diskID.Load(), Metrics: true}
@@ -321,7 +325,7 @@ func (client *storageRESTClient) DiskInfo(ctx context.Context, opts DiskInfoOpti
 		},
 	)
 
-	return client.diskInfoCache.Get()
+	return client.diskInfoCache.GetWithCtx(ctx)
 }
 
 // MakeVolBulk - create multiple volumes in a bulk operation.
diff --git a/cmd/veeam-sos-api.go b/cmd/veeam-sos-api.go
index 61f5f38a4..cdf8e71a0 100644
--- a/cmd/veeam-sos-api.go
+++ b/cmd/veeam-sos-api.go
@@ -156,7 +156,7 @@ func veeamSOSAPIGetObject(ctx context.Context, bucket, object string, rs *HTTPRa
 		}
 
 		q, _ := globalBucketQuotaSys.Get(ctx, bucket)
-		binfo, _ := globalBucketQuotaSys.GetBucketUsageInfo(bucket)
+		binfo, _ := globalBucketQuotaSys.GetBucketUsageInfo(ctx, bucket)
 
 		ci := capacityInfo{
 			Used: int64(binfo.Size),
diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go
index b987e5eb2..8699ae252 100644
--- a/cmd/xl-storage-disk-id-check.go
+++ b/cmd/xl-storage-disk-id-check.go
@@ -99,7 +99,7 @@ type xlStorageDiskIDCheck struct {
 func (p *xlStorageDiskIDCheck) getMetrics() DiskMetrics {
 	p.metricsCache.InitOnce(5*time.Second,
 		cachevalue.Opts{},
-		func() (DiskMetrics, error) {
+		func(ctx context.Context) (DiskMetrics, error) {
 			diskMetric := DiskMetrics{
 				LastMinute: make(map[string]AccElem, len(p.apiLatencies)),
 				APICalls:   make(map[string]uint64, len(p.apiCalls)),
@@ -114,7 +114,7 @@ func (p *xlStorageDiskIDCheck) getMetrics() DiskMetrics {
 		},
 	)
 
-	diskMetric, _ := p.metricsCache.Get()
+	diskMetric, _ := p.metricsCache.GetWithCtx(context.Background())
 	// Do not need this value to be cached.
 	diskMetric.TotalErrorsTimeout = p.totalErrsTimeout.Load()
 	diskMetric.TotalErrorsAvailability = p.totalErrsAvailability.Load()
diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go
index 1050a9e04..c70c1065b 100644
--- a/cmd/xl-storage.go
+++ b/cmd/xl-storage.go
@@ -331,7 +331,7 @@ func newXLStorage(ep Endpoint, cleanUp bool) (s *xlStorage, err error) {
 	// Initialize DiskInfo cache
 	s.diskInfoCache.InitOnce(time.Second,
 		cachevalue.Opts{},
-		func() (DiskInfo, error) {
+		func(ctx context.Context) (DiskInfo, error) {
 			dcinfo := DiskInfo{}
 			di, err := getDiskInfo(s.drivePath)
 			if err != nil {
@@ -752,8 +752,8 @@ func (s *xlStorage) setWriteAttribute(writeCount uint64) error {
 
 // DiskInfo provides current information about disk space usage,
 // total free inodes and underlying filesystem.
-func (s *xlStorage) DiskInfo(_ context.Context, _ DiskInfoOptions) (info DiskInfo, err error) {
-	info, err = s.diskInfoCache.Get()
+func (s *xlStorage) DiskInfo(ctx context.Context, _ DiskInfoOptions) (info DiskInfo, err error) {
+	info, err = s.diskInfoCache.GetWithCtx(ctx)
 	info.NRRequests = s.nrRequests
 	info.Rotational = s.rotational
 	info.MountPath = s.drivePath
diff --git a/internal/cachevalue/cache.go b/internal/cachevalue/cache.go
index 46b5c756e..064d9dce6 100644
--- a/internal/cachevalue/cache.go
+++ b/internal/cachevalue/cache.go
@@ -18,6 +18,7 @@ package cachevalue
 
 import (
+	"context"
 	"sync"
 	"sync/atomic"
 	"time"
 )
@@ -30,11 +31,6 @@ type Opts struct {
 	// Returns the last good value AND the error.
 	ReturnLastGood bool
 
-	// If CacheError is set, errors will be cached as well
-	// and not continuously try to update.
-	// Should not be combined with ReturnLastGood.
-	CacheError bool
-
 	// If NoWait is set, Get() will return the last good value,
 	// if TTL has expired but 2x TTL has not yet passed,
 	// but will fetch a new value in the background.
@@ -50,7 +46,7 @@ type Cache[T any] struct {
 	// Only one caller will call this function at any time, others will be blocking.
 	// The returned value can no longer be modified once returned.
 	// Should be set before calling Get().
-	updateFn func() (T, error)
+	updateFn func(ctx context.Context) (T, error)
 
 	// ttl for a cached value.
 	ttl time.Duration
@@ -62,10 +58,7 @@ type Cache[T any] struct {
 	Once sync.Once
 
 	// Managed values.
-	valErr atomic.Pointer[struct {
-		v T
-		e error
-	}]
+	val atomic.Pointer[T]
 	lastUpdateMs atomic.Int64
 	updating     sync.Mutex
 }
@@ -78,7 +71,7 @@ func New[T any]() *Cache[T] {
 
 // NewFromFunc allocates a new cached value instance and initializes it with an
 // update function, making it ready for use.
-func NewFromFunc[T any](ttl time.Duration, opts Opts, update func() (T, error)) *Cache[T] {
+func NewFromFunc[T any](ttl time.Duration, opts Opts, update func(ctx context.Context) (T, error)) *Cache[T] {
 	return &Cache[T]{
 		ttl:      ttl,
 		updateFn: update,
@@ -88,7 +81,7 @@ func NewFromFunc[T any](ttl time.Duration, opts Opts, update func() (T, error))
 
 // InitOnce initializes the cache with a TTL and an update function. It is
 // guaranteed to be called only once.
-func (t *Cache[T]) InitOnce(ttl time.Duration, opts Opts, update func() (T, error)) {
+func (t *Cache[T]) InitOnce(ttl time.Duration, opts Opts, update func(ctx context.Context) (T, error)) {
 	t.Once.Do(func() {
 		t.ttl = ttl
 		t.updateFn = update
@@ -96,61 +89,68 @@
 	})
 }
 
-// Get will return a cached value or fetch a new one.
+// GetWithCtx will return a cached value or fetch a new one.
+// It passes the caller's context; if the caller's context is canceled,
+// nothing is cached.
 // If the Update function returns an error the value is forwarded as is and not cached.
-func (t *Cache[T]) Get() (T, error) {
-	v := t.valErr.Load()
+func (t *Cache[T]) GetWithCtx(ctx context.Context) (T, error) {
+	v := t.val.Load()
 	ttl := t.ttl
 	vTime := t.lastUpdateMs.Load()
 	tNow := time.Now().UnixMilli()
 	if v != nil && tNow-vTime < ttl.Milliseconds() {
-		if v.e == nil {
-			return v.v, nil
-		}
-		if v.e != nil && t.opts.CacheError || t.opts.ReturnLastGood {
-			return v.v, v.e
-		}
+		return *v, nil
 	}
 
-	// Fetch new value.
-	if t.opts.NoWait && v != nil && tNow-vTime < ttl.Milliseconds()*2 && (v.e == nil || t.opts.CacheError) {
+	// Fetch the new value asynchronously, returning the last good value
+	// (without an error) while it is still within 2x TTL.
+	if t.opts.NoWait && v != nil && tNow-vTime < ttl.Milliseconds()*2 {
 		if t.updating.TryLock() {
 			go func() {
 				defer t.updating.Unlock()
-				t.update()
+				t.update(context.Background())
 			}()
 		}
-		return v.v, v.e
+		return *v, nil
 	}
 
 	// Get lock. Either we get it or we wait for it.
 	t.updating.Lock()
+	defer t.updating.Unlock()
+
 	if time.Since(time.UnixMilli(t.lastUpdateMs.Load())) < ttl {
 		// There is a new value, release lock and return it.
-		v = t.valErr.Load()
-		t.updating.Unlock()
-		return v.v, v.e
-	}
-	t.update()
-	v = t.valErr.Load()
-	t.updating.Unlock()
-	return v.v, v.e
-}
-
-func (t *Cache[T]) update() {
-	val, err := t.updateFn()
-	if err != nil {
-		if t.opts.ReturnLastGood {
-			// Keep last good value.
-			v := t.valErr.Load()
-			if v != nil {
-				val = v.v
-			}
+		if v = t.val.Load(); v != nil {
+			return *v, nil
 		}
 	}
-	t.valErr.Store(&struct {
-		v T
-		e error
-	}{v: val, e: err})
-	t.lastUpdateMs.Store(time.Now().UnixMilli())
+
+	if err := t.update(ctx); err != nil {
+		var empty T
+		return empty, err
+	}
+
+	return *t.val.Load(), nil
+}
+
+// Get will return a cached value or fetch a new one.
+// If the Update function returns an error the value is forwarded as is and not cached.
+func (t *Cache[T]) Get() (T, error) {
+	return t.GetWithCtx(context.Background())
+}
+
+func (t *Cache[T]) update(ctx context.Context) error {
+	val, err := t.updateFn(ctx)
+	if err != nil {
+		if t.opts.ReturnLastGood && t.val.Load() != nil {
+			// Keep last good value, so update
+			// does not return an error.
+			return nil
+		}
+		return err
+	}
+
+	t.val.Store(&val)
+	t.lastUpdateMs.Store(time.Now().UnixMilli())
+	return nil
 }
diff --git a/internal/cachevalue/cache_test.go b/internal/cachevalue/cache_test.go
index 4be5d5673..023695fbb 100644
--- a/internal/cachevalue/cache_test.go
+++ b/internal/cachevalue/cache_test.go
@@ -18,15 +18,76 @@ package cachevalue
 
 import (
+	"context"
+	"errors"
 	"testing"
 	"time"
 )
 
+func slowCaller(ctx context.Context) error {
+	sl := time.NewTimer(time.Second)
+	defer sl.Stop()
+
+	select {
+	case <-sl.C:
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+
+	return nil
+}
+
+func TestCacheCtx(t *testing.T) {
+	cache := New[time.Time]()
+	t.Parallel()
+	cache.InitOnce(2*time.Second, Opts{},
+		func(ctx context.Context) (time.Time, error) {
+			return time.Now(), slowCaller(ctx)
+		},
+	)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	cancel() // cancel context to test.
+
+	_, err := cache.GetWithCtx(ctx)
+	if !errors.Is(err, context.Canceled) {
+		t.Fatalf("expected context.Canceled err, got %v", err)
+	}
+
+	ctx, cancel = context.WithCancel(context.Background())
+	defer cancel()
+
+	t1, err := cache.GetWithCtx(ctx)
+	if err != nil {
+		t.Fatalf("expected nil err, got %v", err)
+	}
+
+	t2, err := cache.GetWithCtx(ctx)
+	if err != nil {
+		t.Fatalf("expected nil err, got %v", err)
+	}
+
+	if !t1.Equal(t2) {
+		t.Fatalf("expected time to be equal: %s != %s", t1, t2)
+	}
+
+	time.Sleep(3 * time.Second)
+
+	t3, err := cache.GetWithCtx(ctx)
+	if err != nil {
+		t.Fatalf("expected nil err, got %v", err)
+	}
+
+	if t1.Equal(t3) {
+		t.Fatalf("expected time to be un-equal: %s == %s", t1, t3)
+	}
+}
+
 func TestCache(t *testing.T) {
 	cache := New[time.Time]()
 	t.Parallel()
 	cache.InitOnce(2*time.Second, Opts{},
-		func() (time.Time, error) {
+		func(ctx context.Context) (time.Time, error) {
 			return time.Now(), nil
 		},
 	)
@@ -50,7 +111,7 @@ func TestCache(t *testing.T) {
 func BenchmarkCache(b *testing.B) {
 	cache := New[time.Time]()
 	cache.InitOnce(1*time.Millisecond, Opts{},
-		func() (time.Time, error) {
+		func(ctx context.Context) (time.Time, error) {
 			return time.Now(), nil
 		},
 	)
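
For illustration only, and not part of the diff above: a minimal sketch of how a caller might use the context-aware API this change introduces. The value type, durations, and the simulated backend are made up, and since cachevalue lives under internal/ this snippet would only compile from within the minio module itself.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/minio/minio/internal/cachevalue"
)

func main() {
	// The update function now receives the caller's context, so a per-fetch
	// timeout can be derived from it instead of context.Background().
	cache := cachevalue.NewFromFunc(10*time.Second,
		cachevalue.Opts{ReturnLastGood: true},
		func(ctx context.Context) (string, error) {
			ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
			defer cancel()

			// Simulated slow backend call that honors cancellation.
			select {
			case <-time.After(100 * time.Millisecond):
				return "fresh value", nil
			case <-ctx.Done():
				return "", ctx.Err()
			}
		},
	)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// GetWithCtx forwards ctx to the update function on a cache miss; if ctx
	// is canceled while the value is being fetched, the error is returned and
	// nothing is cached.
	v, err := cache.GetWithCtx(ctx)
	fmt.Println(v, err)
}
```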