mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-14 05:41:29 +00:00
* refactor(command): expand "~" in all path-style CLI flags

  Many of weed's path-bearing flags (-s3.config, -s3.iam.config, -admin.dataDir, -webdav.cacheDir, -volume.dir.idx, TLS cert/key files, profile output paths, mount cache dirs, sftp key files, ...) were never run through util.ResolvePath, so a value like "~/iam.json" was used literally. Tilde only worked when the shell expanded it, which silently fails for the common -flag=~/path form (bash leaves the tilde literal in --opt=~/path).

  - Extend util.ResolvePath to also handle "~user" / "~user/rest", matching shell tilde expansion. Add unit tests.
  - Apply util.ResolvePath at the top of each shared start* function (s3, webdav, sftp) so mini/server/filer/standalone callers all inherit it; resolve at the few one-off use sites (mount cache dirs, volume idx folder, mini admin.dataDir, profile paths).
  - Drop the duplicate expandHomeDir helper from admin.go in favor of the now-equivalent util.ResolvePath.

* fixup: handle comma-separated -dir flags for tilde expansion

  `weed mini -dir`, `weed server -dir`, and `weed volume -dir` accept comma-separated paths (`dir[,dir]...`). Calling util.ResolvePath on the whole string mishandled multi-folder values with tilde, e.g. "~/d1,~/d2" would resolve as if "d1,~/d2" were a single subpath.

  - Add util.ResolveCommaSeparatedPaths: split on ",", run each entry through ResolvePath, rejoin. Short-circuits when no "~" present.
  - Use it for *miniDataFolders (mini.go), *volumeDataFolders (server.go), and resolve each entry of v.folders in-place (volume.go) so all downstream consumers see resolved paths.
  - Add 7-case TestResolveCommaSeparatedPaths covering empty, single, multiple, and mixed inputs.

* address PR review: metaFolder + Windows backslash

  - master.go: resolve *m.metaFolder at the top of runMaster so util.FullPath(*m.metaFolder) on the next line sees an expanded path. Drop the now-redundant ResolvePath in TestFolderWritable.
  - server.go: same treatment for *masterOptions.metaFolder, paired with the existing cpu/mem profile resolves. Drop the redundant inner ResolvePath at TestFolderWritable.
  - file_util.go: ResolvePath now accepts filepath.Separator as a separator after the tilde, so "~\\data" works on Windows. Other platforms keep current behaviour (backslash stays literal because it is a valid filename character in usernames and paths).
  - file_util_test.go: add two cases using filepath.Separator that exercise the new code path on Windows and remain a no-op on Unix.

* address PR review: resolve "~" in remaining command path flags

  Comprehensive sweep of path-bearing flags across every weed subcommand, applying util.ResolvePath in-place at the top of each run* function so all downstream consumers see expanded paths.

  - webdav.go: resolve *wo.cacheDir at the top of startWebDav so mini/server/filer/standalone callers all inherit it.
  - mount_std.go: cpu/mem profile paths.
  - filer_sync.go: cpu/mem profile paths.
  - mq_broker.go: cpu/mem profile paths.
  - benchmark.go: cpuprofile output path.
  - backup.go: -dir resolved once at runBackup; drop the duplicated inline ResolvePath in NewVolume calls.
  - compact.go: -dir resolved at runCompact; drop inline ResolvePath.
  - export.go: -dir and -o resolved at runExport; drop inline ResolvePath in LoadFromIdx and ScanVolumeFile.
  - download.go: -dir resolved at runDownload; drop inline.
  - update.go: -dir resolved at runUpdate so filepath.Join uses the expanded path; drop inline ResolvePath in TestFolderWritable.
  - scaffold.go: -output expanded before filepath.Join.
  - worker.go: -workingDir expanded before being passed to runtime.

