From eeda7181aa66016498ce57a7d9cab8f5797acfef Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 24 May 2026 11:07:39 -0700 Subject: [PATCH] s3: route multipart-upload completion off the distributed lock (#9632) completeMultipartUpload routes its writes to the object's owner filer when an owner is known, off the distributed lock. Idempotent replay is handled gateway-side in prepareMultipartCompletionState (it returns the existing result when the object already carries this UploadId), so the lock is not needed to dedupe retries; with no owner yet, the lock remains as the bootstrap path. Versioned completion flips the .versions pointer via routedVersionedFinalize (RECOMPUTE_LATEST). Non-versioned and suspended completion write the object via routedMkFile (a routed PUT) so the write serializes with concurrent writes to the same key on the owner's per-path lock. The version file itself is a unique path and stays a plain mkFile. --- weed/s3api/filer_multipart.go | 39 +++++++++++++++++++----- weed/s3api/s3api_object_routed_write.go | 40 +++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 7 deletions(-) diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go index 6a113f9e4..48b26770b 100644 --- a/weed/s3api/filer_multipart.go +++ b/weed/s3api/filer_multipart.go @@ -553,9 +553,13 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl uploadDirectory := s3a.genUploadsFolder(*input.Bucket) + "/" + *input.UploadId entryName, dirName := s3a.getEntryNameAndDir(input) var completionState *multipartCompletionState - finalizeCode := s3a.withObjectWriteLock(*input.Bucket, *input.Key, func() s3err.ErrorCode { - return s3a.checkConditionalHeaders(r, *input.Bucket, *input.Key) - }, func() s3err.ErrorCode { + // Route the completion's writes to the object's owner filer when known, off + // the distributed lock. Idempotent replay is handled gateway-side in + // prepareMultipartCompletionState (it returns the existing result when the + // object already carries this UploadId), so the lock is not needed to dedupe + // retries. With no owner yet (no ring), keep the lock as the bootstrap path. + owner := s3a.objectWriteOwner(*input.Bucket, *input.Key) + completionBody := func() s3err.ErrorCode { var prepCode s3err.ErrorCode completionState, output, prepCode = s3a.prepareMultipartCompletionState(r, input, uploadDirectory, entryName, dirName, completedPartNumbers, completedPartMap, maxPartNo) if prepCode != s3err.ErrNone || output != nil { @@ -651,7 +655,16 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl // Update the .versions directory metadata to indicate this is the latest version // Pass entry to cache its metadata for single-scan list efficiency - if err := s3a.updateLatestVersionInDirectory(*input.Bucket, *input.Key, versionId, versionFileName, versionEntryForCache); err != nil { + // Route the pointer flip to the owner (off the lock) via + // RECOMPUTE_LATEST; the just-written version file is the newest. + if owner != "" { + if code := s3a.routedVersionedFinalize(owner, *input.Bucket, *input.Key, useInvertedFormat); code != s3err.ErrNone { + if rollbackErr := s3a.rollbackMultipartVersion(versionDir, versionFileName); rollbackErr != nil { + glog.Errorf("completeMultipartUpload: failed to rollback version %s for %s/%s after routed finalize error: %v", versionId, *input.Bucket, *input.Key, rollbackErr) + } + return code + } + } else if err := s3a.updateLatestVersionInDirectory(*input.Bucket, *input.Key, versionId, versionFileName, versionEntryForCache); err != nil { if rollbackErr := s3a.rollbackMultipartVersion(versionDir, versionFileName); rollbackErr != nil { glog.Errorf("completeMultipartUpload: failed to rollback version %s for %s/%s after latest pointer update error: %v", versionId, *input.Bucket, *input.Key, rollbackErr) } @@ -675,7 +688,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl if versioningState == s3_constants.VersioningSuspended { // For suspended versioning, add "null" version ID metadata and return "null" version ID - if err := s3a.mkFile(dirName, entryName, completionState.finalParts, func(entry *filer_pb.Entry) { + if err := s3a.writeMultipartObject(owner, dirName, entryName, completionState.finalParts, func(entry *filer_pb.Entry) { if entry.Extended == nil { entry.Extended = make(map[string][]byte) } @@ -739,7 +752,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl } // For non-versioned buckets, create main object file - if err := s3a.mkFile(dirName, entryName, completionState.finalParts, func(entry *filer_pb.Entry) { + if err := s3a.writeMultipartObject(owner, dirName, entryName, completionState.finalParts, func(entry *filer_pb.Entry) { if entry.Extended == nil { entry.Extended = make(map[string][]byte) } @@ -802,7 +815,19 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl ChecksumValue: completionState.checksumValue, } return s3err.ErrNone - }) + } + var finalizeCode s3err.ErrorCode + if owner != "" { + if code := s3a.checkConditionalHeaders(r, *input.Bucket, *input.Key); code != s3err.ErrNone { + finalizeCode = code + } else { + finalizeCode = completionBody() + } + } else { + finalizeCode = s3a.withObjectWriteLock(*input.Bucket, *input.Key, func() s3err.ErrorCode { + return s3a.checkConditionalHeaders(r, *input.Bucket, *input.Key) + }, completionBody) + } if finalizeCode != s3err.ErrNone { return nil, finalizeCode } diff --git a/weed/s3api/s3api_object_routed_write.go b/weed/s3api/s3api_object_routed_write.go index 8cba0f988..95340a00a 100644 --- a/weed/s3api/s3api_object_routed_write.go +++ b/weed/s3api/s3api_object_routed_write.go @@ -6,6 +6,7 @@ import ( "net/http" "path" "strings" + "time" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -159,6 +160,45 @@ func (s3a *S3ApiServer) routedPut(owner pb.ServerAddress, filePath string, entry }) } +// routedMkFile builds an entry like filer_pb.MkFile and writes it through a +// routed PUT on the owner filer, for callers that would otherwise mkFile to the +// default filer (e.g. multipart completion of a non-versioned object). +func (s3a *S3ApiServer) routedMkFile(owner pb.ServerAddress, parentDir, name string, chunks []*filer_pb.FileChunk, fn func(*filer_pb.Entry)) error { + now := time.Now().Unix() + entry := &filer_pb.Entry{ + Name: name, + Attributes: &filer_pb.FuseAttributes{ + Mtime: now, + Crtime: now, + FileMode: uint32(0770), + Uid: filer_pb.OS_UID, + Gid: filer_pb.OS_GID, + }, + Chunks: chunks, + } + if fn != nil { + fn(entry) + } + resp, err := s3a.routedPut(owner, parentDir+"/"+name, entry, nil) + if err != nil { + return err + } + if resp.Error != "" { + return fmt.Errorf("routed mkfile %s/%s: %s", parentDir, name, resp.Error) + } + return nil +} + +// writeMultipartObject writes a completed multipart object entry, routed to the +// owner when known (so it serializes with concurrent writes to the same key) +// and falling back to a plain mkFile otherwise. +func (s3a *S3ApiServer) writeMultipartObject(owner pb.ServerAddress, dir, name string, chunks []*filer_pb.FileChunk, fn func(*filer_pb.Entry)) error { + if owner != "" { + return s3a.routedMkFile(owner, dir, name, chunks, fn) + } + return s3a.mkFile(dir, name, chunks, fn) +} + func (s3a *S3ApiServer) routedDelete(owner pb.ServerAddress, bucket, object string, cond *filer_pb.WriteCondition) (*filer_pb.ObjectTransactionResponse, error) { // NewFullPath normalizes a trailing-slash directory-marker key (e.g. "dir/") // to the entry name "dir", matching deleteUnversionedObjectWithClient.