perf(filer.sync): don't serialize descendants behind dir attribute updates (#9079)

* perf(filer.sync): don't serialize descendants behind dir attribute updates

The MetadataProcessor treated every in-flight directory job as a subtree
barrier: any active dir job at /foo forced all file events under /foo to
wait, and because the admit loop runs on the single stream.Recv()
goroutine, a stalled descendant also stalled the whole gRPC stream. For
large directories this turned every attribute-only dir event (mtime /
xattr / chmod bumps) into a full-subtree pinch point.
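
To make the head-of-line effect concrete, here is a minimal sketch of it, not the actual MetadataProcessor. It assumes a hypothetical `event` type and an `admitInOrder` helper that models a single admit loop which stops at the first event conflicting with one in-flight directory job. All names are illustrative.

```go
package main

import (
	"fmt"
	"strings"
)

// event is a hypothetical, simplified stand-in for one metadata event.
type event struct {
	path string
}

// isUnder reports whether child lies strictly below dir.
func isUnder(child, dir string) bool {
	if child == dir {
		return false
	}
	return strings.HasPrefix(child, strings.TrimSuffix(dir, "/")+"/")
}

// admitInOrder models a single-goroutine admit loop: events are admitted in
// stream order, and the loop stops at the first event that conflicts with the
// in-flight directory job at activeDir. Everything queued behind that event
// waits as well, even if it targets an unrelated path.
// treatAsBarrier selects whether the in-flight dir job blocks its subtree.
func admitInOrder(activeDir string, treatAsBarrier bool, queue []event) int {
	admitted := 0
	for _, ev := range queue {
		if treatAsBarrier && isUnder(ev.path, activeDir) {
			break // the receive loop would block here
		}
		admitted++
	}
	return admitted
}

func main() {
	queue := []event{
		{path: "/other/a.txt"},
		{path: "/foo/b.txt"}, // under the in-flight dir job at /foo
		{path: "/other/c.txt"},
		{path: "/elsewhere/d.txt"},
	}
	fmt.Println("admitted while dir job is a barrier:", admitInOrder("/foo", true, queue))
	fmt.Println("admitted while dir job is an attribute update:", admitInOrder("/foo", false, queue))
}
```

In this model the two unrelated events behind `/foo/b.txt` are held back only because the loop is sequential, which is the stall the paragraph above describes.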

Classify dir jobs as barrier (create / delete / rename) vs non-barrier
(filer_pb.IsUpdate on a directory — same parent and same name, i.e. an
in-place attribute update). Only barrier dirs block descendants and get
blocked by ancestor barrier dirs. Non-barrier dir updates still bump the
ancestor descendantCount, so an incoming barrier dir on an ancestor
still waits for them — preserving the "delete /a waits for in-flight
/a/b update" safety.
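
The interplay between the barrier index and the descendant counters can be sketched as follows. This is a stripped-down model under stated assumptions: paths are plain strings, `index`, `add`, `blocked`, and `ancestors` are hypothetical names, and only the two rules from this paragraph are modeled (same-path rules are left out). The ancestor check is applied to every incoming kind, as the pathConflicts code in the diff below does.

```go
package main

import (
	"fmt"
	"strings"
)

type jobKind int

const (
	kindFile jobKind = iota
	kindBarrierDir
	kindNonBarrierDir
)

// ancestors returns every proper ancestor of p, e.g. "/a/b" -> ["/a", "/"].
func ancestors(p string) []string {
	var out []string
	for p != "/" && p != "" {
		i := strings.LastIndex(p, "/")
		if i <= 0 {
			out = append(out, "/")
			break
		}
		p = p[:i]
		out = append(out, p)
	}
	return out
}

// index is a hypothetical, reduced model of the conflict indexes.
type index struct {
	barrierDirs map[string]int // active barrier dir jobs per exact path
	descendants map[string]int // active jobs of any kind strictly below a path
}

func newIndex() *index {
	return &index{barrierDirs: map[string]int{}, descendants: map[string]int{}}
}

// add registers an in-flight job. Only barrier dirs become visible to the
// ancestor check; every kind bumps the descendant counters of its ancestors.
func (ix *index) add(p string, k jobKind) {
	if k == kindBarrierDir {
		ix.barrierDirs[p]++
	}
	for _, a := range ancestors(p) {
		ix.descendants[a]++
	}
}

// blocked reports whether an incoming job must wait: an incoming barrier dir
// waits for its subtree to drain, and any job waits for a barrier ancestor.
func (ix *index) blocked(p string, k jobKind) bool {
	if k == kindBarrierDir && ix.descendants[p] > 0 {
		return true
	}
	for _, a := range ancestors(p) {
		if ix.barrierDirs[a] > 0 {
			return true
		}
	}
	return false
}

func main() {
	ix := newIndex()
	ix.add("/a/b", kindNonBarrierDir) // in-flight attribute bump on /a/b

	fmt.Println(ix.blocked("/a/b/file.txt", kindFile)) // false: no barrier above the file
	fmt.Println(ix.blocked("/a", kindBarrierDir))      // true: /a/b is still in flight below /a
}
```

Keeping the counter bump for every kind is what lets the barrier side stay strict while the descendant side is loosened.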

Tests cover the loosened cases and the preserved barriers:
non-barrier update doesn't block a file descendant, barrier create
still does, barrier delete still waits for in-flight descendants, and
a barrier ancestor still waits for a non-barrier descendant update.

* fix(filer.sync): serialize same-path barrier dir jobs against concurrent ops

Review (Gemini) flagged that pathConflicts had latent same-path gaps
that predated this PR but deserve fixing alongside the dir-conflict
loosening: two barrier dir jobs at the same path could run concurrently
(e.g. create /a and delete /a), and a file job at the same path as an
in-flight barrier dir wasn't blocked either.

Tighten pathConflicts so that:
- an active barrier dir at p blocks every incoming job at p (file,
  barrier dir, or non-barrier attribute update) — same-path promotions,
  renames, and delete/create collisions must serialize;
- an active file at p blocks incoming files and barrier dirs at p;
- non-barrier dir updates at the same path still overlap with each
  other (attribute bumps are last-writer-wins, intentional).
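
The same-path rules above can be read as a small decision table. The sketch below is illustrative only: `samePathConflict` is a hypothetical helper that ignores ancestors and descendants and looks at just the kinds of two jobs at one path. The row for an active non-barrier update versus an incoming barrier dir is not one of the three bullets; it reflects the follow-up fix described in the next commit message.

```go
package main

import "fmt"

type jobKind int

const (
	kindFile jobKind = iota
	kindBarrierDir
	kindNonBarrierDir
)

// String makes the kinds readable when printed.
func (k jobKind) String() string {
	return [...]string{"file", "barrier-dir", "non-barrier-dir"}[k]
}

// samePathConflict reports whether an incoming job must wait for an active
// job at the exact same path. Ancestor and descendant rules are out of scope.
func samePathConflict(active, incoming jobKind) bool {
	switch active {
	case kindBarrierDir:
		// An in-flight create/delete/rename serializes everything at p.
		return true
	case kindFile:
		// An in-flight file blocks files and barrier dirs, not attribute bumps.
		return incoming == kindFile || incoming == kindBarrierDir
	case kindNonBarrierDir:
		// Attribute bumps overlap with each other; an incoming barrier waits.
		return incoming == kindBarrierDir
	}
	return false
}

func main() {
	kinds := []jobKind{kindFile, kindBarrierDir, kindNonBarrierDir}
	for _, active := range kinds {
		for _, incoming := range kinds {
			fmt.Printf("active=%v incoming=%v wait=%v\n",
				active, incoming, samePathConflict(active, incoming))
		}
	}
}
```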

TestDirVsDirConflict and TestFileUnderActiveDirConflict flip their
"same path does not conflict" assertions to match. New
TestSamePathBarrierSerialization covers all five same-path cases
explicitly.

* fix(filer.sync): serialize incoming barrier dir against same-path non-barrier update

Bug introduced by the previous same-path tightening commit and caught
in review (CodeRabbit, critical): a kindNonBarrierDir at /dir1 was not
indexed at its own path, so a later kindBarrierDir at /dir1 saw neither
activeBarrierDirPaths["/dir1"] nor descendantCount["/dir1"] (the latter
only counts strict descendants) and was admitted concurrently with the
in-flight attribute update. That violated the "barrier at p serializes
all work at p" rule.

Track non-barrier dir jobs in a new activeNonBarrierDirPaths map and
check it only from the incoming-barrier-dir branch of pathConflicts.
The map is deliberately invisible to the ancestor check, so non-barrier
updates still don't serialize file descendants — the loosening this PR
is about stays intact.
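
As a rough illustration of why the extra map is needed, the sketch below models the indexes with plain string paths. It is an assumption-laden simplification: `index`, `update`, `conflicts`, `bump`, and `ancestors` are hypothetical names, and the per-kind maps are folded into one `byKind` map for brevity, which the real code does not do. The point it shows is that a job never counts as its own descendant, so an in-flight attribute update at `/dir1` is visible to a same-path barrier only through the non-barrier index.

```go
package main

import (
	"fmt"
	"strings"
)

type jobKind int

const (
	kindFile jobKind = iota
	kindBarrierDir
	kindNonBarrierDir
)

// ancestors returns every proper ancestor of p, e.g. "/a/b" -> ["/a", "/"].
func ancestors(p string) []string {
	var out []string
	for p != "/" && p != "" {
		i := strings.LastIndex(p, "/")
		if i <= 0 {
			out = append(out, "/")
			break
		}
		p = p[:i]
		out = append(out, p)
	}
	return out
}

// index is a hypothetical model of the conflict indexes.
type index struct {
	byKind      map[jobKind]map[string]int // active jobs per kind and exact path
	descendants map[string]int             // active jobs strictly below a path
}

func newIndex() *index {
	return &index{
		byKind:      map[jobKind]map[string]int{kindFile: {}, kindBarrierDir: {}, kindNonBarrierDir: {}},
		descendants: map[string]int{},
	}
}

// bump adjusts a counter and deletes the key once it drains to zero.
func bump(m map[string]int, key string, delta int) {
	m[key] += delta
	if m[key] <= 0 {
		delete(m, key)
	}
}

// update registers (delta = +1) or unregisters (delta = -1) an in-flight job.
func (ix *index) update(p string, k jobKind, delta int) {
	bump(ix.byKind[k], p, delta)
	for _, a := range ancestors(p) {
		bump(ix.descendants, a, delta)
	}
}

// conflicts follows the order of checks in pathConflicts from the diff.
func (ix *index) conflicts(p string, k jobKind) bool {
	if ix.byKind[kindBarrierDir][p] > 0 {
		return true
	}
	if k == kindBarrierDir && ix.byKind[kindNonBarrierDir][p] > 0 {
		return true // the check this commit adds
	}
	if ix.byKind[kindFile][p] > 0 && k != kindNonBarrierDir {
		return true
	}
	if k == kindBarrierDir && ix.descendants[p] > 0 {
		return true
	}
	for _, a := range ancestors(p) {
		if ix.byKind[kindBarrierDir][a] > 0 {
			return true
		}
	}
	return false
}

func main() {
	ix := newIndex()
	ix.update("/dir1", kindNonBarrierDir, +1) // in-flight attribute update

	fmt.Println(ix.descendants["/dir1"])                  // 0: strict descendants only
	fmt.Println(ix.conflicts("/dir1", kindBarrierDir))    // true: same-path barrier waits
	fmt.Println(ix.conflicts("/dir1/file.txt", kindFile)) // false: descendants stay unblocked

	ix.update("/dir1", kindNonBarrierDir, -1) // job completes
	fmt.Println(len(ix.byKind[kindNonBarrierDir])) // 0: index entry cleaned up
}
```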

Regression test added in TestSamePathBarrierSerialization covers both
the admission conflict and the index cleanup on job completion.
Chris Lu
2026-04-14 18:34:05 -07:00
committed by GitHub
parent 40c1797f8e
commit c2f5db3a02
2 changed files with 380 additions and 95 deletions


@@ -27,10 +27,32 @@ func (h *tsMinHeap) Pop() any {
return x
}
// jobKind classifies a sync job for conflict detection. Directory events are
// split into "barrier" (create/delete/rename) and "non-barrier" (in-place
// attribute update) so that attribute-only directory updates — which do not
// reshape the namespace — no longer serialize every file operation in the
// subtree.
type jobKind int
const (
// kindFile is a regular file event.
kindFile jobKind = iota
// kindBarrierDir is a directory create, delete, or rename. It acts as a
// subtree barrier: it waits for all active descendants to drain, and it
// blocks every event under it from being admitted until it completes.
kindBarrierDir
// kindNonBarrierDir is a directory attribute update (mtime/xattr/chmod
// with the same parent and name). It does not block descendants. Like any
// other job it still waits for an active barrier dir on an ancestor path
// (see pathConflicts), and it still bumps the ancestor descendant counters
// so an incoming barrier dir on an ancestor path waits for it to drain.
kindNonBarrierDir
)
type syncJobPaths struct {
path util.FullPath
newPath util.FullPath // empty for non-renames
isDirectory bool
path util.FullPath
newPath util.FullPath // empty for non-renames
kind jobKind
}
type MetadataProcessor struct {
@@ -44,9 +66,21 @@ type MetadataProcessor struct {
// Indexes for O(depth) conflict detection, replacing O(n) linear scan.
// activeFilePaths counts active file jobs at each exact path.
activeFilePaths map[util.FullPath]int
// activeDirPaths counts active directory jobs at each exact path.
activeDirPaths map[util.FullPath]int
// descendantCount counts active jobs (file or dir) strictly under each directory.
// activeBarrierDirPaths counts active barrier-dir jobs at each exact
// path. Only barrier dirs are tracked here; non-barrier dir updates are
// deliberately invisible to the ancestor check so that they don't
// serialize every file descendant.
activeBarrierDirPaths map[util.FullPath]int
// activeNonBarrierDirPaths counts active non-barrier dir jobs at each
// exact path. This is read *only* by incoming barrier dirs, so a
// delete/rename/create at p correctly waits for an in-flight chmod/
// xattr/mtime update at the same p. It is deliberately invisible to the
// ancestor check, so non-barrier updates still don't serialize file
// descendants.
activeNonBarrierDirPaths map[util.FullPath]int
// descendantCount counts active jobs (of any kind) strictly under each
// directory. Read by incoming barrier dirs so they wait for their whole
// subtree to drain before running, regardless of descendant kind.
descendantCount map[util.FullPath]int
// tsHeap is a min-heap of active job timestamps with lazy deletion,
@@ -56,12 +90,13 @@ type MetadataProcessor struct {
func NewMetadataProcessor(fn pb.ProcessMetadataFunc, concurrency int, offsetTsNs int64) *MetadataProcessor {
t := &MetadataProcessor{
fn: fn,
activeJobs: make(map[int64]*syncJobPaths),
concurrencyLimit: concurrency,
activeFilePaths: make(map[util.FullPath]int),
activeDirPaths: make(map[util.FullPath]int),
descendantCount: make(map[util.FullPath]int),
fn: fn,
activeJobs: make(map[int64]*syncJobPaths),
concurrencyLimit: concurrency,
activeFilePaths: make(map[util.FullPath]int),
activeBarrierDirPaths: make(map[util.FullPath]int),
activeNonBarrierDirPaths: make(map[util.FullPath]int),
descendantCount: make(map[util.FullPath]int),
}
t.processedTsWatermark.Store(offsetTsNs)
t.activeJobsCond = sync.NewCond(&t.activeJobsLock)
@@ -86,11 +121,14 @@ func pathAncestors(p util.FullPath) []util.FullPath {
// addPathToIndex registers a path in the conflict detection indexes.
// Must be called under activeJobsLock.
func (t *MetadataProcessor) addPathToIndex(p util.FullPath, isDirectory bool) {
if isDirectory {
t.activeDirPaths[p]++
} else {
func (t *MetadataProcessor) addPathToIndex(p util.FullPath, kind jobKind) {
switch kind {
case kindFile:
t.activeFilePaths[p]++
case kindBarrierDir:
t.activeBarrierDirPaths[p]++
case kindNonBarrierDir:
t.activeNonBarrierDirPaths[p]++
}
for _, ancestor := range pathAncestors(p) {
t.descendantCount[ancestor]++
@@ -99,19 +137,26 @@ func (t *MetadataProcessor) addPathToIndex(p util.FullPath, isDirectory bool) {
// removePathFromIndex unregisters a path from the conflict detection indexes.
// Must be called under activeJobsLock.
func (t *MetadataProcessor) removePathFromIndex(p util.FullPath, isDirectory bool) {
if isDirectory {
if t.activeDirPaths[p] <= 1 {
delete(t.activeDirPaths, p)
} else {
t.activeDirPaths[p]--
}
} else {
func (t *MetadataProcessor) removePathFromIndex(p util.FullPath, kind jobKind) {
switch kind {
case kindFile:
if t.activeFilePaths[p] <= 1 {
delete(t.activeFilePaths, p)
} else {
t.activeFilePaths[p]--
}
case kindBarrierDir:
if t.activeBarrierDirPaths[p] <= 1 {
delete(t.activeBarrierDirPaths, p)
} else {
t.activeBarrierDirPaths[p]--
}
case kindNonBarrierDir:
if t.activeNonBarrierDirPaths[p] <= 1 {
delete(t.activeNonBarrierDirPaths, p)
} else {
t.activeNonBarrierDirPaths[p]--
}
}
for _, ancestor := range pathAncestors(p) {
if t.descendantCount[ancestor] <= 1 {
@@ -123,26 +168,50 @@ func (t *MetadataProcessor) removePathFromIndex(p util.FullPath, isDirectory boo
}
// pathConflicts checks if a single path conflicts with any active job.
// Conflict rules match pairShouldWaitFor:
// - file vs file: exact same path
// - file vs dir: file.IsUnder(dir)
// - dir vs file: file.IsUnder(dir)
// - dir vs dir: either IsUnder the other
func (t *MetadataProcessor) pathConflicts(p util.FullPath, isDirectory bool) bool {
if isDirectory {
// Any active job (file or dir) strictly under this directory?
if t.descendantCount[p] > 0 {
return true
}
} else {
// Exact same file already active?
if t.activeFilePaths[p] > 0 {
return true
}
// Conflict rules:
// - any kind vs same-path barrier dir: wait (a create/delete/rename on p
// must fully serialize against any other operation touching p, including
// non-barrier attribute updates and files at the same path)
// - incoming barrier dir vs same-path non-barrier dir update: wait (a
// delete/rename/create on p must wait for an in-flight chmod/xattr/mtime
// update at the same p to drain)
// - file vs same-path file: wait
// - file vs same-path barrier dir: wait (covered by the barrier-at-p check
// above; also serializes a file-to-dir / dir-to-file promotion)
// - barrier dir vs same-path file: wait
// - barrier dir vs any descendant (file or dir, barrier or not): wait
// - barrier ancestor: always wait, regardless of incoming kind
// - non-barrier dir vs descendants: never conflicts
// - non-barrier dir vs same-path non-barrier dir: never conflicts (attribute
// bumps are "last writer wins"; this intentionally lets rapid mtime /
// xattr updates overlap)
func (t *MetadataProcessor) pathConflicts(p util.FullPath, kind jobKind) bool {
// A barrier dir in flight at p serializes every new job at p. This is the
// strictest same-path rule and applies regardless of incoming kind.
if t.activeBarrierDirPaths[p] > 0 {
return true
}
// Any active directory that is a proper ancestor of p?
// An incoming barrier dir must also wait for any in-flight non-barrier
// dir update at the same path. Without this check, a delete or rename on
// a directory could overlap with an attribute bump in progress for the
// same directory.
if kind == kindBarrierDir && t.activeNonBarrierDirPaths[p] > 0 {
return true
}
// A file in flight at p blocks new file or barrier-dir jobs at p. A
// non-barrier dir update at p is allowed through — by construction files
// and dirs at the same path only coexist across a promotion, which is a
// barrier event handled by the check above.
if t.activeFilePaths[p] > 0 && (kind == kindFile || kind == kindBarrierDir) {
return true
}
// Barrier dirs additionally wait for their whole in-flight subtree.
if kind == kindBarrierDir && t.descendantCount[p] > 0 {
return true
}
// Any barrier dir on a proper ancestor blocks everything under it.
for _, ancestor := range pathAncestors(p) {
if t.activeDirPaths[ancestor] > 0 {
if t.activeBarrierDirPaths[ancestor] > 0 {
return true
}
}
@@ -150,11 +219,11 @@ func (t *MetadataProcessor) pathConflicts(p util.FullPath, isDirectory bool) boo
}
func (t *MetadataProcessor) conflictsWith(resp *filer_pb.SubscribeMetadataResponse) bool {
p, newPath, isDirectory := extractPathsFromMetadata(resp)
if t.pathConflicts(p, isDirectory) {
p, newPath, kind := extractJobInfo(resp)
if t.pathConflicts(p, kind) {
return true
}
if newPath != "" && t.pathConflicts(newPath, isDirectory) {
if newPath != "" && t.pathConflicts(newPath, kind) {
return true
}
return false
@@ -172,13 +241,13 @@ func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse)
t.activeJobsCond.Wait()
}
p, newPath, isDirectory := extractPathsFromMetadata(resp)
jobPaths := &syncJobPaths{path: p, newPath: newPath, isDirectory: isDirectory}
p, newPath, kind := extractJobInfo(resp)
jobPaths := &syncJobPaths{path: p, newPath: newPath, kind: kind}
t.activeJobs[resp.TsNs] = jobPaths
t.addPathToIndex(p, isDirectory)
t.addPathToIndex(p, kind)
if newPath != "" {
t.addPathToIndex(newPath, isDirectory)
t.addPathToIndex(newPath, kind)
}
heap.Push(&t.tsHeap, resp.TsNs)
@@ -195,9 +264,9 @@ func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse)
defer t.activeJobsLock.Unlock()
delete(t.activeJobs, resp.TsNs)
t.removePathFromIndex(jobPaths.path, jobPaths.isDirectory)
t.removePathFromIndex(jobPaths.path, jobPaths.kind)
if jobPaths.newPath != "" {
t.removePathFromIndex(jobPaths.newPath, jobPaths.isDirectory)
t.removePathFromIndex(jobPaths.newPath, jobPaths.kind)
}
// Lazy-clean stale entries from heap top (already-completed jobs).
@@ -216,28 +285,47 @@ func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse)
}()
}
func extractPathsFromMetadata(resp *filer_pb.SubscribeMetadataResponse) (p, newPath util.FullPath, isDirectory bool) {
// extractJobInfo derives the conflict-detection path(s) and job kind for a
// metadata event. A rename returns both the source and destination paths; all
// other event shapes return only the primary path.
func extractJobInfo(resp *filer_pb.SubscribeMetadataResponse) (p, newPath util.FullPath, kind jobKind) {
oldEntry := resp.EventNotification.OldEntry
newEntry := resp.EventNotification.NewEntry
// create
if filer_pb.IsCreate(resp) {
p = util.FullPath(resp.Directory).Child(newEntry.Name)
isDirectory = newEntry.IsDirectory
kind = classifyDirEvent(newEntry.IsDirectory, false)
return
}
if filer_pb.IsDelete(resp) {
p = util.FullPath(resp.Directory).Child(oldEntry.Name)
isDirectory = oldEntry.IsDirectory
kind = classifyDirEvent(oldEntry.IsDirectory, false)
return
}
if filer_pb.IsUpdate(resp) {
p = util.FullPath(resp.Directory).Child(newEntry.Name)
isDirectory = newEntry.IsDirectory
// In-place attribute update: non-barrier when the entry is a dir.
kind = classifyDirEvent(newEntry.IsDirectory, true)
return
}
// renaming
// renaming: the namespace is reshaped on both sides, so a directory
// rename is a barrier on both source and destination.
p = util.FullPath(resp.Directory).Child(oldEntry.Name)
isDirectory = oldEntry.IsDirectory
newPath = util.FullPath(resp.EventNotification.NewParentPath).Child(newEntry.Name)
kind = classifyDirEvent(oldEntry.IsDirectory, false)
return
}
// classifyDirEvent maps an entry's (isDirectory, isAttributeUpdate) pair to a
// jobKind. Attribute-only updates on directories are the only non-barrier
// case; everything else on a directory (create/delete/rename) is a barrier,
// and everything on a file is kindFile.
func classifyDirEvent(isDirectory, isAttributeUpdate bool) jobKind {
if !isDirectory {
return kindFile
}
if isAttributeUpdate {
return kindNonBarrierDir
}
return kindBarrierDir
}


@@ -27,6 +27,20 @@ func makeResp(dir, name string, isDir bool, tsNs int64, isNew bool) *filer_pb.Su
return resp
}
// makeDirUpdateResp builds an in-place attribute update event for a directory
// (same parent and same name on both sides — matches filer_pb.IsUpdate).
func makeDirUpdateResp(parent, name string, tsNs int64) *filer_pb.SubscribeMetadataResponse {
return &filer_pb.SubscribeMetadataResponse{
Directory: parent,
TsNs: tsNs,
EventNotification: &filer_pb.EventNotification{
OldEntry: &filer_pb.Entry{Name: name, IsDirectory: true},
NewEntry: &filer_pb.Entry{Name: name, IsDirectory: true},
NewParentPath: parent,
},
}
}
func makeRenameResp(oldDir, oldName, newDir, newName string, isDir bool, tsNs int64) *filer_pb.SubscribeMetadataResponse {
return &filer_pb.SubscribeMetadataResponse{
Directory: oldDir,
@@ -77,9 +91,9 @@ func TestFileVsFileConflict(t *testing.T) {
// Add a file job
active := makeResp("/dir1", "file.txt", false, 1, true)
path, newPath, isDir := extractPathsFromMetadata(active)
p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir}
p.addPathToIndex(path, isDir)
path, newPath, kind := extractJobInfo(active)
p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind}
p.addPathToIndex(path, kind)
// Same file should conflict
same := makeResp("/dir1", "file.txt", false, 2, true)
@@ -108,9 +122,9 @@ func TestFileUnderActiveDirConflict(t *testing.T) {
// Add a directory job at /dir1
active := makeResp("/", "dir1", true, 1, true)
path, newPath, isDir := extractPathsFromMetadata(active)
p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir}
p.addPathToIndex(path, isDir)
path, newPath, kind := extractJobInfo(active)
p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind}
p.addPathToIndex(path, kind)
// File under /dir1 should conflict
under := makeResp("/dir1", "file.txt", false, 2, true)
@@ -130,11 +144,11 @@ func TestFileUnderActiveDirConflict(t *testing.T) {
t.Error("unexpected conflict for file outside active directory")
}
// File at /dir1 itself (not under, at) should not conflict
// because IsUnder is strict: "/dir1".IsUnder("/dir1") == false
// File at /dir1 itself (not under, at) SHOULD conflict with an active
// barrier dir at /dir1 — same-path promotions must serialize.
atSame := makeResp("/", "dir1", false, 5, true)
if p.conflictsWith(atSame) {
t.Error("unexpected conflict for file at same path as directory (IsUnder is strict)")
if !p.conflictsWith(atSame) {
t.Error("expected conflict for file at same path as active barrier dir")
}
}
@@ -146,9 +160,9 @@ func TestDirWithActiveFileUnder(t *testing.T) {
// Add file jobs under /dir1
f1 := makeResp("/dir1/sub", "file.txt", false, 1, true)
path, newPath, isDir := extractPathsFromMetadata(f1)
p.activeJobs[f1.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir}
p.addPathToIndex(path, isDir)
path, newPath, kind := extractJobInfo(f1)
p.activeJobs[f1.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind}
p.addPathToIndex(path, kind)
// Directory /dir1 should conflict (has active file under it)
dirOp := makeResp("/", "dir1", true, 2, true)
@@ -170,9 +184,9 @@ func TestDirVsDirConflict(t *testing.T) {
// Add directory job at /a/b
active := makeResp("/a", "b", true, 1, true)
path, newPath, isDir := extractPathsFromMetadata(active)
p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir}
p.addPathToIndex(path, isDir)
path, newPath, kind := extractJobInfo(active)
p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind}
p.addPathToIndex(path, kind)
// /a/b/c (descendant) should conflict
desc := makeResp("/a/b", "c", true, 2, true)
@@ -186,10 +200,11 @@ func TestDirVsDirConflict(t *testing.T) {
t.Error("expected conflict for ancestor directory")
}
// Same directory should NOT conflict (IsUnder is strict, not equal)
// Same-path barrier dir SHOULD conflict: concurrent create/delete/rename
// on the same directory must serialize.
same := makeResp("/a", "b", true, 4, true)
if p.conflictsWith(same) {
t.Error("unexpected conflict for same directory (IsUnder is strict)")
if !p.conflictsWith(same) {
t.Error("expected conflict for same-path barrier directory")
}
// Sibling directory should not conflict
@@ -206,9 +221,9 @@ func TestRenameConflict(t *testing.T) {
// Add file job at /dir1/file.txt
f1 := makeResp("/dir1", "file.txt", false, 1, true)
path, newPath, isDir := extractPathsFromMetadata(f1)
p.activeJobs[f1.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir}
p.addPathToIndex(path, isDir)
path, newPath, kind := extractJobInfo(f1)
p.activeJobs[f1.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind}
p.addPathToIndex(path, kind)
// Rename from /dir2/a.txt to /dir1/file.txt should conflict (newPath matches)
rename := makeRenameResp("/dir2", "a.txt", "/dir1", "file.txt", false, 2)
@@ -236,11 +251,11 @@ func TestActiveRenameConflict(t *testing.T) {
// Add active rename job: /dir1/old.txt -> /dir2/new.txt
rename := makeRenameResp("/dir1", "old.txt", "/dir2", "new.txt", false, 1)
path, newPath, isDir := extractPathsFromMetadata(rename)
p.activeJobs[rename.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir}
p.addPathToIndex(path, isDir)
path, newPath, kind := extractJobInfo(rename)
p.activeJobs[rename.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind}
p.addPathToIndex(path, kind)
if newPath != "" {
p.addPathToIndex(newPath, isDir)
p.addPathToIndex(newPath, kind)
}
// File at /dir1/old.txt should conflict
@@ -271,9 +286,9 @@ func TestRootDirConflict(t *testing.T) {
// Note: a dir entry at "/" would be created as FullPath("/").Child("somedir")
// But let's test what happens with an active dir at /some/path and check root
active := makeResp("/some", "dir", true, 1, true)
path, newPath, isDir := extractPathsFromMetadata(active)
p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir}
p.addPathToIndex(path, isDir)
path, newPath, kind := extractJobInfo(active)
p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind}
p.addPathToIndex(path, kind)
// Root dir should conflict because active dir /some/dir is under /
// A new directory at "/" should see descendantCount["/"] > 0
@@ -289,7 +304,7 @@ func TestIndexCleanup(t *testing.T) {
// Add then remove a file job
path := util.FullPath("/a/b/c/file.txt")
p.addPathToIndex(path, false)
p.addPathToIndex(path, kindFile)
if p.activeFilePaths[path] != 1 {
t.Errorf("expected activeFilePaths count 1, got %d", p.activeFilePaths[path])
@@ -298,7 +313,7 @@ func TestIndexCleanup(t *testing.T) {
t.Errorf("expected descendantCount['/a/b/c'] = 1, got %d", p.descendantCount["/a/b/c"])
}
p.removePathFromIndex(path, false)
p.removePathFromIndex(path, kindFile)
if len(p.activeFilePaths) != 0 {
t.Errorf("expected empty activeFilePaths after removal, got %v", p.activeFilePaths)
@@ -316,8 +331,8 @@ func TestWatermarkWithHeap(t *testing.T) {
// Simulate adding jobs in order
for _, ts := range []int64{10, 20, 30} {
jobPath := util.FullPath("/file" + string(rune('0'+ts/10)))
p.activeJobs[ts] = &syncJobPaths{path: jobPath, isDirectory: false}
p.addPathToIndex(jobPath, false)
p.activeJobs[ts] = &syncJobPaths{path: jobPath, kind: kindFile}
p.addPathToIndex(jobPath, kindFile)
heap.Push(&p.tsHeap, ts)
}
@@ -327,7 +342,7 @@ func TestWatermarkWithHeap(t *testing.T) {
// Remove non-oldest (ts=20) — heap top should stay 10
delete(p.activeJobs, 20)
p.removePathFromIndex("/file2", false)
p.removePathFromIndex("/file2", kindFile)
// Lazy clean: top is 10 which is still active, so no pop
for p.tsHeap.Len() > 0 {
if _, active := p.activeJobs[p.tsHeap[0]]; active {
@@ -341,7 +356,7 @@ func TestWatermarkWithHeap(t *testing.T) {
// Remove oldest (ts=10) — lazy clean should find 30
delete(p.activeJobs, 10)
p.removePathFromIndex("/file1", false)
p.removePathFromIndex("/file1", kindFile)
for p.tsHeap.Len() > 0 {
if _, active := p.activeJobs[p.tsHeap[0]]; active {
break
@@ -353,6 +368,188 @@ func TestWatermarkWithHeap(t *testing.T) {
}
}
// TestNonBarrierDirUpdateDoesNotBlockDescendants verifies the loosened
// dir-conflict rule: an attribute-only directory update (same parent + same
// name) must NOT block file events under that directory. A barrier dir event
// (create/delete/rename) on the same path still must.
func TestNonBarrierDirUpdateDoesNotBlockDescendants(t *testing.T) {
noop := func(resp *filer_pb.SubscribeMetadataResponse) error { return nil }
t.Run("attribute update on /dir1 does not block file under it", func(t *testing.T) {
p := NewMetadataProcessor(noop, 100, 0)
// Active non-barrier: attribute update on /dir1.
active := makeDirUpdateResp("/", "dir1", 1)
path, newPath, kind := extractJobInfo(active)
if kind != kindNonBarrierDir {
t.Fatalf("expected kindNonBarrierDir for dir attribute update, got %v", kind)
}
p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind}
p.addPathToIndex(path, kind)
// File under /dir1 should NOT conflict with the attribute update.
under := makeResp("/dir1", "file.txt", false, 2, true)
if p.conflictsWith(under) {
t.Error("file under a non-barrier dir update should not conflict")
}
// Nested file should also not conflict.
deep := makeResp("/dir1/sub/deep", "file.txt", false, 3, true)
if p.conflictsWith(deep) {
t.Error("deeply nested file under a non-barrier dir update should not conflict")
}
})
t.Run("barrier dir create at the same path still blocks descendants", func(t *testing.T) {
p := NewMetadataProcessor(noop, 100, 0)
active := makeResp("/", "dir1", true, 1, true) // create
path, newPath, kind := extractJobInfo(active)
if kind != kindBarrierDir {
t.Fatalf("expected kindBarrierDir for dir create, got %v", kind)
}
p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind}
p.addPathToIndex(path, kind)
under := makeResp("/dir1", "file.txt", false, 2, true)
if !p.conflictsWith(under) {
t.Error("file under an active barrier dir create should still conflict")
}
})
t.Run("barrier dir delete still waits for in-flight descendants", func(t *testing.T) {
p := NewMetadataProcessor(noop, 100, 0)
// Active file under /dir1.
f := makeResp("/dir1", "file.txt", false, 1, true)
path, newPath, kind := extractJobInfo(f)
p.activeJobs[f.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind}
p.addPathToIndex(path, kind)
// Incoming barrier delete on /dir1 should still wait for the
// in-flight file descendant.
del := makeResp("/", "dir1", true, 2, false)
if !p.conflictsWith(del) {
t.Error("barrier dir delete should wait for descendant file job")
}
})
t.Run("non-barrier dir update still keeps ancestor barrier waiting", func(t *testing.T) {
p := NewMetadataProcessor(noop, 100, 0)
// Active non-barrier dir update at /a/b.
upd := makeDirUpdateResp("/a", "b", 1)
path, newPath, kind := extractJobInfo(upd)
p.activeJobs[upd.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind}
p.addPathToIndex(path, kind)
// A barrier delete on /a (the ancestor) should wait for it.
del := makeResp("/", "a", true, 2, false)
if !p.conflictsWith(del) {
t.Error("barrier ancestor dir delete should wait for non-barrier descendant update")
}
})
}
// TestSamePathBarrierSerialization verifies the tightened same-path rules:
// a barrier dir in flight at p serializes every other job at p (file, barrier
// dir, or non-barrier update), and a file in flight at p serializes incoming
// files and barrier dirs at p.
func TestSamePathBarrierSerialization(t *testing.T) {
noop := func(resp *filer_pb.SubscribeMetadataResponse) error { return nil }
t.Run("barrier dir at p blocks same-path file", func(t *testing.T) {
p := NewMetadataProcessor(noop, 100, 0)
active := makeResp("/", "dir1", true, 1, true) // dir create
path, newPath, kind := extractJobInfo(active)
p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind}
p.addPathToIndex(path, kind)
file := makeResp("/", "dir1", false, 2, true)
if !p.conflictsWith(file) {
t.Error("file at path of active barrier dir should conflict")
}
})
t.Run("barrier dir at p blocks another same-path barrier dir", func(t *testing.T) {
p := NewMetadataProcessor(noop, 100, 0)
active := makeResp("/", "dir1", true, 1, true) // dir create
path, newPath, kind := extractJobInfo(active)
p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind}
p.addPathToIndex(path, kind)
del := makeResp("/", "dir1", true, 2, false) // dir delete, same path
if !p.conflictsWith(del) {
t.Error("concurrent create/delete on same dir path should conflict")
}
})
t.Run("barrier dir at p blocks non-barrier update at same path", func(t *testing.T) {
p := NewMetadataProcessor(noop, 100, 0)
active := makeResp("/", "dir1", true, 1, true) // dir create
path, newPath, kind := extractJobInfo(active)
p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind}
p.addPathToIndex(path, kind)
upd := makeDirUpdateResp("/", "dir1", 2)
if !p.conflictsWith(upd) {
t.Error("attribute update on dir being created should wait for the create")
}
})
t.Run("file at p blocks same-path barrier dir", func(t *testing.T) {
p := NewMetadataProcessor(noop, 100, 0)
active := makeResp("/", "thing", false, 1, true) // file create at /thing
path, newPath, kind := extractJobInfo(active)
p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind}
p.addPathToIndex(path, kind)
// Barrier dir at /thing (e.g. a file→dir promotion) must wait.
promoteDir := makeResp("/", "thing", true, 2, true)
if !p.conflictsWith(promoteDir) {
t.Error("barrier dir at path of active file should conflict")
}
})
t.Run("non-barrier update at p blocks incoming barrier dir at same path", func(t *testing.T) {
// Regression test for a bug spotted in review: an in-flight
// attribute update on /dir1 must serialize against a later
// delete/rename/create on /dir1.
p := NewMetadataProcessor(noop, 100, 0)
active := makeDirUpdateResp("/", "dir1", 1)
path, newPath, kind := extractJobInfo(active)
if kind != kindNonBarrierDir {
t.Fatalf("expected kindNonBarrierDir, got %v", kind)
}
p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind}
p.addPathToIndex(path, kind)
del := makeResp("/", "dir1", true, 2, false) // dir delete
if !p.conflictsWith(del) {
t.Error("barrier dir at path of active non-barrier update should conflict")
}
// Ensure the removal path also cleans up the non-barrier index.
p.removePathFromIndex(path, kind)
if len(p.activeNonBarrierDirPaths) != 0 {
t.Errorf("activeNonBarrierDirPaths not cleaned up, got %v", p.activeNonBarrierDirPaths)
}
})
t.Run("non-barrier update at p does NOT block same-path non-barrier update", func(t *testing.T) {
p := NewMetadataProcessor(noop, 100, 0)
active := makeDirUpdateResp("/", "dir1", 1)
path, newPath, kind := extractJobInfo(active)
p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind}
p.addPathToIndex(path, kind)
// Concurrent attribute bumps are allowed: last writer wins.
upd2 := makeDirUpdateResp("/", "dir1", 2)
if p.conflictsWith(upd2) {
t.Error("concurrent non-barrier dir updates should not conflict")
}
})
}
// benchResult prevents the compiler from optimizing away the conflict check.
var benchResult bool
@@ -369,9 +566,9 @@ func BenchmarkConflictCheck(b *testing.B) {
dir := fmt.Sprintf("/dir%d/sub%d", i/100, i%100)
name := fmt.Sprintf("file%d.txt", i)
resp := makeResp(dir, name, false, int64(i+1), true)
path, newPath, isDir := extractPathsFromMetadata(resp)
p.activeJobs[resp.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir}
p.addPathToIndex(path, isDir)
path, newPath, kind := extractJobInfo(resp)
p.activeJobs[resp.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind}
p.addPathToIndex(path, kind)
}
// Benchmark conflict check for a non-conflicting event