seaweedfs/weed/command/filer_sync_verify_test.go
Jaehoon Kim be451d22b5 feat(filer.sync): add -verifySync mode to filer.sync for cross-cluster file comparison (#9284)
* Add -verifySync flag to filer.sync for cross-cluster file comparison

Add a verification mode to filer.sync that compares entries between two
filers without performing actual synchronization. Uses directory-level
sorted merge of ListEntries to detect missing files, size mismatches,
and ETag mismatches. Supports -isActivePassive for unidirectional check
and -modifyTimeAgo to skip recently modified files during sync lag.
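
A minimal sketch of the directory-level sorted merge described above, assuming both listings arrive sorted by name. The entry and diff types and the mergeCompare name are hypothetical stand-ins, not the real filer_pb types or the actual compareDirectory signature.

```go
package main

import "fmt"

// entry is a hypothetical stand-in for a filer directory entry.
type entry struct {
	Name string
	Size uint64
}

// diff is one reported difference between listing A and listing B.
type diff struct {
	Kind string // "MISSING", "ONLY_IN_B", or "SIZE_MISMATCH"
	Name string
}

// mergeCompare walks two name-sorted listings with two cursors.
// In active-passive mode, entries that exist only in B are not reported.
func mergeCompare(a, b []entry, activePassive bool) []diff {
	var out []diff
	i, j := 0, 0
	for i < len(a) || j < len(b) {
		switch {
		case j >= len(b) || (i < len(a) && a[i].Name < b[j].Name):
			// Name exists in A but not in B.
			out = append(out, diff{"MISSING", a[i].Name})
			i++
		case i >= len(a) || b[j].Name < a[i].Name:
			// Name exists in B but not in A.
			if !activePassive {
				out = append(out, diff{"ONLY_IN_B", b[j].Name})
			}
			j++
		default:
			// Same name on both sides: compare attributes.
			if a[i].Size != b[j].Size {
				out = append(out, diff{"SIZE_MISMATCH", a[i].Name})
			}
			i++
			j++
		}
	}
	return out
}

func main() {
	a := []entry{{"a.txt", 1}, {"b.txt", 2}, {"d.txt", 4}}
	b := []entry{{"b.txt", 3}, {"c.txt", 9}, {"d.txt", 4}}
	for _, d := range mergeCompare(a, b, false) {
		fmt.Println(d.Kind, d.Name)
	}
}
```

Because both inputs are sorted, each entry is visited once and neither side needs a lookup map.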

* Add mtime annotation and JSON output to filer.sync -verifySync

Add automatic mtime relation analysis for SIZE_MISMATCH and
ETAG_MISMATCH diffs, and an NDJSON output mode for external tooling.

mtime classification:
- B_NEWER => "late_updates_skip_likely" hint. Surfaces the case
  where target has a stub entry whose mtime is ahead of source's
  real file, causing UpdateEntry's mtime guard in filersink to
  permanently skip the update.
- A_NEWER => "sync_lag_or_event_miss" hint.
- EQUAL   => no hint (chunk-level issue suspected).
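
The classification above can be sketched as a three-way comparison. The function name classifyMtime and its signature are assumptions made for illustration; only the relation labels and hint strings come from this commit message.

```go
package main

import "fmt"

// classifyMtime is a hypothetical helper: it maps the two modification
// times (seconds since epoch) to a relation label and an optional hint.
func classifyMtime(mtimeA, mtimeB int64) (relation, hint string) {
	switch {
	case mtimeB > mtimeA:
		// Target is ahead of source: an mtime guard on update may skip it.
		return "B_NEWER", "late_updates_skip_likely"
	case mtimeA > mtimeB:
		// Source is ahead of target: replication has not caught up.
		return "A_NEWER", "sync_lag_or_event_miss"
	default:
		// Same mtime: no hint.
		return "EQUAL", ""
	}
}

func main() {
	rel, hint := classifyMtime(1000, 2000)
	fmt.Println(rel, hint)
}
```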

Text output example:
  [SIZE_MISMATCH] /path (a=996, b=0, B newer +274d [late-updates skip likely])

Add -verifyJsonOutput flag. When set, emits one JSON object per
line (NDJSON) for diffs and a final SUMMARY object, suitable for
piping into external diagnostic pipelines.

Concurrent writes from the directory worker pool are now serialized
via outputMu to keep both text lines and JSON records atomic.
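
A rough sketch of mutex-serialized NDJSON output, assuming a json.Encoder behind a single lock. The record types, field names, and helper names here are hypothetical; the real output schema is not shown in this commit message.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"sync"
)

// diffRecord and summaryRecord use assumed field names for illustration.
type diffRecord struct {
	Type  string `json:"type"`
	Path  string `json:"path"`
	SizeA uint64 `json:"size_a"`
	SizeB uint64 `json:"size_b"`
}

type summaryRecord struct {
	Type  string `json:"type"`
	Diffs int    `json:"diffs"`
}

// ndjsonWriter serializes concurrent writers so every record is one whole line.
type ndjsonWriter struct {
	mu  sync.Mutex
	enc *json.Encoder
}

func newNDJSONWriter(w io.Writer) *ndjsonWriter {
	return &ndjsonWriter{enc: json.NewEncoder(w)}
}

// write emits v as a single JSON line; Encode appends the trailing newline.
func (n *ndjsonWriter) write(v any) error {
	n.mu.Lock()
	defer n.mu.Unlock()
	return n.enc.Encode(v)
}

// renderNDJSON writes all records from concurrent goroutines, then a SUMMARY.
func renderNDJSON(records []diffRecord) (string, error) {
	var buf bytes.Buffer
	out := newNDJSONWriter(&buf)
	errs := make(chan error, len(records))
	var wg sync.WaitGroup
	for _, r := range records {
		wg.Add(1)
		go func(r diffRecord) {
			defer wg.Done()
			errs <- out.write(r)
		}(r)
	}
	wg.Wait()
	close(errs)
	for err := range errs {
		if err != nil {
			return "", err
		}
	}
	if err := out.write(summaryRecord{Type: "SUMMARY", Diffs: len(records)}); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	s, err := renderNDJSON([]diffRecord{{Type: "SIZE_MISMATCH", Path: "/path", SizeA: 996, SizeB: 0}})
	if err != nil {
		panic(err)
	}
	fmt.Print(s)
}
```

The order of diff lines depends on goroutine scheduling, but each line is a complete JSON object and the summary is always last.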

* fix(filer.sync): use shared global semaphore in verifySync to bound goroutine explosion

Replace the per-call local semaphore in compareDirectory with a single
shared semaphore created in runVerifySync. The old per-level semaphore
applied a limit of verifySyncConcurrency only within each directory level,
allowing effective concurrency to grow as verifySyncConcurrency^depth on
deep trees.

The shared semaphore is held only for each directory's I/O phase
(listEntries + merge) and released before recursing into subdirectories,
so a parent never blocks waiting for children to acquire slots — which
would deadlock once tree depth exceeds the semaphore capacity.
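
The release-before-recurse rule can be illustrated with a small walker over an in-memory tree. This is a sketch under stated assumptions: the tree, walker, and runWalk names are hypothetical, and a map lookup stands in for the directory listing and merge.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// maxListings mirrors verifySyncConcurrency = 5 from this commit message.
const maxListings = 5

// tree maps a directory path to the names of its child directories.
type tree map[string][]string

type walker struct {
	dirs     tree
	sem      chan struct{} // one semaphore shared by the whole walk
	inFlight atomic.Int64
	peak     atomic.Int64
	visited  atomic.Int64
}

func (w *walker) walk(dir string) {
	w.sem <- struct{}{} // acquire a slot for this directory's I/O phase only
	n := w.inFlight.Add(1)
	for { // record the peak number of concurrent listings
		p := w.peak.Load()
		if n <= p || w.peak.CompareAndSwap(p, n) {
			break
		}
	}
	children := w.dirs[dir] // stands in for listing and merging
	w.visited.Add(1)
	w.inFlight.Add(-1)
	<-w.sem // release BEFORE recursing so a parent never holds a slot while waiting

	var wg sync.WaitGroup
	for _, c := range children {
		wg.Add(1)
		go func(child string) {
			defer wg.Done()
			w.walk(dir + "/" + child)
		}(c)
	}
	wg.Wait()
}

// buildBinaryTree fills t with a full binary tree of the given depth.
func buildBinaryTree(t tree, dir string, depth int) {
	if depth == 0 {
		t[dir] = nil
		return
	}
	t[dir] = []string{"left", "right"}
	buildBinaryTree(t, dir+"/left", depth-1)
	buildBinaryTree(t, dir+"/right", depth-1)
}

// runWalk walks a binary tree and reports visited directories and peak concurrency.
func runWalk(depth int) (visited, peak int64) {
	t := tree{}
	buildBinaryTree(t, "/root", depth)
	w := &walker{dirs: t, sem: make(chan struct{}, maxListings)}
	w.walk("/root")
	return w.visited.Load(), w.peak.Load()
}

func main() {
	visited, peak := runWalk(10)
	fmt.Printf("visited=%d peak=%d (limit %d)\n", visited, peak, maxListings)
}
```

If the slot were held across wg.Wait(), every ancestor on a path would occupy a slot at once, and a tree deeper than the semaphore capacity could not make progress.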

Extract the capacity into a named constant (verifySyncConcurrency = 5)
with a comment explaining the memory vs. performance trade-off.

Add unit tests:
- correctness: missing file, only-in-B, size mismatch, active-passive mode
- concurrency bound: peak concurrent listings ≤ verifySyncConcurrency
- no-deadlock: binary tree of depth 10 completes within timeout

* fix(filer.sync): stream directory entries to prevent OOM on large directories

Replace the listEntries helper (which accumulated all entries into a
single []filer_pb.Entry slice) with an entryStream type that pages
through the directory in the background and forwards entries one at a
time through a buffered channel. Memory per directory comparison is now
O(channel buffer size = 64) regardless of how many entries the directory
contains.

Key design points:
- entryStream wraps a goroutine + buffered channel with a one-entry
  lookahead (peek/advance) so the two-pointer sorted merge in
  compareDirectory can work without buffering any full listing.
- A child context (mergeCtx) is passed to both stream goroutines so
  they are cancelled promptly if compareDirectory returns early (e.g.
  on error); the ctx.Done() select arm in the callback prevents
  goroutine leaks when the consumer stops reading.
- stream.err is written by the goroutine before close(ch), so it is
  safe to read after the channel is exhausted (Go memory model:
  channel close happens-before the zero-value receive).
- countMissingRecursive is rewritten to use ReadDirAllEntries with a
  direct callback, eliminating its own slice allocation.
- listEntries is removed; it is no longer called anywhere.
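
A simplified sketch of the peek/advance stream described above, using plain strings instead of filer entries. The nameStream type and diffNames function are hypothetical; a slice stands in for the paged listing, and a real caller would derive the merge context from its own context rather than context.Background().

```go
package main

import (
	"context"
	"fmt"
)

// nameStream feeds names through a buffered channel with one-entry lookahead.
type nameStream struct {
	ch   chan string
	head string
	ok   bool
	err  error // written by the producer before close(ch)
}

func newNameStream(ctx context.Context, names []string) *nameStream {
	s := &nameStream{ch: make(chan string, 64)}
	go func() {
		defer close(s.ch) // runs after s.err is set
		for _, n := range names {
			select {
			case s.ch <- n:
			case <-ctx.Done(): // consumer stopped: exit instead of leaking
				s.err = ctx.Err()
				return
			}
		}
	}()
	s.advance() // load the first entry into the lookahead slot
	return s
}

// peek returns the current entry without consuming it.
func (s *nameStream) peek() (string, bool) { return s.head, s.ok }

// advance moves the lookahead to the next entry; ok becomes false at the end.
func (s *nameStream) advance() { s.head, s.ok = <-s.ch }

// diffNames merges two sorted listings without holding either in full.
func diffNames(a, b []string) (onlyA, onlyB []string, err error) {
	mergeCtx, cancel := context.WithCancel(context.Background())
	defer cancel() // stops both producers on early return
	sa, sb := newNameStream(mergeCtx, a), newNameStream(mergeCtx, b)
	for {
		na, okA := sa.peek()
		nb, okB := sb.peek()
		switch {
		case !okA && !okB:
			// Both channels are closed, so reading err is race-free here.
			if sa.err != nil {
				return nil, nil, sa.err
			}
			return onlyA, onlyB, sb.err
		case !okB || (okA && na < nb):
			onlyA = append(onlyA, na)
			sa.advance()
		case !okA || nb < na:
			onlyB = append(onlyB, nb)
			sb.advance()
		default:
			sa.advance()
			sb.advance()
		}
	}
}

func main() {
	onlyA, onlyB, err := diffNames([]string{"a", "b", "d"}, []string{"b", "c", "d"})
	fmt.Println(onlyA, onlyB, err)
}
```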

* fix(filer.sync): address verifySync review findings

Four real bugs found and fixed; one finding already resolved (shared
semaphore was introduced in a prior commit).

path.Join for child paths (filer_sync_verify.go)
  fmt.Sprintf("%s/%s", dir, name) produced "//name" when dir was "/".
  Replace all child-path concatenations with path.Join so root-level
  walks emit clean paths.
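
The difference is easy to reproduce in isolation. The helper names childSprintf and childJoin below are hypothetical; path.Join and fmt.Sprintf are the standard library calls named above.

```go
package main

import (
	"fmt"
	"path"
)

// childSprintf reproduces the old concatenation.
func childSprintf(dir, name string) string {
	return fmt.Sprintf("%s/%s", dir, name)
}

// childJoin uses path.Join, which cleans the result.
func childJoin(dir, name string) string {
	return path.Join(dir, name)
}

func main() {
	fmt.Println(childSprintf("/", "data"))  // //data
	fmt.Println(childJoin("/", "data"))     // /data
	fmt.Println(childJoin("/root", "data")) // /root/data
}
```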

cutoffTime check for ONLY_IN_B entries (filer_sync_verify.go)
  The B-only branch ignored -modifyTimeAgo, so files recently written
  to B were reported as ONLY_IN_B instead of being skipped. Mirror the
  A-side mtime guard: skip and increment skippedRecent when the entry
  is newer than cutoffTime.

Summary emitted before error check (filer_sync_verify.go)
  A filer I/O error mid-walk still caused a SUMMARY record (or text
  summary) to be printed, making partial runs appear complete. Move the
  error check to before summary emission; on error, return immediately
  without printing any summary.

Return false on verification failure (filer_sync.go)
  runVerifySync returned true (exit 0) even when diffs were found or the
  walk failed. Return false so the main binary sets exit status 1,
  consistent with how all other commands signal failure.

* test(filer.sync): add missing verifySync test coverage

Three new tests covering gaps identified during review:

TestVerifySyncETagMismatch
  Verifies that two files with identical size but different Md5 checksums
  are counted as etagMismatch (not sizeMismatch). Exercises the second
  branch of compareEntries that was previously untested.

TestVerifySyncCutoffTime (4 subtests)
  A-only recent  — recent file skipped (skippedRecent++), not MISSING
  A-only old     — old file reported as MISSING
  B-only recent  — recent file skipped (skippedRecent++), not ONLY_IN_B
  B-only old     — old file reported as ONLY_IN_B
  The B-only subtests specifically cover the cutoffTime fix added in the
  previous commit.

TestVerifySyncRootPath
  Regression for the path.Join fix: walks from "/" and verifies that the
  child directory is reached and compared correctly (the old Sprintf
  produced "//data" which would silently produce wrong results).
  Asserts dirCount=2 and fileCount=1 to confirm the full tree is walked.

* fix(filer.sync): use os.Exit(2) instead of return false on verify failure

return false triggered weed.go's error handler which printed the full
command usage — appropriate for invalid arguments, not for a completed
verification that found differences. Use os.Exit(2) consistent with
the existing pattern in filer_sync.go (lines 251, 293).

* refactor(filer.sync.verify): split verify into its own command

The verify mode is a one-shot batch operation with a fundamentally
different lifecycle from the long-running sync subscriber, and most of
filer.sync's flags (replication, metrics port, debug pprof, concurrency,
etc.) do not apply to it. Extract it into a sibling command alongside
filer.copy/filer.backup/filer.export rather than a flag mode on
filer.sync.

Also rename modifyTimeAgo to modifiedTimeAgo (grammatical) and drop the
verifyJsonOutput prefix to plain jsonOutput now that the verify context
is implicit in the command name.

* fix(filer.sync.verify): address review comments

- Bounded worker pool: cap subdirectory goroutines per level via a
  jobs channel and min(verifySyncConcurrency, len(subDirs)) workers
  instead of spawning one goroutine per child. Wide directories no
  longer park ~2KB per queued goroutine.

- Don't gate recursion on a directory's mtime: a fresh child write
  bumps the parent mtime, but older files inside should still be
  reported as missing. Always recurse for missing-in-B directories
  and apply the cutoff per-file inside countMissingRecursive.

- Apply -modifiedTimeAgo symmetrically: matched-name files now skip
  the comparison when EITHER side is recently modified, not just A.
  This restores lag tolerance when B was just rewritten.
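
The bounded worker pool from the first bullet might look roughly like this. The processAll name and its work callback are hypothetical; the sketch assumes limit is at least 1 and uses the min builtin (Go 1.21 or later).

```go
package main

import (
	"fmt"
	"sync"
)

// processAll runs work for every subdirectory using at most limit workers
// fed from a jobs channel, and returns the number of workers started.
func processAll(subDirs []string, limit int, work func(string)) int {
	workers := min(limit, len(subDirs))
	jobs := make(chan string)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for d := range jobs {
				work(d)
			}
		}()
	}
	// The producer blocks until a worker is free, so queued directories
	// cost a loop iteration here rather than a parked goroutine each.
	for _, d := range subDirs {
		jobs <- d
	}
	close(jobs)
	wg.Wait()
	return workers
}

func main() {
	dirs := make([]string, 20)
	for i := range dirs {
		dirs[i] = fmt.Sprintf("sub%02d", i)
	}
	var mu sync.Mutex
	count := 0
	n := processAll(dirs, 5, func(string) {
		mu.Lock()
		count++
		mu.Unlock()
	})
	fmt.Println("workers:", n, "processed:", count)
}
```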

Adds tests for both new behaviors and a shared isTooRecent helper.
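
One possible shape for the shared helper, shown only as a sketch: the body of isTooRecent is not given in this commit message, so the signature, the zero-cutoff convention, and the skipMatched wrapper are all assumptions.

```go
package main

import (
	"fmt"
	"time"
)

// demoCutoff matches the cutoff used by the tests in this file.
var demoCutoff = time.Unix(1000, 0)

// isTooRecent reports whether an entry's mtime (seconds since epoch) is
// newer than cutoff. A zero cutoff is assumed to disable the check.
func isTooRecent(mtimeSec int64, cutoff time.Time) bool {
	if cutoff.IsZero() {
		return false
	}
	return time.Unix(mtimeSec, 0).After(cutoff)
}

// skipMatched applies the cutoff symmetrically to a matched-name pair:
// the comparison is skipped when EITHER side was modified recently.
func skipMatched(mtimeA, mtimeB int64, cutoff time.Time) bool {
	return isTooRecent(mtimeA, cutoff) || isTooRecent(mtimeB, cutoff)
}

func main() {
	fmt.Println(skipMatched(500, 2000, demoCutoff)) // B recent: true
	fmt.Println(skipMatched(500, 600, demoCutoff))  // both old: false
}
```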

---------

Co-authored-by: Chris Lu <chris.lu@gmail.com>
2026-04-29 12:33:53 -07:00


package command

import (
	"context"
	"fmt"
	"io"
	"sync/atomic"
	"testing"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
)

// --- stream / inner-client / outer-client mocks ---

type verifyTestStream struct {
	entries []*filer_pb.Entry
	idx     int
}

func (s *verifyTestStream) Recv() (*filer_pb.ListEntriesResponse, error) {
	if s.idx >= len(s.entries) {
		return nil, io.EOF
	}
	resp := &filer_pb.ListEntriesResponse{Entry: s.entries[s.idx]}
	s.idx++
	return resp, nil
}
func (s *verifyTestStream) Header() (metadata.MD, error) { return metadata.MD{}, nil }
func (s *verifyTestStream) Trailer() metadata.MD         { return metadata.MD{} }
func (s *verifyTestStream) CloseSend() error             { return nil }
func (s *verifyTestStream) Context() context.Context     { return context.Background() }
func (s *verifyTestStream) SendMsg(_ any) error          { return nil }
func (s *verifyTestStream) RecvMsg(_ any) error          { return nil }

// verifyTestInnerClient is the SeaweedFilerClient passed to fn inside WithFilerClient.
type verifyTestInnerClient struct {
	filer_pb.SeaweedFilerClient // embed for unimplemented RPCs
	entriesByDir                map[string][]*filer_pb.Entry
}

func (c *verifyTestInnerClient) ListEntries(_ context.Context, in *filer_pb.ListEntriesRequest, _ ...grpc.CallOption) (grpc.ServerStreamingClient[filer_pb.ListEntriesResponse], error) {
	return &verifyTestStream{entries: c.entriesByDir[in.Directory]}, nil
}

// verifyTestFilerClient implements filer_pb.FilerClient and tracks concurrent
// WithFilerClient invocations to let tests verify the global concurrency bound.
type verifyTestFilerClient struct {
	entriesByDir map[string][]*filer_pb.Entry
	inFlight     int64 // accessed via atomic
	peakFlight   int64 // accessed via atomic
	delay        time.Duration
}

func (c *verifyTestFilerClient) WithFilerClient(_ bool, fn func(filer_pb.SeaweedFilerClient) error) error {
	// track peak concurrent in-flight listings
	n := atomic.AddInt64(&c.inFlight, 1)
	defer atomic.AddInt64(&c.inFlight, -1)
	for {
		peak := atomic.LoadInt64(&c.peakFlight)
		if n <= peak || atomic.CompareAndSwapInt64(&c.peakFlight, peak, n) {
			break
		}
	}
	if c.delay > 0 {
		time.Sleep(c.delay)
	}
	return fn(&verifyTestInnerClient{entriesByDir: c.entriesByDir})
}
func (c *verifyTestFilerClient) AdjustedUrl(_ *filer_pb.Location) string { return "" }
func (c *verifyTestFilerClient) GetDataCenter() string                   { return "" }

// --- entry helpers ---

func verifyFileEntry(name string, size uint64) *filer_pb.Entry {
	return &filer_pb.Entry{
		Name:       name,
		Attributes: &filer_pb.FuseAttributes{FileSize: size},
	}
}

func verifyDirEntry(name string) *filer_pb.Entry {
	return &filer_pb.Entry{Name: name, IsDirectory: true}
}

// --- tests ---

// TestVerifySyncMissingFile confirms that a file present in A but absent in B
// is counted as missing.
func TestVerifySyncMissingFile(t *testing.T) {
	clientA := &verifyTestFilerClient{
		entriesByDir: map[string][]*filer_pb.Entry{
			"/root": {verifyFileEntry("file.txt", 100)},
		},
	}
	clientB := &verifyTestFilerClient{
		entriesByDir: map[string][]*filer_pb.Entry{
			"/root": {},
		},
	}
	result := &VerifyResult{}
	sem := make(chan struct{}, verifySyncConcurrency)
	err := compareDirectory(context.Background(), clientA, clientB,
		"/root", "/root", false, time.Time{}, sem, result)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if got := result.missingCount.Load(); got != 1 {
		t.Errorf("missingCount = %d, want 1", got)
	}
	if got := result.sizeMismatch.Load(); got != 0 {
		t.Errorf("sizeMismatch = %d, want 0", got)
	}
}

// TestVerifySyncOnlyInB confirms that a file present only in B is counted
// (non-active-passive mode) or ignored (active-passive mode).
func TestVerifySyncOnlyInB(t *testing.T) {
	clientA := &verifyTestFilerClient{
		entriesByDir: map[string][]*filer_pb.Entry{"/root": {}},
	}
	clientB := &verifyTestFilerClient{
		entriesByDir: map[string][]*filer_pb.Entry{
			"/root": {verifyFileEntry("extra.txt", 50)},
		},
	}
	t.Run("bidirectional", func(t *testing.T) {
		result := &VerifyResult{}
		sem := make(chan struct{}, verifySyncConcurrency)
		if err := compareDirectory(context.Background(), clientA, clientB,
			"/root", "/root", false, time.Time{}, sem, result); err != nil {
			t.Fatalf("unexpected error: %v", err)
		}
		if got := result.onlyInB.Load(); got != 1 {
			t.Errorf("onlyInB = %d, want 1", got)
		}
	})
	t.Run("active-passive ignores onlyInB", func(t *testing.T) {
		result := &VerifyResult{}
		sem := make(chan struct{}, verifySyncConcurrency)
		if err := compareDirectory(context.Background(), clientA, clientB,
			"/root", "/root", true, time.Time{}, sem, result); err != nil {
			t.Fatalf("unexpected error: %v", err)
		}
		if got := result.onlyInB.Load(); got != 0 {
			t.Errorf("onlyInB = %d, want 0 in active-passive mode", got)
		}
	})
}

// TestVerifySyncSizeMismatch confirms that a file with differing sizes is
// counted as a size mismatch and not as missing.
func TestVerifySyncSizeMismatch(t *testing.T) {
	clientA := &verifyTestFilerClient{
		entriesByDir: map[string][]*filer_pb.Entry{
			"/root": {verifyFileEntry("data.bin", 1024)},
		},
	}
	clientB := &verifyTestFilerClient{
		entriesByDir: map[string][]*filer_pb.Entry{
			"/root": {verifyFileEntry("data.bin", 512)},
		},
	}
	result := &VerifyResult{}
	sem := make(chan struct{}, verifySyncConcurrency)
	err := compareDirectory(context.Background(), clientA, clientB,
		"/root", "/root", false, time.Time{}, sem, result)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if got := result.sizeMismatch.Load(); got != 1 {
		t.Errorf("sizeMismatch = %d, want 1", got)
	}
	if got := result.missingCount.Load(); got != 0 {
		t.Errorf("missingCount = %d, want 0", got)
	}
}

// TestVerifySyncConcurrencyBound verifies that the shared semaphore keeps peak
// concurrent filer listings at or below verifySyncConcurrency at all times.
// A 5ms delay per WithFilerClient call makes the concurrency overlap observable.
func TestVerifySyncConcurrencyBound(t *testing.T) {
	// Wide, shallow tree: root with 20 identical subdirectories.
	const fanout = 20
	entriesA := make(map[string][]*filer_pb.Entry)
	entriesB := make(map[string][]*filer_pb.Entry)
	rootDirs := make([]*filer_pb.Entry, fanout)
	for i := range fanout {
		name := fmt.Sprintf("sub%02d", i)
		rootDirs[i] = verifyDirEntry(name)
		entriesA["/root/"+name] = []*filer_pb.Entry{verifyFileEntry("f.txt", 10)}
		entriesB["/root/"+name] = []*filer_pb.Entry{verifyFileEntry("f.txt", 10)}
	}
	entriesA["/root"] = rootDirs
	entriesB["/root"] = rootDirs
	clientA := &verifyTestFilerClient{entriesByDir: entriesA, delay: 5 * time.Millisecond}
	clientB := &verifyTestFilerClient{entriesByDir: entriesB, delay: 5 * time.Millisecond}
	result := &VerifyResult{}
	sem := make(chan struct{}, verifySyncConcurrency)
	if err := compareDirectory(context.Background(), clientA, clientB,
		"/root", "/root", false, time.Time{}, sem, result); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if result.missingCount.Load() != 0 || result.sizeMismatch.Load() != 0 {
		t.Errorf("unexpected diffs in identical tree")
	}
	if peak := atomic.LoadInt64(&clientA.peakFlight); peak > verifySyncConcurrency {
		t.Errorf("clientA peak concurrent listings = %d, want ≤ %d (verifySyncConcurrency)",
			peak, verifySyncConcurrency)
	}
	if peak := atomic.LoadInt64(&clientB.peakFlight); peak > verifySyncConcurrency {
		t.Errorf("clientB peak concurrent listings = %d, want ≤ %d (verifySyncConcurrency)",
			peak, verifySyncConcurrency)
	}
}

// TestVerifySyncETagMismatch confirms that two files with the same size but
// different Md5 checksums are counted as an ETag mismatch, not a size mismatch.
func TestVerifySyncETagMismatch(t *testing.T) {
	newEntry := func(name string, md5 []byte) *filer_pb.Entry {
		return &filer_pb.Entry{
			Name: name,
			Attributes: &filer_pb.FuseAttributes{
				FileSize: 100,
				Md5:      md5,
			},
		}
	}
	clientA := &verifyTestFilerClient{
		entriesByDir: map[string][]*filer_pb.Entry{
			"/root": {newEntry("data.bin", []byte{0x11, 0x22, 0x33})},
		},
	}
	clientB := &verifyTestFilerClient{
		entriesByDir: map[string][]*filer_pb.Entry{
			"/root": {newEntry("data.bin", []byte{0x44, 0x55, 0x66})},
		},
	}
	result := &VerifyResult{}
	sem := make(chan struct{}, verifySyncConcurrency)
	if err := compareDirectory(context.Background(), clientA, clientB,
		"/root", "/root", false, time.Time{}, sem, result); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if got := result.etagMismatch.Load(); got != 1 {
		t.Errorf("etagMismatch = %d, want 1", got)
	}
	if got := result.sizeMismatch.Load(); got != 0 {
		t.Errorf("sizeMismatch = %d, want 0 (same size should not trigger size mismatch)", got)
	}
}

// TestVerifySyncCutoffTime verifies that entries newer than cutoffTime are
// skipped in both the A-only (MISSING) and B-only (ONLY_IN_B) branches.
func TestVerifySyncCutoffTime(t *testing.T) {
	cutoff := time.Unix(1000, 0)
	recentEntry := func(name string) *filer_pb.Entry {
		return &filer_pb.Entry{
			Name:       name,
			Attributes: &filer_pb.FuseAttributes{FileSize: 10, Mtime: 2000}, // > cutoff
		}
	}
	oldEntry := func(name string) *filer_pb.Entry {
		return &filer_pb.Entry{
			Name:       name,
			Attributes: &filer_pb.FuseAttributes{FileSize: 10, Mtime: 500}, // < cutoff
		}
	}
	t.Run("A-only recent file is skipped, not reported missing", func(t *testing.T) {
		clientA := &verifyTestFilerClient{
			entriesByDir: map[string][]*filer_pb.Entry{"/": {recentEntry("new.txt")}},
		}
		clientB := &verifyTestFilerClient{
			entriesByDir: map[string][]*filer_pb.Entry{"/": {}},
		}
		result := &VerifyResult{}
		sem := make(chan struct{}, verifySyncConcurrency)
		if err := compareDirectory(context.Background(), clientA, clientB,
			"/", "/", false, cutoff, sem, result); err != nil {
			t.Fatalf("unexpected error: %v", err)
		}
		if got := result.skippedRecent.Load(); got != 1 {
			t.Errorf("skippedRecent = %d, want 1", got)
		}
		if got := result.missingCount.Load(); got != 0 {
			t.Errorf("missingCount = %d, want 0 (recent file should be skipped)", got)
		}
	})
	t.Run("A-only old file is reported missing", func(t *testing.T) {
		clientA := &verifyTestFilerClient{
			entriesByDir: map[string][]*filer_pb.Entry{"/": {oldEntry("old.txt")}},
		}
		clientB := &verifyTestFilerClient{
			entriesByDir: map[string][]*filer_pb.Entry{"/": {}},
		}
		result := &VerifyResult{}
		sem := make(chan struct{}, verifySyncConcurrency)
		if err := compareDirectory(context.Background(), clientA, clientB,
			"/", "/", false, cutoff, sem, result); err != nil {
			t.Fatalf("unexpected error: %v", err)
		}
		if got := result.missingCount.Load(); got != 1 {
			t.Errorf("missingCount = %d, want 1", got)
		}
	})
	t.Run("B-only recent file is skipped, not reported as ONLY_IN_B", func(t *testing.T) {
		clientA := &verifyTestFilerClient{
			entriesByDir: map[string][]*filer_pb.Entry{"/": {}},
		}
		clientB := &verifyTestFilerClient{
			entriesByDir: map[string][]*filer_pb.Entry{"/": {recentEntry("new.txt")}},
		}
		result := &VerifyResult{}
		sem := make(chan struct{}, verifySyncConcurrency)
		if err := compareDirectory(context.Background(), clientA, clientB,
			"/", "/", false, cutoff, sem, result); err != nil {
			t.Fatalf("unexpected error: %v", err)
		}
		if got := result.skippedRecent.Load(); got != 1 {
			t.Errorf("skippedRecent = %d, want 1", got)
		}
		if got := result.onlyInB.Load(); got != 0 {
			t.Errorf("onlyInB = %d, want 0 (recent B-only file should be skipped)", got)
		}
	})
	t.Run("B-only old file is reported as ONLY_IN_B", func(t *testing.T) {
		clientA := &verifyTestFilerClient{
			entriesByDir: map[string][]*filer_pb.Entry{"/": {}},
		}
		clientB := &verifyTestFilerClient{
			entriesByDir: map[string][]*filer_pb.Entry{"/": {oldEntry("old.txt")}},
		}
		result := &VerifyResult{}
		sem := make(chan struct{}, verifySyncConcurrency)
		if err := compareDirectory(context.Background(), clientA, clientB,
			"/", "/", false, cutoff, sem, result); err != nil {
			t.Fatalf("unexpected error: %v", err)
		}
		if got := result.onlyInB.Load(); got != 1 {
			t.Errorf("onlyInB = %d, want 1", got)
		}
	})
}

// TestVerifySyncCutoffMatchedFileBSideRecent verifies that when matched-name
// files differ but only the B side is recently modified, the comparison is
// skipped (sync-lag tolerance) rather than reporting a spurious mismatch.
func TestVerifySyncCutoffMatchedFileBSideRecent(t *testing.T) {
	cutoff := time.Unix(1000, 0)
	entry := func(size uint64, mtime int64) *filer_pb.Entry {
		return &filer_pb.Entry{
			Name:       "data.bin",
			Attributes: &filer_pb.FuseAttributes{FileSize: size, Mtime: mtime},
		}
	}
	// A is old (size 100), B is recently rewritten with a different size.
	// Without the B-side cutoff check this would surface as SIZE_MISMATCH.
	clientA := &verifyTestFilerClient{
		entriesByDir: map[string][]*filer_pb.Entry{"/": {entry(100, 500)}},
	}
	clientB := &verifyTestFilerClient{
		entriesByDir: map[string][]*filer_pb.Entry{"/": {entry(200, 2000)}},
	}
	result := &VerifyResult{}
	sem := make(chan struct{}, verifySyncConcurrency)
	if err := compareDirectory(context.Background(), clientA, clientB,
		"/", "/", false, cutoff, sem, result); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if got := result.skippedRecent.Load(); got != 1 {
		t.Errorf("skippedRecent = %d, want 1 (B-side recent should skip)", got)
	}
	if got := result.sizeMismatch.Load(); got != 0 {
		t.Errorf("sizeMismatch = %d, want 0 (recent B should not surface as mismatch)", got)
	}
}

// TestVerifySyncMissingDirRecursesEvenWithRecentMtime verifies that a
// directory missing in B with a recent mtime still has its subtree walked,
// so older missing files inside are reported. A recent child write can bump
// the parent mtime even though older missing files exist underneath.
func TestVerifySyncMissingDirRecursesEvenWithRecentMtime(t *testing.T) {
	cutoff := time.Unix(1000, 0)
	recentDir := &filer_pb.Entry{
		Name:        "subdir",
		IsDirectory: true,
		Attributes:  &filer_pb.FuseAttributes{Mtime: 2000}, // > cutoff
	}
	oldChild := &filer_pb.Entry{
		Name:       "old.txt",
		Attributes: &filer_pb.FuseAttributes{FileSize: 10, Mtime: 500}, // < cutoff
	}
	clientA := &verifyTestFilerClient{
		entriesByDir: map[string][]*filer_pb.Entry{
			"/":       {recentDir},
			"/subdir": {oldChild},
		},
	}
	clientB := &verifyTestFilerClient{
		entriesByDir: map[string][]*filer_pb.Entry{
			"/": {},
		},
	}
	result := &VerifyResult{}
	sem := make(chan struct{}, verifySyncConcurrency)
	if err := compareDirectory(context.Background(), clientA, clientB,
		"/", "/", false, cutoff, sem, result); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	// Expect: directory MISSING + recursed-old-file MISSING = 2 missing.
	if got := result.missingCount.Load(); got != 2 {
		t.Errorf("missingCount = %d, want 2 (recent dir + old child inside)", got)
	}
	if got := result.skippedRecent.Load(); got != 0 {
		t.Errorf("skippedRecent = %d, want 0 (dir mtime should not gate recursion)", got)
	}
}

// TestVerifySyncRootPath is a regression test for the path.Join fix.
// fmt.Sprintf("%s/%s", "/", name) produced "//name"; path.Join produces "/name".
// This test walks from "/" and verifies the child directory is found and
// compared correctly (not silently skipped due to a malformed path).
func TestVerifySyncRootPath(t *testing.T) {
	clientA := &verifyTestFilerClient{
		entriesByDir: map[string][]*filer_pb.Entry{
			"/":     {verifyDirEntry("data")},
			"/data": {verifyFileEntry("file.txt", 42)},
		},
	}
	clientB := &verifyTestFilerClient{
		entriesByDir: map[string][]*filer_pb.Entry{
			"/":     {verifyDirEntry("data")},
			"/data": {verifyFileEntry("file.txt", 42)},
		},
	}
	result := &VerifyResult{}
	sem := make(chan struct{}, verifySyncConcurrency)
	if err := compareDirectory(context.Background(), clientA, clientB,
		"/", "/", false, time.Time{}, sem, result); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if result.missingCount.Load() != 0 || result.sizeMismatch.Load() != 0 {
		t.Errorf("identical trees from root should have no diffs: missing=%d size=%d",
			result.missingCount.Load(), result.sizeMismatch.Load())
	}
	// 2 directories traversed: "/" and "/data"
	if got := result.dirCount.Load(); got != 2 {
		t.Errorf("dirCount = %d, want 2 (root + /data)", got)
	}
	// 1 file compared: /data/file.txt
	if got := result.fileCount.Load(); got != 1 {
		t.Errorf("fileCount = %d, want 1", got)
	}
}

// TestVerifySyncNoDeadlockDeepTree ensures that a tree deeper than
// verifySyncConcurrency completes without deadlocking. With a per-call
// semaphore the walk would still complete (just with unbounded goroutines);
// this test mainly guards the shared-semaphore release-before-recurse
// invariant, that is, the walk must finish within the timeout.
func TestVerifySyncNoDeadlockDeepTree(t *testing.T) {
	// Build a binary tree of depth 10 (well past verifySyncConcurrency=5).
	const depth = 10
	entriesA := make(map[string][]*filer_pb.Entry)
	entriesB := make(map[string][]*filer_pb.Entry)
	var buildTree func(path string, d int)
	buildTree = func(path string, d int) {
		if d == 0 {
			entriesA[path] = []*filer_pb.Entry{verifyFileEntry("leaf.txt", 1)}
			entriesB[path] = []*filer_pb.Entry{verifyFileEntry("leaf.txt", 1)}
			return
		}
		children := []*filer_pb.Entry{verifyDirEntry("left"), verifyDirEntry("right")}
		entriesA[path] = children
		entriesB[path] = children
		buildTree(path+"/left", d-1)
		buildTree(path+"/right", d-1)
	}
	buildTree("/root", depth)
	clientA := &verifyTestFilerClient{entriesByDir: entriesA}
	clientB := &verifyTestFilerClient{entriesByDir: entriesB}
	// Run the walk in a goroutine so the test can enforce a timeout.
	done := make(chan error, 1)
	go func() {
		result := &VerifyResult{}
		sem := make(chan struct{}, verifySyncConcurrency)
		done <- compareDirectory(context.Background(), clientA, clientB,
			"/root", "/root", false, time.Time{}, sem, result)
	}()
	select {
	case err := <-done:
		if err != nil {
			t.Fatalf("unexpected error: %v", err)
		}
	case <-time.After(10 * time.Second):
		t.Fatal("compareDirectory did not complete within 10s; possible deadlock")
	}
}