From ee49a2322007b36299d6a01703321b835aa09335 Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Wed, 6 Apr 2022 23:42:05 -0700
Subject: [PATCH] resume/start decommission on the first node of the pool under decommission (#14705)

Additionally fixes

- IsSuspended() can use read locks
- Avoid double cancels panic on canceler
---
 cmd/admin-handlers-pools.go      | 20 ++++++++++++++++
 cmd/erasure-server-pool-decom.go | 41 +++++++++++++++++++-------------
 2 files changed, 45 insertions(+), 16 deletions(-)

diff --git a/cmd/admin-handlers-pools.go b/cmd/admin-handlers-pools.go
index 3b3107905..f4aaf91e8 100644
--- a/cmd/admin-handlers-pools.go
+++ b/cmd/admin-handlers-pools.go
@@ -58,6 +58,16 @@ func (a adminAPIHandlers) StartDecommission(w http.ResponseWriter, r *http.Reque
 		return
 	}
 
+	if ep := globalEndpoints[idx].Endpoints[0]; !ep.IsLocal {
+		for nodeIdx, proxyEp := range globalProxyEndpoints {
+			if proxyEp.Endpoint.Host == ep.Host {
+				if proxyRequestByNodeIndex(ctx, w, r, nodeIdx) {
+					return
+				}
+			}
+		}
+	}
+
 	if err := pools.Decommission(r.Context(), idx); err != nil {
 		writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
 		return
@@ -96,6 +106,16 @@ func (a adminAPIHandlers) CancelDecommission(w http.ResponseWriter, r *http.Requ
 		return
 	}
 
+	if ep := globalEndpoints[idx].Endpoints[0]; !ep.IsLocal {
+		for nodeIdx, proxyEp := range globalProxyEndpoints {
+			if proxyEp.Endpoint.Host == ep.Host {
+				if proxyRequestByNodeIndex(ctx, w, r, nodeIdx) {
+					return
+				}
+			}
+		}
+	}
+
 	if err := pools.DecommissionCancel(ctx, idx); err != nil {
 		writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
 		return
diff --git a/cmd/erasure-server-pool-decom.go b/cmd/erasure-server-pool-decom.go
index 25e78bd0c..e04eae03c 100644
--- a/cmd/erasure-server-pool-decom.go
+++ b/cmd/erasure-server-pool-decom.go
@@ -141,7 +141,7 @@ func (p *poolMeta) returnResumablePools(n int) []PoolStatus {
 }
 
 func (p *poolMeta) DecommissionComplete(idx int) bool {
-	if p.Pools[idx].Decommission != nil {
+	if p.Pools[idx].Decommission != nil && !p.Pools[idx].Decommission.Complete {
 		p.Pools[idx].LastUpdate = UTCNow()
 		p.Pools[idx].Decommission.Complete = true
 		p.Pools[idx].Decommission.Failed = false
@@ -152,7 +152,7 @@ func (p *poolMeta) DecommissionComplete(idx int) bool {
 }
 
 func (p *poolMeta) DecommissionFailed(idx int) bool {
-	if p.Pools[idx].Decommission != nil {
+	if p.Pools[idx].Decommission != nil && !p.Pools[idx].Decommission.Failed {
 		p.Pools[idx].LastUpdate = UTCNow()
 		p.Pools[idx].Decommission.StartTime = time.Time{}
 		p.Pools[idx].Decommission.Complete = false
@@ -164,7 +164,7 @@ func (p *poolMeta) DecommissionFailed(idx int) bool {
 }
 
 func (p *poolMeta) DecommissionCancel(idx int) bool {
-	if p.Pools[idx].Decommission != nil {
+	if p.Pools[idx].Decommission != nil && !p.Pools[idx].Decommission.Canceled {
 		p.Pools[idx].LastUpdate = UTCNow()
 		p.Pools[idx].Decommission.StartTime = time.Time{}
 		p.Pools[idx].Decommission.Complete = false
@@ -466,24 +466,32 @@ func (z *erasureServerPools) Init(ctx context.Context) error {
 
 	// if no update is needed return right away.
 	if !update {
+		z.poolMeta = meta
+
 		// We are only supporting single pool decommission at this time
 		// so it makes sense to only resume single pools at any given
 		// time, in future meta.returnResumablePools() might take
 		// '-1' as argument to decommission multiple pools at a time
 		// but this is not a priority at the moment.
 		for _, pool := range meta.returnResumablePools(1) {
-			go func(pool PoolStatus) {
-				switch err := z.Decommission(ctx, pool.ID); err {
-				case errDecommissionAlreadyRunning:
-					fallthrough
-				case nil:
-					z.doDecommissionInRoutine(ctx, pool.ID)
-				default:
-					logger.LogIf(ctx, fmt.Errorf("Unable to resume decommission of pool %v: %w", pool, err))
-				}
-			}(pool)
+			idx := globalEndpoints.GetPoolIdx(pool.CmdLine)
+			if idx == -1 {
+				return fmt.Errorf("unexpected state present for decommission status pool(%s) not found", pool.CmdLine)
+			}
+			if globalEndpoints[idx].Endpoints[0].IsLocal {
+				go func(pool PoolStatus) {
+					switch err := z.Decommission(ctx, pool.ID); err {
+					case nil:
+						// we already started decommission
+					case errDecommissionAlreadyRunning:
+						// A previous decommission was found running, restart it.
+						z.doDecommissionInRoutine(ctx, idx)
+					default:
+						logger.LogIf(ctx, fmt.Errorf("Unable to resume decommission of pool %v: %w", pool, err))
+					}
+				}(pool)
+			}
 		}
-		z.poolMeta = meta
 		return nil
 	}
 
@@ -757,8 +765,8 @@ func (z *erasureServerPools) doDecommissionInRoutine(ctx context.Context, idx in
 }
 
 func (z *erasureServerPools) IsSuspended(idx int) bool {
-	z.poolMetaMutex.Lock()
-	defer z.poolMetaMutex.Unlock()
+	z.poolMetaMutex.RLock()
+	defer z.poolMetaMutex.RUnlock()
 	return z.poolMeta.IsSuspended(idx)
 }
 
@@ -920,6 +928,7 @@ func (z *erasureServerPools) CompleteDecommission(ctx context.Context, idx int)
 	defer z.poolMetaMutex.Unlock()
 
 	if z.poolMeta.DecommissionComplete(idx) {
+		z.decommissionCancelers[idx]() // cancel any active thread.
 		if err = z.poolMeta.save(ctx, z.serverPools); err != nil {
 			return err
 		}
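
The admin-handler change above keys every decommission start/cancel off the first endpoint of the pool being decommissioned: if that endpoint is not local, the handler proxies the request to the node that owns it, so the same node always drives (and resumes) a given pool's decommission. The following standalone Go sketch illustrates only that owner-selection idea; Endpoint, Node and ownerNode are simplified stand-ins invented for this example, not MinIO's actual types or helpers.

// Sketch of the "first endpoint of the pool decides the owning node" pattern.
// Types and names here are illustrative stand-ins, not MinIO's real ones.
package main

import "fmt"

// Endpoint is a simplified stand-in for a pool endpoint.
type Endpoint struct {
	Host    string
	IsLocal bool
}

// Node is a simplified stand-in for a known peer node.
type Node struct {
	Index int
	Host  string
}

// ownerNode returns the peer that should handle a decommission request for
// the pool: the node hosting the pool's first endpoint. ok is false when the
// first endpoint is local (or no peer matches), meaning the current node
// handles the request itself instead of proxying.
func ownerNode(poolEndpoints []Endpoint, peers []Node) (Node, bool) {
	first := poolEndpoints[0]
	if first.IsLocal {
		return Node{}, false // handle locally, no proxying needed
	}
	for _, n := range peers {
		if n.Host == first.Host {
			return n, true // proxy the request to this peer
		}
	}
	return Node{}, false // no matching peer found; fall back to local handling
}

func main() {
	pool := []Endpoint{{Host: "node2:9000", IsLocal: false}, {Host: "node3:9000", IsLocal: false}}
	peers := []Node{{0, "node1:9000"}, {1, "node2:9000"}, {2, "node3:9000"}}
	if n, ok := ownerNode(pool, peers); ok {
		fmt.Printf("proxy decommission request to node index %d (%s)\n", n.Index, n.Host)
	}
}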
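
The guards added to DecommissionComplete, DecommissionFailed and DecommissionCancel make each state transition fire only once, which is what lets CompleteDecommission invoke the stored canceler without triggering it twice. The sketch below shows that idempotent-transition idea in isolation; poolState and markComplete are hypothetical names for illustration, not the code from this patch.

// Sketch: an idempotent state transition that runs its cleanup (for example
// invoking a cancel function) at most once. Names are illustrative only.
package main

import (
	"fmt"
	"sync"
)

type poolState struct {
	mu       sync.Mutex
	complete bool
	cancel   func() // set when a decommission routine starts
}

// markComplete flips the state to complete exactly once and reports whether
// this call performed the transition. Repeated calls are harmless no-ops.
func (p *poolState) markComplete() bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.complete {
		return false // already complete; do not re-run cleanup
	}
	p.complete = true
	if p.cancel != nil {
		p.cancel() // stop any active decommission routine
	}
	return true
}

func main() {
	p := &poolState{cancel: func() { fmt.Println("decommission routine stopped") }}
	fmt.Println(p.markComplete()) // true: transition happened, cancel ran
	fmt.Println(p.markComplete()) // false: second call is a no-op
}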