filer: evaluate a write precondition in CreateEntry (#9650)

Add an optional WriteCondition to CreateEntryRequest. When set, the filer
evaluates it against the current entry while holding the per-path lock, so the
check and the write are atomic on this filer, and returns PRECONDITION_FAILED
when it does not hold. The caller must route the key's writes to the owner filer
for the check to be authoritative.

A condition is a list of clauses that all must hold (logical AND). One clause is
the common case; several express what a single comparison cannot: an ETag set
(If-Match / If-None-Match with multiple values), weak-ETag comparison, and
compound conditions. ETag comparison mirrors the S3 gateway's precedence (stored
Seaweed ETag attribute, then the Md5/chunk fallback) and follows RFC 7232
strong/weak rules, so results match without coupling the filer to S3 handling.

Condition parsing and evaluation live in filer_grpc_server_condition.go.
This commit is contained in:
Chris Lu
2026-05-23 16:29:14 -07:00
committed by GitHub
parent bce76e6e21
commit b18d3dc96c
6 changed files with 978 additions and 406 deletions

View File

@@ -222,6 +222,41 @@ message CreateEntryRequest {
bool is_from_other_cluster = 4;
repeated int32 signatures = 5;
bool skip_check_parent_directory = 6;
// Optional precondition evaluated against the current entry atomically with
// the write, under the filer's per-path lock. The caller must route the
// key's writes to this entry's owner filer for the check to be authoritative.
WriteCondition condition = 7;
}
// WriteCondition is the precondition the filer evaluates against the existing
// entry before writing, under the per-path lock. A failed condition returns
// FilerError PRECONDITION_FAILED. The client maps request semantics (e.g. RFC
// 7232) to clauses; the filer just compares.
//
// A condition is a list of clauses that ALL must hold (logical AND). One clause
// is the common case; several express what a single comparison cannot: an ETag
// set (If-Match / If-None-Match with multiple values), weak-ETag comparison, and
// compound conditions (e.g. If-Match + If-Unmodified-Since together).
message WriteCondition {
enum Kind {
NONE = 0; // unconditional
IF_NOT_EXISTS = 1; // fail if the entry exists (If-None-Match: *)
IF_EXISTS = 2; // fail if the entry is absent (If-Match: *)
IF_ETAG_MATCH = 3; // fail if absent or etag matches none of the set (If-Match)
IF_ETAG_NOT_MATCH = 4; // fail if present and etag matches any of the set (If-None-Match)
IF_UNMODIFIED_SINCE = 5; // fail if present and mtime > unix_time
IF_MODIFIED_SINCE = 6; // fail if present and mtime <= unix_time
}
// Clause is one primitive comparison. IF_ETAG_MATCH holds when the current
// entry's ETag equals any value in etags; IF_ETAG_NOT_MATCH holds when it
// equals none. allow_weak permits weak-comparison (ignoring the W/ prefix).
message Clause {
Kind kind = 1;
repeated string etags = 2; // ETag set for IF_ETAG_* kinds
int64 unix_time = 3; // bound (unix seconds) for IF_*_SINCE kinds
bool allow_weak = 4; // compare ETags ignoring the weak (W/) marker
}
repeated Clause clauses = 1; // all must hold (logical AND)
}
// Structured error codes for filer entry operations.
@@ -233,6 +268,7 @@ enum FilerError {
EXISTING_IS_DIRECTORY = 3; // cannot overwrite directory with file
EXISTING_IS_FILE = 4; // cannot overwrite file with directory
ENTRY_ALREADY_EXISTS = 5; // O_EXCL and entry already exists
PRECONDITION_FAILED = 6; // WriteCondition not satisfied
}
message CreateEntryResponse {
@@ -421,6 +457,7 @@ message SubscribeMetadataRequest {
repeated string directories = 10; // exact directory to watch
bool client_supports_batching = 11; // client can unpack SubscribeMetadataResponse.events
bool client_supports_metadata_chunks = 12; // client can read log file chunks from volume servers
bool client_supports_idle_heartbeat = 13; // server may send empty responses carrying the current time while the client is caught up
}
message SubscribeMetadataResponse {
string directory = 1;

View File

@@ -222,6 +222,41 @@ message CreateEntryRequest {
bool is_from_other_cluster = 4;
repeated int32 signatures = 5;
bool skip_check_parent_directory = 6;
// Optional precondition evaluated against the current entry atomically with
// the write, under the filer's per-path lock. The caller must route the
// key's writes to this entry's owner filer for the check to be authoritative.
WriteCondition condition = 7;
}
// WriteCondition is the precondition the filer evaluates against the existing
// entry before writing, under the per-path lock. A failed condition returns
// FilerError PRECONDITION_FAILED. The client maps request semantics (e.g. RFC
// 7232) to clauses; the filer just compares.
//
// A condition is a list of clauses that ALL must hold (logical AND). One clause
// is the common case; several express what a single comparison cannot: an ETag
// set (If-Match / If-None-Match with multiple values), weak-ETag comparison, and
// compound conditions (e.g. If-Match + If-Unmodified-Since together).
message WriteCondition {
enum Kind {
NONE = 0; // unconditional
IF_NOT_EXISTS = 1; // fail if the entry exists (If-None-Match: *)
IF_EXISTS = 2; // fail if the entry is absent (If-Match: *)
IF_ETAG_MATCH = 3; // fail if absent or etag matches none of the set (If-Match)
IF_ETAG_NOT_MATCH = 4; // fail if present and etag matches any of the set (If-None-Match)
IF_UNMODIFIED_SINCE = 5; // fail if present and mtime > unix_time
IF_MODIFIED_SINCE = 6; // fail if present and mtime <= unix_time
}
// Clause is one primitive comparison. IF_ETAG_MATCH holds when the current
// entry's ETag equals any value in etags; IF_ETAG_NOT_MATCH holds when it
// equals none. allow_weak permits weak-comparison (ignoring the W/ prefix).
message Clause {
Kind kind = 1;
repeated string etags = 2; // ETag set for IF_ETAG_* kinds
int64 unix_time = 3; // bound (unix seconds) for IF_*_SINCE kinds
bool allow_weak = 4; // compare ETags ignoring the weak (W/) marker
}
repeated Clause clauses = 1; // all must hold (logical AND)
}
// Structured error codes for filer entry operations.
@@ -233,6 +268,7 @@ enum FilerError {
EXISTING_IS_DIRECTORY = 3; // cannot overwrite directory with file
EXISTING_IS_FILE = 4; // cannot overwrite file with directory
ENTRY_ALREADY_EXISTS = 5; // O_EXCL and entry already exists
PRECONDITION_FAILED = 6; // WriteCondition not satisfied
}
message CreateEntryResponse {

File diff suppressed because it is too large Load Diff

View File

@@ -192,6 +192,25 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr
pathLock := fs.entryLockTable.AcquireLock("CreateEntry", fullpath, util.ExclusiveLock)
defer fs.entryLockTable.ReleaseLock(fullpath, pathLock)
// Evaluate the optional precondition against the current entry while the
// path lock is held, so the check and the write are atomic on this filer.
if conditionIsSet(req.Condition) {
current, findErr := fs.filer.FindEntry(ctx, fullpath)
if findErr != nil && findErr != filer_pb.ErrNotFound {
return &filer_pb.CreateEntryResponse{}, fmt.Errorf("CreateEntry condition check %s: %w", fullpath, findErr)
}
if findErr == filer_pb.ErrNotFound {
current = nil
}
if !writeConditionSatisfied(req.Condition, current) {
glog.V(3).InfofCtx(ctx, "CreateEntry %s: precondition failed: %v", fullpath, req.Condition)
return &filer_pb.CreateEntryResponse{
Error: "precondition failed",
ErrorCode: filer_pb.FilerError_PRECONDITION_FAILED,
}, nil
}
}
ctx, eventSink := filer.WithMetadataEventSink(ctx)
createErr := fs.filer.CreateEntry(ctx, newEntry, req.OExcl, req.IsFromOtherCluster, req.Signatures, req.SkipCheckParentDirectory, so.MaxFileNameLength)

View File

@@ -0,0 +1,96 @@
package weed_server
import (
"strings"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
)
// conditionIsSet reports whether a condition asks for any check at all.
func conditionIsSet(cond *filer_pb.WriteCondition) bool {
return cond != nil && len(cond.Clauses) > 0
}
// writeConditionSatisfied reports whether the precondition holds against the
// current entry (nil if absent), evaluated under the path lock. Every clause
// must hold (logical AND).
func writeConditionSatisfied(cond *filer_pb.WriteCondition, current *filer.Entry) bool {
for _, c := range cond.Clauses {
if !clauseSatisfied(c, current) {
return false
}
}
return true
}
// clauseSatisfied evaluates one primitive against the current entry. For the
// ETag kinds, etags is a set: IF_ETAG_MATCH holds when the current ETag equals
// any member, IF_ETAG_NOT_MATCH when it equals none.
func clauseSatisfied(c *filer_pb.WriteCondition_Clause, current *filer.Entry) bool {
exists := current != nil
switch c.Kind {
case filer_pb.WriteCondition_IF_NOT_EXISTS:
return !exists
case filer_pb.WriteCondition_IF_EXISTS:
return exists
case filer_pb.WriteCondition_IF_ETAG_MATCH:
return exists && etagInSet(storedEntryETag(current), c.Etags, c.AllowWeak)
case filer_pb.WriteCondition_IF_ETAG_NOT_MATCH:
return !exists || !etagInSet(storedEntryETag(current), c.Etags, c.AllowWeak)
case filer_pb.WriteCondition_IF_UNMODIFIED_SINCE:
return !exists || current.Attr.Mtime.Unix() <= c.UnixTime
case filer_pb.WriteCondition_IF_MODIFIED_SINCE:
return !exists || current.Attr.Mtime.Unix() > c.UnixTime
default:
return true
}
}
// etagInSet reports whether stored matches any candidate. A strong comparison
// (allowWeak false) treats a weak ETag as never equal; a weak comparison
// ignores the W/ marker on both sides.
func etagInSet(stored string, candidates []string, allowWeak bool) bool {
for _, c := range candidates {
if etagEqual(stored, c, allowWeak) {
return true
}
}
return false
}
func etagEqual(stored, expected string, allowWeak bool) bool {
sv, sWeak := canonicalETag(stored)
ev, eWeak := canonicalETag(expected)
// RFC 7232 strong comparison: a weak ETag on either side never matches.
if !allowWeak && (sWeak || eWeak) {
return false
}
return sv == ev
}
// canonicalETag splits off the weak (W/) marker before stripping quotes, so a
// weak ETag like W/"abc" yields ("abc", true).
func canonicalETag(etag string) (value string, weak bool) {
etag = strings.TrimSpace(etag)
if strings.HasPrefix(etag, "W/") {
return strings.Trim(etag[len("W/"):], `"`), true
}
return strings.Trim(etag, `"`), false
}
// storedEntryETag mirrors the S3 gateway's ETag precedence (the stored
// Seaweed ETag extended attribute, then the chunk/Md5 fallback) so conditional
// comparisons match what the gateway computes, without coupling the filer to
// S3 request handling.
func storedEntryETag(entry *filer.Entry) string {
if v, ok := entry.Extended[s3_constants.ExtETagKey]; ok && len(v) > 0 {
return normalizeETag(string(v))
}
return normalizeETag(filer.ETagEntry(entry))
}
func normalizeETag(etag string) string {
return strings.Trim(etag, `"`)
}

View File

@@ -0,0 +1,161 @@
package weed_server
import (
"context"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/util"
)
func entryWithETag(etag string, mtime time.Time) *filer.Entry {
return &filer.Entry{
FullPath: "/test/obj",
Attr: filer.Attr{Mtime: mtime},
Extended: map[string][]byte{s3_constants.ExtETagKey: []byte(etag)},
}
}
// one wraps a single clause into a condition.
func one(c *filer_pb.WriteCondition_Clause) *filer_pb.WriteCondition {
return &filer_pb.WriteCondition{Clauses: []*filer_pb.WriteCondition_Clause{c}}
}
func TestWriteConditionSatisfied(t *testing.T) {
base := time.Unix(1700000000, 0)
present := entryWithETag("abc", base)
cases := []struct {
name string
cond *filer_pb.WriteCondition
cur *filer.Entry
want bool
}{
{"empty-absent", &filer_pb.WriteCondition{}, nil, true},
{"ifnotexists-absent", one(&filer_pb.WriteCondition_Clause{Kind: filer_pb.WriteCondition_IF_NOT_EXISTS}), nil, true},
{"ifnotexists-present", one(&filer_pb.WriteCondition_Clause{Kind: filer_pb.WriteCondition_IF_NOT_EXISTS}), present, false},
{"ifexists-absent", one(&filer_pb.WriteCondition_Clause{Kind: filer_pb.WriteCondition_IF_EXISTS}), nil, false},
{"ifexists-present", one(&filer_pb.WriteCondition_Clause{Kind: filer_pb.WriteCondition_IF_EXISTS}), present, true},
{"etagmatch-hit", one(&filer_pb.WriteCondition_Clause{Kind: filer_pb.WriteCondition_IF_ETAG_MATCH, Etags: []string{`"abc"`}}), present, true},
{"etagmatch-miss", one(&filer_pb.WriteCondition_Clause{Kind: filer_pb.WriteCondition_IF_ETAG_MATCH, Etags: []string{`"zzz"`}}), present, false},
{"etagmatch-absent", one(&filer_pb.WriteCondition_Clause{Kind: filer_pb.WriteCondition_IF_ETAG_MATCH, Etags: []string{`"abc"`}}), nil, false},
{"etagnotmatch-hit", one(&filer_pb.WriteCondition_Clause{Kind: filer_pb.WriteCondition_IF_ETAG_NOT_MATCH, Etags: []string{`"abc"`}}), present, false},
{"etagnotmatch-miss", one(&filer_pb.WriteCondition_Clause{Kind: filer_pb.WriteCondition_IF_ETAG_NOT_MATCH, Etags: []string{`"zzz"`}}), present, true},
{"etagnotmatch-absent", one(&filer_pb.WriteCondition_Clause{Kind: filer_pb.WriteCondition_IF_ETAG_NOT_MATCH, Etags: []string{`"abc"`}}), nil, true},
{"unmodsince-ok", one(&filer_pb.WriteCondition_Clause{Kind: filer_pb.WriteCondition_IF_UNMODIFIED_SINCE, UnixTime: base.Unix()}), present, true},
{"unmodsince-fail", one(&filer_pb.WriteCondition_Clause{Kind: filer_pb.WriteCondition_IF_UNMODIFIED_SINCE, UnixTime: base.Unix() - 1}), present, false},
{"modsince-ok", one(&filer_pb.WriteCondition_Clause{Kind: filer_pb.WriteCondition_IF_MODIFIED_SINCE, UnixTime: base.Unix() - 1}), present, true},
{"modsince-fail", one(&filer_pb.WriteCondition_Clause{Kind: filer_pb.WriteCondition_IF_MODIFIED_SINCE, UnixTime: base.Unix()}), present, false},
}
for _, tc := range cases {
if got := writeConditionSatisfied(tc.cond, tc.cur); got != tc.want {
t.Errorf("%s: got %v want %v", tc.name, got, tc.want)
}
}
}
func TestWriteConditionClauses(t *testing.T) {
base := time.Unix(1700000000, 0)
present := entryWithETag("abc", base)
matchAny := func(etags ...string) *filer_pb.WriteCondition {
return &filer_pb.WriteCondition{Clauses: []*filer_pb.WriteCondition_Clause{
{Kind: filer_pb.WriteCondition_IF_ETAG_MATCH, Etags: etags},
}}
}
noneOf := func(etags ...string) *filer_pb.WriteCondition {
return &filer_pb.WriteCondition{Clauses: []*filer_pb.WriteCondition_Clause{
{Kind: filer_pb.WriteCondition_IF_ETAG_NOT_MATCH, Etags: etags},
}}
}
cases := []struct {
name string
cond *filer_pb.WriteCondition
cur *filer.Entry
want bool
}{
{"set-match-hit", matchAny(`"x"`, `"abc"`, `"y"`), present, true},
{"set-match-miss", matchAny(`"x"`, `"y"`), present, false},
{"set-none-clean", noneOf(`"x"`, `"y"`), present, true},
{"set-none-hit", noneOf(`"abc"`, `"y"`), present, false},
// A weak request ETag never matches under strong comparison.
{"weak-strong-fails", matchAny(`W/"abc"`), present, false},
// allow_weak compares ignoring the W/ marker.
{"weak-allowed", &filer_pb.WriteCondition{Clauses: []*filer_pb.WriteCondition_Clause{
{Kind: filer_pb.WriteCondition_IF_ETAG_MATCH, Etags: []string{`W/"abc"`}, AllowWeak: true},
}}, present, true},
// Compound clauses are ANDed.
{"compound-ok", &filer_pb.WriteCondition{Clauses: []*filer_pb.WriteCondition_Clause{
{Kind: filer_pb.WriteCondition_IF_ETAG_MATCH, Etags: []string{`"abc"`}},
{Kind: filer_pb.WriteCondition_IF_UNMODIFIED_SINCE, UnixTime: base.Unix()},
}}, present, true},
{"compound-second-fails", &filer_pb.WriteCondition{Clauses: []*filer_pb.WriteCondition_Clause{
{Kind: filer_pb.WriteCondition_IF_ETAG_MATCH, Etags: []string{`"abc"`}},
{Kind: filer_pb.WriteCondition_IF_UNMODIFIED_SINCE, UnixTime: base.Unix() - 1},
}}, present, false},
}
for _, tc := range cases {
if got := writeConditionSatisfied(tc.cond, tc.cur); got != tc.want {
t.Errorf("%s: got %v want %v", tc.name, got, tc.want)
}
}
}
// storedEntryETag prefers the stored Seaweed ETag attribute and falls back to
// the Md5-derived ETag, matching the S3 gateway.
func TestStoredEntryETag(t *testing.T) {
withExt := entryWithETag("explicit", time.Unix(0, 0))
if got := storedEntryETag(withExt); got != "explicit" {
t.Errorf("extended etag: got %q", got)
}
md5Only := &filer.Entry{Attr: filer.Attr{Md5: []byte{0xab, 0xcd}}}
if got := storedEntryETag(md5Only); got != "abcd" {
t.Errorf("md5 fallback: got %q", got)
}
}
// The CreateEntry handler enforces the precondition atomically: a matching
// If-Match overwrites, a non-matching one returns PRECONDITION_FAILED.
func TestCreateEntryConditionEnforced(t *testing.T) {
store := newRenameTestStore()
store.entries["/test/obj"] = &filer.Entry{
FullPath: "/test/obj",
Attr: filer.Attr{Inode: 1, Mtime: time.Unix(1700000000, 0)},
Extended: map[string][]byte{s3_constants.ExtETagKey: []byte("abc")},
}
f := newRenameTestFiler(store)
f.DirBucketsPath = "/buckets"
fs := &FilerServer{filer: f, option: &FilerOption{}, entryLockTable: util.NewLockTable[util.FullPath]()}
req := func(etag string) *filer_pb.CreateEntryRequest {
return &filer_pb.CreateEntryRequest{
Directory: "/test",
SkipCheckParentDirectory: true,
Entry: &filer_pb.Entry{
Name: "obj",
Attributes: &filer_pb.FuseAttributes{Mtime: 1700000001, FileMode: 0644, Inode: 2},
},
Condition: one(&filer_pb.WriteCondition_Clause{Kind: filer_pb.WriteCondition_IF_ETAG_MATCH, Etags: []string{etag}}),
}
}
resp, err := fs.CreateEntry(context.Background(), req(`"zzz"`))
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
if resp.ErrorCode != filer_pb.FilerError_PRECONDITION_FAILED {
t.Fatalf("mismatched etag: want PRECONDITION_FAILED, got %v (%q)", resp.ErrorCode, resp.Error)
}
resp, err = fs.CreateEntry(context.Background(), req(`"abc"`))
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
if resp.Error != "" {
t.Fatalf("matching etag should overwrite, got error %q", resp.Error)
}
}