diff --git a/weed/command/filer_sync_jobs.go b/weed/command/filer_sync_jobs.go index f0f886394..a66604478 100644 --- a/weed/command/filer_sync_jobs.go +++ b/weed/command/filer_sync_jobs.go @@ -27,10 +27,32 @@ func (h *tsMinHeap) Pop() any { return x } +// jobKind classifies a sync job for conflict detection. Directory events are +// split into "barrier" (create/delete/rename) and "non-barrier" (in-place +// attribute update) so that attribute-only directory updates — which do not +// reshape the namespace — no longer serialize every file operation in the +// subtree. +type jobKind int + +const ( + // kindFile is a regular file event. + kindFile jobKind = iota + // kindBarrierDir is a directory create, delete, or rename. It acts as a + // subtree barrier: it waits for all active descendants to drain, and it + // blocks every event under it from being admitted until it completes. + kindBarrierDir + // kindNonBarrierDir is a directory attribute update (mtime/xattr/chmod + // with the same parent and name). It does not block descendants and is + // not blocked by ancestor directories, but it still bumps the ancestor + // descendant counters so an incoming barrier dir on an ancestor path + // still waits for it to drain. + kindNonBarrierDir +) + type syncJobPaths struct { - path util.FullPath - newPath util.FullPath // empty for non-renames - isDirectory bool + path util.FullPath + newPath util.FullPath // empty for non-renames + kind jobKind } type MetadataProcessor struct { @@ -44,9 +66,21 @@ type MetadataProcessor struct { // Indexes for O(depth) conflict detection, replacing O(n) linear scan. // activeFilePaths counts active file jobs at each exact path. activeFilePaths map[util.FullPath]int - // activeDirPaths counts active directory jobs at each exact path. - activeDirPaths map[util.FullPath]int - // descendantCount counts active jobs (file or dir) strictly under each directory. + // activeBarrierDirPaths counts active barrier-dir jobs at each exact + // path. Only barrier dirs are tracked here; non-barrier dir updates are + // deliberately invisible to the ancestor check so that they don't + // serialize every file descendant. + activeBarrierDirPaths map[util.FullPath]int + // activeNonBarrierDirPaths counts active non-barrier dir jobs at each + // exact path. This is read *only* by incoming barrier dirs, so a + // delete/rename/create at p correctly waits for an in-flight chmod/ + // xattr/mtime update at the same p. It is deliberately invisible to the + // ancestor check, so non-barrier updates still don't serialize file + // descendants. + activeNonBarrierDirPaths map[util.FullPath]int + // descendantCount counts active jobs (of any kind) strictly under each + // directory. Read by incoming barrier dirs so they wait for their whole + // subtree to drain before running, regardless of descendant kind. descendantCount map[util.FullPath]int // tsHeap is a min-heap of active job timestamps with lazy deletion, @@ -56,12 +90,13 @@ type MetadataProcessor struct { func NewMetadataProcessor(fn pb.ProcessMetadataFunc, concurrency int, offsetTsNs int64) *MetadataProcessor { t := &MetadataProcessor{ - fn: fn, - activeJobs: make(map[int64]*syncJobPaths), - concurrencyLimit: concurrency, - activeFilePaths: make(map[util.FullPath]int), - activeDirPaths: make(map[util.FullPath]int), - descendantCount: make(map[util.FullPath]int), + fn: fn, + activeJobs: make(map[int64]*syncJobPaths), + concurrencyLimit: concurrency, + activeFilePaths: make(map[util.FullPath]int), + activeBarrierDirPaths: make(map[util.FullPath]int), + activeNonBarrierDirPaths: make(map[util.FullPath]int), + descendantCount: make(map[util.FullPath]int), } t.processedTsWatermark.Store(offsetTsNs) t.activeJobsCond = sync.NewCond(&t.activeJobsLock) @@ -86,11 +121,14 @@ func pathAncestors(p util.FullPath) []util.FullPath { // addPathToIndex registers a path in the conflict detection indexes. // Must be called under activeJobsLock. -func (t *MetadataProcessor) addPathToIndex(p util.FullPath, isDirectory bool) { - if isDirectory { - t.activeDirPaths[p]++ - } else { +func (t *MetadataProcessor) addPathToIndex(p util.FullPath, kind jobKind) { + switch kind { + case kindFile: t.activeFilePaths[p]++ + case kindBarrierDir: + t.activeBarrierDirPaths[p]++ + case kindNonBarrierDir: + t.activeNonBarrierDirPaths[p]++ } for _, ancestor := range pathAncestors(p) { t.descendantCount[ancestor]++ @@ -99,19 +137,26 @@ func (t *MetadataProcessor) addPathToIndex(p util.FullPath, isDirectory bool) { // removePathFromIndex unregisters a path from the conflict detection indexes. // Must be called under activeJobsLock. -func (t *MetadataProcessor) removePathFromIndex(p util.FullPath, isDirectory bool) { - if isDirectory { - if t.activeDirPaths[p] <= 1 { - delete(t.activeDirPaths, p) - } else { - t.activeDirPaths[p]-- - } - } else { +func (t *MetadataProcessor) removePathFromIndex(p util.FullPath, kind jobKind) { + switch kind { + case kindFile: if t.activeFilePaths[p] <= 1 { delete(t.activeFilePaths, p) } else { t.activeFilePaths[p]-- } + case kindBarrierDir: + if t.activeBarrierDirPaths[p] <= 1 { + delete(t.activeBarrierDirPaths, p) + } else { + t.activeBarrierDirPaths[p]-- + } + case kindNonBarrierDir: + if t.activeNonBarrierDirPaths[p] <= 1 { + delete(t.activeNonBarrierDirPaths, p) + } else { + t.activeNonBarrierDirPaths[p]-- + } } for _, ancestor := range pathAncestors(p) { if t.descendantCount[ancestor] <= 1 { @@ -123,26 +168,50 @@ func (t *MetadataProcessor) removePathFromIndex(p util.FullPath, isDirectory boo } // pathConflicts checks if a single path conflicts with any active job. -// Conflict rules match pairShouldWaitFor: -// - file vs file: exact same path -// - file vs dir: file.IsUnder(dir) -// - dir vs file: file.IsUnder(dir) -// - dir vs dir: either IsUnder the other -func (t *MetadataProcessor) pathConflicts(p util.FullPath, isDirectory bool) bool { - if isDirectory { - // Any active job (file or dir) strictly under this directory? - if t.descendantCount[p] > 0 { - return true - } - } else { - // Exact same file already active? - if t.activeFilePaths[p] > 0 { - return true - } +// Conflict rules: +// - any kind vs same-path barrier dir: wait (a create/delete/rename on p +// must fully serialize against any other operation touching p, including +// non-barrier attribute updates and files at the same path) +// - incoming barrier dir vs same-path non-barrier dir update: wait (a +// delete/rename/create on p must wait for an in-flight chmod/xattr/mtime +// update at the same p to drain) +// - file vs same-path file: wait +// - file vs same-path barrier dir: wait (covered by the barrier-at-p check +// above; also serializes a file-to-dir / dir-to-file promotion) +// - barrier dir vs same-path file: wait +// - barrier dir vs any descendant (file or dir, barrier or not): wait +// - barrier ancestor: always wait, regardless of incoming kind +// - non-barrier dir vs descendants: never conflicts +// - non-barrier dir vs same-path non-barrier dir: never conflicts (attribute +// bumps are "last writer wins"; this intentionally lets rapid mtime / +// xattr updates overlap) +func (t *MetadataProcessor) pathConflicts(p util.FullPath, kind jobKind) bool { + // A barrier dir in flight at p serializes every new job at p. This is the + // strictest same-path rule and applies regardless of incoming kind. + if t.activeBarrierDirPaths[p] > 0 { + return true } - // Any active directory that is a proper ancestor of p? + // An incoming barrier dir must also wait for any in-flight non-barrier + // dir update at the same path. Without this check, a delete or rename on + // a directory could overlap with an attribute bump in progress for the + // same directory. + if kind == kindBarrierDir && t.activeNonBarrierDirPaths[p] > 0 { + return true + } + // A file in flight at p blocks new file or barrier-dir jobs at p. A + // non-barrier dir update at p is allowed through — by construction files + // and dirs at the same path only coexist across a promotion, which is a + // barrier event handled by the check above. + if t.activeFilePaths[p] > 0 && (kind == kindFile || kind == kindBarrierDir) { + return true + } + // Barrier dirs additionally wait for their whole in-flight subtree. + if kind == kindBarrierDir && t.descendantCount[p] > 0 { + return true + } + // Any barrier dir on a proper ancestor blocks everything under it. for _, ancestor := range pathAncestors(p) { - if t.activeDirPaths[ancestor] > 0 { + if t.activeBarrierDirPaths[ancestor] > 0 { return true } } @@ -150,11 +219,11 @@ func (t *MetadataProcessor) pathConflicts(p util.FullPath, isDirectory bool) boo } func (t *MetadataProcessor) conflictsWith(resp *filer_pb.SubscribeMetadataResponse) bool { - p, newPath, isDirectory := extractPathsFromMetadata(resp) - if t.pathConflicts(p, isDirectory) { + p, newPath, kind := extractJobInfo(resp) + if t.pathConflicts(p, kind) { return true } - if newPath != "" && t.pathConflicts(newPath, isDirectory) { + if newPath != "" && t.pathConflicts(newPath, kind) { return true } return false @@ -172,13 +241,13 @@ func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse) t.activeJobsCond.Wait() } - p, newPath, isDirectory := extractPathsFromMetadata(resp) - jobPaths := &syncJobPaths{path: p, newPath: newPath, isDirectory: isDirectory} + p, newPath, kind := extractJobInfo(resp) + jobPaths := &syncJobPaths{path: p, newPath: newPath, kind: kind} t.activeJobs[resp.TsNs] = jobPaths - t.addPathToIndex(p, isDirectory) + t.addPathToIndex(p, kind) if newPath != "" { - t.addPathToIndex(newPath, isDirectory) + t.addPathToIndex(newPath, kind) } heap.Push(&t.tsHeap, resp.TsNs) @@ -195,9 +264,9 @@ func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse) defer t.activeJobsLock.Unlock() delete(t.activeJobs, resp.TsNs) - t.removePathFromIndex(jobPaths.path, jobPaths.isDirectory) + t.removePathFromIndex(jobPaths.path, jobPaths.kind) if jobPaths.newPath != "" { - t.removePathFromIndex(jobPaths.newPath, jobPaths.isDirectory) + t.removePathFromIndex(jobPaths.newPath, jobPaths.kind) } // Lazy-clean stale entries from heap top (already-completed jobs). @@ -216,28 +285,47 @@ func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse) }() } -func extractPathsFromMetadata(resp *filer_pb.SubscribeMetadataResponse) (p, newPath util.FullPath, isDirectory bool) { +// extractJobInfo derives the conflict-detection path(s) and job kind for a +// metadata event. A rename returns both the source and destination paths; all +// other event shapes return only the primary path. +func extractJobInfo(resp *filer_pb.SubscribeMetadataResponse) (p, newPath util.FullPath, kind jobKind) { oldEntry := resp.EventNotification.OldEntry newEntry := resp.EventNotification.NewEntry // create if filer_pb.IsCreate(resp) { p = util.FullPath(resp.Directory).Child(newEntry.Name) - isDirectory = newEntry.IsDirectory + kind = classifyDirEvent(newEntry.IsDirectory, false) return } if filer_pb.IsDelete(resp) { p = util.FullPath(resp.Directory).Child(oldEntry.Name) - isDirectory = oldEntry.IsDirectory + kind = classifyDirEvent(oldEntry.IsDirectory, false) return } if filer_pb.IsUpdate(resp) { p = util.FullPath(resp.Directory).Child(newEntry.Name) - isDirectory = newEntry.IsDirectory + // In-place attribute update: non-barrier when the entry is a dir. + kind = classifyDirEvent(newEntry.IsDirectory, true) return } - // renaming + // renaming: the namespace is reshaped on both sides, so a directory + // rename is a barrier on both source and destination. p = util.FullPath(resp.Directory).Child(oldEntry.Name) - isDirectory = oldEntry.IsDirectory newPath = util.FullPath(resp.EventNotification.NewParentPath).Child(newEntry.Name) + kind = classifyDirEvent(oldEntry.IsDirectory, false) return } + +// classifyDirEvent maps an entry's (isDirectory, isAttributeUpdate) pair to a +// jobKind. Attribute-only updates on directories are the only non-barrier +// case; everything else on a directory (create/delete/rename) is a barrier, +// and everything on a file is kindFile. +func classifyDirEvent(isDirectory, isAttributeUpdate bool) jobKind { + if !isDirectory { + return kindFile + } + if isAttributeUpdate { + return kindNonBarrierDir + } + return kindBarrierDir +} diff --git a/weed/command/filer_sync_jobs_test.go b/weed/command/filer_sync_jobs_test.go index c68b8e063..7b532fc86 100644 --- a/weed/command/filer_sync_jobs_test.go +++ b/weed/command/filer_sync_jobs_test.go @@ -27,6 +27,20 @@ func makeResp(dir, name string, isDir bool, tsNs int64, isNew bool) *filer_pb.Su return resp } +// makeDirUpdateResp builds an in-place attribute update event for a directory +// (same parent and same name on both sides — matches filer_pb.IsUpdate). +func makeDirUpdateResp(parent, name string, tsNs int64) *filer_pb.SubscribeMetadataResponse { + return &filer_pb.SubscribeMetadataResponse{ + Directory: parent, + TsNs: tsNs, + EventNotification: &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{Name: name, IsDirectory: true}, + NewEntry: &filer_pb.Entry{Name: name, IsDirectory: true}, + NewParentPath: parent, + }, + } +} + func makeRenameResp(oldDir, oldName, newDir, newName string, isDir bool, tsNs int64) *filer_pb.SubscribeMetadataResponse { return &filer_pb.SubscribeMetadataResponse{ Directory: oldDir, @@ -77,9 +91,9 @@ func TestFileVsFileConflict(t *testing.T) { // Add a file job active := makeResp("/dir1", "file.txt", false, 1, true) - path, newPath, isDir := extractPathsFromMetadata(active) - p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir} - p.addPathToIndex(path, isDir) + path, newPath, kind := extractJobInfo(active) + p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind} + p.addPathToIndex(path, kind) // Same file should conflict same := makeResp("/dir1", "file.txt", false, 2, true) @@ -108,9 +122,9 @@ func TestFileUnderActiveDirConflict(t *testing.T) { // Add a directory job at /dir1 active := makeResp("/", "dir1", true, 1, true) - path, newPath, isDir := extractPathsFromMetadata(active) - p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir} - p.addPathToIndex(path, isDir) + path, newPath, kind := extractJobInfo(active) + p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind} + p.addPathToIndex(path, kind) // File under /dir1 should conflict under := makeResp("/dir1", "file.txt", false, 2, true) @@ -130,11 +144,11 @@ func TestFileUnderActiveDirConflict(t *testing.T) { t.Error("unexpected conflict for file outside active directory") } - // File at /dir1 itself (not under, at) should not conflict - // because IsUnder is strict: "/dir1".IsUnder("/dir1") == false + // File at /dir1 itself (not under, at) SHOULD conflict with an active + // barrier dir at /dir1 — same-path promotions must serialize. atSame := makeResp("/", "dir1", false, 5, true) - if p.conflictsWith(atSame) { - t.Error("unexpected conflict for file at same path as directory (IsUnder is strict)") + if !p.conflictsWith(atSame) { + t.Error("expected conflict for file at same path as active barrier dir") } } @@ -146,9 +160,9 @@ func TestDirWithActiveFileUnder(t *testing.T) { // Add file jobs under /dir1 f1 := makeResp("/dir1/sub", "file.txt", false, 1, true) - path, newPath, isDir := extractPathsFromMetadata(f1) - p.activeJobs[f1.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir} - p.addPathToIndex(path, isDir) + path, newPath, kind := extractJobInfo(f1) + p.activeJobs[f1.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind} + p.addPathToIndex(path, kind) // Directory /dir1 should conflict (has active file under it) dirOp := makeResp("/", "dir1", true, 2, true) @@ -170,9 +184,9 @@ func TestDirVsDirConflict(t *testing.T) { // Add directory job at /a/b active := makeResp("/a", "b", true, 1, true) - path, newPath, isDir := extractPathsFromMetadata(active) - p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir} - p.addPathToIndex(path, isDir) + path, newPath, kind := extractJobInfo(active) + p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind} + p.addPathToIndex(path, kind) // /a/b/c (descendant) should conflict desc := makeResp("/a/b", "c", true, 2, true) @@ -186,10 +200,11 @@ func TestDirVsDirConflict(t *testing.T) { t.Error("expected conflict for ancestor directory") } - // Same directory should NOT conflict (IsUnder is strict, not equal) + // Same-path barrier dir SHOULD conflict: concurrent create/delete/rename + // on the same directory must serialize. same := makeResp("/a", "b", true, 4, true) - if p.conflictsWith(same) { - t.Error("unexpected conflict for same directory (IsUnder is strict)") + if !p.conflictsWith(same) { + t.Error("expected conflict for same-path barrier directory") } // Sibling directory should not conflict @@ -206,9 +221,9 @@ func TestRenameConflict(t *testing.T) { // Add file job at /dir1/file.txt f1 := makeResp("/dir1", "file.txt", false, 1, true) - path, newPath, isDir := extractPathsFromMetadata(f1) - p.activeJobs[f1.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir} - p.addPathToIndex(path, isDir) + path, newPath, kind := extractJobInfo(f1) + p.activeJobs[f1.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind} + p.addPathToIndex(path, kind) // Rename from /dir2/a.txt to /dir1/file.txt should conflict (newPath matches) rename := makeRenameResp("/dir2", "a.txt", "/dir1", "file.txt", false, 2) @@ -236,11 +251,11 @@ func TestActiveRenameConflict(t *testing.T) { // Add active rename job: /dir1/old.txt -> /dir2/new.txt rename := makeRenameResp("/dir1", "old.txt", "/dir2", "new.txt", false, 1) - path, newPath, isDir := extractPathsFromMetadata(rename) - p.activeJobs[rename.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir} - p.addPathToIndex(path, isDir) + path, newPath, kind := extractJobInfo(rename) + p.activeJobs[rename.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind} + p.addPathToIndex(path, kind) if newPath != "" { - p.addPathToIndex(newPath, isDir) + p.addPathToIndex(newPath, kind) } // File at /dir1/old.txt should conflict @@ -271,9 +286,9 @@ func TestRootDirConflict(t *testing.T) { // Note: a dir entry at "/" would be created as FullPath("/").Child("somedir") // But let's test what happens with an active dir at /some/path and check root active := makeResp("/some", "dir", true, 1, true) - path, newPath, isDir := extractPathsFromMetadata(active) - p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir} - p.addPathToIndex(path, isDir) + path, newPath, kind := extractJobInfo(active) + p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind} + p.addPathToIndex(path, kind) // Root dir should conflict because active dir /some/dir is under / // A new directory at "/" should see descendantCount["/"] > 0 @@ -289,7 +304,7 @@ func TestIndexCleanup(t *testing.T) { // Add then remove a file job path := util.FullPath("/a/b/c/file.txt") - p.addPathToIndex(path, false) + p.addPathToIndex(path, kindFile) if p.activeFilePaths[path] != 1 { t.Errorf("expected activeFilePaths count 1, got %d", p.activeFilePaths[path]) @@ -298,7 +313,7 @@ func TestIndexCleanup(t *testing.T) { t.Errorf("expected descendantCount['/a/b/c'] = 1, got %d", p.descendantCount["/a/b/c"]) } - p.removePathFromIndex(path, false) + p.removePathFromIndex(path, kindFile) if len(p.activeFilePaths) != 0 { t.Errorf("expected empty activeFilePaths after removal, got %v", p.activeFilePaths) @@ -316,8 +331,8 @@ func TestWatermarkWithHeap(t *testing.T) { // Simulate adding jobs in order for _, ts := range []int64{10, 20, 30} { jobPath := util.FullPath("/file" + string(rune('0'+ts/10))) - p.activeJobs[ts] = &syncJobPaths{path: jobPath, isDirectory: false} - p.addPathToIndex(jobPath, false) + p.activeJobs[ts] = &syncJobPaths{path: jobPath, kind: kindFile} + p.addPathToIndex(jobPath, kindFile) heap.Push(&p.tsHeap, ts) } @@ -327,7 +342,7 @@ func TestWatermarkWithHeap(t *testing.T) { // Remove non-oldest (ts=20) — heap top should stay 10 delete(p.activeJobs, 20) - p.removePathFromIndex("/file2", false) + p.removePathFromIndex("/file2", kindFile) // Lazy clean: top is 10 which is still active, so no pop for p.tsHeap.Len() > 0 { if _, active := p.activeJobs[p.tsHeap[0]]; active { @@ -341,7 +356,7 @@ func TestWatermarkWithHeap(t *testing.T) { // Remove oldest (ts=10) — lazy clean should find 30 delete(p.activeJobs, 10) - p.removePathFromIndex("/file1", false) + p.removePathFromIndex("/file1", kindFile) for p.tsHeap.Len() > 0 { if _, active := p.activeJobs[p.tsHeap[0]]; active { break @@ -353,6 +368,188 @@ func TestWatermarkWithHeap(t *testing.T) { } } +// TestNonBarrierDirUpdateDoesNotBlockDescendants verifies the loosened +// dir-conflict rule: an attribute-only directory update (same parent + same +// name) must NOT block file events under that directory. A barrier dir event +// (create/delete/rename) on the same path still must. +func TestNonBarrierDirUpdateDoesNotBlockDescendants(t *testing.T) { + noop := func(resp *filer_pb.SubscribeMetadataResponse) error { return nil } + + t.Run("attribute update on /dir1 does not block file under it", func(t *testing.T) { + p := NewMetadataProcessor(noop, 100, 0) + + // Active non-barrier: attribute update on /dir1. + active := makeDirUpdateResp("/", "dir1", 1) + path, newPath, kind := extractJobInfo(active) + if kind != kindNonBarrierDir { + t.Fatalf("expected kindNonBarrierDir for dir attribute update, got %v", kind) + } + p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind} + p.addPathToIndex(path, kind) + + // File under /dir1 should NOT conflict with the attribute update. + under := makeResp("/dir1", "file.txt", false, 2, true) + if p.conflictsWith(under) { + t.Error("file under a non-barrier dir update should not conflict") + } + + // Nested file should also not conflict. + deep := makeResp("/dir1/sub/deep", "file.txt", false, 3, true) + if p.conflictsWith(deep) { + t.Error("deeply nested file under a non-barrier dir update should not conflict") + } + }) + + t.Run("barrier dir create at the same path still blocks descendants", func(t *testing.T) { + p := NewMetadataProcessor(noop, 100, 0) + + active := makeResp("/", "dir1", true, 1, true) // create + path, newPath, kind := extractJobInfo(active) + if kind != kindBarrierDir { + t.Fatalf("expected kindBarrierDir for dir create, got %v", kind) + } + p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind} + p.addPathToIndex(path, kind) + + under := makeResp("/dir1", "file.txt", false, 2, true) + if !p.conflictsWith(under) { + t.Error("file under an active barrier dir create should still conflict") + } + }) + + t.Run("barrier dir delete still waits for in-flight descendants", func(t *testing.T) { + p := NewMetadataProcessor(noop, 100, 0) + + // Active file under /dir1. + f := makeResp("/dir1", "file.txt", false, 1, true) + path, newPath, kind := extractJobInfo(f) + p.activeJobs[f.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind} + p.addPathToIndex(path, kind) + + // Incoming barrier delete on /dir1 should still wait for the + // in-flight file descendant. + del := makeResp("/", "dir1", true, 2, false) + if !p.conflictsWith(del) { + t.Error("barrier dir delete should wait for descendant file job") + } + }) + + t.Run("non-barrier dir update still keeps ancestor barrier waiting", func(t *testing.T) { + p := NewMetadataProcessor(noop, 100, 0) + + // Active non-barrier dir update at /a/b. + upd := makeDirUpdateResp("/a", "b", 1) + path, newPath, kind := extractJobInfo(upd) + p.activeJobs[upd.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind} + p.addPathToIndex(path, kind) + + // A barrier delete on /a (the ancestor) should wait for it. + del := makeResp("/", "a", true, 2, false) + if !p.conflictsWith(del) { + t.Error("barrier ancestor dir delete should wait for non-barrier descendant update") + } + }) +} + +// TestSamePathBarrierSerialization verifies the tightened same-path rules: +// a barrier dir in flight at p serializes every other job at p (file, barrier +// dir, or non-barrier update), and a file in flight at p serializes incoming +// files and barrier dirs at p. +func TestSamePathBarrierSerialization(t *testing.T) { + noop := func(resp *filer_pb.SubscribeMetadataResponse) error { return nil } + + t.Run("barrier dir at p blocks same-path file", func(t *testing.T) { + p := NewMetadataProcessor(noop, 100, 0) + active := makeResp("/", "dir1", true, 1, true) // dir create + path, newPath, kind := extractJobInfo(active) + p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind} + p.addPathToIndex(path, kind) + + file := makeResp("/", "dir1", false, 2, true) + if !p.conflictsWith(file) { + t.Error("file at path of active barrier dir should conflict") + } + }) + + t.Run("barrier dir at p blocks another same-path barrier dir", func(t *testing.T) { + p := NewMetadataProcessor(noop, 100, 0) + active := makeResp("/", "dir1", true, 1, true) // dir create + path, newPath, kind := extractJobInfo(active) + p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind} + p.addPathToIndex(path, kind) + + del := makeResp("/", "dir1", true, 2, false) // dir delete, same path + if !p.conflictsWith(del) { + t.Error("concurrent create/delete on same dir path should conflict") + } + }) + + t.Run("barrier dir at p blocks non-barrier update at same path", func(t *testing.T) { + p := NewMetadataProcessor(noop, 100, 0) + active := makeResp("/", "dir1", true, 1, true) // dir create + path, newPath, kind := extractJobInfo(active) + p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind} + p.addPathToIndex(path, kind) + + upd := makeDirUpdateResp("/", "dir1", 2) + if !p.conflictsWith(upd) { + t.Error("attribute update on dir being created should wait for the create") + } + }) + + t.Run("file at p blocks same-path barrier dir", func(t *testing.T) { + p := NewMetadataProcessor(noop, 100, 0) + active := makeResp("/", "thing", false, 1, true) // file create at /thing + path, newPath, kind := extractJobInfo(active) + p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind} + p.addPathToIndex(path, kind) + + // Barrier dir at /thing (e.g. a file→dir promotion) must wait. + promoteDir := makeResp("/", "thing", true, 2, true) + if !p.conflictsWith(promoteDir) { + t.Error("barrier dir at path of active file should conflict") + } + }) + + t.Run("non-barrier update at p blocks incoming barrier dir at same path", func(t *testing.T) { + // Regression test for a bug spotted in review: an in-flight + // attribute update on /dir1 must serialize against a later + // delete/rename/create on /dir1. + p := NewMetadataProcessor(noop, 100, 0) + active := makeDirUpdateResp("/", "dir1", 1) + path, newPath, kind := extractJobInfo(active) + if kind != kindNonBarrierDir { + t.Fatalf("expected kindNonBarrierDir, got %v", kind) + } + p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind} + p.addPathToIndex(path, kind) + + del := makeResp("/", "dir1", true, 2, false) // dir delete + if !p.conflictsWith(del) { + t.Error("barrier dir at path of active non-barrier update should conflict") + } + // Ensure the removal path also cleans up the non-barrier index. + p.removePathFromIndex(path, kind) + if len(p.activeNonBarrierDirPaths) != 0 { + t.Errorf("activeNonBarrierDirPaths not cleaned up, got %v", p.activeNonBarrierDirPaths) + } + }) + + t.Run("non-barrier update at p does NOT block same-path non-barrier update", func(t *testing.T) { + p := NewMetadataProcessor(noop, 100, 0) + active := makeDirUpdateResp("/", "dir1", 1) + path, newPath, kind := extractJobInfo(active) + p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind} + p.addPathToIndex(path, kind) + + // Concurrent attribute bumps are allowed: last writer wins. + upd2 := makeDirUpdateResp("/", "dir1", 2) + if p.conflictsWith(upd2) { + t.Error("concurrent non-barrier dir updates should not conflict") + } + }) +} + // benchResult prevents the compiler from optimizing away the conflict check. var benchResult bool @@ -369,9 +566,9 @@ func BenchmarkConflictCheck(b *testing.B) { dir := fmt.Sprintf("/dir%d/sub%d", i/100, i%100) name := fmt.Sprintf("file%d.txt", i) resp := makeResp(dir, name, false, int64(i+1), true) - path, newPath, isDir := extractPathsFromMetadata(resp) - p.activeJobs[resp.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir} - p.addPathToIndex(path, isDir) + path, newPath, kind := extractJobInfo(resp) + p.activeJobs[resp.TsNs] = &syncJobPaths{path: path, newPath: newPath, kind: kind} + p.addPathToIndex(path, kind) } // Benchmark conflict check for a non-conflicting event