diff --git a/cmd/batch-handlers.go b/cmd/batch-handlers.go index 0a04720ac..1f32ef94a 100644 --- a/cmd/batch-handlers.go +++ b/cmd/batch-handlers.go @@ -1074,6 +1074,11 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba return err } + walkQuorum := env.Get("_MINIO_BATCH_REPLICATION_WALK_QUORUM", "strict") + if walkQuorum == "" { + walkQuorum = "strict" + } + retryAttempts := ri.RetryAttempts retry := false for attempts := 1; attempts <= retryAttempts; attempts++ { @@ -1083,9 +1088,11 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba // one of source/target is s3, skip delete marker and all versions under the same object name. s3Type := r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3 - if err := api.Walk(ctx, r.Source.Bucket, r.Source.Prefix, walkCh, ObjectOptions{ - WalkMarker: lastObject, - WalkFilter: selectObj, + results := make(chan ObjectInfo, 100) + if err := api.Walk(ctx, r.Source.Bucket, r.Source.Prefix, results, WalkOptions{ + Marker: lastObject, + Filter: selectObj, + AskDisks: walkQuorum, }); err != nil { cancel() // Do not need to retry if we can't list objects on source. 
@@ -1429,7 +1436,7 @@ func (a adminAPIHandlers) ListBatchJobs(w http.ResponseWriter, r *http.Request) ctx, cancel := context.WithCancel(ctx) defer cancel() - if err := objectAPI.Walk(ctx, minioMetaBucket, batchJobPrefix, resultCh, ObjectOptions{}); err != nil { + if err := objectAPI.Walk(ctx, minioMetaBucket, batchJobPrefix, resultCh, WalkOptions{}); err != nil { writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) return } @@ -1646,7 +1653,7 @@ func (j *BatchJobPool) resume() { results := make(chan ObjectInfo, 100) ctx, cancel := context.WithCancel(j.ctx) defer cancel() - if err := j.objLayer.Walk(ctx, minioMetaBucket, batchJobPrefix, results, ObjectOptions{}); err != nil { + if err := j.objLayer.Walk(ctx, minioMetaBucket, batchJobPrefix, results, WalkOptions{}); err != nil { logger.LogIf(j.ctx, err) return } diff --git a/cmd/batch-rotate.go b/cmd/batch-rotate.go index 72eaf10f8..f46b49590 100644 --- a/cmd/batch-rotate.go +++ b/cmd/batch-rotate.go @@ -359,9 +359,9 @@ func (r *BatchJobKeyRotateV1) Start(ctx context.Context, api ObjectLayer, job Ba ctx, cancel := context.WithCancel(ctx) results := make(chan ObjectInfo, 100) - if err := api.Walk(ctx, r.Bucket, r.Prefix, results, ObjectOptions{ - WalkMarker: lastObject, - WalkFilter: skip, + if err := api.Walk(ctx, r.Bucket, r.Prefix, results, WalkOptions{ + Marker: lastObject, + Filter: skip, }); err != nil { cancel() // Do not need to retry if we can't list objects on source. diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index cd6edc6d9..bde90cd59 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -2579,7 +2579,7 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI Object // Walk through all object versions - Walk() is always in ascending order needed to ensure // delete marker replicated to target after object version is first created. 
- if err := objectAPI.Walk(ctx, opts.bucket, "", objInfoCh, ObjectOptions{}); err != nil { + if err := objectAPI.Walk(ctx, opts.bucket, "", objInfoCh, WalkOptions{}); err != nil { logger.LogIf(ctx, err) return } @@ -2952,7 +2952,7 @@ func getReplicationDiff(ctx context.Context, objAPI ObjectLayer, bucket string, } objInfoCh := make(chan ObjectInfo, 10) - if err := objAPI.Walk(ctx, bucket, opts.Prefix, objInfoCh, ObjectOptions{}); err != nil { + if err := objAPI.Walk(ctx, bucket, opts.Prefix, objInfoCh, WalkOptions{}); err != nil { logger.LogIf(ctx, err) return nil, err } diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index 5b8b3fb22..e98dec6d2 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -1945,7 +1945,7 @@ func (z *erasureServerPools) HealBucket(ctx context.Context, bucket string, opts // to allocate a receive channel for ObjectInfo, upon any unhandled // error walker returns error. Optionally if context.Done() is received // then Walk() stops the walker. -func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts ObjectOptions) error { +func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts WalkOptions) error { if err := checkListObjsArgs(ctx, bucket, prefix, "", z); err != nil { // Upon error close the channel. 
close(results) @@ -1982,21 +1982,27 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re } } - askDisks := getListQuorum(opts.WalkAskDisks, set.setDriveCount) - var fallbackDisks []StorageAPI + askDisks := getListQuorum(opts.AskDisks, set.setDriveCount) + if askDisks == -1 { + askDisks = getListQuorum("strict", set.setDriveCount) + } // Special case: ask all disks if the drive count is 4 if set.setDriveCount == 4 || askDisks > len(disks) { askDisks = len(disks) // use all available drives } + var fallbackDisks []StorageAPI if askDisks > 0 && len(disks) > askDisks { + rand.Shuffle(len(disks), func(i, j int) { + disks[i], disks[j] = disks[j], disks[i] + }) fallbackDisks = disks[askDisks:] disks = disks[:askDisks] } requestedVersions := 0 - if opts.WalkLatestOnly { + if opts.LatestOnly { requestedVersions = 1 } loadEntry := func(entry metaCacheEntry) { @@ -2004,14 +2010,14 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re return } - if opts.WalkLatestOnly { + if opts.LatestOnly { fi, err := entry.fileInfo(bucket) if err != nil { cancel() return } - if opts.WalkFilter != nil { - if opts.WalkFilter(fi) { + if opts.Filter != nil { + if opts.Filter(fi) { if !send(fi.ToObjectInfo(bucket, fi.Name, vcfg != nil && vcfg.Versioned(fi.Name))) { return } @@ -2032,8 +2038,8 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re versionsSorter(fivs.Versions).reverse() for _, version := range fivs.Versions { - if opts.WalkFilter != nil { - if opts.WalkFilter(version) { + if opts.Filter != nil { + if opts.Filter(version) { if !send(version.ToObjectInfo(bucket, version.Name, vcfg != nil && vcfg.Versioned(version.Name))) { return } @@ -2047,10 +2053,13 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re } } + // However many we ask, versions must exist on ~50% + listingQuorum := (askDisks + 1) / 2 + // How to resolve partial results. 
resolver := metadataResolutionParams{ - dirQuorum: 1, - objQuorum: 1, + dirQuorum: listingQuorum, + objQuorum: listingQuorum, bucket: bucket, requestedVersions: requestedVersions, } @@ -2068,19 +2077,15 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re path: path, filterPrefix: filterPrefix, recursive: true, - forwardTo: opts.WalkMarker, + forwardTo: opts.Marker, minDisks: 1, reportNotFound: false, agreed: loadEntry, partial: func(entries metaCacheEntries, _ []error) { entry, ok := entries.resolve(&resolver) - if !ok { - // check if we can get one entry atleast - // proceed to heal nonetheless. - entry, _ = entries.firstFound() + if ok { + loadEntry(*entry) } - - loadEntry(*entry) }, finished: nil, } diff --git a/cmd/iam-object-store.go b/cmd/iam-object-store.go index 64bd3e01b..51263e130 100644 --- a/cmd/iam-object-store.go +++ b/cmd/iam-object-store.go @@ -580,7 +580,7 @@ func listIAMConfigItems(ctx context.Context, objAPI ObjectLayer, pathPrefix stri // Allocate new results channel to receive ObjectInfo. objInfoCh := make(chan ObjectInfo) - if err := objAPI.Walk(ctx, minioMetaBucket, pathPrefix, objInfoCh, ObjectOptions{}); err != nil { + if err := objAPI.Walk(ctx, minioMetaBucket, pathPrefix, objInfoCh, WalkOptions{}); err != nil { select { case ch <- itemOrErr{Err: err}: case <-ctx.Done(): diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index 8b17e82df..fe8f40a91 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -95,10 +95,6 @@ type ObjectOptions struct { // participating in a rebalance operation. Typically set for 'write' operations. 
SkipRebalancing bool - WalkFilter func(info FileInfo) bool // return WalkFilter returns 'true/false' - WalkMarker string // set to skip until this object - WalkLatestOnly bool // returns only latest versions for all matching objects - WalkAskDisks string // dictates how many disks are being listed PrefixEnabledFn func(prefix string) bool // function which returns true if versioning is enabled on prefix // IndexCB will return any index created but the compression. @@ -113,6 +109,14 @@ type ObjectOptions struct { FastGetObjInfo bool // Only for S3 Head/Get Object calls for now } +// WalkOptions provides filtering, marker and other Walk() specific options. +type WalkOptions struct { + Filter func(info FileInfo) bool // optional; when set, only entries for which Filter returns true are sent to results + Marker string // set to skip until this object + LatestOnly bool // returns only latest versions for all matching objects + AskDisks string // dictates how many disks are being listed +} + // ExpirationOptions represents object options for object expiration at objectLayer. type ExpirationOptions struct { Expire bool @@ -224,7 +228,7 @@ type ObjectLayer interface { ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error) ListObjectVersions(ctx context.Context, bucket, prefix, marker, versionMarker, delimiter string, maxKeys int) (result ListObjectVersionsInfo, err error) // Walk lists all objects including versions, delete markers. - Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts ObjectOptions) error + Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts WalkOptions) error // Object operations.