From a7577da76854a46fa79003817bd2ba143e0ed54c Mon Sep 17 00:00:00 2001 From: Krishnan Parthasarathi Date: Fri, 1 Mar 2024 21:11:03 -0800 Subject: [PATCH] Improve expiration of tiered objects (#18926) - Use a shared worker pool for all ILM expiry tasks - Free version cleanup executes in a separate goroutine - Add a free version only if removing the remote object fails - Add ILM expiry metrics to the node namespace - Move tier journal tasks to expiryState - Remove unused on-disk journal for tiered objects pending deletion - Distribute expiry tasks across workers such that the expiry of versions of the same object serialized - Ability to resize worker pool without server restart - Make scaling down of expiryState workers' concurrency safe; Thanks @klauspost - Add error logs when expiryState and transition state are not initialized (yet) * metrics: Add missed tier journal entry tasks * Initialize the ILM worker pool after the object layer --- cmd/bucket-lifecycle.go | 327 +++++++++++++++++++++------- cmd/config-current.go | 18 ++ cmd/data-scanner.go | 36 --- cmd/data-scanner_test.go | 16 +- cmd/erasure-metadata.go | 18 ++ cmd/erasure-metadata_test.go | 8 + cmd/erasure-object.go | 6 + cmd/erasure-server-pool.go | 4 - cmd/globals.go | 2 - cmd/handler-api.go | 12 +- cmd/metrics-v2.go | 56 ++++- cmd/object-api-interface.go | 6 + cmd/server-main.go | 8 +- cmd/tier-journal.go | 295 ------------------------- cmd/tier-journal_gen.go | 288 ------------------------ cmd/tier-journal_gen_test.go | 236 -------------------- cmd/tier-journal_test.go | 121 ---------- cmd/tier-mem-journal.go | 56 ----- cmd/tier-sweeper.go | 23 +- cmd/xl-storage-free-version.go | 3 + cmd/xl-storage-free-version_test.go | 57 ++++- cmd/xl-storage.go | 2 +- internal/config/api/api.go | 16 ++ internal/config/api/help.go | 8 +- internal/config/config.go | 3 + internal/config/errors.go | 5 + internal/config/ilm/help.go | 52 +++++ internal/config/ilm/ilm.go | 60 +++++ 28 files changed, 611 insertions(+), 1131 deletions(-) delete mode 100644 cmd/tier-journal.go delete mode 100644 cmd/tier-journal_gen.go delete mode 100644 cmd/tier-journal_gen_test.go delete mode 100644 cmd/tier-journal_test.go delete mode 100644 cmd/tier-mem-journal.go create mode 100644 internal/config/ilm/help.go create mode 100644 internal/config/ilm/ilm.go diff --git a/cmd/bucket-lifecycle.go b/cmd/bucket-lifecycle.go index 2f4a50258..48fb59460 100644 --- a/cmd/bucket-lifecycle.go +++ b/cmd/bucket-lifecycle.go @@ -24,7 +24,6 @@ import ( "fmt" "io" "net/http" - "runtime" "strconv" "strings" "sync" @@ -39,12 +38,9 @@ import ( "github.com/minio/minio/internal/bucket/lifecycle" "github.com/minio/minio/internal/event" xhttp "github.com/minio/minio/internal/http" - xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/s3select" - "github.com/minio/pkg/v2/env" xnet "github.com/minio/pkg/v2/net" - "github.com/minio/pkg/v2/workers" "github.com/zeebo/xxh3" ) @@ -105,95 +101,280 @@ type expiryTask struct { src lcEventSrc } +// expiryStats records metrics related to ILM expiry activities +type expiryStats struct { + missedExpiryTasks atomic.Int64 + missedFreeVersTasks atomic.Int64 + missedTierJournalTasks atomic.Int64 + workers atomic.Int32 +} + +// MissedTasks returns the number of ILM expiry tasks that were missed since +// there were no available workers. +func (e *expiryStats) MissedTasks() int64 { + return e.missedExpiryTasks.Load() +} + +// MissedFreeVersTasks returns the number of free version collection tasks that +// were missed since there were no available workers. +func (e *expiryStats) MissedFreeVersTasks() int64 { + return e.missedFreeVersTasks.Load() +} + +// MissedTierJournalTasks returns the number of tasks to remove tiered objects +// that were missed since there were no available workers. +func (e *expiryStats) MissedTierJournalTasks() int64 { + return e.missedTierJournalTasks.Load() +} + +// NumWorkers returns the number of active workers executing one of ILM expiry +// tasks or free version collection tasks. +func (e *expiryStats) NumWorkers() int32 { + return e.workers.Load() +} + +type expiryOp interface { + OpHash() uint64 +} + +type freeVersionTask struct { + ObjectInfo +} + +func (f freeVersionTask) OpHash() uint64 { + return xxh3.HashString(f.TransitionedObject.Tier + f.TransitionedObject.Name) +} + +func (n newerNoncurrentTask) OpHash() uint64 { + return xxh3.HashString(n.bucket + n.versions[0].ObjectV.ObjectName) +} + +func (j jentry) OpHash() uint64 { + return xxh3.HashString(j.TierName + j.ObjName) +} + +func (e expiryTask) OpHash() uint64 { + return xxh3.HashString(e.objInfo.Bucket + e.objInfo.Name) +} + +// expiryState manages all ILM related expiration activities. type expiryState struct { - once sync.Once - byDaysCh chan expiryTask - byNewerNoncurrentCh chan newerNoncurrentTask + mu sync.RWMutex + workers atomic.Pointer[[]chan expiryOp] + + ctx context.Context + objAPI ObjectLayer + + stats expiryStats } // PendingTasks returns the number of pending ILM expiry tasks. func (es *expiryState) PendingTasks() int { - return len(es.byDaysCh) + len(es.byNewerNoncurrentCh) + w := es.workers.Load() + if w == nil || len(*w) == 0 { + return 0 + } + var tasks int + for _, wrkr := range *w { + tasks += len(wrkr) + } + return tasks } -// close closes work channels exactly once. -func (es *expiryState) close() { - es.once.Do(func() { - xioutil.SafeClose(es.byDaysCh) - xioutil.SafeClose(es.byNewerNoncurrentCh) - }) +// enqueueTierJournalEntry enqueues a tier journal entry referring to a remote +// object corresponding to a 'replaced' object versions. This applies only to +// non-versioned or version suspended buckets. +func (es *expiryState) enqueueTierJournalEntry(je jentry) { + wrkr := es.getWorkerCh(je.OpHash()) + if wrkr == nil { + es.stats.missedTierJournalTasks.Add(1) + return + } + select { + case <-GlobalContext.Done(): + case wrkr <- je: + default: + es.stats.missedTierJournalTasks.Add(1) + } +} + +// enqueueFreeVersion enqueues a free version to be deleted +func (es *expiryState) enqueueFreeVersion(oi ObjectInfo) { + task := freeVersionTask{ObjectInfo: oi} + wrkr := es.getWorkerCh(task.OpHash()) + if wrkr == nil { + es.stats.missedFreeVersTasks.Add(1) + return + } + select { + case <-GlobalContext.Done(): + case wrkr <- task: + default: + es.stats.missedFreeVersTasks.Add(1) + } } // enqueueByDays enqueues object versions expired by days for expiry. func (es *expiryState) enqueueByDays(oi ObjectInfo, event lifecycle.Event, src lcEventSrc) { + task := expiryTask{objInfo: oi, event: event, src: src} + wrkr := es.getWorkerCh(task.OpHash()) + if wrkr == nil { + es.stats.missedExpiryTasks.Add(1) + return + } select { case <-GlobalContext.Done(): - es.close() - case es.byDaysCh <- expiryTask{objInfo: oi, event: event, src: src}: + case wrkr <- task: default: + es.stats.missedExpiryTasks.Add(1) } } // enqueueByNewerNoncurrent enqueues object versions expired by // NewerNoncurrentVersions limit for expiry. func (es *expiryState) enqueueByNewerNoncurrent(bucket string, versions []ObjectToDelete, lcEvent lifecycle.Event) { + task := newerNoncurrentTask{bucket: bucket, versions: versions, event: lcEvent} + wrkr := es.getWorkerCh(task.OpHash()) + if wrkr == nil { + es.stats.missedExpiryTasks.Add(1) + return + } select { case <-GlobalContext.Done(): - es.close() - case es.byNewerNoncurrentCh <- newerNoncurrentTask{bucket: bucket, versions: versions, event: lcEvent}: + case wrkr <- task: default: + es.stats.missedExpiryTasks.Add(1) } } -var globalExpiryState = newExpiryState() +// globalExpiryState is the per-node instance which manages all ILM expiry tasks. +var globalExpiryState *expiryState -func newExpiryState() *expiryState { - return &expiryState{ - byDaysCh: make(chan expiryTask, 100000), - byNewerNoncurrentCh: make(chan newerNoncurrentTask, 100000), +// newExpiryState creates an expiryState with buffered channels allocated for +// each ILM expiry task type. +func newExpiryState(ctx context.Context, objAPI ObjectLayer, n int) *expiryState { + es := &expiryState{ + ctx: ctx, + objAPI: objAPI, + } + workers := make([]chan expiryOp, 0, n) + es.workers.Store(&workers) + es.ResizeWorkers(n) + return es +} + +func (es *expiryState) getWorkerCh(h uint64) chan<- expiryOp { + w := es.workers.Load() + if w == nil || len(*w) == 0 { + return nil + } + workers := *w + return workers[h%uint64(len(workers))] +} + +func (es *expiryState) ResizeWorkers(n int) { + // Lock to avoid multiple resizes to happen at the same time. + es.mu.Lock() + defer es.mu.Unlock() + var workers []chan expiryOp + if v := es.workers.Load(); v != nil { + // Copy to new array. + workers = append(workers, *v...) + } + + if n == len(workers) || n < 1 { + return + } + + for len(workers) < n { + input := make(chan expiryOp, 10000) + workers = append(workers, input) + go es.Worker(input) + es.stats.workers.Add(1) + } + + for len(workers) > n { + worker := workers[len(workers)-1] + workers = workers[:len(workers)-1] + worker <- expiryOp(nil) + es.stats.workers.Add(-1) + } + // Atomically replace workers. + es.workers.Store(&workers) +} + +// Worker handles 4 types of expiration tasks. +// 1. Expiry of objects, includes regular and transitioned objects +// 2. Expiry of noncurrent versions due to NewerNoncurrentVersions +// 3. Expiry of free-versions, for remote objects of transitioned object which have been expired since. +// 4. Expiry of remote objects corresponding to objects in a +// non-versioned/version suspended buckets +func (es *expiryState) Worker(input <-chan expiryOp) { + for { + select { + case <-es.ctx.Done(): + return + case v, ok := <-input: + if !ok { + return + } + if v == nil { + // ResizeWorkers signaling worker to quit + return + } + switch v := v.(type) { + case expiryTask: + if v.objInfo.TransitionedObject.Status != "" { + applyExpiryOnTransitionedObject(es.ctx, es.objAPI, v.objInfo, v.event, v.src) + } else { + applyExpiryOnNonTransitionedObjects(es.ctx, es.objAPI, v.objInfo, v.event, v.src) + } + case newerNoncurrentTask: + deleteObjectVersions(es.ctx, es.objAPI, v.bucket, v.versions, v.event) + case jentry: + logger.LogIf(es.ctx, deleteObjectFromRemoteTier(es.ctx, v.ObjName, v.VersionID, v.TierName)) + case freeVersionTask: + oi := v.ObjectInfo + traceFn := globalLifecycleSys.trace(oi) + if !oi.TransitionedObject.FreeVersion { + // nothing to be done + return + } + + ignoreNotFoundErr := func(err error) error { + switch { + case isErrVersionNotFound(err), isErrObjectNotFound(err): + return nil + } + return err + } + // Remove the remote object + err := deleteObjectFromRemoteTier(es.ctx, oi.TransitionedObject.Name, oi.TransitionedObject.VersionID, oi.TransitionedObject.Tier) + if ignoreNotFoundErr(err) != nil { + logger.LogIf(es.ctx, err) + return + } + + // Remove this free version + _, err = es.objAPI.DeleteObject(es.ctx, oi.Bucket, oi.Name, ObjectOptions{ + VersionID: oi.VersionID, + InclFreeVersions: true, + }) + if err == nil { + auditLogLifecycle(es.ctx, oi, ILMFreeVersionDelete, nil, traceFn) + } + if ignoreNotFoundErr(err) != nil { + logger.LogIf(es.ctx, err) + } + default: + logger.LogIf(es.ctx, fmt.Errorf("Invalid work type - %v", v)) + } + } } } func initBackgroundExpiry(ctx context.Context, objectAPI ObjectLayer) { - workerSize, _ := strconv.Atoi(env.Get("_MINIO_ILM_EXPIRY_WORKERS", strconv.Itoa((runtime.GOMAXPROCS(0)+1)/2))) - if workerSize == 0 { - workerSize = 4 - } - ewk, err := workers.New(workerSize) - if err != nil { - logger.LogIf(ctx, err) - } - - nwk, err := workers.New(workerSize) - if err != nil { - logger.LogIf(ctx, err) - } - - go func() { - for t := range globalExpiryState.byDaysCh { - ewk.Take() - go func(t expiryTask) { - defer ewk.Give() - if t.objInfo.TransitionedObject.Status != "" { - applyExpiryOnTransitionedObject(ctx, objectAPI, t.objInfo, t.event, t.src) - } else { - applyExpiryOnNonTransitionedObjects(ctx, objectAPI, t.objInfo, t.event, t.src) - } - }(t) - } - ewk.Wait() - }() - - go func() { - for t := range globalExpiryState.byNewerNoncurrentCh { - nwk.Take() - go func(t newerNoncurrentTask) { - defer nwk.Give() - deleteObjectVersions(ctx, objectAPI, t.bucket, t.versions, t.event) - }(t) - } - nwk.Wait() - }() + globalExpiryState = newExpiryState(ctx, objectAPI, globalAPIConfig.getExpiryWorkers()) } // newerNoncurrentTask encapsulates arguments required by worker to expire objects @@ -417,18 +598,18 @@ func expireTransitionedObject(ctx context.Context, objectAPI ObjectLayer, oi *Ob } return err } - // When an object is past expiry or when a transitioned object is being - // deleted, 'mark' the data in the remote tier for delete. - entry := jentry{ - ObjName: oi.TransitionedObject.Name, - VersionID: oi.TransitionedObject.VersionID, - TierName: oi.TransitionedObject.Tier, + + // Delete remote object from warm-tier + err := deleteObjectFromRemoteTier(ctx, oi.TransitionedObject.Name, oi.TransitionedObject.VersionID, oi.TransitionedObject.Tier) + if err == nil { + // Skip adding free version since we successfully deleted the + // remote object + opts.SkipFreeVersion = true + } else { + logger.LogIf(ctx, err) } - if err := globalTierJournal.AddEntry(entry); err != nil { - return err - } - // Delete metadata on source, now that data in remote tier has been - // marked for deletion. + + // Now, delete object from hot-tier namespace if _, err := objectAPI.DeleteObject(ctx, oi.Bucket, oi.Name, opts); err != nil { return err } diff --git a/cmd/config-current.go b/cmd/config-current.go index 03774f439..852f859d3 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -45,6 +45,7 @@ import ( "github.com/minio/minio/internal/config/identity/openid" idplugin "github.com/minio/minio/internal/config/identity/plugin" xtls "github.com/minio/minio/internal/config/identity/tls" + "github.com/minio/minio/internal/config/ilm" "github.com/minio/minio/internal/config/lambda" "github.com/minio/minio/internal/config/notify" "github.com/minio/minio/internal/config/policy/opa" @@ -78,6 +79,7 @@ func initHelp() { config.SubnetSubSys: subnet.DefaultKVS, config.CallhomeSubSys: callhome.DefaultKVS, config.DriveSubSys: drive.DefaultKVS, + config.ILMSubSys: ilm.DefaultKVS, config.CacheSubSys: cache.DefaultKVS, config.BatchSubSys: batch.DefaultKVS, config.BrowserSubSys: browser.DefaultKVS, @@ -716,6 +718,22 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf return fmt.Errorf("Unable to apply browser config: %w", err) } globalBrowserConfig.Update(browserCfg) + case config.ILMSubSys: + ilmCfg, err := ilm.LookupConfig(s[config.ILMSubSys][config.Default]) + if err != nil { + return fmt.Errorf("Unable to apply ilm config: %w", err) + } + if globalTransitionState != nil { + globalTransitionState.UpdateWorkers(ilmCfg.TransitionWorkers) + } else { + logger.LogIf(ctx, fmt.Errorf("ILM transition subsystem not initialized")) + } + if globalExpiryState != nil { + globalExpiryState.ResizeWorkers(ilmCfg.ExpirationWorkers) + } else { + logger.LogIf(ctx, fmt.Errorf("ILM expiration subsystem not initialized")) + } + } globalServerConfigMu.Lock() defer globalServerConfigMu.Unlock() diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go index 8ea6613e0..505f51dc1 100644 --- a/cmd/data-scanner.go +++ b/cmd/data-scanner.go @@ -982,42 +982,6 @@ func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, oi Obje return lcEvt.Action, size } -// applyTierObjSweep removes remote object pending deletion and the free-version -// tracking this information. -func (i *scannerItem) applyTierObjSweep(ctx context.Context, o ObjectLayer, oi ObjectInfo) { - traceFn := globalLifecycleSys.trace(oi) - if !oi.TransitionedObject.FreeVersion { - // nothing to be done - return - } - - ignoreNotFoundErr := func(err error) error { - switch { - case isErrVersionNotFound(err), isErrObjectNotFound(err): - return nil - } - return err - } - // Remove the remote object - err := deleteObjectFromRemoteTier(ctx, oi.TransitionedObject.Name, oi.TransitionedObject.VersionID, oi.TransitionedObject.Tier) - if ignoreNotFoundErr(err) != nil { - logger.LogIf(ctx, err) - return - } - - // Remove this free version - _, err = o.DeleteObject(ctx, oi.Bucket, oi.Name, ObjectOptions{ - VersionID: oi.VersionID, - InclFreeVersions: true, - }) - if err == nil { - auditLogLifecycle(ctx, oi, ILMFreeVersionDelete, nil, traceFn) - } - if ignoreNotFoundErr(err) != nil { - logger.LogIf(ctx, err) - } -} - // applyNewerNoncurrentVersionLimit removes noncurrent versions older than the most recent NewerNoncurrentVersions configured. // Note: This function doesn't update sizeSummary since it always removes versions that it doesn't return. func (i *scannerItem) applyNewerNoncurrentVersionLimit(ctx context.Context, _ ObjectLayer, fivs []FileInfo, expState *expiryState) ([]ObjectInfo, error) { diff --git a/cmd/data-scanner_test.go b/cmd/data-scanner_test.go index 464feb805..a55007a67 100644 --- a/cmd/data-scanner_test.go +++ b/cmd/data-scanner_test.go @@ -39,14 +39,20 @@ func TestApplyNewerNoncurrentVersionsLimit(t *testing.T) { globalBucketMetadataSys = NewBucketMetadataSys() globalBucketObjectLockSys = &BucketObjectLockSys{} globalBucketVersioningSys = &BucketVersioningSys{} - expiryState := newExpiryState() + es := newExpiryState(context.Background(), objAPI, 0) + workers := []chan expiryOp{make(chan expiryOp)} + es.workers.Store(&workers) + globalExpiryState = es var wg sync.WaitGroup wg.Add(1) expired := make([]ObjectToDelete, 0, 5) go func() { defer wg.Done() - for t := range expiryState.byNewerNoncurrentCh { - expired = append(expired, t.versions...) + workers := globalExpiryState.workers.Load() + for t := range (*workers)[0] { + if t, ok := t.(newerNoncurrentTask); ok { + expired = append(expired, t.versions...) + } } }() lc := lifecycle.Lifecycle{ @@ -116,7 +122,7 @@ func TestApplyNewerNoncurrentVersionsLimit(t *testing.T) { for i, fi := range fivs[:2] { wants[i] = fi.ToObjectInfo(bucket, obj, versioned) } - gots, err := item.applyNewerNoncurrentVersionLimit(context.TODO(), objAPI, fivs, expiryState) + gots, err := item.applyNewerNoncurrentVersionLimit(context.TODO(), objAPI, fivs, es) if err != nil { t.Fatalf("Failed with err: %v", err) } @@ -125,7 +131,7 @@ func TestApplyNewerNoncurrentVersionsLimit(t *testing.T) { } // Close expiry state's channel to inspect object versions enqueued for expiration - close(expiryState.byNewerNoncurrentCh) + close(workers[0]) wg.Wait() for _, obj := range expired { switch obj.ObjectV.VersionID { diff --git a/cmd/erasure-metadata.go b/cmd/erasure-metadata.go index 60832aa5d..35d7419cb 100644 --- a/cmd/erasure-metadata.go +++ b/cmd/erasure-metadata.go @@ -525,6 +525,7 @@ func objectQuorumFromMeta(ctx context.Context, partsMetaData []FileInfo, errs [] const ( tierFVID = "tier-free-versionID" tierFVMarker = "tier-free-marker" + tierSkipFVID = "tier-skip-fvid" ) // SetTierFreeVersionID sets free-version's versionID. This method is used by @@ -551,6 +552,23 @@ func (fi *FileInfo) SetTierFreeVersion() { fi.Metadata[ReservedMetadataPrefixLower+tierFVMarker] = "" } +// SetSkipTierFreeVersion indicates to skip adding a tier free version id. +// Note: Used only when expiring tiered objects and the remote content has +// already been scheduled for deletion +func (fi *FileInfo) SetSkipTierFreeVersion() { + if fi.Metadata == nil { + fi.Metadata = make(map[string]string) + } + fi.Metadata[ReservedMetadataPrefixLower+tierSkipFVID] = "" +} + +// SkipTierFreeVersion returns true if set, false otherwise. +// See SetSkipTierVersion for its purpose. +func (fi *FileInfo) SkipTierFreeVersion() bool { + _, ok := fi.Metadata[ReservedMetadataPrefixLower+tierSkipFVID] + return ok +} + // TierFreeVersion returns true if version is a free-version. func (fi *FileInfo) TierFreeVersion() bool { _, ok := fi.Metadata[ReservedMetadataPrefixLower+tierFVMarker] diff --git a/cmd/erasure-metadata_test.go b/cmd/erasure-metadata_test.go index 8cee75dda..6eb518ae4 100644 --- a/cmd/erasure-metadata_test.go +++ b/cmd/erasure-metadata_test.go @@ -314,3 +314,11 @@ func TestTransitionInfoEquals(t *testing.T) { t.Fatalf("Expected to be inequal: fi %v ofi %v", fi, ofi) } } + +func TestSkipTierFreeVersion(t *testing.T) { + fi := newFileInfo("object", 8, 8) + fi.SetSkipTierFreeVersion() + if ok := fi.SkipTierFreeVersion(); !ok { + t.Fatal("Expected SkipTierFreeVersion to be set on FileInfo but wasn't") + } +} diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 49bdd035b..e7e50f3f7 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -1975,6 +1975,9 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string ExpireRestored: opts.Transition.ExpireRestored, } fi.SetTierFreeVersionID(fvID) + if opts.SkipFreeVersion { + fi.SetSkipTierFreeVersion() + } if opts.VersionID != "" { fi.VersionID = opts.VersionID } else if opts.Versioned { @@ -2004,6 +2007,9 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string ExpireRestored: opts.Transition.ExpireRestored, } dfi.SetTierFreeVersionID(fvID) + if opts.SkipFreeVersion { + dfi.SetSkipTierFreeVersion() + } if err = er.deleteObjectVersion(ctx, bucket, object, dfi, opts.DeleteMarker); err != nil { return objInfo, toObjectErr(err, bucket, object) } diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index dd7b06cf1..1dbf2e714 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -192,10 +192,6 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ go globalMRFState.healRoutine(z) }) - bootstrapTrace("initBackgroundExpiry", func() { - initBackgroundExpiry(GlobalContext, z) - }) - // initialize the object layer. defer setObjectLayer(z) diff --git a/cmd/globals.go b/cmd/globals.go index b55ee5b98..1f2f68faa 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -411,8 +411,6 @@ var ( globalTierConfigMgr *TierConfigMgr - globalTierJournal *TierJournal - globalConsoleSrv *consoleapi.Server // handles service freeze or un-freeze S3 API calls. diff --git a/cmd/handler-api.go b/cmd/handler-api.go index f15690e29..7b203ff2c 100644 --- a/cmd/handler-api.go +++ b/cmd/handler-api.go @@ -47,6 +47,7 @@ type apiConfig struct { replicationPriority string replicationMaxWorkers int transitionWorkers int + expiryWorkers int staleUploadsExpiry time.Duration staleUploadsCleanupInterval time.Duration @@ -170,7 +171,9 @@ func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) { } t.replicationPriority = cfg.ReplicationPriority t.replicationMaxWorkers = cfg.ReplicationMaxWorkers - if globalTransitionState != nil && cfg.TransitionWorkers != t.transitionWorkers { + + // N B api.transition_workers will be deprecated + if globalTransitionState != nil { globalTransitionState.UpdateWorkers(cfg.TransitionWorkers) } t.transitionWorkers = cfg.TransitionWorkers @@ -365,6 +368,13 @@ func (t *apiConfig) getReplicationOpts() replicationPoolOpts { } } +func (t *apiConfig) getExpiryWorkers() int { + t.mu.RLock() + defer t.mu.RUnlock() + + return t.expiryWorkers +} + func (t *apiConfig) getTransitionWorkers() int { t.mu.RLock() defer t.mu.RUnlock() diff --git a/cmd/metrics-v2.go b/cmd/metrics-v2.go index 8ea678a7d..c18ef497c 100644 --- a/cmd/metrics-v2.go +++ b/cmd/metrics-v2.go @@ -273,10 +273,14 @@ const ( vmemory = "virtual_memory_bytes" cpu = "cpu_total_seconds" - expiryPendingTasks MetricName = "expiry_pending_tasks" - transitionPendingTasks MetricName = "transition_pending_tasks" - transitionActiveTasks MetricName = "transition_active_tasks" - transitionMissedTasks MetricName = "transition_missed_immediate_tasks" + expiryPendingTasks MetricName = "expiry_pending_tasks" + expiryMissedTasks MetricName = "expiry_missed_tasks" + expiryMissedFreeVersions MetricName = "expiry_missed_freeversions" + expiryMissedTierJournalTasks MetricName = "expiry_missed_tierjournal_tasks" + expiryNumWorkers MetricName = "expiry_num_workers" + transitionPendingTasks MetricName = "transition_pending_tasks" + transitionActiveTasks MetricName = "transition_active_tasks" + transitionMissedTasks MetricName = "transition_missed_immediate_tasks" transitionedBytes MetricName = "transitioned_bytes" transitionedObjects MetricName = "transitioned_objects" @@ -2000,6 +2004,42 @@ func getILMNodeMetrics() *MetricsGroup { expPendingTasks := Metric{ Description: getExpiryPendingTasksMD(), } + expMissedTasks := Metric{ + Description: MetricDescription{ + Namespace: nodeMetricNamespace, + Subsystem: ilmSubsystem, + Name: expiryMissedTasks, + Help: "Number of object version expiry missed due to busy system", + Type: counterMetric, + }, + } + expMissedFreeVersions := Metric{ + Description: MetricDescription{ + Namespace: nodeMetricNamespace, + Subsystem: ilmSubsystem, + Name: expiryMissedFreeVersions, + Help: "Number of free versions expiry missed due to busy system", + Type: counterMetric, + }, + } + expMissedTierJournalTasks := Metric{ + Description: MetricDescription{ + Namespace: nodeMetricNamespace, + Subsystem: ilmSubsystem, + Name: expiryMissedTierJournalTasks, + Help: "Number of tier journal entries cleanup missed due to busy system", + Type: counterMetric, + }, + } + expNumWorkers := Metric{ + Description: MetricDescription{ + Namespace: nodeMetricNamespace, + Subsystem: ilmSubsystem, + Name: expiryNumWorkers, + Help: "Number of workers expiring object versions currently", + Type: gaugeMetric, + }, + } trPendingTasks := Metric{ Description: getTransitionPendingTasksMD(), } @@ -2011,6 +2051,10 @@ func getILMNodeMetrics() *MetricsGroup { } if globalExpiryState != nil { expPendingTasks.Value = float64(globalExpiryState.PendingTasks()) + expMissedTasks.Value = float64(globalExpiryState.stats.MissedTasks()) + expMissedFreeVersions.Value = float64(globalExpiryState.stats.MissedFreeVersTasks()) + expMissedTierJournalTasks.Value = float64(globalExpiryState.stats.MissedTierJournalTasks()) + expNumWorkers.Value = float64(globalExpiryState.stats.NumWorkers()) } if globalTransitionState != nil { trPendingTasks.Value = float64(globalTransitionState.PendingTasks()) @@ -2019,6 +2063,10 @@ func getILMNodeMetrics() *MetricsGroup { } return []Metric{ expPendingTasks, + expMissedTasks, + expMissedFreeVersions, + expMissedTierJournalTasks, + expNumWorkers, trPendingTasks, trActiveTasks, trMissedTasks, diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index 1167659d5..127cc75dd 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -117,7 +117,13 @@ type ObjectOptions struct { // Object must have been read at this point. IndexCB func() []byte + // InclFreeVersions indicates that free versions need to be included + // when looking up a version by fi.VersionID InclFreeVersions bool + // SkipFreeVersion skips adding a free version when a tiered version is + // being 'replaced' + // Note: Used only when a tiered object is being expired. + SkipFreeVersion bool MetadataChg bool // is true if it is a metadata update operation. EvalRetentionBypassFn EvalRetentionBypassFn // only set for enforcing retention bypass on DeleteObject. diff --git a/cmd/server-main.go b/cmd/server-main.go index d16c8a80c..feaf084c9 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -427,7 +427,6 @@ func initAllSubsystems(ctx context.Context) { // Create new ILM tier configuration subsystem globalTierConfigMgr = NewTierConfigMgr() - globalTierJournal = NewTierJournal() globalTransitionState = newTransitionState(GlobalContext) globalSiteResyncMetrics = newSiteResyncMetrics(GlobalContext) @@ -911,6 +910,11 @@ func serverMain(ctx *cli.Context) { initBackgroundReplication(GlobalContext, newObject) }) + // Initialize background ILM worker poool + bootstrapTrace("initBackgroundExpiry", func() { + initBackgroundExpiry(GlobalContext, newObject) + }) + bootstrapTrace("globalTransitionState.Init", func() { globalTransitionState.Init(newObject) }) @@ -930,8 +934,6 @@ func serverMain(ctx *cli.Context) { bootstrapTrace("globalTierConfigMgr.Init", func() { if err := globalTierConfigMgr.Init(GlobalContext, newObject); err != nil { logger.LogIf(GlobalContext, err) - } else { - logger.FatalIf(globalTierJournal.Init(GlobalContext), "Unable to initialize remote tier pending deletes journal") } }) }() diff --git a/cmd/tier-journal.go b/cmd/tier-journal.go deleted file mode 100644 index dfbad4e26..000000000 --- a/cmd/tier-journal.go +++ /dev/null @@ -1,295 +0,0 @@ -// Copyright (c) 2015-2021 MinIO, Inc. -// -// This file is part of MinIO Object Storage stack -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package cmd - -import ( - "context" - "encoding/binary" - "errors" - "fmt" - "io" - "os" - "path/filepath" - "sync" - "time" - - "github.com/minio/minio/internal/logger" -) - -//go:generate msgp -file $GOFILE -unexported -//msgp:ignore TierJournal tierDiskJournal walkfn - -type tierDiskJournal struct { - sync.RWMutex - diskPath string - file *os.File // active journal file -} - -// TierJournal holds an in-memory and an on-disk delete journal of tiered content. -type TierJournal struct { - *tierDiskJournal // for processing legacy journal entries - *tierMemJournal // for processing new journal entries -} - -type jentry struct { - ObjName string `msg:"obj"` - VersionID string `msg:"vid"` - TierName string `msg:"tier"` -} - -const ( - tierJournalVersion = 1 - tierJournalHdrLen = 2 // 2 bytes -) - -var errUnsupportedJournalVersion = errors.New("unsupported pending deletes journal version") - -func newTierDiskJournal() *tierDiskJournal { - return &tierDiskJournal{} -} - -// NewTierJournal initializes tier deletion journal -func NewTierJournal() *TierJournal { - j := &TierJournal{ - tierMemJournal: newTierMemJournal(1000), - tierDiskJournal: newTierDiskJournal(), - } - return j -} - -// Init initializes an in-memory journal built using a -// buffered channel for new journal entries. It also initializes the on-disk -// journal only to process existing journal entries made from previous versions. -func (t *TierJournal) Init(ctx context.Context) error { - for _, diskPath := range globalEndpoints.LocalDisksPaths() { - t.diskPath = diskPath - - go t.deletePending(ctx) // for existing journal entries from previous MinIO versions - go t.processEntries(ctx) // for newer journal entries circa free-versions - return nil - } - - return errors.New("no local drive found") -} - -// rotate rotates the journal. If a read-only journal already exists it does -// nothing. Otherwise renames the active journal to a read-only journal and -// opens a new active journal. -func (jd *tierDiskJournal) rotate() error { - // Do nothing if a read-only journal file already exists. - if _, err := os.Stat(jd.ReadOnlyPath()); err == nil { - return nil - } - // Close the active journal if present and delete it. - return jd.Close() -} - -type walkFn func(ctx context.Context, objName, rvID, tierName string) error - -func (jd *tierDiskJournal) ReadOnlyPath() string { - return filepath.Join(jd.diskPath, minioMetaBucket, "ilm", "deletion-journal.ro.bin") -} - -func (jd *tierDiskJournal) JournalPath() string { - return filepath.Join(jd.diskPath, minioMetaBucket, "ilm", "deletion-journal.bin") -} - -func (jd *tierDiskJournal) WalkEntries(ctx context.Context, fn walkFn) { - if err := jd.rotate(); err != nil { - logger.LogIf(ctx, fmt.Errorf("tier-journal: failed to rotate pending deletes journal %s", err)) - return - } - - ro, err := jd.OpenRO() - switch { - case errors.Is(err, os.ErrNotExist): - return // No read-only journal to process; nothing to do. - case err != nil: - logger.LogIf(ctx, fmt.Errorf("tier-journal: failed open read-only journal for processing %s", err)) - return - } - defer ro.Close() - mr := msgpNewReader(ro) - defer readMsgpReaderPoolPut(mr) - - done := false - for { - var entry jentry - err := entry.DecodeMsg(mr) - if errors.Is(err, io.EOF) { - done = true - break - } - if err != nil { - logger.LogIf(ctx, fmt.Errorf("tier-journal: failed to decode journal entry %s", err)) - break - } - err = fn(ctx, entry.ObjName, entry.VersionID, entry.TierName) - if err != nil && !isErrObjectNotFound(err) { - logger.LogIf(ctx, fmt.Errorf("tier-journal: failed to delete transitioned object %s from %s due to %s", entry.ObjName, entry.TierName, err)) - // We add the entry into the active journal to try again - // later. - jd.addEntry(entry) - } - } - if done { - os.Remove(jd.ReadOnlyPath()) - } -} - -func deleteObjectFromRemoteTier(ctx context.Context, objName, rvID, tierName string) error { - w, err := globalTierConfigMgr.getDriver(tierName) - if err != nil { - return err - } - err = w.Remove(ctx, objName, remoteVersionID(rvID)) - if err != nil { - return err - } - return nil -} - -func (jd *tierDiskJournal) deletePending(ctx context.Context) { - ticker := time.NewTicker(30 * time.Minute) - defer ticker.Stop() - for { - select { - case <-ticker.C: - jd.WalkEntries(ctx, deleteObjectFromRemoteTier) - - case <-ctx.Done(): - jd.Close() - return - } - } -} - -func (jd *tierDiskJournal) addEntry(je jentry) error { - // Open journal if it hasn't been - err := jd.Open() - if err != nil { - return err - } - - b, err := je.MarshalMsg(nil) - if err != nil { - return err - } - - jd.Lock() - defer jd.Unlock() - _, err = jd.file.Write(b) - if err != nil { - // Do not leak fd here, close the file properly. - Fdatasync(jd.file) - _ = jd.file.Close() - - jd.file = nil // reset to allow subsequent reopen when file/disk is available. - } - return err -} - -// Close closes the active journal and renames it to read-only for pending -// deletes processing. Note: calling Close on a closed journal is a no-op. -func (jd *tierDiskJournal) Close() error { - jd.Lock() - defer jd.Unlock() - if jd.file == nil { // already closed - return nil - } - - var ( - f *os.File - fi os.FileInfo - err error - ) - // Setting j.file to nil - f, jd.file = jd.file, f - if fi, err = f.Stat(); err != nil { - return err - } - f.Close() // close before rename() - - // Skip renaming active journal if empty. - if fi.Size() == tierJournalHdrLen { - return os.Remove(jd.JournalPath()) - } - - jPath := jd.JournalPath() - jroPath := jd.ReadOnlyPath() - // Rotate active journal to perform pending deletes. - return os.Rename(jPath, jroPath) -} - -// Open opens a new active journal. Note: calling Open on an opened journal is a -// no-op. -func (jd *tierDiskJournal) Open() error { - jd.Lock() - defer jd.Unlock() - if jd.file != nil { // already open - return nil - } - - var err error - jd.file, err = OpenFile(jd.JournalPath(), os.O_APPEND|os.O_CREATE|os.O_WRONLY|writeMode, 0o666) - if err != nil { - return err - } - - // write journal version header if active journal is empty - fi, err := jd.file.Stat() - if err != nil { - return err - } - if fi.Size() == 0 { - var data [tierJournalHdrLen]byte - binary.LittleEndian.PutUint16(data[:], tierJournalVersion) - _, err = jd.file.Write(data[:]) - if err != nil { - return err - } - } - return nil -} - -func (jd *tierDiskJournal) OpenRO() (io.ReadCloser, error) { - file, err := Open(jd.ReadOnlyPath()) - if err != nil { - return nil, err - } - - // read journal version header - var data [tierJournalHdrLen]byte - if _, err := io.ReadFull(file, data[:]); err != nil { - return nil, err - } - - switch binary.LittleEndian.Uint16(data[:]) { - case tierJournalVersion: - return file, nil - default: - return nil, errUnsupportedJournalVersion - } -} - -// jentryV1 represents the entry in the journal before RemoteVersionID was -// added. It remains here for use in tests for the struct element addition. -type jentryV1 struct { - ObjName string `msg:"obj"` - TierName string `msg:"tier"` -} diff --git a/cmd/tier-journal_gen.go b/cmd/tier-journal_gen.go deleted file mode 100644 index f62c3af13..000000000 --- a/cmd/tier-journal_gen.go +++ /dev/null @@ -1,288 +0,0 @@ -package cmd - -// Code generated by github.com/tinylib/msgp DO NOT EDIT. - -import ( - "github.com/tinylib/msgp/msgp" -) - -// DecodeMsg implements msgp.Decodable -func (z *jentry) DecodeMsg(dc *msgp.Reader) (err error) { - var field []byte - _ = field - var zb0001 uint32 - zb0001, err = dc.ReadMapHeader() - if err != nil { - err = msgp.WrapError(err) - return - } - for zb0001 > 0 { - zb0001-- - field, err = dc.ReadMapKeyPtr() - if err != nil { - err = msgp.WrapError(err) - return - } - switch msgp.UnsafeString(field) { - case "obj": - z.ObjName, err = dc.ReadString() - if err != nil { - err = msgp.WrapError(err, "ObjName") - return - } - case "vid": - z.VersionID, err = dc.ReadString() - if err != nil { - err = msgp.WrapError(err, "VersionID") - return - } - case "tier": - z.TierName, err = dc.ReadString() - if err != nil { - err = msgp.WrapError(err, "TierName") - return - } - default: - err = dc.Skip() - if err != nil { - err = msgp.WrapError(err) - return - } - } - } - return -} - -// EncodeMsg implements msgp.Encodable -func (z jentry) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 3 - // write "obj" - err = en.Append(0x83, 0xa3, 0x6f, 0x62, 0x6a) - if err != nil { - return - } - err = en.WriteString(z.ObjName) - if err != nil { - err = msgp.WrapError(err, "ObjName") - return - } - // write "vid" - err = en.Append(0xa3, 0x76, 0x69, 0x64) - if err != nil { - return - } - err = en.WriteString(z.VersionID) - if err != nil { - err = msgp.WrapError(err, "VersionID") - return - } - // write "tier" - err = en.Append(0xa4, 0x74, 0x69, 0x65, 0x72) - if err != nil { - return - } - err = en.WriteString(z.TierName) - if err != nil { - err = msgp.WrapError(err, "TierName") - return - } - return -} - -// MarshalMsg implements msgp.Marshaler -func (z jentry) MarshalMsg(b []byte) (o []byte, err error) { - o = msgp.Require(b, z.Msgsize()) - // map header, size 3 - // string "obj" - o = append(o, 0x83, 0xa3, 0x6f, 0x62, 0x6a) - o = msgp.AppendString(o, z.ObjName) - // string "vid" - o = append(o, 0xa3, 0x76, 0x69, 0x64) - o = msgp.AppendString(o, z.VersionID) - // string "tier" - o = append(o, 0xa4, 0x74, 0x69, 0x65, 0x72) - o = msgp.AppendString(o, z.TierName) - return -} - -// UnmarshalMsg implements msgp.Unmarshaler -func (z *jentry) UnmarshalMsg(bts []byte) (o []byte, err error) { - var field []byte - _ = field - var zb0001 uint32 - zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - for zb0001 > 0 { - zb0001-- - field, bts, err = msgp.ReadMapKeyZC(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - switch msgp.UnsafeString(field) { - case "obj": - z.ObjName, bts, err = msgp.ReadStringBytes(bts) - if err != nil { - err = msgp.WrapError(err, "ObjName") - return - } - case "vid": - z.VersionID, bts, err = msgp.ReadStringBytes(bts) - if err != nil { - err = msgp.WrapError(err, "VersionID") - return - } - case "tier": - z.TierName, bts, err = msgp.ReadStringBytes(bts) - if err != nil { - err = msgp.WrapError(err, "TierName") - return - } - default: - bts, err = msgp.Skip(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - } - } - o = bts - return -} - -// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message -func (z jentry) Msgsize() (s int) { - s = 1 + 4 + msgp.StringPrefixSize + len(z.ObjName) + 4 + msgp.StringPrefixSize + len(z.VersionID) + 5 + msgp.StringPrefixSize + len(z.TierName) - return -} - -// DecodeMsg implements msgp.Decodable -func (z *jentryV1) DecodeMsg(dc *msgp.Reader) (err error) { - var field []byte - _ = field - var zb0001 uint32 - zb0001, err = dc.ReadMapHeader() - if err != nil { - err = msgp.WrapError(err) - return - } - for zb0001 > 0 { - zb0001-- - field, err = dc.ReadMapKeyPtr() - if err != nil { - err = msgp.WrapError(err) - return - } - switch msgp.UnsafeString(field) { - case "obj": - z.ObjName, err = dc.ReadString() - if err != nil { - err = msgp.WrapError(err, "ObjName") - return - } - case "tier": - z.TierName, err = dc.ReadString() - if err != nil { - err = msgp.WrapError(err, "TierName") - return - } - default: - err = dc.Skip() - if err != nil { - err = msgp.WrapError(err) - return - } - } - } - return -} - -// EncodeMsg implements msgp.Encodable -func (z jentryV1) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 2 - // write "obj" - err = en.Append(0x82, 0xa3, 0x6f, 0x62, 0x6a) - if err != nil { - return - } - err = en.WriteString(z.ObjName) - if err != nil { - err = msgp.WrapError(err, "ObjName") - return - } - // write "tier" - err = en.Append(0xa4, 0x74, 0x69, 0x65, 0x72) - if err != nil { - return - } - err = en.WriteString(z.TierName) - if err != nil { - err = msgp.WrapError(err, "TierName") - return - } - return -} - -// MarshalMsg implements msgp.Marshaler -func (z jentryV1) MarshalMsg(b []byte) (o []byte, err error) { - o = msgp.Require(b, z.Msgsize()) - // map header, size 2 - // string "obj" - o = append(o, 0x82, 0xa3, 0x6f, 0x62, 0x6a) - o = msgp.AppendString(o, z.ObjName) - // string "tier" - o = append(o, 0xa4, 0x74, 0x69, 0x65, 0x72) - o = msgp.AppendString(o, z.TierName) - return -} - -// UnmarshalMsg implements msgp.Unmarshaler -func (z *jentryV1) UnmarshalMsg(bts []byte) (o []byte, err error) { - var field []byte - _ = field - var zb0001 uint32 - zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - for zb0001 > 0 { - zb0001-- - field, bts, err = msgp.ReadMapKeyZC(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - switch msgp.UnsafeString(field) { - case "obj": - z.ObjName, bts, err = msgp.ReadStringBytes(bts) - if err != nil { - err = msgp.WrapError(err, "ObjName") - return - } - case "tier": - z.TierName, bts, err = msgp.ReadStringBytes(bts) - if err != nil { - err = msgp.WrapError(err, "TierName") - return - } - default: - bts, err = msgp.Skip(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - } - } - o = bts - return -} - -// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message -func (z jentryV1) Msgsize() (s int) { - s = 1 + 4 + msgp.StringPrefixSize + len(z.ObjName) + 5 + msgp.StringPrefixSize + len(z.TierName) - return -} diff --git a/cmd/tier-journal_gen_test.go b/cmd/tier-journal_gen_test.go deleted file mode 100644 index 5cff069a5..000000000 --- a/cmd/tier-journal_gen_test.go +++ /dev/null @@ -1,236 +0,0 @@ -package cmd - -// Code generated by github.com/tinylib/msgp DO NOT EDIT. - -import ( - "bytes" - "testing" - - "github.com/tinylib/msgp/msgp" -) - -func TestMarshalUnmarshaljentry(t *testing.T) { - v := jentry{} - bts, err := v.MarshalMsg(nil) - if err != nil { - t.Fatal(err) - } - left, err := v.UnmarshalMsg(bts) - if err != nil { - t.Fatal(err) - } - if len(left) > 0 { - t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) - } - - left, err = msgp.Skip(bts) - if err != nil { - t.Fatal(err) - } - if len(left) > 0 { - t.Errorf("%d bytes left over after Skip(): %q", len(left), left) - } -} - -func BenchmarkMarshalMsgjentry(b *testing.B) { - v := jentry{} - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - v.MarshalMsg(nil) - } -} - -func BenchmarkAppendMsgjentry(b *testing.B) { - v := jentry{} - bts := make([]byte, 0, v.Msgsize()) - bts, _ = v.MarshalMsg(bts[0:0]) - b.SetBytes(int64(len(bts))) - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - bts, _ = v.MarshalMsg(bts[0:0]) - } -} - -func BenchmarkUnmarshaljentry(b *testing.B) { - v := jentry{} - bts, _ := v.MarshalMsg(nil) - b.ReportAllocs() - b.SetBytes(int64(len(bts))) - b.ResetTimer() - for i := 0; i < b.N; i++ { - _, err := v.UnmarshalMsg(bts) - if err != nil { - b.Fatal(err) - } - } -} - -func TestEncodeDecodejentry(t *testing.T) { - v := jentry{} - var buf bytes.Buffer - msgp.Encode(&buf, &v) - - m := v.Msgsize() - if buf.Len() > m { - t.Log("WARNING: TestEncodeDecodejentry Msgsize() is inaccurate") - } - - vn := jentry{} - err := msgp.Decode(&buf, &vn) - if err != nil { - t.Error(err) - } - - buf.Reset() - msgp.Encode(&buf, &v) - err = msgp.NewReader(&buf).Skip() - if err != nil { - t.Error(err) - } -} - -func BenchmarkEncodejentry(b *testing.B) { - v := jentry{} - var buf bytes.Buffer - msgp.Encode(&buf, &v) - b.SetBytes(int64(buf.Len())) - en := msgp.NewWriter(msgp.Nowhere) - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - v.EncodeMsg(en) - } - en.Flush() -} - -func BenchmarkDecodejentry(b *testing.B) { - v := jentry{} - var buf bytes.Buffer - msgp.Encode(&buf, &v) - b.SetBytes(int64(buf.Len())) - rd := msgp.NewEndlessReader(buf.Bytes(), b) - dc := msgp.NewReader(rd) - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - err := v.DecodeMsg(dc) - if err != nil { - b.Fatal(err) - } - } -} - -func TestMarshalUnmarshaljentryV1(t *testing.T) { - v := jentryV1{} - bts, err := v.MarshalMsg(nil) - if err != nil { - t.Fatal(err) - } - left, err := v.UnmarshalMsg(bts) - if err != nil { - t.Fatal(err) - } - if len(left) > 0 { - t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) - } - - left, err = msgp.Skip(bts) - if err != nil { - t.Fatal(err) - } - if len(left) > 0 { - t.Errorf("%d bytes left over after Skip(): %q", len(left), left) - } -} - -func BenchmarkMarshalMsgjentryV1(b *testing.B) { - v := jentryV1{} - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - v.MarshalMsg(nil) - } -} - -func BenchmarkAppendMsgjentryV1(b *testing.B) { - v := jentryV1{} - bts := make([]byte, 0, v.Msgsize()) - bts, _ = v.MarshalMsg(bts[0:0]) - b.SetBytes(int64(len(bts))) - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - bts, _ = v.MarshalMsg(bts[0:0]) - } -} - -func BenchmarkUnmarshaljentryV1(b *testing.B) { - v := jentryV1{} - bts, _ := v.MarshalMsg(nil) - b.ReportAllocs() - b.SetBytes(int64(len(bts))) - b.ResetTimer() - for i := 0; i < b.N; i++ { - _, err := v.UnmarshalMsg(bts) - if err != nil { - b.Fatal(err) - } - } -} - -func TestEncodeDecodejentryV1(t *testing.T) { - v := jentryV1{} - var buf bytes.Buffer - msgp.Encode(&buf, &v) - - m := v.Msgsize() - if buf.Len() > m { - t.Log("WARNING: TestEncodeDecodejentryV1 Msgsize() is inaccurate") - } - - vn := jentryV1{} - err := msgp.Decode(&buf, &vn) - if err != nil { - t.Error(err) - } - - buf.Reset() - msgp.Encode(&buf, &v) - err = msgp.NewReader(&buf).Skip() - if err != nil { - t.Error(err) - } -} - -func BenchmarkEncodejentryV1(b *testing.B) { - v := jentryV1{} - var buf bytes.Buffer - msgp.Encode(&buf, &v) - b.SetBytes(int64(buf.Len())) - en := msgp.NewWriter(msgp.Nowhere) - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - v.EncodeMsg(en) - } - en.Flush() -} - -func BenchmarkDecodejentryV1(b *testing.B) { - v := jentryV1{} - var buf bytes.Buffer - msgp.Encode(&buf, &v) - b.SetBytes(int64(buf.Len())) - rd := msgp.NewEndlessReader(buf.Bytes(), b) - dc := msgp.NewReader(rd) - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - err := v.DecodeMsg(dc) - if err != nil { - b.Fatal(err) - } - } -} diff --git a/cmd/tier-journal_test.go b/cmd/tier-journal_test.go deleted file mode 100644 index 5c6dd0c75..000000000 --- a/cmd/tier-journal_test.go +++ /dev/null @@ -1,121 +0,0 @@ -// Copyright (c) 2015-2021 MinIO, Inc. -// -// This file is part of MinIO Object Storage stack -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package cmd - -import ( - "bytes" - "testing" - - "github.com/tinylib/msgp/msgp" -) - -// TestJEntryReadOldToNew1 - tests that adding the RemoteVersionID parameter to the -// jentry struct does not cause unexpected errors when reading the serialized -// old version into new version. -func TestJEntryReadOldToNew1(t *testing.T) { - readOldToNewCases := []struct { - je jentryV1 - exp jentry - }{ - {jentryV1{"obj1", "tier1"}, jentry{"obj1", "", "tier1"}}, - {jentryV1{"obj1", ""}, jentry{"obj1", "", ""}}, - {jentryV1{"", "tier1"}, jentry{"", "", "tier1"}}, - {jentryV1{"", ""}, jentry{"", "", ""}}, - } - - var b bytes.Buffer - for _, item := range readOldToNewCases { - bs, err := item.je.MarshalMsg(nil) - if err != nil { - t.Fatal(err) - } - b.Write(bs) - } - - mr := msgp.NewReader(&b) - for i, item := range readOldToNewCases { - var je jentry - err := je.DecodeMsg(mr) - if err != nil { - t.Fatal(err) - } - if je != item.exp { - t.Errorf("Case %d: Expected: %v Got: %v", i, item.exp, je) - } - } -} - -// TestJEntryWriteNewToOldMix1 - tests that adding the RemoteVersionID parameter -// to the jentry struct does not cause unexpected errors when writing. This -// simulates the case when the active journal has entries in the older version -// struct and due to errors new entries are added in the new version of the -// struct. -func TestJEntryWriteNewToOldMix1(t *testing.T) { - oldStructVals := []jentryV1{ - {"obj1", "tier1"}, - {"obj2", "tier2"}, - {"obj3", "tier3"}, - } - newStructVals := []jentry{ - {"obj4", "", "tier1"}, - {"obj5", "ver2", "tier2"}, - {"obj6", "", "tier3"}, - } - - // Write old struct version values followed by new version values. - var b bytes.Buffer - for _, item := range oldStructVals { - bs, err := item.MarshalMsg(nil) - if err != nil { - t.Fatal(err) - } - b.Write(bs) - } - for _, item := range newStructVals { - bs, err := item.MarshalMsg(nil) - if err != nil { - t.Fatal(err) - } - b.Write(bs) - } - - // Read into new struct version and check. - mr := msgp.NewReader(&b) - for i := 0; i < len(oldStructVals)+len(newStructVals); i++ { - var je jentry - err := je.DecodeMsg(mr) - if err != nil { - t.Fatal(err) - } - var expectedJe jentry - if i < len(oldStructVals) { - // For old struct values, the RemoteVersionID will be - // empty - expectedJe = jentry{ - ObjName: oldStructVals[i].ObjName, - VersionID: "", - TierName: oldStructVals[i].TierName, - } - } else { - expectedJe = newStructVals[i-len(oldStructVals)] - } - if expectedJe != je { - t.Errorf("Case %d: Expected: %v, Got: %v", i, expectedJe, je) - } - } -} diff --git a/cmd/tier-mem-journal.go b/cmd/tier-mem-journal.go deleted file mode 100644 index 34fbb8ae4..000000000 --- a/cmd/tier-mem-journal.go +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright (c) 2015-2021 MinIO, Inc. -// -// This file is part of MinIO Object Storage stack -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package cmd - -import ( - "context" - "fmt" - - "github.com/minio/minio/internal/logger" -) - -type tierMemJournal struct { - entries chan jentry -} - -func newTierMemJournal(nevents int) *tierMemJournal { - return &tierMemJournal{ - entries: make(chan jentry, nevents), - } -} - -func (j *tierMemJournal) processEntries(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case entry := <-j.entries: - logger.LogIf(ctx, deleteObjectFromRemoteTier(ctx, entry.ObjName, entry.VersionID, entry.TierName)) - } - } -} - -func (j *tierMemJournal) AddEntry(je jentry) error { - select { - case j.entries <- je: - default: - return fmt.Errorf("failed to remove tiered content at %s with version %s from tier %s, will be retried later.", - je.ObjName, je.VersionID, je.TierName) - } - return nil -} diff --git a/cmd/tier-sweeper.go b/cmd/tier-sweeper.go index f70dce234..f48c99718 100644 --- a/cmd/tier-sweeper.go +++ b/cmd/tier-sweeper.go @@ -18,6 +18,8 @@ package cmd import ( + "context" + "github.com/minio/minio/internal/bucket/lifecycle" ) @@ -128,9 +130,26 @@ func (os *objSweeper) shouldRemoveRemoteObject() (jentry, bool) { } // Sweep removes the transitioned object if it's no longer referred to. -func (os *objSweeper) Sweep() error { +func (os *objSweeper) Sweep() { if je, ok := os.shouldRemoveRemoteObject(); ok { - return globalTierJournal.AddEntry(je) + globalExpiryState.enqueueTierJournalEntry(je) + } +} + +type jentry struct { + ObjName string + VersionID string + TierName string +} + +func deleteObjectFromRemoteTier(ctx context.Context, objName, rvID, tierName string) error { + w, err := globalTierConfigMgr.getDriver(tierName) + if err != nil { + return err + } + err = w.Remove(ctx, objName, remoteVersionID(rvID)) + if err != nil { + return err } return nil } diff --git a/cmd/xl-storage-free-version.go b/cmd/xl-storage-free-version.go index 344b0966a..abf2b5af8 100644 --- a/cmd/xl-storage-free-version.go +++ b/cmd/xl-storage-free-version.go @@ -30,6 +30,9 @@ const freeVersion = "free-version" // InitFreeVersion creates a free-version to track the tiered-content of j. If j has // no tiered content, it returns false. func (j xlMetaV2Object) InitFreeVersion(fi FileInfo) (xlMetaV2Version, bool) { + if fi.SkipTierFreeVersion() { + return xlMetaV2Version{}, false + } if status, ok := j.MetaSys[ReservedMetadataPrefixLower+TransitionStatus]; ok && bytes.Equal(status, []byte(lifecycle.TransitionComplete)) { vID, err := uuid.Parse(fi.TierFreeVersionID()) if err != nil { diff --git a/cmd/xl-storage-free-version_test.go b/cmd/xl-storage-free-version_test.go index f8e4df941..b7205160f 100644 --- a/cmd/xl-storage-free-version_test.go +++ b/cmd/xl-storage-free-version_test.go @@ -22,6 +22,7 @@ import ( "testing" "time" + "github.com/google/uuid" "github.com/minio/minio/internal/bucket/lifecycle" ) @@ -84,9 +85,7 @@ func TestFreeVersion(t *testing.T) { Hash: nil, }}, }, - MarkDeleted: false, - // DeleteMarkerReplicationStatus: "", - // VersionPurgeStatus: "", + MarkDeleted: false, NumVersions: 1, SuccessorModTime: time.Time{}, } @@ -228,3 +227,55 @@ func TestFreeVersion(t *testing.T) { t.Fatalf("Expected zero free version but got %d", len(freeVersions)) } } + +func TestSkipFreeVersion(t *testing.T) { + fi := FileInfo{ + Volume: "volume", + Name: "object-name", + VersionID: "00000000-0000-0000-0000-000000000001", + IsLatest: true, + Deleted: false, + TransitionStatus: "", + DataDir: "bffea160-ca7f-465f-98bc-9b4f1c3ba1ef", + XLV1: false, + ModTime: time.Now(), + Size: 0, + Mode: 0, + Metadata: nil, + Parts: nil, + Erasure: ErasureInfo{ + Algorithm: ReedSolomon.String(), + DataBlocks: 4, + ParityBlocks: 2, + BlockSize: 10000, + Index: 1, + Distribution: []int{1, 2, 3, 4, 5, 6, 7, 8}, + Checksums: []ChecksumInfo{{ + PartNumber: 1, + Algorithm: HighwayHash256S, + Hash: nil, + }}, + }, + MarkDeleted: false, + // DeleteMarkerReplicationStatus: "", + // VersionPurgeStatus: "", + NumVersions: 1, + SuccessorModTime: time.Time{}, + } + fi.SetTierFreeVersionID(uuid.New().String()) + // Test if free version is created when SkipTier wasn't set on fi + j := xlMetaV2Object{} + j.MetaSys = make(map[string][]byte) + j.MetaSys[metaTierName] = []byte("WARM-1") + j.MetaSys[metaTierStatus] = []byte(lifecycle.TransitionComplete) + j.MetaSys[metaTierObjName] = []byte("obj-1") + if _, ok := j.InitFreeVersion(fi); !ok { + t.Fatal("Expected a free version to be created") + } + + // Test if we skip creating a free version if SkipTier was set on fi + fi.SetSkipTierFreeVersion() + if _, ok := j.InitFreeVersion(fi); ok { + t.Fatal("Expected no free version to be created") + } +} diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 8883a6c04..c113c8fe0 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -632,7 +632,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates for _, freeVersion := range fivs.FreeVersions { oi := freeVersion.ToObjectInfo(item.bucket, item.objectPath(), versioned) done = globalScannerMetrics.time(scannerMetricTierObjSweep) - item.applyTierObjSweep(ctx, objAPI, oi) + globalExpiryState.enqueueFreeVersion(oi) done() } diff --git a/internal/config/api/api.go b/internal/config/api/api.go index dd8508416..d2f4ef443 100644 --- a/internal/config/api/api.go +++ b/internal/config/api/api.go @@ -41,6 +41,7 @@ const ( apiReplicationMaxWorkers = "replication_max_workers" apiTransitionWorkers = "transition_workers" + apiExpiryWorkers = "expiry_workers" apiStaleUploadsCleanupInterval = "stale_uploads_cleanup_interval" apiStaleUploadsExpiry = "stale_uploads_expiry" apiDeleteCleanupInterval = "delete_cleanup_interval" @@ -56,6 +57,7 @@ const ( EnvAPICorsAllowOrigin = "MINIO_API_CORS_ALLOW_ORIGIN" EnvAPIRemoteTransportDeadline = "MINIO_API_REMOTE_TRANSPORT_DEADLINE" EnvAPITransitionWorkers = "MINIO_API_TRANSITION_WORKERS" + EnvAPIExpiryWorkers = "MINIO_API_EXPIRY_WORKERS" EnvAPIListQuorum = "MINIO_API_LIST_QUORUM" EnvAPISecureCiphers = "MINIO_API_SECURE_CIPHERS" // default config.EnableOn EnvAPIReplicationPriority = "MINIO_API_REPLICATION_PRIORITY" @@ -117,6 +119,10 @@ var ( Key: apiTransitionWorkers, Value: "100", }, + config.KV{ + Key: apiExpiryWorkers, + Value: "100", + }, config.KV{ Key: apiStaleUploadsCleanupInterval, Value: "6h", @@ -164,6 +170,7 @@ type Config struct { ReplicationPriority string `json:"replication_priority"` ReplicationMaxWorkers int `json:"replication_max_workers"` TransitionWorkers int `json:"transition_workers"` + ExpiryWorkers int `json:"expiry_workers"` StaleUploadsCleanupInterval time.Duration `json:"stale_uploads_cleanup_interval"` StaleUploadsExpiry time.Duration `json:"stale_uploads_expiry"` DeleteCleanupInterval time.Duration `json:"delete_cleanup_interval"` @@ -281,6 +288,15 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) { } cfg.TransitionWorkers = transitionWorkers + expiryWorkers, err := strconv.Atoi(env.Get(EnvAPIExpiryWorkers, kvs.GetWithDefault(apiExpiryWorkers, DefaultKVS))) + if err != nil { + return cfg, err + } + if expiryWorkers <= 0 || expiryWorkers > 500 { + return cfg, config.ErrInvalidExpiryWorkersValue(nil).Msg("Number of expiry workers should be between 1 and 500") + } + cfg.ExpiryWorkers = expiryWorkers + v := env.Get(EnvAPIDeleteCleanupInterval, kvs.Get(apiDeleteCleanupInterval)) if v == "" { v = env.Get(EnvDeleteCleanupInterval, kvs.GetWithDefault(apiDeleteCleanupInterval, DefaultKVS)) diff --git a/internal/config/api/help.go b/internal/config/api/help.go index 98a5f18dc..c359c2770 100644 --- a/internal/config/api/help.go +++ b/internal/config/api/help.go @@ -19,12 +19,12 @@ package api import "github.com/minio/minio/internal/config" -// Help template for storageclass feature. var ( defaultHelpPostfix = func(key string) string { return config.DefaultHelpPostfix(DefaultKVS, key) } + // Help holds configuration keys and their default values for api subsystem. Help = config.HelpKVS{ config.HelpKV{ Key: apiRequestsMax, @@ -80,6 +80,12 @@ var ( Optional: true, Type: "number", }, + config.HelpKV{ + Key: apiExpiryWorkers, + Description: `set the number of expiry workers` + defaultHelpPostfix(apiExpiryWorkers), + Optional: true, + Type: "number", + }, config.HelpKV{ Key: apiStaleUploadsExpiry, Description: `set to expire stale multipart uploads older than this values` + defaultHelpPostfix(apiStaleUploadsExpiry), diff --git a/internal/config/config.go b/internal/config/config.go index 21733964c..fd8568544 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -120,6 +120,7 @@ const ( DriveSubSys = madmin.DriveSubSys BatchSubSys = madmin.BatchSubSys BrowserSubSys = madmin.BrowserSubSys + ILMSubSys = madmin.ILMSubsys // Add new constants here (similar to above) if you add new fields to config. ) @@ -188,6 +189,7 @@ var SubSystemsDynamic = set.CreateStringSet( AuditKafkaSubSys, StorageClassSubSys, CacheSubSys, + ILMSubSys, BatchSubSys, BrowserSubSys, ) @@ -211,6 +213,7 @@ var SubSystemsSingleTargets = set.CreateStringSet( SubnetSubSys, CallhomeSubSys, DriveSubSys, + ILMSubSys, BatchSubSys, BrowserSubSys, ) diff --git a/internal/config/errors.go b/internal/config/errors.go index 751152081..24a2d0042 100644 --- a/internal/config/errors.go +++ b/internal/config/errors.go @@ -224,6 +224,11 @@ Examples: "", "MINIO_API_TRANSITION_WORKERS: should be >= GOMAXPROCS/2", ) + ErrInvalidExpiryWorkersValue = newErrFn( + "Invalid value for expiry workers", + "", + "MINIO_API_EXPIRY_WORKERS: should be between 1 and 500", + ) ErrInvalidBatchKeyRotationWorkersWait = newErrFn( "Invalid value for batch key rotation workers wait", "Please input a non-negative duration", diff --git a/internal/config/ilm/help.go b/internal/config/ilm/help.go new file mode 100644 index 000000000..d0037d8dc --- /dev/null +++ b/internal/config/ilm/help.go @@ -0,0 +1,52 @@ +// Copyright (c) 2015-2024 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package ilm + +import "github.com/minio/minio/internal/config" + +const ( + transitionWorkers = "transition_workers" + expirationWorkers = "expiration_workers" + // EnvILMTransitionWorkers env variable to configure number of transition workers + EnvILMTransitionWorkers = "MINIO_ILM_TRANSITION_WORKERS" + // EnvILMExpirationWorkers env variable to configure number of expiration workers + EnvILMExpirationWorkers = "MINIO_ILM_EXPIRATION_WORKERS" +) + +var ( + defaultHelpPostfix = func(key string) string { + return config.DefaultHelpPostfix(DefaultKVS, key) + } + + // HelpILM holds configuration keys and their default values for the ILM + // subsystem + HelpILM = config.HelpKVS{ + config.HelpKV{ + Key: transitionWorkers, + Type: "number", + Description: `set the number of transition workers` + defaultHelpPostfix(transitionWorkers), + Optional: true, + }, + config.HelpKV{ + Key: expirationWorkers, + Type: "number", + Description: `set the number of expiration workers` + defaultHelpPostfix(expirationWorkers), + Optional: true, + }, + } +) diff --git a/internal/config/ilm/ilm.go b/internal/config/ilm/ilm.go new file mode 100644 index 000000000..b677647d5 --- /dev/null +++ b/internal/config/ilm/ilm.go @@ -0,0 +1,60 @@ +// Copyright (c) 2015-2024 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package ilm + +import ( + "strconv" + + "github.com/minio/minio/internal/config" + "github.com/minio/pkg/v2/env" +) + +// DefaultKVS default configuration values for ILM subsystem +var DefaultKVS = config.KVS{ + config.KV{ + Key: transitionWorkers, + Value: "100", + }, + config.KV{ + Key: expirationWorkers, + Value: "100", + }, +} + +// Config represents the different configuration values for ILM subsystem +type Config struct { + TransitionWorkers int + ExpirationWorkers int +} + +// LookupConfig - lookup ilm config and override with valid environment settings if any. +func LookupConfig(kvs config.KVS) (cfg Config, err error) { + tw, err := strconv.Atoi(env.Get(EnvILMTransitionWorkers, kvs.GetWithDefault(transitionWorkers, DefaultKVS))) + if err != nil { + return cfg, err + } + + ew, err := strconv.Atoi(env.Get(EnvILMExpirationWorkers, kvs.GetWithDefault(expirationWorkers, DefaultKVS))) + if err != nil { + return cfg, err + } + + cfg.TransitionWorkers = tw + cfg.ExpirationWorkers = ew + return cfg, nil +}