* address PR review: resolve option-struct paths at run* entry points

  server.go:381 propagates s3Options.config to filerOptions.s3ConfigFile *before* startS3Server runs, which meant the filer-side code saw the unresolved tilde-prefixed pointer. Same pattern for webdavOptions and sftpOptions (and equivalent in mini.go / filer.go). The fix: hoist resolution from the shared start* functions up to the run* entry points, where every shared pointer is set up before any propagation happens.

  - s3.go, webdav.go, sftp.go: extract a resolvePaths() method on each Options struct that runs every path field through util.ResolvePath in-place. Idempotent.
  - runS3, runWebDav, runSftp: call the standalone struct's resolvePaths before starting metrics / loading security config.
  - runServer, runMini, runFiler: call resolvePaths on every embedded options struct, plus resolve loose flags (serverIamConfig, miniS3Config, miniIamConfig, miniMasterOptions.metaFolder, and filer's defaultLevelDbDirectory) so they're expanded before any pointer copy or use.
  - Drop the now-redundant inline ResolvePath at filer's defaultLevelDbDirectory composition.

* address PR review: re-resolve mini -dir post-config, cover misc paths

  - mini.go: applyConfigFileOptions can overwrite -dir with a literal ~/data from mini.options. Re-resolve *miniDataFolders after the config-file apply, alongside the other path resolves, so the mini filer no longer ends up with a literal ~/data/filerldb2.
  - benchmark.go: resolve *b.idListFile (-list).
  - filer_sync.go: resolve *syncOptions.aSecurity / .bSecurity (-a.security / -b.security) before LoadClientTLSFromFile.
  - filer_cat.go: resolve *filerCat.output (-o) before os.OpenFile.
  - admin.go: drop trailing blank line at EOF (git diff --check).

* address PR review: resolve -a.security/-b.security/-config before use

  Follow-up fixes:

  - filer_sync.go: the -a.security / -b.security resolves were placed *after* LoadClientTLSFromFile / LoadHTTPClientFromFile were called, so weed filer.sync -a.security=~/a.toml still passed the literal tilde path. Hoist the resolves above the security-loading block so TLS clients see expanded paths.
  - filer_sync_verify.go: same flag pair was never resolved at all in the verify command; resolve at the top of runFilerSyncVerify.
  - filer_meta_backup.go: -config (the backup_filer.toml path) was passed directly to viper. Resolve at the top of runFilerMetaBackup.
  - mini.go: master.dir defaulted to the entire comma-joined miniDataFolders. With weed mini -dir=~/d1,~/d2 (or any multi-dir setup), TestFolderWritable then stat'd the joined string instead of a single directory. Default to the first entry via StringSplit to mirror the disk-space calculation a few lines below, and drop the now-redundant ResolvePath in TestFolderWritable.
package command

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"path"
	"sync"
	"sync/atomic"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/security"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"google.golang.org/grpc"
)

type SyncVerifyOptions struct {
	filerA          *string
	filerB          *string
	aPath           *string
	bPath           *string
	aSecurity       *string
	bSecurity       *string
	isActivePassive *bool
	modifiedTimeAgo *time.Duration
	jsonOutput      *bool
}

var syncVerifyOptions SyncVerifyOptions

func init() {
	cmdFilerSyncVerify.Run = runFilerSyncVerify // break init cycle
	syncVerifyOptions.filerA = cmdFilerSyncVerify.Flag.String("a", "", "filer A in one SeaweedFS cluster")
	syncVerifyOptions.filerB = cmdFilerSyncVerify.Flag.String("b", "", "filer B in the other SeaweedFS cluster")
	syncVerifyOptions.aPath = cmdFilerSyncVerify.Flag.String("a.path", "/", "directory to verify on filer A")
	syncVerifyOptions.bPath = cmdFilerSyncVerify.Flag.String("b.path", "/", "directory to verify on filer B")
	syncVerifyOptions.aSecurity = cmdFilerSyncVerify.Flag.String("a.security", "", "security.toml file for filer A when clusters use different certificates")
	syncVerifyOptions.bSecurity = cmdFilerSyncVerify.Flag.String("b.security", "", "security.toml file for filer B when clusters use different certificates")
	syncVerifyOptions.isActivePassive = cmdFilerSyncVerify.Flag.Bool("isActivePassive", false, "one directional comparison from A to B; entries only in B are not reported")
	syncVerifyOptions.modifiedTimeAgo = cmdFilerSyncVerify.Flag.Duration("modifiedTimeAgo", 0, "only verify files modified before this duration ago (e.g. 1h) for sync-lag tolerance")
	syncVerifyOptions.jsonOutput = cmdFilerSyncVerify.Flag.Bool("jsonOutput", false, "emit NDJSON output (one JSON object per line) for external tooling")
}

