From 095fc0561d999415c79dcbc3574828e4d0224ae5 Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Mon, 16 Jan 2023 21:36:34 +0530
Subject: [PATCH] feat: allow decom of multiple pools (#16416)

---
 cmd/admin-handlers-pools.go      |  48 +++++++++---
 cmd/erasure-server-pool-decom.go | 121 +++++++++++++------------------
 2 files changed, 89 insertions(+), 80 deletions(-)

diff --git a/cmd/admin-handlers-pools.go b/cmd/admin-handlers-pools.go
index 56bd5ffa1..f7d325640 100644
--- a/cmd/admin-handlers-pools.go
+++ b/cmd/admin-handlers-pools.go
@@ -22,6 +22,7 @@ import (
 	"errors"
 	"fmt"
 	"net/http"
+	"strings"
 
 	"github.com/gorilla/mux"
 	"github.com/minio/minio/internal/logger"
@@ -49,28 +50,53 @@ func (a adminAPIHandlers) StartDecommission(w http.ResponseWriter, r *http.Reque
 		return
 	}
 
-	pools, ok := objectAPI.(*erasureServerPools)
-	if !ok {
+	z, ok := objectAPI.(*erasureServerPools)
+	if !ok || len(z.serverPools) == 1 {
 		writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
 		return
 	}
 
-	if pools.IsRebalanceStarted() {
-		writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, errDecommissionRebalanceAlreadyRunning), r.URL)
+	if z.IsDecommissionRunning() {
+		writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, errDecommissionAlreadyRunning), r.URL)
+		return
+	}
+
+	if z.IsRebalanceStarted() {
+		writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminRebalanceAlreadyStarted), r.URL)
 		return
 	}
 
 	vars := mux.Vars(r)
 	v := vars["pool"]
-	idx := globalEndpoints.GetPoolIdx(v)
-	if idx == -1 {
-		// We didn't find any matching pools, invalid input
-		writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, errInvalidArgument), r.URL)
-		return
+	pools := strings.Split(v, ",")
+	poolIndices := make([]int, 0, len(pools))
+
+	for _, pool := range pools {
+		idx := globalEndpoints.GetPoolIdx(pool)
+		if idx == -1 {
+			// We didn't find any matching pools, invalid input
+			writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, errInvalidArgument), r.URL)
+			return
+		}
+		var pool *erasureSets
+		for pidx := range z.serverPools {
+			if pidx == idx {
+				pool = z.serverPools[idx]
+				break
+			}
+		}
+		if pool == nil {
+			// We didn't find any matching pools, invalid input
+			writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, errInvalidArgument), r.URL)
+			return
+		}
+
+		poolIndices = append(poolIndices, idx)
 	}
 
-	if ep := globalEndpoints[idx].Endpoints[0]; !ep.IsLocal {
+	if len(poolIndices) > 0 && !globalEndpoints[poolIndices[0]].Endpoints[0].IsLocal {
+		ep := globalEndpoints[poolIndices[0]].Endpoints[0]
 		for nodeIdx, proxyEp := range globalProxyEndpoints {
 			if proxyEp.Endpoint.Host == ep.Host {
 				if proxyRequestByNodeIndex(ctx, w, r, nodeIdx) {
@@ -80,7 +106,7 @@ func (a adminAPIHandlers) StartDecommission(w http.ResponseWriter, r *http.Reque
 		}
 	}
 
-	if err := pools.Decommission(r.Context(), idx); err != nil {
+	if err := z.Decommission(r.Context(), poolIndices...); err != nil {
 		writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
 		return
 	}
diff --git a/cmd/erasure-server-pool-decom.go b/cmd/erasure-server-pool-decom.go
index 6ceeb0672..c776a54ae 100644
--- a/cmd/erasure-server-pool-decom.go
+++ b/cmd/erasure-server-pool-decom.go
@@ -126,7 +126,7 @@ type poolMeta struct {
 
 // A decommission resumable tells us if decommission is worth
 // resuming upon restart of a cluster.
-func (p *poolMeta) returnResumablePools(n int) []PoolStatus {
+func (p *poolMeta) returnResumablePools() []PoolStatus {
 	var newPools []PoolStatus
 	for _, pool := range p.Pools {
 		if pool.Decommission == nil {
@@ -139,11 +139,8 @@ func (p *poolMeta) returnResumablePools(n int) []PoolStatus {
 			continue
 		}
 		// In all other situations we need to resume
 		newPools = append(newPools, pool)
-		if n > 0 && len(newPools) == n {
-			return newPools
-		}
 	}
-	return nil
+	return newPools
 }
 
 func (p *poolMeta) DecommissionComplete(idx int) bool {
@@ -251,18 +248,6 @@ var (
 )
 
 func (p *poolMeta) Decommission(idx int, pi poolSpaceInfo) error {
-	for i, pool := range p.Pools {
-		if idx == i {
-			continue
-		}
-		if pool.Decommission != nil {
-			// Do not allow multiple decommissions at the same time.
-			// We shall for now only allow one pool decommission at
-			// a time.
-			return fmt.Errorf("%w at index: %d", errDecommissionAlreadyRunning, i)
-		}
-	}
-
 	// Return an error when there is decommission on going - the user needs
 	// to explicitly cancel it first in order to restart decommissioning again.
 	if p.Pools[idx].Decommission != nil &&
@@ -510,7 +495,6 @@ func (z *erasureServerPools) Init(ctx context.Context) error {
 	z.StartRebalance()
 
 	meta := poolMeta{}
-
 	if err := meta.load(ctx, z.serverPools[0], z.serverPools); err != nil {
 		return err
 	}
@@ -524,38 +508,38 @@ func (z *erasureServerPools) Init(ctx context.Context) error {
 	if !update {
 		z.poolMeta = meta
 
-		// We are only supporting single pool decommission at this time
-		// so it makes sense to only resume single pools at any given
-		// time, in future meta.returnResumablePools() might take
-		// '-1' as argument to decommission multiple pools at a time
-		// but this is not a priority at the moment.
-		for _, pool := range meta.returnResumablePools(1) {
+		pools := meta.returnResumablePools()
+		poolIndices := make([]int, 0, len(pools))
+		for _, pool := range pools {
 			idx := globalEndpoints.GetPoolIdx(pool.CmdLine)
 			if idx == -1 {
 				return fmt.Errorf("unexpected state present for decommission status pool(%s) not found", pool.CmdLine)
 			}
-			if globalEndpoints[idx].Endpoints[0].IsLocal {
-				go func(pool PoolStatus) {
-					r := rand.New(rand.NewSource(time.Now().UnixNano()))
-					for {
-						if err := z.Decommission(ctx, pool.ID); err != nil {
-							if errors.Is(err, errDecommissionAlreadyRunning) {
-								// A previous decommission running found restart it.
+			poolIndices = append(poolIndices, idx)
+		}
+
+		if len(poolIndices) > 0 && globalEndpoints[poolIndices[0]].Endpoints[0].IsLocal {
+			go func() {
+				r := rand.New(rand.NewSource(time.Now().UnixNano()))
+				for {
+					if err := z.Decommission(ctx, poolIndices...); err != nil {
+						if errors.Is(err, errDecommissionAlreadyRunning) {
+							// A previous decommission running found restart it.
+							for _, idx := range poolIndices {
 								z.doDecommissionInRoutine(ctx, idx)
-								return
 							}
-							if configRetriableErrors(err) {
-								logger.LogIf(ctx, fmt.Errorf("Unable to resume decommission of pool %v: %w: retrying..", pool, err))
-								time.Sleep(time.Second + time.Duration(r.Float64()*float64(5*time.Second)))
-								continue
-							}
-							logger.LogIf(ctx, fmt.Errorf("Unable to resume decommission of pool %v: %w", pool, err))
 							return
 						}
-						break
+						if configRetriableErrors(err) {
+							logger.LogIf(ctx, fmt.Errorf("Unable to resume decommission of pools %v: %w: retrying..", pools, err))
+							time.Sleep(time.Second + time.Duration(r.Float64()*float64(5*time.Second)))
+							continue
+						}
+						logger.LogIf(ctx, fmt.Errorf("Unable to resume decommission of pool %v: %w", pools, err))
+						return
 					}
-				}(pool)
-			}
+				}
+			}()
 		}
 
 		return nil
@@ -1039,11 +1023,12 @@ func (z *erasureServerPools) doDecommissionInRoutine(ctx context.Context, idx in
 	}
 
 	z.poolMetaMutex.Lock()
-	failed := z.poolMeta.Pools[idx].Decommission.ItemsDecommissionFailed > 0
+	failed := z.poolMeta.Pools[idx].Decommission.ItemsDecommissionFailed > 0 || contextCanceled(dctx)
+	poolCmdLine := z.poolMeta.Pools[idx].CmdLine
 	z.poolMetaMutex.Unlock()
 
 	if !failed {
-		logger.Info("Decommissioning almost complete - checking for left over objects")
+		logger.Info("Decommissioning complete for pool '%s', verifying for any pending objects", poolCmdLine)
 		err := z.checkAfterDecom(dctx, idx)
 		if err != nil {
 			logger.LogIf(ctx, err)
@@ -1067,8 +1052,8 @@ func (z *erasureServerPools) IsSuspended(idx int) bool {
 }
 
 // Decommission - start decommission session.
-func (z *erasureServerPools) Decommission(ctx context.Context, idx int) error {
-	if idx < 0 {
+func (z *erasureServerPools) Decommission(ctx context.Context, indices ...int) error {
+	if len(indices) == 0 {
 		return errInvalidArgument
 	}
 
@@ -1077,11 +1062,15 @@ func (z *erasureServerPools) Decommission(ctx context.Context, idx int) error {
 	}
 
 	// Make pool unwritable before decommissioning.
-	if err := z.StartDecommission(ctx, idx); err != nil {
+	if err := z.StartDecommission(ctx, indices...); err != nil {
 		return err
 	}
 
-	go z.doDecommissionInRoutine(ctx, idx)
+	go func() {
+		for _, idx := range indices {
+			z.doDecommissionInRoutine(ctx, idx)
+		}
+	}()
 
 	// Successfully started decommissioning.
 	return nil
@@ -1260,8 +1249,8 @@ func (z *erasureServerPools) getBucketsToDecommission(ctx context.Context) ([]de
 	return decomBuckets, nil
 }
 
-func (z *erasureServerPools) StartDecommission(ctx context.Context, idx int) (err error) {
-	if idx < 0 {
+func (z *erasureServerPools) StartDecommission(ctx context.Context, indices ...int) (err error) {
+	if len(indices) == 0 {
 		return errInvalidArgument
 	}
 
@@ -1304,34 +1293,28 @@ func (z *erasureServerPools) StartDecommission(ctx context.Context, idx int) (er
 		}
 	}
 
-	var pool *erasureSets
-	for pidx := range z.serverPools {
-		if pidx == idx {
-			pool = z.serverPools[idx]
-			break
-		}
-	}
-
-	if pool == nil {
-		return errInvalidArgument
-	}
-
-	pi, err := z.getDecommissionPoolSpaceInfo(idx)
-	if err != nil {
-		return err
-	}
-
 	z.poolMetaMutex.Lock()
 	defer z.poolMetaMutex.Unlock()
 
-	if err = z.poolMeta.Decommission(idx, pi); err != nil {
-		return err
+	for _, idx := range indices {
+		pi, err := z.getDecommissionPoolSpaceInfo(idx)
+		if err != nil {
+			return err
+		}
+
+		if err = z.poolMeta.Decommission(idx, pi); err != nil {
+			return err
+		}
+
+		z.poolMeta.QueueBuckets(idx, decomBuckets)
 	}
 
-	z.poolMeta.QueueBuckets(idx, decomBuckets)
+
 	if err = z.poolMeta.save(ctx, z.serverPools); err != nil {
 		return err
 	}
+
 	globalNotificationSys.ReloadPoolMeta(ctx)
+
 	return nil
 }
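
Note (not part of the upstream patch): the handler above now accepts a comma-separated list of pools in the "pool" argument, resolves every entry to a pool index, and only then calls Decommission with the whole set. The standalone Go sketch below mirrors that parsing and validation step under assumed inputs; knownPools, poolIdx and parsePoolArg are hypothetical stand-ins for globalEndpoints.GetPoolIdx and the handler loop, not MinIO APIs.

// Hypothetical, self-contained sketch. It is not MinIO code: knownPools stands
// in for the server pools registered at startup, and its index order mimics the
// pool index that globalEndpoints.GetPoolIdx would return.
package main

import (
	"fmt"
	"strings"
)

var knownPools = []string{
	"http://server{1...4}/disk{1...4}", // pool 0 (example value)
	"http://server{5...8}/disk{1...4}", // pool 1 (example value)
}

// poolIdx mimics globalEndpoints.GetPoolIdx: it returns -1 for unknown input.
func poolIdx(arg string) int {
	for i, p := range knownPools {
		if p == arg {
			return i
		}
	}
	return -1
}

// parsePoolArg splits the "pool" argument on "," and validates every entry,
// mirroring the all-or-nothing loop in the updated StartDecommission handler.
func parsePoolArg(v string) ([]int, error) {
	pools := strings.Split(v, ",")
	indices := make([]int, 0, len(pools))
	for _, pool := range pools {
		idx := poolIdx(pool)
		if idx == -1 {
			return nil, fmt.Errorf("invalid pool argument %q", pool)
		}
		indices = append(indices, idx)
	}
	return indices, nil
}

func main() {
	indices, err := parsePoolArg("http://server{1...4}/disk{1...4},http://server{5...8}/disk{1...4}")
	fmt.Println(indices, err) // prints: [0 1] <nil>
}

As in the handler, validation is all-or-nothing: a single unknown entry rejects the whole request, so a typo cannot start decommissioning a partial set of pools.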