mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-01 14:36:22 +00:00
s3: serialize bucket config writes with field-level filer patches (#9655)
When PutBucketVersioning and PutBucketEncryption ran concurrently, each did a whole-entry read-modify-write of the bucket entry, so one could overwrite the other's field with a stale copy. Each config write is now a field-level PATCH_EXTENDED (extended attributes) or set_content (the metadata blob) ObjectTransaction, routed to the bucket's owner filer and merged onto a fresh read under its per-path lock. Disjoint fields no longer clobber each other.
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
package s3api
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
@@ -15,6 +16,7 @@ import (
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/kms"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/cors"
|
||||
@@ -44,8 +46,8 @@ type BucketConfig struct {
|
||||
// rather than this cache.
|
||||
LifecycleTTL *LifecycleTTLResolver
|
||||
KMSKeyCache *BucketKMSCache // Per-bucket KMS key cache for SSE-KMS operations
|
||||
LastModified time.Time
|
||||
Entry *filer_pb.Entry
|
||||
LastModified time.Time
|
||||
Entry *filer_pb.Entry
|
||||
}
|
||||
|
||||
// BucketKMSCache represents per-bucket KMS key caching for SSE-KMS operations
|
||||
@@ -524,26 +526,73 @@ func (s3a *S3ApiServer) updateBucketConfig(bucket string, updateFn func(*BucketC
|
||||
bucket, s3_constants.ExtObjectLockEnabledKey, string(nextConfig.Entry.Extended[s3_constants.ExtObjectLockEnabledKey]))
|
||||
}
|
||||
|
||||
// Save to filer
|
||||
glog.V(3).Infof("updateBucketConfig: saving entry to filer for bucket %s", bucket)
|
||||
err := s3a.updateEntry(s3a.bucketRoot(bucket), nextConfig.Entry)
|
||||
if err != nil {
|
||||
glog.Errorf("updateBucketConfig: failed to update bucket entry for %s: %v", bucket, err)
|
||||
// Patch only the changed/removed extended keys, leaving Entry.content
|
||||
// untouched so a concurrent content write (e.g. encryption) is preserved.
|
||||
oldExt := config.Entry.GetExtended()
|
||||
newExt := nextConfig.Entry.Extended
|
||||
set := make(map[string][]byte)
|
||||
for k, v := range newExt {
|
||||
if ov, ok := oldExt[k]; !ok || !bytes.Equal(ov, v) {
|
||||
set[k] = v
|
||||
}
|
||||
}
|
||||
var del []string
|
||||
for k := range oldExt {
|
||||
if _, ok := newExt[k]; !ok {
|
||||
del = append(del, k)
|
||||
}
|
||||
}
|
||||
glog.V(3).Infof("updateBucketConfig: patching %d/%d extended keys for bucket %s", len(set), len(del), bucket)
|
||||
if err := s3a.patchBucketEntry(bucket, &filer_pb.ObjectMutation{SetExtended: set, DeleteExtended: del}); err != nil {
|
||||
glog.Errorf("updateBucketConfig: failed to patch bucket entry for %s: %v", bucket, err)
|
||||
return s3err.ErrInternalError
|
||||
}
|
||||
glog.V(3).Infof("updateBucketConfig: saved entry to filer for bucket %s", bucket)
|
||||
|
||||
// Update cache. Re-derive every Extended-backed field from the
|
||||
// just-saved Entry — the user's update fn may have flipped, added,
|
||||
// or cleared bytes (e.g. PutBucketLifecycle / DeleteBucketLifecycle
|
||||
// rewrites the lifecycle XML key) and the resolver / parsed configs
|
||||
// must follow.
|
||||
s3a.populateBucketConfigDerivedFields(nextConfig)
|
||||
s3a.bucketConfigCache.Set(bucket, nextConfig)
|
||||
// Invalidate rather than cache nextConfig: its content may be stale relative
|
||||
// to a concurrent content write. The next read re-fetches the merged entry.
|
||||
if s3a.bucketConfigCache != nil {
|
||||
s3a.bucketConfigCache.Remove(bucket)
|
||||
s3a.bucketConfigCache.RemoveNegativeCache(bucket)
|
||||
}
|
||||
|
||||
return s3err.ErrNone
|
||||
}
|
||||
|
||||
// patchBucketEntry applies a field-level PATCH_EXTENDED mutation to the bucket's
|
||||
// entry via ObjectTransaction, routed to the bucket's owner filer so its per-path
|
||||
// lock serializes concurrent config writes cluster-wide rather than racing
|
||||
// whole-entry rewrites. A nil/empty mutation is a no-op.
//
// The mutation may carry extended-attribute sets (SetExtended), deletions
// (DeleteExtended), a content replacement (SetContent), or any combination;
// when it carries none of these the function returns nil without issuing a
// request.
//
// Note that m is modified in place: its Type, Directory and Name fields are
// overwritten before the request is sent.
//
// It returns the error from the ObjectTransaction call, an error built from a
// non-empty Error string in the response, or nil on success.
|
||||
func (s3a *S3ApiServer) patchBucketEntry(bucket string, m *filer_pb.ObjectMutation) error {
|
||||
// Nothing to set, delete, or replace: skip the round trip entirely.
if m == nil || (len(m.SetExtended) == 0 && len(m.DeleteExtended) == 0 && !m.SetContent) {
|
||||
return nil
|
||||
}
|
||||
dir := s3a.option.BucketsPath
|
||||
bucketPath := dir + "/" + bucket
|
||||
// Address the mutation at the bucket entry itself (parent directory + name).
m.Type = filer_pb.ObjectMutation_PATCH_EXTENDED
|
||||
m.Directory = dir
|
||||
m.Name = bucket
|
||||
// The transaction holds this single mutation and uses the bucket path as its lock key.
req := &filer_pb.ObjectTransactionRequest{
|
||||
LockKey: bucketPath,
|
||||
Mutations: []*filer_pb.ObjectMutation{m},
|
||||
}
|
||||
// txn sends the request on whichever filer client it is handed, so the same
// logic serves both the owner-routed and the default path below.
txn := func(client filer_pb.SeaweedFilerClient) error {
|
||||
resp, err := client.ObjectTransaction(context.Background(), req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// A failure can also be reported in the response body rather than as an RPC error.
if resp.Error != "" {
|
||||
return fmt.Errorf("patch bucket %s: %s", bucket, resp.Error)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
// Prefer the filer that the lock client names as primary for this bucket's
// write key; fall through when no lock client is configured or no owner is known.
if s3a.objectWriteLockClient != nil {
|
||||
if owner := s3a.objectWriteLockClient.PrimaryForKey("s3.object.write:" + bucketPath); owner != "" {
|
||||
return pb.WithFilerClient(false, 0, owner, s3a.option.GrpcDialOption, txn)
|
||||
}
|
||||
}
|
||||
// Fallback: use the server's default filer client.
return s3a.WithFilerClient(false, txn)
|
||||
}
|
||||
|
||||
func cloneBucketConfig(config *BucketConfig) *BucketConfig {
|
||||
if config == nil {
|
||||
return nil
|
||||
@@ -1070,27 +1119,11 @@ func (s3a *S3ApiServer) setBucketMetadata(bucket string, metadata *BucketMetadat
|
||||
return fmt.Errorf("failed to marshal bucket metadata to protobuf: %w", err)
|
||||
}
|
||||
|
||||
// Update the bucket entry with new content
|
||||
err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
// Get current bucket entry
|
||||
entry, err := s3a.getBucketEntry(bucket)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error retrieving bucket directory %s: %w", bucket, err)
|
||||
}
|
||||
if entry == nil {
|
||||
return fmt.Errorf("bucket directory not found %s", bucket)
|
||||
}
|
||||
|
||||
// Update content with metadata
|
||||
entry.Content = metadataBytes
|
||||
|
||||
request := &filer_pb.UpdateEntryRequest{
|
||||
Directory: s3a.bucketRoot(bucket),
|
||||
Entry: entry,
|
||||
}
|
||||
|
||||
_, err = client.UpdateEntry(context.Background(), request)
|
||||
return err
|
||||
// Patch only Entry.content so a concurrent extended-attribute write
|
||||
// (e.g. versioning) is preserved.
|
||||
err = s3a.patchBucketEntry(bucket, &filer_pb.ObjectMutation{
|
||||
SetContent: true,
|
||||
Content: metadataBytes,
|
||||
})
|
||||
|
||||
// Invalidate cache after successful update
|
||||
|
||||
Reference in New Issue
Block a user