From 2ca9c533efb56a45959277da7395fb883d43eb8e Mon Sep 17 00:00:00 2001
From: Klaus Post
Date: Wed, 19 May 2021 23:38:30 +0200
Subject: [PATCH] feat: implement in-progress partial bucket updates (#12279)

---
 cmd/data-scanner.go             | 79 ++++++++++++++++++++++++++++++---
 cmd/data-usage-cache.go         | 45 ++++++++++++++++++-
 cmd/erasure.go                  | 21 ++++++++-
 cmd/fs-v1.go                    | 18 +++++++-
 cmd/naughty-disk_test.go        |  4 +-
 cmd/storage-interface.go        |  2 +-
 cmd/storage-rest-client.go      | 39 +++++++++++++---
 cmd/storage-rest-common.go      |  2 +-
 cmd/storage-rest-server.go      | 35 ++++++++++++++-
 cmd/xl-storage-disk-id-check.go |  6 +--
 cmd/xl-storage.go               |  5 ++-
 11 files changed, 229 insertions(+), 27 deletions(-)

diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go
index 84bc85490..27bc7533e 100644
--- a/cmd/data-scanner.go
+++ b/cmd/data-scanner.go
@@ -178,17 +178,23 @@ type cachedFolder struct {
 }
 
 type folderScanner struct {
-	root       string
-	getSize    getSizeFn
-	oldCache   dataUsageCache
-	newCache   dataUsageCache
-	withFilter *bloomFilter
+	root        string
+	getSize     getSizeFn
+	oldCache    dataUsageCache
+	newCache    dataUsageCache
+	updateCache dataUsageCache
+	withFilter  *bloomFilter
 
 	dataUsageScannerDebug bool
 	healFolderInclude     uint32 // Include a clean folder one in n cycles.
 	healObjectSelect      uint32 // Do a heal check on an object once every n cycles. Must divide into healFolderInclude
 
 	disks []StorageAPI
+
+	// If set, updates will be sent regularly to this channel.
+	// Will not be closed when returned.
+	updates    chan<- dataUsageEntry
+	lastUpdate time.Time
 }
 
 // Cache structure and compaction:
@@ -255,9 +261,11 @@ func scanDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
 		getSize:               getSize,
 		oldCache:              cache,
 		newCache:              dataUsageCache{Info: cache.Info},
+		updateCache:           dataUsageCache{Info: cache.Info},
 		dataUsageScannerDebug: intDataUpdateTracker.debug,
 		healFolderInclude:     0,
 		healObjectSelect:      0,
+		updates:               cache.Info.updates,
 	}
 
 	// Add disks for set healing.
@@ -318,6 +326,22 @@ func scanDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
 	return s.newCache, nil
 }
 
+// sendUpdate() should be called on a regular basis when newCache contains a more recent total than the last update sent.
+// May or may not send an update upstream.
+func (f *folderScanner) sendUpdate() {
+	// Send at most one update per minute.
+	if f.updates == nil || time.Since(f.lastUpdate) < time.Minute {
+		return
+	}
+	if flat := f.updateCache.sizeRecursive(f.newCache.Info.Name); flat != nil {
+		select {
+		case f.updates <- *flat:
+		default:
+		}
+		f.lastUpdate = time.Now()
+	}
+}
+
 // scanFolder will scan the provided folder.
 // Files found in the folders will be added to f.newCache.
 // If final is provided folders will be put into f.newFolders or f.existingFolders.
@@ -328,6 +352,7 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
 	thisHash := hashPath(folder.name)
 	// Store initial compaction state.
 	wasCompacted := into.Compacted
+
 	for {
 		select {
 		case <-done:
@@ -358,6 +383,7 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
 		if folder.name != dataUsageRoot && !filter.containsDir(folder.name) {
 			if f.healObjectSelect == 0 || !thisHash.mod(f.oldCache.Info.NextCycle, f.healFolderInclude/folder.objectHealProbDiv) {
 				f.newCache.copyWithChildren(&f.oldCache, thisHash, folder.parent)
+				f.updateCache.copyWithChildren(&f.oldCache, thisHash, folder.parent)
 				if f.dataUsageScannerDebug {
 					console.Debugf(scannerLogPrefix+" Skipping non-updated folder: %v\n", folder.name)
 				}
@@ -412,6 +438,7 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
 			delete(abandonedChildren, h.Key()) // h.Key() already accounted for.
 			if exists {
 				existingFolders = append(existingFolders, this)
+				f.updateCache.copyWithChildren(&f.oldCache, h, &thisHash)
 			} else {
 				newFolders = append(newFolders, this)
 			}
@@ -497,11 +524,44 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
 		if !into.Compacted {
 			into.addChild(dataUsageHash(folder.name))
 		}
+		// We scanned a folder; optionally send an update.
+		f.sendUpdate()
 	}
 
 	// Scan new...
 	for _, folder := range newFolders {
+		h := hashPath(folder.name)
+		// Add new folders to the update tree so totals update for these.
+		if !into.Compacted {
+			var foundAny bool
+			parent := thisHash
+			for parent != hashPath(f.updateCache.Info.Name) {
+				e := f.updateCache.find(parent.Key())
+				if e == nil || e.Compacted {
+					foundAny = true
+					break
+				}
+				if next := f.updateCache.searchParent(parent); next == nil {
+					foundAny = true
+					break
+				} else {
+					parent = *next
+				}
+			}
+			if !foundAny {
+				// Add non-compacted empty entry.
+				f.updateCache.replaceHashed(h, &thisHash, dataUsageEntry{})
+			}
+		}
 		scanFolder(folder)
+		// Add the scanned folder's results to the update tree if its parent is tracked and not compacted.
+		if !into.Compacted {
+			parent := f.updateCache.find(thisHash.Key())
+			if parent != nil && !parent.Compacted {
+				f.updateCache.deleteRecursive(h)
+				f.updateCache.copyWithChildren(&f.newCache, h, &thisHash)
+			}
+		}
 	}
 
 	// Scan existing...
@@ -512,7 +572,7 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
 		// and the entry itself is compacted.
 		if !into.Compacted && f.oldCache.isCompacted(h) {
 			if !h.mod(f.oldCache.Info.NextCycle, dataUsageUpdateDirCycles) {
-				if !h.mod(f.oldCache.Info.NextCycle, f.healFolderInclude/folder.objectHealProbDiv) {
+				if f.healObjectSelect == 0 || !h.mod(f.oldCache.Info.NextCycle, f.healFolderInclude/folder.objectHealProbDiv) {
 					// Transfer and add as child...
 					f.newCache.copyWithChildren(&f.oldCache, h, folder.parent)
 					into.addChild(h)
@@ -733,6 +793,13 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
 	if !into.Compacted {
 		f.newCache.reduceChildrenOf(thisHash, dataScannerCompactAtChildren, f.newCache.Info.Name != folder.name)
 	}
+	if _, ok := f.updateCache.Cache[thisHash.Key()]; !wasCompacted && ok {
+		// Replace if existed before.
+		if flat := f.newCache.sizeRecursive(thisHash.Key()); flat != nil {
+			f.updateCache.deleteRecursive(thisHash)
+			f.updateCache.replaceHashed(thisHash, folder.parent, *flat)
+		}
+	}
 	return nil
 }
 
diff --git a/cmd/data-usage-cache.go b/cmd/data-usage-cache.go
index e2f1756da..50f2e703a 100644
--- a/cmd/data-usage-cache.go
+++ b/cmd/data-usage-cache.go
@@ -153,8 +153,15 @@ type dataUsageCacheInfo struct {
 	// indicates if the disk is being healed and scanner
 	// should skip healing the disk
 	SkipHealing bool
-	BloomFilter []byte               `msg:"BloomFilter,omitempty"`
-	lifeCycle   *lifecycle.Lifecycle `msg:"-"`
+	BloomFilter []byte `msg:"BloomFilter,omitempty"`
+
+	// Active lifecycle, if any, on the bucket.
+	lifeCycle *lifecycle.Lifecycle `msg:"-"`
+
+	// Optional updates channel.
+	// If set, updates will be sent regularly to this channel.
+	// Will not be closed when returned.
+	updates chan<- dataUsageEntry `msg:"-"`
 }
 
 func (e *dataUsageEntry) addSizes(summary sizeSummary) {
@@ -259,6 +266,31 @@ func (d *dataUsageCache) findChildrenCopy(h dataUsageHash) dataUsageHashMap {
 	return res
 }
 
+// searchParent will search for the parent of h.
+// This is an O(N*N) operation if there is no parent or it cannot be guessed.
+func (d *dataUsageCache) searchParent(h dataUsageHash) *dataUsageHash {
+	want := h.Key()
+	if idx := strings.LastIndexByte(want, '/'); idx >= 0 {
+		if v := d.find(want[:idx]); v != nil {
+			for child := range v.Children {
+				if child == want {
+					found := hashPath(want[:idx])
+					return &found
+				}
+			}
+		}
+	}
+	for k, v := range d.Cache {
+		for child := range v.Children {
+			if child == want {
+				found := dataUsageHash(k)
+				return &found
+			}
+		}
+	}
+	return nil
+}
+
 // Returns nil if not found.
 func (d *dataUsageCache) subCache(path string) dataUsageCache {
 	dst := dataUsageCache{Info: dataUsageCacheInfo{
@@ -281,6 +313,15 @@ func (d *dataUsageCache) deleteRecursive(h dataUsageHash) {
 	}
 }
 
+// deleteChildren will delete any children, but not the entry itself.
+func (d *dataUsageCache) deleteChildren(h dataUsageHash) {
+	if existing, ok := d.Cache[h.String()]; ok {
+		for child := range existing.Children {
+			d.deleteRecursive(dataUsageHash(child))
+		}
+	}
+}
+
 // replaceRootChild will replace the child of root in d with the root of 'other'.
 func (d *dataUsageCache) replaceRootChild(other dataUsageCache) {
 	otherRoot := other.root()
diff --git a/cmd/erasure.go b/cmd/erasure.go
index e5815b9b7..1c3e9e696 100644
--- a/cmd/erasure.go
+++ b/cmd/erasure.go
@@ -31,6 +31,7 @@ import (
 	"github.com/minio/minio/cmd/logger"
 	"github.com/minio/minio/pkg/bpool"
 	"github.com/minio/minio/pkg/color"
+	"github.com/minio/minio/pkg/console"
 	"github.com/minio/minio/pkg/dsync"
 	"github.com/minio/minio/pkg/sync/errgroup"
 )
@@ -475,11 +476,28 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, bf
 					NextCycle: 0,
 				}
 			}
+			// Collect updates.
+			updates := make(chan dataUsageEntry, 1)
+			var wg sync.WaitGroup
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				for update := range updates {
+					bucketResults <- dataUsageEntryInfo{
+						Name:   cache.Info.Name,
+						Parent: dataUsageRoot,
+						Entry:  update,
+					}
+					if intDataUpdateTracker.debug {
+						console.Debugln("bucket", bucket.Name, "got update", update)
+					}
+				}
+			}()
 
 			// Calc usage
 			before := cache.Info.LastUpdate
 			var err error
-			cache, err = disk.NSScanner(ctx, cache)
+			cache, err = disk.NSScanner(ctx, cache, updates)
 			cache.Info.BloomFilter = nil
 			if err != nil {
 				if !cache.Info.LastUpdate.IsZero() && cache.Info.LastUpdate.After(before) {
@@ -490,6 +508,7 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, bf
 				continue
 			}
 
+			wg.Wait()
 			var root dataUsageEntry
 			if r := cache.root(); r != nil {
 				root = cache.flatten(*r)
diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go
index d7a14fe27..f5694ce27 100644
--- a/cmd/fs-v1.go
+++ b/cmd/fs-v1.go
@@ -239,6 +239,7 @@ func (fs *FSObjects) StorageInfo(ctx context.Context) (StorageInfo, []error) {
 
 // NSScanner returns data usage stats of the current FS deployment
 func (fs *FSObjects) NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- madmin.DataUsageInfo) error {
+	defer close(updates)
 	// Load bucket totals
 	var totalCache dataUsageCache
 	err := totalCache.load(ctx, fs, dataUsageCacheName)
@@ -273,7 +274,21 @@ func (fs *FSObjects) NSScanner(ctx context.Context, bf *bloomFilter, updates cha
 			bCache.Info.Name = b.Name
 		}
 		bCache.Info.BloomFilter = totalCache.Info.BloomFilter
-
+		upds := make(chan dataUsageEntry, 1)
+		var wg sync.WaitGroup
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for update := range upds {
+				totalCache.replace(b.Name, dataUsageRoot, update)
+				if intDataUpdateTracker.debug {
+					logger.Info(color.Green("NSScanner:")+" Got update:", len(totalCache.Cache))
+				}
+				cloned := totalCache.clone()
+				updates <- cloned.dui(dataUsageRoot, buckets)
+			}
+		}()
+		bCache.Info.updates = upds
 		cache, err := fs.scanBucket(ctx, b.Name, bCache)
 		select {
 		case <-ctx.Done():
@@ -282,6 +297,7 @@ func (fs *FSObjects) NSScanner(ctx context.Context, bf *bloomFilter, updates cha
 		}
 		logger.LogIf(ctx, err)
 		cache.Info.BloomFilter = nil
+		wg.Wait()
 
 		if cache.root() == nil {
 			if intDataUpdateTracker.debug {
diff --git a/cmd/naughty-disk_test.go b/cmd/naughty-disk_test.go
index 1fd83c0d8..9ce2f377a 100644
--- a/cmd/naughty-disk_test.go
+++ b/cmd/naughty-disk_test.go
@@ -110,8 +110,8 @@ func (d *naughtyDisk) SetDiskID(id string) {
 	d.disk.SetDiskID(id)
 }
 
-func (d *naughtyDisk) NSScanner(ctx context.Context, cache dataUsageCache) (info dataUsageCache, err error) {
-	return d.disk.NSScanner(ctx, cache)
+func (d *naughtyDisk) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry) (info dataUsageCache, err error) {
+	return d.disk.NSScanner(ctx, cache, updates)
 }
 
 func (d *naughtyDisk) DiskInfo(ctx context.Context) (info DiskInfo, err error) {
diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go
index 12875c0c3..ac2fdf48d 100644
--- a/cmd/storage-interface.go
+++ b/cmd/storage-interface.go
@@ -43,7 +43,7 @@ type StorageAPI interface {
 	Healing() *healingTracker // Returns nil if disk is not healing.
 
 	DiskInfo(ctx context.Context) (info DiskInfo, err error)
-	NSScanner(ctx context.Context, cache dataUsageCache) (dataUsageCache, error)
+	NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry) (dataUsageCache, error)
 
 	// Volume operations.
 	MakeVol(ctx context.Context, volume string) (err error)
diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go
index 55258f338..9d373d182 100644
--- a/cmd/storage-rest-client.go
+++ b/cmd/storage-rest-client.go
@@ -206,7 +206,8 @@ func (client *storageRESTClient) Healing() *healingTracker {
 	return val.(*healingTracker)
 }
 
-func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageCache) (dataUsageCache, error) {
+func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry) (dataUsageCache, error) {
+	defer close(updates)
 	pr, pw := io.Pipe()
 	go func() {
 		pw.CloseWithError(cache.serializeTo(pw))
@@ -218,14 +219,38 @@ func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageC
 		return cache, err
 	}
 
-	var newCache dataUsageCache
-	pr, pw = io.Pipe()
+	rr, rw := io.Pipe()
 	go func() {
-		pw.CloseWithError(waitForHTTPStream(respBody, pw))
+		rw.CloseWithError(waitForHTTPStream(respBody, rw))
 	}()
-	err = newCache.deserialize(pr)
-	pr.CloseWithError(err)
-	return newCache, err
+
+	ms := msgp.NewReader(rr)
+	for {
+		// Read whether it is an update.
+		upd, err := ms.ReadBool()
+		if err != nil {
+			rr.CloseWithError(err)
+			return cache, err
+		}
+		if !upd {
+			// No more updates... New cache follows.
+			break
+		}
+		var update dataUsageEntry
+		err = update.DecodeMsg(ms)
+		if err != nil || err == io.EOF {
+			rr.CloseWithError(err)
+			return cache, err
+		}
+		updates <- update
+	}
+	var newCache dataUsageCache
+	err = newCache.DecodeMsg(ms)
+	rr.CloseWithError(err)
+	if err == io.EOF {
+		err = nil
+	}
+	return newCache, err
 }
 
 func (client *storageRESTClient) GetDiskID() (string, error) {
diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go
index 6b9bb181e..7099a57b6 100644
--- a/cmd/storage-rest-common.go
+++ b/cmd/storage-rest-common.go
@@ -18,7 +18,7 @@
 package cmd
 
 const (
-	storageRESTVersion       = "v33" // Added transition related information to FileInfo
+	storageRESTVersion       = "v34" // Streaming Usage Updates
 	storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
 	storageRESTPrefix        = minioReservedBucketPath + "/storage"
 )
diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go
index 719a901f0..2911c6c19 100644
--- a/cmd/storage-rest-server.go
+++ b/cmd/storage-rest-server.go
@@ -19,6 +19,7 @@ package cmd
 
 import (
 	"bufio"
+	"context"
 	"encoding/binary"
 	"encoding/gob"
 	"encoding/hex"
@@ -174,13 +175,43 @@ func (s *storageRESTServer) NSScannerHandler(w http.ResponseWriter, r *http.Requ
 		return
 	}
 
+	ctx, cancel := context.WithCancel(r.Context())
+	defer cancel()
 	resp := streamHTTPResponse(w)
-	usageInfo, err := s.storage.NSScanner(r.Context(), cache)
+	respW := msgp.NewWriter(resp)
+
+	// Collect updates, stream them before the full cache is sent.
+	updates := make(chan dataUsageEntry, 1)
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for update := range updates {
+			// Write true bool to indicate update.
+			if err = respW.WriteBool(true); err == nil {
+				err = update.EncodeMsg(respW)
+			}
+			respW.Flush()
+			if err != nil {
+				cancel()
+				resp.CloseWithError(err)
+				return
+			}
+		}
+	}()
+	usageInfo, err := s.storage.NSScanner(ctx, cache, updates)
 	if err != nil {
+		respW.Flush()
 		resp.CloseWithError(err)
 		return
 	}
-	resp.CloseWithError(usageInfo.serializeTo(resp))
+
+	// Write false bool to indicate we finished.
+	wg.Wait()
+	if err = respW.WriteBool(false); err == nil {
+		err = usageInfo.EncodeMsg(respW)
+	}
+	resp.CloseWithError(respW.Flush())
 }
 
 // MakeVolHandler - make a volume.
diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go
index 88bed34fa..46fb035ca 100644
--- a/cmd/xl-storage-disk-id-check.go
+++ b/cmd/xl-storage-disk-id-check.go
@@ -25,7 +25,7 @@ import (
 	"sync/atomic"
 	"time"
 
-	ewma "github.com/VividCortex/ewma"
+	"github.com/VividCortex/ewma"
 	"github.com/minio/madmin-go"
 )
 
@@ -158,7 +158,7 @@ func (p *xlStorageDiskIDCheck) Healing() *healingTracker {
 	return p.storage.Healing()
 }
 
-func (p *xlStorageDiskIDCheck) NSScanner(ctx context.Context, cache dataUsageCache) (dataUsageCache, error) {
+func (p *xlStorageDiskIDCheck) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry) (dataUsageCache, error) {
 	select {
 	case <-ctx.Done():
 		return dataUsageCache{}, ctx.Err()
@@ -168,7 +168,7 @@ func (p *xlStorageDiskIDCheck) NSScanner(ctx context.Context, cache dataUsageCac
 	if err := p.checkDiskStale(); err != nil {
 		return dataUsageCache{}, err
 	}
-	return p.storage.NSScanner(ctx, cache)
+	return p.storage.NSScanner(ctx, cache, updates)
 }
 
 func (p *xlStorageDiskIDCheck) GetDiskLoc() (poolIdx, setIdx, diskIdx int) {
diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go
index 533e794b3..4f8e5dfa0 100644
--- a/cmd/xl-storage.go
+++ b/cmd/xl-storage.go
@@ -384,7 +384,9 @@ func (s *xlStorage) Healing() *healingTracker {
 	return &h
 }
 
-func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache) (dataUsageCache, error) {
+func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry) (dataUsageCache, error) {
+	// Updates must be closed before we return.
+	defer close(updates)
 	var lc *lifecycle.Lifecycle
 	var err error
 
@@ -409,6 +411,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache) (dataUs
 	globalHealConfigMu.Lock()
 	healOpts := globalHealConfig
 	globalHealConfigMu.Unlock()
+	cache.Info.updates = updates
 
 	dataUsageInfo, err := scanDataFolder(ctx, s.diskPath, cache, func(item scannerItem) (sizeSummary, error) {
 		// Look for `xl.meta/xl.json' at the leaf.
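
Note on the wire format: NSScannerHandler and storageRESTClient.NSScanner above frame the response stream by hand. Each interim update is a msgp `true` bool followed by one encoded dataUsageEntry; a single `false` bool signals that the final dataUsageCache follows. The sketch below is a minimal, self-contained model of that framing, assuming github.com/tinylib/msgp/msgp; plain strings stand in for the msgp-generated dataUsageEntry/dataUsageCache types, and sendUpdates/readUpdates are illustrative names, not part of this patch.

package main

import (
	"fmt"
	"io"

	"github.com/tinylib/msgp/msgp"
)

// sendUpdates plays the server side: every interim update is preceded by a
// `true` bool, and a single `false` bool announces the final payload.
func sendUpdates(w io.WriteCloser, updates []string, final string) error {
	defer w.Close()
	mw := msgp.NewWriter(w)
	for _, u := range updates {
		if err := mw.WriteBool(true); err != nil { // frame marker: update follows
			return err
		}
		if err := mw.WriteString(u); err != nil {
			return err
		}
		if err := mw.Flush(); err != nil { // push each update out immediately
			return err
		}
	}
	if err := mw.WriteBool(false); err != nil { // no more updates; final payload follows
		return err
	}
	if err := mw.WriteString(final); err != nil {
		return err
	}
	return mw.Flush()
}

// readUpdates plays the client side: consume interim updates until the
// `false` marker is seen, then decode the final payload.
func readUpdates(r io.Reader) (string, error) {
	mr := msgp.NewReader(r)
	for {
		more, err := mr.ReadBool()
		if err != nil {
			return "", err
		}
		if !more {
			break // final payload follows
		}
		update, err := mr.ReadString()
		if err != nil {
			return "", err
		}
		fmt.Println("interim update:", update)
	}
	return mr.ReadString()
}

func main() {
	pr, pw := io.Pipe()
	go sendUpdates(pw, []string{"update-1", "update-2"}, "full-cache")
	final, err := readUpdates(pr)
	if err != nil {
		panic(err)
	}
	fmt.Println("final payload:", final)
}

The same back-pressure choice as in the patch applies end to end: the updates channels are buffered (size 1) and folderScanner.sendUpdate uses a select with a default case, so a slow consumer drops interim updates rather than stalling the scanner, while the final cache is always delivered.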