filer: add ObjectTransactionBatch for multi-key object writes (#9649)

A multi-object delete spans many keys that route to different owner filers. The
gateway groups keys by owner and sends one batch per owner; the filer applies
each transaction under its own per-path lock, independent of the others.

A failed transaction (precondition or mutation error) is reported in its own
response without aborting the rest, matching S3 multi-object semantics where
each key succeeds or fails on its own. There is no cross-key atomicity, which S3
batch delete does not require.
This commit is contained in:
Chris Lu
2026-05-23 21:09:02 -07:00
committed by GitHub
parent dc5621d2ae
commit 091aad59dc
6 changed files with 680 additions and 387 deletions

View File

@@ -34,6 +34,9 @@ service SeaweedFiler {
rpc ObjectTransaction (ObjectTransactionRequest) returns (ObjectTransactionResponse) {
}
rpc ObjectTransactionBatch (ObjectTransactionBatchRequest) returns (ObjectTransactionBatchResponse) {
}
rpc AtomicRenameEntry (AtomicRenameEntryRequest) returns (AtomicRenameEntryResponse) {
}
rpc StreamRenameEntry (StreamRenameEntryRequest) returns (stream StreamRenameEntryResponse) {
@@ -344,6 +347,19 @@ message ObjectTransactionResponse {
FilerError error_code = 2;
}
// ObjectTransactionBatch applies several object transactions in one round trip,
// each under its own per-path lock and independent of the others (no cross-key
// atomicity). A caller groups keys that route to the same owner filer and sends
// one batch per owner, e.g. for a multi-object delete. Each response is parallel
// to its request.
message ObjectTransactionBatchRequest {
repeated ObjectTransactionRequest transactions = 1;
}
message ObjectTransactionBatchResponse {
repeated ObjectTransactionResponse responses = 1;
}
message CreateEntryResponse {
string error = 1; // kept for human readability + backward compat
SubscribeMetadataResponse metadata_event = 2;

View File

@@ -34,6 +34,9 @@ service SeaweedFiler {
rpc ObjectTransaction (ObjectTransactionRequest) returns (ObjectTransactionResponse) {
}
rpc ObjectTransactionBatch (ObjectTransactionBatchRequest) returns (ObjectTransactionBatchResponse) {
}
rpc AtomicRenameEntry (AtomicRenameEntryRequest) returns (AtomicRenameEntryResponse) {
}
rpc StreamRenameEntry (StreamRenameEntryRequest) returns (stream StreamRenameEntryResponse) {
@@ -344,6 +347,19 @@ message ObjectTransactionResponse {
FilerError error_code = 2;
}
// ObjectTransactionBatch applies several object transactions in one round trip,
// each under its own per-path lock and independent of the others (no cross-key
// atomicity). A caller groups keys that route to the same owner filer and sends
// one batch per owner, e.g. for a multi-object delete. Each response is parallel
// to its request.
message ObjectTransactionBatchRequest {
repeated ObjectTransactionRequest transactions = 1;
}
message ObjectTransactionBatchResponse {
repeated ObjectTransactionResponse responses = 1;
}
message CreateEntryResponse {
string error = 1; // kept for human readability + backward compat
SubscribeMetadataResponse metadata_event = 2;

File diff suppressed because it is too large Load Diff

View File

@@ -27,6 +27,7 @@ const (
SeaweedFiler_AppendToEntry_FullMethodName = "/filer_pb.SeaweedFiler/AppendToEntry"
SeaweedFiler_DeleteEntry_FullMethodName = "/filer_pb.SeaweedFiler/DeleteEntry"
SeaweedFiler_ObjectTransaction_FullMethodName = "/filer_pb.SeaweedFiler/ObjectTransaction"
SeaweedFiler_ObjectTransactionBatch_FullMethodName = "/filer_pb.SeaweedFiler/ObjectTransactionBatch"
SeaweedFiler_AtomicRenameEntry_FullMethodName = "/filer_pb.SeaweedFiler/AtomicRenameEntry"
SeaweedFiler_StreamRenameEntry_FullMethodName = "/filer_pb.SeaweedFiler/StreamRenameEntry"
SeaweedFiler_StreamMutateEntry_FullMethodName = "/filer_pb.SeaweedFiler/StreamMutateEntry"
@@ -64,6 +65,7 @@ type SeaweedFilerClient interface {
AppendToEntry(ctx context.Context, in *AppendToEntryRequest, opts ...grpc.CallOption) (*AppendToEntryResponse, error)
DeleteEntry(ctx context.Context, in *DeleteEntryRequest, opts ...grpc.CallOption) (*DeleteEntryResponse, error)
ObjectTransaction(ctx context.Context, in *ObjectTransactionRequest, opts ...grpc.CallOption) (*ObjectTransactionResponse, error)
ObjectTransactionBatch(ctx context.Context, in *ObjectTransactionBatchRequest, opts ...grpc.CallOption) (*ObjectTransactionBatchResponse, error)
AtomicRenameEntry(ctx context.Context, in *AtomicRenameEntryRequest, opts ...grpc.CallOption) (*AtomicRenameEntryResponse, error)
StreamRenameEntry(ctx context.Context, in *StreamRenameEntryRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[StreamRenameEntryResponse], error)
StreamMutateEntry(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[StreamMutateEntryRequest, StreamMutateEntryResponse], error)
@@ -189,6 +191,16 @@ func (c *seaweedFilerClient) ObjectTransaction(ctx context.Context, in *ObjectTr
return out, nil
}
func (c *seaweedFilerClient) ObjectTransactionBatch(ctx context.Context, in *ObjectTransactionBatchRequest, opts ...grpc.CallOption) (*ObjectTransactionBatchResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(ObjectTransactionBatchResponse)
err := c.cc.Invoke(ctx, SeaweedFiler_ObjectTransactionBatch_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *seaweedFilerClient) AtomicRenameEntry(ctx context.Context, in *AtomicRenameEntryRequest, opts ...grpc.CallOption) (*AtomicRenameEntryResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(AtomicRenameEntryResponse)
@@ -470,6 +482,7 @@ type SeaweedFilerServer interface {
AppendToEntry(context.Context, *AppendToEntryRequest) (*AppendToEntryResponse, error)
DeleteEntry(context.Context, *DeleteEntryRequest) (*DeleteEntryResponse, error)
ObjectTransaction(context.Context, *ObjectTransactionRequest) (*ObjectTransactionResponse, error)
ObjectTransactionBatch(context.Context, *ObjectTransactionBatchRequest) (*ObjectTransactionBatchResponse, error)
AtomicRenameEntry(context.Context, *AtomicRenameEntryRequest) (*AtomicRenameEntryResponse, error)
StreamRenameEntry(*StreamRenameEntryRequest, grpc.ServerStreamingServer[StreamRenameEntryResponse]) error
StreamMutateEntry(grpc.BidiStreamingServer[StreamMutateEntryRequest, StreamMutateEntryResponse]) error
@@ -530,6 +543,9 @@ func (UnimplementedSeaweedFilerServer) DeleteEntry(context.Context, *DeleteEntry
func (UnimplementedSeaweedFilerServer) ObjectTransaction(context.Context, *ObjectTransactionRequest) (*ObjectTransactionResponse, error) {
return nil, status.Error(codes.Unimplemented, "method ObjectTransaction not implemented")
}
func (UnimplementedSeaweedFilerServer) ObjectTransactionBatch(context.Context, *ObjectTransactionBatchRequest) (*ObjectTransactionBatchResponse, error) {
return nil, status.Error(codes.Unimplemented, "method ObjectTransactionBatch not implemented")
}
func (UnimplementedSeaweedFilerServer) AtomicRenameEntry(context.Context, *AtomicRenameEntryRequest) (*AtomicRenameEntryResponse, error) {
return nil, status.Error(codes.Unimplemented, "method AtomicRenameEntry not implemented")
}
@@ -757,6 +773,24 @@ func _SeaweedFiler_ObjectTransaction_Handler(srv interface{}, ctx context.Contex
return interceptor(ctx, in, info, handler)
}
func _SeaweedFiler_ObjectTransactionBatch_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ObjectTransactionBatchRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(SeaweedFilerServer).ObjectTransactionBatch(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: SeaweedFiler_ObjectTransactionBatch_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SeaweedFilerServer).ObjectTransactionBatch(ctx, req.(*ObjectTransactionBatchRequest))
}
return interceptor(ctx, in, info, handler)
}
func _SeaweedFiler_AtomicRenameEntry_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(AtomicRenameEntryRequest)
if err := dec(in); err != nil {
@@ -1167,6 +1201,10 @@ var SeaweedFiler_ServiceDesc = grpc.ServiceDesc{
MethodName: "ObjectTransaction",
Handler: _SeaweedFiler_ObjectTransaction_Handler,
},
{
MethodName: "ObjectTransactionBatch",
Handler: _SeaweedFiler_ObjectTransactionBatch_Handler,
},
{
MethodName: "AtomicRenameEntry",
Handler: _SeaweedFiler_AtomicRenameEntry_Handler,

View File

@@ -282,6 +282,39 @@ func (fs *FilerServer) ObjectTransaction(ctx context.Context, req *filer_pb.Obje
return &filer_pb.ObjectTransactionResponse{}, nil
}
// ObjectTransactionBatch applies several object transactions in one round trip,
// each under its own per-path lock and independent of the others. A failed
// transaction (precondition or mutation error) is reported in its own response
// without aborting the rest, matching S3 multi-object semantics where each key
// succeeds or fails on its own.
func (fs *FilerServer) ObjectTransactionBatch(ctx context.Context, req *filer_pb.ObjectTransactionBatchRequest) (*filer_pb.ObjectTransactionBatchResponse, error) {
if req == nil {
return nil, status.Error(codes.InvalidArgument, "request is required")
}
resp := &filer_pb.ObjectTransactionBatchResponse{
Responses: make([]*filer_pb.ObjectTransactionResponse, 0, len(req.Transactions)),
}
for _, txn := range req.Transactions {
// Stop early if the caller went away; the request still holds the
// unprocessed transactions, so it is retried rather than lost.
if err := ctx.Err(); err != nil {
return nil, err
}
if txn == nil {
resp.Responses = append(resp.Responses, &filer_pb.ObjectTransactionResponse{Error: "nil transaction"})
continue
}
one, err := fs.ObjectTransaction(ctx, txn)
if err != nil {
// A transport-level error on one transaction is surfaced as that
// transaction's error; the batch RPC itself still succeeds.
one = &filer_pb.ObjectTransactionResponse{Error: err.Error()}
}
resp.Responses = append(resp.Responses, one)
}
return resp, nil
}
// applyObjectMutation applies a single mutation while the transaction's path
// lock is held. PUT entries are expected to be fully prepared by the caller
// (chunks resolved); mutations here are metadata-scoped. A DELETE of an absent

View File

@@ -246,6 +246,92 @@ func TestObjectTransactionRecomputeAscending(t *testing.T) {
}
}
// A batch applies each transaction independently: one failed precondition does
// not abort the others, matching S3 multi-object delete semantics.
func TestObjectTransactionBatchIndependent(t *testing.T) {
now := time.Unix(1700000000, 0)
obj := func(inode uint64) *filer.Entry {
return &filer.Entry{
Attr: filer.Attr{Inode: inode, Mtime: now, Crtime: now, Mode: 0644},
Extended: map[string][]byte{s3_constants.ExtETagKey: []byte("abc")},
}
}
fs, store := newTxnTestServer(map[string]*filer.Entry{
"/buckets/b/a": obj(1),
"/buckets/b/c": obj(3),
})
del := func(name string, cond *filer_pb.WriteCondition) *filer_pb.ObjectTransactionRequest {
return &filer_pb.ObjectTransactionRequest{
LockKey: "/buckets/b/" + name,
Condition: cond,
Mutations: []*filer_pb.ObjectMutation{
{Type: filer_pb.ObjectMutation_DELETE, Directory: "/buckets/b", Name: name},
},
}
}
resp, err := fs.ObjectTransactionBatch(context.Background(), &filer_pb.ObjectTransactionBatchRequest{
Transactions: []*filer_pb.ObjectTransactionRequest{
del("a", nil),
del("c", &filer_pb.WriteCondition{Clauses: []*filer_pb.WriteCondition_Clause{
{Kind: filer_pb.WriteCondition_IF_ETAG_MATCH, Etags: []string{`"zzz"`}},
}}),
},
})
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
if len(resp.Responses) != 2 {
t.Fatalf("want 2 responses, got %d", len(resp.Responses))
}
if resp.Responses[0].Error != "" {
t.Errorf("delete a should succeed: %q", resp.Responses[0].Error)
}
if resp.Responses[1].ErrorCode != filer_pb.FilerError_PRECONDITION_FAILED {
t.Errorf("delete c should fail precondition, got %v", resp.Responses[1].ErrorCode)
}
if _, ok := store.entries["/buckets/b/a"]; ok {
t.Errorf("a should be deleted")
}
if _, ok := store.entries["/buckets/b/c"]; !ok {
t.Errorf("c should survive its failed precondition")
}
}
// A nil transaction in a batch yields an error response in its slot rather than
// panicking, keeping responses parallel to the requests.
func TestObjectTransactionBatchNilTransaction(t *testing.T) {
now := time.Unix(1700000000, 0)
fs, store := newTxnTestServer(map[string]*filer.Entry{
"/buckets/b/a": {Attr: filer.Attr{Inode: 1, Mtime: now, Crtime: now, Mode: 0644}},
})
resp, err := fs.ObjectTransactionBatch(context.Background(), &filer_pb.ObjectTransactionBatchRequest{
Transactions: []*filer_pb.ObjectTransactionRequest{
nil,
{LockKey: "/buckets/b/a", Mutations: []*filer_pb.ObjectMutation{
{Type: filer_pb.ObjectMutation_DELETE, Directory: "/buckets/b", Name: "a"},
}},
},
})
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
if len(resp.Responses) != 2 {
t.Fatalf("want 2 responses (parallel to requests), got %d", len(resp.Responses))
}
if resp.Responses[0].Error == "" {
t.Errorf("nil transaction should produce an error response")
}
if resp.Responses[1].Error != "" {
t.Errorf("valid transaction should succeed: %q", resp.Responses[1].Error)
}
if _, ok := store.entries["/buckets/b/a"]; ok {
t.Errorf("a should be deleted by the valid transaction")
}
}
// DELETE and PATCH of an absent entry are no-ops, so a replayed transaction
// does not error.
func TestObjectTransactionIdempotentNoops(t *testing.T) {