From 32aa70ab59080de14f1648afa16a0f01225b1157 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 24 May 2026 02:30:26 -0700 Subject: [PATCH] s3: serialize bucket config writes with field-level filer patches (#9655) When PutBucketVersioning and PutBucketEncryption ran concurrently, each did a whole-entry read-modify-write of the bucket entry, so one could overwrite the other's field with a stale copy. Each config write is now a field-level PATCH_EXTENDED (extended attributes) or set_content (the metadata blob) ObjectTransaction, routed to the bucket's owner filer and merged onto a fresh read under its per-path lock. Disjoint fields no longer clobber each other. --- weed/s3api/s3api_bucket_config.go | 105 ++++++++++++++++++++---------- 1 file changed, 69 insertions(+), 36 deletions(-) diff --git a/weed/s3api/s3api_bucket_config.go b/weed/s3api/s3api_bucket_config.go index dfdb7baec..f12f54200 100644 --- a/weed/s3api/s3api_bucket_config.go +++ b/weed/s3api/s3api_bucket_config.go @@ -1,6 +1,7 @@ package s3api import ( + "bytes" "context" "encoding/json" "errors" @@ -15,6 +16,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/kms" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb" "github.com/seaweedfs/seaweedfs/weed/s3api/cors" @@ -44,8 +46,8 @@ type BucketConfig struct { // rather than this cache. 
LifecycleTTL *LifecycleTTLResolver KMSKeyCache *BucketKMSCache // Per-bucket KMS key cache for SSE-KMS operations - LastModified time.Time - Entry *filer_pb.Entry + LastModified time.Time + Entry *filer_pb.Entry } // BucketKMSCache represents per-bucket KMS key caching for SSE-KMS operations @@ -524,26 +526,73 @@ func (s3a *S3ApiServer) updateBucketConfig(bucket string, updateFn func(*BucketC bucket, s3_constants.ExtObjectLockEnabledKey, string(nextConfig.Entry.Extended[s3_constants.ExtObjectLockEnabledKey])) } - // Save to filer - glog.V(3).Infof("updateBucketConfig: saving entry to filer for bucket %s", bucket) - err := s3a.updateEntry(s3a.bucketRoot(bucket), nextConfig.Entry) - if err != nil { - glog.Errorf("updateBucketConfig: failed to update bucket entry for %s: %v", bucket, err) + // Patch only the changed/removed extended keys, leaving Entry.content + // untouched so a concurrent content write (e.g. encryption) is preserved. + oldExt := config.Entry.GetExtended() + newExt := nextConfig.Entry.Extended + set := make(map[string][]byte) + for k, v := range newExt { + if ov, ok := oldExt[k]; !ok || !bytes.Equal(ov, v) { + set[k] = v + } + } + var del []string + for k := range oldExt { + if _, ok := newExt[k]; !ok { + del = append(del, k) + } + } + glog.V(3).Infof("updateBucketConfig: patching %d/%d extended keys for bucket %s", len(set), len(del), bucket) + if err := s3a.patchBucketEntry(bucket, &filer_pb.ObjectMutation{SetExtended: set, DeleteExtended: del}); err != nil { + glog.Errorf("updateBucketConfig: failed to patch bucket entry for %s: %v", bucket, err) return s3err.ErrInternalError } - glog.V(3).Infof("updateBucketConfig: saved entry to filer for bucket %s", bucket) - // Update cache. Re-derive every Extended-backed field from the - // just-saved Entry — the user's update fn may have flipped, added, - // or cleared bytes (e.g. 
PutBucketLifecycle / DeleteBucketLifecycle - // rewrites the lifecycle XML key) and the resolver / parsed configs - // must follow. - s3a.populateBucketConfigDerivedFields(nextConfig) - s3a.bucketConfigCache.Set(bucket, nextConfig) + // Invalidate rather than cache nextConfig: its content may be stale relative + // to a concurrent content write. The next read re-fetches the merged entry. + if s3a.bucketConfigCache != nil { + s3a.bucketConfigCache.Remove(bucket) + s3a.bucketConfigCache.RemoveNegativeCache(bucket) + } return s3err.ErrNone } +// patchBucketEntry applies a field-level PATCH_EXTENDED mutation to the bucket's +// entry via ObjectTransaction, routed to the bucket's owner filer so its per-path +// lock serializes concurrent config writes cluster-wide rather than racing +// whole-entry rewrites. A nil/empty mutation is a no-op. +func (s3a *S3ApiServer) patchBucketEntry(bucket string, m *filer_pb.ObjectMutation) error { + if m == nil || (len(m.SetExtended) == 0 && len(m.DeleteExtended) == 0 && !m.SetContent) { + return nil + } + dir := s3a.option.BucketsPath + bucketPath := dir + "/" + bucket + m.Type = filer_pb.ObjectMutation_PATCH_EXTENDED + m.Directory = dir + m.Name = bucket + req := &filer_pb.ObjectTransactionRequest{ + LockKey: bucketPath, + Mutations: []*filer_pb.ObjectMutation{m}, + } + txn := func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.ObjectTransaction(context.Background(), req) + if err != nil { + return err + } + if resp.Error != "" { + return fmt.Errorf("patch bucket %s: %s", bucket, resp.Error) + } + return nil + } + if s3a.objectWriteLockClient != nil { + if owner := s3a.objectWriteLockClient.PrimaryForKey("s3.object.write:" + bucketPath); owner != "" { + return pb.WithFilerClient(false, 0, owner, s3a.option.GrpcDialOption, txn) + } + } + return s3a.WithFilerClient(false, txn) +} + func cloneBucketConfig(config *BucketConfig) *BucketConfig { if config == nil { return nil @@ -1070,27 +1119,11 @@ func (s3a 
*S3ApiServer) setBucketMetadata(bucket string, metadata *BucketMetadat return fmt.Errorf("failed to marshal bucket metadata to protobuf: %w", err) } - // Update the bucket entry with new content - err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - // Get current bucket entry - entry, err := s3a.getBucketEntry(bucket) - if err != nil { - return fmt.Errorf("error retrieving bucket directory %s: %w", bucket, err) - } - if entry == nil { - return fmt.Errorf("bucket directory not found %s", bucket) - } - - // Update content with metadata - entry.Content = metadataBytes - - request := &filer_pb.UpdateEntryRequest{ - Directory: s3a.bucketRoot(bucket), - Entry: entry, - } - - _, err = client.UpdateEntry(context.Background(), request) - return err + // Patch only Entry.content so a concurrent extended-attribute write + // (e.g. versioning) is preserved. + err = s3a.patchBucketEntry(bucket, &filer_pb.ObjectMutation{ + SetContent: true, + Content: metadataBytes, }) // Invalidate cache after successful update