var cmdFilerSyncVerify = &Command{
	UsageLine: "filer.sync.verify -a=<oneFilerHost>:<oneFilerPort> -b=<otherFilerHost>:<otherFilerPort>",
	Short:     "compare entries between two filers and report differences",
	Long: `compare entries between two filers and report differences, then exit.

  Useful for validating active/passive sync targets agree with the source.
  Reports MISSING (in A but not in B), ONLY_IN_B (in B but not in A; suppressed
  in active-passive mode), SIZE_MISMATCH, and ETAG_MISMATCH. Honors
  -modifiedTimeAgo to skip recently-modified files (sync-lag tolerance) and
  -isActivePassive for unidirectional comparison.

  Exits with code 0 on agreement, 2 on differences or operational errors.

`,
}

func runFilerSyncVerify(cmd *Command, args []string) bool {
	*syncVerifyOptions.aSecurity = util.ResolvePath(*syncVerifyOptions.aSecurity)
	*syncVerifyOptions.bSecurity = util.ResolvePath(*syncVerifyOptions.bSecurity)
	util.LoadSecurityConfiguration()
	grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")

	grpcDialOptionA := grpcDialOption
	grpcDialOptionB := grpcDialOption
	if *syncVerifyOptions.aSecurity != "" {
		var err error
		if grpcDialOptionA, err = security.LoadClientTLSFromFile(*syncVerifyOptions.aSecurity, "grpc.client"); err != nil {
			glog.Fatalf("load security config for filer A: %v", err)
		}
	}
	if *syncVerifyOptions.bSecurity != "" {
		var err error
		if grpcDialOptionB, err = security.LoadClientTLSFromFile(*syncVerifyOptions.bSecurity, "grpc.client"); err != nil {
			glog.Fatalf("load security config for filer B: %v", err)
		}
	}

	filerA := pb.ServerAddress(*syncVerifyOptions.filerA)
	filerB := pb.ServerAddress(*syncVerifyOptions.filerB)

	if err := runVerifySync(filerA, filerB, *syncVerifyOptions.aPath, *syncVerifyOptions.bPath,
		*syncVerifyOptions.isActivePassive, *syncVerifyOptions.modifiedTimeAgo,
		*syncVerifyOptions.jsonOutput,
		grpcDialOptionA, grpcDialOptionB); err != nil {
		glog.Errorf("verify sync: %v", err)
		os.Exit(2)
	}
	return true
}

// verifySyncConcurrency caps concurrent directory I/O across the entire
// recursive walk. A single shared semaphore is created in runVerifySync and
// passed down so the limit applies globally — a per-call semaphore would only
// cap concurrency per directory level, allowing fanout to grow as
// verifySyncConcurrency^depth on deep trees.
//
// Trade-off: higher values reduce wall time on wide trees by parallelizing
// listEntries RPCs, at the cost of more concurrent load on both filers and
// more memory from queued goroutines (each waiting goroutine ~2KB stack).
// Each compareDirectory holds a slot only for its own listing+compare phase
// and releases before recursing, so a parent never blocks waiting for
// children to acquire slots.
const verifySyncConcurrency = 5

// isTooRecent reports whether entry's mtime is past cutoff (sync-lag tolerance).
// Returns false when cutoff is zero or attributes are missing.
func isTooRecent(entry *filer_pb.Entry, cutoff time.Time) bool {
	return !cutoff.IsZero() && entry != nil && entry.Attributes != nil && entry.Attributes.Mtime > cutoff.Unix()
}

