mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-31 05:56:21 +00:00
s3: route versioned PutObject finalize off the DLM (#9631)
s3: route versioned PutObject finalize off the distributed lock. A versioned write's finalize (flip the .versions pointer to the newest version, demote the prior latest) now runs as a single RECOMPUTE_LATEST ObjectTransaction on the object's owner filer, under its per-path lock, instead of the unserialized updateLatestVersionInDirectory. The version file is written first; the owner re-derives the pointer by scanning the directory. RECOMPUTE_LATEST gains size_to_key / mtime_to_key to cache the chosen version's size and mtime on the pointer, and demote_key / demote_value to stamp the displaced prior latest (NoncurrentSinceNs for lifecycle) when the pointer moves. Falls back to updateLatestVersionInDirectory when no owner is known yet.
This commit is contained in:
@@ -329,6 +329,10 @@ message Recompute {
|
||||
bool descending = 2; // pick the child that sorts last by name (else first)
|
||||
map<string, string> copy_extended = 3; // pointer extended key -> source extended key on the chosen child
|
||||
string name_to_key = 4; // if set, store the chosen child's name under this pointer key
|
||||
string size_to_key = 5; // if set, store the chosen child's FileSize (decimal) under this pointer key
|
||||
string mtime_to_key = 6; // if set, store the chosen child's Mtime (decimal) under this pointer key
|
||||
string demote_key = 7; // if set, stamp demote_value on the prior name_to_key target when it changes
|
||||
bytes demote_value = 8; // value for demote_key
|
||||
}
|
||||
|
||||
// ObjectTransactionRequest applies an ordered list of mutations atomically with
|
||||
|
||||
@@ -329,6 +329,10 @@ message Recompute {
|
||||
bool descending = 2; // pick the child that sorts last by name (else first)
|
||||
map<string, string> copy_extended = 3; // pointer extended key -> source extended key on the chosen child
|
||||
string name_to_key = 4; // if set, store the chosen child's name under this pointer key
|
||||
string size_to_key = 5; // if set, store the chosen child's FileSize (decimal) under this pointer key
|
||||
string mtime_to_key = 6; // if set, store the chosen child's Mtime (decimal) under this pointer key
|
||||
string demote_key = 7; // if set, stamp demote_value on the prior name_to_key target when it changes
|
||||
bytes demote_value = 8; // value for demote_key
|
||||
}
|
||||
|
||||
// ObjectTransactionRequest applies an ordered list of mutations atomically with
|
||||
|
||||
@@ -1555,6 +1555,10 @@ type Recompute struct {
|
||||
Descending bool `protobuf:"varint,2,opt,name=descending,proto3" json:"descending,omitempty"` // pick the child that sorts last by name (else first)
|
||||
CopyExtended map[string]string `protobuf:"bytes,3,rep,name=copy_extended,json=copyExtended,proto3" json:"copy_extended,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` // pointer extended key -> source extended key on the chosen child
|
||||
NameToKey string `protobuf:"bytes,4,opt,name=name_to_key,json=nameToKey,proto3" json:"name_to_key,omitempty"` // if set, store the chosen child's name under this pointer key
|
||||
SizeToKey string `protobuf:"bytes,5,opt,name=size_to_key,json=sizeToKey,proto3" json:"size_to_key,omitempty"` // if set, store the chosen child's FileSize (decimal) under this pointer key
|
||||
MtimeToKey string `protobuf:"bytes,6,opt,name=mtime_to_key,json=mtimeToKey,proto3" json:"mtime_to_key,omitempty"` // if set, store the chosen child's Mtime (decimal) under this pointer key
|
||||
DemoteKey string `protobuf:"bytes,7,opt,name=demote_key,json=demoteKey,proto3" json:"demote_key,omitempty"` // if set, stamp demote_value on the prior name_to_key target when it changes
|
||||
DemoteValue []byte `protobuf:"bytes,8,opt,name=demote_value,json=demoteValue,proto3" json:"demote_value,omitempty"` // value for demote_key
|
||||
unknownFields protoimpl.UnknownFields
|
||||
sizeCache protoimpl.SizeCache
|
||||
}
|
||||
@@ -1617,6 +1621,34 @@ func (x *Recompute) GetNameToKey() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (x *Recompute) GetSizeToKey() string {
|
||||
if x != nil {
|
||||
return x.SizeToKey
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (x *Recompute) GetMtimeToKey() string {
|
||||
if x != nil {
|
||||
return x.MtimeToKey
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (x *Recompute) GetDemoteKey() string {
|
||||
if x != nil {
|
||||
return x.DemoteKey
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (x *Recompute) GetDemoteValue() []byte {
|
||||
if x != nil {
|
||||
return x.DemoteValue
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ObjectTransactionRequest applies an ordered list of mutations atomically with
|
||||
// respect to other writers of the same object, by holding the filer's per-path
|
||||
// lock on lock_key for the whole transaction. The optional condition is checked
|
||||
@@ -6354,14 +6386,20 @@ const file_filer_proto_rawDesc = "" +
|
||||
"\n" +
|
||||
"\x06DELETE\x10\x01\x12\x12\n" +
|
||||
"\x0ePATCH_EXTENDED\x10\x02\x12\x14\n" +
|
||||
"\x10RECOMPUTE_LATEST\x10\x03\"\xf3\x01\n" +
|
||||
"\x10RECOMPUTE_LATEST\x10\x03\"\xf7\x02\n" +
|
||||
"\tRecompute\x12\x19\n" +
|
||||
"\bscan_dir\x18\x01 \x01(\tR\ascanDir\x12\x1e\n" +
|
||||
"\n" +
|
||||
"descending\x18\x02 \x01(\bR\n" +
|
||||
"descending\x12J\n" +
|
||||
"\rcopy_extended\x18\x03 \x03(\v2%.filer_pb.Recompute.CopyExtendedEntryR\fcopyExtended\x12\x1e\n" +
|
||||
"\vname_to_key\x18\x04 \x01(\tR\tnameToKey\x1a?\n" +
|
||||
"\vname_to_key\x18\x04 \x01(\tR\tnameToKey\x12\x1e\n" +
|
||||
"\vsize_to_key\x18\x05 \x01(\tR\tsizeToKey\x12 \n" +
|
||||
"\fmtime_to_key\x18\x06 \x01(\tR\n" +
|
||||
"mtimeToKey\x12\x1d\n" +
|
||||
"\n" +
|
||||
"demote_key\x18\a \x01(\tR\tdemoteKey\x12!\n" +
|
||||
"\fdemote_value\x18\b \x01(\fR\vdemoteValue\x1a?\n" +
|
||||
"\x11CopyExtendedEntry\x12\x10\n" +
|
||||
"\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" +
|
||||
"\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\xf8\x01\n" +
|
||||
|
||||
@@ -1482,13 +1482,8 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
|
||||
// Versioned bucket: resolver returns 0 by construction. Pass 0
|
||||
// directly — versioned objects sit on regular volumes and the
|
||||
// lifecycle worker handles their expiration.
|
||||
etag, errCode, sseMetadata = s3a.putToFiler(r, versionFilePath, body, bucket, normalizedObject, 1, 0, func(versionEntry *filer_pb.Entry) s3err.ErrorCode {
|
||||
if err := s3a.updateLatestVersionInDirectory(bucket, normalizedObject, versionId, versionFileName, versionEntry); err != nil {
|
||||
glog.Errorf("putVersionedObject: failed to update latest version in directory: %v", err)
|
||||
return s3err.ErrInternalError
|
||||
}
|
||||
return s3err.ErrNone
|
||||
}, false)
|
||||
etag, errCode, sseMetadata = s3a.putToFiler(r, versionFilePath, body, bucket, normalizedObject, 1, 0,
|
||||
s3a.versionedAfterCreate(bucket, normalizedObject, versionId, versionFileName, useInvertedFormat), true)
|
||||
if errCode != s3err.ErrNone {
|
||||
glog.Errorf("putVersionedObject: failed to upload version: %v", errCode)
|
||||
return "", "", errCode, SSEResponseMetadata{}
|
||||
|
||||
@@ -15,23 +15,25 @@ import (
|
||||
)
|
||||
|
||||
// routableWriteOwner returns the owner filer for an object's writes, or "" to
|
||||
// keep them on the distributed lock. Versioned and object-lock buckets stay on
|
||||
// the lock (handled by later routing PRs); any lookup error also falls back.
|
||||
// keep them on the distributed lock. All writes to one object (versioned,
|
||||
// suspended, non-versioned) share the owner; object-lock buckets stay on the
|
||||
// lock until WORM-guard routing. Any lookup error also falls back.
|
||||
func (s3a *S3ApiServer) routableWriteOwner(bucket, object string) pb.ServerAddress {
|
||||
if object == "" || s3a.objectWriteLockClient == nil {
|
||||
return ""
|
||||
}
|
||||
if configured, err := s3a.isVersioningConfigured(bucket); err != nil || configured {
|
||||
return ""
|
||||
}
|
||||
if locked, err := s3a.isObjectLockEnabled(bucket); err != nil || locked {
|
||||
return ""
|
||||
}
|
||||
return s3a.objectWriteLockClient.PrimaryForKey(fmt.Sprintf("s3.object.write:%s", s3a.toFilerPath(bucket, object)))
|
||||
}
|
||||
|
||||
// routedObjectOwner resolves the owner for the unversioned DELETE fast path.
|
||||
// routedObjectOwner is routableWriteOwner restricted to non-versioned buckets,
|
||||
// for the unversioned DELETE fast path.
|
||||
func (s3a *S3ApiServer) routedObjectOwner(bucket, object string) (pb.ServerAddress, bool) {
|
||||
if configured, err := s3a.isVersioningConfigured(bucket); err != nil || configured {
|
||||
return "", false
|
||||
}
|
||||
owner := s3a.routableWriteOwner(bucket, object)
|
||||
return owner, owner != ""
|
||||
}
|
||||
|
||||
86
weed/s3api/s3api_object_versioned_finalize.go
Normal file
86
weed/s3api/s3api_object_versioned_finalize.go
Normal file
@@ -0,0 +1,86 @@
|
||||
package s3api
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||
)
|
||||
|
||||
// objectWriteOwner resolves the filer that owns all of an object's writes,
|
||||
// regardless of versioning state, or "" when no ring view is available. Normal,
|
||||
// suspended, and versioned writes to the same object hash to one owner and
|
||||
// serialize on its per-path lock.
|
||||
func (s3a *S3ApiServer) objectWriteOwner(bucket, object string) pb.ServerAddress {
|
||||
if s3a.objectWriteLockClient == nil {
|
||||
return ""
|
||||
}
|
||||
return s3a.objectWriteLockClient.PrimaryForKey("s3.object.write:" + s3a.toFilerPath(bucket, object))
|
||||
}
|
||||
|
||||
// routedVersionedFinalize flips the .versions pointer to the newest version and
|
||||
// demotes the prior latest, atomically under the object's per-path lock on the
|
||||
// owner filer, via a single RECOMPUTE_LATEST. The version file is already
|
||||
// written; the owner re-derives the pointer by scanning the directory.
|
||||
func (s3a *S3ApiServer) routedVersionedFinalize(owner pb.ServerAddress, bucket, object string, useInvertedFormat bool) s3err.ErrorCode {
|
||||
versionsPath := s3a.toFilerPath(bucket, object+s3_constants.VersionsFolder)
|
||||
vdir, vname := util.FullPath(versionsPath).DirAndName()
|
||||
req := &filer_pb.ObjectTransactionRequest{
|
||||
LockKey: s3a.toFilerPath(bucket, object),
|
||||
Mutations: []*filer_pb.ObjectMutation{{
|
||||
Type: filer_pb.ObjectMutation_RECOMPUTE_LATEST,
|
||||
Directory: vdir,
|
||||
Name: vname,
|
||||
Recompute: &filer_pb.Recompute{
|
||||
ScanDir: versionsPath,
|
||||
// Inverted ids sort newest-first, so the newest is the first
|
||||
// ascending entry; legacy ids sort oldest-first (scan to the last).
|
||||
Descending: !useInvertedFormat,
|
||||
NameToKey: s3_constants.ExtLatestVersionFileNameKey,
|
||||
SizeToKey: s3_constants.ExtLatestVersionSizeKey,
|
||||
MtimeToKey: s3_constants.ExtLatestVersionMtimeKey,
|
||||
CopyExtended: map[string]string{
|
||||
s3_constants.ExtLatestVersionIdKey: s3_constants.ExtVersionIdKey,
|
||||
s3_constants.ExtLatestVersionETagKey: s3_constants.ExtETagKey,
|
||||
s3_constants.ExtLatestVersionOwnerKey: s3_constants.ExtAmzOwnerKey,
|
||||
s3_constants.ExtLatestVersionIsDeleteMarker: s3_constants.ExtDeleteMarkerKey,
|
||||
},
|
||||
DemoteKey: s3_constants.ExtNoncurrentSinceNsKey,
|
||||
DemoteValue: []byte(strconv.FormatInt(time.Now().UnixNano(), 10)),
|
||||
},
|
||||
}},
|
||||
}
|
||||
resp, err := s3a.objectTxnOnFiler(owner, req)
|
||||
switch {
|
||||
case err != nil:
|
||||
glog.Errorf("routedVersionedFinalize: %s/%s on %s: %v", bucket, object, owner, err)
|
||||
return s3err.ErrInternalError
|
||||
case resp.Error != "":
|
||||
glog.Errorf("routedVersionedFinalize: %s/%s: %s", bucket, object, resp.Error)
|
||||
return s3err.ErrInternalError
|
||||
default:
|
||||
return s3err.ErrNone
|
||||
}
|
||||
}
|
||||
|
||||
// versionedAfterCreate returns the putToFiler hook that finalizes a versioned
|
||||
// write: the routed RECOMPUTE_LATEST when the owner is known, else the existing
|
||||
// lock-free updateLatestVersionInDirectory.
|
||||
func (s3a *S3ApiServer) versionedAfterCreate(bucket, object, versionId, versionFileName string, useInvertedFormat bool) func(*filer_pb.Entry) s3err.ErrorCode {
|
||||
owner := s3a.objectWriteOwner(bucket, object)
|
||||
return func(versionEntry *filer_pb.Entry) s3err.ErrorCode {
|
||||
if owner != "" {
|
||||
return s3a.routedVersionedFinalize(owner, bucket, object, useInvertedFormat)
|
||||
}
|
||||
if err := s3a.updateLatestVersionInDirectory(bucket, object, versionId, versionFileName, versionEntry); err != nil {
|
||||
glog.Errorf("putVersionedObject: failed to update latest version in directory: %v", err)
|
||||
return s3err.ErrInternalError
|
||||
}
|
||||
return s3err.ErrNone
|
||||
}
|
||||
}
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/cluster"
|
||||
@@ -411,6 +412,12 @@ func (fs *FilerServer) applyRecomputeLatest(ctx context.Context, m *filer_pb.Obj
|
||||
pointer.Extended = make(map[string][]byte)
|
||||
}
|
||||
|
||||
// Remember the prior chosen child so it can be demoted once the pointer moves.
|
||||
var priorName string
|
||||
if rc.NameToKey != "" {
|
||||
priorName = string(pointer.Extended[rc.NameToKey])
|
||||
}
|
||||
|
||||
// The store streams entries ascending by name. For the lowest-name pick we
|
||||
// only need the first entry, so cap the listing at one; for the highest-name
|
||||
// pick we must scan all and keep the last (the store has no reverse order).
|
||||
@@ -427,12 +434,15 @@ func (fs *FilerServer) applyRecomputeLatest(ctx context.Context, m *filer_pb.Obj
|
||||
return listErr
|
||||
}
|
||||
|
||||
cleared := []string{rc.NameToKey, rc.SizeToKey, rc.MtimeToKey}
|
||||
if chosen == nil {
|
||||
for pointerKey := range rc.CopyExtended {
|
||||
delete(pointer.Extended, pointerKey)
|
||||
}
|
||||
if rc.NameToKey != "" {
|
||||
delete(pointer.Extended, rc.NameToKey)
|
||||
for _, k := range cleared {
|
||||
if k != "" {
|
||||
delete(pointer.Extended, k)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for pointerKey, sourceKey := range rc.CopyExtended {
|
||||
@@ -445,9 +455,39 @@ func (fs *FilerServer) applyRecomputeLatest(ctx context.Context, m *filer_pb.Obj
|
||||
if rc.NameToKey != "" {
|
||||
pointer.Extended[rc.NameToKey] = []byte(chosen.Name())
|
||||
}
|
||||
if rc.SizeToKey != "" {
|
||||
pointer.Extended[rc.SizeToKey] = []byte(strconv.FormatUint(chosen.FileSize, 10))
|
||||
}
|
||||
if rc.MtimeToKey != "" {
|
||||
pointer.Extended[rc.MtimeToKey] = []byte(strconv.FormatInt(chosen.Mtime.Unix(), 10))
|
||||
}
|
||||
}
|
||||
|
||||
return fs.filer.UpdateEntry(ctx, pointer, pointer)
|
||||
if err := fs.filer.UpdateEntry(ctx, pointer, pointer); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Stamp the displaced prior child (e.g. NoncurrentSinceNs for lifecycle).
|
||||
newName := ""
|
||||
if chosen != nil {
|
||||
newName = chosen.Name()
|
||||
}
|
||||
if rc.DemoteKey != "" && priorName != "" && priorName != newName {
|
||||
priorEntry, perr := fs.filer.FindEntry(ctx, util.NewFullPath(rc.ScanDir, priorName))
|
||||
if perr == filer_pb.ErrNotFound {
|
||||
return nil
|
||||
}
|
||||
if perr != nil {
|
||||
return perr
|
||||
}
|
||||
if priorEntry.Extended == nil {
|
||||
priorEntry.Extended = make(map[string][]byte)
|
||||
}
|
||||
priorEntry.Extended[rc.DemoteKey] = rc.DemoteValue
|
||||
return fs.filer.UpdateEntry(ctx, priorEntry, priorEntry)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntryRequest) (*filer_pb.UpdateEntryResponse, error) {
|
||||
|
||||
@@ -2,6 +2,7 @@ package weed_server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -431,3 +432,61 @@ func TestObjectTransactionIdempotentNoops(t *testing.T) {
|
||||
t.Fatalf("no-op mutations should not error: %q", resp.Error)
|
||||
}
|
||||
}
|
||||
|
||||
// RECOMPUTE_LATEST copies the chosen child's size/mtime to the pointer and
|
||||
// stamps the demote key on the prior latest when the pointer moves.
|
||||
func TestObjectTransactionRecomputeDemoteAndAttrs(t *testing.T) {
|
||||
t0 := time.Unix(1700000000, 0)
|
||||
t1 := time.Unix(1700000100, 0)
|
||||
mk := func(inode uint64, mt time.Time, size uint64, id string) *filer.Entry {
|
||||
return &filer.Entry{
|
||||
Attr: filer.Attr{Inode: inode, Mtime: mt, Crtime: mt, Mode: 0644, FileSize: size},
|
||||
Extended: map[string][]byte{"vid": []byte(id)},
|
||||
}
|
||||
}
|
||||
fs, store := newTxnTestServer(map[string]*filer.Entry{
|
||||
"/buckets/b/obj/.versions": {
|
||||
Attr: filer.Attr{Inode: 2, Mtime: t0, Crtime: t0, Mode: 0755 | (1 << 31)},
|
||||
Extended: map[string][]byte{"latestName": []byte("v1.ver"), "latestVid": []byte("v1")},
|
||||
},
|
||||
"/buckets/b/obj/.versions/v1.ver": mk(10, t0, 100, "v1"),
|
||||
"/buckets/b/obj/.versions/v2.ver": mk(11, t1, 250, "v2"),
|
||||
})
|
||||
|
||||
resp, err := fs.ObjectTransaction(context.Background(), &filer_pb.ObjectTransactionRequest{
|
||||
LockKey: "/buckets/b/obj",
|
||||
Mutations: []*filer_pb.ObjectMutation{{
|
||||
Type: filer_pb.ObjectMutation_RECOMPUTE_LATEST, Directory: "/buckets/b/obj", Name: ".versions",
|
||||
Recompute: &filer_pb.Recompute{
|
||||
ScanDir: "/buckets/b/obj/.versions",
|
||||
Descending: true,
|
||||
CopyExtended: map[string]string{"latestVid": "vid"},
|
||||
NameToKey: "latestName",
|
||||
SizeToKey: "latestSize",
|
||||
MtimeToKey: "latestMtime",
|
||||
DemoteKey: "noncurrentSince",
|
||||
DemoteValue: []byte("999"),
|
||||
},
|
||||
}},
|
||||
})
|
||||
if err != nil || resp.Error != "" {
|
||||
t.Fatalf("txn failed: err=%v resp=%q", err, resp.Error)
|
||||
}
|
||||
|
||||
ptr := store.entries["/buckets/b/obj/.versions"].Extended
|
||||
if string(ptr["latestName"]) != "v2.ver" || string(ptr["latestVid"]) != "v2" {
|
||||
t.Fatalf("pointer not moved to v2: name=%s vid=%s", ptr["latestName"], ptr["latestVid"])
|
||||
}
|
||||
if string(ptr["latestSize"]) != "250" {
|
||||
t.Errorf("latestSize = %s, want 250", ptr["latestSize"])
|
||||
}
|
||||
if want := strconv.FormatInt(t1.Unix(), 10); string(ptr["latestMtime"]) != want {
|
||||
t.Errorf("latestMtime = %s, want %s", ptr["latestMtime"], want)
|
||||
}
|
||||
if got := store.entries["/buckets/b/obj/.versions/v1.ver"].Extended["noncurrentSince"]; string(got) != "999" {
|
||||
t.Errorf("prior latest v1.ver noncurrentSince = %q, want 999", got)
|
||||
}
|
||||
if _, ok := store.entries["/buckets/b/obj/.versions/v2.ver"].Extended["noncurrentSince"]; ok {
|
||||
t.Errorf("new latest v2.ver should not be demoted")
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user