diff --git a/cmd/bucket-lifecycle.go b/cmd/bucket-lifecycle.go
index 2f4a50258..48fb59460 100644
--- a/cmd/bucket-lifecycle.go
+++ b/cmd/bucket-lifecycle.go
@@ -24,7 +24,6 @@ import (
"fmt"
"io"
"net/http"
- "runtime"
"strconv"
"strings"
"sync"
@@ -39,12 +38,9 @@ import (
"github.com/minio/minio/internal/bucket/lifecycle"
"github.com/minio/minio/internal/event"
xhttp "github.com/minio/minio/internal/http"
- xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/s3select"
- "github.com/minio/pkg/v2/env"
xnet "github.com/minio/pkg/v2/net"
- "github.com/minio/pkg/v2/workers"
"github.com/zeebo/xxh3"
)
@@ -105,95 +101,280 @@ type expiryTask struct {
src lcEventSrc
}
+// expiryStats records metrics related to ILM expiry activities
+type expiryStats struct {
+ missedExpiryTasks atomic.Int64
+ missedFreeVersTasks atomic.Int64
+ missedTierJournalTasks atomic.Int64
+ workers atomic.Int32
+}
+
+// MissedTasks returns the number of ILM expiry tasks that were missed since
+// there were no available workers.
+func (e *expiryStats) MissedTasks() int64 {
+ return e.missedExpiryTasks.Load()
+}
+
+// MissedFreeVersTasks returns the number of free version collection tasks that
+// were missed since there were no available workers.
+func (e *expiryStats) MissedFreeVersTasks() int64 {
+ return e.missedFreeVersTasks.Load()
+}
+
+// MissedTierJournalTasks returns the number of tasks to remove tiered objects
+// that were missed since there were no available workers.
+func (e *expiryStats) MissedTierJournalTasks() int64 {
+ return e.missedTierJournalTasks.Load()
+}
+
+// NumWorkers returns the number of active workers executing one of ILM expiry
+// tasks or free version collection tasks.
+func (e *expiryStats) NumWorkers() int32 {
+ return e.workers.Load()
+}
+
+type expiryOp interface {
+ OpHash() uint64
+}
+
+type freeVersionTask struct {
+ ObjectInfo
+}
+
+func (f freeVersionTask) OpHash() uint64 {
+ return xxh3.HashString(f.TransitionedObject.Tier + f.TransitionedObject.Name)
+}
+
+func (n newerNoncurrentTask) OpHash() uint64 {
+ return xxh3.HashString(n.bucket + n.versions[0].ObjectV.ObjectName)
+}
+
+func (j jentry) OpHash() uint64 {
+ return xxh3.HashString(j.TierName + j.ObjName)
+}
+
+func (e expiryTask) OpHash() uint64 {
+ return xxh3.HashString(e.objInfo.Bucket + e.objInfo.Name)
+}
+
+// expiryState manages all ILM related expiration activities.
type expiryState struct {
- once sync.Once
- byDaysCh chan expiryTask
- byNewerNoncurrentCh chan newerNoncurrentTask
+ mu sync.RWMutex
+ workers atomic.Pointer[[]chan expiryOp]
+
+ ctx context.Context
+ objAPI ObjectLayer
+
+ stats expiryStats
}
// PendingTasks returns the number of pending ILM expiry tasks.
func (es *expiryState) PendingTasks() int {
- return len(es.byDaysCh) + len(es.byNewerNoncurrentCh)
+ w := es.workers.Load()
+ if w == nil || len(*w) == 0 {
+ return 0
+ }
+ var tasks int
+ for _, wrkr := range *w {
+ tasks += len(wrkr)
+ }
+ return tasks
}
-// close closes work channels exactly once.
-func (es *expiryState) close() {
- es.once.Do(func() {
- xioutil.SafeClose(es.byDaysCh)
- xioutil.SafeClose(es.byNewerNoncurrentCh)
- })
+// enqueueTierJournalEntry enqueues a tier journal entry referring to a remote
+// object corresponding to a 'replaced' object version. This applies only to
+// non-versioned or version suspended buckets.
+func (es *expiryState) enqueueTierJournalEntry(je jentry) {
+ wrkr := es.getWorkerCh(je.OpHash())
+ if wrkr == nil {
+ es.stats.missedTierJournalTasks.Add(1)
+ return
+ }
+ select {
+ case <-GlobalContext.Done():
+ case wrkr <- je:
+ default:
+ es.stats.missedTierJournalTasks.Add(1)
+ }
+}
+
+// enqueueFreeVersion enqueues a free version to be deleted
+func (es *expiryState) enqueueFreeVersion(oi ObjectInfo) {
+ task := freeVersionTask{ObjectInfo: oi}
+ wrkr := es.getWorkerCh(task.OpHash())
+ if wrkr == nil {
+ es.stats.missedFreeVersTasks.Add(1)
+ return
+ }
+ select {
+ case <-GlobalContext.Done():
+ case wrkr <- task:
+ default:
+ es.stats.missedFreeVersTasks.Add(1)
+ }
}
// enqueueByDays enqueues object versions expired by days for expiry.
func (es *expiryState) enqueueByDays(oi ObjectInfo, event lifecycle.Event, src lcEventSrc) {
+ task := expiryTask{objInfo: oi, event: event, src: src}
+ wrkr := es.getWorkerCh(task.OpHash())
+ if wrkr == nil {
+ es.stats.missedExpiryTasks.Add(1)
+ return
+ }
select {
case <-GlobalContext.Done():
- es.close()
- case es.byDaysCh <- expiryTask{objInfo: oi, event: event, src: src}:
+ case wrkr <- task:
default:
+ es.stats.missedExpiryTasks.Add(1)
}
}
// enqueueByNewerNoncurrent enqueues object versions expired by
// NewerNoncurrentVersions limit for expiry.
func (es *expiryState) enqueueByNewerNoncurrent(bucket string, versions []ObjectToDelete, lcEvent lifecycle.Event) {
+ task := newerNoncurrentTask{bucket: bucket, versions: versions, event: lcEvent}
+ wrkr := es.getWorkerCh(task.OpHash())
+ if wrkr == nil {
+ es.stats.missedExpiryTasks.Add(1)
+ return
+ }
select {
case <-GlobalContext.Done():
- es.close()
- case es.byNewerNoncurrentCh <- newerNoncurrentTask{bucket: bucket, versions: versions, event: lcEvent}:
+ case wrkr <- task:
default:
+ es.stats.missedExpiryTasks.Add(1)
}
}
-var globalExpiryState = newExpiryState()
+// globalExpiryState is the per-node instance which manages all ILM expiry tasks.
+var globalExpiryState *expiryState
-func newExpiryState() *expiryState {
- return &expiryState{
- byDaysCh: make(chan expiryTask, 100000),
- byNewerNoncurrentCh: make(chan newerNoncurrentTask, 100000),
+// newExpiryState creates an expiryState with 'n' workers, each consuming ILM
+// expiry tasks from its own buffered channel.
+func newExpiryState(ctx context.Context, objAPI ObjectLayer, n int) *expiryState {
+ es := &expiryState{
+ ctx: ctx,
+ objAPI: objAPI,
+ }
+ workers := make([]chan expiryOp, 0, n)
+ es.workers.Store(&workers)
+ es.ResizeWorkers(n)
+ return es
+}
+
+func (es *expiryState) getWorkerCh(h uint64) chan<- expiryOp {
+ w := es.workers.Load()
+ if w == nil || len(*w) == 0 {
+ return nil
+ }
+ workers := *w
+ return workers[h%uint64(len(workers))]
+}
+
+func (es *expiryState) ResizeWorkers(n int) {
+ // Lock to avoid multiple resizes to happen at the same time.
+ es.mu.Lock()
+ defer es.mu.Unlock()
+ var workers []chan expiryOp
+ if v := es.workers.Load(); v != nil {
+ // Copy to new array.
+ workers = append(workers, *v...)
+ }
+
+ if n == len(workers) || n < 1 {
+ return
+ }
+
+ for len(workers) < n {
+ input := make(chan expiryOp, 10000)
+ workers = append(workers, input)
+ go es.Worker(input)
+ es.stats.workers.Add(1)
+ }
+
+ for len(workers) > n {
+ worker := workers[len(workers)-1]
+ workers = workers[:len(workers)-1]
+ worker <- expiryOp(nil)
+ es.stats.workers.Add(-1)
+ }
+ // Atomically replace workers.
+ es.workers.Store(&workers)
+}
+
+// Worker handles 4 types of expiration tasks.
+// 1. Expiry of objects, includes regular and transitioned objects
+// 2. Expiry of noncurrent versions due to NewerNoncurrentVersions
+// 3. Expiry of free-versions, for remote objects of transitioned object which have been expired since.
+// 4. Expiry of remote objects corresponding to objects in a
+// non-versioned/version suspended buckets
+func (es *expiryState) Worker(input <-chan expiryOp) {
+ for {
+ select {
+ case <-es.ctx.Done():
+ return
+ case v, ok := <-input:
+ if !ok {
+ return
+ }
+ if v == nil {
+ // ResizeWorkers signaling worker to quit
+ return
+ }
+ switch v := v.(type) {
+ case expiryTask:
+ if v.objInfo.TransitionedObject.Status != "" {
+ applyExpiryOnTransitionedObject(es.ctx, es.objAPI, v.objInfo, v.event, v.src)
+ } else {
+ applyExpiryOnNonTransitionedObjects(es.ctx, es.objAPI, v.objInfo, v.event, v.src)
+ }
+ case newerNoncurrentTask:
+ deleteObjectVersions(es.ctx, es.objAPI, v.bucket, v.versions, v.event)
+ case jentry:
+ logger.LogIf(es.ctx, deleteObjectFromRemoteTier(es.ctx, v.ObjName, v.VersionID, v.TierName))
+ case freeVersionTask:
+ oi := v.ObjectInfo
+ traceFn := globalLifecycleSys.trace(oi)
+ if !oi.TransitionedObject.FreeVersion {
+ // nothing to be done; skip this task, keep worker alive
+ continue
+ }
+
+ ignoreNotFoundErr := func(err error) error {
+ switch {
+ case isErrVersionNotFound(err), isErrObjectNotFound(err):
+ return nil
+ }
+ return err
+ }
+ // Remove the remote object
+ err := deleteObjectFromRemoteTier(es.ctx, oi.TransitionedObject.Name, oi.TransitionedObject.VersionID, oi.TransitionedObject.Tier)
+ if ignoreNotFoundErr(err) != nil {
+ logger.LogIf(es.ctx, err)
+ continue
+ }
+
+ // Remove this free version
+ _, err = es.objAPI.DeleteObject(es.ctx, oi.Bucket, oi.Name, ObjectOptions{
+ VersionID: oi.VersionID,
+ InclFreeVersions: true,
+ })
+ if err == nil {
+ auditLogLifecycle(es.ctx, oi, ILMFreeVersionDelete, nil, traceFn)
+ }
+ if ignoreNotFoundErr(err) != nil {
+ logger.LogIf(es.ctx, err)
+ }
+ default:
+ logger.LogIf(es.ctx, fmt.Errorf("Invalid work type - %v", v))
+ }
+ }
}
func initBackgroundExpiry(ctx context.Context, objectAPI ObjectLayer) {
- workerSize, _ := strconv.Atoi(env.Get("_MINIO_ILM_EXPIRY_WORKERS", strconv.Itoa((runtime.GOMAXPROCS(0)+1)/2)))
- if workerSize == 0 {
- workerSize = 4
- }
- ewk, err := workers.New(workerSize)
- if err != nil {
- logger.LogIf(ctx, err)
- }
-
- nwk, err := workers.New(workerSize)
- if err != nil {
- logger.LogIf(ctx, err)
- }
-
- go func() {
- for t := range globalExpiryState.byDaysCh {
- ewk.Take()
- go func(t expiryTask) {
- defer ewk.Give()
- if t.objInfo.TransitionedObject.Status != "" {
- applyExpiryOnTransitionedObject(ctx, objectAPI, t.objInfo, t.event, t.src)
- } else {
- applyExpiryOnNonTransitionedObjects(ctx, objectAPI, t.objInfo, t.event, t.src)
- }
- }(t)
- }
- ewk.Wait()
- }()
-
- go func() {
- for t := range globalExpiryState.byNewerNoncurrentCh {
- nwk.Take()
- go func(t newerNoncurrentTask) {
- defer nwk.Give()
- deleteObjectVersions(ctx, objectAPI, t.bucket, t.versions, t.event)
- }(t)
- }
- nwk.Wait()
- }()
+ globalExpiryState = newExpiryState(ctx, objectAPI, globalAPIConfig.getExpiryWorkers())
}
// newerNoncurrentTask encapsulates arguments required by worker to expire objects
@@ -417,18 +598,18 @@ func expireTransitionedObject(ctx context.Context, objectAPI ObjectLayer, oi *Ob
}
return err
}
- // When an object is past expiry or when a transitioned object is being
- // deleted, 'mark' the data in the remote tier for delete.
- entry := jentry{
- ObjName: oi.TransitionedObject.Name,
- VersionID: oi.TransitionedObject.VersionID,
- TierName: oi.TransitionedObject.Tier,
+
+ // Delete remote object from warm-tier
+ err := deleteObjectFromRemoteTier(ctx, oi.TransitionedObject.Name, oi.TransitionedObject.VersionID, oi.TransitionedObject.Tier)
+ if err == nil {
+ // Skip adding free version since we successfully deleted the
+ // remote object
+ opts.SkipFreeVersion = true
+ } else {
+ logger.LogIf(ctx, err)
}
- if err := globalTierJournal.AddEntry(entry); err != nil {
- return err
- }
- // Delete metadata on source, now that data in remote tier has been
- // marked for deletion.
+
+ // Now, delete object from hot-tier namespace
if _, err := objectAPI.DeleteObject(ctx, oi.Bucket, oi.Name, opts); err != nil {
return err
}
diff --git a/cmd/config-current.go b/cmd/config-current.go
index 03774f439..852f859d3 100644
--- a/cmd/config-current.go
+++ b/cmd/config-current.go
@@ -45,6 +45,7 @@ import (
"github.com/minio/minio/internal/config/identity/openid"
idplugin "github.com/minio/minio/internal/config/identity/plugin"
xtls "github.com/minio/minio/internal/config/identity/tls"
+ "github.com/minio/minio/internal/config/ilm"
"github.com/minio/minio/internal/config/lambda"
"github.com/minio/minio/internal/config/notify"
"github.com/minio/minio/internal/config/policy/opa"
@@ -78,6 +79,7 @@ func initHelp() {
config.SubnetSubSys: subnet.DefaultKVS,
config.CallhomeSubSys: callhome.DefaultKVS,
config.DriveSubSys: drive.DefaultKVS,
+ config.ILMSubSys: ilm.DefaultKVS,
config.CacheSubSys: cache.DefaultKVS,
config.BatchSubSys: batch.DefaultKVS,
config.BrowserSubSys: browser.DefaultKVS,
@@ -716,6 +718,22 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf
return fmt.Errorf("Unable to apply browser config: %w", err)
}
globalBrowserConfig.Update(browserCfg)
+ case config.ILMSubSys:
+ ilmCfg, err := ilm.LookupConfig(s[config.ILMSubSys][config.Default])
+ if err != nil {
+ return fmt.Errorf("Unable to apply ilm config: %w", err)
+ }
+ if globalTransitionState != nil {
+ globalTransitionState.UpdateWorkers(ilmCfg.TransitionWorkers)
+ } else {
+ logger.LogIf(ctx, fmt.Errorf("ILM transition subsystem not initialized"))
+ }
+ if globalExpiryState != nil {
+ globalExpiryState.ResizeWorkers(ilmCfg.ExpirationWorkers)
+ } else {
+ logger.LogIf(ctx, fmt.Errorf("ILM expiration subsystem not initialized"))
+ }
+
}
globalServerConfigMu.Lock()
defer globalServerConfigMu.Unlock()
diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go
index 8ea6613e0..505f51dc1 100644
--- a/cmd/data-scanner.go
+++ b/cmd/data-scanner.go
@@ -982,42 +982,6 @@ func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, oi Obje
return lcEvt.Action, size
}
-// applyTierObjSweep removes remote object pending deletion and the free-version
-// tracking this information.
-func (i *scannerItem) applyTierObjSweep(ctx context.Context, o ObjectLayer, oi ObjectInfo) {
- traceFn := globalLifecycleSys.trace(oi)
- if !oi.TransitionedObject.FreeVersion {
- // nothing to be done
- return
- }
-
- ignoreNotFoundErr := func(err error) error {
- switch {
- case isErrVersionNotFound(err), isErrObjectNotFound(err):
- return nil
- }
- return err
- }
- // Remove the remote object
- err := deleteObjectFromRemoteTier(ctx, oi.TransitionedObject.Name, oi.TransitionedObject.VersionID, oi.TransitionedObject.Tier)
- if ignoreNotFoundErr(err) != nil {
- logger.LogIf(ctx, err)
- return
- }
-
- // Remove this free version
- _, err = o.DeleteObject(ctx, oi.Bucket, oi.Name, ObjectOptions{
- VersionID: oi.VersionID,
- InclFreeVersions: true,
- })
- if err == nil {
- auditLogLifecycle(ctx, oi, ILMFreeVersionDelete, nil, traceFn)
- }
- if ignoreNotFoundErr(err) != nil {
- logger.LogIf(ctx, err)
- }
-}
-
// applyNewerNoncurrentVersionLimit removes noncurrent versions older than the most recent NewerNoncurrentVersions configured.
// Note: This function doesn't update sizeSummary since it always removes versions that it doesn't return.
func (i *scannerItem) applyNewerNoncurrentVersionLimit(ctx context.Context, _ ObjectLayer, fivs []FileInfo, expState *expiryState) ([]ObjectInfo, error) {
diff --git a/cmd/data-scanner_test.go b/cmd/data-scanner_test.go
index 464feb805..a55007a67 100644
--- a/cmd/data-scanner_test.go
+++ b/cmd/data-scanner_test.go
@@ -39,14 +39,20 @@ func TestApplyNewerNoncurrentVersionsLimit(t *testing.T) {
globalBucketMetadataSys = NewBucketMetadataSys()
globalBucketObjectLockSys = &BucketObjectLockSys{}
globalBucketVersioningSys = &BucketVersioningSys{}
- expiryState := newExpiryState()
+ es := newExpiryState(context.Background(), objAPI, 0)
+ workers := []chan expiryOp{make(chan expiryOp)}
+ es.workers.Store(&workers)
+ globalExpiryState = es
var wg sync.WaitGroup
wg.Add(1)
expired := make([]ObjectToDelete, 0, 5)
go func() {
defer wg.Done()
- for t := range expiryState.byNewerNoncurrentCh {
- expired = append(expired, t.versions...)
+ workers := globalExpiryState.workers.Load()
+ for t := range (*workers)[0] {
+ if t, ok := t.(newerNoncurrentTask); ok {
+ expired = append(expired, t.versions...)
+ }
}
}()
lc := lifecycle.Lifecycle{
@@ -116,7 +122,7 @@ func TestApplyNewerNoncurrentVersionsLimit(t *testing.T) {
for i, fi := range fivs[:2] {
wants[i] = fi.ToObjectInfo(bucket, obj, versioned)
}
- gots, err := item.applyNewerNoncurrentVersionLimit(context.TODO(), objAPI, fivs, expiryState)
+ gots, err := item.applyNewerNoncurrentVersionLimit(context.TODO(), objAPI, fivs, es)
if err != nil {
t.Fatalf("Failed with err: %v", err)
}
@@ -125,7 +131,7 @@ func TestApplyNewerNoncurrentVersionsLimit(t *testing.T) {
}
// Close expiry state's channel to inspect object versions enqueued for expiration
- close(expiryState.byNewerNoncurrentCh)
+ close(workers[0])
wg.Wait()
for _, obj := range expired {
switch obj.ObjectV.VersionID {
diff --git a/cmd/erasure-metadata.go b/cmd/erasure-metadata.go
index 60832aa5d..35d7419cb 100644
--- a/cmd/erasure-metadata.go
+++ b/cmd/erasure-metadata.go
@@ -525,6 +525,7 @@ func objectQuorumFromMeta(ctx context.Context, partsMetaData []FileInfo, errs []
const (
tierFVID = "tier-free-versionID"
tierFVMarker = "tier-free-marker"
+ tierSkipFVID = "tier-skip-fvid"
)
// SetTierFreeVersionID sets free-version's versionID. This method is used by
@@ -551,6 +552,23 @@ func (fi *FileInfo) SetTierFreeVersion() {
fi.Metadata[ReservedMetadataPrefixLower+tierFVMarker] = ""
}
+// SetSkipTierFreeVersion indicates to skip adding a tier free version id.
+// Note: Used only when expiring tiered objects and the remote content has
+// already been scheduled for deletion
+func (fi *FileInfo) SetSkipTierFreeVersion() {
+ if fi.Metadata == nil {
+ fi.Metadata = make(map[string]string)
+ }
+ fi.Metadata[ReservedMetadataPrefixLower+tierSkipFVID] = ""
+}
+
+// SkipTierFreeVersion returns true if set, false otherwise.
+// See SetSkipTierFreeVersion for its purpose.
+func (fi *FileInfo) SkipTierFreeVersion() bool {
+ _, ok := fi.Metadata[ReservedMetadataPrefixLower+tierSkipFVID]
+ return ok
+}
+
// TierFreeVersion returns true if version is a free-version.
func (fi *FileInfo) TierFreeVersion() bool {
_, ok := fi.Metadata[ReservedMetadataPrefixLower+tierFVMarker]
diff --git a/cmd/erasure-metadata_test.go b/cmd/erasure-metadata_test.go
index 8cee75dda..6eb518ae4 100644
--- a/cmd/erasure-metadata_test.go
+++ b/cmd/erasure-metadata_test.go
@@ -314,3 +314,11 @@ func TestTransitionInfoEquals(t *testing.T) {
t.Fatalf("Expected to be inequal: fi %v ofi %v", fi, ofi)
}
}
+
+func TestSkipTierFreeVersion(t *testing.T) {
+ fi := newFileInfo("object", 8, 8)
+ fi.SetSkipTierFreeVersion()
+ if ok := fi.SkipTierFreeVersion(); !ok {
+ t.Fatal("Expected SkipTierFreeVersion to be set on FileInfo but wasn't")
+ }
+}
diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go
index 49bdd035b..e7e50f3f7 100644
--- a/cmd/erasure-object.go
+++ b/cmd/erasure-object.go
@@ -1975,6 +1975,9 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
ExpireRestored: opts.Transition.ExpireRestored,
}
fi.SetTierFreeVersionID(fvID)
+ if opts.SkipFreeVersion {
+ fi.SetSkipTierFreeVersion()
+ }
if opts.VersionID != "" {
fi.VersionID = opts.VersionID
} else if opts.Versioned {
@@ -2004,6 +2007,9 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
ExpireRestored: opts.Transition.ExpireRestored,
}
dfi.SetTierFreeVersionID(fvID)
+ if opts.SkipFreeVersion {
+ dfi.SetSkipTierFreeVersion()
+ }
if err = er.deleteObjectVersion(ctx, bucket, object, dfi, opts.DeleteMarker); err != nil {
return objInfo, toObjectErr(err, bucket, object)
}
diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go
index dd7b06cf1..1dbf2e714 100644
--- a/cmd/erasure-server-pool.go
+++ b/cmd/erasure-server-pool.go
@@ -192,10 +192,6 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ
go globalMRFState.healRoutine(z)
})
- bootstrapTrace("initBackgroundExpiry", func() {
- initBackgroundExpiry(GlobalContext, z)
- })
-
// initialize the object layer.
defer setObjectLayer(z)
diff --git a/cmd/globals.go b/cmd/globals.go
index b55ee5b98..1f2f68faa 100644
--- a/cmd/globals.go
+++ b/cmd/globals.go
@@ -411,8 +411,6 @@ var (
globalTierConfigMgr *TierConfigMgr
- globalTierJournal *TierJournal
-
globalConsoleSrv *consoleapi.Server
// handles service freeze or un-freeze S3 API calls.
diff --git a/cmd/handler-api.go b/cmd/handler-api.go
index f15690e29..7b203ff2c 100644
--- a/cmd/handler-api.go
+++ b/cmd/handler-api.go
@@ -47,6 +47,7 @@ type apiConfig struct {
replicationPriority string
replicationMaxWorkers int
transitionWorkers int
+ expiryWorkers int
staleUploadsExpiry time.Duration
staleUploadsCleanupInterval time.Duration
@@ -170,7 +171,9 @@ func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) {
}
t.replicationPriority = cfg.ReplicationPriority
t.replicationMaxWorkers = cfg.ReplicationMaxWorkers
- if globalTransitionState != nil && cfg.TransitionWorkers != t.transitionWorkers {
+
+ // N B api.transition_workers will be deprecated
+ if globalTransitionState != nil {
globalTransitionState.UpdateWorkers(cfg.TransitionWorkers)
}
t.transitionWorkers = cfg.TransitionWorkers
@@ -365,6 +368,13 @@ func (t *apiConfig) getReplicationOpts() replicationPoolOpts {
}
}
+func (t *apiConfig) getExpiryWorkers() int {
+ t.mu.RLock()
+ defer t.mu.RUnlock()
+
+ return t.expiryWorkers
+}
+
func (t *apiConfig) getTransitionWorkers() int {
t.mu.RLock()
defer t.mu.RUnlock()
diff --git a/cmd/metrics-v2.go b/cmd/metrics-v2.go
index 8ea678a7d..c18ef497c 100644
--- a/cmd/metrics-v2.go
+++ b/cmd/metrics-v2.go
@@ -273,10 +273,14 @@ const (
vmemory = "virtual_memory_bytes"
cpu = "cpu_total_seconds"
- expiryPendingTasks MetricName = "expiry_pending_tasks"
- transitionPendingTasks MetricName = "transition_pending_tasks"
- transitionActiveTasks MetricName = "transition_active_tasks"
- transitionMissedTasks MetricName = "transition_missed_immediate_tasks"
+ expiryPendingTasks MetricName = "expiry_pending_tasks"
+ expiryMissedTasks MetricName = "expiry_missed_tasks"
+ expiryMissedFreeVersions MetricName = "expiry_missed_freeversions"
+ expiryMissedTierJournalTasks MetricName = "expiry_missed_tierjournal_tasks"
+ expiryNumWorkers MetricName = "expiry_num_workers"
+ transitionPendingTasks MetricName = "transition_pending_tasks"
+ transitionActiveTasks MetricName = "transition_active_tasks"
+ transitionMissedTasks MetricName = "transition_missed_immediate_tasks"
transitionedBytes MetricName = "transitioned_bytes"
transitionedObjects MetricName = "transitioned_objects"
@@ -2000,6 +2004,42 @@ func getILMNodeMetrics() *MetricsGroup {
expPendingTasks := Metric{
Description: getExpiryPendingTasksMD(),
}
+ expMissedTasks := Metric{
+ Description: MetricDescription{
+ Namespace: nodeMetricNamespace,
+ Subsystem: ilmSubsystem,
+ Name: expiryMissedTasks,
+ Help: "Number of object version expiry missed due to busy system",
+ Type: counterMetric,
+ },
+ }
+ expMissedFreeVersions := Metric{
+ Description: MetricDescription{
+ Namespace: nodeMetricNamespace,
+ Subsystem: ilmSubsystem,
+ Name: expiryMissedFreeVersions,
+ Help: "Number of free versions expiry missed due to busy system",
+ Type: counterMetric,
+ },
+ }
+ expMissedTierJournalTasks := Metric{
+ Description: MetricDescription{
+ Namespace: nodeMetricNamespace,
+ Subsystem: ilmSubsystem,
+ Name: expiryMissedTierJournalTasks,
+ Help: "Number of tier journal entries cleanup missed due to busy system",
+ Type: counterMetric,
+ },
+ }
+ expNumWorkers := Metric{
+ Description: MetricDescription{
+ Namespace: nodeMetricNamespace,
+ Subsystem: ilmSubsystem,
+ Name: expiryNumWorkers,
+ Help: "Number of workers expiring object versions currently",
+ Type: gaugeMetric,
+ },
+ }
trPendingTasks := Metric{
Description: getTransitionPendingTasksMD(),
}
@@ -2011,6 +2051,10 @@ func getILMNodeMetrics() *MetricsGroup {
}
if globalExpiryState != nil {
expPendingTasks.Value = float64(globalExpiryState.PendingTasks())
+ expMissedTasks.Value = float64(globalExpiryState.stats.MissedTasks())
+ expMissedFreeVersions.Value = float64(globalExpiryState.stats.MissedFreeVersTasks())
+ expMissedTierJournalTasks.Value = float64(globalExpiryState.stats.MissedTierJournalTasks())
+ expNumWorkers.Value = float64(globalExpiryState.stats.NumWorkers())
}
if globalTransitionState != nil {
trPendingTasks.Value = float64(globalTransitionState.PendingTasks())
@@ -2019,6 +2063,10 @@ func getILMNodeMetrics() *MetricsGroup {
}
return []Metric{
expPendingTasks,
+ expMissedTasks,
+ expMissedFreeVersions,
+ expMissedTierJournalTasks,
+ expNumWorkers,
trPendingTasks,
trActiveTasks,
trMissedTasks,
diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go
index 1167659d5..127cc75dd 100644
--- a/cmd/object-api-interface.go
+++ b/cmd/object-api-interface.go
@@ -117,7 +117,13 @@ type ObjectOptions struct {
// Object must have been read at this point.
IndexCB func() []byte
+ // InclFreeVersions indicates that free versions need to be included
+ // when looking up a version by fi.VersionID
InclFreeVersions bool
+ // SkipFreeVersion skips adding a free version when a tiered version is
+ // being 'replaced'
+ // Note: Used only when a tiered object is being expired.
+ SkipFreeVersion bool
MetadataChg bool // is true if it is a metadata update operation.
EvalRetentionBypassFn EvalRetentionBypassFn // only set for enforcing retention bypass on DeleteObject.
diff --git a/cmd/server-main.go b/cmd/server-main.go
index d16c8a80c..feaf084c9 100644
--- a/cmd/server-main.go
+++ b/cmd/server-main.go
@@ -427,7 +427,6 @@ func initAllSubsystems(ctx context.Context) {
// Create new ILM tier configuration subsystem
globalTierConfigMgr = NewTierConfigMgr()
- globalTierJournal = NewTierJournal()
globalTransitionState = newTransitionState(GlobalContext)
globalSiteResyncMetrics = newSiteResyncMetrics(GlobalContext)
@@ -911,6 +910,11 @@ func serverMain(ctx *cli.Context) {
initBackgroundReplication(GlobalContext, newObject)
})
+ // Initialize background ILM worker pool
+ bootstrapTrace("initBackgroundExpiry", func() {
+ initBackgroundExpiry(GlobalContext, newObject)
+ })
+
bootstrapTrace("globalTransitionState.Init", func() {
globalTransitionState.Init(newObject)
})
@@ -930,8 +934,6 @@ func serverMain(ctx *cli.Context) {
bootstrapTrace("globalTierConfigMgr.Init", func() {
if err := globalTierConfigMgr.Init(GlobalContext, newObject); err != nil {
logger.LogIf(GlobalContext, err)
- } else {
- logger.FatalIf(globalTierJournal.Init(GlobalContext), "Unable to initialize remote tier pending deletes journal")
}
})
}()
diff --git a/cmd/tier-journal.go b/cmd/tier-journal.go
deleted file mode 100644
index dfbad4e26..000000000
--- a/cmd/tier-journal.go
+++ /dev/null
@@ -1,295 +0,0 @@
-// Copyright (c) 2015-2021 MinIO, Inc.
-//
-// This file is part of MinIO Object Storage stack
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see .
-
-package cmd
-
-import (
- "context"
- "encoding/binary"
- "errors"
- "fmt"
- "io"
- "os"
- "path/filepath"
- "sync"
- "time"
-
- "github.com/minio/minio/internal/logger"
-)
-
-//go:generate msgp -file $GOFILE -unexported
-//msgp:ignore TierJournal tierDiskJournal walkfn
-
-type tierDiskJournal struct {
- sync.RWMutex
- diskPath string
- file *os.File // active journal file
-}
-
-// TierJournal holds an in-memory and an on-disk delete journal of tiered content.
-type TierJournal struct {
- *tierDiskJournal // for processing legacy journal entries
- *tierMemJournal // for processing new journal entries
-}
-
-type jentry struct {
- ObjName string `msg:"obj"`
- VersionID string `msg:"vid"`
- TierName string `msg:"tier"`
-}
-
-const (
- tierJournalVersion = 1
- tierJournalHdrLen = 2 // 2 bytes
-)
-
-var errUnsupportedJournalVersion = errors.New("unsupported pending deletes journal version")
-
-func newTierDiskJournal() *tierDiskJournal {
- return &tierDiskJournal{}
-}
-
-// NewTierJournal initializes tier deletion journal
-func NewTierJournal() *TierJournal {
- j := &TierJournal{
- tierMemJournal: newTierMemJournal(1000),
- tierDiskJournal: newTierDiskJournal(),
- }
- return j
-}
-
-// Init initializes an in-memory journal built using a
-// buffered channel for new journal entries. It also initializes the on-disk
-// journal only to process existing journal entries made from previous versions.
-func (t *TierJournal) Init(ctx context.Context) error {
- for _, diskPath := range globalEndpoints.LocalDisksPaths() {
- t.diskPath = diskPath
-
- go t.deletePending(ctx) // for existing journal entries from previous MinIO versions
- go t.processEntries(ctx) // for newer journal entries circa free-versions
- return nil
- }
-
- return errors.New("no local drive found")
-}
-
-// rotate rotates the journal. If a read-only journal already exists it does
-// nothing. Otherwise renames the active journal to a read-only journal and
-// opens a new active journal.
-func (jd *tierDiskJournal) rotate() error {
- // Do nothing if a read-only journal file already exists.
- if _, err := os.Stat(jd.ReadOnlyPath()); err == nil {
- return nil
- }
- // Close the active journal if present and delete it.
- return jd.Close()
-}
-
-type walkFn func(ctx context.Context, objName, rvID, tierName string) error
-
-func (jd *tierDiskJournal) ReadOnlyPath() string {
- return filepath.Join(jd.diskPath, minioMetaBucket, "ilm", "deletion-journal.ro.bin")
-}
-
-func (jd *tierDiskJournal) JournalPath() string {
- return filepath.Join(jd.diskPath, minioMetaBucket, "ilm", "deletion-journal.bin")
-}
-
-func (jd *tierDiskJournal) WalkEntries(ctx context.Context, fn walkFn) {
- if err := jd.rotate(); err != nil {
- logger.LogIf(ctx, fmt.Errorf("tier-journal: failed to rotate pending deletes journal %s", err))
- return
- }
-
- ro, err := jd.OpenRO()
- switch {
- case errors.Is(err, os.ErrNotExist):
- return // No read-only journal to process; nothing to do.
- case err != nil:
- logger.LogIf(ctx, fmt.Errorf("tier-journal: failed open read-only journal for processing %s", err))
- return
- }
- defer ro.Close()
- mr := msgpNewReader(ro)
- defer readMsgpReaderPoolPut(mr)
-
- done := false
- for {
- var entry jentry
- err := entry.DecodeMsg(mr)
- if errors.Is(err, io.EOF) {
- done = true
- break
- }
- if err != nil {
- logger.LogIf(ctx, fmt.Errorf("tier-journal: failed to decode journal entry %s", err))
- break
- }
- err = fn(ctx, entry.ObjName, entry.VersionID, entry.TierName)
- if err != nil && !isErrObjectNotFound(err) {
- logger.LogIf(ctx, fmt.Errorf("tier-journal: failed to delete transitioned object %s from %s due to %s", entry.ObjName, entry.TierName, err))
- // We add the entry into the active journal to try again
- // later.
- jd.addEntry(entry)
- }
- }
- if done {
- os.Remove(jd.ReadOnlyPath())
- }
-}
-
-func deleteObjectFromRemoteTier(ctx context.Context, objName, rvID, tierName string) error {
- w, err := globalTierConfigMgr.getDriver(tierName)
- if err != nil {
- return err
- }
- err = w.Remove(ctx, objName, remoteVersionID(rvID))
- if err != nil {
- return err
- }
- return nil
-}
-
-func (jd *tierDiskJournal) deletePending(ctx context.Context) {
- ticker := time.NewTicker(30 * time.Minute)
- defer ticker.Stop()
- for {
- select {
- case <-ticker.C:
- jd.WalkEntries(ctx, deleteObjectFromRemoteTier)
-
- case <-ctx.Done():
- jd.Close()
- return
- }
- }
-}
-
-func (jd *tierDiskJournal) addEntry(je jentry) error {
- // Open journal if it hasn't been
- err := jd.Open()
- if err != nil {
- return err
- }
-
- b, err := je.MarshalMsg(nil)
- if err != nil {
- return err
- }
-
- jd.Lock()
- defer jd.Unlock()
- _, err = jd.file.Write(b)
- if err != nil {
- // Do not leak fd here, close the file properly.
- Fdatasync(jd.file)
- _ = jd.file.Close()
-
- jd.file = nil // reset to allow subsequent reopen when file/disk is available.
- }
- return err
-}
-
-// Close closes the active journal and renames it to read-only for pending
-// deletes processing. Note: calling Close on a closed journal is a no-op.
-func (jd *tierDiskJournal) Close() error {
- jd.Lock()
- defer jd.Unlock()
- if jd.file == nil { // already closed
- return nil
- }
-
- var (
- f *os.File
- fi os.FileInfo
- err error
- )
- // Setting j.file to nil
- f, jd.file = jd.file, f
- if fi, err = f.Stat(); err != nil {
- return err
- }
- f.Close() // close before rename()
-
- // Skip renaming active journal if empty.
- if fi.Size() == tierJournalHdrLen {
- return os.Remove(jd.JournalPath())
- }
-
- jPath := jd.JournalPath()
- jroPath := jd.ReadOnlyPath()
- // Rotate active journal to perform pending deletes.
- return os.Rename(jPath, jroPath)
-}
-
-// Open opens a new active journal. Note: calling Open on an opened journal is a
-// no-op.
-func (jd *tierDiskJournal) Open() error {
- jd.Lock()
- defer jd.Unlock()
- if jd.file != nil { // already open
- return nil
- }
-
- var err error
- jd.file, err = OpenFile(jd.JournalPath(), os.O_APPEND|os.O_CREATE|os.O_WRONLY|writeMode, 0o666)
- if err != nil {
- return err
- }
-
- // write journal version header if active journal is empty
- fi, err := jd.file.Stat()
- if err != nil {
- return err
- }
- if fi.Size() == 0 {
- var data [tierJournalHdrLen]byte
- binary.LittleEndian.PutUint16(data[:], tierJournalVersion)
- _, err = jd.file.Write(data[:])
- if err != nil {
- return err
- }
- }
- return nil
-}
-
-func (jd *tierDiskJournal) OpenRO() (io.ReadCloser, error) {
- file, err := Open(jd.ReadOnlyPath())
- if err != nil {
- return nil, err
- }
-
- // read journal version header
- var data [tierJournalHdrLen]byte
- if _, err := io.ReadFull(file, data[:]); err != nil {
- return nil, err
- }
-
- switch binary.LittleEndian.Uint16(data[:]) {
- case tierJournalVersion:
- return file, nil
- default:
- return nil, errUnsupportedJournalVersion
- }
-}
-
-// jentryV1 represents the entry in the journal before RemoteVersionID was
-// added. It remains here for use in tests for the struct element addition.
-type jentryV1 struct {
- ObjName string `msg:"obj"`
- TierName string `msg:"tier"`
-}
diff --git a/cmd/tier-journal_gen.go b/cmd/tier-journal_gen.go
deleted file mode 100644
index f62c3af13..000000000
--- a/cmd/tier-journal_gen.go
+++ /dev/null
@@ -1,288 +0,0 @@
-package cmd
-
-// Code generated by github.com/tinylib/msgp DO NOT EDIT.
-
-import (
- "github.com/tinylib/msgp/msgp"
-)
-
-// DecodeMsg implements msgp.Decodable
-func (z *jentry) DecodeMsg(dc *msgp.Reader) (err error) {
- var field []byte
- _ = field
- var zb0001 uint32
- zb0001, err = dc.ReadMapHeader()
- if err != nil {
- err = msgp.WrapError(err)
- return
- }
- for zb0001 > 0 {
- zb0001--
- field, err = dc.ReadMapKeyPtr()
- if err != nil {
- err = msgp.WrapError(err)
- return
- }
- switch msgp.UnsafeString(field) {
- case "obj":
- z.ObjName, err = dc.ReadString()
- if err != nil {
- err = msgp.WrapError(err, "ObjName")
- return
- }
- case "vid":
- z.VersionID, err = dc.ReadString()
- if err != nil {
- err = msgp.WrapError(err, "VersionID")
- return
- }
- case "tier":
- z.TierName, err = dc.ReadString()
- if err != nil {
- err = msgp.WrapError(err, "TierName")
- return
- }
- default:
- err = dc.Skip()
- if err != nil {
- err = msgp.WrapError(err)
- return
- }
- }
- }
- return
-}
-
-// EncodeMsg implements msgp.Encodable
-func (z jentry) EncodeMsg(en *msgp.Writer) (err error) {
- // map header, size 3
- // write "obj"
- err = en.Append(0x83, 0xa3, 0x6f, 0x62, 0x6a)
- if err != nil {
- return
- }
- err = en.WriteString(z.ObjName)
- if err != nil {
- err = msgp.WrapError(err, "ObjName")
- return
- }
- // write "vid"
- err = en.Append(0xa3, 0x76, 0x69, 0x64)
- if err != nil {
- return
- }
- err = en.WriteString(z.VersionID)
- if err != nil {
- err = msgp.WrapError(err, "VersionID")
- return
- }
- // write "tier"
- err = en.Append(0xa4, 0x74, 0x69, 0x65, 0x72)
- if err != nil {
- return
- }
- err = en.WriteString(z.TierName)
- if err != nil {
- err = msgp.WrapError(err, "TierName")
- return
- }
- return
-}
-
-// MarshalMsg implements msgp.Marshaler
-func (z jentry) MarshalMsg(b []byte) (o []byte, err error) {
- o = msgp.Require(b, z.Msgsize())
- // map header, size 3
- // string "obj"
- o = append(o, 0x83, 0xa3, 0x6f, 0x62, 0x6a)
- o = msgp.AppendString(o, z.ObjName)
- // string "vid"
- o = append(o, 0xa3, 0x76, 0x69, 0x64)
- o = msgp.AppendString(o, z.VersionID)
- // string "tier"
- o = append(o, 0xa4, 0x74, 0x69, 0x65, 0x72)
- o = msgp.AppendString(o, z.TierName)
- return
-}
-
-// UnmarshalMsg implements msgp.Unmarshaler
-func (z *jentry) UnmarshalMsg(bts []byte) (o []byte, err error) {
- var field []byte
- _ = field
- var zb0001 uint32
- zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
- if err != nil {
- err = msgp.WrapError(err)
- return
- }
- for zb0001 > 0 {
- zb0001--
- field, bts, err = msgp.ReadMapKeyZC(bts)
- if err != nil {
- err = msgp.WrapError(err)
- return
- }
- switch msgp.UnsafeString(field) {
- case "obj":
- z.ObjName, bts, err = msgp.ReadStringBytes(bts)
- if err != nil {
- err = msgp.WrapError(err, "ObjName")
- return
- }
- case "vid":
- z.VersionID, bts, err = msgp.ReadStringBytes(bts)
- if err != nil {
- err = msgp.WrapError(err, "VersionID")
- return
- }
- case "tier":
- z.TierName, bts, err = msgp.ReadStringBytes(bts)
- if err != nil {
- err = msgp.WrapError(err, "TierName")
- return
- }
- default:
- bts, err = msgp.Skip(bts)
- if err != nil {
- err = msgp.WrapError(err)
- return
- }
- }
- }
- o = bts
- return
-}
-
-// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
-func (z jentry) Msgsize() (s int) {
- s = 1 + 4 + msgp.StringPrefixSize + len(z.ObjName) + 4 + msgp.StringPrefixSize + len(z.VersionID) + 5 + msgp.StringPrefixSize + len(z.TierName)
- return
-}
-
-// DecodeMsg implements msgp.Decodable
-func (z *jentryV1) DecodeMsg(dc *msgp.Reader) (err error) {
- var field []byte
- _ = field
- var zb0001 uint32
- zb0001, err = dc.ReadMapHeader()
- if err != nil {
- err = msgp.WrapError(err)
- return
- }
- for zb0001 > 0 {
- zb0001--
- field, err = dc.ReadMapKeyPtr()
- if err != nil {
- err = msgp.WrapError(err)
- return
- }
- switch msgp.UnsafeString(field) {
- case "obj":
- z.ObjName, err = dc.ReadString()
- if err != nil {
- err = msgp.WrapError(err, "ObjName")
- return
- }
- case "tier":
- z.TierName, err = dc.ReadString()
- if err != nil {
- err = msgp.WrapError(err, "TierName")
- return
- }
- default:
- err = dc.Skip()
- if err != nil {
- err = msgp.WrapError(err)
- return
- }
- }
- }
- return
-}
-
-// EncodeMsg implements msgp.Encodable
-func (z jentryV1) EncodeMsg(en *msgp.Writer) (err error) {
- // map header, size 2
- // write "obj"
- err = en.Append(0x82, 0xa3, 0x6f, 0x62, 0x6a)
- if err != nil {
- return
- }
- err = en.WriteString(z.ObjName)
- if err != nil {
- err = msgp.WrapError(err, "ObjName")
- return
- }
- // write "tier"
- err = en.Append(0xa4, 0x74, 0x69, 0x65, 0x72)
- if err != nil {
- return
- }
- err = en.WriteString(z.TierName)
- if err != nil {
- err = msgp.WrapError(err, "TierName")
- return
- }
- return
-}
-
-// MarshalMsg implements msgp.Marshaler
-func (z jentryV1) MarshalMsg(b []byte) (o []byte, err error) {
- o = msgp.Require(b, z.Msgsize())
- // map header, size 2
- // string "obj"
- o = append(o, 0x82, 0xa3, 0x6f, 0x62, 0x6a)
- o = msgp.AppendString(o, z.ObjName)
- // string "tier"
- o = append(o, 0xa4, 0x74, 0x69, 0x65, 0x72)
- o = msgp.AppendString(o, z.TierName)
- return
-}
-
-// UnmarshalMsg implements msgp.Unmarshaler
-func (z *jentryV1) UnmarshalMsg(bts []byte) (o []byte, err error) {
- var field []byte
- _ = field
- var zb0001 uint32
- zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
- if err != nil {
- err = msgp.WrapError(err)
- return
- }
- for zb0001 > 0 {
- zb0001--
- field, bts, err = msgp.ReadMapKeyZC(bts)
- if err != nil {
- err = msgp.WrapError(err)
- return
- }
- switch msgp.UnsafeString(field) {
- case "obj":
- z.ObjName, bts, err = msgp.ReadStringBytes(bts)
- if err != nil {
- err = msgp.WrapError(err, "ObjName")
- return
- }
- case "tier":
- z.TierName, bts, err = msgp.ReadStringBytes(bts)
- if err != nil {
- err = msgp.WrapError(err, "TierName")
- return
- }
- default:
- bts, err = msgp.Skip(bts)
- if err != nil {
- err = msgp.WrapError(err)
- return
- }
- }
- }
- o = bts
- return
-}
-
-// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
-func (z jentryV1) Msgsize() (s int) {
- s = 1 + 4 + msgp.StringPrefixSize + len(z.ObjName) + 5 + msgp.StringPrefixSize + len(z.TierName)
- return
-}
diff --git a/cmd/tier-journal_gen_test.go b/cmd/tier-journal_gen_test.go
deleted file mode 100644
index 5cff069a5..000000000
--- a/cmd/tier-journal_gen_test.go
+++ /dev/null
@@ -1,236 +0,0 @@
-package cmd
-
-// Code generated by github.com/tinylib/msgp DO NOT EDIT.
-
-import (
- "bytes"
- "testing"
-
- "github.com/tinylib/msgp/msgp"
-)
-
-func TestMarshalUnmarshaljentry(t *testing.T) {
- v := jentry{}
- bts, err := v.MarshalMsg(nil)
- if err != nil {
- t.Fatal(err)
- }
- left, err := v.UnmarshalMsg(bts)
- if err != nil {
- t.Fatal(err)
- }
- if len(left) > 0 {
- t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
- }
-
- left, err = msgp.Skip(bts)
- if err != nil {
- t.Fatal(err)
- }
- if len(left) > 0 {
- t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
- }
-}
-
-func BenchmarkMarshalMsgjentry(b *testing.B) {
- v := jentry{}
- b.ReportAllocs()
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- v.MarshalMsg(nil)
- }
-}
-
-func BenchmarkAppendMsgjentry(b *testing.B) {
- v := jentry{}
- bts := make([]byte, 0, v.Msgsize())
- bts, _ = v.MarshalMsg(bts[0:0])
- b.SetBytes(int64(len(bts)))
- b.ReportAllocs()
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- bts, _ = v.MarshalMsg(bts[0:0])
- }
-}
-
-func BenchmarkUnmarshaljentry(b *testing.B) {
- v := jentry{}
- bts, _ := v.MarshalMsg(nil)
- b.ReportAllocs()
- b.SetBytes(int64(len(bts)))
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- _, err := v.UnmarshalMsg(bts)
- if err != nil {
- b.Fatal(err)
- }
- }
-}
-
-func TestEncodeDecodejentry(t *testing.T) {
- v := jentry{}
- var buf bytes.Buffer
- msgp.Encode(&buf, &v)
-
- m := v.Msgsize()
- if buf.Len() > m {
- t.Log("WARNING: TestEncodeDecodejentry Msgsize() is inaccurate")
- }
-
- vn := jentry{}
- err := msgp.Decode(&buf, &vn)
- if err != nil {
- t.Error(err)
- }
-
- buf.Reset()
- msgp.Encode(&buf, &v)
- err = msgp.NewReader(&buf).Skip()
- if err != nil {
- t.Error(err)
- }
-}
-
-func BenchmarkEncodejentry(b *testing.B) {
- v := jentry{}
- var buf bytes.Buffer
- msgp.Encode(&buf, &v)
- b.SetBytes(int64(buf.Len()))
- en := msgp.NewWriter(msgp.Nowhere)
- b.ReportAllocs()
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- v.EncodeMsg(en)
- }
- en.Flush()
-}
-
-func BenchmarkDecodejentry(b *testing.B) {
- v := jentry{}
- var buf bytes.Buffer
- msgp.Encode(&buf, &v)
- b.SetBytes(int64(buf.Len()))
- rd := msgp.NewEndlessReader(buf.Bytes(), b)
- dc := msgp.NewReader(rd)
- b.ReportAllocs()
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- err := v.DecodeMsg(dc)
- if err != nil {
- b.Fatal(err)
- }
- }
-}
-
-func TestMarshalUnmarshaljentryV1(t *testing.T) {
- v := jentryV1{}
- bts, err := v.MarshalMsg(nil)
- if err != nil {
- t.Fatal(err)
- }
- left, err := v.UnmarshalMsg(bts)
- if err != nil {
- t.Fatal(err)
- }
- if len(left) > 0 {
- t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
- }
-
- left, err = msgp.Skip(bts)
- if err != nil {
- t.Fatal(err)
- }
- if len(left) > 0 {
- t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
- }
-}
-
-func BenchmarkMarshalMsgjentryV1(b *testing.B) {
- v := jentryV1{}
- b.ReportAllocs()
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- v.MarshalMsg(nil)
- }
-}
-
-func BenchmarkAppendMsgjentryV1(b *testing.B) {
- v := jentryV1{}
- bts := make([]byte, 0, v.Msgsize())
- bts, _ = v.MarshalMsg(bts[0:0])
- b.SetBytes(int64(len(bts)))
- b.ReportAllocs()
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- bts, _ = v.MarshalMsg(bts[0:0])
- }
-}
-
-func BenchmarkUnmarshaljentryV1(b *testing.B) {
- v := jentryV1{}
- bts, _ := v.MarshalMsg(nil)
- b.ReportAllocs()
- b.SetBytes(int64(len(bts)))
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- _, err := v.UnmarshalMsg(bts)
- if err != nil {
- b.Fatal(err)
- }
- }
-}
-
-func TestEncodeDecodejentryV1(t *testing.T) {
- v := jentryV1{}
- var buf bytes.Buffer
- msgp.Encode(&buf, &v)
-
- m := v.Msgsize()
- if buf.Len() > m {
- t.Log("WARNING: TestEncodeDecodejentryV1 Msgsize() is inaccurate")
- }
-
- vn := jentryV1{}
- err := msgp.Decode(&buf, &vn)
- if err != nil {
- t.Error(err)
- }
-
- buf.Reset()
- msgp.Encode(&buf, &v)
- err = msgp.NewReader(&buf).Skip()
- if err != nil {
- t.Error(err)
- }
-}
-
-func BenchmarkEncodejentryV1(b *testing.B) {
- v := jentryV1{}
- var buf bytes.Buffer
- msgp.Encode(&buf, &v)
- b.SetBytes(int64(buf.Len()))
- en := msgp.NewWriter(msgp.Nowhere)
- b.ReportAllocs()
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- v.EncodeMsg(en)
- }
- en.Flush()
-}
-
-func BenchmarkDecodejentryV1(b *testing.B) {
- v := jentryV1{}
- var buf bytes.Buffer
- msgp.Encode(&buf, &v)
- b.SetBytes(int64(buf.Len()))
- rd := msgp.NewEndlessReader(buf.Bytes(), b)
- dc := msgp.NewReader(rd)
- b.ReportAllocs()
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- err := v.DecodeMsg(dc)
- if err != nil {
- b.Fatal(err)
- }
- }
-}
diff --git a/cmd/tier-journal_test.go b/cmd/tier-journal_test.go
deleted file mode 100644
index 5c6dd0c75..000000000
--- a/cmd/tier-journal_test.go
+++ /dev/null
@@ -1,121 +0,0 @@
-// Copyright (c) 2015-2021 MinIO, Inc.
-//
-// This file is part of MinIO Object Storage stack
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-package cmd
-
-import (
- "bytes"
- "testing"
-
- "github.com/tinylib/msgp/msgp"
-)
-
-// TestJEntryReadOldToNew1 - tests that adding the RemoteVersionID parameter to the
-// jentry struct does not cause unexpected errors when reading the serialized
-// old version into new version.
-func TestJEntryReadOldToNew1(t *testing.T) {
- readOldToNewCases := []struct {
- je jentryV1
- exp jentry
- }{
- {jentryV1{"obj1", "tier1"}, jentry{"obj1", "", "tier1"}},
- {jentryV1{"obj1", ""}, jentry{"obj1", "", ""}},
- {jentryV1{"", "tier1"}, jentry{"", "", "tier1"}},
- {jentryV1{"", ""}, jentry{"", "", ""}},
- }
-
- var b bytes.Buffer
- for _, item := range readOldToNewCases {
- bs, err := item.je.MarshalMsg(nil)
- if err != nil {
- t.Fatal(err)
- }
- b.Write(bs)
- }
-
- mr := msgp.NewReader(&b)
- for i, item := range readOldToNewCases {
- var je jentry
- err := je.DecodeMsg(mr)
- if err != nil {
- t.Fatal(err)
- }
- if je != item.exp {
- t.Errorf("Case %d: Expected: %v Got: %v", i, item.exp, je)
- }
- }
-}
-
-// TestJEntryWriteNewToOldMix1 - tests that adding the RemoteVersionID parameter
-// to the jentry struct does not cause unexpected errors when writing. This
-// simulates the case when the active journal has entries in the older version
-// struct and due to errors new entries are added in the new version of the
-// struct.
-func TestJEntryWriteNewToOldMix1(t *testing.T) {
- oldStructVals := []jentryV1{
- {"obj1", "tier1"},
- {"obj2", "tier2"},
- {"obj3", "tier3"},
- }
- newStructVals := []jentry{
- {"obj4", "", "tier1"},
- {"obj5", "ver2", "tier2"},
- {"obj6", "", "tier3"},
- }
-
- // Write old struct version values followed by new version values.
- var b bytes.Buffer
- for _, item := range oldStructVals {
- bs, err := item.MarshalMsg(nil)
- if err != nil {
- t.Fatal(err)
- }
- b.Write(bs)
- }
- for _, item := range newStructVals {
- bs, err := item.MarshalMsg(nil)
- if err != nil {
- t.Fatal(err)
- }
- b.Write(bs)
- }
-
- // Read into new struct version and check.
- mr := msgp.NewReader(&b)
- for i := 0; i < len(oldStructVals)+len(newStructVals); i++ {
- var je jentry
- err := je.DecodeMsg(mr)
- if err != nil {
- t.Fatal(err)
- }
- var expectedJe jentry
- if i < len(oldStructVals) {
- // For old struct values, the RemoteVersionID will be
- // empty
- expectedJe = jentry{
- ObjName: oldStructVals[i].ObjName,
- VersionID: "",
- TierName: oldStructVals[i].TierName,
- }
- } else {
- expectedJe = newStructVals[i-len(oldStructVals)]
- }
- if expectedJe != je {
- t.Errorf("Case %d: Expected: %v, Got: %v", i, expectedJe, je)
- }
- }
-}
diff --git a/cmd/tier-mem-journal.go b/cmd/tier-mem-journal.go
deleted file mode 100644
index 34fbb8ae4..000000000
--- a/cmd/tier-mem-journal.go
+++ /dev/null
@@ -1,56 +0,0 @@
-// Copyright (c) 2015-2021 MinIO, Inc.
-//
-// This file is part of MinIO Object Storage stack
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-package cmd
-
-import (
- "context"
- "fmt"
-
- "github.com/minio/minio/internal/logger"
-)
-
-type tierMemJournal struct {
- entries chan jentry
-}
-
-func newTierMemJournal(nevents int) *tierMemJournal {
- return &tierMemJournal{
- entries: make(chan jentry, nevents),
- }
-}
-
-func (j *tierMemJournal) processEntries(ctx context.Context) {
- for {
- select {
- case <-ctx.Done():
- return
- case entry := <-j.entries:
- logger.LogIf(ctx, deleteObjectFromRemoteTier(ctx, entry.ObjName, entry.VersionID, entry.TierName))
- }
- }
-}
-
-func (j *tierMemJournal) AddEntry(je jentry) error {
- select {
- case j.entries <- je:
- default:
- return fmt.Errorf("failed to remove tiered content at %s with version %s from tier %s, will be retried later.",
- je.ObjName, je.VersionID, je.TierName)
- }
- return nil
-}
diff --git a/cmd/tier-sweeper.go b/cmd/tier-sweeper.go
index f70dce234..f48c99718 100644
--- a/cmd/tier-sweeper.go
+++ b/cmd/tier-sweeper.go
@@ -18,6 +18,8 @@
package cmd
import (
+ "context"
+
"github.com/minio/minio/internal/bucket/lifecycle"
)
@@ -128,9 +130,26 @@ func (os *objSweeper) shouldRemoveRemoteObject() (jentry, bool) {
}
// Sweep removes the transitioned object if it's no longer referred to.
-func (os *objSweeper) Sweep() error {
+func (os *objSweeper) Sweep() {
if je, ok := os.shouldRemoveRemoteObject(); ok {
- return globalTierJournal.AddEntry(je)
+ globalExpiryState.enqueueTierJournalEntry(je)
+ }
+}
+
+type jentry struct {
+ ObjName string
+ VersionID string
+ TierName string
+}
+
+func deleteObjectFromRemoteTier(ctx context.Context, objName, rvID, tierName string) error {
+ w, err := globalTierConfigMgr.getDriver(tierName)
+ if err != nil {
+ return err
+ }
+ err = w.Remove(ctx, objName, remoteVersionID(rvID))
+ if err != nil {
+ return err
}
return nil
}
diff --git a/cmd/xl-storage-free-version.go b/cmd/xl-storage-free-version.go
index 344b0966a..abf2b5af8 100644
--- a/cmd/xl-storage-free-version.go
+++ b/cmd/xl-storage-free-version.go
@@ -30,6 +30,9 @@ const freeVersion = "free-version"
// InitFreeVersion creates a free-version to track the tiered-content of j. If j has
// no tiered content, it returns false.
func (j xlMetaV2Object) InitFreeVersion(fi FileInfo) (xlMetaV2Version, bool) {
+ if fi.SkipTierFreeVersion() {
+ return xlMetaV2Version{}, false
+ }
if status, ok := j.MetaSys[ReservedMetadataPrefixLower+TransitionStatus]; ok && bytes.Equal(status, []byte(lifecycle.TransitionComplete)) {
vID, err := uuid.Parse(fi.TierFreeVersionID())
if err != nil {
diff --git a/cmd/xl-storage-free-version_test.go b/cmd/xl-storage-free-version_test.go
index f8e4df941..b7205160f 100644
--- a/cmd/xl-storage-free-version_test.go
+++ b/cmd/xl-storage-free-version_test.go
@@ -22,6 +22,7 @@ import (
"testing"
"time"
+ "github.com/google/uuid"
"github.com/minio/minio/internal/bucket/lifecycle"
)
@@ -84,9 +85,7 @@ func TestFreeVersion(t *testing.T) {
Hash: nil,
}},
},
- MarkDeleted: false,
- // DeleteMarkerReplicationStatus: "",
- // VersionPurgeStatus: "",
+ MarkDeleted: false,
NumVersions: 1,
SuccessorModTime: time.Time{},
}
@@ -228,3 +227,55 @@ func TestFreeVersion(t *testing.T) {
t.Fatalf("Expected zero free version but got %d", len(freeVersions))
}
}
+
+func TestSkipFreeVersion(t *testing.T) {
+ fi := FileInfo{
+ Volume: "volume",
+ Name: "object-name",
+ VersionID: "00000000-0000-0000-0000-000000000001",
+ IsLatest: true,
+ Deleted: false,
+ TransitionStatus: "",
+ DataDir: "bffea160-ca7f-465f-98bc-9b4f1c3ba1ef",
+ XLV1: false,
+ ModTime: time.Now(),
+ Size: 0,
+ Mode: 0,
+ Metadata: nil,
+ Parts: nil,
+ Erasure: ErasureInfo{
+ Algorithm: ReedSolomon.String(),
+ DataBlocks: 4,
+ ParityBlocks: 2,
+ BlockSize: 10000,
+ Index: 1,
+ Distribution: []int{1, 2, 3, 4, 5, 6, 7, 8},
+ Checksums: []ChecksumInfo{{
+ PartNumber: 1,
+ Algorithm: HighwayHash256S,
+ Hash: nil,
+ }},
+ },
+ MarkDeleted: false,
+ // DeleteMarkerReplicationStatus: "",
+ // VersionPurgeStatus: "",
+ NumVersions: 1,
+ SuccessorModTime: time.Time{},
+ }
+ fi.SetTierFreeVersionID(uuid.New().String())
+ // Test if free version is created when SkipTier wasn't set on fi
+ j := xlMetaV2Object{}
+ j.MetaSys = make(map[string][]byte)
+ j.MetaSys[metaTierName] = []byte("WARM-1")
+ j.MetaSys[metaTierStatus] = []byte(lifecycle.TransitionComplete)
+ j.MetaSys[metaTierObjName] = []byte("obj-1")
+ if _, ok := j.InitFreeVersion(fi); !ok {
+ t.Fatal("Expected a free version to be created")
+ }
+
+ // Test if we skip creating a free version if SkipTier was set on fi
+ fi.SetSkipTierFreeVersion()
+ if _, ok := j.InitFreeVersion(fi); ok {
+ t.Fatal("Expected no free version to be created")
+ }
+}
diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go
index 8883a6c04..c113c8fe0 100644
--- a/cmd/xl-storage.go
+++ b/cmd/xl-storage.go
@@ -632,7 +632,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
for _, freeVersion := range fivs.FreeVersions {
oi := freeVersion.ToObjectInfo(item.bucket, item.objectPath(), versioned)
done = globalScannerMetrics.time(scannerMetricTierObjSweep)
- item.applyTierObjSweep(ctx, objAPI, oi)
+ globalExpiryState.enqueueFreeVersion(oi)
done()
}
diff --git a/internal/config/api/api.go b/internal/config/api/api.go
index dd8508416..d2f4ef443 100644
--- a/internal/config/api/api.go
+++ b/internal/config/api/api.go
@@ -41,6 +41,7 @@ const (
apiReplicationMaxWorkers = "replication_max_workers"
apiTransitionWorkers = "transition_workers"
+ apiExpiryWorkers = "expiry_workers"
apiStaleUploadsCleanupInterval = "stale_uploads_cleanup_interval"
apiStaleUploadsExpiry = "stale_uploads_expiry"
apiDeleteCleanupInterval = "delete_cleanup_interval"
@@ -56,6 +57,7 @@ const (
EnvAPICorsAllowOrigin = "MINIO_API_CORS_ALLOW_ORIGIN"
EnvAPIRemoteTransportDeadline = "MINIO_API_REMOTE_TRANSPORT_DEADLINE"
EnvAPITransitionWorkers = "MINIO_API_TRANSITION_WORKERS"
+ EnvAPIExpiryWorkers = "MINIO_API_EXPIRY_WORKERS"
EnvAPIListQuorum = "MINIO_API_LIST_QUORUM"
EnvAPISecureCiphers = "MINIO_API_SECURE_CIPHERS" // default config.EnableOn
EnvAPIReplicationPriority = "MINIO_API_REPLICATION_PRIORITY"
@@ -117,6 +119,10 @@ var (
Key: apiTransitionWorkers,
Value: "100",
},
+ config.KV{
+ Key: apiExpiryWorkers,
+ Value: "100",
+ },
config.KV{
Key: apiStaleUploadsCleanupInterval,
Value: "6h",
@@ -164,6 +170,7 @@ type Config struct {
ReplicationPriority string `json:"replication_priority"`
ReplicationMaxWorkers int `json:"replication_max_workers"`
TransitionWorkers int `json:"transition_workers"`
+ ExpiryWorkers int `json:"expiry_workers"`
StaleUploadsCleanupInterval time.Duration `json:"stale_uploads_cleanup_interval"`
StaleUploadsExpiry time.Duration `json:"stale_uploads_expiry"`
DeleteCleanupInterval time.Duration `json:"delete_cleanup_interval"`
@@ -281,6 +288,15 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) {
}
cfg.TransitionWorkers = transitionWorkers
+ expiryWorkers, err := strconv.Atoi(env.Get(EnvAPIExpiryWorkers, kvs.GetWithDefault(apiExpiryWorkers, DefaultKVS)))
+ if err != nil {
+ return cfg, err
+ }
+ if expiryWorkers <= 0 || expiryWorkers > 500 {
+ return cfg, config.ErrInvalidExpiryWorkersValue(nil).Msg("Number of expiry workers should be between 1 and 500")
+ }
+ cfg.ExpiryWorkers = expiryWorkers
+
v := env.Get(EnvAPIDeleteCleanupInterval, kvs.Get(apiDeleteCleanupInterval))
if v == "" {
v = env.Get(EnvDeleteCleanupInterval, kvs.GetWithDefault(apiDeleteCleanupInterval, DefaultKVS))
diff --git a/internal/config/api/help.go b/internal/config/api/help.go
index 98a5f18dc..c359c2770 100644
--- a/internal/config/api/help.go
+++ b/internal/config/api/help.go
@@ -19,12 +19,12 @@ package api
import "github.com/minio/minio/internal/config"
-// Help template for storageclass feature.
var (
defaultHelpPostfix = func(key string) string {
return config.DefaultHelpPostfix(DefaultKVS, key)
}
+ // Help holds configuration keys and their default values for api subsystem.
Help = config.HelpKVS{
config.HelpKV{
Key: apiRequestsMax,
@@ -80,6 +80,12 @@ var (
Optional: true,
Type: "number",
},
+ config.HelpKV{
+ Key: apiExpiryWorkers,
+ Description: `set the number of expiry workers` + defaultHelpPostfix(apiExpiryWorkers),
+ Optional: true,
+ Type: "number",
+ },
config.HelpKV{
Key: apiStaleUploadsExpiry,
Description: `set to expire stale multipart uploads older than this values` + defaultHelpPostfix(apiStaleUploadsExpiry),
diff --git a/internal/config/config.go b/internal/config/config.go
index 21733964c..fd8568544 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -120,6 +120,7 @@ const (
DriveSubSys = madmin.DriveSubSys
BatchSubSys = madmin.BatchSubSys
BrowserSubSys = madmin.BrowserSubSys
+ ILMSubSys = madmin.ILMSubsys
// Add new constants here (similar to above) if you add new fields to config.
)
@@ -188,6 +189,7 @@ var SubSystemsDynamic = set.CreateStringSet(
AuditKafkaSubSys,
StorageClassSubSys,
CacheSubSys,
+ ILMSubSys,
BatchSubSys,
BrowserSubSys,
)
@@ -211,6 +213,7 @@ var SubSystemsSingleTargets = set.CreateStringSet(
SubnetSubSys,
CallhomeSubSys,
DriveSubSys,
+ ILMSubSys,
BatchSubSys,
BrowserSubSys,
)
diff --git a/internal/config/errors.go b/internal/config/errors.go
index 751152081..24a2d0042 100644
--- a/internal/config/errors.go
+++ b/internal/config/errors.go
@@ -224,6 +224,11 @@ Examples:
"",
"MINIO_API_TRANSITION_WORKERS: should be >= GOMAXPROCS/2",
)
+ ErrInvalidExpiryWorkersValue = newErrFn(
+ "Invalid value for expiry workers",
+ "",
+ "MINIO_API_EXPIRY_WORKERS: should be between 1 and 500",
+ )
ErrInvalidBatchKeyRotationWorkersWait = newErrFn(
"Invalid value for batch key rotation workers wait",
"Please input a non-negative duration",
diff --git a/internal/config/ilm/help.go b/internal/config/ilm/help.go
new file mode 100644
index 000000000..d0037d8dc
--- /dev/null
+++ b/internal/config/ilm/help.go
@@ -0,0 +1,52 @@
+// Copyright (c) 2015-2024 MinIO, Inc.
+//
+// This file is part of MinIO Object Storage stack
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package ilm
+
+import "github.com/minio/minio/internal/config"
+
+const (
+ transitionWorkers = "transition_workers"
+ expirationWorkers = "expiration_workers"
+ // EnvILMTransitionWorkers env variable to configure number of transition workers
+ EnvILMTransitionWorkers = "MINIO_ILM_TRANSITION_WORKERS"
+ // EnvILMExpirationWorkers env variable to configure number of expiration workers
+ EnvILMExpirationWorkers = "MINIO_ILM_EXPIRATION_WORKERS"
+)
+
+var (
+ defaultHelpPostfix = func(key string) string {
+ return config.DefaultHelpPostfix(DefaultKVS, key)
+ }
+
+ // HelpILM holds configuration keys and their default values for the ILM
+ // subsystem
+ HelpILM = config.HelpKVS{
+ config.HelpKV{
+ Key: transitionWorkers,
+ Type: "number",
+ Description: `set the number of transition workers` + defaultHelpPostfix(transitionWorkers),
+ Optional: true,
+ },
+ config.HelpKV{
+ Key: expirationWorkers,
+ Type: "number",
+ Description: `set the number of expiration workers` + defaultHelpPostfix(expirationWorkers),
+ Optional: true,
+ },
+ }
+)
diff --git a/internal/config/ilm/ilm.go b/internal/config/ilm/ilm.go
new file mode 100644
index 000000000..b677647d5
--- /dev/null
+++ b/internal/config/ilm/ilm.go
@@ -0,0 +1,60 @@
+// Copyright (c) 2015-2024 MinIO, Inc.
+//
+// This file is part of MinIO Object Storage stack
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package ilm
+
+import (
+ "strconv"
+
+ "github.com/minio/minio/internal/config"
+ "github.com/minio/pkg/v2/env"
+)
+
+// DefaultKVS default configuration values for ILM subsystem
+var DefaultKVS = config.KVS{
+ config.KV{
+ Key: transitionWorkers,
+ Value: "100",
+ },
+ config.KV{
+ Key: expirationWorkers,
+ Value: "100",
+ },
+}
+
+// Config represents the different configuration values for ILM subsystem
+type Config struct {
+ TransitionWorkers int
+ ExpirationWorkers int
+}
+
+// LookupConfig - lookup ilm config and override with valid environment settings if any.
+func LookupConfig(kvs config.KVS) (cfg Config, err error) {
+ tw, err := strconv.Atoi(env.Get(EnvILMTransitionWorkers, kvs.GetWithDefault(transitionWorkers, DefaultKVS)))
+ if err != nil {
+ return cfg, err
+ }
+
+ ew, err := strconv.Atoi(env.Get(EnvILMExpirationWorkers, kvs.GetWithDefault(expirationWorkers, DefaultKVS)))
+ if err != nil {
+ return cfg, err
+ }
+
+ cfg.TransitionWorkers = tw
+ cfg.ExpirationWorkers = ew
+ return cfg, nil
+}