mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-30 13:36:23 +00:00
filer: add ObjectTransaction for atomic multi-entry object writes (#9646)
A versioned object write touches several entries that must change together: the main object, a delete marker or version file, and the latest pointer on the .versions directory. Holding a distributed lock across separate RPCs to do this is what the per-path lock was meant to replace, but a single CreateEntry only covers one entry. Add ObjectTransaction: a request carries a lock_key (the object path), an optional WriteCondition, and an ordered list of mutations (PUT / DELETE / PATCH_EXTENDED). The filer holds the per-path lock on lock_key for the whole call, checks the condition against the entry at lock_key, then applies the mutations in order. Callers route the object's writes to its owner filer so the lock is authoritative across all of the object's entries. DELETE and PATCH of an absent entry are no-ops, so a replayed transaction is idempotent. PUT entries are metadata-scoped; data-bearing writes (chunks) are written before the transaction, as today.
This commit is contained in:
@@ -31,6 +31,9 @@ service SeaweedFiler {
|
||||
rpc DeleteEntry (DeleteEntryRequest) returns (DeleteEntryResponse) {
|
||||
}
|
||||
|
||||
rpc ObjectTransaction (ObjectTransactionRequest) returns (ObjectTransactionResponse) {
|
||||
}
|
||||
|
||||
rpc AtomicRenameEntry (AtomicRenameEntryRequest) returns (AtomicRenameEntryResponse) {
|
||||
}
|
||||
rpc StreamRenameEntry (StreamRenameEntryRequest) returns (stream StreamRenameEntryResponse) {
|
||||
@@ -271,6 +274,46 @@ enum FilerError {
|
||||
PRECONDITION_FAILED = 6; // WriteCondition not satisfied
|
||||
}
|
||||
|
||||
// ObjectMutation is one entry-level change applied by ObjectTransaction. All
|
||||
// mutations of a transaction run under a single per-path lock (the request's
|
||||
// lock_key) and in order, so the gateway can describe a multi-entry object
|
||||
// operation as one request instead of holding a distributed lock across
|
||||
// several RPCs. Data-bearing writes (entries with chunks) should be written
|
||||
// before the transaction; mutations here are metadata-scoped.
|
||||
message ObjectMutation {
|
||||
enum Type {
|
||||
PUT = 0; // create or replace the entry (entry field)
|
||||
DELETE = 1; // delete the entry at directory/name (no error if absent)
|
||||
PATCH_EXTENDED = 2; // merge set_extended / remove delete_extended on the entry
|
||||
}
|
||||
Type type = 1;
|
||||
string directory = 2;
|
||||
string name = 3; // entry name for DELETE / PATCH_EXTENDED
|
||||
Entry entry = 4; // full entry for PUT
|
||||
map<string, bytes> set_extended = 5; // PATCH_EXTENDED: keys to set
|
||||
repeated string delete_extended = 6; // PATCH_EXTENDED: keys to remove
|
||||
bool is_delete_data = 7; // DELETE: also delete chunk data
|
||||
bool is_recursive = 8; // DELETE: recurse into a directory
|
||||
}
|
||||
|
||||
// ObjectTransactionRequest applies an ordered list of mutations atomically with
|
||||
// respect to other writers of the same object, by holding the filer's per-path
|
||||
// lock on lock_key for the whole transaction. The optional condition is checked
|
||||
// first, against the entry at lock_key. Callers must route the object's writes
|
||||
// to its owner filer for the lock to be authoritative.
|
||||
message ObjectTransactionRequest {
|
||||
string lock_key = 1; // object path to lock and to evaluate the condition against
|
||||
WriteCondition condition = 2; // optional precondition, checked under the lock
|
||||
repeated ObjectMutation mutations = 3;
|
||||
bool is_from_other_cluster = 4;
|
||||
repeated int32 signatures = 5;
|
||||
}
|
||||
|
||||
message ObjectTransactionResponse {
|
||||
string error = 1;
|
||||
FilerError error_code = 2;
|
||||
}
|
||||
|
||||
message CreateEntryResponse {
|
||||
string error = 1; // kept for human readability + backward compat
|
||||
SubscribeMetadataResponse metadata_event = 2;
|
||||
|
||||
@@ -31,6 +31,9 @@ service SeaweedFiler {
|
||||
rpc DeleteEntry (DeleteEntryRequest) returns (DeleteEntryResponse) {
|
||||
}
|
||||
|
||||
rpc ObjectTransaction (ObjectTransactionRequest) returns (ObjectTransactionResponse) {
|
||||
}
|
||||
|
||||
rpc AtomicRenameEntry (AtomicRenameEntryRequest) returns (AtomicRenameEntryResponse) {
|
||||
}
|
||||
rpc StreamRenameEntry (StreamRenameEntryRequest) returns (stream StreamRenameEntryResponse) {
|
||||
@@ -271,6 +274,46 @@ enum FilerError {
|
||||
PRECONDITION_FAILED = 6; // WriteCondition not satisfied
|
||||
}
|
||||
|
||||
// ObjectMutation is one entry-level change applied by ObjectTransaction. All
|
||||
// mutations of a transaction run under a single per-path lock (the request's
|
||||
// lock_key) and in order, so the gateway can describe a multi-entry object
|
||||
// operation as one request instead of holding a distributed lock across
|
||||
// several RPCs. Data-bearing writes (entries with chunks) should be written
|
||||
// before the transaction; mutations here are metadata-scoped.
|
||||
message ObjectMutation {
|
||||
enum Type {
|
||||
PUT = 0; // create or replace the entry (entry field)
|
||||
DELETE = 1; // delete the entry at directory/name (no error if absent)
|
||||
PATCH_EXTENDED = 2; // merge set_extended / remove delete_extended on the entry
|
||||
}
|
||||
Type type = 1;
|
||||
string directory = 2;
|
||||
string name = 3; // entry name for DELETE / PATCH_EXTENDED
|
||||
Entry entry = 4; // full entry for PUT
|
||||
map<string, bytes> set_extended = 5; // PATCH_EXTENDED: keys to set
|
||||
repeated string delete_extended = 6; // PATCH_EXTENDED: keys to remove
|
||||
bool is_delete_data = 7; // DELETE: also delete chunk data
|
||||
bool is_recursive = 8; // DELETE: recurse into a directory
|
||||
}
|
||||
|
||||
// ObjectTransactionRequest applies an ordered list of mutations atomically with
|
||||
// respect to other writers of the same object, by holding the filer's per-path
|
||||
// lock on lock_key for the whole transaction. The optional condition is checked
|
||||
// first, against the entry at lock_key. Callers must route the object's writes
|
||||
// to its owner filer for the lock to be authoritative.
|
||||
message ObjectTransactionRequest {
|
||||
string lock_key = 1; // object path to lock and to evaluate the condition against
|
||||
WriteCondition condition = 2; // optional precondition, checked under the lock
|
||||
repeated ObjectMutation mutations = 3;
|
||||
bool is_from_other_cluster = 4;
|
||||
repeated int32 signatures = 5;
|
||||
}
|
||||
|
||||
message ObjectTransactionResponse {
|
||||
string error = 1;
|
||||
FilerError error_code = 2;
|
||||
}
|
||||
|
||||
message CreateEntryResponse {
|
||||
string error = 1; // kept for human readability + backward compat
|
||||
SubscribeMetadataResponse metadata_event = 2;
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,7 +1,7 @@
|
||||
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
|
||||
// versions:
|
||||
// - protoc-gen-go-grpc v1.6.2
|
||||
// - protoc v7.34.1
|
||||
// - protoc v6.33.4
|
||||
// source: filer.proto
|
||||
|
||||
package filer_pb
|
||||
@@ -26,6 +26,7 @@ const (
|
||||
SeaweedFiler_TouchAccessTime_FullMethodName = "/filer_pb.SeaweedFiler/TouchAccessTime"
|
||||
SeaweedFiler_AppendToEntry_FullMethodName = "/filer_pb.SeaweedFiler/AppendToEntry"
|
||||
SeaweedFiler_DeleteEntry_FullMethodName = "/filer_pb.SeaweedFiler/DeleteEntry"
|
||||
SeaweedFiler_ObjectTransaction_FullMethodName = "/filer_pb.SeaweedFiler/ObjectTransaction"
|
||||
SeaweedFiler_AtomicRenameEntry_FullMethodName = "/filer_pb.SeaweedFiler/AtomicRenameEntry"
|
||||
SeaweedFiler_StreamRenameEntry_FullMethodName = "/filer_pb.SeaweedFiler/StreamRenameEntry"
|
||||
SeaweedFiler_StreamMutateEntry_FullMethodName = "/filer_pb.SeaweedFiler/StreamMutateEntry"
|
||||
@@ -62,6 +63,7 @@ type SeaweedFilerClient interface {
|
||||
TouchAccessTime(ctx context.Context, in *TouchAccessTimeRequest, opts ...grpc.CallOption) (*TouchAccessTimeResponse, error)
|
||||
AppendToEntry(ctx context.Context, in *AppendToEntryRequest, opts ...grpc.CallOption) (*AppendToEntryResponse, error)
|
||||
DeleteEntry(ctx context.Context, in *DeleteEntryRequest, opts ...grpc.CallOption) (*DeleteEntryResponse, error)
|
||||
ObjectTransaction(ctx context.Context, in *ObjectTransactionRequest, opts ...grpc.CallOption) (*ObjectTransactionResponse, error)
|
||||
AtomicRenameEntry(ctx context.Context, in *AtomicRenameEntryRequest, opts ...grpc.CallOption) (*AtomicRenameEntryResponse, error)
|
||||
StreamRenameEntry(ctx context.Context, in *StreamRenameEntryRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[StreamRenameEntryResponse], error)
|
||||
StreamMutateEntry(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[StreamMutateEntryRequest, StreamMutateEntryResponse], error)
|
||||
@@ -177,6 +179,16 @@ func (c *seaweedFilerClient) DeleteEntry(ctx context.Context, in *DeleteEntryReq
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *seaweedFilerClient) ObjectTransaction(ctx context.Context, in *ObjectTransactionRequest, opts ...grpc.CallOption) (*ObjectTransactionResponse, error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
out := new(ObjectTransactionResponse)
|
||||
err := c.cc.Invoke(ctx, SeaweedFiler_ObjectTransaction_FullMethodName, in, out, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *seaweedFilerClient) AtomicRenameEntry(ctx context.Context, in *AtomicRenameEntryRequest, opts ...grpc.CallOption) (*AtomicRenameEntryResponse, error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
out := new(AtomicRenameEntryResponse)
|
||||
@@ -457,6 +469,7 @@ type SeaweedFilerServer interface {
|
||||
TouchAccessTime(context.Context, *TouchAccessTimeRequest) (*TouchAccessTimeResponse, error)
|
||||
AppendToEntry(context.Context, *AppendToEntryRequest) (*AppendToEntryResponse, error)
|
||||
DeleteEntry(context.Context, *DeleteEntryRequest) (*DeleteEntryResponse, error)
|
||||
ObjectTransaction(context.Context, *ObjectTransactionRequest) (*ObjectTransactionResponse, error)
|
||||
AtomicRenameEntry(context.Context, *AtomicRenameEntryRequest) (*AtomicRenameEntryResponse, error)
|
||||
StreamRenameEntry(*StreamRenameEntryRequest, grpc.ServerStreamingServer[StreamRenameEntryResponse]) error
|
||||
StreamMutateEntry(grpc.BidiStreamingServer[StreamMutateEntryRequest, StreamMutateEntryResponse]) error
|
||||
@@ -514,6 +527,9 @@ func (UnimplementedSeaweedFilerServer) AppendToEntry(context.Context, *AppendToE
|
||||
func (UnimplementedSeaweedFilerServer) DeleteEntry(context.Context, *DeleteEntryRequest) (*DeleteEntryResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method DeleteEntry not implemented")
|
||||
}
|
||||
func (UnimplementedSeaweedFilerServer) ObjectTransaction(context.Context, *ObjectTransactionRequest) (*ObjectTransactionResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method ObjectTransaction not implemented")
|
||||
}
|
||||
func (UnimplementedSeaweedFilerServer) AtomicRenameEntry(context.Context, *AtomicRenameEntryRequest) (*AtomicRenameEntryResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method AtomicRenameEntry not implemented")
|
||||
}
|
||||
@@ -723,6 +739,24 @@ func _SeaweedFiler_DeleteEntry_Handler(srv interface{}, ctx context.Context, dec
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _SeaweedFiler_ObjectTransaction_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(ObjectTransactionRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(SeaweedFilerServer).ObjectTransaction(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: SeaweedFiler_ObjectTransaction_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(SeaweedFilerServer).ObjectTransaction(ctx, req.(*ObjectTransactionRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _SeaweedFiler_AtomicRenameEntry_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(AtomicRenameEntryRequest)
|
||||
if err := dec(in); err != nil {
|
||||
@@ -1129,6 +1163,10 @@ var SeaweedFiler_ServiceDesc = grpc.ServiceDesc{
|
||||
MethodName: "DeleteEntry",
|
||||
Handler: _SeaweedFiler_DeleteEntry_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "ObjectTransaction",
|
||||
Handler: _SeaweedFiler_ObjectTransaction_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "AtomicRenameEntry",
|
||||
Handler: _SeaweedFiler_AtomicRenameEntry_Handler,
|
||||
|
||||
@@ -237,6 +237,107 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr
|
||||
return
|
||||
}
|
||||
|
||||
// ObjectTransaction applies an ordered list of entry mutations atomically with
|
||||
// respect to other writers of the same object, by holding the per-path lock on
|
||||
// lock_key for the whole call. The optional condition is checked first, against
|
||||
// the entry at lock_key. This lets a caller describe a multi-entry object
|
||||
// operation (e.g. delete the null version + write a delete marker + flip the
|
||||
// latest pointer) as one request, replacing a distributed lock held across
|
||||
// several RPCs. Callers must route the object's writes to its owner filer for
|
||||
// the lock to be authoritative.
|
||||
func (fs *FilerServer) ObjectTransaction(ctx context.Context, req *filer_pb.ObjectTransactionRequest) (*filer_pb.ObjectTransactionResponse, error) {
|
||||
if req.LockKey == "" {
|
||||
return &filer_pb.ObjectTransactionResponse{Error: "lock_key is required"}, nil
|
||||
}
|
||||
|
||||
lockPath := util.FullPath(req.LockKey)
|
||||
pathLock := fs.entryLockTable.AcquireLock("ObjectTransaction", lockPath, util.ExclusiveLock)
|
||||
defer fs.entryLockTable.ReleaseLock(lockPath, pathLock)
|
||||
|
||||
if conditionIsSet(req.Condition) {
|
||||
current, findErr := fs.filer.FindEntry(ctx, lockPath)
|
||||
if findErr != nil && findErr != filer_pb.ErrNotFound {
|
||||
return &filer_pb.ObjectTransactionResponse{}, fmt.Errorf("ObjectTransaction condition %s: %w", lockPath, findErr)
|
||||
}
|
||||
if findErr == filer_pb.ErrNotFound {
|
||||
current = nil
|
||||
}
|
||||
if !writeConditionSatisfied(req.Condition, current) {
|
||||
glog.V(3).InfofCtx(ctx, "ObjectTransaction %s: precondition failed", lockPath)
|
||||
return &filer_pb.ObjectTransactionResponse{
|
||||
Error: "precondition failed",
|
||||
ErrorCode: filer_pb.FilerError_PRECONDITION_FAILED,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
for i, m := range req.Mutations {
|
||||
if err := fs.applyObjectMutation(ctx, m, req.IsFromOtherCluster, req.Signatures); err != nil {
|
||||
glog.V(2).InfofCtx(ctx, "ObjectTransaction %s mutation %d (%v): %v", lockPath, i, m.Type, err)
|
||||
return &filer_pb.ObjectTransactionResponse{Error: fmt.Sprintf("mutation %d: %v", i, err)}, nil
|
||||
}
|
||||
}
|
||||
|
||||
return &filer_pb.ObjectTransactionResponse{}, nil
|
||||
}
|
||||
|
||||
// applyObjectMutation applies a single mutation while the transaction's path
|
||||
// lock is held. PUT entries are expected to be fully prepared by the caller
|
||||
// (chunks resolved); mutations here are metadata-scoped. A DELETE of an absent
|
||||
// entry and a PATCH of an absent entry are no-ops, so transactions are
|
||||
// idempotent on replay.
|
||||
func (fs *FilerServer) applyObjectMutation(ctx context.Context, m *filer_pb.ObjectMutation, fromOtherCluster bool, signatures []int32) error {
|
||||
switch m.Type {
|
||||
case filer_pb.ObjectMutation_PUT:
|
||||
if m.Entry == nil {
|
||||
return fmt.Errorf("PUT requires an entry")
|
||||
}
|
||||
newEntry := filer.FromPbEntry(m.Directory, m.Entry)
|
||||
return fs.filer.CreateEntry(ctx, newEntry, false, fromOtherCluster, signatures, false, fs.filer.MaxFilenameLength)
|
||||
|
||||
case filer_pb.ObjectMutation_DELETE:
|
||||
fullpath := util.NewFullPath(m.Directory, m.Name)
|
||||
err := fs.filer.DeleteEntryMetaAndData(ctx, fullpath, m.IsRecursive, false, m.IsDeleteData, fromOtherCluster, signatures, 0)
|
||||
if err == filer_pb.ErrNotFound {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
|
||||
case filer_pb.ObjectMutation_PATCH_EXTENDED:
|
||||
fullpath := util.NewFullPath(m.Directory, m.Name)
|
||||
oldEntry, err := fs.filer.FindEntry(ctx, fullpath)
|
||||
if err == filer_pb.ErrNotFound {
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Patch a copy so oldEntry still reflects the pre-update state for the
|
||||
// metadata notification's diff.
|
||||
newEntry := oldEntry.ShallowClone()
|
||||
newEntry.Extended = make(map[string][]byte, len(oldEntry.Extended))
|
||||
for k, v := range oldEntry.Extended {
|
||||
newEntry.Extended[k] = v
|
||||
}
|
||||
for k, v := range m.SetExtended {
|
||||
newEntry.Extended[k] = v
|
||||
}
|
||||
for _, k := range m.DeleteExtended {
|
||||
delete(newEntry.Extended, k)
|
||||
}
|
||||
if err := fs.filer.UpdateEntry(ctx, oldEntry, newEntry); err != nil {
|
||||
return err
|
||||
}
|
||||
// Emit the metadata event so the update replicates and subscribers see it,
|
||||
// matching the UpdateEntry handler.
|
||||
fs.filer.NotifyUpdateEvent(ctx, oldEntry, newEntry, true, fromOtherCluster, signatures)
|
||||
return nil
|
||||
|
||||
default:
|
||||
return fmt.Errorf("unknown mutation type %v", m.Type)
|
||||
}
|
||||
}
|
||||
|
||||
func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntryRequest) (*filer_pb.UpdateEntryResponse, error) {
|
||||
|
||||
glog.V(4).InfofCtx(ctx, "UpdateEntry %v", req)
|
||||
|
||||
167
weed/server/filer_grpc_server_object_txn_test.go
Normal file
167
weed/server/filer_grpc_server_object_txn_test.go
Normal file
@@ -0,0 +1,167 @@
|
||||
package weed_server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/filer"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||
)
|
||||
|
||||
func newTxnTestServer(seed map[string]*filer.Entry) (*FilerServer, *renameTestStore) {
|
||||
store := newRenameTestStore()
|
||||
for path, entry := range seed {
|
||||
entry.FullPath = util.FullPath(path)
|
||||
store.entries[path] = entry
|
||||
}
|
||||
f := newRenameTestFiler(store)
|
||||
f.DirBucketsPath = "/buckets"
|
||||
fs := &FilerServer{filer: f, option: &FilerOption{}, entryLockTable: util.NewLockTable[util.FullPath]()}
|
||||
return fs, store
|
||||
}
|
||||
|
||||
// A versioned delete is a multi-entry object operation: drop the null version,
|
||||
// write a delete marker, and flip the latest pointer. ObjectTransaction applies
|
||||
// all three atomically under one lock keyed on the object path.
|
||||
func TestObjectTransactionMultiEntry(t *testing.T) {
|
||||
now := time.Unix(1700000000, 0)
|
||||
fs, store := newTxnTestServer(map[string]*filer.Entry{
|
||||
"/buckets/b/obj": {
|
||||
Attr: filer.Attr{Inode: 1, Mtime: now, Crtime: now, Mode: 0644},
|
||||
Extended: map[string][]byte{s3_constants.ExtETagKey: []byte("abc")},
|
||||
},
|
||||
"/buckets/b/obj/.versions": {
|
||||
Attr: filer.Attr{Inode: 2, Mtime: now, Crtime: now, Mode: 0755 | (1 << 31)},
|
||||
Extended: map[string][]byte{"latest": []byte("v1")},
|
||||
},
|
||||
})
|
||||
|
||||
req := &filer_pb.ObjectTransactionRequest{
|
||||
LockKey: "/buckets/b/obj",
|
||||
Mutations: []*filer_pb.ObjectMutation{
|
||||
{Type: filer_pb.ObjectMutation_DELETE, Directory: "/buckets/b", Name: "obj"},
|
||||
{Type: filer_pb.ObjectMutation_PUT, Directory: "/buckets/b/obj/.versions", Entry: &filer_pb.Entry{
|
||||
Name: "marker",
|
||||
Attributes: &filer_pb.FuseAttributes{Mtime: now.Unix(), FileMode: 0644, Inode: 3},
|
||||
Extended: map[string][]byte{"isDeleteMarker": []byte("true")},
|
||||
}},
|
||||
{Type: filer_pb.ObjectMutation_PATCH_EXTENDED, Directory: "/buckets/b/obj", Name: ".versions",
|
||||
SetExtended: map[string][]byte{"latest": []byte("marker")},
|
||||
DeleteExtended: nil},
|
||||
},
|
||||
}
|
||||
|
||||
resp, err := fs.ObjectTransaction(context.Background(), req)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
if resp.Error != "" {
|
||||
t.Fatalf("unexpected response error: %q", resp.Error)
|
||||
}
|
||||
|
||||
if _, ok := store.entries["/buckets/b/obj"]; ok {
|
||||
t.Errorf("null version should be deleted")
|
||||
}
|
||||
if _, ok := store.entries["/buckets/b/obj/.versions/marker"]; !ok {
|
||||
t.Errorf("delete marker should be created")
|
||||
}
|
||||
if got := string(store.entries["/buckets/b/obj/.versions"].Extended["latest"]); got != "marker" {
|
||||
t.Errorf("latest pointer = %q, want marker", got)
|
||||
}
|
||||
}
|
||||
|
||||
// A PATCH_EXTENDED mutation emits a metadata event (so the change replicates and
|
||||
// subscribers see it), carrying both the prior and updated state in the diff.
|
||||
func TestObjectTransactionPatchNotifies(t *testing.T) {
|
||||
queue := &captureQueue{}
|
||||
swapNotificationQueue(t, queue)
|
||||
|
||||
now := time.Unix(1700000000, 0)
|
||||
fs, _ := newTxnTestServer(map[string]*filer.Entry{
|
||||
"/buckets/b/obj/.versions": {
|
||||
Attr: filer.Attr{Inode: 2, Mtime: now, Crtime: now, Mode: 0755 | (1 << 31)},
|
||||
Extended: map[string][]byte{"latest": []byte("v1")},
|
||||
},
|
||||
})
|
||||
|
||||
resp, err := fs.ObjectTransaction(context.Background(), &filer_pb.ObjectTransactionRequest{
|
||||
LockKey: "/buckets/b/obj",
|
||||
Mutations: []*filer_pb.ObjectMutation{
|
||||
{Type: filer_pb.ObjectMutation_PATCH_EXTENDED, Directory: "/buckets/b/obj", Name: ".versions",
|
||||
SetExtended: map[string][]byte{"latest": []byte("v2")}},
|
||||
},
|
||||
})
|
||||
if err != nil || resp.Error != "" {
|
||||
t.Fatalf("txn failed: err=%v resp=%q", err, resp.Error)
|
||||
}
|
||||
|
||||
events := queue.snapshot()
|
||||
if len(events) != 1 {
|
||||
t.Fatalf("expected 1 metadata event from PATCH_EXTENDED, got %d", len(events))
|
||||
}
|
||||
ev := events[0].notification
|
||||
if ev.NewEntry == nil || string(ev.NewEntry.Extended["latest"]) != "v2" {
|
||||
t.Fatalf("event new entry latest = %q, want v2", ev.GetNewEntry().GetExtended()["latest"])
|
||||
}
|
||||
if ev.OldEntry == nil || string(ev.OldEntry.Extended["latest"]) != "v1" {
|
||||
t.Fatalf("event old entry latest = %q, want v1 (clone must preserve prior state)", ev.GetOldEntry().GetExtended()["latest"])
|
||||
}
|
||||
}
|
||||
|
||||
// A failing precondition aborts before any mutation is applied.
|
||||
func TestObjectTransactionPreconditionAborts(t *testing.T) {
|
||||
now := time.Unix(1700000000, 0)
|
||||
fs, store := newTxnTestServer(map[string]*filer.Entry{
|
||||
"/buckets/b/obj": {
|
||||
Attr: filer.Attr{Inode: 1, Mtime: now, Crtime: now, Mode: 0644},
|
||||
Extended: map[string][]byte{s3_constants.ExtETagKey: []byte("abc")},
|
||||
},
|
||||
})
|
||||
|
||||
req := &filer_pb.ObjectTransactionRequest{
|
||||
LockKey: "/buckets/b/obj",
|
||||
Condition: &filer_pb.WriteCondition{Clauses: []*filer_pb.WriteCondition_Clause{
|
||||
{Kind: filer_pb.WriteCondition_IF_ETAG_MATCH, Etags: []string{`"zzz"`}},
|
||||
}},
|
||||
Mutations: []*filer_pb.ObjectMutation{
|
||||
{Type: filer_pb.ObjectMutation_DELETE, Directory: "/buckets/b", Name: "obj"},
|
||||
},
|
||||
}
|
||||
|
||||
resp, err := fs.ObjectTransaction(context.Background(), req)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
if resp.ErrorCode != filer_pb.FilerError_PRECONDITION_FAILED {
|
||||
t.Fatalf("want PRECONDITION_FAILED, got %v (%q)", resp.ErrorCode, resp.Error)
|
||||
}
|
||||
if _, ok := store.entries["/buckets/b/obj"]; !ok {
|
||||
t.Errorf("object must survive a failed precondition")
|
||||
}
|
||||
}
|
||||
|
||||
// DELETE and PATCH of an absent entry are no-ops, so a replayed transaction
|
||||
// does not error.
|
||||
func TestObjectTransactionIdempotentNoops(t *testing.T) {
|
||||
fs, _ := newTxnTestServer(nil)
|
||||
|
||||
req := &filer_pb.ObjectTransactionRequest{
|
||||
LockKey: "/buckets/b/obj",
|
||||
Mutations: []*filer_pb.ObjectMutation{
|
||||
{Type: filer_pb.ObjectMutation_DELETE, Directory: "/buckets/b", Name: "obj"},
|
||||
{Type: filer_pb.ObjectMutation_PATCH_EXTENDED, Directory: "/buckets/b/obj", Name: ".versions",
|
||||
SetExtended: map[string][]byte{"latest": []byte("x")}},
|
||||
},
|
||||
}
|
||||
|
||||
resp, err := fs.ObjectTransaction(context.Background(), req)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
if resp.Error != "" {
|
||||
t.Fatalf("no-op mutations should not error: %q", resp.Error)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user