mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-28 12:41:15 +00:00
filer: routed PosixLock RPC over the in-memory authority (#9664)
* filer: in-memory POSIX lock authority (Manager) Concurrent multi-inode authority over the per-inode Set: a Set per opaque inode key (path, or hl:<HardLinkId>) plus a session->keys index so a dead mount's locks reap in O(locks held). Lock state stays in memory like the distributed lock manager's, off the replicated meta-log. TryLock/Unlock/ GetLk/ReleasePosixOwner/ReleaseFlockOwner/ReleaseSession; empty sets and stale index entries are pruned on release. * filer: routed PosixLock RPC over the in-memory authority Adds the PosixLock RPC (try/unlock/get_lk + the flush/release owner drops) that the owner filer answers from its in-memory Manager. The request key is the inode identity ring key; a non-owner filer forwards one hop (is_moved-bounded), mirroring ObjectTransaction, so the owner's table stays the single authority under a stale ring view. Strictly non-blocking; SetLkw polling lives in the mount.
This commit is contained in:
@@ -37,6 +37,9 @@ service SeaweedFiler {
|
||||
rpc ObjectTransactionBatch (ObjectTransactionBatchRequest) returns (ObjectTransactionBatchResponse) {
|
||||
}
|
||||
|
||||
rpc PosixLock (PosixLockRequest) returns (PosixLockResponse) {
|
||||
}
|
||||
|
||||
rpc AtomicRenameEntry (AtomicRenameEntryRequest) returns (AtomicRenameEntryResponse) {
|
||||
}
|
||||
rpc StreamRenameEntry (StreamRenameEntryRequest) returns (stream StreamRenameEntryResponse) {
|
||||
@@ -351,7 +354,7 @@ message ObjectTransactionRequest {
|
||||
repeated ObjectMutation mutations = 3;
|
||||
bool is_from_other_cluster = 4;
|
||||
repeated int32 signatures = 5;
|
||||
string condition_key = 6;
|
||||
string condition_key = 6; // if set, evaluate the condition against this entry instead of lock_key (still locking lock_key)
|
||||
string route_key = 7; // ring key identifying the owner filer; a non-owner forwards the whole transaction to it
|
||||
bool is_moved = 8; // set on a forwarded transaction so the receiver applies it locally instead of forwarding again
|
||||
}
|
||||
@@ -361,6 +364,46 @@ message ObjectTransactionResponse {
|
||||
FilerError error_code = 2;
|
||||
}
|
||||
|
||||
// PosixLockRange is one advisory byte-range lock. Owner identity is (sid, owner):
|
||||
// sid is the mount session, owner the FUSE lock owner within it, so owners from
|
||||
// different mounts never alias. end is inclusive (max uint64 = to EOF); is_flock
|
||||
// separates the flock and fcntl namespaces, which never conflict.
|
||||
message PosixLockRange {
|
||||
uint64 start = 1;
|
||||
uint64 end = 2;
|
||||
uint32 type = 3; // 1=read, 2=write, 3=unlock
|
||||
uint64 sid = 4;
|
||||
uint64 owner = 5;
|
||||
uint32 pid = 6; // holder pid, for get_lk reporting only
|
||||
bool is_flock = 7;
|
||||
}
|
||||
|
||||
// PosixLock routes an advisory lock operation to the inode's owner filer, which
|
||||
// holds the authoritative in-memory lock table. key is the inode identity ring
|
||||
// key (the file path, or hl:<HardLinkId> for a hardlink) used both to resolve the
|
||||
// owner and to index the table. A non-owner filer forwards the request one hop;
|
||||
// is_moved bounds it so a stale ring view cannot loop.
|
||||
message PosixLockRequest {
|
||||
string key = 1;
|
||||
bool is_moved = 2;
|
||||
PosixLockOp op = 3;
|
||||
PosixLockRange lock = 4;
|
||||
}
|
||||
|
||||
enum PosixLockOp {
|
||||
TRY_LOCK = 0; // grant lock or report conflict (non-blocking)
|
||||
UNLOCK = 1; // release lock's owner's locks over its range
|
||||
GET_LK = 2; // report a conflicting lock, if any
|
||||
RELEASE_POSIX_OWNER = 3; // drop the owner's fcntl locks (flush-time)
|
||||
RELEASE_FLOCK_OWNER = 4; // drop the owner's flock locks (release-time)
|
||||
}
|
||||
|
||||
message PosixLockResponse {
|
||||
bool granted = 1; // for TRY_LOCK: whether the lock was granted
|
||||
bool has_conflict = 2; // whether conflict is populated
|
||||
PosixLockRange conflict = 3; // the blocking lock (TRY_LOCK conflict / GET_LK result)
|
||||
}
|
||||
|
||||
// ObjectTransactionBatch applies several object transactions in one round trip,
|
||||
// each under its own per-path lock and independent of the others (no cross-key
|
||||
// atomicity). A caller groups keys that route to the same owner filer and sends
|
||||
|
||||
@@ -37,6 +37,9 @@ service SeaweedFiler {
|
||||
rpc ObjectTransactionBatch (ObjectTransactionBatchRequest) returns (ObjectTransactionBatchResponse) {
|
||||
}
|
||||
|
||||
rpc PosixLock (PosixLockRequest) returns (PosixLockResponse) {
|
||||
}
|
||||
|
||||
rpc AtomicRenameEntry (AtomicRenameEntryRequest) returns (AtomicRenameEntryResponse) {
|
||||
}
|
||||
rpc StreamRenameEntry (StreamRenameEntryRequest) returns (stream StreamRenameEntryResponse) {
|
||||
@@ -361,6 +364,46 @@ message ObjectTransactionResponse {
|
||||
FilerError error_code = 2;
|
||||
}
|
||||
|
||||
// PosixLockRange is one advisory byte-range lock. Owner identity is (sid, owner):
|
||||
// sid is the mount session, owner the FUSE lock owner within it, so owners from
|
||||
// different mounts never alias. end is inclusive (max uint64 = to EOF); is_flock
|
||||
// separates the flock and fcntl namespaces, which never conflict.
|
||||
message PosixLockRange {
|
||||
uint64 start = 1;
|
||||
uint64 end = 2;
|
||||
uint32 type = 3; // 1=read, 2=write, 3=unlock
|
||||
uint64 sid = 4;
|
||||
uint64 owner = 5;
|
||||
uint32 pid = 6; // holder pid, for get_lk reporting only
|
||||
bool is_flock = 7;
|
||||
}
|
||||
|
||||
// PosixLock routes an advisory lock operation to the inode's owner filer, which
|
||||
// holds the authoritative in-memory lock table. key is the inode identity ring
|
||||
// key (the file path, or hl:<HardLinkId> for a hardlink) used both to resolve the
|
||||
// owner and to index the table. A non-owner filer forwards the request one hop;
|
||||
// is_moved bounds it so a stale ring view cannot loop.
|
||||
message PosixLockRequest {
|
||||
string key = 1;
|
||||
bool is_moved = 2;
|
||||
PosixLockOp op = 3;
|
||||
PosixLockRange lock = 4;
|
||||
}
|
||||
|
||||
enum PosixLockOp {
|
||||
TRY_LOCK = 0; // grant lock or report conflict (non-blocking)
|
||||
UNLOCK = 1; // release lock's owner's locks over its range
|
||||
GET_LK = 2; // report a conflicting lock, if any
|
||||
RELEASE_POSIX_OWNER = 3; // drop the owner's fcntl locks (flush-time)
|
||||
RELEASE_FLOCK_OWNER = 4; // drop the owner's flock locks (release-time)
|
||||
}
|
||||
|
||||
message PosixLockResponse {
|
||||
bool granted = 1; // for TRY_LOCK: whether the lock was granted
|
||||
bool has_conflict = 2; // whether conflict is populated
|
||||
PosixLockRange conflict = 3; // the blocking lock (TRY_LOCK conflict / GET_LK result)
|
||||
}
|
||||
|
||||
// ObjectTransactionBatch applies several object transactions in one round trip,
|
||||
// each under its own per-path lock and independent of the others (no cross-key
|
||||
// atomicity). A caller groups keys that route to the same owner filer and sends
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -28,6 +28,7 @@ const (
|
||||
SeaweedFiler_DeleteEntry_FullMethodName = "/filer_pb.SeaweedFiler/DeleteEntry"
|
||||
SeaweedFiler_ObjectTransaction_FullMethodName = "/filer_pb.SeaweedFiler/ObjectTransaction"
|
||||
SeaweedFiler_ObjectTransactionBatch_FullMethodName = "/filer_pb.SeaweedFiler/ObjectTransactionBatch"
|
||||
SeaweedFiler_PosixLock_FullMethodName = "/filer_pb.SeaweedFiler/PosixLock"
|
||||
SeaweedFiler_AtomicRenameEntry_FullMethodName = "/filer_pb.SeaweedFiler/AtomicRenameEntry"
|
||||
SeaweedFiler_StreamRenameEntry_FullMethodName = "/filer_pb.SeaweedFiler/StreamRenameEntry"
|
||||
SeaweedFiler_StreamMutateEntry_FullMethodName = "/filer_pb.SeaweedFiler/StreamMutateEntry"
|
||||
@@ -66,6 +67,7 @@ type SeaweedFilerClient interface {
|
||||
DeleteEntry(ctx context.Context, in *DeleteEntryRequest, opts ...grpc.CallOption) (*DeleteEntryResponse, error)
|
||||
ObjectTransaction(ctx context.Context, in *ObjectTransactionRequest, opts ...grpc.CallOption) (*ObjectTransactionResponse, error)
|
||||
ObjectTransactionBatch(ctx context.Context, in *ObjectTransactionBatchRequest, opts ...grpc.CallOption) (*ObjectTransactionBatchResponse, error)
|
||||
PosixLock(ctx context.Context, in *PosixLockRequest, opts ...grpc.CallOption) (*PosixLockResponse, error)
|
||||
AtomicRenameEntry(ctx context.Context, in *AtomicRenameEntryRequest, opts ...grpc.CallOption) (*AtomicRenameEntryResponse, error)
|
||||
StreamRenameEntry(ctx context.Context, in *StreamRenameEntryRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[StreamRenameEntryResponse], error)
|
||||
StreamMutateEntry(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[StreamMutateEntryRequest, StreamMutateEntryResponse], error)
|
||||
@@ -201,6 +203,16 @@ func (c *seaweedFilerClient) ObjectTransactionBatch(ctx context.Context, in *Obj
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *seaweedFilerClient) PosixLock(ctx context.Context, in *PosixLockRequest, opts ...grpc.CallOption) (*PosixLockResponse, error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
out := new(PosixLockResponse)
|
||||
err := c.cc.Invoke(ctx, SeaweedFiler_PosixLock_FullMethodName, in, out, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *seaweedFilerClient) AtomicRenameEntry(ctx context.Context, in *AtomicRenameEntryRequest, opts ...grpc.CallOption) (*AtomicRenameEntryResponse, error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
out := new(AtomicRenameEntryResponse)
|
||||
@@ -483,6 +495,7 @@ type SeaweedFilerServer interface {
|
||||
DeleteEntry(context.Context, *DeleteEntryRequest) (*DeleteEntryResponse, error)
|
||||
ObjectTransaction(context.Context, *ObjectTransactionRequest) (*ObjectTransactionResponse, error)
|
||||
ObjectTransactionBatch(context.Context, *ObjectTransactionBatchRequest) (*ObjectTransactionBatchResponse, error)
|
||||
PosixLock(context.Context, *PosixLockRequest) (*PosixLockResponse, error)
|
||||
AtomicRenameEntry(context.Context, *AtomicRenameEntryRequest) (*AtomicRenameEntryResponse, error)
|
||||
StreamRenameEntry(*StreamRenameEntryRequest, grpc.ServerStreamingServer[StreamRenameEntryResponse]) error
|
||||
StreamMutateEntry(grpc.BidiStreamingServer[StreamMutateEntryRequest, StreamMutateEntryResponse]) error
|
||||
@@ -546,6 +559,9 @@ func (UnimplementedSeaweedFilerServer) ObjectTransaction(context.Context, *Objec
|
||||
func (UnimplementedSeaweedFilerServer) ObjectTransactionBatch(context.Context, *ObjectTransactionBatchRequest) (*ObjectTransactionBatchResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method ObjectTransactionBatch not implemented")
|
||||
}
|
||||
func (UnimplementedSeaweedFilerServer) PosixLock(context.Context, *PosixLockRequest) (*PosixLockResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method PosixLock not implemented")
|
||||
}
|
||||
func (UnimplementedSeaweedFilerServer) AtomicRenameEntry(context.Context, *AtomicRenameEntryRequest) (*AtomicRenameEntryResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method AtomicRenameEntry not implemented")
|
||||
}
|
||||
@@ -791,6 +807,24 @@ func _SeaweedFiler_ObjectTransactionBatch_Handler(srv interface{}, ctx context.C
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _SeaweedFiler_PosixLock_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(PosixLockRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(SeaweedFilerServer).PosixLock(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: SeaweedFiler_PosixLock_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(SeaweedFilerServer).PosixLock(ctx, req.(*PosixLockRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _SeaweedFiler_AtomicRenameEntry_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(AtomicRenameEntryRequest)
|
||||
if err := dec(in); err != nil {
|
||||
@@ -1205,6 +1239,10 @@ var SeaweedFiler_ServiceDesc = grpc.ServiceDesc{
|
||||
MethodName: "ObjectTransactionBatch",
|
||||
Handler: _SeaweedFiler_ObjectTransactionBatch_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "PosixLock",
|
||||
Handler: _SeaweedFiler_PosixLock_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "AtomicRenameEntry",
|
||||
Handler: _SeaweedFiler_AtomicRenameEntry_Handler,
|
||||
|
||||
95
weed/server/filer_grpc_server_posix_lock.go
Normal file
95
weed/server/filer_grpc_server_posix_lock.go
Normal file
@@ -0,0 +1,95 @@
|
||||
package weed_server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/filer/posixlock"
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
)
|
||||
|
||||
// PosixLock applies one advisory lock operation against the in-memory lock table
|
||||
// of the inode's owner filer. The owner is resolved from req.Key on the same
|
||||
// ring the gateway and DLM use; a non-owner filer forwards the request one hop
|
||||
// (is_moved bounds it), so the owner's table stays the single authority even when
|
||||
// the caller's ring view is stale. Blocking (SetLkw) is the caller's job — this
|
||||
// is strictly non-blocking try/release/query.
|
||||
func (fs *FilerServer) PosixLock(ctx context.Context, req *filer_pb.PosixLockRequest) (*filer_pb.PosixLockResponse, error) {
|
||||
if req.Key == "" {
|
||||
return &filer_pb.PosixLockResponse{}, fmt.Errorf("key is required")
|
||||
}
|
||||
if req.Lock == nil {
|
||||
return &filer_pb.PosixLockResponse{}, fmt.Errorf("lock is required")
|
||||
}
|
||||
|
||||
if !req.IsMoved && fs.filer.Dlm != nil {
|
||||
if owner := fs.filer.Dlm.LockRing.GetPrimary(req.Key); owner != "" && owner != fs.option.Host {
|
||||
forwarded := &filer_pb.PosixLockRequest{
|
||||
Key: req.Key,
|
||||
IsMoved: true,
|
||||
Op: req.Op,
|
||||
Lock: req.Lock,
|
||||
}
|
||||
glog.V(4).InfofCtx(ctx, "PosixLock %s op=%v: forwarding to owner %s", req.Key, req.Op, owner)
|
||||
var resp *filer_pb.PosixLockResponse
|
||||
err := pb.WithFilerClient(false, 0, owner, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
||||
var e error
|
||||
resp, e = client.PosixLock(ctx, forwarded)
|
||||
return e
|
||||
})
|
||||
if err != nil {
|
||||
return &filer_pb.PosixLockResponse{}, err
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
}
|
||||
|
||||
lk := posixlock.Range{
|
||||
Start: req.Lock.GetStart(),
|
||||
End: req.Lock.GetEnd(),
|
||||
Type: req.Lock.GetType(),
|
||||
Sid: req.Lock.GetSid(),
|
||||
Owner: req.Lock.GetOwner(),
|
||||
Pid: req.Lock.GetPid(),
|
||||
IsFlock: req.Lock.GetIsFlock(),
|
||||
}
|
||||
|
||||
resp := &filer_pb.PosixLockResponse{}
|
||||
switch req.Op {
|
||||
case filer_pb.PosixLockOp_TRY_LOCK:
|
||||
if c, granted := fs.posixLocks.TryLock(req.Key, lk); granted {
|
||||
resp.Granted = true
|
||||
} else {
|
||||
resp.HasConflict = true
|
||||
resp.Conflict = posixRangeToPb(c)
|
||||
}
|
||||
case filer_pb.PosixLockOp_UNLOCK:
|
||||
fs.posixLocks.Unlock(req.Key, lk)
|
||||
case filer_pb.PosixLockOp_GET_LK:
|
||||
if c, found := fs.posixLocks.GetLk(req.Key, lk); found {
|
||||
resp.HasConflict = true
|
||||
resp.Conflict = posixRangeToPb(c)
|
||||
}
|
||||
case filer_pb.PosixLockOp_RELEASE_POSIX_OWNER:
|
||||
fs.posixLocks.ReleasePosixOwner(req.Key, lk.Sid, lk.Owner)
|
||||
case filer_pb.PosixLockOp_RELEASE_FLOCK_OWNER:
|
||||
fs.posixLocks.ReleaseFlockOwner(req.Key, lk.Sid, lk.Owner)
|
||||
default:
|
||||
return &filer_pb.PosixLockResponse{}, fmt.Errorf("unknown posix lock op %v", req.Op)
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func posixRangeToPb(r posixlock.Range) *filer_pb.PosixLockRange {
|
||||
return &filer_pb.PosixLockRange{
|
||||
Start: r.Start,
|
||||
End: r.End,
|
||||
Type: r.Type,
|
||||
Sid: r.Sid,
|
||||
Owner: r.Owner,
|
||||
Pid: r.Pid,
|
||||
IsFlock: r.IsFlock,
|
||||
}
|
||||
}
|
||||
135
weed/server/filer_grpc_server_posix_lock_test.go
Normal file
135
weed/server/filer_grpc_server_posix_lock_test.go
Normal file
@@ -0,0 +1,135 @@
|
||||
package weed_server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/filer/posixlock"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
)
|
||||
|
||||
func newPosixTestServer() *FilerServer {
|
||||
fs, _ := newTxnTestServer(nil)
|
||||
fs.posixLocks = posixlock.NewManager()
|
||||
return fs
|
||||
}
|
||||
|
||||
func pbLock(start, end uint64, typ uint32, sid, owner uint64, pid uint32, flock bool) *filer_pb.PosixLockRange {
|
||||
return &filer_pb.PosixLockRange{Start: start, End: end, Type: typ, Sid: sid, Owner: owner, Pid: pid, IsFlock: flock}
|
||||
}
|
||||
|
||||
func posixOp(t *testing.T, fs *FilerServer, op filer_pb.PosixLockOp, lk *filer_pb.PosixLockRange) *filer_pb.PosixLockResponse {
|
||||
t.Helper()
|
||||
resp, err := fs.PosixLock(context.Background(), &filer_pb.PosixLockRequest{Key: "s3.fuse.lock:/x", Op: op, Lock: lk})
|
||||
if err != nil {
|
||||
t.Fatalf("PosixLock op=%v: %v", op, err)
|
||||
}
|
||||
return resp
|
||||
}
|
||||
|
||||
func TestPosixLockGrantAndConflict(t *testing.T) {
|
||||
fs := newPosixTestServer()
|
||||
|
||||
if r := posixOp(t, fs, filer_pb.PosixLockOp_TRY_LOCK, pbLock(0, 99, posixlock.Write, 1, 1, 7, false)); !r.Granted {
|
||||
t.Fatal("first lock should be granted")
|
||||
}
|
||||
// Conflicting writer from another session: rejected, conflict reported.
|
||||
r := posixOp(t, fs, filer_pb.PosixLockOp_TRY_LOCK, pbLock(50, 149, posixlock.Write, 2, 1, 8, false))
|
||||
if r.Granted {
|
||||
t.Fatal("overlapping lock from another session should conflict")
|
||||
}
|
||||
if !r.HasConflict || r.Conflict.GetPid() != 7 || r.Conflict.GetStart() != 0 || r.Conflict.GetEnd() != 99 {
|
||||
t.Fatalf("conflict not reported correctly: %+v", r.Conflict)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPosixLockUnlockThenReacquire(t *testing.T) {
|
||||
fs := newPosixTestServer()
|
||||
posixOp(t, fs, filer_pb.PosixLockOp_TRY_LOCK, pbLock(0, 99, posixlock.Write, 1, 1, 7, false))
|
||||
posixOp(t, fs, filer_pb.PosixLockOp_UNLOCK, pbLock(0, 99, posixlock.Unlock, 1, 1, 7, false))
|
||||
if r := posixOp(t, fs, filer_pb.PosixLockOp_TRY_LOCK, pbLock(0, 99, posixlock.Write, 2, 1, 8, false)); !r.Granted {
|
||||
t.Fatal("lock should be grantable after the holder unlocked")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPosixLockGetLk(t *testing.T) {
|
||||
fs := newPosixTestServer()
|
||||
posixOp(t, fs, filer_pb.PosixLockOp_TRY_LOCK, pbLock(10, 50, posixlock.Write, 1, 1, 7, false))
|
||||
|
||||
r := posixOp(t, fs, filer_pb.PosixLockOp_GET_LK, pbLock(30, 70, posixlock.Read, 2, 1, 8, false))
|
||||
if !r.HasConflict || r.Conflict.GetPid() != 7 {
|
||||
t.Fatalf("GET_LK should report the holder, got %+v", r)
|
||||
}
|
||||
// No conflict for the same owner.
|
||||
r = posixOp(t, fs, filer_pb.PosixLockOp_GET_LK, pbLock(30, 70, posixlock.Read, 1, 1, 7, false))
|
||||
if r.HasConflict {
|
||||
t.Fatal("an owner should not conflict with itself")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPosixLockReleasePosixOwnerKeepsFlock(t *testing.T) {
|
||||
fs := newPosixTestServer()
|
||||
posixOp(t, fs, filer_pb.PosixLockOp_TRY_LOCK, pbLock(0, 99, posixlock.Write, 1, 1, 7, false))
|
||||
posixOp(t, fs, filer_pb.PosixLockOp_TRY_LOCK, pbLock(0, 1<<63, posixlock.Write, 1, 1, 7, true))
|
||||
|
||||
posixOp(t, fs, filer_pb.PosixLockOp_RELEASE_POSIX_OWNER, pbLock(0, 0, posixlock.Unlock, 1, 1, 7, false))
|
||||
|
||||
// fcntl gone, flock remains.
|
||||
if r := posixOp(t, fs, filer_pb.PosixLockOp_TRY_LOCK, pbLock(0, 99, posixlock.Write, 2, 1, 8, false)); !r.Granted {
|
||||
t.Fatal("fcntl lock should be gone after RELEASE_POSIX_OWNER")
|
||||
}
|
||||
if r := posixOp(t, fs, filer_pb.PosixLockOp_GET_LK, pbLock(0, 10, posixlock.Write, 3, 3, 9, true)); !r.HasConflict {
|
||||
t.Fatal("flock lock should survive RELEASE_POSIX_OWNER")
|
||||
}
|
||||
}
|
||||
|
||||
// A request whose key is owned by another filer is forwarded to it; the owner
|
||||
// applies it and the sender does not. The owner's ring points back at the bogus
|
||||
// sender, so without is_moved on the forwarded hop it would re-forward and fail.
|
||||
func TestPosixLockForwardsToOwner(t *testing.T) {
|
||||
const key = "s3.fuse.lock:/x"
|
||||
owner := newPosixTestServer()
|
||||
|
||||
lis, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatalf("listen: %v", err)
|
||||
}
|
||||
port := lis.Addr().(*net.TCPAddr).Port
|
||||
ownerAddr := pb.NewServerAddressWithGrpcPort(lis.Addr().String(), port)
|
||||
sender := pb.ServerAddress("127.0.0.1:1")
|
||||
|
||||
withRing(owner, ownerAddr, sender)
|
||||
owner.grpcDialOption = grpc.WithTransportCredentials(insecure.NewCredentials())
|
||||
srv := grpc.NewServer()
|
||||
filer_pb.RegisterSeaweedFilerServer(srv, owner)
|
||||
go srv.Serve(lis)
|
||||
t.Cleanup(srv.Stop)
|
||||
|
||||
self := newPosixTestServer()
|
||||
withRing(self, sender, ownerAddr)
|
||||
self.grpcDialOption = grpc.WithTransportCredentials(insecure.NewCredentials())
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
resp, err := self.PosixLock(ctx, &filer_pb.PosixLockRequest{
|
||||
Key: key, Op: filer_pb.PosixLockOp_TRY_LOCK,
|
||||
Lock: pbLock(0, 99, posixlock.Write, 1, 1, 7, false),
|
||||
})
|
||||
if err != nil || !resp.GetGranted() {
|
||||
t.Fatalf("forwarded TRY_LOCK: err=%v granted=%v", err, resp.GetGranted())
|
||||
}
|
||||
|
||||
// The lock landed on the owner: a conflicting acquire there is rejected.
|
||||
if _, granted := owner.posixLocks.TryLock(key, posixlock.Range{Start: 50, End: 149, Type: posixlock.Write, Sid: 2, Owner: 1}); granted {
|
||||
t.Fatal("owner should hold the forwarded lock")
|
||||
}
|
||||
// The sender did not apply locally.
|
||||
if _, granted := self.posixLocks.TryLock(key, posixlock.Range{Start: 0, End: 99, Type: posixlock.Write, Sid: 9, Owner: 9}); !granted {
|
||||
t.Fatal("sender must forward, not apply locally")
|
||||
}
|
||||
}
|
||||
@@ -25,6 +25,7 @@ import (
|
||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/filer"
|
||||
"github.com/seaweedfs/seaweedfs/weed/filer/posixlock"
|
||||
_ "github.com/seaweedfs/seaweedfs/weed/filer/arangodb"
|
||||
_ "github.com/seaweedfs/seaweedfs/weed/filer/cassandra"
|
||||
_ "github.com/seaweedfs/seaweedfs/weed/filer/cassandra2"
|
||||
@@ -132,6 +133,12 @@ type FilerServer struct {
|
||||
// the distributed lock for that key. Idle keys are evicted automatically, so
|
||||
// the table stays bounded.
|
||||
entryLockTable *util.LockTable[util.FullPath]
|
||||
|
||||
// posixLocks is the in-memory authority for cross-mount POSIX advisory locks
|
||||
// on inodes this filer owns (per the route-by-key ring). Lock state is kept
|
||||
// here rather than in replicated metadata: it is transient coordination, so
|
||||
// keeping it off the meta-log avoids churn.
|
||||
posixLocks *posixlock.Manager
|
||||
}
|
||||
|
||||
func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
|
||||
@@ -171,6 +178,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
|
||||
recentCopyRequests: make(map[string]recentCopyRequest),
|
||||
CredentialManager: option.CredentialManager,
|
||||
entryLockTable: util.NewLockTable[util.FullPath](),
|
||||
posixLocks: posixlock.NewManager(),
|
||||
}
|
||||
fs.mountPeerRegistry = filer.NewMountPeerRegistry()
|
||||
go fs.runMountPeerRegistrySweeper()
|
||||
|
||||
Reference in New Issue
Block a user