type VerifyResult struct {
	dirCount      atomic.Int64
	fileCount     atomic.Int64
	missingCount  atomic.Int64
	sizeMismatch  atomic.Int64
	etagMismatch  atomic.Int64
	onlyInB       atomic.Int64
	skippedRecent atomic.Int64

	// outputMu serializes writes to stdout. Multiple goroutines call
	// reportDiff concurrently from compareDirectory's worker pool.
	outputMu   sync.Mutex
	jsonOutput bool
}

type verifyDiffType int

const (
	diffMissing      verifyDiffType = iota // in A but not in B
	diffOnlyInB                            // in B but not in A
	diffSizeMismatch                       // size differs
	diffETagMismatch                       // etag differs
)

// diffRecord is the JSON Lines schema for a single diff entry.
type diffRecord struct {
	Type          string       `json:"type"`
	Path          string       `json:"path"`
	IsDirectory   bool         `json:"isDirectory,omitempty"`
	A             *entryRecord `json:"a,omitempty"`
	B             *entryRecord `json:"b,omitempty"`
	MtimeRelation string       `json:"mtimeRelation,omitempty"` // EQUAL | A_NEWER | B_NEWER
	MtimeDelta    string       `json:"mtimeDelta,omitempty"`    // human-readable, e.g. "5d", "12h"
	Hint          string       `json:"hint,omitempty"`          // late_updates_skip_likely | sync_lag_or_event_miss
}

type entryRecord struct {
	Size  uint64 `json:"size"`
	Mtime int64  `json:"mtime"`
	ETag  string `json:"etag,omitempty"`
}

type summaryRecord struct {
	Type          string `json:"type"`
	Directories   int64  `json:"directories"`
	Files         int64  `json:"files"`
	SkippedRecent int64  `json:"skippedRecent"`
	Missing       int64  `json:"missing"`
	SizeMismatch  int64  `json:"sizeMismatch"`
	ETagMismatch  int64  `json:"etagMismatch"`
	OnlyInB       int64  `json:"onlyInB"`
	TotalErrors   int64  `json:"totalErrors"`
}

// simpleFilerClient implements filer_pb.FilerClient for gRPC connections
type simpleFilerClient struct {
	grpcAddress    pb.ServerAddress
	grpcDialOption grpc.DialOption
}

func (c *simpleFilerClient) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
	return pb.WithGrpcClient(streamingMode, 0, func(grpcConnection *grpc.ClientConn) error {
		client := filer_pb.NewSeaweedFilerClient(grpcConnection)
		return fn(client)
	}, c.grpcAddress.ToGrpcAddress(), false, c.grpcDialOption)
}

func (c *simpleFilerClient) AdjustedUrl(location *filer_pb.Location) string {
	return location.Url
}

func (c *simpleFilerClient) GetDataCenter() string {
	return ""
}

