From 48594617b521f8307e91f80530988c25dd739cfb Mon Sep 17 00:00:00 2001
From: Krishna Srinivas <634494+krishnasrinivas@users.noreply.github.com>
Date: Thu, 7 Apr 2022 23:19:13 -0700
Subject: [PATCH] Parallelize decommissioning process (#14704)

---
 cmd/erasure-server-pool-decom.go | 58 ++++++++++++++++++--------------
 1 file changed, 33 insertions(+), 25 deletions(-)

diff --git a/cmd/erasure-server-pool-decom.go b/cmd/erasure-server-pool-decom.go
index e04eae03c..5b772c39e 100644
--- a/cmd/erasure-server-pool-decom.go
+++ b/cmd/erasure-server-pool-decom.go
@@ -24,12 +24,15 @@ import (
 	"fmt"
 	"net/http"
 	"sort"
+	"strconv"
+	"sync"
 	"time"
 
 	"github.com/dustin/go-humanize"
 	"github.com/minio/minio/internal/hash"
 	"github.com/minio/minio/internal/logger"
 	"github.com/minio/pkg/console"
+	"github.com/minio/pkg/env"
 )
 
 // PoolDecommissionInfo currently decommissioning information
@@ -578,15 +581,16 @@ func (v versionsSorter) reverse() {
 }
 
 func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool *erasureSets, bName string) error {
-	var forwardTo string
-	// If we resume to the same bucket, forward to last known item.
-	rbucket, robject := z.poolMeta.ResumeBucketObject(idx)
-	if rbucket != "" && rbucket == bName {
-		forwardTo = robject
-	}
+	var wg sync.WaitGroup
+	wStr := env.Get("_MINIO_DECOMMISSION_WORKERS", strconv.Itoa(len(pool.sets)))
+	workerSize, _ := strconv.Atoi(wStr)
+
+	parallelWorkers := make(chan struct{}, workerSize)
 
 	versioned := globalBucketVersioningSys.Enabled(bName)
 	for _, set := range pool.sets {
+		parallelWorkers <- struct{}{}
+		set := set
 		disks := set.getOnlineDisks()
 		if len(disks) == 0 {
 			logger.LogIf(GlobalContext, fmt.Errorf("no online disks found for set with endpoints %s",
@@ -698,26 +702,30 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
 			bucket:    bName,
 		}
 
-		if err := listPathRaw(ctx, listPathRawOptions{
-			disks:          disks,
-			bucket:         bName,
-			recursive:      true,
-			forwardTo:      forwardTo,
-			minDisks:       len(disks) / 2, // to capture all quorum ratios
-			reportNotFound: false,
-			agreed:         decommissionEntry,
-			partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
-				entry, ok := entries.resolve(&resolver)
-				if ok {
-					decommissionEntry(*entry)
-				}
-			},
-			finished: nil,
-		}); err != nil {
-			// Decommissioning failed and won't continue
-			return err
-		}
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			err := listPathRaw(ctx, listPathRawOptions{
+				disks:          disks,
+				bucket:         bName,
+				recursive:      true,
+				forwardTo:      "",
+				minDisks:       len(disks) / 2, // to capture all quorum ratios
+				reportNotFound: false,
+				agreed:         decommissionEntry,
+				partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
+					entry, ok := entries.resolve(&resolver)
+					if ok {
+						decommissionEntry(*entry)
+					}
+				},
+				finished: nil,
+			})
+			logger.LogIf(ctx, err)
+			<-parallelWorkers
+		}()
 	}
+	wg.Wait()
 	return nil
 }
 
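Note: the patch above bounds concurrency with a buffered channel used as a semaphore plus a sync.WaitGroup, listing each erasure set in its own goroutine. As a consequence of running sets independently, errors from listPathRaw are logged per set instead of aborting the whole pool, and the per-bucket forwardTo resume point is dropped (passed as ""). Below is a minimal, self-contained sketch of the same bounded worker pattern; the DEMO_WORKERS variable, the set IDs, and processSet are illustrative stand-ins, not MinIO code.

// Sketch: cap concurrency with a semaphore channel and wait with a WaitGroup.
package main

import (
	"fmt"
	"os"
	"strconv"
	"sync"
	"time"
)

// processSet is a stand-in for the per-erasure-set listing/decommission work.
func processSet(id int) error {
	time.Sleep(100 * time.Millisecond)
	fmt.Println("finished set", id)
	return nil
}

func main() {
	sets := []int{0, 1, 2, 3, 4, 5, 6, 7}

	// Worker count defaults to the number of sets, overridable via an
	// environment variable (mirroring _MINIO_DECOMMISSION_WORKERS).
	workers := len(sets)
	if v := os.Getenv("DEMO_WORKERS"); v != "" {
		if n, err := strconv.Atoi(v); err == nil && n > 0 {
			workers = n
		}
	}

	var wg sync.WaitGroup
	sem := make(chan struct{}, workers) // buffered channel acts as a semaphore

	for _, id := range sets {
		sem <- struct{}{} // acquire a slot; blocks when all workers are busy
		id := id          // capture loop variable for the goroutine (pre-Go 1.22)
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when done
			if err := processSet(id); err != nil {
				// Log and continue; other sets keep running.
				fmt.Fprintln(os.Stderr, "set", id, "failed:", err)
			}
		}()
	}
	wg.Wait() // block until every set has been processed
}

Running with DEMO_WORKERS=2 limits the sketch to two sets in flight at a time; with the default, all sets run concurrently, mirroring the patch's default of one worker per erasure set. Acquiring the slot before spawning the goroutine, as the patch does, keeps the producer loop from racing ahead of the configured limit.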