diff --git a/cmd/daily-lifecycle-ops.go b/cmd/daily-lifecycle-ops.go deleted file mode 100644 index 2a21af005..000000000 --- a/cmd/daily-lifecycle-ops.go +++ /dev/null @@ -1,133 +0,0 @@ -/* - * MinIO Cloud Storage, (C) 2019 MinIO, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cmd - -import ( - "context" - "time" - - "github.com/minio/minio/cmd/logger" - "github.com/minio/minio/pkg/bucket/lifecycle" - "github.com/minio/minio/pkg/event" -) - -const ( - bgLifecycleInterval = 24 * time.Hour -) - -// initDailyLifecycle starts the routine that receives the daily -// listing of all objects and applies any matching bucket lifecycle -// rules. -func initDailyLifecycle(ctx context.Context, objAPI ObjectLayer) { - go startDailyLifecycle(ctx, objAPI) -} - -func startDailyLifecycle(ctx context.Context, objAPI ObjectLayer) { - for { - select { - case <-ctx.Done(): - return - case <-time.NewTimer(bgLifecycleInterval).C: - // Perform one lifecycle operation - logger.LogIf(ctx, lifecycleRound(ctx, objAPI)) - } - - } -} - -func lifecycleRound(ctx context.Context, objAPI ObjectLayer) error { - buckets, err := objAPI.ListBuckets(ctx) - if err != nil { - return err - } - - for _, bucket := range buckets { - // Check if the current bucket has a configured lifecycle policy, skip otherwise - l, err := globalLifecycleSys.Get(bucket.Name) - if err != nil { - if _, ok := err.(BucketLifecycleNotFound); !ok { - logger.LogIf(ctx, err) - } - continue - } - - rcfg, _ := globalBucketObjectLockSys.Get(bucket.Name) - - // Calculate the common prefix of all lifecycle rules - var prefixes []string - for _, rule := range l.Rules { - prefixes = append(prefixes, rule.Prefix()) - } - commonPrefix := lcp(prefixes) - - // Allocate new results channel to receive ObjectInfo. - objInfoCh := make(chan ObjectInfo) - - // Walk through all objects - if err := objAPI.Walk(ctx, bucket.Name, commonPrefix, objInfoCh); err != nil { - return err - } - - for { - var objects []string - for obj := range objInfoCh { - if len(objects) == maxDeleteList { - // Reached maximum delete requests, attempt a delete for now. - break - } - // Find the action that need to be executed - if l.ComputeAction(obj.Name, obj.UserTags, obj.ModTime) == lifecycle.DeleteAction { - if rcfg.LockEnabled && enforceRetentionForDeletion(ctx, obj) { - continue - } - objects = append(objects, obj.Name) - } - } - - // Nothing to do. - if len(objects) == 0 { - break - } - - waitForLowHTTPReq(int32(globalEndpoints.NEndpoints())) - - // Deletes a list of objects. - deleteErrs, err := objAPI.DeleteObjects(ctx, bucket.Name, objects) - if err != nil { - logger.LogIf(ctx, err) - } else { - for i := range deleteErrs { - if deleteErrs[i] != nil { - logger.LogIf(ctx, deleteErrs[i]) - continue - } - // Notify object deleted event. 
- sendEvent(eventArgs{ - EventName: event.ObjectRemovedDelete, - BucketName: bucket.Name, - Object: ObjectInfo{ - Name: objects[i], - }, - Host: "Internal: [ILM-EXPIRY]", - }) - } - } - } - } - - return nil -} diff --git a/cmd/data-crawler.go b/cmd/data-crawler.go new file mode 100644 index 000000000..46b1e405d --- /dev/null +++ b/cmd/data-crawler.go @@ -0,0 +1,600 @@ +/* + * MinIO Cloud Storage, (C) 2020 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "bytes" + "context" + "encoding/binary" + "errors" + "os" + "path" + "strconv" + "strings" + "time" + + "github.com/minio/minio/cmd/config" + xhttp "github.com/minio/minio/cmd/http" + "github.com/minio/minio/cmd/logger" + "github.com/minio/minio/pkg/bucket/lifecycle" + "github.com/minio/minio/pkg/color" + "github.com/minio/minio/pkg/env" + "github.com/minio/minio/pkg/event" + "github.com/minio/minio/pkg/hash" + "github.com/willf/bloom" +) + +const ( + dataCrawlSleepPerFolder = time.Millisecond // Time to wait between folders. + dataCrawlSleepDefMult = 10.0 // Default multiplier for waits between operations. + dataCrawlStartDelay = 5 * time.Minute // Time to wait on startup and between cycles. + dataUsageUpdateDirCycles = 16 // Visit all folders every n cycles. + +) + +// initDataCrawler will start the crawler unless disabled. +func initDataCrawler(ctx context.Context, objAPI ObjectLayer) { + if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOn { + go runDataCrawler(ctx, objAPI) + } +} + +// runDataCrawler will start a data crawler. +// The function will block until the context is canceled. +// There should only ever be one crawler running per cluster. +func runDataCrawler(ctx context.Context, objAPI ObjectLayer) { + // Load current bloom cycle + nextBloomCycle := intDataUpdateTracker.current() + 1 + var buf bytes.Buffer + err := objAPI.GetObject(ctx, dataUsageBucket, dataUsageBloomName, 0, -1, &buf, "", ObjectOptions{}) + if err != nil { + if !isErrObjectNotFound(err) && !isErrBucketNotFound(err) { + logger.LogIf(ctx, err) + } + } else { + if buf.Len() == 8 { + nextBloomCycle = binary.LittleEndian.Uint64(buf.Bytes()) + } + } + + for { + select { + case <-ctx.Done(): + return + case <-time.NewTimer(dataCrawlStartDelay).C: + // Wait before starting next cycle and wait on startup. + results := make(chan DataUsageInfo, 1) + go storeDataUsageInBackend(ctx, objAPI, results) + bf, err := globalNotificationSys.updateBloomFilter(ctx, nextBloomCycle) + logger.LogIf(ctx, err) + err = objAPI.CrawlAndGetDataUsage(ctx, bf, results) + close(results) + logger.LogIf(ctx, err) + if err == nil { + // Store new cycle... 
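The cycle counter being stored here is a plain 8-byte little-endian value kept in the `.bloomcycle.bin` object, so a restarted server resumes at the correct bloom-filter generation. A minimal sketch of that round-trip, with illustrative helper names that are not part of this PR:

```go
import "encoding/binary"

// encodeBloomCycle serializes the crawler's cycle counter the way the
// code above does: 8 bytes, little endian.
func encodeBloomCycle(cycle uint64) []byte {
	var tmp [8]byte
	binary.LittleEndian.PutUint64(tmp[:], cycle)
	return tmp[:]
}

// decodeBloomCycle mirrors the read path at the top of runDataCrawler:
// anything that is not exactly 8 bytes falls back to the in-memory
// tracker value.
func decodeBloomCycle(b []byte, fallback uint64) uint64 {
	if len(b) != 8 {
		return fallback
	}
	return binary.LittleEndian.Uint64(b)
}
```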
+ nextBloomCycle++ + if nextBloomCycle%dataUpdateTrackerResetEvery == 0 { + if intDataUpdateTracker.debug { + logger.Info(color.Green("runDataCrawler:") + " Resetting bloom filter for next runs.") + } + nextBloomCycle++ + } + var tmp [8]byte + binary.LittleEndian.PutUint64(tmp[:], nextBloomCycle) + r, err := hash.NewReader(bytes.NewReader(tmp[:]), int64(len(tmp)), "", "", int64(len(tmp)), false) + if err != nil { + logger.LogIf(ctx, err) + continue + } + + _, err = objAPI.PutObject(ctx, dataUsageBucket, dataUsageBloomName, NewPutObjReader(r, nil, nil), ObjectOptions{}) + if !isErrBucketNotFound(err) { + logger.LogIf(ctx, err) + } + } + } + } +} + +type cachedFolder struct { + name string + parent *dataUsageHash +} + +type folderScanner struct { + root string + getSize getSizeFn + oldCache dataUsageCache + newCache dataUsageCache + withFilter *bloomFilter + waitForLowActiveIO func() + + dataUsageCrawlMult float64 + dataUsageCrawlDebug bool + + newFolders []cachedFolder + existingFolders []cachedFolder +} + +// crawlDataFolder will crawl the basepath+cache.Info.Name and return an updated cache. +// The returned cache will always be valid, but may not be updated from the existing. +// Before each operation waitForLowActiveIO is called which can be used to temporarily halt the crawler. +// If the supplied context is canceled the function will return at the first chance. +func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache, waitForLowActiveIO func(), getSize getSizeFn) (dataUsageCache, error) { + t := UTCNow() + + logPrefix := color.Green("data-usage: ") + logSuffix := color.Blue(" - %v + %v", basePath, cache.Info.Name) + if intDataUpdateTracker.debug { + defer func() { + logger.Info(logPrefix+" Crawl time: %v"+logSuffix, time.Since(t)) + }() + + } + + switch cache.Info.Name { + case "", dataUsageRoot: + return cache, errors.New("internal error: root scan attempted") + } + + delayMult, err := strconv.ParseFloat(env.Get(envDataUsageCrawlDelay, "10.0"), 64) + if err != nil { + logger.LogIf(ctx, err) + delayMult = dataCrawlSleepDefMult + } + + s := folderScanner{ + root: basePath, + getSize: getSize, + oldCache: cache, + newCache: dataUsageCache{Info: cache.Info}, + waitForLowActiveIO: waitForLowActiveIO, + newFolders: nil, + existingFolders: nil, + dataUsageCrawlMult: delayMult, + dataUsageCrawlDebug: intDataUpdateTracker.debug, + } + + if len(cache.Info.BloomFilter) > 0 { + s.withFilter = &bloomFilter{BloomFilter: &bloom.BloomFilter{}} + _, err := s.withFilter.ReadFrom(bytes.NewBuffer(cache.Info.BloomFilter)) + if err != nil { + logger.LogIf(ctx, err, logPrefix+"Error reading bloom filter") + s.withFilter = nil + } + } + if s.dataUsageCrawlDebug { + logger.Info(logPrefix+"Start crawling. Bloom filter: %v"+logSuffix, s.withFilter != nil) + } + + done := ctx.Done() + var flattenLevels = 2 + + if s.dataUsageCrawlDebug { + logger.Info(logPrefix+"Cycle: %v, Entries: %v"+logSuffix, cache.Info.NextCycle, len(cache.Cache)) + } + + // Always scan flattenLevels deep. Cache root is level 0. + todo := []cachedFolder{{name: cache.Info.Name}} + for i := 0; i < flattenLevels; i++ { + if s.dataUsageCrawlDebug { + logger.Info(logPrefix+"Level %v, scanning %v directories."+logSuffix, i, len(todo)) + } + select { + case <-done: + return cache, ctx.Err() + default: + } + var err error + todo, err = s.scanQueuedLevels(ctx, todo, i == flattenLevels-1) + if err != nil { + // No useful information... 
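The loop above is the whole top-level driver: the crawler always scans flattenLevels (two) levels deep from the cache root, breadth-first, and the final level splits what it finds into new and existing folders for the deep scans that follow. Condensed, with scanLevel standing in for (*folderScanner).scanQueuedLevels:

```go
// scanAllLevels sketches the breadth-first control flow of the level
// scan; scanLevel is a stand-in for (*folderScanner).scanQueuedLevels.
func scanAllLevels(todo []string, levels int,
	scanLevel func(folders []string, final bool) ([]string, error)) error {
	for i := 0; i < levels; i++ {
		var err error
		// On the final level nothing is returned; folders are queued
		// as new or existing for the deep scans instead.
		todo, err = scanLevel(todo, i == levels-1)
		if err != nil {
			return err
		}
	}
	return nil
}
```

Flattening everything below a fixed depth keeps the cache size bounded no matter how deep the actual namespace goes.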
+ return cache, err + } + } + + if s.dataUsageCrawlDebug { + logger.Info(logPrefix+"New folders: %v"+logSuffix, s.newFolders) + } + + // Add new folders first + for _, folder := range s.newFolders { + select { + case <-done: + return s.newCache, ctx.Err() + default: + } + du, err := s.deepScanFolder(ctx, folder.name) + if err != nil { + logger.LogIf(ctx, err) + continue + } + if du == nil { + logger.Info(logPrefix + "no disk usage provided" + logSuffix) + continue + } + + s.newCache.replace(folder.name, "", *du) + // Add to parent manually + if folder.parent != nil { + parent := s.newCache.Cache[folder.parent.Key()] + parent.addChildString(folder.name) + } + } + + if s.dataUsageCrawlDebug { + logger.Info(logPrefix+"Existing folders: %v"+logSuffix, len(s.existingFolders)) + } + + // Do selective scanning of existing folders. + for _, folder := range s.existingFolders { + select { + case <-done: + return s.newCache, ctx.Err() + default: + } + h := hashPath(folder.name) + if !h.mod(s.oldCache.Info.NextCycle, dataUsageUpdateDirCycles) { + s.newCache.replaceHashed(h, folder.parent, s.oldCache.Cache[h.Key()]) + continue + } + + if s.withFilter != nil { + _, prefix := path2BucketObjectWithBasePath(basePath, folder.name) + if s.oldCache.Info.lifeCycle == nil || !s.oldCache.Info.lifeCycle.HasActiveRules(prefix, true) { + // If folder isn't in filter, skip it completely. + if !s.withFilter.containsDir(folder.name) { + if s.dataUsageCrawlDebug { + logger.Info(logPrefix+"Skipping non-updated folder: %v"+logSuffix, folder) + } + s.newCache.replaceHashed(h, folder.parent, s.oldCache.Cache[h.Key()]) + continue + } + } + } + + // Update on this cycle... + du, err := s.deepScanFolder(ctx, folder.name) + if err != nil { + logger.LogIf(ctx, err) + continue + } + if du == nil { + logger.LogIf(ctx, errors.New("data-usage: no disk usage provided")) + continue + } + s.newCache.replaceHashed(h, folder.parent, *du) + } + if s.dataUsageCrawlDebug { + logger.Info(logPrefix+"Finished crawl, %v entries"+logSuffix, len(s.newCache.Cache)) + } + s.newCache.Info.LastUpdate = UTCNow() + s.newCache.Info.NextCycle++ + return s.newCache, nil +} + +// scanQueuedLevels will scan the provided folders. +// Files found in the folders will be added to f.newCache. +// If final is provided folders will be put into f.newFolders or f.existingFolders. +// If final is not provided the folders found are returned from the function. +func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFolder, final bool) ([]cachedFolder, error) { + var nextFolders []cachedFolder + done := ctx.Done() + for _, folder := range folders { + select { + case <-done: + return nil, ctx.Err() + default: + } + thisHash := hashPath(folder.name) + + // If there are lifecycle rules for the prefix, remove the filter. + filter := f.withFilter + var activeLifeCycle *lifecycle.Lifecycle + if f.oldCache.Info.lifeCycle != nil && filter != nil { + _, prefix := path2BucketObjectWithBasePath(f.root, folder.name) + if f.oldCache.Info.lifeCycle.HasActiveRules(prefix, true) { + if f.dataUsageCrawlDebug { + logger.Info(color.Green("data-usage:")+" Prefix %q has active rules", prefix) + } + activeLifeCycle = f.oldCache.Info.lifeCycle + filter = nil + } + } + + if _, ok := f.oldCache.Cache[thisHash.Key()]; filter != nil && ok { + // If folder isn't in filter and we have data, skip it completely. 
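A folder that survives the bloom-filter check is still not rescanned every time: in the loop above, an unchanged folder is deep-rescanned only when its hash lands on the current cycle modulo dataUsageUpdateDirCycles, i.e. roughly once every 16 cycles. The selection rule, as a sketch (shouldRescan is illustrative):

```go
import "github.com/cespare/xxhash/v2"

// shouldRescan mirrors dataUsageHash.mod: with dataUsageUpdateDirCycles
// = 16, each folder is picked in one out of every 16 cycles, which
// spreads the rescan load evenly across cycles.
func shouldRescan(folder string, cycle uint32) bool {
	const cycles = 16 // dataUsageUpdateDirCycles
	return uint32(xxhash.Sum64String(folder))%cycles == cycle%cycles
}
```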
+ if folder.name != dataUsageRoot && !filter.containsDir(folder.name) { + f.newCache.copyWithChildren(&f.oldCache, thisHash, folder.parent) + if f.dataUsageCrawlDebug { + logger.Info(color.Green("data-usage:")+" Skipping non-updated folder: %v", folder.name) + } + continue + } + } + f.waitForLowActiveIO() + sleepDuration(dataCrawlSleepPerFolder, f.dataUsageCrawlMult) + + cache := dataUsageEntry{} + + err := readDirFn(path.Join(f.root, folder.name), func(entName string, typ os.FileMode) error { + // Parse + entName = path.Clean(path.Join(folder.name, entName)) + bucket, prefix := path2BucketObjectWithBasePath(f.root, entName) + if bucket == "" { + if f.dataUsageCrawlDebug { + logger.Info(color.Green("data-usage:")+" no bucket (%s,%s)", f.root, entName) + } + return nil + } + + if isReservedOrInvalidBucket(bucket, false) { + if f.dataUsageCrawlDebug { + logger.Info(color.Green("data-usage:")+" invalid bucket: %v, entry: %v", bucket, entName) + } + return nil + } + + select { + case <-done: + return ctx.Err() + default: + } + + if typ&os.ModeDir != 0 { + h := hashPath(entName) + _, exists := f.oldCache.Cache[h.Key()] + cache.addChildString(entName) + + this := cachedFolder{name: entName, parent: &thisHash} + cache.addChild(h) + if final { + if exists { + f.existingFolders = append(f.existingFolders, this) + } else { + f.newFolders = append(f.newFolders, this) + } + } else { + nextFolders = append(nextFolders, this) + } + return nil + } + f.waitForLowActiveIO() + // Dynamic time delay. + t := UTCNow() + + // Get file size, ignore errors. + item := crawlItem{ + Path: path.Join(f.root, entName), + Typ: typ, + bucket: bucket, + prefix: path.Dir(prefix), + objectName: path.Base(entName), + debug: f.dataUsageCrawlDebug, + lifeCycle: activeLifeCycle, + } + size, err := f.getSize(item) + + sleepDuration(time.Since(t), f.dataUsageCrawlMult) + if err == errSkipFile || err == errFileNotFound { + return nil + } + logger.LogIf(ctx, err) + cache.Size += size + cache.Objects++ + cache.ObjSizes.add(size) + + return nil + }) + if err != nil { + return nil, err + } + f.newCache.replaceHashed(thisHash, folder.parent, cache) + } + return nextFolders, nil +} + +// deepScanFolder will deep scan a folder and return the size if no error occurs. +func (f *folderScanner) deepScanFolder(ctx context.Context, folder string) (*dataUsageEntry, error) { + var cache dataUsageEntry + + done := ctx.Done() + + var addDir func(entName string, typ os.FileMode) error + var dirStack = []string{f.root, folder} + + addDir = func(entName string, typ os.FileMode) error { + select { + case <-done: + return ctx.Err() + default: + } + + f.waitForLowActiveIO() + if typ&os.ModeDir != 0 { + dirStack = append(dirStack, entName) + err := readDirFn(path.Join(dirStack...), addDir) + dirStack = dirStack[:len(dirStack)-1] + sleepDuration(dataCrawlSleepPerFolder, f.dataUsageCrawlMult) + return err + } + + // Dynamic time delay. + t := UTCNow() + + // Get file size, ignore errors. + dirStack = append(dirStack, entName) + fileName := path.Join(dirStack...) 
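Both scan paths share the self-throttling visible around each getSize call: time the operation, then sleep for elapsed×multiplier (10× by default, capped at one second), so the crawler yields roughly ten milliseconds for every millisecond of its own work. The pattern in isolation:

```go
import "time"

// throttled runs op, then sleeps for elapsed×mult exactly as
// sleepDuration does: sleeps under 100µs are skipped, and sleeps are
// capped at one second.
func throttled(mult float64, op func()) {
	t := time.Now()
	op()
	d := time.Duration(float64(time.Since(t)) * mult)
	if d <= 100*time.Microsecond {
		return
	}
	if d > time.Second {
		d = time.Second
	}
	time.Sleep(d)
}
```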
+ dirStack = dirStack[:len(dirStack)-1] + + bucket, prefix := path2BucketObjectWithBasePath(f.root, fileName) + var activeLifeCycle *lifecycle.Lifecycle + if f.oldCache.Info.lifeCycle != nil { + if f.oldCache.Info.lifeCycle.HasActiveRules(prefix, false) { + if f.dataUsageCrawlDebug { + logger.Info(color.Green("data-usage:")+" Prefix %q has active rules", prefix) + } + activeLifeCycle = f.oldCache.Info.lifeCycle + } + } + + size, err := f.getSize( + crawlItem{ + Path: fileName, + Typ: typ, + bucket: bucket, + prefix: path.Dir(prefix), + objectName: path.Base(entName), + debug: f.dataUsageCrawlDebug, + lifeCycle: activeLifeCycle, + }) + + // Don't sleep for really small amount of time + sleepDuration(time.Since(t), f.dataUsageCrawlMult) + + if err == errSkipFile { + return nil + } + logger.LogIf(ctx, err) + cache.Size += size + cache.Objects++ + cache.ObjSizes.add(size) + return nil + } + err := readDirFn(path.Join(dirStack...), addDir) + if err != nil { + return nil, err + } + return &cache, nil +} + +// crawlItem represents each file while walking. +type crawlItem struct { + Path string + Typ os.FileMode + + bucket string // Bucket. + prefix string // Only the prefix if any, does not have final object name. + objectName string // Only the object name without prefixes. + lifeCycle *lifecycle.Lifecycle + debug bool +} + +type getSizeFn func(item crawlItem) (int64, error) + +// transformMetaDir will transform a directory to prefix/file.ext +func (i *crawlItem) transformMetaDir() { + split := strings.Split(i.prefix, SlashSeparator) + if len(split) > 1 { + i.prefix = path.Join(split[:len(split)-1]...) + } else { + i.prefix = "" + } + // Object name is last element + i.objectName = split[len(split)-1] +} + +// actionMeta contains information used to apply actions. +type actionMeta struct { + oi ObjectInfo + trustOI bool // Set true if oi can be trusted and has been read with quorum. + meta map[string]string +} + +// applyActions will apply lifecycle checks on to a scanned item. +// The resulting size on disk will always be returned. +// The metadata will be compared to consensus on the object layer before any changes are applied. +// If no metadata is supplied, -1 is returned if no action is taken. +func (i *crawlItem) applyActions(ctx context.Context, o ObjectLayer, meta actionMeta) (size int64) { + size, err := meta.oi.GetActualSize() + if i.debug { + logger.LogIf(ctx, err) + } + if i.lifeCycle == nil { + return size + } + + action := i.lifeCycle.ComputeAction(i.objectPath(), meta.meta[xhttp.AmzObjectTagging], meta.oi.ModTime) + if i.debug { + logger.Info(color.Green("applyActions:")+" lifecycle: %q, Initial scan: %v", i.objectPath(), action) + } + switch action { + case lifecycle.DeleteAction: + default: + // No action. + return size + } + + // These (expensive) operations should only run on items we are likely to delete. + // Load to ensure that we have the correct version and not an unsynced version. + if !meta.trustOI { + obj, err := o.GetObjectInfo(ctx, i.bucket, i.objectPath(), ObjectOptions{}) + if err != nil { + // Do nothing - heal in the future. + logger.LogIf(ctx, err) + return size + } + size = obj.Size + + // Recalculate action. + action = i.lifeCycle.ComputeAction(i.objectPath(), obj.UserTags, obj.ModTime) + if i.debug { + logger.Info(color.Green("applyActions:")+" lifecycle: Secondary scan: %v", action) + } + switch action { + case lifecycle.DeleteAction: + default: + // No action. 
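applyActions deliberately evaluates the lifecycle twice: once on the crawled metadata (cheap, possibly stale), and, only if that verdict is delete, again on metadata re-read with quorum, so an unsynced cache entry can never expire an object on its own. The decision structure, sketched with hypothetical stand-ins for the two evaluations:

```go
// shouldExpire sketches applyActions' double check. evaluate and
// fetchFresh stand in for ComputeAction on crawled vs. quorum-read
// metadata; they are not real functions from this PR.
func shouldExpire(trusted bool, evaluate func() bool, fetchFresh func() (bool, error)) bool {
	if !evaluate() {
		return false // cheap first pass says keep
	}
	if trusted {
		return true // metadata was already read with quorum (trustOI)
	}
	fresh, err := fetchFresh()
	if err != nil {
		return false // cannot verify: keep the object, heal later
	}
	return fresh
}
```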
+ return size + } + } + + err = o.DeleteObject(ctx, i.bucket, i.objectPath()) + if err != nil { + // Assume it is still there. + logger.LogIf(ctx, err) + return size + } + + // Notify object deleted event. + sendEvent(eventArgs{ + EventName: event.ObjectRemovedDelete, + BucketName: i.bucket, + Object: ObjectInfo{ + Name: i.objectPath(), + }, + Host: "Internal: [ILM-EXPIRY]", + }) + return 0 +} + +// objectPath returns the prefix and object name. +func (i *crawlItem) objectPath() string { + return path.Join(i.prefix, i.objectName) +} + +// sleepDuration multiplies the duration d by x and sleeps if is more than 100 micro seconds. +// sleep is limited to max 1 second. +func sleepDuration(d time.Duration, x float64) { + // Don't sleep for really small amount of time + if d := time.Duration(float64(d) * x); d > time.Microsecond*100 { + if d > time.Second { + d = time.Second + } + time.Sleep(d) + } +} diff --git a/cmd/data-update-tracker.go b/cmd/data-update-tracker.go index 033b13feb..be96e133c 100644 --- a/cmd/data-update-tracker.go +++ b/cmd/data-update-tracker.go @@ -46,7 +46,7 @@ const ( dataUpdateTrackerQueueSize = 10000 dataUpdateTrackerFilename = dataUsageBucket + SlashSeparator + ".tracker.bin" - dataUpdateTrackerVersion = 1 + dataUpdateTrackerVersion = 2 dataUpdateTrackerSaveInterval = 5 * time.Minute // Reset bloom filters every n cycle @@ -116,9 +116,7 @@ func (b bloomFilter) containsDir(in string) bool { if len(split) == 0 { return false } - var tmp [dataUsageHashLen]byte - hashPath(path.Join(split...)).bytes(tmp[:]) - return b.Test(tmp[:]) + return b.TestString(hashPath(path.Join(split...)).String()) } // bytes returns the bloom filter serialized as a byte slice. @@ -366,6 +364,9 @@ func (d *dataUpdateTracker) deserialize(src io.Reader, newerThan time.Time) erro return err } switch tmp[0] { + case 1: + logger.Info(color.Green("dataUpdateTracker: ") + "deprecated data version, updating.") + return nil case dataUpdateTrackerVersion: default: return errors.New("dataUpdateTracker: Unknown data version") @@ -435,7 +436,6 @@ func (d *dataUpdateTracker) deserialize(src io.Reader, newerThan time.Time) erro // start a collector that picks up entries from objectUpdatedCh // and adds them to the current bloom filter. func (d *dataUpdateTracker) startCollector(ctx context.Context) { - var tmp [dataUsageHashLen]byte for { select { case <-ctx.Done(): @@ -463,8 +463,7 @@ func (d *dataUpdateTracker) startCollector(ctx context.Context) { if d.debug && false { logger.Info(color.Green("dataUpdateTracker:") + " Marking path dirty: " + color.Blue(path.Join(split[:i+1]...))) } - hashPath(path.Join(split[:i+1]...)).bytes(tmp[:]) - d.Current.bf.Add(tmp[:]) + d.Current.bf.AddString(hashPath(path.Join(split[:i+1]...)).String()) } d.dirty = d.dirty || len(split) > 0 d.mu.Unlock() diff --git a/cmd/data-usage-cache.go b/cmd/data-usage-cache.go index 832353e36..667a60a20 100644 --- a/cmd/data-usage-cache.go +++ b/cmd/data-usage-cache.go @@ -19,7 +19,6 @@ package cmd import ( "bytes" "context" - "encoding/binary" "errors" "fmt" "io" @@ -29,17 +28,17 @@ import ( "time" "github.com/cespare/xxhash/v2" + "github.com/klauspost/compress/zstd" "github.com/minio/minio/cmd/logger" + "github.com/minio/minio/pkg/bucket/lifecycle" "github.com/minio/minio/pkg/hash" "github.com/tinylib/msgp/msgp" ) -const dataUsageHashLen = 8 - //go:generate msgp -file $GOFILE -unexported // dataUsageHash is the hash type used. -type dataUsageHash uint64 +type dataUsageHash string // sizeHistogram is a size histogram. 
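With dataUsageHash now a string, the update tracker above feeds the bloom filter path strings directly (AddString/TestString) instead of packed 8-byte hashes. Marking and querying, sketched on top of github.com/willf/bloom:

```go
import (
	"path"
	"strings"

	"github.com/willf/bloom"
)

// markDirty and wasUpdated are illustrative wrappers: the cleaned,
// slash-trimmed path itself is now the bloom-filter key.
func markDirty(bf *bloom.BloomFilter, p string) {
	bf.AddString(path.Clean(strings.Trim(p, "/")))
}

func wasUpdated(bf *bloom.BloomFilter, p string) bool {
	return bf.TestString(path.Clean(strings.Trim(p, "/")))
}
```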
type sizeHistogram [dataUsageBucketLen]uint64 @@ -54,6 +53,12 @@ type dataUsageEntry struct { Children dataUsageHashMap } +// dataUsageCache contains a cache of data usage entries. +type dataUsageCache struct { + Info dataUsageCacheInfo + Cache map[string]dataUsageEntry +} + //msgp:ignore dataUsageEntryInfo type dataUsageEntryInfo struct { Name string @@ -66,7 +71,8 @@ type dataUsageCacheInfo struct { Name string LastUpdate time.Time NextCycle uint32 - BloomFilter []byte `msg:"BloomFilter,omitempty"` + BloomFilter []byte `msg:"BloomFilter,omitempty"` + lifeCycle *lifecycle.Lifecycle `msg:"-"` } // merge other data usage entry into this, excluding children. @@ -80,7 +86,7 @@ func (e *dataUsageEntry) merge(other dataUsageEntry) { // mod returns true if the hash mod cycles == cycle. func (h dataUsageHash) mod(cycle uint32, cycles uint32) bool { - return uint32(h)%cycles == cycle%cycles + return uint32(xxhash.Sum64String(string(h)))%cycles == cycle%cycles } // addChildString will add a child based on its name. @@ -92,25 +98,95 @@ func (e *dataUsageEntry) addChildString(name string) { // addChild will add a child based on its hash. // If it already exists it will not be added again. func (e *dataUsageEntry) addChild(hash dataUsageHash) { - if _, ok := e.Children[hash]; ok { + if _, ok := e.Children[hash.Key()]; ok { return } if e.Children == nil { e.Children = make(dataUsageHashMap, 1) } - e.Children[hash] = struct{}{} + e.Children[hash.Key()] = struct{}{} } // find a path in the cache. // Returns nil if not found. func (d *dataUsageCache) find(path string) *dataUsageEntry { - due, ok := d.Cache[hashPath(path)] + due, ok := d.Cache[hashPath(path).Key()] if !ok { return nil } return &due } +// Returns nil if not found. +func (d *dataUsageCache) subCache(path string) dataUsageCache { + dst := dataUsageCache{Info: dataUsageCacheInfo{ + Name: path, + LastUpdate: d.Info.LastUpdate, + BloomFilter: d.Info.BloomFilter, + }} + dst.copyWithChildren(d, dataUsageHash(hashPath(path).Key()), nil) + return dst +} + +func (d *dataUsageCache) deleteRecursive(h dataUsageHash) { + if existing, ok := d.Cache[h.String()]; ok { + // Delete first if there should be a loop. + delete(d.Cache, h.Key()) + for child := range existing.Children { + d.deleteRecursive(dataUsageHash(child)) + } + } +} + +// replaceRootChild will replace the child of root in d with the root of 'other'. +func (d *dataUsageCache) replaceRootChild(other dataUsageCache) { + otherRoot := other.root() + if otherRoot == nil { + logger.LogIf(GlobalContext, errors.New("replaceRootChild: Source has no root")) + return + } + thisRoot := d.root() + if thisRoot == nil { + logger.LogIf(GlobalContext, errors.New("replaceRootChild: Root of current not found")) + return + } + thisRootHash := d.rootHash() + otherRootHash := other.rootHash() + if thisRootHash == otherRootHash { + logger.LogIf(GlobalContext, errors.New("replaceRootChild: Root of child matches root of destination")) + return + } + d.deleteRecursive(other.rootHash()) + d.copyWithChildren(&other, other.rootHash(), &thisRootHash) +} + +// keepBuckets will keep only the buckets specified specified by delete all others. +func (d *dataUsageCache) keepBuckets(b []BucketInfo) { + lu := make(map[dataUsageHash]struct{}) + for _, v := range b { + lu[hashPath(v.Name)] = struct{}{} + } + d.keepRootChildren(lu) +} + +// keepRootChildren will keep the root children specified by delete all others. 
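These helpers all lean on the cache's new shape: one flat map keyed by the string hash, with each entry carrying the set of its children's keys, so subtree operations are plain recursive map walks. keepRootChildren below follows the same pattern; in isolation, with local stand-in types, the delete-before-recurse walk of deleteRecursive looks like this:

```go
// flatCache/entry are local stand-ins for dataUsageCache's map and
// dataUsageEntry.
type entry struct {
	size     int64
	objects  uint64
	children map[string]struct{}
}

type flatCache map[string]entry

// dropSubtree mirrors deleteRecursive: the node is removed before
// recursing so a (corrupt) cycle in the child sets cannot loop forever.
func (c flatCache) dropSubtree(key string) {
	e, ok := c[key]
	if !ok {
		return
	}
	delete(c, key)
	for child := range e.children {
		c.dropSubtree(child)
	}
}
```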
+func (d *dataUsageCache) keepRootChildren(list map[dataUsageHash]struct{}) { + if d.root() == nil { + return + } + rh := d.rootHash() + for k := range d.Cache { + h := dataUsageHash(k) + if h == rh { + continue + } + if _, ok := list[h]; !ok { + delete(d.Cache, k) + d.deleteRecursive(h) + } + } +} + // dui converts the flattened version of the path to DataUsageInfo. // As a side effect d will be flattened, use a clone if this is not ok. func (d *dataUsageCache) dui(path string, buckets []BucketInfo) DataUsageInfo { @@ -135,14 +211,14 @@ func (d *dataUsageCache) dui(path string, buckets []BucketInfo) DataUsageInfo { func (d *dataUsageCache) replace(path, parent string, e dataUsageEntry) { hash := hashPath(path) if d.Cache == nil { - d.Cache = make(map[dataUsageHash]dataUsageEntry, 100) + d.Cache = make(map[string]dataUsageEntry, 100) } - d.Cache[hash] = e + d.Cache[hash.Key()] = e if parent != "" { phash := hashPath(parent) - p := d.Cache[phash] + p := d.Cache[phash.Key()] p.addChild(hash) - d.Cache[phash] = p + d.Cache[phash.Key()] = p } } @@ -151,13 +227,13 @@ func (d *dataUsageCache) replace(path, parent string, e dataUsageEntry) { // If the parent does not exist, it will be added. func (d *dataUsageCache) replaceHashed(hash dataUsageHash, parent *dataUsageHash, e dataUsageEntry) { if d.Cache == nil { - d.Cache = make(map[dataUsageHash]dataUsageEntry, 100) + d.Cache = make(map[string]dataUsageEntry, 100) } - d.Cache[hash] = e + d.Cache[hash.Key()] = e if parent != nil { - p := d.Cache[*parent] + p := d.Cache[parent.Key()] p.addChild(hash) - d.Cache[*parent] = p + d.Cache[parent.Key()] = p } } @@ -166,24 +242,24 @@ func (d *dataUsageCache) replaceHashed(hash dataUsageHash, parent *dataUsageHash // If the parent does not exist, it will be added. func (d *dataUsageCache) copyWithChildren(src *dataUsageCache, hash dataUsageHash, parent *dataUsageHash) { if d.Cache == nil { - d.Cache = make(map[dataUsageHash]dataUsageEntry, 100) + d.Cache = make(map[string]dataUsageEntry, 100) } - e, ok := src.Cache[hash] + e, ok := src.Cache[hash.String()] if !ok { return } - d.Cache[hash] = e + d.Cache[hash.Key()] = e for ch := range e.Children { - if ch == hash { + if ch == hash.Key() { logger.LogIf(GlobalContext, errors.New("dataUsageCache.copyWithChildren: Circular reference")) return } - d.copyWithChildren(src, ch, &hash) + d.copyWithChildren(src, dataUsageHash(ch), &hash) } if parent != nil { - p := d.Cache[*parent] + p := d.Cache[parent.Key()] p.addChild(hash) - d.Cache[*parent] = p + d.Cache[parent.Key()] = p } } @@ -196,15 +272,14 @@ func (d *dataUsageCache) StringAll() string { return strings.TrimSpace(s) } -// insert the hash into dst. -// dst must be at least dataUsageHashLen bytes long. -func (h dataUsageHash) bytes(dst []byte) { - binary.LittleEndian.PutUint64(dst, uint64(h)) +// String returns a human readable representation of the string. +func (h dataUsageHash) String() string { + return string(h) } // String returns a human readable representation of the string. -func (h dataUsageHash) String() string { - return fmt.Sprintf("%x", uint64(h)) +func (h dataUsageHash) Key() string { + return string(h) } // flatten all children of the root into the root element and return it. @@ -270,13 +345,6 @@ func (d *dataUsageCache) sizeRecursive(path string) *dataUsageEntry { return &flat } -// dataUsageCache contains a cache of data usage entries. 
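The reporting helpers above (dui, sizeRecursive) depend on flattening: folding an entry's entire subtree into a single aggregate before it is summed into the usage report. A stand-alone sketch under the same local stand-in types:

```go
// usageEntry is a local stand-in for dataUsageEntry.
type usageEntry struct {
	Size     int64
	Objects  uint64
	Children map[string]struct{}
}

// flatten folds an entry and all of its descendants into one
// aggregate, dropping the child links, in the spirit of the
// flatten/sizeRecursive helpers.
func flatten(c map[string]usageEntry, key string) usageEntry {
	root := c[key]
	for child := range root.Children {
		sub := flatten(c, child)
		root.Size += sub.Size
		root.Objects += sub.Objects
	}
	root.Children = nil
	return root
}
```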
-//msgp:ignore dataUsageCache -type dataUsageCache struct { - Info dataUsageCacheInfo - Cache map[dataUsageHash]dataUsageEntry -} - // root returns the root of the cache. func (d *dataUsageCache) root() *dataUsageEntry { return d.find(d.Info.Name) @@ -291,7 +359,7 @@ func (d *dataUsageCache) rootHash() dataUsageHash { func (d *dataUsageCache) clone() dataUsageCache { clone := dataUsageCache{ Info: d.Info, - Cache: make(map[dataUsageHash]dataUsageEntry, len(d.Cache)), + Cache: make(map[string]dataUsageEntry, len(d.Cache)), } for k, v := range d.Cache { clone.Cache[k] = v @@ -326,7 +394,7 @@ func (d *dataUsageCache) merge(other dataUsageCache) { existing := d.Cache[key] // If not found, merging simply adds. existing.merge(flat) - d.replaceHashed(key, &eHash, existing) + d.replaceHashed(dataUsageHash(key), &eHash, existing) } } @@ -337,13 +405,13 @@ func (d *dataUsageCache) load(ctx context.Context, store ObjectLayer, name strin var buf bytes.Buffer err := store.GetObject(ctx, dataUsageBucket, name, 0, -1, &buf, "", ObjectOptions{}) if err != nil { - if !isErrObjectNotFound(err) && !isErrBucketNotFound(err) { + if !isErrObjectNotFound(err) && !isErrBucketNotFound(err) && !errors.Is(err, InsufficientReadQuorum{}) { return toObjectErr(err, dataUsageBucket, name) } *d = dataUsageCache{} return nil } - err = d.deserialize(buf.Bytes()) + err = d.deserialize(&buf) if err != nil { *d = dataUsageCache{} logger.LogIf(ctx, err) @@ -374,89 +442,60 @@ func (d *dataUsageCache) save(ctx context.Context, store ObjectLayer, name strin // dataUsageCacheVer indicates the cache version. // Bumping the cache version will drop data from previous versions // and write new data with the new version. -const dataUsageCacheVer = 1 +const dataUsageCacheVer = 2 // serialize the contents of the cache. func (d *dataUsageCache) serialize() []byte { - // Alloc pessimistically - // dataUsageCacheVer - due := dataUsageEntry{} - msgLen := 1 - msgLen += d.Info.Msgsize() - // len(d.Cache) - msgLen += binary.MaxVarintLen64 - // Hashes (one for key, assume 1 child/node) - msgLen += len(d.Cache) * dataUsageHashLen * 2 - msgLen += len(d.Cache) * due.Msgsize() - - // Create destination buffer... - dst := make([]byte, 0, msgLen) - - var n int - tmp := make([]byte, 1024) - // byte: version. + // Prepend version and compress. + dst := make([]byte, 0, d.Msgsize()+1) dst = append(dst, dataUsageCacheVer) - // Info... - dst, err := d.Info.MarshalMsg(dst) + buf := bytes.NewBuffer(dst) + enc, err := zstd.NewWriter(buf, + zstd.WithEncoderLevel(zstd.SpeedFastest), + zstd.WithWindowSize(1<<20), + zstd.WithEncoderConcurrency(2)) if err != nil { - panic(err) + logger.LogIf(GlobalContext, err) + return nil } - n = binary.PutUvarint(tmp, uint64(len(d.Cache))) - dst = append(dst, tmp[:n]...) - - for k, v := range d.Cache { - // Put key - binary.LittleEndian.PutUint64(tmp[:dataUsageHashLen], uint64(k)) - dst = append(dst, tmp[:8]...) - tmp, err = v.MarshalMsg(tmp[:0]) - if err != nil { - panic(err) - } - // key, value pairs. - dst = append(dst, tmp...) - + mEnc := msgp.NewWriter(enc) + err = d.EncodeMsg(mEnc) + if err != nil { + logger.LogIf(GlobalContext, err) + return nil } - return dst + mEnc.Flush() + err = enc.Close() + if err != nil { + logger.LogIf(GlobalContext, err) + return nil + } + return buf.Bytes() } // deserialize the supplied byte slice into the cache. 
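serialize and the reworked deserialize below share a small envelope: one plain version byte followed by a zstd stream around the msgpack payload. Keeping the version byte outside the compression is what lets a reader recognize (and migrate past) deprecated version 1 without ever touching the compressed body. The envelope in isolation, assuming github.com/klauspost/compress/zstd:

```go
import (
	"bytes"
	"fmt"
	"io"

	"github.com/klauspost/compress/zstd"
)

const cacheVer = 2 // dataUsageCacheVer

// wrap prepends the version byte and compresses the payload.
func wrap(payload []byte) ([]byte, error) {
	buf := bytes.NewBuffer([]byte{cacheVer})
	enc, err := zstd.NewWriter(buf, zstd.WithEncoderLevel(zstd.SpeedFastest))
	if err != nil {
		return nil, err
	}
	if _, err = enc.Write(payload); err != nil {
		enc.Close()
		return nil, err
	}
	if err = enc.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// unwrap checks the version byte first, then hands back a reader over
// the decompressed payload.
func unwrap(r io.Reader) (io.Reader, error) {
	var v [1]byte
	if _, err := io.ReadFull(r, v[:]); err != nil {
		return nil, err
	}
	if v[0] != cacheVer {
		return nil, fmt.Errorf("unknown cache version: %d", v[0])
	}
	return zstd.NewReader(r)
}
```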
-func (d *dataUsageCache) deserialize(b []byte) error { - if len(b) < 1 { +func (d *dataUsageCache) deserialize(r io.Reader) error { + var b [1]byte + n, _ := r.Read(b[:]) + if n != 1 { return io.ErrUnexpectedEOF } switch b[0] { case 1: + return errors.New("cache version deprecated (will autoupdate)") + case dataUsageCacheVer: default: return fmt.Errorf("dataUsageCache: unknown version: %d", int(b[0])) } - b = b[1:] - // Info... - b, err := d.Info.UnmarshalMsg(b) + // Zstd compressed. + dec, err := zstd.NewReader(r, zstd.WithDecoderConcurrency(2)) if err != nil { return err } - cacheLen, n := binary.Uvarint(b) - if n <= 0 { - return fmt.Errorf("dataUsageCache: reading cachelen, n <= 0 ") - } - b = b[n:] - d.Cache = make(map[dataUsageHash]dataUsageEntry, cacheLen) + defer dec.Close() - for i := 0; i < int(cacheLen); i++ { - if len(b) <= dataUsageHashLen { - return io.ErrUnexpectedEOF - } - k := binary.LittleEndian.Uint64(b[:dataUsageHashLen]) - b = b[dataUsageHashLen:] - var v dataUsageEntry - b, err = v.UnmarshalMsg(b) - if err != nil { - return err - } - d.Cache[dataUsageHash(k)] = v - } - return nil + return d.DecodeMsg(msgp.NewReader(dec)) } // Trim this from start+end of hashes. @@ -473,88 +512,91 @@ func hashPath(data string) dataUsageHash { if data != dataUsageRoot { data = strings.Trim(data, hashPathCutSet) } - data = path.Clean(data) - return dataUsageHash(xxhash.Sum64String(data)) + return dataUsageHash(path.Clean(data)) } -//msgp:ignore dataUsageEntryInfo -type dataUsageHashMap map[dataUsageHash]struct{} +//msgp:ignore dataUsageHashMap +type dataUsageHashMap map[string]struct{} -// MarshalMsg implements msgp.Marshaler -func (d dataUsageHashMap) MarshalMsg(b []byte) (o []byte, err error) { - o = msgp.Require(b, d.Msgsize()) - - // Write bin header manually - const mbin32 uint8 = 0xc6 - sz := uint32(len(d)) * dataUsageHashLen - o = append(o, mbin32, byte(sz>>24), byte(sz>>16), byte(sz>>8), byte(sz)) - - var tmp [dataUsageHashLen]byte - for k := range d { - binary.LittleEndian.PutUint64(tmp[:], uint64(k)) - o = append(o, tmp[:]...) 
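The children set also changes wire format at this point: the old encoder packed every 64-bit hash into a fixed 8-byte slot of a single bin blob, while the string-keyed replacement below serializes a msgpack array of strings. The size math, side by side, in the same upper-bound style as the Msgsize methods:

```go
// Upper-bound serialized sizes: the old fixed-width bin encoding vs.
// the new string-array encoding. The constant 5 matches the msgp
// bin32/array/string header bounds used elsewhere in this file.
func oldMsgsize(n int) int { return 5 + n*8 } // header + 8 bytes per hash

func newMsgsize(keys []string) (s int) {
	s = 5 // array header
	for _, k := range keys {
		s += 5 + len(k) // string prefix + payload
	}
	return
}
```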
+// DecodeMsg implements msgp.Decodable +func (z *dataUsageHashMap) DecodeMsg(dc *msgp.Reader) (err error) { + var zb0002 uint32 + zb0002, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + *z = make(dataUsageHashMap, zb0002) + for i := uint32(0); i < zb0002; i++ { + { + var zb0003 string + zb0003, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err) + return + } + (*z)[zb0003] = struct{}{} + } } return } -// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message -func (d dataUsageHashMap) Msgsize() (s int) { - s = 5 + len(d)*dataUsageHashLen +// EncodeMsg implements msgp.Encodable +func (z dataUsageHashMap) EncodeMsg(en *msgp.Writer) (err error) { + err = en.WriteArrayHeader(uint32(len(z))) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0004 := range z { + err = en.WriteString(zb0004) + if err != nil { + err = msgp.WrapError(err, zb0004) + return + } + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z dataUsageHashMap) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + o = msgp.AppendArrayHeader(o, uint32(len(z))) + for zb0004 := range z { + o = msgp.AppendString(o, zb0004) + } return } // UnmarshalMsg implements msgp.Unmarshaler -func (d *dataUsageHashMap) UnmarshalMsg(bts []byte) (o []byte, err error) { - var hashes []byte - hashes, bts, err = msgp.ReadBytesZC(bts) +func (z *dataUsageHashMap) UnmarshalMsg(bts []byte) (o []byte, err error) { + var zb0002 uint32 + zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) if err != nil { - err = msgp.WrapError(err, "dataUsageHashMap") + err = msgp.WrapError(err) return } - - var dst = make(dataUsageHashMap, len(hashes)/dataUsageHashLen) - for len(hashes) >= dataUsageHashLen { - dst[dataUsageHash(binary.LittleEndian.Uint64(hashes[:dataUsageHashLen]))] = struct{}{} - hashes = hashes[dataUsageHashLen:] + *z = make(dataUsageHashMap, zb0002) + for i := uint32(0); i < zb0002; i++ { + { + var zb0003 string + zb0003, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + (*z)[zb0003] = struct{}{} + } } - *d = dst o = bts return } -func (d *dataUsageHashMap) DecodeMsg(dc *msgp.Reader) (err error) { - var zb0001 uint32 - zb0001, err = dc.ReadBytesHeader() - if err != nil { - err = msgp.WrapError(err) - return +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z dataUsageHashMap) Msgsize() (s int) { + s = msgp.ArrayHeaderSize + for zb0004 := range z { + s += msgp.StringPrefixSize + len(zb0004) } - var dst = make(dataUsageHashMap, zb0001) - var tmp [8]byte - for i := uint32(0); i < zb0001; i++ { - _, err = io.ReadFull(dc, tmp[:]) - if err != nil { - err = msgp.WrapError(err, "dataUsageHashMap") - return - } - dst[dataUsageHash(binary.LittleEndian.Uint64(tmp[:]))] = struct{}{} - } - return nil -} -func (d dataUsageHashMap) EncodeMsg(en *msgp.Writer) (err error) { - err = en.WriteBytesHeader(uint32(len(d)) * dataUsageHashLen) - if err != nil { - err = msgp.WrapError(err) - return - } - var tmp [dataUsageHashLen]byte - for k := range d { - binary.LittleEndian.PutUint64(tmp[:], uint64(k)) - _, err = en.Write(tmp[:]) - if err != nil { - err = msgp.WrapError(err) - return - } - } - return nil + return } diff --git a/cmd/data-usage-cache_gen.go b/cmd/data-usage-cache_gen.go index 2c4209cd7..be5cdd9cc 100644 --- a/cmd/data-usage-cache_gen.go +++ b/cmd/data-usage-cache_gen.go @@ -6,6 +6,212 @@ import ( 
"github.com/tinylib/msgp/msgp" ) +// DecodeMsg implements msgp.Decodable +func (z *dataUsageCache) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Info": + err = z.Info.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Info") + return + } + case "Cache": + var zb0002 uint32 + zb0002, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err, "Cache") + return + } + if z.Cache == nil { + z.Cache = make(map[string]dataUsageEntry, zb0002) + } else if len(z.Cache) > 0 { + for key := range z.Cache { + delete(z.Cache, key) + } + } + for zb0002 > 0 { + zb0002-- + var za0001 string + var za0002 dataUsageEntry + za0001, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Cache") + return + } + err = za0002.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Cache", za0001) + return + } + z.Cache[za0001] = za0002 + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *dataUsageCache) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 2 + // write "Info" + err = en.Append(0x82, 0xa4, 0x49, 0x6e, 0x66, 0x6f) + if err != nil { + return + } + err = z.Info.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Info") + return + } + // write "Cache" + err = en.Append(0xa5, 0x43, 0x61, 0x63, 0x68, 0x65) + if err != nil { + return + } + err = en.WriteMapHeader(uint32(len(z.Cache))) + if err != nil { + err = msgp.WrapError(err, "Cache") + return + } + for za0001, za0002 := range z.Cache { + err = en.WriteString(za0001) + if err != nil { + err = msgp.WrapError(err, "Cache") + return + } + err = za0002.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Cache", za0001) + return + } + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *dataUsageCache) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 2 + // string "Info" + o = append(o, 0x82, 0xa4, 0x49, 0x6e, 0x66, 0x6f) + o, err = z.Info.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Info") + return + } + // string "Cache" + o = append(o, 0xa5, 0x43, 0x61, 0x63, 0x68, 0x65) + o = msgp.AppendMapHeader(o, uint32(len(z.Cache))) + for za0001, za0002 := range z.Cache { + o = msgp.AppendString(o, za0001) + o, err = za0002.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Cache", za0001) + return + } + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *dataUsageCache) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Info": + bts, err = z.Info.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "Info") + return + } + case "Cache": + var zb0002 uint32 + zb0002, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Cache") + return + } + if z.Cache == nil { + z.Cache = make(map[string]dataUsageEntry, 
zb0002) + } else if len(z.Cache) > 0 { + for key := range z.Cache { + delete(z.Cache, key) + } + } + for zb0002 > 0 { + var za0001 string + var za0002 dataUsageEntry + zb0002-- + za0001, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Cache") + return + } + bts, err = za0002.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "Cache", za0001) + return + } + z.Cache[za0001] = za0002 + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *dataUsageCache) Msgsize() (s int) { + s = 1 + 5 + z.Info.Msgsize() + 6 + msgp.MapHeaderSize + if z.Cache != nil { + for za0001, za0002 := range z.Cache { + _ = za0002 + s += msgp.StringPrefixSize + len(za0001) + za0002.Msgsize() + } + } + return +} + // DecodeMsg implements msgp.Decodable func (z *dataUsageCacheInfo) DecodeMsg(dc *msgp.Reader) (err error) { var field []byte @@ -373,8 +579,8 @@ func (z *dataUsageEntry) Msgsize() (s int) { // DecodeMsg implements msgp.Decodable func (z *dataUsageHash) DecodeMsg(dc *msgp.Reader) (err error) { { - var zb0001 uint64 - zb0001, err = dc.ReadUint64() + var zb0001 string + zb0001, err = dc.ReadString() if err != nil { err = msgp.WrapError(err) return @@ -386,7 +592,7 @@ func (z *dataUsageHash) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z dataUsageHash) EncodeMsg(en *msgp.Writer) (err error) { - err = en.WriteUint64(uint64(z)) + err = en.WriteString(string(z)) if err != nil { err = msgp.WrapError(err) return @@ -397,15 +603,15 @@ func (z dataUsageHash) EncodeMsg(en *msgp.Writer) (err error) { // MarshalMsg implements msgp.Marshaler func (z dataUsageHash) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - o = msgp.AppendUint64(o, uint64(z)) + o = msgp.AppendString(o, string(z)) return } // UnmarshalMsg implements msgp.Unmarshaler func (z *dataUsageHash) UnmarshalMsg(bts []byte) (o []byte, err error) { { - var zb0001 uint64 - zb0001, bts, err = msgp.ReadUint64Bytes(bts) + var zb0001 string + zb0001, bts, err = msgp.ReadStringBytes(bts) if err != nil { err = msgp.WrapError(err) return @@ -418,7 +624,7 @@ func (z *dataUsageHash) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z dataUsageHash) Msgsize() (s int) { - s = msgp.Uint64Size + s = msgp.StringPrefixSize + len(string(z)) return } diff --git a/cmd/data-usage-cache_gen_test.go b/cmd/data-usage-cache_gen_test.go index 9c3edc3af..8922e16fa 100644 --- a/cmd/data-usage-cache_gen_test.go +++ b/cmd/data-usage-cache_gen_test.go @@ -9,6 +9,119 @@ import ( "github.com/tinylib/msgp/msgp" ) +func TestMarshalUnmarshaldataUsageCache(t *testing.T) { + v := dataUsageCache{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgdataUsageCache(b *testing.B) { + v := dataUsageCache{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func 
BenchmarkAppendMsgdataUsageCache(b *testing.B) { + v := dataUsageCache{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshaldataUsageCache(b *testing.B) { + v := dataUsageCache{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodedataUsageCache(t *testing.T) { + v := dataUsageCache{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodedataUsageCache Msgsize() is inaccurate") + } + + vn := dataUsageCache{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodedataUsageCache(b *testing.B) { + v := dataUsageCache{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodedataUsageCache(b *testing.B) { + v := dataUsageCache{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + func TestMarshalUnmarshaldataUsageCacheInfo(t *testing.T) { v := dataUsageCacheInfo{} bts, err := v.MarshalMsg(nil) diff --git a/cmd/data-usage.go b/cmd/data-usage.go index 6b0e753ca..bcec9acb9 100644 --- a/cmd/data-usage.go +++ b/cmd/data-usage.go @@ -19,21 +19,11 @@ package cmd import ( "bytes" "context" - "encoding/binary" "encoding/json" - "errors" - "os" - "path" - "strconv" - "time" jsoniter "github.com/json-iterator/go" - "github.com/minio/minio/cmd/config" "github.com/minio/minio/cmd/logger" - "github.com/minio/minio/pkg/color" - "github.com/minio/minio/pkg/env" "github.com/minio/minio/pkg/hash" - "github.com/willf/bloom" ) const ( @@ -44,76 +34,11 @@ const ( dataUsageRoot = SlashSeparator dataUsageBucket = minioMetaBucket + SlashSeparator + bucketMetaPrefix - dataUsageObjName = ".usage.json" - dataUsageCacheName = ".usage-cache.bin" - dataUsageBloomName = ".bloomcycle.bin" - dataUsageSleepPerFolder = 1 * time.Millisecond - dataUsageSleepDefMult = 10.0 - dataUsageUpdateDirCycles = 16 - dataUsageStartDelay = 5 * time.Minute // Time to wait on startup and between cycles. + dataUsageObjName = ".usage.json" + dataUsageCacheName = ".usage-cache.bin" + dataUsageBloomName = ".bloomcycle.bin" ) -// initDataUsageStats will start the crawler unless disabled. 
-func initDataUsageStats(ctx context.Context, objAPI ObjectLayer) { - if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOn { - go runDataUsageInfo(ctx, objAPI) - } -} - -func runDataUsageInfo(ctx context.Context, objAPI ObjectLayer) { - // Load current bloom cycle - nextBloomCycle := intDataUpdateTracker.current() + 1 - var buf bytes.Buffer - err := objAPI.GetObject(ctx, dataUsageBucket, dataUsageBloomName, 0, -1, &buf, "", ObjectOptions{}) - if err != nil { - if !isErrObjectNotFound(err) && !isErrBucketNotFound(err) { - logger.LogIf(ctx, err) - } - } else { - if buf.Len() == 8 { - nextBloomCycle = binary.LittleEndian.Uint64(buf.Bytes()) - } - } - - for { - select { - case <-ctx.Done(): - return - case <-time.NewTimer(dataUsageStartDelay).C: - // Wait before starting next cycle and wait on startup. - results := make(chan DataUsageInfo, 1) - go storeDataUsageInBackend(ctx, objAPI, results) - bf, err := globalNotificationSys.updateBloomFilter(ctx, nextBloomCycle) - logger.LogIf(ctx, err) - err = objAPI.CrawlAndGetDataUsage(ctx, bf, results) - close(results) - logger.LogIf(ctx, err) - if err == nil { - // Store new cycle... - nextBloomCycle++ - if nextBloomCycle%dataUpdateTrackerResetEvery == 0 { - if intDataUpdateTracker.debug { - logger.Info(color.Green("runDataUsageInfo:") + " Resetting bloom filter for next runs.") - } - nextBloomCycle++ - } - var tmp [8]byte - binary.LittleEndian.PutUint64(tmp[:], nextBloomCycle) - r, err := hash.NewReader(bytes.NewReader(tmp[:]), int64(len(tmp)), "", "", int64(len(tmp)), false) - if err != nil { - logger.LogIf(ctx, err) - continue - } - - _, err = objAPI.PutObject(ctx, dataUsageBucket, dataUsageBloomName, NewPutObjReader(r, nil, nil), ObjectOptions{}) - if !isErrBucketNotFound(err) { - logger.LogIf(ctx, err) - } - } - } - } -} - // storeDataUsageInBackend will store all objects sent on the gui channel until closed. func storeDataUsageInBackend(ctx context.Context, objAPI ObjectLayer, gui <-chan DataUsageInfo) { for dataUsageInfo := range gui { @@ -172,354 +97,3 @@ func loadDataUsageFromBackend(ctx context.Context, objAPI ObjectLayer) (DataUsag return dataUsageInfo, nil } - -// Item represents each file while walking. -type Item struct { - Path string - Typ os.FileMode -} - -type getSizeFn func(item Item) (int64, error) - -type cachedFolder struct { - name string - parent *dataUsageHash -} - -type folderScanner struct { - root string - getSize getSizeFn - oldCache dataUsageCache - newCache dataUsageCache - withFilter *bloomFilter - waitForLowActiveIO func() - - dataUsageCrawlMult float64 - dataUsageCrawlDebug bool - - newFolders []cachedFolder - existingFolders []cachedFolder -} - -// sleepDuration multiplies the duration d by x and sleeps if is more than 100 micro seconds. -// sleep is limited to max 1 second. -func sleepDuration(d time.Duration, x float64) { - // Don't sleep for really small amount of time - if d := time.Duration(float64(d) * x); d > time.Microsecond*100 { - if d > time.Second { - d = time.Second - } - time.Sleep(d) - } -} - -// scanQueuedLevels will scan the provided folders. -// Files found in the folders will be added to f.newCache. -// If final is provided folders will be put into f.newFolders or f.existingFolders. -// If final is not provided the folders found are returned from the function. 
-func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFolder, final bool) ([]cachedFolder, error) { - var nextFolders []cachedFolder - done := ctx.Done() - for _, folder := range folders { - select { - case <-done: - return nil, ctx.Err() - default: - } - thisHash := hashPath(folder.name) - - if _, ok := f.oldCache.Cache[thisHash]; f.withFilter != nil && ok { - // If folder isn't in filter and we have data, skip it completely. - if folder.name != dataUsageRoot && !f.withFilter.containsDir(folder.name) { - f.newCache.copyWithChildren(&f.oldCache, thisHash, folder.parent) - if f.dataUsageCrawlDebug { - logger.Info(color.Green("data-usage:")+" Skipping non-updated folder: %v", folder.name) - } - continue - } - } - f.waitForLowActiveIO() - sleepDuration(dataUsageSleepPerFolder, f.dataUsageCrawlMult) - - cache := dataUsageEntry{} - - err := readDirFn(path.Join(f.root, folder.name), func(entName string, typ os.FileMode) error { - // Parse - entName = path.Clean(path.Join(folder.name, entName)) - bucket, _ := path2BucketObjectWithBasePath(f.root, entName) - if bucket == "" { - if f.dataUsageCrawlDebug { - logger.Info(color.Green("data-usage:")+" no bucket (%s,%s)", f.root, entName) - } - return nil - } - - if isReservedOrInvalidBucket(bucket, false) { - if f.dataUsageCrawlDebug { - logger.Info(color.Green("data-usage:")+" invalid bucket: %v, entry: %v", bucket, entName) - } - return nil - } - - select { - case <-done: - return ctx.Err() - default: - } - - if typ&os.ModeDir != 0 { - h := hashPath(entName) - _, exists := f.oldCache.Cache[h] - cache.addChildString(entName) - - this := cachedFolder{name: entName, parent: &thisHash} - cache.addChild(h) - if final { - if exists { - f.existingFolders = append(f.existingFolders, this) - } else { - f.newFolders = append(f.newFolders, this) - } - } else { - nextFolders = append(nextFolders, this) - } - return nil - } - f.waitForLowActiveIO() - // Dynamic time delay. - t := UTCNow() - - // Get file size, ignore errors. - size, err := f.getSize(Item{Path: path.Join(f.root, entName), Typ: typ}) - - sleepDuration(time.Since(t), f.dataUsageCrawlMult) - if err == errSkipFile || err == errFileNotFound { - return nil - } - logger.LogIf(ctx, err) - cache.Size += size - cache.Objects++ - cache.ObjSizes.add(size) - - return nil - }) - if err != nil { - return nil, err - } - f.newCache.replaceHashed(thisHash, folder.parent, cache) - } - return nextFolders, nil -} - -// deepScanFolder will deep scan a folder and return the size if no error occurs. -func (f *folderScanner) deepScanFolder(ctx context.Context, folder string) (*dataUsageEntry, error) { - var cache dataUsageEntry - - done := ctx.Done() - - var addDir func(entName string, typ os.FileMode) error - var dirStack = []string{f.root, folder} - - addDir = func(entName string, typ os.FileMode) error { - select { - case <-done: - return ctx.Err() - default: - } - - f.waitForLowActiveIO() - if typ&os.ModeDir != 0 { - dirStack = append(dirStack, entName) - err := readDirFn(path.Join(dirStack...), addDir) - dirStack = dirStack[:len(dirStack)-1] - sleepDuration(dataUsageSleepPerFolder, f.dataUsageCrawlMult) - return err - } - - // Dynamic time delay. - t := UTCNow() - - // Get file size, ignore errors. - dirStack = append(dirStack, entName) - fileName := path.Join(dirStack...) 
- dirStack = dirStack[:len(dirStack)-1] - - size, err := f.getSize(Item{Path: fileName, Typ: typ}) - - // Don't sleep for really small amount of time - sleepDuration(time.Since(t), f.dataUsageCrawlMult) - - if err == errSkipFile { - return nil - } - logger.LogIf(ctx, err) - cache.Size += size - cache.Objects++ - cache.ObjSizes.add(size) - return nil - } - err := readDirFn(path.Join(dirStack...), addDir) - if err != nil { - return nil, err - } - return &cache, nil -} - -// updateUsage will crawl the basepath+cache.Info.Name and return an updated cache. -// The returned cache will always be valid, but may not be updated from the existing. -// Before each operation waitForLowActiveIO is called which can be used to temporarily halt the crawler. -// If the supplied context is canceled the function will return at the first chance. -func updateUsage(ctx context.Context, basePath string, cache dataUsageCache, waitForLowActiveIO func(), getSize getSizeFn) (dataUsageCache, error) { - t := UTCNow() - - dataUsageDebug := env.Get(envDataUsageCrawlDebug, config.EnableOff) == config.EnableOn - logPrefix := color.Green("data-usage: ") - logSuffix := color.Blue(" - %v + %v", basePath, cache.Info.Name) - if dataUsageDebug { - defer func() { - logger.Info(logPrefix+" Crawl time: %v"+logSuffix, time.Since(t)) - }() - - } - - if cache.Info.Name == "" { - cache.Info.Name = dataUsageRoot - } - - delayMult, err := strconv.ParseFloat(env.Get(envDataUsageCrawlDelay, "10.0"), 64) - if err != nil { - logger.LogIf(ctx, err) - delayMult = dataUsageSleepDefMult - } - - s := folderScanner{ - root: basePath, - getSize: getSize, - oldCache: cache, - newCache: dataUsageCache{Info: cache.Info}, - waitForLowActiveIO: waitForLowActiveIO, - newFolders: nil, - existingFolders: nil, - dataUsageCrawlMult: delayMult, - dataUsageCrawlDebug: dataUsageDebug, - } - - if len(cache.Info.BloomFilter) > 0 { - s.withFilter = &bloomFilter{BloomFilter: &bloom.BloomFilter{}} - _, err := s.withFilter.ReadFrom(bytes.NewBuffer(cache.Info.BloomFilter)) - if err != nil { - logger.LogIf(ctx, err, logPrefix+"Error reading bloom filter") - s.withFilter = nil - } - } - if s.dataUsageCrawlDebug { - logger.Info(logPrefix+"Start crawling. Bloom filter: %v"+logSuffix, s.withFilter != nil) - } - - done := ctx.Done() - var flattenLevels = 3 - - // If we are scanning inside a bucket reduce depth by 1. - if cache.Info.Name != dataUsageRoot { - flattenLevels-- - } - - if s.dataUsageCrawlDebug { - logger.Info(logPrefix+"Cycle: %v, Entries: %v"+logSuffix, cache.Info.NextCycle, len(cache.Cache)) - } - - // Always scan flattenLevels deep. Cache root is level 0. - todo := []cachedFolder{{name: cache.Info.Name}} - for i := 0; i < flattenLevels; i++ { - if s.dataUsageCrawlDebug { - logger.Info(logPrefix+"Level %v, scanning %v directories."+logSuffix, i, len(todo)) - } - select { - case <-done: - return cache, ctx.Err() - default: - } - var err error - todo, err = s.scanQueuedLevels(ctx, todo, i == flattenLevels-1) - if err != nil { - // No useful information... 
- return cache, err - } - } - - if s.dataUsageCrawlDebug { - logger.Info(logPrefix+"New folders: %v"+logSuffix, s.newFolders) - } - - // Add new folders first - for _, folder := range s.newFolders { - select { - case <-done: - return s.newCache, ctx.Err() - default: - } - du, err := s.deepScanFolder(ctx, folder.name) - if err != nil { - logger.LogIf(ctx, err) - continue - } - if du == nil { - logger.Info(logPrefix + "no disk usage provided" + logSuffix) - continue - } - - s.newCache.replace(folder.name, "", *du) - // Add to parent manually - if folder.parent != nil { - parent := s.newCache.Cache[*folder.parent] - parent.addChildString(folder.name) - } - } - - if s.dataUsageCrawlDebug { - logger.Info(logPrefix+"Existing folders: %v"+logSuffix, len(s.existingFolders)) - } - - // Do selective scanning of existing folders. - for _, folder := range s.existingFolders { - select { - case <-done: - return s.newCache, ctx.Err() - default: - } - h := hashPath(folder.name) - if !h.mod(s.oldCache.Info.NextCycle, dataUsageUpdateDirCycles) { - s.newCache.replaceHashed(h, folder.parent, s.oldCache.Cache[h]) - continue - } - - if s.withFilter != nil { - // If folder isn't in filter, skip it completely. - if !s.withFilter.containsDir(folder.name) { - if s.dataUsageCrawlDebug { - logger.Info(logPrefix+"Skipping non-updated folder: %v"+logSuffix, folder) - } - s.newCache.replaceHashed(h, folder.parent, s.oldCache.Cache[h]) - continue - } - } - - // Update on this cycle... - du, err := s.deepScanFolder(ctx, folder.name) - if err != nil { - logger.LogIf(ctx, err) - continue - } - if du == nil { - logger.LogIf(ctx, errors.New("data-usage: no disk usage provided")) - continue - } - s.newCache.replaceHashed(h, folder.parent, *du) - } - if s.dataUsageCrawlDebug { - logger.Info(logPrefix+"Finished crawl, %v entries"+logSuffix, len(s.newCache.Cache)) - } - s.newCache.Info.LastUpdate = UTCNow() - s.newCache.Info.NextCycle++ - return s.newCache, nil -} diff --git a/cmd/data-usage_test.go b/cmd/data-usage_test.go index 483a126de..5255459ee 100644 --- a/cmd/data-usage_test.go +++ b/cmd/data-usage_test.go @@ -17,9 +17,12 @@ package cmd import ( + "bytes" "context" + "fmt" "io/ioutil" "os" + "path" "path/filepath" "testing" ) @@ -34,6 +37,7 @@ func TestDataUsageUpdate(t *testing.T) { if err != nil { t.Skip(err) } + const bucket = "bucket" defer os.RemoveAll(base) var files = []usageTestFile{ {name: "rootfile", size: 10000}, @@ -45,9 +49,9 @@ func TestDataUsageUpdate(t *testing.T) { {name: "dir1/dira/dirasub/dcfile", size: 1000000}, {name: "dir1/dira/dirasub/sublevel3/dccccfile", size: 10}, } - createUsageTestFiles(t, base, files) + createUsageTestFiles(t, base, bucket, files) - getSize := func(item Item) (i int64, err error) { + getSize := func(item crawlItem) (i int64, err error) { if item.Typ&os.ModeDir == 0 { s, err := os.Stat(item.Path) if err != nil { @@ -58,7 +62,7 @@ func TestDataUsageUpdate(t *testing.T) { return 0, nil } - got, err := updateUsage(context.Background(), base, dataUsageCache{}, func() {}, getSize) + got, err := crawlDataFolder(context.Background(), base, dataUsageCache{Info: dataUsageCacheInfo{Name: bucket}}, func() {}, getSize) if err != nil { t.Fatal(err) } @@ -105,16 +109,17 @@ func TestDataUsageUpdate(t *testing.T) { oSizes: sizeHistogram{0: 1, 1: 3}, }, { - path: "/dir1/dira", - size: 300000, - objs: 2, - oSizes: sizeHistogram{0: 0, 1: 2}, + path: "/dir1", + size: 2000, + objs: 1, + oSizes: sizeHistogram{0: 0, 1: 1}, }, { + // Children are flattened path: "/dir1/dira/", - size: 300000, - objs: 
2, - oSizes: sizeHistogram{0: 0, 1: 2}, + size: 1300010, + objs: 4, + oSizes: sizeHistogram{0: 1, 1: 3}, }, { path: "/nonexistying", @@ -123,8 +128,9 @@ func TestDataUsageUpdate(t *testing.T) { } for _, w := range want { - t.Run(w.path, func(t *testing.T) { - e := got.find(w.path) + p := path.Join(bucket, w.path) + t.Run(p, func(t *testing.T) { + e := got.find(p) if w.isNil { if e != nil { t.Error("want nil, got", e) @@ -134,6 +140,7 @@ func TestDataUsageUpdate(t *testing.T) { if e == nil { t.Fatal("got nil result") } + t.Log(e.Children) if w.flatten { *e = got.flatten(*e) } @@ -175,8 +182,8 @@ func TestDataUsageUpdate(t *testing.T) { size: 1000, }, } - createUsageTestFiles(t, base, files) - got, err = updateUsage(context.Background(), base, got, func() {}, getSize) + createUsageTestFiles(t, base, bucket, files) + got, err = crawlDataFolder(context.Background(), base, got, func() {}, getSize) if err != nil { t.Fatal(err) } @@ -222,7 +229,7 @@ func TestDataUsageUpdate(t *testing.T) { for _, w := range want { t.Run(w.path, func(t *testing.T) { - e := got.find(w.path) + e := got.find(path.Join(bucket, w.path)) if w.isNil { if e != nil { t.Error("want nil, got", e) @@ -233,6 +240,7 @@ func TestDataUsageUpdate(t *testing.T) { t.Fatal("got nil result") } if w.flatten { + t.Log(e.Children) *e = got.flatten(*e) } if e.Size != int64(w.size) { @@ -254,14 +262,14 @@ func TestDataUsageUpdate(t *testing.T) { }, } - createUsageTestFiles(t, base, files) - err = os.RemoveAll(filepath.Join(base, "dir1/dira/dirasub/dcfile")) + createUsageTestFiles(t, base, bucket, files) + err = os.RemoveAll(filepath.Join(base, bucket, "dir1/dira/dirasub/dcfile")) if err != nil { t.Fatal(err) } // Changed dir must be picked up in this many cycles. for i := 0; i < dataUsageUpdateDirCycles; i++ { - got, err = updateUsage(context.Background(), base, got, func() {}, getSize) + got, err = crawlDataFolder(context.Background(), base, got, func() {}, getSize) if err != nil { t.Fatal(err) } @@ -291,8 +299,9 @@ func TestDataUsageUpdate(t *testing.T) { } for _, w := range want { - t.Run(w.path, func(t *testing.T) { - e := got.find(w.path) + p := path.Join(bucket, w.path) + t.Run(p, func(t *testing.T) { + e := got.find(p) if w.isNil { if e != nil { t.Error("want nil, got", e) @@ -335,9 +344,9 @@ func TestDataUsageUpdatePrefix(t *testing.T) { {name: "bucket/dir1/dira/dirasub/dcfile", size: 1000000}, {name: "bucket/dir1/dira/dirasub/sublevel3/dccccfile", size: 10}, } - createUsageTestFiles(t, base, files) + createUsageTestFiles(t, base, "", files) - getSize := func(item Item) (i int64, err error) { + getSize := func(item crawlItem) (i int64, err error) { if item.Typ&os.ModeDir == 0 { s, err := os.Stat(item.Path) if err != nil { @@ -347,7 +356,7 @@ func TestDataUsageUpdatePrefix(t *testing.T) { } return 0, nil } - got, err := updateUsage(context.Background(), base, dataUsageCache{Info: dataUsageCacheInfo{Name: "bucket"}}, func() {}, getSize) + got, err := crawlDataFolder(context.Background(), base, dataUsageCache{Info: dataUsageCacheInfo{Name: "bucket"}}, func() {}, getSize) if err != nil { t.Fatal(err) } @@ -449,8 +458,8 @@ func TestDataUsageUpdatePrefix(t *testing.T) { size: 1000, }, } - createUsageTestFiles(t, base, files) - got, err = updateUsage(context.Background(), base, got, func() {}, getSize) + createUsageTestFiles(t, base, "", files) + got, err = crawlDataFolder(context.Background(), base, got, func() {}, getSize) if err != nil { t.Fatal(err) } @@ -526,14 +535,14 @@ func TestDataUsageUpdatePrefix(t *testing.T) { }, } - 
createUsageTestFiles(t, base, files) + createUsageTestFiles(t, base, "", files) err = os.RemoveAll(filepath.Join(base, "bucket/dir1/dira/dirasub/dcfile")) if err != nil { t.Fatal(err) } // Changed dir must be picked up in this many cycles. for i := 0; i < dataUsageUpdateDirCycles; i++ { - got, err = updateUsage(context.Background(), base, got, func() {}, getSize) + got, err = crawlDataFolder(context.Background(), base, got, func() {}, getSize) if err != nil { t.Fatal(err) } @@ -588,13 +597,13 @@ func TestDataUsageUpdatePrefix(t *testing.T) { } } -func createUsageTestFiles(t *testing.T, base string, files []usageTestFile) { +func createUsageTestFiles(t *testing.T, base, bucket string, files []usageTestFile) { for _, f := range files { - err := os.MkdirAll(filepath.Dir(filepath.Join(base, f.name)), os.ModePerm) + err := os.MkdirAll(filepath.Dir(filepath.Join(base, bucket, f.name)), os.ModePerm) if err != nil { t.Fatal(err) } - err = ioutil.WriteFile(filepath.Join(base, f.name), make([]byte, f.size), os.ModePerm) + err = ioutil.WriteFile(filepath.Join(base, bucket, f.name), make([]byte, f.size), os.ModePerm) if err != nil { t.Fatal(err) } @@ -606,20 +615,28 @@ func TestDataUsageCacheSerialize(t *testing.T) { if err != nil { t.Skip(err) } + const bucket = "abucket" defer os.RemoveAll(base) var files = []usageTestFile{ {name: "rootfile", size: 10000}, {name: "rootfile2", size: 10000}, {name: "dir1/d1file", size: 2000}, {name: "dir2/d2file", size: 300}, + {name: "dir2/d2file2", size: 300}, + {name: "dir2/d2file3/", size: 300}, + {name: "dir2/d2file4/", size: 300}, + {name: "dir2/d2file5", size: 300}, {name: "dir1/dira/dafile", size: 100000}, {name: "dir1/dira/dbfile", size: 200000}, {name: "dir1/dira/dirasub/dcfile", size: 1000000}, {name: "dir1/dira/dirasub/sublevel3/dccccfile", size: 10}, + {name: "dir1/dira/dirasub/sublevel3/dccccfile20", size: 20}, + {name: "dir1/dira/dirasub/sublevel3/dccccfile30", size: 30}, + {name: "dir1/dira/dirasub/sublevel3/dccccfile40", size: 40}, } - createUsageTestFiles(t, base, files) + createUsageTestFiles(t, base, bucket, files) - getSize := func(item Item) (i int64, err error) { + getSize := func(item crawlItem) (i int64, err error) { if item.Typ&os.ModeDir == 0 { s, err := os.Stat(item.Path) if err != nil { @@ -629,23 +646,33 @@ func TestDataUsageCacheSerialize(t *testing.T) { } return 0, nil } - want, err := updateUsage(context.Background(), base, dataUsageCache{}, func() {}, getSize) + want, err := crawlDataFolder(context.Background(), base, dataUsageCache{Info: dataUsageCacheInfo{Name: bucket}}, func() {}, getSize) if err != nil { t.Fatal(err) } b := want.serialize() var got dataUsageCache - err = got.deserialize(b) + err = got.deserialize(bytes.NewBuffer(b)) if err != nil { t.Fatal(err) } - + t.Log("serialized size:", len(b), "bytes") if got.Info.LastUpdate.IsZero() { t.Error("lastupdate not set") } if !want.Info.LastUpdate.Equal(got.Info.LastUpdate) { - t.Fatalf("deserialize mismatch\nwant: %+v\ngot: %+v", want, got) + t.Fatalf("deserialize LastUpdate mismatch\nwant: %+v\ngot: %+v", want, got) } + if len(want.Cache) != len(got.Cache) { + t.Errorf("deserialize mismatch length\nwant: %+v\ngot: %+v", len(want.Cache), len(got.Cache)) + } + for wkey, wval := range want.Cache { + gotv := got.Cache[wkey] + if fmt.Sprint(gotv) != fmt.Sprint(wval) { + t.Errorf("deserialize mismatch, key %v\nwant: %+v\ngot: %+v", wkey, wval, gotv) + } + } + } diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index efeec4645..07c385ec2 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -19,7 
+19,6 @@ package cmd
 
 import (
 	"bytes"
 	"context"
-	"encoding/json"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -239,66 +238,123 @@ func (fs *FSObjects) waitForLowActiveIO() {
 
 // CrawlAndGetDataUsage returns data usage stats of the current FS deployment
 func (fs *FSObjects) CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error {
 	// Load bucket totals
-	var oldCache dataUsageCache
-	err := oldCache.load(ctx, fs, dataUsageCacheName)
+	var totalCache dataUsageCache
+	err := totalCache.load(ctx, fs, dataUsageCacheName)
 	if err != nil {
 		return err
 	}
-	if oldCache.Info.Name == "" {
-		oldCache.Info.Name = dataUsageRoot
-	}
+	totalCache.Info.Name = dataUsageRoot
 	buckets, err := fs.ListBuckets(ctx)
 	if err != nil {
 		return err
 	}
-	oldCache.Info.BloomFilter = nil
+	totalCache.Info.BloomFilter = nil
 	if bf != nil {
-		oldCache.Info.BloomFilter = bf.bytes()
+		totalCache.Info.BloomFilter = bf.bytes()
 	}
 
-	if false && intDataUpdateTracker.debug {
-		b, _ := json.MarshalIndent(bf, "", "  ")
-		logger.Info("Bloom filter: %v", string(b))
+	// Clear totals.
+	var root dataUsageEntry
+	if r := totalCache.root(); r != nil {
+		root.Children = r.Children
 	}
-	cache, err := updateUsage(ctx, fs.fsPath, oldCache, fs.waitForLowActiveIO, func(item Item) (int64, error) {
-		bucket, object := path2BucketObject(strings.TrimPrefix(item.Path, fs.fsPath))
+	totalCache.replace(dataUsageRoot, "", root)
+
+	// Delete all buckets that no longer exist.
+	totalCache.keepBuckets(buckets)
+
+	for _, b := range buckets {
+		// Load bucket cache.
+		var bCache dataUsageCache
+		err := bCache.load(ctx, fs, path.Join(b.Name, dataUsageCacheName))
+		if err != nil {
+			return err
+		}
+		if bCache.Info.Name == "" {
+			bCache.Info.Name = b.Name
+		}
+		bCache.Info.BloomFilter = totalCache.Info.BloomFilter
+
+		cache, err := fs.crawlBucket(ctx, b.Name, bCache)
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+		}
+		logger.LogIf(ctx, err)
+		cache.Info.BloomFilter = nil
+		if cache.Info.LastUpdate.After(bCache.Info.LastUpdate) {
+			if intDataUpdateTracker.debug {
+				logger.Info(color.Green("CrawlAndGetDataUsage:")+" Saving bucket %q cache with %d entries", b.Name, len(cache.Cache))
+			}
+			logger.LogIf(ctx, cache.save(ctx, fs, path.Join(b.Name, dataUsageCacheName)))
+		}
+		// Merge, save and send update.
+		// We do it even if unchanged.
+		cl := cache.clone()
+		entry := cl.flatten(*cl.root())
+		totalCache.replace(cl.Info.Name, dataUsageRoot, entry)
+		if intDataUpdateTracker.debug {
+			logger.Info(color.Green("CrawlAndGetDataUsage:")+" Saving totals cache with %d entries", len(totalCache.Cache))
+		}
+		logger.LogIf(ctx, totalCache.save(ctx, fs, dataUsageCacheName))
+		cloned := totalCache.clone()
+		updates <- cloned.dui(dataUsageRoot, buckets)
+	}
+
+	return nil
+}
+
+// crawlBucket crawls a single bucket in FS mode.
+// The updated cache for the bucket is returned; it may be only partially
+// updated if the crawl was interrupted.
+func (fs *FSObjects) crawlBucket(ctx context.Context, bucket string, cache dataUsageCache) (dataUsageCache, error) {
+	// Check if the current bucket has a configured lifecycle policy.
+	lc, err := globalLifecycleSys.Get(bucket)
+	if err == nil && lc.HasActiveRules("", true) {
+		if intDataUpdateTracker.debug {
+			logger.Info(color.Green("crawlBucket:") + " lifecycle: Active rules found")
+		}
+		cache.Info.lifeCycle = lc
+	}
+
+	// Crawl the bucket and update its cache.
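+	// The size callback below resolves each object's actual size from its
+	// fs.json metadata and, when lifecycle rules were loaded above, may
+	// apply matching actions before the size is accounted.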
+ cache, err = crawlDataFolder(ctx, fs.fsPath, cache, fs.waitForLowActiveIO, func(item crawlItem) (int64, error) { + bucket, object := item.bucket, item.objectPath() fsMetaBytes, err := ioutil.ReadFile(pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fs.metaJSONFile)) if err != nil && !os.IsNotExist(err) { return 0, errSkipFile } - // Get file size, symlinks which cannot be - // followed are automatically filtered by fastwalk. - fi, err := os.Stat(item.Path) - if err != nil { + + fsMeta := newFSMetaV1() + metaOk := false + if len(fsMetaBytes) > 0 { + var json = jsoniter.ConfigCompatibleWithStandardLibrary + if err = json.Unmarshal(fsMetaBytes, &fsMeta); err == nil { + metaOk = true + } + } + if !metaOk { + fsMeta = fs.defaultFsJSON(object) + } + + // Stat the file. + fi, fiErr := os.Stat(item.Path) + if fiErr != nil { return 0, errSkipFile } - if len(fsMetaBytes) > 0 { - fsMeta := newFSMetaV1() - var json = jsoniter.ConfigCompatibleWithStandardLibrary - if err = json.Unmarshal(fsMetaBytes, &fsMeta); err != nil { - return 0, errSkipFile - } - return fsMeta.ToObjectInfo(bucket, object, fi).GetActualSize() + + oi := fsMeta.ToObjectInfo(bucket, object, fi) + sz := item.applyActions(ctx, fs, actionMeta{oi: oi, meta: fsMeta.Meta}) + if sz >= 0 { + return sz, nil } + return fi.Size(), nil - }) - cache.Info.BloomFilter = nil - // Even if there was an error, the new cache may have better info. - if cache.Info.LastUpdate.After(oldCache.Info.LastUpdate) { - if intDataUpdateTracker.debug { - logger.Info(color.Green("CrawlAndGetDataUsage:")+" Saving cache with %d entries", len(cache.Cache)) - } - logger.LogIf(ctx, cache.save(ctx, fs, dataUsageCacheName)) - updates <- cache.dui(dataUsageRoot, buckets) - } else { - if intDataUpdateTracker.debug { - logger.Info(color.Green("CrawlAndGetDataUsage:")+" Cache not updated, %d entries", len(cache.Cache)) - } - } - - return err + return cache, err } /// Bucket operations @@ -381,6 +437,7 @@ func (fs *FSObjects) SetBucketPolicy(ctx context.Context, bucket string, p *poli return err } + var json = jsoniter.ConfigCompatibleWithStandardLibrary configData, err := json.Marshal(p) if err != nil { return err diff --git a/cmd/posix.go b/cmd/posix.go index d9a196f35..cd96bdd8a 100644 --- a/cmd/posix.go +++ b/cmd/posix.go @@ -18,6 +18,7 @@ package cmd import ( "bufio" + "bytes" "context" "crypto/rand" "encoding/hex" @@ -35,8 +36,6 @@ import ( "syscall" "time" - "bytes" - humanize "github.com/dustin/go-humanize" jsoniter "github.com/json-iterator/go" "github.com/klauspost/readahead" @@ -366,7 +365,18 @@ func (s *posix) waitForLowActiveIO() { } func (s *posix) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCache) (dataUsageCache, error) { - dataUsageInfo, err := updateUsage(ctx, s.diskPath, cache, s.waitForLowActiveIO, func(item Item) (int64, error) { + // Check if the current bucket has a configured lifecycle policy + lc, err := globalLifecycleSys.Get(cache.Info.Name) + if err == nil && lc.HasActiveRules("", true) { + cache.Info.lifeCycle = lc + } + + // Get object api + objAPI := newObjectLayerWithoutSafeModeFn() + if objAPI == nil { + return cache, errors.New("object layer not initialized") + } + dataUsageInfo, err := crawlDataFolder(ctx, s.diskPath, cache, s.waitForLowActiveIO, func(item crawlItem) (int64, error) { // Look for `xl.json' at the leaf. if !strings.HasSuffix(item.Path, SlashSeparator+xlMetaJSONFile) { // if no xl.json found, skip the file. 
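A note on the fs.json handling above: it is a decode-with-fallback. jsoniter's ConfigCompatibleWithStandardLibrary is a drop-in for encoding/json, and a failed parse now selects a synthesized default meta instead of skipping the object. A minimal sketch of the pattern, with fsMetaV1 and defaultMeta reduced to illustrative stand-ins for the real types:

package main

import (
	"fmt"

	jsoniter "github.com/json-iterator/go"
)

// fsMetaV1 is a simplified stand-in for the real fs.json structure.
type fsMetaV1 struct {
	Version string            `json:"version"`
	Meta    map[string]string `json:"meta,omitempty"`
}

// defaultMeta mimics the defaultFsJSON fallback: a minimal, assumed meta.
func defaultMeta() fsMetaV1 {
	return fsMetaV1{Version: "1.0.1", Meta: map[string]string{"content-type": "application/octet-stream"}}
}

func main() {
	var json = jsoniter.ConfigCompatibleWithStandardLibrary
	for _, raw := range [][]byte{
		[]byte(`{"version":"1.0.1","meta":{"etag":"abc"}}`), // well-formed meta
		[]byte(`{"version":`),                               // truncated: fall back instead of skipping
	} {
		meta := fsMetaV1{}
		if err := json.Unmarshal(raw, &meta); err != nil {
			meta = defaultMeta()
		}
		fmt.Printf("%+v\n", meta)
	}
}
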
@@ -379,23 +389,22 @@ func (s *posix) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCache) } meta, err := xlMetaV1UnmarshalJSON(ctx, xlMetaBuf) - if err != nil { - return 0, nil - } - - // we don't necessarily care about the names - // of bucket and object, only interested in size. - // so use some dummy names. - size, err := meta.ToObjectInfo("dummy", "dummy").GetActualSize() if err != nil { return 0, errSkipFile } - return size, nil + // Remove filename which is the meta file. + item.transformMetaDir() + + return item.applyActions(ctx, objAPI, + actionMeta{oi: meta.ToObjectInfo(item.bucket, item.objectPath()), + meta: meta.Meta, + }), nil }) if err != nil { return dataUsageInfo, err } + dataUsageInfo.Info.LastUpdate = time.Now() total := dataUsageInfo.sizeRecursive(dataUsageInfo.Info.Name) if total == nil { diff --git a/cmd/server-main.go b/cmd/server-main.go index 3c984b009..f02de2528 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -350,8 +350,7 @@ func startBackgroundOps(ctx context.Context, objAPI ObjectLayer) { initGlobalHeal(ctx, objAPI) } - initDataUsageStats(ctx, objAPI) - initDailyLifecycle(ctx, objAPI) + initDataCrawler(ctx, objAPI) initQuotaEnforcement(ctx, objAPI) } diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 3e9b48093..d21934f0c 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -168,12 +168,8 @@ func (client *storageRESTClient) CrawlAndGetDataUsage(ctx context.Context, cache if err != nil { return cache, err } - b, err = ioutil.ReadAll(reader) - if err != nil { - return cache, err - } var newCache dataUsageCache - return newCache, newCache.deserialize(b) + return newCache, newCache.deserialize(reader) } func (client *storageRESTClient) GetDiskID() (string, error) { diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index 62abd6287..9411c96c5 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -138,13 +138,8 @@ func (s *storageRESTServer) CrawlAndGetDataUsageHandler(w http.ResponseWriter, r } w.Header().Set(xhttp.ContentType, "text/event-stream") - b, err := ioutil.ReadAll(r.Body) - if err != nil { - s.writeErrorResponse(w, err) - return - } var cache dataUsageCache - err = cache.deserialize(b) + err := cache.deserialize(r.Body) if err != nil { logger.LogIf(r.Context(), err) s.writeErrorResponse(w, err) diff --git a/cmd/xl-v1.go b/cmd/xl-v1.go index 02dca619a..b5c154390 100644 --- a/cmd/xl-v1.go +++ b/cmd/xl-v1.go @@ -214,7 +214,7 @@ func (xl xlObjects) GetMetrics(ctx context.Context) (*Metrics, error) { // CrawlAndGetDataUsage will start crawling buckets and send updated totals as they are traversed. // Updates are sent on a regular basis and the caller *must* consume them. func (xl xlObjects) CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error { - // This should only be called from runDataUsageInfo and this setup should not happen (zones). + // This should only be called from runDataCrawler and this setup should not happen (zones). return errors.New("xlObjects CrawlAndGetDataUsage not implemented") } @@ -246,7 +246,7 @@ func (xl xlObjects) crawlAndGetDataUsage(ctx context.Context, buckets []BucketIn Name: dataUsageRoot, NextCycle: oldCache.Info.NextCycle, }, - Cache: make(map[dataUsageHash]dataUsageEntry, len(oldCache.Cache)), + Cache: make(map[string]dataUsageEntry, len(oldCache.Cache)), } // Put all buckets into channel. 
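The REST client and server changes above both drop the ioutil.ReadAll round-trip and hand the stream straight to deserialize(io.Reader), so the cache payload is no longer buffered twice. A rough sketch of the shape of that refactor, with encoding/gob standing in for the cache's actual wire format and miniCache for dataUsageCache:

package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"io"
)

// miniCache is a stand-in for dataUsageCache.
type miniCache struct {
	Name    string
	Entries map[string]int64
}

// deserialize decodes directly from the reader -- no intermediate []byte.
func (c *miniCache) deserialize(r io.Reader) error {
	return gob.NewDecoder(r).Decode(c)
}

// serialize is the matching writer-side helper.
func (c *miniCache) serialize(w io.Writer) error {
	return gob.NewEncoder(w).Encode(c)
}

func main() {
	// Pretend buf is an HTTP request or response body.
	var buf bytes.Buffer
	in := miniCache{Name: "bucket", Entries: map[string]int64{"dir1": 2000}}
	if err := in.serialize(&buf); err != nil {
		panic(err)
	}

	var out miniCache
	if err := out.deserialize(&buf); err != nil {
		panic(err)
	}
	fmt.Printf("decoded %q with %d entries\n", out.Name, len(out.Entries))
}
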
diff --git a/cmd/xl-zones.go b/cmd/xl-zones.go
index 0be9df8de..12d6b7ef1 100644
--- a/cmd/xl-zones.go
+++ b/cmd/xl-zones.go
@@ -261,6 +261,7 @@ func (z *xlZones) CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter, upd
 			// Start crawler. Blocks until done.
 			err := xl.crawlAndGetDataUsage(ctx, buckets, bf, updates)
 			if err != nil {
+				logger.LogIf(ctx, err)
 				mu.Lock()
 				if firstErr == nil {
 					firstErr = err
@@ -313,9 +314,11 @@ func (z *xlZones) CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter, upd
 	wg.Wait()
 	ch := make(chan struct{})
-	updateCloser <- ch
-	<-ch
-
+	select {
+	case updateCloser <- ch:
+		<-ch
+	case <-ctx.Done():
+	}
 	return firstErr
 }
 
diff --git a/pkg/bucket/lifecycle/action_string.go b/pkg/bucket/lifecycle/action_string.go
new file mode 100644
index 000000000..5b22b9af7
--- /dev/null
+++ b/pkg/bucket/lifecycle/action_string.go
@@ -0,0 +1,24 @@
+// Code generated by "stringer -type Action lifecycle.go"; DO NOT EDIT.
+
+package lifecycle
+
+import "strconv"
+
+func _() {
+	// An "invalid array index" compiler error signifies that the constant values have changed.
+	// Re-run the stringer command to generate them again.
+	var x [1]struct{}
+	_ = x[NoneAction-0]
+	_ = x[DeleteAction-1]
+}
+
+const _Action_name = "NoneActionDeleteAction"
+
+var _Action_index = [...]uint8{0, 10, 22}
+
+func (i Action) String() string {
+	if i < 0 || i >= Action(len(_Action_index)-1) {
+		return "Action(" + strconv.FormatInt(int64(i), 10) + ")"
+	}
+	return _Action_name[_Action_index[i]:_Action_index[i+1]]
+}
diff --git a/pkg/bucket/lifecycle/expiration.go b/pkg/bucket/lifecycle/expiration.go
index 961b41c9a..2748d7efd 100644
--- a/pkg/bucket/lifecycle/expiration.go
+++ b/pkg/bucket/lifecycle/expiration.go
@@ -124,7 +124,7 @@ func (e Expiration) IsDaysNull() bool {
 
 // IsDateNull returns true if date field is null
 func (e Expiration) IsDateNull() bool {
-	return e.Date == ExpirationDate{time.Time{}}
+	return e.Date.Time.IsZero()
 }
 
 // IsNull returns true if both date and days fields are null
diff --git a/pkg/bucket/lifecycle/lifecycle.go b/pkg/bucket/lifecycle/lifecycle.go
index 383e27bff..fd401d3c1 100644
--- a/pkg/bucket/lifecycle/lifecycle.go
+++ b/pkg/bucket/lifecycle/lifecycle.go
@@ -33,6 +33,8 @@ var (
 // actions that will be implemented later.
 type Action int
 
+//go:generate stringer -type Action $GOFILE
+
 const (
 	// NoneAction means no action required after evaluting lifecycle rules
 	NoneAction Action = iota
@@ -46,9 +48,44 @@ type Lifecycle struct {
 	Rules []Rule `xml:"Rule"`
 }
 
-// IsEmpty - returns whether policy is empty or not.
-func (lc Lifecycle) IsEmpty() bool {
-	return len(lc.Rules) == 0
+// HasActiveRules - returns whether the policy has active rules for the
+// optionally supplied prefix; an empty prefix matches all rules.
+// If recursive is specified the function will also return true if any level below the
+// prefix has active rules. If no prefix is specified, recursive is effectively true.
+func (lc Lifecycle) HasActiveRules(prefix string, recursive bool) bool {
+	if len(lc.Rules) == 0 {
+		return false
+	}
+	for _, rule := range lc.Rules {
+		if rule.Status == Disabled {
+			continue
+		}
+		if len(prefix) > 0 && len(rule.Filter.Prefix) > 0 {
+			// incoming prefix must be in rule prefix
+			if !recursive && !strings.HasPrefix(prefix, rule.Filter.Prefix) {
+				continue
+			}
+			// If recursive, we can skip this rule if it doesn't match the tested prefix.
+ if recursive && !strings.HasPrefix(rule.Filter.Prefix, prefix) { + continue + } + } + + if rule.NoncurrentVersionExpiration.NoncurrentDays > 0 { + return true + } + if rule.NoncurrentVersionTransition.NoncurrentDays > 0 { + return true + } + if rule.Expiration.IsNull() { + continue + } + if !rule.Expiration.IsDateNull() && rule.Expiration.Date.After(time.Now()) { + continue + } + return true + } + return false } // ParseLifecycleConfig - parses data in given reader to Lifecycle.
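To make the HasActiveRules prefix semantics concrete: the non-recursive check asks whether a rule covers the queried prefix itself, while the recursive check also accepts rules that only apply somewhere below it. A self-contained sketch of just that matching logic, with the rule shape reduced to a bare prefix of an enabled rule (the date and days checks are omitted):

package main

import (
	"fmt"
	"strings"
)

// rule is a stand-in carrying only the filter prefix of an enabled rule.
type rule struct{ prefix string }

func hasActiveRules(rules []rule, prefix string, recursive bool) bool {
	for _, r := range rules {
		if len(prefix) > 0 && len(r.prefix) > 0 {
			// Non-recursive: the queried prefix must fall under the rule prefix.
			if !recursive && !strings.HasPrefix(prefix, r.prefix) {
				continue
			}
			// Recursive: the rule must apply at or below the queried prefix.
			if recursive && !strings.HasPrefix(r.prefix, prefix) {
				continue
			}
		}
		return true
	}
	return false
}

func main() {
	rules := []rule{{prefix: "logs/2020/"}}
	fmt.Println(hasActiveRules(rules, "logs/", false))       // false: logs/ itself is not covered
	fmt.Println(hasActiveRules(rules, "logs/", true))        // true: a level below logs/ is covered
	fmt.Println(hasActiveRules(rules, "logs/2020/x", false)) // true: under the rule prefix
}
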