From 6186d11761e78b77c27bdbd885a49562a92c82a5 Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Sun, 29 Sep 2024 15:40:36 -0700
Subject: [PATCH] handle the locks properly for multi-pool callers (#20495)

- PutObjectMetadata()
- PutObjectTags()
- DeleteObjectTags()
- TransitionObject()
- RestoreTransitionObject()

Also improve the behavior of multipart code across pool locks by holding
locks only once per upload ID for

- CompleteMultipartUpload()
- AbortMultipartUpload()
- ListObjectParts() (read-lock)
- GetMultipartInfo() (read-lock)
- PutObjectPart() (read-lock)

This avoids unnecessary lock attempts across pools; otherwise the number of
lock attempts grows as O(n) when there are n pools.
---
 cmd/erasure-multipart.go | 51 +-------
 cmd/erasure-object.go | 32 ++---
 cmd/erasure-server-pool.go | 109 +++++++++++++++++-
 cmd/namespace-lock.go | 2 +-
 .../replication/test_del_marker_proxying.sh | 10 +-
 5 files changed, 134 insertions(+), 70 deletions(-)

diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go
index c5726b216..af642dfc7 100644
--- a/cmd/erasure-multipart.go
+++ b/cmd/erasure-multipart.go
@@ -577,19 +577,9 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
 		return pi, toObjectErr(errInvalidArgument)
 	}
-	// Read lock for upload id.
-	// Only held while reading the upload metadata.
-	uploadIDRLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
-	rlkctx, err := uploadIDRLock.GetRLock(ctx, globalOperationTimeout)
-	if err != nil {
-		return PartInfo{}, err
-	}
-	rctx := rlkctx.Context()
-	defer uploadIDRLock.RUnlock(rlkctx)
-
 	uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
 	// Validates if upload ID exists.
-	fi, _, err := er.checkUploadIDExists(rctx, bucket, object, uploadID, true)
+	fi, _, err := er.checkUploadIDExists(ctx, bucket, object, uploadID, true)
 	if err != nil {
 		if errors.Is(err, errVolumeNotFound) {
 			return pi, toObjectErr(err, bucket)
 		}
@@ -744,10 +734,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
 		return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
 	}
 
-	// Write lock for this part ID, only hold it if we are planning to read from the
-	// stream avoid any concurrent updates.
-	//
-	// Must be held throughout this call.
+	// Serialize concurrent part uploads.
partIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID, strconv.Itoa(partID))) plkctx, err := partIDLock.GetLock(ctx, globalOperationTimeout) if err != nil { @@ -801,14 +788,6 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u UploadID: uploadID, } - uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID)) - lkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout) - if err != nil { - return MultipartInfo{}, err - } - ctx = lkctx.Context() - defer uploadIDLock.RUnlock(lkctx) - fi, _, err := er.checkUploadIDExists(ctx, bucket, object, uploadID, false) if err != nil { if errors.Is(err, errVolumeNotFound) { @@ -888,14 +867,6 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up auditObjectErasureSet(ctx, "ListObjectParts", object, &er) } - uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID)) - lkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout) - if err != nil { - return ListPartsInfo{}, err - } - ctx = lkctx.Context() - defer uploadIDLock.RUnlock(lkctx) - fi, _, err := er.checkUploadIDExists(ctx, bucket, object, uploadID, false) if err != nil { return result, toObjectErr(err, bucket, object, uploadID) @@ -1118,16 +1089,6 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str } } - // Hold write locks to verify uploaded parts, also disallows any - // parallel PutObjectPart() requests. - uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID)) - wlkctx, err := uploadIDLock.GetLock(ctx, globalOperationTimeout) - if err != nil { - return oi, err - } - ctx = wlkctx.Context() - defer uploadIDLock.Unlock(wlkctx) - fi, partsMetadata, err := er.checkUploadIDExists(ctx, bucket, object, uploadID, true) if err != nil { if errors.Is(err, errVolumeNotFound) { @@ -1494,14 +1455,6 @@ func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, objec auditObjectErasureSet(ctx, "AbortMultipartUpload", object, &er) } - lk := er.NewNSLock(bucket, pathJoin(object, uploadID)) - lkctx, err := lk.GetLock(ctx, globalOperationTimeout) - if err != nil { - return err - } - ctx = lkctx.Context() - defer lk.Unlock(lkctx) - // Validates if upload ID exists. if _, _, err = er.checkUploadIDExists(ctx, bucket, object, uploadID, false); err != nil { if errors.Is(err, errVolumeNotFound) { diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 6c84c96eb..df878c67c 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -2192,14 +2192,16 @@ func (er erasureObjects) PutObjectMetadata(ctx context.Context, bucket, object s // PutObjectTags - replace or add tags to an existing object func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) (ObjectInfo, error) { - // Lock the object before updating tags. - lk := er.NewNSLock(bucket, object) - lkctx, err := lk.GetLock(ctx, globalOperationTimeout) - if err != nil { - return ObjectInfo{}, err + if !opts.NoLock { + // Lock the object before updating tags. + lk := er.NewNSLock(bucket, object) + lkctx, err := lk.GetLock(ctx, globalOperationTimeout) + if err != nil { + return ObjectInfo{}, err + } + ctx = lkctx.Context() + defer lk.Unlock(lkctx) } - ctx = lkctx.Context() - defer lk.Unlock(lkctx) disks := er.getDisks() @@ -2310,14 +2312,16 @@ func (er erasureObjects) TransitionObject(ctx context.Context, bucket, object st return err } - // Acquire write lock before starting to transition the object. 
- lk := er.NewNSLock(bucket, object) - lkctx, err := lk.GetLock(ctx, globalDeleteOperationTimeout) - if err != nil { - return err + if !opts.NoLock { + // Acquire write lock before starting to transition the object. + lk := er.NewNSLock(bucket, object) + lkctx, err := lk.GetLock(ctx, globalDeleteOperationTimeout) + if err != nil { + return err + } + ctx = lkctx.Context() + defer lk.Unlock(lkctx) } - ctx = lkctx.Context() - defer lk.Unlock(lkctx) fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts, true) if err != nil { diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index 1fcedb5cc..1ebafad0f 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -1858,6 +1858,16 @@ func (z *erasureServerPools) PutObjectPart(ctx context.Context, bucket, object, return PartInfo{}, err } + // Read lock for upload id. + // Only held while reading the upload metadata. + uploadIDRLock := z.NewNSLock(bucket, pathJoin(object, uploadID)) + rlkctx, err := uploadIDRLock.GetRLock(ctx, globalOperationTimeout) + if err != nil { + return PartInfo{}, err + } + ctx = rlkctx.Context() + defer uploadIDRLock.RUnlock(rlkctx) + if z.SinglePool() { return z.serverPools[0].PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts) } @@ -1890,9 +1900,18 @@ func (z *erasureServerPools) GetMultipartInfo(ctx context.Context, bucket, objec return MultipartInfo{}, err } + uploadIDLock := z.NewNSLock(bucket, pathJoin(object, uploadID)) + lkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout) + if err != nil { + return MultipartInfo{}, err + } + ctx = lkctx.Context() + defer uploadIDLock.RUnlock(lkctx) + if z.SinglePool() { return z.serverPools[0].GetMultipartInfo(ctx, bucket, object, uploadID, opts) } + for idx, pool := range z.serverPools { if z.IsSuspended(idx) { continue @@ -1908,6 +1927,7 @@ func (z *erasureServerPools) GetMultipartInfo(ctx context.Context, bucket, objec // any other unhandled error return right here. return MultipartInfo{}, err } + return MultipartInfo{}, InvalidUploadID{ Bucket: bucket, Object: object, @@ -1921,9 +1941,18 @@ func (z *erasureServerPools) ListObjectParts(ctx context.Context, bucket, object return ListPartsInfo{}, err } + uploadIDLock := z.NewNSLock(bucket, pathJoin(object, uploadID)) + lkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout) + if err != nil { + return ListPartsInfo{}, err + } + ctx = lkctx.Context() + defer uploadIDLock.RUnlock(lkctx) + if z.SinglePool() { return z.serverPools[0].ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts) } + for idx, pool := range z.serverPools { if z.IsSuspended(idx) { continue @@ -1937,6 +1966,7 @@ func (z *erasureServerPools) ListObjectParts(ctx context.Context, bucket, object } return ListPartsInfo{}, err } + return ListPartsInfo{}, InvalidUploadID{ Bucket: bucket, Object: object, @@ -1957,6 +1987,14 @@ func (z *erasureServerPools) AbortMultipartUpload(ctx context.Context, bucket, o } }() + lk := z.NewNSLock(bucket, pathJoin(object, uploadID)) + lkctx, err := lk.GetLock(ctx, globalOperationTimeout) + if err != nil { + return err + } + ctx = lkctx.Context() + defer lk.Unlock(lkctx) + if z.SinglePool() { return z.serverPools[0].AbortMultipartUpload(ctx, bucket, object, uploadID, opts) } @@ -1995,6 +2033,16 @@ func (z *erasureServerPools) CompleteMultipartUpload(ctx context.Context, bucket } }() + // Hold write locks to verify uploaded parts, also disallows any + // parallel PutObjectPart() requests. 
+ uploadIDLock := z.NewNSLock(bucket, pathJoin(object, uploadID)) + wlkctx, err := uploadIDLock.GetLock(ctx, globalOperationTimeout) + if err != nil { + return objInfo, err + } + ctx = wlkctx.Context() + defer uploadIDLock.Unlock(wlkctx) + if z.SinglePool() { return z.serverPools[0].CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts) } @@ -2774,7 +2822,19 @@ func (z *erasureServerPools) PutObjectMetadata(ctx context.Context, bucket, obje return z.serverPools[0].PutObjectMetadata(ctx, bucket, object, opts) } + if !opts.NoLock { + // Lock the object before updating metadata. + lk := z.NewNSLock(bucket, object) + lkctx, err := lk.GetLock(ctx, globalOperationTimeout) + if err != nil { + return ObjectInfo{}, err + } + ctx = lkctx.Context() + defer lk.Unlock(lkctx) + } + opts.MetadataChg = true + opts.NoLock = true // We don't know the size here set 1GiB at least. idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts) if err != nil { @@ -2791,7 +2851,19 @@ func (z *erasureServerPools) PutObjectTags(ctx context.Context, bucket, object s return z.serverPools[0].PutObjectTags(ctx, bucket, object, tags, opts) } + if !opts.NoLock { + // Lock the object before updating tags. + lk := z.NewNSLock(bucket, object) + lkctx, err := lk.GetLock(ctx, globalOperationTimeout) + if err != nil { + return ObjectInfo{}, err + } + ctx = lkctx.Context() + defer lk.Unlock(lkctx) + } + opts.MetadataChg = true + opts.NoLock = true // We don't know the size here set 1GiB at least. idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts) @@ -2809,8 +2881,19 @@ func (z *erasureServerPools) DeleteObjectTags(ctx context.Context, bucket, objec return z.serverPools[0].DeleteObjectTags(ctx, bucket, object, opts) } - opts.MetadataChg = true + if !opts.NoLock { + // Lock the object before deleting tags. + lk := z.NewNSLock(bucket, object) + lkctx, err := lk.GetLock(ctx, globalOperationTimeout) + if err != nil { + return ObjectInfo{}, err + } + ctx = lkctx.Context() + defer lk.Unlock(lkctx) + } + opts.MetadataChg = true + opts.NoLock = true idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts) if err != nil { return ObjectInfo{}, err @@ -2841,8 +2924,20 @@ func (z *erasureServerPools) TransitionObject(ctx context.Context, bucket, objec return z.serverPools[0].TransitionObject(ctx, bucket, object, opts) } + if !opts.NoLock { + // Acquire write lock before starting to transition the object. + lk := z.NewNSLock(bucket, object) + lkctx, err := lk.GetLock(ctx, globalDeleteOperationTimeout) + if err != nil { + return err + } + ctx = lkctx.Context() + defer lk.Unlock(lkctx) + } + // Avoid transitioning an object from a pool being decommissioned. opts.SkipDecommissioned = true + opts.NoLock = true idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts) if err != nil { return err @@ -2858,8 +2953,20 @@ func (z *erasureServerPools) RestoreTransitionedObject(ctx context.Context, buck return z.serverPools[0].RestoreTransitionedObject(ctx, bucket, object, opts) } + if !opts.NoLock { + // Acquire write lock before restoring transitioned object + lk := z.NewNSLock(bucket, object) + lkctx, err := lk.GetLock(ctx, globalDeleteOperationTimeout) + if err != nil { + return err + } + ctx = lkctx.Context() + defer lk.Unlock(lkctx) + } + // Avoid restoring object from a pool being decommissioned. 
opts.SkipDecommissioned = true + opts.NoLock = true idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts) if err != nil { return err diff --git a/cmd/namespace-lock.go b/cmd/namespace-lock.go index a0f510a37..a39d22011 100644 --- a/cmd/namespace-lock.go +++ b/cmd/namespace-lock.go @@ -229,6 +229,7 @@ type localLockInstance struct { // path. The returned lockInstance object encapsulates the nsLockMap, // volume, path and operation ID. func (n *nsLockMap) NewNSLock(lockers func() ([]dsync.NetLocker, string), volume string, paths ...string) RWLocker { + sort.Strings(paths) opsID := mustGetUUID() if n.isDistErasure { drwmutex := dsync.NewDRWMutex(&dsync.Dsync{ @@ -237,7 +238,6 @@ func (n *nsLockMap) NewNSLock(lockers func() ([]dsync.NetLocker, string), volume }, pathsJoinPrefix(volume, paths...)...) return &distLockInstance{drwmutex, opsID} } - sort.Strings(paths) return &localLockInstance{n, volume, paths, opsID} } diff --git a/docs/bucket/replication/test_del_marker_proxying.sh b/docs/bucket/replication/test_del_marker_proxying.sh index 85fd07245..8d7521b4c 100755 --- a/docs/bucket/replication/test_del_marker_proxying.sh +++ b/docs/bucket/replication/test_del_marker_proxying.sh @@ -26,8 +26,8 @@ cleanup export MINIO_CI_CD=1 export MINIO_BROWSER=off -export MINIO_ROOT_USER="minio" -export MINIO_ROOT_PASSWORD="minio123" + +make install-race # Start MinIO instances echo -n "Starting MinIO instances ..." @@ -48,8 +48,8 @@ if [ ! -f ./mc ]; then chmod +x mc fi -export MC_HOST_sitea=http://minio:minio123@127.0.0.1:9001 -export MC_HOST_siteb=http://minio:minio123@127.0.0.1:9004 +export MC_HOST_sitea=http://minioadmin:minioadmin@127.0.0.1:9001 +export MC_HOST_siteb=http://minioadmin:minioadmin@127.0.0.1:9004 ./mc ready sitea ./mc ready siteb @@ -65,7 +65,7 @@ export MC_HOST_siteb=http://minio:minio123@127.0.0.1:9004 # Run the test to make sure proxying of DEL marker doesn't happen loop_count=0 while true; do - if [ $loop_count -eq 100 ]; then + if [ $loop_count -eq 1000 ]; then break fi echo "Hello World" | ./mc pipe sitea/bucket/obj$loop_count
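
Below is a minimal, self-contained sketch of the locking pattern this patch moves into the erasureServerPools layer: take the namespace lock once for the object (or upload ID path), set opts.NoLock, and let the per-pool call skip its own lock. The types and signatures here (nsLock, pool, serverPools, the simplified PutObjectTags) are illustrative stand-ins, not MinIO's actual API; in the real code the lock comes from NewNSLock/GetLock and the owning pool is found via getPoolIdxExistingWithOpts.

package main

import (
	"fmt"
	"sync"
)

// nsLock stands in for a namespace lock on "bucket/object".
type nsLock struct{ mu sync.Mutex }

// ObjectOptions carries the NoLock hint the patch relies on.
type ObjectOptions struct{ NoLock bool }

// pool plays the role of a single erasureObjects pool.
type pool struct{ id int }

// PutObjectTags locks only when the caller has not already taken the lock.
// With a non-reentrant mutex, locking again here would deadlock, which is
// why the pool-level methods must honor NoLock.
func (p *pool) PutObjectTags(lk *nsLock, object string, opts ObjectOptions) {
	if !opts.NoLock {
		lk.mu.Lock()
		defer lk.mu.Unlock()
	}
	fmt.Printf("pool %d: updated tags for %s\n", p.id, object)
}

// serverPools plays the role of erasureServerPools: it locks once, flips
// NoLock, and delegates, so n pools never cost n lock attempts.
type serverPools struct {
	pools []*pool
	lk    nsLock
}

func (z *serverPools) PutObjectTags(object string, opts ObjectOptions) {
	if !opts.NoLock {
		z.lk.mu.Lock()
		defer z.lk.mu.Unlock()
	}
	opts.NoLock = true // pools below must not lock again
	// The real code picks the pool that owns the object; here we use the first.
	z.pools[0].PutObjectTags(&z.lk, object, opts)
}

func main() {
	z := &serverPools{pools: []*pool{{id: 0}, {id: 1}}}
	z.PutObjectTags("bucket/object", ObjectOptions{})
}

With this shape the lock is acquired exactly once per request regardless of how many pools are configured, which is the reduction from O(n) lock attempts described in the commit message; the multipart calls apply the same pattern by locking the upload ID path at the pool-set layer. The related change in cmd/namespace-lock.go, which sorts lock paths before the distributed-lock branch as well as the local one, gives multi-path locks the same canonical ordering in both modes.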