func runVerifySync(filerA, filerB pb.ServerAddress, aPath, bPath string,
	isActivePassive bool, modifiedTimeAgo time.Duration,
	jsonOutput bool,
	grpcDialOptionA, grpcDialOptionB grpc.DialOption) error {

	clientA := &simpleFilerClient{grpcAddress: filerA, grpcDialOption: grpcDialOptionA}
	clientB := &simpleFilerClient{grpcAddress: filerB, grpcDialOption: grpcDialOptionB}

	var cutoffTime time.Time
	if modifiedTimeAgo > 0 {
		cutoffTime = time.Now().Add(-modifiedTimeAgo)
	}

	if !jsonOutput {
		if !cutoffTime.IsZero() {
			fmt.Fprintf(os.Stdout, "Verifying files modified before %v (modifiedTimeAgo=%v)\n",
				cutoffTime.Format(time.RFC3339), modifiedTimeAgo)
		}
		fmt.Fprintf(os.Stdout, "Comparing %s%s => %s%s (isActivePassive=%v)\n\n",
			filerA, aPath, filerB, bPath, isActivePassive)
	}

	result := &VerifyResult{jsonOutput: jsonOutput}
	ctx := context.Background()
	sem := make(chan struct{}, verifySyncConcurrency)

	if err := compareDirectory(ctx, clientA, clientB, aPath, bPath, isActivePassive, cutoffTime, sem, result); err != nil {
		return err
	}

	totalErrors := result.missingCount.Load() + result.sizeMismatch.Load() + result.etagMismatch.Load()
	if !isActivePassive {
		totalErrors += result.onlyInB.Load()
	}

	if jsonOutput {
		summary := summaryRecord{
			Type:          "SUMMARY",
			Directories:   result.dirCount.Load(),
			Files:         result.fileCount.Load(),
			SkippedRecent: result.skippedRecent.Load(),
			Missing:       result.missingCount.Load(),
			SizeMismatch:  result.sizeMismatch.Load(),
			ETagMismatch:  result.etagMismatch.Load(),
			OnlyInB:       result.onlyInB.Load(),
			TotalErrors:   totalErrors,
		}
		writeJSONLine(result, summary)
	} else {
		fmt.Fprintf(os.Stdout, "\nSummary:\n")
		fmt.Fprintf(os.Stdout, " Directories compared: %d\n", result.dirCount.Load())
		fmt.Fprintf(os.Stdout, " Files verified: %d\n", result.fileCount.Load())
		if result.skippedRecent.Load() > 0 {
			fmt.Fprintf(os.Stdout, " Skipped (too recent): %d\n", result.skippedRecent.Load())
		}
		fmt.Fprintf(os.Stdout, " Missing in B: %d\n", result.missingCount.Load())
		fmt.Fprintf(os.Stdout, " Size mismatch: %d\n", result.sizeMismatch.Load())
		fmt.Fprintf(os.Stdout, " ETag mismatch: %d\n", result.etagMismatch.Load())
		if !isActivePassive {
			fmt.Fprintf(os.Stdout, " Only in B: %d\n", result.onlyInB.Load())
		}
		fmt.Fprintf(os.Stdout, " Total errors: %d\n", totalErrors)
	}

	if totalErrors > 0 {
		return fmt.Errorf("found %d differences", totalErrors)
	}
	return nil
}

// entryStream is a sorted, streaming view of a single directory's entries.
// A background goroutine pages through the directory via ReadDirAllEntries
// and forwards each entry to a buffered channel; the caller consumes entries
// one at a time through peek/advance. Memory usage is O(channel buffer) —
// independent of directory size — rather than O(total entries).
type entryStream struct {
	ch   <-chan *filer_pb.Entry
	head *filer_pb.Entry
	done bool
	err  error // written before ch is closed; safe to read once done==true
}

// newEntryStream starts the background goroutine. It exits when listing
// completes, an error occurs, or ctx is cancelled; the channel is always
// closed before exit so consumers do not block indefinitely.
func newEntryStream(ctx context.Context, client filer_pb.FilerClient, dir string) *entryStream {
	ch := make(chan *filer_pb.Entry, 64)
	s := &entryStream{ch: ch}
	go func() {
		defer close(ch)
		s.err = filer_pb.ReadDirAllEntries(ctx, client, util.FullPath(dir), "",
			func(entry *filer_pb.Entry, isLast bool) error {
				select {
				case ch <- entry:
					return nil
				case <-ctx.Done():
					return ctx.Err()
				}
			})
	}()
	return s
}

// peek returns the next entry without consuming it, or nil at end-of-stream.
func (s *entryStream) peek() *filer_pb.Entry {
	if s.done {
		return nil
	}
	if s.head == nil {
		e, ok := <-s.ch
		if !ok {
			s.done = true
			return nil
		}
		s.head = e
	}
	return s.head
}

// advance consumes and returns the next entry.
func (s *entryStream) advance() *filer_pb.Entry {
	e := s.peek()
	s.head = nil
	return e
}

