filer: add RECOMPUTE_LATEST mutation to ObjectTransaction (#9647)

Deleting a specific version that happens to be the latest needs the new latest
re-derived from the remaining versions, and that scan must run under the same
lock as the delete. The gateway can't do it atomically across RPCs.

Add a RECOMPUTE_LATEST mutation: it scans a directory under the transaction
lock, picks the child that sorts last (descending) or first by name, copies the
mapped extended keys from it into a pointer entry, and stores its name under
name_to_key. An empty directory clears the pointer keys. The filer stays
mechanical and S3-agnostic: the caller, which knows the versioning scheme,
supplies the sort direction and the key mappings. A missing pointer entry is a
no-op, so a replayed transaction is idempotent.
This commit is contained in:
Chris Lu
2026-05-23 18:29:46 -07:00
committed by GitHub
parent bf022ca018
commit e71bac55e9
5 changed files with 719 additions and 412 deletions

View File

@@ -282,18 +282,35 @@ enum FilerError {
// before the transaction; mutations here are metadata-scoped.
message ObjectMutation {
enum Type {
PUT = 0; // create or replace the entry (entry field)
DELETE = 1; // delete the entry at directory/name (no error if absent)
PATCH_EXTENDED = 2; // merge set_extended / remove delete_extended on the entry
PUT = 0; // create or replace the entry (entry field)
DELETE = 1; // delete the entry at directory/name (no error if absent)
PATCH_EXTENDED = 2; // merge set_extended / remove delete_extended on the entry
RECOMPUTE_LATEST = 3; // scan a directory and re-point a parent entry (recompute)
}
Type type = 1;
string directory = 2;
string name = 3; // entry name for DELETE / PATCH_EXTENDED
string name = 3; // entry name for DELETE / PATCH_EXTENDED / RECOMPUTE_LATEST (the pointer entry)
Entry entry = 4; // full entry for PUT
map<string, bytes> set_extended = 5; // PATCH_EXTENDED: keys to set
repeated string delete_extended = 6; // PATCH_EXTENDED: keys to remove
bool is_delete_data = 7; // DELETE: also delete chunk data
bool is_recursive = 8; // DELETE: recurse into a directory
Recompute recompute = 9; // RECOMPUTE_LATEST parameters
}
// Recompute re-derives a pointer entry (directory/name on the mutation) from the
// current contents of a scanned directory, atomically under the transaction's
// lock. It is mechanical: the filer picks the child that sorts first or last by
// name and copies the requested fields into the pointer; it has no knowledge of
// what the entries mean. The caller (which does know the versioning scheme)
// supplies the sort direction and the key mappings. This covers re-pointing the
// latest version after a specific version is deleted, where the scan must run
// under the lock.
message Recompute {
string scan_dir = 1; // directory whose direct children are scanned
bool descending = 2; // pick the child that sorts last by name (else first)
map<string, string> copy_extended = 3; // pointer extended key -> source extended key on the chosen child
string name_to_key = 4; // if set, store the chosen child's name under this pointer key
}
// ObjectTransactionRequest applies an ordered list of mutations atomically with

View File

@@ -282,18 +282,35 @@ enum FilerError {
// before the transaction; mutations here are metadata-scoped.
message ObjectMutation {
enum Type {
PUT = 0; // create or replace the entry (entry field)
DELETE = 1; // delete the entry at directory/name (no error if absent)
PATCH_EXTENDED = 2; // merge set_extended / remove delete_extended on the entry
PUT = 0; // create or replace the entry (entry field)
DELETE = 1; // delete the entry at directory/name (no error if absent)
PATCH_EXTENDED = 2; // merge set_extended / remove delete_extended on the entry
RECOMPUTE_LATEST = 3; // scan a directory and re-point a parent entry (recompute)
}
Type type = 1;
string directory = 2;
string name = 3; // entry name for DELETE / PATCH_EXTENDED
string name = 3; // entry name for DELETE / PATCH_EXTENDED / RECOMPUTE_LATEST (the pointer entry)
Entry entry = 4; // full entry for PUT
map<string, bytes> set_extended = 5; // PATCH_EXTENDED: keys to set
repeated string delete_extended = 6; // PATCH_EXTENDED: keys to remove
bool is_delete_data = 7; // DELETE: also delete chunk data
bool is_recursive = 8; // DELETE: recurse into a directory
Recompute recompute = 9; // RECOMPUTE_LATEST parameters
}
// Recompute re-derives a pointer entry (directory/name on the mutation) from the
// current contents of a scanned directory, atomically under the transaction's
// lock. It is mechanical: the filer picks the child that sorts first or last by
// name and copies the requested fields into the pointer; it has no knowledge of
// what the entries mean. The caller (which does know the versioning scheme)
// supplies the sort direction and the key mappings. This covers re-pointing the
// latest version after a specific version is deleted, where the scan must run
// under the lock.
message Recompute {
string scan_dir = 1; // directory whose direct children are scanned
bool descending = 2; // pick the child that sorts last by name (else first)
map<string, string> copy_extended = 3; // pointer extended key -> source extended key on the chosen child
string name_to_key = 4; // if set, store the chosen child's name under this pointer key
}
// ObjectTransactionRequest applies an ordered list of mutations atomically with

File diff suppressed because it is too large Load Diff

View File

@@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"math"
"os"
"path/filepath"
"time"
@@ -333,11 +334,77 @@ func (fs *FilerServer) applyObjectMutation(ctx context.Context, m *filer_pb.Obje
fs.filer.NotifyUpdateEvent(ctx, oldEntry, newEntry, true, fromOtherCluster, signatures)
return nil
case filer_pb.ObjectMutation_RECOMPUTE_LATEST:
return fs.applyRecomputeLatest(ctx, m)
default:
return fmt.Errorf("unknown mutation type %v", m.Type)
}
}
// applyRecomputeLatest re-derives the pointer entry (m.Directory/m.Name) from the
// current contents of recompute.scan_dir, under the transaction's lock. It is
// mechanical: pick the child that sorts last (descending) or first by name, copy
// the mapped extended keys from it into the pointer, and store its name under
// name_to_key. When the scanned directory is empty the pointer keys are cleared.
// The caller, which knows the versioning scheme, supplies the direction and the
// key mappings. A missing pointer entry is a no-op (idempotent on replay).
func (fs *FilerServer) applyRecomputeLatest(ctx context.Context, m *filer_pb.ObjectMutation) error {
rc := m.Recompute
if rc == nil {
return fmt.Errorf("RECOMPUTE_LATEST requires recompute parameters")
}
pointer, err := fs.filer.FindEntry(ctx, util.NewFullPath(m.Directory, m.Name))
if err == filer_pb.ErrNotFound {
return nil
}
if err != nil {
return err
}
if pointer.Extended == nil {
pointer.Extended = make(map[string][]byte)
}
// The store streams entries ascending by name. For the lowest-name pick we
// only need the first entry, so cap the listing at one; for the highest-name
// pick we must scan all and keep the last (the store has no reverse order).
limit := int64(math.MaxInt32)
if !rc.Descending {
limit = 1
}
var chosen *filer.Entry
_, listErr := fs.filer.StreamListDirectoryEntries(ctx, util.FullPath(rc.ScanDir), "", false, limit, "", "", "", func(entry *filer.Entry) (bool, error) {
chosen = entry
return rc.Descending, nil
})
if listErr != nil {
return listErr
}
if chosen == nil {
for pointerKey := range rc.CopyExtended {
delete(pointer.Extended, pointerKey)
}
if rc.NameToKey != "" {
delete(pointer.Extended, rc.NameToKey)
}
} else {
for pointerKey, sourceKey := range rc.CopyExtended {
if v, ok := chosen.Extended[sourceKey]; ok {
pointer.Extended[pointerKey] = v
} else {
delete(pointer.Extended, pointerKey)
}
}
if rc.NameToKey != "" {
pointer.Extended[rc.NameToKey] = []byte(chosen.Name())
}
}
return fs.filer.UpdateEntry(ctx, pointer, pointer)
}
func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntryRequest) (*filer_pb.UpdateEntryResponse, error) {
glog.V(4).InfofCtx(ctx, "UpdateEntry %v", req)

View File

@@ -143,6 +143,109 @@ func TestObjectTransactionPreconditionAborts(t *testing.T) {
}
}
// Deleting the latest version and recomputing re-points the pointer at the new
// highest-named remaining version; the scan runs under the transaction lock.
func TestObjectTransactionRecomputeLatest(t *testing.T) {
now := time.Unix(1700000000, 0)
ver := func(id string) *filer.Entry {
return &filer.Entry{
Attr: filer.Attr{Inode: 10, Mtime: now, Crtime: now, Mode: 0644},
Extended: map[string][]byte{"vid": []byte(id), "etag": []byte("etag-" + id)},
}
}
fs, store := newTxnTestServer(map[string]*filer.Entry{
"/buckets/b/obj/.versions": {
Attr: filer.Attr{Inode: 2, Mtime: now, Crtime: now, Mode: 0755 | (1 << 31)},
Extended: map[string][]byte{
"latestVid": []byte("v3"), "latestEtag": []byte("etag-v3"), "latestName": []byte("v3.ver"),
},
},
"/buckets/b/obj/.versions/v1.ver": ver("v1"),
"/buckets/b/obj/.versions/v2.ver": ver("v2"),
"/buckets/b/obj/.versions/v3.ver": ver("v3"),
})
recompute := func() *filer_pb.ObjectMutation {
return &filer_pb.ObjectMutation{
Type: filer_pb.ObjectMutation_RECOMPUTE_LATEST, Directory: "/buckets/b/obj", Name: ".versions",
Recompute: &filer_pb.Recompute{
ScanDir: "/buckets/b/obj/.versions",
Descending: true,
CopyExtended: map[string]string{"latestVid": "vid", "latestEtag": "etag"},
NameToKey: "latestName",
},
}
}
// Delete the latest (v3); recompute should pick v2.
resp, err := fs.ObjectTransaction(context.Background(), &filer_pb.ObjectTransactionRequest{
LockKey: "/buckets/b/obj",
Mutations: []*filer_pb.ObjectMutation{
{Type: filer_pb.ObjectMutation_DELETE, Directory: "/buckets/b/obj/.versions", Name: "v3.ver"},
recompute(),
},
})
if err != nil || resp.Error != "" {
t.Fatalf("txn failed: err=%v resp=%q", err, resp.Error)
}
ptr := store.entries["/buckets/b/obj/.versions"].Extended
if string(ptr["latestVid"]) != "v2" || string(ptr["latestEtag"]) != "etag-v2" || string(ptr["latestName"]) != "v2.ver" {
t.Fatalf("after deleting v3, pointer = vid:%s etag:%s name:%s; want v2",
ptr["latestVid"], ptr["latestEtag"], ptr["latestName"])
}
// Delete the remaining versions; recompute on an empty dir clears the pointer.
resp, err = fs.ObjectTransaction(context.Background(), &filer_pb.ObjectTransactionRequest{
LockKey: "/buckets/b/obj",
Mutations: []*filer_pb.ObjectMutation{
{Type: filer_pb.ObjectMutation_DELETE, Directory: "/buckets/b/obj/.versions", Name: "v2.ver"},
{Type: filer_pb.ObjectMutation_DELETE, Directory: "/buckets/b/obj/.versions", Name: "v1.ver"},
recompute(),
},
})
if err != nil || resp.Error != "" {
t.Fatalf("txn failed: err=%v resp=%q", err, resp.Error)
}
ptr = store.entries["/buckets/b/obj/.versions"].Extended
for _, k := range []string{"latestVid", "latestEtag", "latestName"} {
if _, ok := ptr[k]; ok {
t.Errorf("pointer key %q should be cleared when no versions remain", k)
}
}
}
// With descending=false the lowest-named child is chosen (the listing is capped
// at one entry).
func TestObjectTransactionRecomputeAscending(t *testing.T) {
now := time.Unix(1700000000, 0)
ver := func(id string) *filer.Entry {
return &filer.Entry{Attr: filer.Attr{Inode: 10, Mtime: now, Crtime: now, Mode: 0644}, Extended: map[string][]byte{"vid": []byte(id)}}
}
fs, store := newTxnTestServer(map[string]*filer.Entry{
"/buckets/b/obj/.versions": {Attr: filer.Attr{Inode: 2, Mtime: now, Crtime: now, Mode: 0755 | (1 << 31)}, Extended: map[string][]byte{}},
"/buckets/b/obj/.versions/v1.ver": ver("v1"),
"/buckets/b/obj/.versions/v2.ver": ver("v2"),
})
resp, err := fs.ObjectTransaction(context.Background(), &filer_pb.ObjectTransactionRequest{
LockKey: "/buckets/b/obj",
Mutations: []*filer_pb.ObjectMutation{
{Type: filer_pb.ObjectMutation_RECOMPUTE_LATEST, Directory: "/buckets/b/obj", Name: ".versions",
Recompute: &filer_pb.Recompute{
ScanDir: "/buckets/b/obj/.versions",
Descending: false,
CopyExtended: map[string]string{"latestVid": "vid"},
}},
},
})
if err != nil || resp.Error != "" {
t.Fatalf("txn failed: err=%v resp=%q", err, resp.Error)
}
if got := string(store.entries["/buckets/b/obj/.versions"].Extended["latestVid"]); got != "v1" {
t.Fatalf("ascending recompute latestVid = %q, want v1 (lowest)", got)
}
}
// DELETE and PATCH of an absent entry are no-ops, so a replayed transaction
// does not error.
func TestObjectTransactionIdempotentNoops(t *testing.T) {