From a3c0baa9b0e3dec856ef5b84413cdb90be8d717c Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 25 May 2026 12:34:15 -0700 Subject: [PATCH] filer: cooling-off dual-read for POSIX locks during ring changes (#9672) While the ring changed within the last snapshot interval, a fresh owner asks the key's previous owner (LockRing.PriorOwner) whether it still holds a conflicting lock before granting TRY_LOCK or answering GET_LK, so it does not double-grant before re-assertion rebuilds its local state. The probe is marked cooling_probe so the previous owner answers from local state without recursing. PriorOwner uses the snapshot's prebuilt ring rather than rebuilding a hash ring per call. --- other/java/client/src/main/proto/filer.proto | 2 + weed/cluster/lock_manager/lock_ring.go | 46 ++++++++++++-- .../lock_manager/lock_ring_prior_test.go | 63 +++++++++++++++++++ weed/pb/filer.proto | 4 ++ weed/pb/filer_pb/filer.pb.go | 18 +++++- weed/server/filer_grpc_server_posix_lock.go | 60 ++++++++++++++++++ 6 files changed, 184 insertions(+), 9 deletions(-) create mode 100644 weed/cluster/lock_manager/lock_ring_prior_test.go diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index b2ae87131..4d7861d98 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -388,6 +388,8 @@ message PosixLockRequest { bool is_moved = 2; PosixLockOp op = 3; PosixLockRange lock = 4; + repeated PosixLockRange locks = 5; + bool cooling_probe = 6; } enum PosixLockOp { diff --git a/weed/cluster/lock_manager/lock_ring.go b/weed/cluster/lock_manager/lock_ring.go index e635d7076..33e96c776 100644 --- a/weed/cluster/lock_manager/lock_ring.go +++ b/weed/cluster/lock_manager/lock_ring.go @@ -12,6 +12,7 @@ import ( type LockRingSnapshot struct { servers []pb.ServerAddress ts time.Time + ring *HashRing // prebuilt ring for servers, so PriorOwner need not rebuild per call } type LockRing struct { @@ -58,10 
+59,12 @@ func (r *LockRing) SetSnapshot(servers []pb.ServerAddress, version int64) bool { // are always consistent — prevents a concurrent SetSnapshot from // seeing the new version but applying its servers to the old ring. r.Ring.SetServers(servers) + // Append the snapshot under the same lock as the ring update so a concurrent + // PriorOwner always sees snapshots[0] matching r.Ring (and snapshots[1] as the + // true prior); otherwise it could pair a new ring with a stale prior snapshot. + r.addOneSnapshotLocked(servers) r.Unlock() - r.addOneSnapshot(servers) - r.cleanupWg.Add(1) go func() { defer r.cleanupWg.Done() @@ -78,14 +81,16 @@ func (r *LockRing) Version() int64 { return r.version } -func (r *LockRing) addOneSnapshot(servers []pb.ServerAddress) { - r.Lock() - defer r.Unlock() - +// addOneSnapshotLocked appends a new snapshot (newest at index 0). The caller +// must hold r.Lock(), so the ring update and snapshot append are one atomic step. +func (r *LockRing) addOneSnapshotLocked(servers []pb.ServerAddress) { ts := time.Now() + ring := NewHashRing(DefaultVnodeCount) + ring.SetServers(servers) t := &LockRingSnapshot{ servers: servers, ts: ts, + ring: ring, } r.snapshots = append(r.snapshots, t) for i := len(r.snapshots) - 2; i >= 0; i-- { @@ -148,6 +153,35 @@ func (r *LockRing) GetPrimary(key string) pb.ServerAddress { return r.Ring.GetPrimary(key) } +// PriorOwner returns the key's owner from the previous ring snapshot, but only +// while the ring changed within the last snapshotInterval and that owner differs +// from the current primary. This is the cooling-off window in which the previous +// owner may still hold locks the new owner has not yet rebuilt — a caller can +// consult it before granting so a fresh owner does not double-grant during a +// rebalance. Returns "" outside the window or when ownership did not move. It +// uses the snapshot's prebuilt ring, so it does not rebuild a hash ring per call. 
+func (r *LockRing) PriorOwner(key string) pb.ServerAddress { + r.RLock() + defer r.RUnlock() + if len(r.snapshots) < 2 { + return "" + } + if time.Since(r.snapshots[0].ts) > r.snapshotInterval { + return "" + } + current := r.Ring.GetPrimary(key) + var prior pb.ServerAddress + if pr := r.snapshots[1].ring; pr != nil { + prior = pr.GetPrimary(key) + } else { + prior = hashKeyToServer(key, r.snapshots[1].servers) + } + if prior != "" && prior != current { + return prior + } + return "" +} + // hashKeyToServer uses a temporary consistent hash ring for the given server list. func hashKeyToServer(key string, servers []pb.ServerAddress) pb.ServerAddress { if len(servers) == 0 { diff --git a/weed/cluster/lock_manager/lock_ring_prior_test.go b/weed/cluster/lock_manager/lock_ring_prior_test.go new file mode 100644 index 000000000..74bc0c648 --- /dev/null +++ b/weed/cluster/lock_manager/lock_ring_prior_test.go @@ -0,0 +1,63 @@ +package lock_manager + +import ( + "fmt" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb" +) + +func TestLockRing_PriorOwner(t *testing.T) { + r := NewLockRing(5 * time.Second) + t.Cleanup(r.WaitForCleanup) + + setA := []pb.ServerAddress{"s1:1", "s2:1", "s3:1"} + r.SetSnapshot(setA, 1) + + // Only one snapshot: nothing to fall back to. + if got := r.PriorOwner("any"); got != "" { + t.Fatalf("single snapshot should have no prior owner, got %q", got) + } + + // Add a server so some keys' ownership moves. 
+ setB := []pb.ServerAddress{"s1:1", "s2:1", "s3:1", "s4:1"} + r.SetSnapshot(setB, 2) + + var moved, stable string + for i := 0; i < 2000 && (moved == "" || stable == ""); i++ { + key := fmt.Sprintf("key-%d", i) + if r.GetPrimary(key) != hashKeyToServer(key, setA) { + if moved == "" { + moved = key + } + } else if stable == "" { + stable = key + } + } + if moved == "" || stable == "" { + t.Skip("could not find both a moved and a stable key") + } + + if got, want := r.PriorOwner(moved), hashKeyToServer(moved, setA); got != want { + t.Fatalf("PriorOwner(moved)=%q, want %q", got, want) + } + if got := r.PriorOwner(stable); got != "" { + t.Fatalf("unmoved key should have no prior owner, got %q", got) + } +} + +func TestLockRing_PriorOwnerExpires(t *testing.T) { + r := NewLockRing(20 * time.Millisecond) + t.Cleanup(r.WaitForCleanup) + r.SetSnapshot([]pb.ServerAddress{"s1:1", "s2:1", "s3:1"}, 1) + r.SetSnapshot([]pb.ServerAddress{"s1:1", "s2:1", "s3:1", "s4:1"}, 2) + + // Past the cooling interval, the prior owner is no longer offered. + time.Sleep(40 * time.Millisecond) + for i := 0; i < 2000; i++ { + if got := r.PriorOwner(fmt.Sprintf("key-%d", i)); got != "" { + t.Fatalf("prior owner should expire after the cooling interval, got %q", got) + } + } +} diff --git a/weed/pb/filer.proto b/weed/pb/filer.proto index 0bcb44185..6e5abd3f0 100644 --- a/weed/pb/filer.proto +++ b/weed/pb/filer.proto @@ -392,6 +392,10 @@ message PosixLockRequest { // re-assertion, so the current owner filer can rebuild its in-memory state // after an ownership change or restart. lock.sid identifies the session. repeated PosixLockRange locks = 5; + // cooling_probe marks a dual-read a new owner sends to the previous owner + // during a ring change, so the previous owner answers from local state + // without running its own cooling-off probe (no recursion).
+ bool cooling_probe = 6; } enum PosixLockOp { diff --git a/weed/pb/filer_pb/filer.pb.go b/weed/pb/filer_pb/filer.pb.go index 77373db6d..0c12c0e7e 100644 --- a/weed/pb/filer_pb/filer.pb.go +++ b/weed/pb/filer_pb/filer.pb.go @@ -1991,7 +1991,11 @@ type PosixLockRequest struct { // locks carries the full set a mount holds on key for a KEEP_ALIVE // re-assertion, so the current owner filer can rebuild its in-memory state // after an ownership change or restart. lock.sid identifies the session. - Locks []*PosixLockRange `protobuf:"bytes,5,rep,name=locks,proto3" json:"locks,omitempty"` + Locks []*PosixLockRange `protobuf:"bytes,5,rep,name=locks,proto3" json:"locks,omitempty"` + // cooling_probe marks a dual-read a new owner sends to the previous owner + // during a ring change, so the previous owner answers from local state + // without running its own cooling-off probe (no recursion). + CoolingProbe bool `protobuf:"varint,6,opt,name=cooling_probe,json=coolingProbe,proto3" json:"cooling_probe,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -2061,6 +2065,13 @@ func (x *PosixLockRequest) GetLocks() []*PosixLockRange { return nil } +func (x *PosixLockRequest) GetCoolingProbe() bool { + if x != nil { + return x.CoolingProbe + } + return false +} + type PosixLockResponse struct { state protoimpl.MessageState `protogen:"open.v1"` Granted bool `protobuf:"varint,1,opt,name=granted,proto3" json:"granted,omitempty"` // for TRY_LOCK: whether the lock was granted @@ -6786,13 +6797,14 @@ const file_filer_proto_rawDesc = "" + "\x03sid\x18\x04 \x01(\x04R\x03sid\x12\x14\n" + "\x05owner\x18\x05 \x01(\x04R\x05owner\x12\x10\n" + "\x03pid\x18\x06 \x01(\rR\x03pid\x12\x19\n" + - "\bis_flock\x18\a \x01(\bR\aisFlock\"\xc4\x01\n" + + "\bis_flock\x18\a \x01(\bR\aisFlock\"\xe9\x01\n" + "\x10PosixLockRequest\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12\x19\n" + "\bis_moved\x18\x02 \x01(\bR\aisMoved\x12%\n" + "\x02op\x18\x03
\x01(\x0e2\x15.filer_pb.PosixLockOpR\x02op\x12,\n" + "\x04lock\x18\x04 \x01(\v2\x18.filer_pb.PosixLockRangeR\x04lock\x12.\n" + - "\x05locks\x18\x05 \x03(\v2\x18.filer_pb.PosixLockRangeR\x05locks\"\x86\x01\n" + + "\x05locks\x18\x05 \x03(\v2\x18.filer_pb.PosixLockRangeR\x05locks\x12#\n" + + "\rcooling_probe\x18\x06 \x01(\bR\fcoolingProbe\"\x86\x01\n" + "\x11PosixLockResponse\x12\x18\n" + "\agranted\x18\x01 \x01(\bR\agranted\x12!\n" + "\fhas_conflict\x18\x02 \x01(\bR\vhasConflict\x124\n" + diff --git a/weed/server/filer_grpc_server_posix_lock.go b/weed/server/filer_grpc_server_posix_lock.go index e01377774..5036953d5 100644 --- a/weed/server/filer_grpc_server_posix_lock.go +++ b/weed/server/filer_grpc_server_posix_lock.go @@ -17,6 +17,9 @@ const ( // each filer checks. The mount renews well within the TTL. posixLockSessionTTL = 15 * time.Second posixLockSweepInterval = 5 * time.Second + // posixCoolingProbeTimeout bounds the dual-read probe to the prior owner so a + // slow peer can't stall a non-blocking lock call during the cooling window. + posixCoolingProbeTimeout = 2 * time.Second ) // startPosixLockSweeper periodically reaps the locks of leased sessions (mounts) @@ -90,6 +93,15 @@ func (fs *FilerServer) PosixLock(ctx context.Context, req *filer_pb.PosixLockReq resp := &filer_pb.PosixLockResponse{} switch req.Op { case filer_pb.PosixLockOp_TRY_LOCK: + // During a ring change, a previous owner may still hold a conflicting lock + // this fresh owner has not rebuilt yet; consult it before granting. 
+ if !req.CoolingProbe { + if c, found := fs.coolingConflict(ctx, req.Key, lk); found { + resp.HasConflict = true + resp.Conflict = c + break + } + } if c, granted := fs.posixLocks.TryLock(req.Key, lk); granted { resp.Granted = true } else { @@ -99,6 +111,13 @@ func (fs *FilerServer) PosixLock(ctx context.Context, req *filer_pb.PosixLockReq case filer_pb.PosixLockOp_UNLOCK: fs.posixLocks.Unlock(req.Key, lk) case filer_pb.PosixLockOp_GET_LK: + if !req.CoolingProbe { + if c, found := fs.coolingConflict(ctx, req.Key, lk); found { + resp.HasConflict = true + resp.Conflict = c + break + } + } if c, found := fs.posixLocks.GetLk(req.Key, lk); found { resp.HasConflict = true resp.Conflict = posixRangeToPb(c) @@ -128,6 +147,47 @@ func (fs *FilerServer) PosixLock(ctx context.Context, req *filer_pb.PosixLockReq return resp, nil } +// coolingConflict asks the key's previous owner, during a ring-change cooling +// window, whether it still holds a lock that blocks lk — so a fresh owner does +// not double-grant before re-assertion rebuilds its local state. Best-effort: if +// the previous owner is unreachable (e.g. it left, which caused the change, so +// its locks are gone) the caller proceeds. The probe is marked cooling_probe so +// the previous owner answers from local state without itself cooling off. +func (fs *FilerServer) coolingConflict(ctx context.Context, key string, lk posixlock.Range) (*filer_pb.PosixLockRange, bool) { + if fs.filer.Dlm == nil { + return nil, false + } + prior := fs.filer.Dlm.LockRing.PriorOwner(key) + if prior == "" || prior == fs.option.Host { + return nil, false + } + // Bound the probe with its own short deadline: it runs on the non-blocking + // lock path, so a slow prior owner must not stall the caller. On timeout the + // probe is treated as unreachable (best-effort). 
+ probeCtx, cancel := context.WithTimeout(ctx, posixCoolingProbeTimeout) + defer cancel() + var resp *filer_pb.PosixLockResponse + err := pb.WithFilerClient(false, 0, prior, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + var e error + resp, e = client.PosixLock(probeCtx, &filer_pb.PosixLockRequest{ + Key: key, + IsMoved: true, + CoolingProbe: true, + Op: filer_pb.PosixLockOp_GET_LK, + Lock: posixRangeToPb(lk), + }) + return e + }) + if err != nil { + glog.V(2).InfofCtx(ctx, "posix cooling probe %s -> %s: %v", key, prior, err) + return nil, false + } + if resp.GetHasConflict() { + return resp.GetConflict(), true + } + return nil, false +} + func posixRangeFromPb(l *filer_pb.PosixLockRange) posixlock.Range { return posixlock.Range{ Start: l.GetStart(),