func compareDirectory(ctx context.Context,
	clientA, clientB filer_pb.FilerClient,
	dirA, dirB string,
	isActivePassive bool,
	cutoffTime time.Time,
	sem chan struct{},
	result *VerifyResult) error {

	// Hold a slot only for this directory's I/O phase (listings + merge).
	// Released before recursing so parents never block waiting for children
	// to acquire slots — see verifySyncConcurrency for the rationale.
	sem <- struct{}{}
	released := false
	releaseSlot := func() {
		if !released {
			released = true
			<-sem
		}
	}
	defer releaseSlot()

	result.dirCount.Add(1)

	// A child context ensures that stream goroutines are cancelled and their
	// channels are closed if compareDirectory returns early (e.g. on error).
	mergeCtx, cancelMerge := context.WithCancel(ctx)
	defer cancelMerge()

	streamA := newEntryStream(mergeCtx, clientA, dirA)
	streamB := newEntryStream(mergeCtx, clientB, dirB)

	// collect subdirectories for recursive comparison
	type dirPair struct{ a, b string }
	var subDirs []dirPair

	for streamA.peek() != nil || streamB.peek() != nil {
		eA := streamA.peek()
		eB := streamB.peek()

		switch {
		case eA != nil && (eB == nil || eA.Name < eB.Name):
			// entry only in A
			entryA := streamA.advance()
			if entryA.IsDirectory {
				// Always recurse for missing-in-B directories: a recent
				// child write can bump the parent's mtime even though
				// older missing files exist underneath. The cutoff is
				// applied per-file inside countMissingRecursive.
				reportDiff(diffMissing, dirA, entryA, nil, result)
				countMissingRecursive(ctx, clientA, path.Join(dirA, entryA.Name), cutoffTime, result)
			} else if isTooRecent(entryA, cutoffTime) {
				result.skippedRecent.Add(1)
			} else {
				reportDiff(diffMissing, dirA, entryA, nil, result)
			}

		case eB != nil && (eA == nil || eB.Name < eA.Name):
			// entry only in B
			entryB := streamB.advance()
			if !isActivePassive {
				if isTooRecent(entryB, cutoffTime) {
					result.skippedRecent.Add(1)
				} else {
					reportDiff(diffOnlyInB, dirB, entryB, nil, result)
				}
			}

		default:
			// same name in both
			entryA := streamA.advance()
			entryB := streamB.advance()

			if entryA.IsDirectory && entryB.IsDirectory {
				subDirs = append(subDirs, dirPair{
					a: path.Join(dirA, entryA.Name),
					b: path.Join(dirB, entryB.Name),
				})
			} else if !entryA.IsDirectory && !entryB.IsDirectory {
				// Skip if either side was modified recently (sync-lag tolerance).
				if isTooRecent(entryA, cutoffTime) || isTooRecent(entryB, cutoffTime) {
					result.skippedRecent.Add(1)
				} else {
					compareEntries(dirA, entryA, entryB, result)
				}
			} else {
				// type mismatch: one is dir, other is file
				reportDiff(diffMissing, dirA, entryA, nil, result)
				if !isActivePassive {
					reportDiff(diffOnlyInB, dirB, entryB, nil, result)
				}
			}
		}
	}

	// Both channels are closed: close happens-before the receive of the zero
	// value, so stream.err is visible here without additional synchronisation.
	if err := streamA.err; err != nil && err != context.Canceled {
		return fmt.Errorf("list %s on filer A: %v", dirA, err)
	}
	if err := streamB.err; err != nil && err != context.Canceled {
		return fmt.Errorf("list %s on filer B: %v", dirB, err)
	}

	// Release our slot before recursing so children can acquire it. Holding
	// it across wg.Wait would deadlock once depth exceeds verifySyncConcurrency.
	releaseSlot()

	if len(subDirs) > 0 {
		// Bounded worker pool: cap goroutines per directory level instead
		// of spawning one per child. A directory with thousands of subdirs
		// would otherwise park ~2KB per waiting goroutine even though
		// only `verifySyncConcurrency` can do I/O at once.
		workers := verifySyncConcurrency
		if len(subDirs) < workers {
			workers = len(subDirs)
		}
		jobs := make(chan dirPair, len(subDirs))
		errCh := make(chan error, 1) // first error wins; others dropped
		var wg sync.WaitGroup

		for i := 0; i < workers; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				for pair := range jobs {
					if err := compareDirectory(ctx, clientA, clientB, pair.a, pair.b, isActivePassive, cutoffTime, sem, result); err != nil {
						select {
						case errCh <- err:
						default:
						}
					}
				}
			}()
		}
		for _, pair := range subDirs {
			jobs <- pair
		}
		close(jobs)
		wg.Wait()
		close(errCh)
		if err := <-errCh; err != nil {
			return err
		}
	}

	return nil
}

