diff --git a/cmd/erasure-server-pool-decom.go b/cmd/erasure-server-pool-decom.go index 5b772c39e..2ff0644c3 100644 --- a/cmd/erasure-server-pool-decom.go +++ b/cmd/erasure-server-pool-decom.go @@ -583,13 +583,15 @@ func (v versionsSorter) reverse() { func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool *erasureSets, bName string) error { var wg sync.WaitGroup wStr := env.Get("_MINIO_DECOMMISSION_WORKERS", strconv.Itoa(len(pool.sets))) - workerSize, _ := strconv.Atoi(wStr) + workerSize, err := strconv.Atoi(wStr) + if err != nil { + return err + } parallelWorkers := make(chan struct{}, workerSize) versioned := globalBucketVersioningSys.Enabled(bName) for _, set := range pool.sets { - parallelWorkers <- struct{}{} set := set disks := set.getOnlineDisks() if len(disks) == 0 { @@ -599,6 +601,9 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool } decommissionEntry := func(entry metaCacheEntry) { + defer func() { + <-parallelWorkers + }() if entry.isDir() { return } @@ -712,17 +717,20 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool forwardTo: "", minDisks: len(disks) / 2, // to capture all quorum ratios reportNotFound: false, - agreed: decommissionEntry, + agreed: func(entry metaCacheEntry) { + parallelWorkers <- struct{}{} + go decommissionEntry(entry) + }, partial: func(entries metaCacheEntries, nAgreed int, errs []error) { entry, ok := entries.resolve(&resolver) if ok { - decommissionEntry(*entry) + parallelWorkers <- struct{}{} + go decommissionEntry(*entry) } }, finished: nil, }) logger.LogIf(ctx, err) - <-parallelWorkers }() } wg.Wait()