From b92cdea5789aab0e71f98c7dbdca9d5c8d22a6fe Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 9 May 2023 16:37:31 -0700 Subject: [PATCH] fix: start using pkg/workers to spawn parallel workers (#17170) --- cmd/erasure-server-pool-decom.go | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/cmd/erasure-server-pool-decom.go b/cmd/erasure-server-pool-decom.go index 05fb8dbe0..c7975ee9e 100644 --- a/cmd/erasure-server-pool-decom.go +++ b/cmd/erasure-server-pool-decom.go @@ -27,7 +27,6 @@ import ( "sort" "strconv" "strings" - "sync" "time" "github.com/dustin/go-humanize" @@ -36,6 +35,7 @@ import ( "github.com/minio/minio/internal/logger" "github.com/minio/pkg/console" "github.com/minio/pkg/env" + "github.com/minio/pkg/workers" ) // PoolDecommissionInfo currently decommissioning information @@ -708,14 +708,20 @@ func (set *erasureObjects) listObjectsToDecommission(ctx context.Context, bi dec func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool *erasureSets, bi decomBucketInfo) error { ctx = logger.SetReqInfo(ctx, &logger.ReqInfo{}) - var wg sync.WaitGroup wStr := env.Get("_MINIO_DECOMMISSION_WORKERS", strconv.Itoa(len(pool.sets))) workerSize, err := strconv.Atoi(wStr) if err != nil { return err } - parallelWorkers := make(chan struct{}, workerSize) + // each set get its own thread separate from the concurrent + // objects/versions being decommissioned. + workerSize += len(pool.sets) + + wk, err := workers.New(workerSize) + if err != nil { + return err + } for _, set := range pool.sets { set := set @@ -749,10 +755,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool } decommissionEntry := func(entry metaCacheEntry) { - defer func() { - <-parallelWorkers - wg.Done() - }() + defer wk.Give() if entry.isDir() { return @@ -902,21 +905,19 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool z.poolMetaMutex.Unlock() } - wg.Add(1) + wk.Take() go func() { - defer wg.Done() + defer wk.Give() err := set.listObjectsToDecommission(ctx, bi, func(entry metaCacheEntry) { - // Wait must be synchronized here. - wg.Add(1) - parallelWorkers <- struct{}{} + wk.Take() go decommissionEntry(entry) }, ) logger.LogIf(ctx, err) }() } - wg.Wait() + wk.Wait() return nil } @@ -1280,7 +1281,7 @@ func (z *erasureServerPools) StartDecommission(ctx context.Context, indices ...i z.HealBucket(ctx, bucket.Name, madmin.HealOpts{}) } - // Create .minio.sys/conifg, .minio.sys/buckets paths if missing, + // Create .minio.sys/config, .minio.sys/buckets paths if missing, // this code is present to avoid any missing meta buckets on other // pools. for _, metaBucket := range []string{