func compareEntries(dir string, entryA, entryB *filer_pb.Entry, result *VerifyResult) {
	result.fileCount.Add(1)

	sizeA := filer.FileSize(entryA)
	sizeB := filer.FileSize(entryB)
	if sizeA != sizeB {
		reportDiff(diffSizeMismatch, dir, entryA, entryB, result)
		return
	}

	etagA := filer.ETag(entryA)
	etagB := filer.ETag(entryB)
	if etagA != etagB {
		reportDiff(diffETagMismatch, dir, entryA, entryB, result)
		return
	}
}

// mtimeRelation classifies B.mtime vs A.mtime. Returns the relation, a
// human-readable delta string, and the absolute delta in seconds; the
// relation is empty when either entry or its attributes are missing.
func mtimeRelation(entryA, entryB *filer_pb.Entry) (relation, deltaStr string, deltaSec int64) {
	if entryA == nil || entryB == nil || entryA.Attributes == nil || entryB.Attributes == nil {
		return "", "", 0
	}
	delta := entryB.Attributes.Mtime - entryA.Attributes.Mtime
	abs := delta
	if abs < 0 {
		abs = -abs
	}
	switch {
	case delta == 0:
		return "EQUAL", "0s", 0
	case delta > 0:
		return "B_NEWER", formatSeconds(abs), abs
	default:
		return "A_NEWER", formatSeconds(abs), abs
	}
}

func formatSeconds(s int64) string {
	switch {
	case s < 60:
		return fmt.Sprintf("%ds", s)
	case s < 3600:
		return fmt.Sprintf("%dm", s/60)
	case s < 86400:
		return fmt.Sprintf("%dh", s/3600)
	default:
		return fmt.Sprintf("%dd", s/86400)
	}
}

// hintFor returns an automatic interpretation hint based on mtime relation.
// Only emitted for SIZE_MISMATCH and ETAG_MISMATCH where both entries exist.
func hintFor(relation string) string {
	switch relation {
	case "B_NEWER":
		return "late_updates_skip_likely"
	case "A_NEWER":
		return "sync_lag_or_event_miss"
	}
	return ""
}

func reportDiff(diffType verifyDiffType, dir string, entryA, entryB *filer_pb.Entry, result *VerifyResult) {
	switch diffType {
	case diffMissing:
		result.missingCount.Add(1)
	case diffOnlyInB:
		result.onlyInB.Add(1)
	case diffSizeMismatch:
		result.sizeMismatch.Add(1)
	case diffETagMismatch:
		result.etagMismatch.Add(1)
	}

	if result.jsonOutput {
		writeJSONDiff(result, diffType, dir, entryA, entryB)
	} else {
		writeTextDiff(result, diffType, dir, entryA, entryB)
	}
}

func writeTextDiff(result *VerifyResult, diffType verifyDiffType, dir string, entryA, entryB *filer_pb.Entry) {
	entryPath := path.Join(dir, entryA.Name)

	result.outputMu.Lock()
	defer result.outputMu.Unlock()

	switch diffType {
	case diffMissing:
		if entryA.IsDirectory {
			fmt.Fprintf(os.Stdout, "[MISSING] %s/ (directory)\n", entryPath)
		} else {
			fmt.Fprintf(os.Stdout, "[MISSING] %s (size=%d, etag=%s)\n",
				entryPath, filer.FileSize(entryA), filer.ETag(entryA))
		}
	case diffOnlyInB:
		fmt.Fprintf(os.Stdout, "[ONLY_IN_B] %s\n", entryPath)
	case diffSizeMismatch:
		ann := annotation(entryA, entryB)
		fmt.Fprintf(os.Stdout, "[SIZE_MISMATCH] %s (a=%d, b=%d%s)\n",
			entryPath, filer.FileSize(entryA), filer.FileSize(entryB), ann)
	case diffETagMismatch:
		ann := annotation(entryA, entryB)
		fmt.Fprintf(os.Stdout, "[ETAG_MISMATCH] %s (a=%s, b=%s%s)\n",
			entryPath, filer.ETag(entryA), filer.ETag(entryB), ann)
	}
}

