From 410a1ac04087e93461b5b30d8da4625c3fcf1f40 Mon Sep 17 00:00:00 2001
From: Praveen raj Mani
Date: Sat, 27 Apr 2024 00:59:28 +0530
Subject: [PATCH] Handle failures in pool rebalancing (#19623)

---
 cmd/erasure-server-pool-rebalance.go | 60 +++++++++++++++-------------
 1 file changed, 32 insertions(+), 28 deletions(-)

diff --git a/cmd/erasure-server-pool-rebalance.go b/cmd/erasure-server-pool-rebalance.go
index 89eb6bed4..e645792b1 100644
--- a/cmd/erasure-server-pool-rebalance.go
+++ b/cmd/erasure-server-pool-rebalance.go
@@ -376,7 +376,7 @@ func (z *erasureServerPools) IsPoolRebalancing(poolIndex int) bool {
 }
 
 func (z *erasureServerPools) rebalanceBuckets(ctx context.Context, poolIdx int) (err error) {
-    doneCh := make(chan struct{})
+    doneCh := make(chan error, 1)
     defer xioutil.SafeClose(doneCh)
 
     // Save rebalance.bin periodically.
@@ -391,48 +391,50 @@ func (z *erasureServerPools) rebalanceBuckets(ctx context.Context, poolIdx int)
 
         timer := time.NewTimer(randSleepFor())
         defer timer.Stop()
-        var rebalDone bool
-        var traceMsg string
+
+        var (
+            quit     bool
+            traceMsg string
+        )
 
         for {
             select {
-            case <-doneCh:
-                // rebalance completed for poolIdx
+            case rebalErr := <-doneCh:
+                quit = true
                 now := time.Now()
+                var status rebalStatus
+
+                switch {
+                case errors.Is(rebalErr, context.Canceled):
+                    status = rebalStopped
+                    traceMsg = fmt.Sprintf("stopped at %s", now)
+                case rebalErr == nil:
+                    status = rebalCompleted
+                    traceMsg = fmt.Sprintf("completed at %s", now)
+                default:
+                    status = rebalFailed
+                    traceMsg = fmt.Sprintf("stopped at %s with err: %v", now, rebalErr)
+                }
+
                 z.rebalMu.Lock()
-                z.rebalMeta.PoolStats[poolIdx].Info.Status = rebalCompleted
+                z.rebalMeta.PoolStats[poolIdx].Info.Status = status
                 z.rebalMeta.PoolStats[poolIdx].Info.EndTime = now
                 z.rebalMu.Unlock()
 
-                rebalDone = true
-                traceMsg = fmt.Sprintf("completed at %s", now)
-
-            case <-ctx.Done():
-
-                // rebalance stopped for poolIdx
-                now := time.Now()
-                z.rebalMu.Lock()
-                z.rebalMeta.PoolStats[poolIdx].Info.Status = rebalStopped
-                z.rebalMeta.PoolStats[poolIdx].Info.EndTime = now
-                z.rebalMeta.cancel = nil // remove the already used context.CancelFunc
-                z.rebalMu.Unlock()
-
-                rebalDone = true
-                traceMsg = fmt.Sprintf("stopped at %s", now)
-
             case <-timer.C:
                 traceMsg = fmt.Sprintf("saved at %s", time.Now())
             }
 
             stopFn := globalRebalanceMetrics.log(rebalanceMetricSaveMetadata, poolIdx, traceMsg)
-            err := z.saveRebalanceStats(ctx, poolIdx, rebalSaveStats)
+            err := z.saveRebalanceStats(GlobalContext, poolIdx, rebalSaveStats)
             stopFn(err)
-            rebalanceLogIf(ctx, err)
-            timer.Reset(randSleepFor())
+            rebalanceLogIf(GlobalContext, err)
 
-            if rebalDone {
+            if quit {
                 return
             }
+
+            timer.Reset(randSleepFor())
         }
     }()
 
@@ -441,6 +443,7 @@
     for {
         select {
         case <-ctx.Done():
+            doneCh <- ctx.Err()
             return
         default:
         }
@@ -457,14 +460,15 @@
             if errors.Is(err, errServerNotInitialized) || errors.Is(err, errBucketMetadataNotInitialized) {
                 continue
             }
-            rebalanceLogIf(ctx, err)
+            rebalanceLogIf(GlobalContext, err)
+            doneCh <- err
             return
         }
         stopFn(nil)
 
         z.bucketRebalanceDone(bucket, poolIdx)
     }
 
-    rebalanceLogEvent(ctx, "Pool %d rebalancing is done", poolIdx+1)
+    rebalanceLogEvent(GlobalContext, "Pool %d rebalancing is done", poolIdx+1)
     return err
 }
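
Reviewer note (not part of the patch): the change turns doneCh from a bare signal channel into a buffered error channel, so the rebalance loop can report why it exited: the deferred SafeClose yields a nil receive for normal completion, ctx.Err() is forwarded on cancellation, and any bucket-rebalance failure is sent as-is. The saver goroutine maps those outcomes to rebalCompleted, rebalStopped, or rebalFailed, and the final save/log now use GlobalContext, presumably so they are not aborted by the already-canceled rebalance context. The following is a minimal, self-contained Go sketch of that pattern; the names (saver, worker, the status constants) are illustrative and not MinIO code.

// sketch.go: worker reports its final outcome on a buffered error channel;
// a saver goroutine maps the outcome to a terminal status.
package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

type status string

const (
    statusCompleted status = "completed"
    statusStopped   status = "stopped"
    statusFailed    status = "failed"
)

// saver periodically "saves" progress and records the terminal status once
// the worker reports its result on doneCh.
func saver(doneCh <-chan error, saved chan<- status) {
    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()

    for {
        select {
        case err := <-doneCh:
            var st status
            switch {
            case errors.Is(err, context.Canceled):
                st = statusStopped
            case err == nil:
                st = statusCompleted
            default:
                st = statusFailed
            }
            // The final save happens here, independent of the worker's context.
            saved <- st
            return
        case <-ticker.C:
            // Periodic save of intermediate progress would go here.
        }
    }
}

// worker simulates the rebalance loop: it quits early on cancellation and
// forwards either ctx.Err() or its own failure to the saver.
func worker(ctx context.Context, doneCh chan<- error) {
    for i := 0; i < 10; i++ {
        select {
        case <-ctx.Done():
            doneCh <- ctx.Err() // propagate cancellation as a distinct outcome
            return
        default:
        }
        time.Sleep(50 * time.Millisecond)
    }
    doneCh <- nil // success
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    doneCh := make(chan error, 1) // one-slot buffer: worker never blocks on exit
    saved := make(chan status, 1)

    go saver(doneCh, saved)
    go worker(ctx, doneCh)

    time.Sleep(120 * time.Millisecond)
    cancel() // stop the worker mid-way

    fmt.Println("final status:", <-saved) // prints "stopped"
}

The one-slot buffer mirrors make(chan error, 1) in the patch: the sender can hand off its result and return even if the receiver is in the middle of a save.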