diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 988e5f06b..983f20756 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -329,6 +329,10 @@ message Recompute { bool descending = 2; // pick the child that sorts last by name (else first) map copy_extended = 3; // pointer extended key -> source extended key on the chosen child string name_to_key = 4; // if set, store the chosen child's name under this pointer key + string size_to_key = 5; // if set, store the chosen child's FileSize (decimal) under this pointer key + string mtime_to_key = 6; // if set, store the chosen child's Mtime (decimal) under this pointer key + string demote_key = 7; // if set, stamp demote_value on the prior name_to_key target when it changes + bytes demote_value = 8; // value for demote_key } // ObjectTransactionRequest applies an ordered list of mutations atomically with diff --git a/weed/pb/filer.proto b/weed/pb/filer.proto index 988e5f06b..983f20756 100644 --- a/weed/pb/filer.proto +++ b/weed/pb/filer.proto @@ -329,6 +329,10 @@ message Recompute { bool descending = 2; // pick the child that sorts last by name (else first) map copy_extended = 3; // pointer extended key -> source extended key on the chosen child string name_to_key = 4; // if set, store the chosen child's name under this pointer key + string size_to_key = 5; // if set, store the chosen child's FileSize (decimal) under this pointer key + string mtime_to_key = 6; // if set, store the chosen child's Mtime (decimal) under this pointer key + string demote_key = 7; // if set, stamp demote_value on the prior name_to_key target when it changes + bytes demote_value = 8; // value for demote_key } // ObjectTransactionRequest applies an ordered list of mutations atomically with diff --git a/weed/pb/filer_pb/filer.pb.go b/weed/pb/filer_pb/filer.pb.go index f0b2bbfd8..bcec04359 100644 --- a/weed/pb/filer_pb/filer.pb.go +++ b/weed/pb/filer_pb/filer.pb.go @@ -1555,6 +1555,10 @@ type Recompute struct { Descending bool `protobuf:"varint,2,opt,name=descending,proto3" json:"descending,omitempty"` // pick the child that sorts last by name (else first) CopyExtended map[string]string `protobuf:"bytes,3,rep,name=copy_extended,json=copyExtended,proto3" json:"copy_extended,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` // pointer extended key -> source extended key on the chosen child NameToKey string `protobuf:"bytes,4,opt,name=name_to_key,json=nameToKey,proto3" json:"name_to_key,omitempty"` // if set, store the chosen child's name under this pointer key + SizeToKey string `protobuf:"bytes,5,opt,name=size_to_key,json=sizeToKey,proto3" json:"size_to_key,omitempty"` // if set, store the chosen child's FileSize (decimal) under this pointer key + MtimeToKey string `protobuf:"bytes,6,opt,name=mtime_to_key,json=mtimeToKey,proto3" json:"mtime_to_key,omitempty"` // if set, store the chosen child's Mtime (decimal) under this pointer key + DemoteKey string `protobuf:"bytes,7,opt,name=demote_key,json=demoteKey,proto3" json:"demote_key,omitempty"` // if set, stamp demote_value on the prior name_to_key target when it changes + DemoteValue []byte `protobuf:"bytes,8,opt,name=demote_value,json=demoteValue,proto3" json:"demote_value,omitempty"` // value for demote_key unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -1617,6 +1621,34 @@ func (x *Recompute) GetNameToKey() string { return "" } +func (x *Recompute) GetSizeToKey() string { + if x != nil { + return x.SizeToKey + } + return "" +} + +func (x *Recompute) GetMtimeToKey() string { + if x != nil { + return x.MtimeToKey + } + return "" +} + +func (x *Recompute) GetDemoteKey() string { + if x != nil { + return x.DemoteKey + } + return "" +} + +func (x *Recompute) GetDemoteValue() []byte { + if x != nil { + return x.DemoteValue + } + return nil +} + // ObjectTransactionRequest applies an ordered list of mutations atomically with // respect to other writers of the same object, by holding the filer's per-path // lock on lock_key for the whole transaction. The optional condition is checked @@ -6354,14 +6386,20 @@ const file_filer_proto_rawDesc = "" + "\n" + "\x06DELETE\x10\x01\x12\x12\n" + "\x0ePATCH_EXTENDED\x10\x02\x12\x14\n" + - "\x10RECOMPUTE_LATEST\x10\x03\"\xf3\x01\n" + + "\x10RECOMPUTE_LATEST\x10\x03\"\xf7\x02\n" + "\tRecompute\x12\x19\n" + "\bscan_dir\x18\x01 \x01(\tR\ascanDir\x12\x1e\n" + "\n" + "descending\x18\x02 \x01(\bR\n" + "descending\x12J\n" + "\rcopy_extended\x18\x03 \x03(\v2%.filer_pb.Recompute.CopyExtendedEntryR\fcopyExtended\x12\x1e\n" + - "\vname_to_key\x18\x04 \x01(\tR\tnameToKey\x1a?\n" + + "\vname_to_key\x18\x04 \x01(\tR\tnameToKey\x12\x1e\n" + + "\vsize_to_key\x18\x05 \x01(\tR\tsizeToKey\x12 \n" + + "\fmtime_to_key\x18\x06 \x01(\tR\n" + + "mtimeToKey\x12\x1d\n" + + "\n" + + "demote_key\x18\a \x01(\tR\tdemoteKey\x12!\n" + + "\fdemote_value\x18\b \x01(\fR\vdemoteValue\x1a?\n" + "\x11CopyExtendedEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\xf8\x01\n" + diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go index 7ad2b0f11..8605450d1 100644 --- a/weed/s3api/s3api_object_handlers_put.go +++ b/weed/s3api/s3api_object_handlers_put.go @@ -1482,13 +1482,8 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin // Versioned bucket: resolver returns 0 by construction. Pass 0 // directly — versioned objects sit on regular volumes and the // lifecycle worker handles their expiration. - etag, errCode, sseMetadata = s3a.putToFiler(r, versionFilePath, body, bucket, normalizedObject, 1, 0, func(versionEntry *filer_pb.Entry) s3err.ErrorCode { - if err := s3a.updateLatestVersionInDirectory(bucket, normalizedObject, versionId, versionFileName, versionEntry); err != nil { - glog.Errorf("putVersionedObject: failed to update latest version in directory: %v", err) - return s3err.ErrInternalError - } - return s3err.ErrNone - }, false) + etag, errCode, sseMetadata = s3a.putToFiler(r, versionFilePath, body, bucket, normalizedObject, 1, 0, + s3a.versionedAfterCreate(bucket, normalizedObject, versionId, versionFileName, useInvertedFormat), true) if errCode != s3err.ErrNone { glog.Errorf("putVersionedObject: failed to upload version: %v", errCode) return "", "", errCode, SSEResponseMetadata{} diff --git a/weed/s3api/s3api_object_routed_write.go b/weed/s3api/s3api_object_routed_write.go index 947e4f1b9..80b1162ca 100644 --- a/weed/s3api/s3api_object_routed_write.go +++ b/weed/s3api/s3api_object_routed_write.go @@ -15,23 +15,25 @@ import ( ) // routableWriteOwner returns the owner filer for an object's writes, or "" to -// keep them on the distributed lock. Versioned and object-lock buckets stay on -// the lock (handled by later routing PRs); any lookup error also falls back. +// keep them on the distributed lock. All writes to one object (versioned, +// suspended, non-versioned) share the owner; object-lock buckets stay on the +// lock until WORM-guard routing. Any lookup error also falls back. func (s3a *S3ApiServer) routableWriteOwner(bucket, object string) pb.ServerAddress { if object == "" || s3a.objectWriteLockClient == nil { return "" } - if configured, err := s3a.isVersioningConfigured(bucket); err != nil || configured { - return "" - } if locked, err := s3a.isObjectLockEnabled(bucket); err != nil || locked { return "" } return s3a.objectWriteLockClient.PrimaryForKey(fmt.Sprintf("s3.object.write:%s", s3a.toFilerPath(bucket, object))) } -// routedObjectOwner resolves the owner for the unversioned DELETE fast path. +// routedObjectOwner is routableWriteOwner restricted to non-versioned buckets, +// for the unversioned DELETE fast path. func (s3a *S3ApiServer) routedObjectOwner(bucket, object string) (pb.ServerAddress, bool) { + if configured, err := s3a.isVersioningConfigured(bucket); err != nil || configured { + return "", false + } owner := s3a.routableWriteOwner(bucket, object) return owner, owner != "" } diff --git a/weed/s3api/s3api_object_versioned_finalize.go b/weed/s3api/s3api_object_versioned_finalize.go new file mode 100644 index 000000000..874817aae --- /dev/null +++ b/weed/s3api/s3api_object_versioned_finalize.go @@ -0,0 +1,86 @@ +package s3api + +import ( + "strconv" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +// objectWriteOwner resolves the filer that owns all of an object's writes, +// regardless of versioning state, or "" when no ring view is available. Normal, +// suspended, and versioned writes to the same object hash to one owner and +// serialize on its per-path lock. +func (s3a *S3ApiServer) objectWriteOwner(bucket, object string) pb.ServerAddress { + if s3a.objectWriteLockClient == nil { + return "" + } + return s3a.objectWriteLockClient.PrimaryForKey("s3.object.write:" + s3a.toFilerPath(bucket, object)) +} + +// routedVersionedFinalize flips the .versions pointer to the newest version and +// demotes the prior latest, atomically under the object's per-path lock on the +// owner filer, via a single RECOMPUTE_LATEST. The version file is already +// written; the owner re-derives the pointer by scanning the directory. +func (s3a *S3ApiServer) routedVersionedFinalize(owner pb.ServerAddress, bucket, object string, useInvertedFormat bool) s3err.ErrorCode { + versionsPath := s3a.toFilerPath(bucket, object+s3_constants.VersionsFolder) + vdir, vname := util.FullPath(versionsPath).DirAndName() + req := &filer_pb.ObjectTransactionRequest{ + LockKey: s3a.toFilerPath(bucket, object), + Mutations: []*filer_pb.ObjectMutation{{ + Type: filer_pb.ObjectMutation_RECOMPUTE_LATEST, + Directory: vdir, + Name: vname, + Recompute: &filer_pb.Recompute{ + ScanDir: versionsPath, + // Inverted ids sort newest-first, so the newest is the first + // ascending entry; legacy ids sort oldest-first (scan to the last). + Descending: !useInvertedFormat, + NameToKey: s3_constants.ExtLatestVersionFileNameKey, + SizeToKey: s3_constants.ExtLatestVersionSizeKey, + MtimeToKey: s3_constants.ExtLatestVersionMtimeKey, + CopyExtended: map[string]string{ + s3_constants.ExtLatestVersionIdKey: s3_constants.ExtVersionIdKey, + s3_constants.ExtLatestVersionETagKey: s3_constants.ExtETagKey, + s3_constants.ExtLatestVersionOwnerKey: s3_constants.ExtAmzOwnerKey, + s3_constants.ExtLatestVersionIsDeleteMarker: s3_constants.ExtDeleteMarkerKey, + }, + DemoteKey: s3_constants.ExtNoncurrentSinceNsKey, + DemoteValue: []byte(strconv.FormatInt(time.Now().UnixNano(), 10)), + }, + }}, + } + resp, err := s3a.objectTxnOnFiler(owner, req) + switch { + case err != nil: + glog.Errorf("routedVersionedFinalize: %s/%s on %s: %v", bucket, object, owner, err) + return s3err.ErrInternalError + case resp.Error != "": + glog.Errorf("routedVersionedFinalize: %s/%s: %s", bucket, object, resp.Error) + return s3err.ErrInternalError + default: + return s3err.ErrNone + } +} + +// versionedAfterCreate returns the putToFiler hook that finalizes a versioned +// write: the routed RECOMPUTE_LATEST when the owner is known, else the existing +// lock-free updateLatestVersionInDirectory. +func (s3a *S3ApiServer) versionedAfterCreate(bucket, object, versionId, versionFileName string, useInvertedFormat bool) func(*filer_pb.Entry) s3err.ErrorCode { + owner := s3a.objectWriteOwner(bucket, object) + return func(versionEntry *filer_pb.Entry) s3err.ErrorCode { + if owner != "" { + return s3a.routedVersionedFinalize(owner, bucket, object, useInvertedFormat) + } + if err := s3a.updateLatestVersionInDirectory(bucket, object, versionId, versionFileName, versionEntry); err != nil { + glog.Errorf("putVersionedObject: failed to update latest version in directory: %v", err) + return s3err.ErrInternalError + } + return s3err.ErrNone + } +} diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 851b80437..8a02c7d85 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -8,6 +8,7 @@ import ( "math" "os" "path/filepath" + "strconv" "time" "github.com/seaweedfs/seaweedfs/weed/cluster" @@ -411,6 +412,12 @@ func (fs *FilerServer) applyRecomputeLatest(ctx context.Context, m *filer_pb.Obj pointer.Extended = make(map[string][]byte) } + // Remember the prior chosen child so it can be demoted once the pointer moves. + var priorName string + if rc.NameToKey != "" { + priorName = string(pointer.Extended[rc.NameToKey]) + } + // The store streams entries ascending by name. For the lowest-name pick we // only need the first entry, so cap the listing at one; for the highest-name // pick we must scan all and keep the last (the store has no reverse order). @@ -427,12 +434,15 @@ func (fs *FilerServer) applyRecomputeLatest(ctx context.Context, m *filer_pb.Obj return listErr } + cleared := []string{rc.NameToKey, rc.SizeToKey, rc.MtimeToKey} if chosen == nil { for pointerKey := range rc.CopyExtended { delete(pointer.Extended, pointerKey) } - if rc.NameToKey != "" { - delete(pointer.Extended, rc.NameToKey) + for _, k := range cleared { + if k != "" { + delete(pointer.Extended, k) + } } } else { for pointerKey, sourceKey := range rc.CopyExtended { @@ -445,9 +455,39 @@ func (fs *FilerServer) applyRecomputeLatest(ctx context.Context, m *filer_pb.Obj if rc.NameToKey != "" { pointer.Extended[rc.NameToKey] = []byte(chosen.Name()) } + if rc.SizeToKey != "" { + pointer.Extended[rc.SizeToKey] = []byte(strconv.FormatUint(chosen.FileSize, 10)) + } + if rc.MtimeToKey != "" { + pointer.Extended[rc.MtimeToKey] = []byte(strconv.FormatInt(chosen.Mtime.Unix(), 10)) + } } - return fs.filer.UpdateEntry(ctx, pointer, pointer) + if err := fs.filer.UpdateEntry(ctx, pointer, pointer); err != nil { + return err + } + + // Stamp the displaced prior child (e.g. NoncurrentSinceNs for lifecycle). + newName := "" + if chosen != nil { + newName = chosen.Name() + } + if rc.DemoteKey != "" && priorName != "" && priorName != newName { + priorEntry, perr := fs.filer.FindEntry(ctx, util.NewFullPath(rc.ScanDir, priorName)) + if perr == filer_pb.ErrNotFound { + return nil + } + if perr != nil { + return perr + } + if priorEntry.Extended == nil { + priorEntry.Extended = make(map[string][]byte) + } + priorEntry.Extended[rc.DemoteKey] = rc.DemoteValue + return fs.filer.UpdateEntry(ctx, priorEntry, priorEntry) + } + + return nil } func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntryRequest) (*filer_pb.UpdateEntryResponse, error) { diff --git a/weed/server/filer_grpc_server_object_txn_test.go b/weed/server/filer_grpc_server_object_txn_test.go index d47eebf48..496d0cb60 100644 --- a/weed/server/filer_grpc_server_object_txn_test.go +++ b/weed/server/filer_grpc_server_object_txn_test.go @@ -2,6 +2,7 @@ package weed_server import ( "context" + "strconv" "testing" "time" @@ -431,3 +432,61 @@ func TestObjectTransactionIdempotentNoops(t *testing.T) { t.Fatalf("no-op mutations should not error: %q", resp.Error) } } + +// RECOMPUTE_LATEST copies the chosen child's size/mtime to the pointer and +// stamps the demote key on the prior latest when the pointer moves. +func TestObjectTransactionRecomputeDemoteAndAttrs(t *testing.T) { + t0 := time.Unix(1700000000, 0) + t1 := time.Unix(1700000100, 0) + mk := func(inode uint64, mt time.Time, size uint64, id string) *filer.Entry { + return &filer.Entry{ + Attr: filer.Attr{Inode: inode, Mtime: mt, Crtime: mt, Mode: 0644, FileSize: size}, + Extended: map[string][]byte{"vid": []byte(id)}, + } + } + fs, store := newTxnTestServer(map[string]*filer.Entry{ + "/buckets/b/obj/.versions": { + Attr: filer.Attr{Inode: 2, Mtime: t0, Crtime: t0, Mode: 0755 | (1 << 31)}, + Extended: map[string][]byte{"latestName": []byte("v1.ver"), "latestVid": []byte("v1")}, + }, + "/buckets/b/obj/.versions/v1.ver": mk(10, t0, 100, "v1"), + "/buckets/b/obj/.versions/v2.ver": mk(11, t1, 250, "v2"), + }) + + resp, err := fs.ObjectTransaction(context.Background(), &filer_pb.ObjectTransactionRequest{ + LockKey: "/buckets/b/obj", + Mutations: []*filer_pb.ObjectMutation{{ + Type: filer_pb.ObjectMutation_RECOMPUTE_LATEST, Directory: "/buckets/b/obj", Name: ".versions", + Recompute: &filer_pb.Recompute{ + ScanDir: "/buckets/b/obj/.versions", + Descending: true, + CopyExtended: map[string]string{"latestVid": "vid"}, + NameToKey: "latestName", + SizeToKey: "latestSize", + MtimeToKey: "latestMtime", + DemoteKey: "noncurrentSince", + DemoteValue: []byte("999"), + }, + }}, + }) + if err != nil || resp.Error != "" { + t.Fatalf("txn failed: err=%v resp=%q", err, resp.Error) + } + + ptr := store.entries["/buckets/b/obj/.versions"].Extended + if string(ptr["latestName"]) != "v2.ver" || string(ptr["latestVid"]) != "v2" { + t.Fatalf("pointer not moved to v2: name=%s vid=%s", ptr["latestName"], ptr["latestVid"]) + } + if string(ptr["latestSize"]) != "250" { + t.Errorf("latestSize = %s, want 250", ptr["latestSize"]) + } + if want := strconv.FormatInt(t1.Unix(), 10); string(ptr["latestMtime"]) != want { + t.Errorf("latestMtime = %s, want %s", ptr["latestMtime"], want) + } + if got := store.entries["/buckets/b/obj/.versions/v1.ver"].Extended["noncurrentSince"]; string(got) != "999" { + t.Errorf("prior latest v1.ver noncurrentSince = %q, want 999", got) + } + if _, ok := store.entries["/buckets/b/obj/.versions/v2.ver"].Extended["noncurrentSince"]; ok { + t.Errorf("new latest v2.ver should not be demoted") + } +}