// annotation builds the trailing ", mtime: ... [hint]" segment for text output.
// Returns empty string if entries are unavailable.
func annotation(entryA, entryB *filer_pb.Entry) string {
	relation, delta, _ := mtimeRelation(entryA, entryB)
	if relation == "" {
		return ""
	}
	switch relation {
	case "EQUAL":
		return ", mtime equal [chunk-level issue]"
	case "B_NEWER":
		return fmt.Sprintf(", B newer +%s [late-updates skip likely]", delta)
	case "A_NEWER":
		return fmt.Sprintf(", A newer +%s [sync lag or event miss]", delta)
	}
	return ""
}

func writeJSONDiff(result *VerifyResult, diffType verifyDiffType, dir string, entryA, entryB *filer_pb.Entry) {
	rec := diffRecord{Path: path.Join(dir, entryA.Name)}

	switch diffType {
	case diffMissing:
		rec.Type = "MISSING"
		rec.IsDirectory = entryA.IsDirectory
		rec.A = toEntryRecord(entryA)
	case diffOnlyInB:
		rec.Type = "ONLY_IN_B"
		// for diffOnlyInB the existing convention passes the entry as entryA
		rec.IsDirectory = entryA.IsDirectory
		rec.B = toEntryRecord(entryA)
	case diffSizeMismatch:
		rec.Type = "SIZE_MISMATCH"
		rec.A = toEntryRecord(entryA)
		rec.B = toEntryRecord(entryB)
		relation, delta, _ := mtimeRelation(entryA, entryB)
		rec.MtimeRelation = relation
		rec.MtimeDelta = delta
		rec.Hint = hintFor(relation)
	case diffETagMismatch:
		rec.Type = "ETAG_MISMATCH"
		rec.A = toEntryRecord(entryA)
		rec.B = toEntryRecord(entryB)
		relation, delta, _ := mtimeRelation(entryA, entryB)
		rec.MtimeRelation = relation
		rec.MtimeDelta = delta
		rec.Hint = hintFor(relation)
	}

	writeJSONLine(result, rec)
}

func toEntryRecord(entry *filer_pb.Entry) *entryRecord {
	if entry == nil {
		return nil
	}
	r := &entryRecord{
		Size: filer.FileSize(entry),
		ETag: filer.ETag(entry),
	}
	if entry.Attributes != nil {
		r.Mtime = entry.Attributes.Mtime
	}
	return r
}

// writeJSONLine emits a single JSON object followed by newline. Holds outputMu
// across marshal+write so concurrent goroutines never interleave.
func writeJSONLine(result *VerifyResult, v any) {
	data, err := json.Marshal(v)
	if err != nil {
		glog.Warningf("marshal verify record: %v", err)
		return
	}
	result.outputMu.Lock()
	defer result.outputMu.Unlock()
	os.Stdout.Write(data)
	os.Stdout.Write([]byte{'\n'})
}

func countMissingRecursive(ctx context.Context, client filer_pb.FilerClient, dir string, cutoffTime time.Time, result *VerifyResult) {
	err := filer_pb.ReadDirAllEntries(ctx, client, util.FullPath(dir), "",
		func(entry *filer_pb.Entry, isLast bool) error {
			if entry.IsDirectory {
				countMissingRecursive(ctx, client, path.Join(dir, entry.Name), cutoffTime, result)
				return nil
			}
			if !cutoffTime.IsZero() && entry.Attributes != nil && entry.Attributes.Mtime > cutoffTime.Unix() {
				result.skippedRecent.Add(1)
				return nil
			}
			reportDiff(diffMissing, dir, entry, nil, result)
			return nil
		})
	if err != nil {
		glog.Warningf("list missing directory %s: %v", dir, err)
	}
}
|