mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-03 23:46:25 +00:00
s3: route multipart-upload completion off the distributed lock (#9632)
completeMultipartUpload routes its writes to the object's owner filer when an owner is known, off the distributed lock. Idempotent replay is handled gateway-side in prepareMultipartCompletionState (it returns the existing result when the object already carries this UploadId), so the lock is not needed to dedupe retries; with no owner yet, the lock remains as the bootstrap path. Versioned completion flips the .versions pointer via routedVersionedFinalize (RECOMPUTE_LATEST). Non-versioned and suspended completion write the object via routedMkFile (a routed PUT) so the write serializes with concurrent writes to the same key on the owner's per-path lock. The version file itself is a unique path and stays a plain mkFile.
This commit is contained in:
@@ -553,9 +553,13 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
|
||||
uploadDirectory := s3a.genUploadsFolder(*input.Bucket) + "/" + *input.UploadId
|
||||
entryName, dirName := s3a.getEntryNameAndDir(input)
|
||||
var completionState *multipartCompletionState
|
||||
finalizeCode := s3a.withObjectWriteLock(*input.Bucket, *input.Key, func() s3err.ErrorCode {
|
||||
return s3a.checkConditionalHeaders(r, *input.Bucket, *input.Key)
|
||||
}, func() s3err.ErrorCode {
|
||||
// Route the completion's writes to the object's owner filer when known, off
|
||||
// the distributed lock. Idempotent replay is handled gateway-side in
|
||||
// prepareMultipartCompletionState (it returns the existing result when the
|
||||
// object already carries this UploadId), so the lock is not needed to dedupe
|
||||
// retries. With no owner yet (no ring), keep the lock as the bootstrap path.
|
||||
owner := s3a.objectWriteOwner(*input.Bucket, *input.Key)
|
||||
completionBody := func() s3err.ErrorCode {
|
||||
var prepCode s3err.ErrorCode
|
||||
completionState, output, prepCode = s3a.prepareMultipartCompletionState(r, input, uploadDirectory, entryName, dirName, completedPartNumbers, completedPartMap, maxPartNo)
|
||||
if prepCode != s3err.ErrNone || output != nil {
|
||||
@@ -651,7 +655,16 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
|
||||
|
||||
// Update the .versions directory metadata to indicate this is the latest version
|
||||
// Pass entry to cache its metadata for single-scan list efficiency
|
||||
if err := s3a.updateLatestVersionInDirectory(*input.Bucket, *input.Key, versionId, versionFileName, versionEntryForCache); err != nil {
|
||||
// Route the pointer flip to the owner (off the lock) via
|
||||
// RECOMPUTE_LATEST; the just-written version file is the newest.
|
||||
if owner != "" {
|
||||
if code := s3a.routedVersionedFinalize(owner, *input.Bucket, *input.Key, useInvertedFormat); code != s3err.ErrNone {
|
||||
if rollbackErr := s3a.rollbackMultipartVersion(versionDir, versionFileName); rollbackErr != nil {
|
||||
glog.Errorf("completeMultipartUpload: failed to rollback version %s for %s/%s after routed finalize error: %v", versionId, *input.Bucket, *input.Key, rollbackErr)
|
||||
}
|
||||
return code
|
||||
}
|
||||
} else if err := s3a.updateLatestVersionInDirectory(*input.Bucket, *input.Key, versionId, versionFileName, versionEntryForCache); err != nil {
|
||||
if rollbackErr := s3a.rollbackMultipartVersion(versionDir, versionFileName); rollbackErr != nil {
|
||||
glog.Errorf("completeMultipartUpload: failed to rollback version %s for %s/%s after latest pointer update error: %v", versionId, *input.Bucket, *input.Key, rollbackErr)
|
||||
}
|
||||
@@ -675,7 +688,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
|
||||
|
||||
if versioningState == s3_constants.VersioningSuspended {
|
||||
// For suspended versioning, add "null" version ID metadata and return "null" version ID
|
||||
if err := s3a.mkFile(dirName, entryName, completionState.finalParts, func(entry *filer_pb.Entry) {
|
||||
if err := s3a.writeMultipartObject(owner, dirName, entryName, completionState.finalParts, func(entry *filer_pb.Entry) {
|
||||
if entry.Extended == nil {
|
||||
entry.Extended = make(map[string][]byte)
|
||||
}
|
||||
@@ -739,7 +752,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
|
||||
}
|
||||
|
||||
// For non-versioned buckets, create main object file
|
||||
if err := s3a.mkFile(dirName, entryName, completionState.finalParts, func(entry *filer_pb.Entry) {
|
||||
if err := s3a.writeMultipartObject(owner, dirName, entryName, completionState.finalParts, func(entry *filer_pb.Entry) {
|
||||
if entry.Extended == nil {
|
||||
entry.Extended = make(map[string][]byte)
|
||||
}
|
||||
@@ -802,7 +815,19 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
|
||||
ChecksumValue: completionState.checksumValue,
|
||||
}
|
||||
return s3err.ErrNone
|
||||
})
|
||||
}
|
||||
var finalizeCode s3err.ErrorCode
|
||||
if owner != "" {
|
||||
if code := s3a.checkConditionalHeaders(r, *input.Bucket, *input.Key); code != s3err.ErrNone {
|
||||
finalizeCode = code
|
||||
} else {
|
||||
finalizeCode = completionBody()
|
||||
}
|
||||
} else {
|
||||
finalizeCode = s3a.withObjectWriteLock(*input.Bucket, *input.Key, func() s3err.ErrorCode {
|
||||
return s3a.checkConditionalHeaders(r, *input.Bucket, *input.Key)
|
||||
}, completionBody)
|
||||
}
|
||||
if finalizeCode != s3err.ErrNone {
|
||||
return nil, finalizeCode
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"net/http"
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
@@ -159,6 +160,45 @@ func (s3a *S3ApiServer) routedPut(owner pb.ServerAddress, filePath string, entry
|
||||
})
|
||||
}
|
||||
|
||||
// routedMkFile builds an entry like filer_pb.MkFile and writes it through a
|
||||
// routed PUT on the owner filer, for callers that would otherwise mkFile to the
|
||||
// default filer (e.g. multipart completion of a non-versioned object).
|
||||
func (s3a *S3ApiServer) routedMkFile(owner pb.ServerAddress, parentDir, name string, chunks []*filer_pb.FileChunk, fn func(*filer_pb.Entry)) error {
|
||||
now := time.Now().Unix()
|
||||
entry := &filer_pb.Entry{
|
||||
Name: name,
|
||||
Attributes: &filer_pb.FuseAttributes{
|
||||
Mtime: now,
|
||||
Crtime: now,
|
||||
FileMode: uint32(0770),
|
||||
Uid: filer_pb.OS_UID,
|
||||
Gid: filer_pb.OS_GID,
|
||||
},
|
||||
Chunks: chunks,
|
||||
}
|
||||
if fn != nil {
|
||||
fn(entry)
|
||||
}
|
||||
resp, err := s3a.routedPut(owner, parentDir+"/"+name, entry, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if resp.Error != "" {
|
||||
return fmt.Errorf("routed mkfile %s/%s: %s", parentDir, name, resp.Error)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// writeMultipartObject writes a completed multipart object entry, routed to the
|
||||
// owner when known (so it serializes with concurrent writes to the same key)
|
||||
// and falling back to a plain mkFile otherwise.
|
||||
func (s3a *S3ApiServer) writeMultipartObject(owner pb.ServerAddress, dir, name string, chunks []*filer_pb.FileChunk, fn func(*filer_pb.Entry)) error {
|
||||
if owner != "" {
|
||||
return s3a.routedMkFile(owner, dir, name, chunks, fn)
|
||||
}
|
||||
return s3a.mkFile(dir, name, chunks, fn)
|
||||
}
|
||||
|
||||
func (s3a *S3ApiServer) routedDelete(owner pb.ServerAddress, bucket, object string, cond *filer_pb.WriteCondition) (*filer_pb.ObjectTransactionResponse, error) {
|
||||
// NewFullPath normalizes a trailing-slash directory-marker key (e.g. "dir/")
|
||||
// to the entry name "dir", matching deleteUnversionedObjectWithClient.
|
||||
|
||||
Reference in New Issue
Block a user