mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-29 13:10:21 +00:00
filer: cooling-off dual-read for POSIX locks during ring changes (#9672)
While the ring changed within the last snapshot interval, a fresh owner asks the key's previous owner (LockRing.PriorOwner) whether it still holds a conflicting lock before granting TRY_LOCK or answering GET_LK, so it does not double-grant before re-assertion rebuilds its local state. The probe is marked cooling_probe so the previous owner answers from local state without recursing. PriorOwner uses the snapshot's prebuilt ring rather than rebuilding a hash ring per call.
This commit is contained in:
@@ -388,6 +388,8 @@ message PosixLockRequest {
|
||||
bool is_moved = 2;
|
||||
PosixLockOp op = 3;
|
||||
PosixLockRange lock = 4;
|
||||
repeated PosixLockRange locks = 5;
|
||||
bool cooling_probe = 6;
|
||||
}
|
||||
|
||||
enum PosixLockOp {
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
// LockRingSnapshot records the lock ring membership at one point in time,
// together with a hash ring built from that membership.
type LockRingSnapshot struct {
|
||||
servers []pb.ServerAddress // server list this snapshot was created from
|
||||
ts time.Time // time the snapshot was created
|
||||
ring *HashRing // prebuilt ring for servers, so PriorOwner need not rebuild per call
|
||||
}
|
||||
|
||||
type LockRing struct {
|
||||
@@ -58,10 +59,12 @@ func (r *LockRing) SetSnapshot(servers []pb.ServerAddress, version int64) bool {
|
||||
// are always consistent — prevents a concurrent SetSnapshot from
|
||||
// seeing the new version but applying its servers to the old ring.
|
||||
r.Ring.SetServers(servers)
|
||||
// Append the snapshot under the same lock as the ring update so a concurrent
|
||||
// PriorOwner always sees snapshots[0] matching r.Ring (and snapshots[1] as the
|
||||
// true prior); otherwise it could pair a new ring with a stale prior snapshot.
|
||||
r.addOneSnapshotLocked(servers)
|
||||
r.Unlock()
|
||||
|
||||
r.addOneSnapshot(servers)
|
||||
|
||||
r.cleanupWg.Add(1)
|
||||
go func() {
|
||||
defer r.cleanupWg.Done()
|
||||
@@ -78,14 +81,16 @@ func (r *LockRing) Version() int64 {
|
||||
return r.version
|
||||
}
|
||||
|
||||
func (r *LockRing) addOneSnapshot(servers []pb.ServerAddress) {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
|
||||
// addOneSnapshotLocked appends a new snapshot (newest at index 0). The caller
|
||||
// must hold r.Lock(), so the ring update and snapshot append are one atomic step.
|
||||
func (r *LockRing) addOneSnapshotLocked(servers []pb.ServerAddress) {
|
||||
ts := time.Now()
|
||||
ring := NewHashRing(DefaultVnodeCount)
|
||||
ring.SetServers(servers)
|
||||
t := &LockRingSnapshot{
|
||||
servers: servers,
|
||||
ts: ts,
|
||||
ring: ring,
|
||||
}
|
||||
r.snapshots = append(r.snapshots, t)
|
||||
for i := len(r.snapshots) - 2; i >= 0; i-- {
|
||||
@@ -148,6 +153,35 @@ func (r *LockRing) GetPrimary(key string) pb.ServerAddress {
|
||||
return r.Ring.GetPrimary(key)
|
||||
}
|
||||
|
||||
// PriorOwner returns the key's owner from the previous ring snapshot, but only
|
||||
// while the ring changed within the last snapshotInterval and that owner differs
|
||||
// from the current primary. This is the cooling-off window in which the previous
|
||||
// owner may still hold locks the new owner has not yet rebuilt — a caller can
|
||||
// consult it before granting so a fresh owner does not double-grant during a
|
||||
// rebalance. Returns "" outside the window or when ownership did not move. It
|
||||
// uses the snapshot's prebuilt ring, so it does not rebuild a hash ring per call.
|
||||
func (r *LockRing) PriorOwner(key string) pb.ServerAddress {
|
||||
r.RLock()
|
||||
defer r.RUnlock()
|
||||
if len(r.snapshots) < 2 {
|
||||
return ""
|
||||
}
|
||||
if time.Since(r.snapshots[0].ts) > r.snapshotInterval {
|
||||
return ""
|
||||
}
|
||||
current := r.Ring.GetPrimary(key)
|
||||
var prior pb.ServerAddress
|
||||
if pr := r.snapshots[1].ring; pr != nil {
|
||||
prior = pr.GetPrimary(key)
|
||||
} else {
|
||||
prior = hashKeyToServer(key, r.snapshots[1].servers)
|
||||
}
|
||||
if prior != "" && prior != current {
|
||||
return prior
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// hashKeyToServer uses a temporary consistent hash ring for the given server list.
|
||||
func hashKeyToServer(key string, servers []pb.ServerAddress) pb.ServerAddress {
|
||||
if len(servers) == 0 {
|
||||
|
||||
63
weed/cluster/lock_manager/lock_ring_prior_test.go
Normal file
63
weed/cluster/lock_manager/lock_ring_prior_test.go
Normal file
@@ -0,0 +1,63 @@
|
||||
package lock_manager
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
)
|
||||
|
||||
func TestLockRing_PriorOwner(t *testing.T) {
|
||||
r := NewLockRing(5 * time.Second)
|
||||
t.Cleanup(r.WaitForCleanup)
|
||||
|
||||
setA := []pb.ServerAddress{"s1:1", "s2:1", "s3:1"}
|
||||
r.SetSnapshot(setA, 1)
|
||||
|
||||
// Only one snapshot: nothing to fall back to.
|
||||
if got := r.PriorOwner("any"); got != "" {
|
||||
t.Fatalf("single snapshot should have no prior owner, got %q", got)
|
||||
}
|
||||
|
||||
// Add a server so some keys' ownership moves.
|
||||
setB := []pb.ServerAddress{"s1:1", "s2:1", "s3:1", "s4:1"}
|
||||
r.SetSnapshot(setB, 2)
|
||||
|
||||
var moved, stable string
|
||||
for i := 0; i < 2000 && (moved == "" || stable == ""); i++ {
|
||||
key := fmt.Sprintf("key-%d", i)
|
||||
if r.GetPrimary(key) != hashKeyToServer(key, setA) {
|
||||
if moved == "" {
|
||||
moved = key
|
||||
}
|
||||
} else if stable == "" {
|
||||
stable = key
|
||||
}
|
||||
}
|
||||
if moved == "" || stable == "" {
|
||||
t.Skip("could not find both a moved and a stable key")
|
||||
}
|
||||
|
||||
if got, want := r.PriorOwner(moved), hashKeyToServer(moved, setA); got != want {
|
||||
t.Fatalf("PriorOwner(moved)=%q, want %q", got, want)
|
||||
}
|
||||
if got := r.PriorOwner(stable); got != "" {
|
||||
t.Fatalf("unmoved key should have no prior owner, got %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLockRing_PriorOwnerExpires(t *testing.T) {
|
||||
r := NewLockRing(20 * time.Millisecond)
|
||||
t.Cleanup(r.WaitForCleanup)
|
||||
r.SetSnapshot([]pb.ServerAddress{"s1:1", "s2:1", "s3:1"}, 1)
|
||||
r.SetSnapshot([]pb.ServerAddress{"s1:1", "s2:1", "s3:1", "s4:1"}, 2)
|
||||
|
||||
// Past the cooling interval, the prior owner is no longer offered.
|
||||
time.Sleep(40 * time.Millisecond)
|
||||
for i := 0; i < 2000; i++ {
|
||||
if got := r.PriorOwner(fmt.Sprintf("key-%d", i)); got != "" {
|
||||
t.Fatalf("prior owner should expire after the cooling interval, got %q", got)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -392,6 +392,10 @@ message PosixLockRequest {
|
||||
// re-assertion, so the current owner filer can rebuild its in-memory state
|
||||
// after an ownership change or restart. lock.sid identifies the session.
|
||||
repeated PosixLockRange locks = 5;
|
||||
// cooling_probe marks a dual-read a new owner sends to the previous owner
|
||||
// during a ring change, so the previous owner answers from local state
|
||||
// without itself cooling-off (no recursion).
|
||||
bool cooling_probe = 6;
|
||||
}
|
||||
|
||||
enum PosixLockOp {
|
||||
|
||||
@@ -1991,7 +1991,11 @@ type PosixLockRequest struct {
|
||||
// locks carries the full set a mount holds on key for a KEEP_ALIVE
|
||||
// re-assertion, so the current owner filer can rebuild its in-memory state
|
||||
// after an ownership change or restart. lock.sid identifies the session.
|
||||
Locks []*PosixLockRange `protobuf:"bytes,5,rep,name=locks,proto3" json:"locks,omitempty"`
|
||||
Locks []*PosixLockRange `protobuf:"bytes,5,rep,name=locks,proto3" json:"locks,omitempty"`
|
||||
// cooling_probe marks a dual-read a new owner sends to the previous owner
|
||||
// during a ring change, so the previous owner answers from local state
|
||||
// without itself cooling-off (no recursion).
|
||||
CoolingProbe bool `protobuf:"varint,6,opt,name=cooling_probe,json=coolingProbe,proto3" json:"cooling_probe,omitempty"`
|
||||
unknownFields protoimpl.UnknownFields
|
||||
sizeCache protoimpl.SizeCache
|
||||
}
|
||||
@@ -2061,6 +2065,13 @@ func (x *PosixLockRequest) GetLocks() []*PosixLockRange {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *PosixLockRequest) GetCoolingProbe() bool {
|
||||
if x != nil {
|
||||
return x.CoolingProbe
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
type PosixLockResponse struct {
|
||||
state protoimpl.MessageState `protogen:"open.v1"`
|
||||
Granted bool `protobuf:"varint,1,opt,name=granted,proto3" json:"granted,omitempty"` // for TRY_LOCK: whether the lock was granted
|
||||
@@ -6786,13 +6797,14 @@ const file_filer_proto_rawDesc = "" +
|
||||
"\x03sid\x18\x04 \x01(\x04R\x03sid\x12\x14\n" +
|
||||
"\x05owner\x18\x05 \x01(\x04R\x05owner\x12\x10\n" +
|
||||
"\x03pid\x18\x06 \x01(\rR\x03pid\x12\x19\n" +
|
||||
"\bis_flock\x18\a \x01(\bR\aisFlock\"\xc4\x01\n" +
|
||||
"\bis_flock\x18\a \x01(\bR\aisFlock\"\xe9\x01\n" +
|
||||
"\x10PosixLockRequest\x12\x10\n" +
|
||||
"\x03key\x18\x01 \x01(\tR\x03key\x12\x19\n" +
|
||||
"\bis_moved\x18\x02 \x01(\bR\aisMoved\x12%\n" +
|
||||
"\x02op\x18\x03 \x01(\x0e2\x15.filer_pb.PosixLockOpR\x02op\x12,\n" +
|
||||
"\x04lock\x18\x04 \x01(\v2\x18.filer_pb.PosixLockRangeR\x04lock\x12.\n" +
|
||||
"\x05locks\x18\x05 \x03(\v2\x18.filer_pb.PosixLockRangeR\x05locks\"\x86\x01\n" +
|
||||
"\x05locks\x18\x05 \x03(\v2\x18.filer_pb.PosixLockRangeR\x05locks\x12#\n" +
|
||||
"\rcooling_probe\x18\x06 \x01(\bR\fcoolingProbe\"\x86\x01\n" +
|
||||
"\x11PosixLockResponse\x12\x18\n" +
|
||||
"\agranted\x18\x01 \x01(\bR\agranted\x12!\n" +
|
||||
"\fhas_conflict\x18\x02 \x01(\bR\vhasConflict\x124\n" +
|
||||
|
||||
@@ -17,6 +17,9 @@ const (
|
||||
// each filer checks. The mount renews well within the TTL.
|
||||
posixLockSessionTTL = 15 * time.Second
|
||||
posixLockSweepInterval = 5 * time.Second
|
||||
// posixCoolingProbeTimeout bounds the dual-read probe to the prior owner so a
|
||||
// slow peer can't stall a non-blocking lock call during the cooling window.
|
||||
posixCoolingProbeTimeout = 2 * time.Second
|
||||
)
|
||||
|
||||
// startPosixLockSweeper periodically reaps the locks of leased sessions (mounts)
|
||||
@@ -90,6 +93,15 @@ func (fs *FilerServer) PosixLock(ctx context.Context, req *filer_pb.PosixLockReq
|
||||
resp := &filer_pb.PosixLockResponse{}
|
||||
switch req.Op {
|
||||
case filer_pb.PosixLockOp_TRY_LOCK:
|
||||
// During a ring change, a previous owner may still hold a conflicting lock
|
||||
// this fresh owner has not rebuilt yet; consult it before granting.
|
||||
if !req.CoolingProbe {
|
||||
if c, found := fs.coolingConflict(ctx, req.Key, lk); found {
|
||||
resp.HasConflict = true
|
||||
resp.Conflict = c
|
||||
break
|
||||
}
|
||||
}
|
||||
if c, granted := fs.posixLocks.TryLock(req.Key, lk); granted {
|
||||
resp.Granted = true
|
||||
} else {
|
||||
@@ -99,6 +111,13 @@ func (fs *FilerServer) PosixLock(ctx context.Context, req *filer_pb.PosixLockReq
|
||||
case filer_pb.PosixLockOp_UNLOCK:
|
||||
fs.posixLocks.Unlock(req.Key, lk)
|
||||
case filer_pb.PosixLockOp_GET_LK:
|
||||
if !req.CoolingProbe {
|
||||
if c, found := fs.coolingConflict(ctx, req.Key, lk); found {
|
||||
resp.HasConflict = true
|
||||
resp.Conflict = c
|
||||
break
|
||||
}
|
||||
}
|
||||
if c, found := fs.posixLocks.GetLk(req.Key, lk); found {
|
||||
resp.HasConflict = true
|
||||
resp.Conflict = posixRangeToPb(c)
|
||||
@@ -128,6 +147,47 @@ func (fs *FilerServer) PosixLock(ctx context.Context, req *filer_pb.PosixLockReq
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// coolingConflict asks the key's previous owner, during a ring-change cooling
|
||||
// window, whether it still holds a lock that blocks lk — so a fresh owner does
|
||||
// not double-grant before re-assertion rebuilds its local state. Best-effort: if
|
||||
// the previous owner is unreachable (e.g. it left, which caused the change, so
|
||||
// its locks are gone) the caller proceeds. The probe is marked cooling_probe so
|
||||
// the previous owner answers from local state without itself cooling off.
|
||||
func (fs *FilerServer) coolingConflict(ctx context.Context, key string, lk posixlock.Range) (*filer_pb.PosixLockRange, bool) {
|
||||
if fs.filer.Dlm == nil {
|
||||
return nil, false
|
||||
}
|
||||
prior := fs.filer.Dlm.LockRing.PriorOwner(key)
|
||||
if prior == "" || prior == fs.option.Host {
|
||||
return nil, false
|
||||
}
|
||||
// Bound the probe with its own short deadline: it runs on the non-blocking
|
||||
// lock path, so a slow prior owner must not stall the caller. On timeout the
|
||||
// probe is treated as unreachable (best-effort).
|
||||
probeCtx, cancel := context.WithTimeout(ctx, posixCoolingProbeTimeout)
|
||||
defer cancel()
|
||||
var resp *filer_pb.PosixLockResponse
|
||||
err := pb.WithFilerClient(false, 0, prior, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
||||
var e error
|
||||
resp, e = client.PosixLock(probeCtx, &filer_pb.PosixLockRequest{
|
||||
Key: key,
|
||||
IsMoved: true,
|
||||
CoolingProbe: true,
|
||||
Op: filer_pb.PosixLockOp_GET_LK,
|
||||
Lock: posixRangeToPb(lk),
|
||||
})
|
||||
return e
|
||||
})
|
||||
if err != nil {
|
||||
glog.V(2).InfofCtx(ctx, "posix cooling probe %s -> %s: %v", key, prior, err)
|
||||
return nil, false
|
||||
}
|
||||
if resp.GetHasConflict() {
|
||||
return resp.GetConflict(), true
|
||||
}
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func posixRangeFromPb(l *filer_pb.PosixLockRange) posixlock.Range {
|
||||
return posixlock.Range{
|
||||
Start: l.GetStart(),
|
||||
|
||||
Reference in New Issue
Block a user