seaweedfs/weed/command/filer_sync.go
Chris Lu d605feb403 refactor(command): expand "~" in all path-style CLI flags (#9306)
* refactor(command): expand "~" in all path-style CLI flags

Many of weed's path-bearing flags (-s3.config, -s3.iam.config,
-admin.dataDir, -webdav.cacheDir, -volume.dir.idx, TLS cert/key
files, profile output paths, mount cache dirs, sftp key files, ...)
were never run through util.ResolvePath, so a value like "~/iam.json"
was used literally. Tilde only worked when the shell expanded it,
which silently fails for the common -flag=~/path form (bash leaves
the tilde literal in --opt=~/path).

- Extend util.ResolvePath to also handle "~user" / "~user/rest",
  matching shell tilde expansion. Add unit tests.
- Apply util.ResolvePath at the top of each shared start* function
  (s3, webdav, sftp) so mini/server/filer/standalone callers all
  inherit it; resolve at the few one-off use sites (mount cache
  dirs, volume idx folder, mini admin.dataDir, profile paths).
- Drop the duplicate expandHomeDir helper from admin.go in favor of
  the now-equivalent util.ResolvePath.

* fixup: handle comma-separated -dir flags for tilde expansion

`weed mini -dir`, `weed server -dir`, and `weed volume -dir` accept
comma-separated paths (`dir[,dir]...`). Calling util.ResolvePath on
the whole string mishandled multi-folder values with tilde, e.g.
"~/d1,~/d2" would resolve as if "d1,~/d2" were a single subpath.

- Add util.ResolveCommaSeparatedPaths: split on ",", run each entry
  through ResolvePath, rejoin. Short-circuits when no "~" present.
- Use it for *miniDataFolders (mini.go), *volumeDataFolders (server.go),
  and resolve each entry of v.folders in-place (volume.go) so all
  downstream consumers see resolved paths.
- Add 7-case TestResolveCommaSeparatedPaths covering empty, single,
  multiple, and mixed inputs.
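
The comma-splitting helper can be sketched as follows. The resolver is passed in as a function so that the example stays self-contained and deterministic. `fakeResolve` and the `/home/alice` directory are made up for illustration; in weed, the resolver would be `util.ResolvePath`. The first printed line reproduces the bug described above, where only the leading `~` gets expanded.

```go
package main

import (
	"fmt"
	"strings"
)

// resolveCommaSeparatedPaths follows the behaviour described for
// util.ResolveCommaSeparatedPaths: split on ",", resolve each entry, rejoin.
// The resolver is a parameter here only to keep the sketch self-contained.
func resolveCommaSeparatedPaths(s string, resolve func(string) string) string {
	if !strings.Contains(s, "~") {
		return s // fast path: nothing to expand
	}
	parts := strings.Split(s, ",")
	for i, p := range parts {
		parts[i] = resolve(p)
	}
	return strings.Join(parts, ",")
}

// fakeResolve is a hypothetical resolver with a fixed home directory.
func fakeResolve(p string) string {
	if p == "~" || strings.HasPrefix(p, "~/") {
		return "/home/alice" + p[1:]
	}
	return p
}

func main() {
	// Resolving the joined value treats "d1,~/d2" as one subpath.
	fmt.Println(fakeResolve("~/d1,~/d2"))
	// Resolving per entry expands both directories.
	fmt.Println(resolveCommaSeparatedPaths("~/d1,~/d2", fakeResolve))
}
```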

* address PR review: metaFolder + Windows backslash

- master.go: resolve *m.metaFolder at the top of runMaster so
  util.FullPath(*m.metaFolder) on the next line sees an expanded
  path. Drop the now-redundant ResolvePath in TestFolderWritable.
- server.go: same treatment for *masterOptions.metaFolder, paired
  with the existing cpu/mem profile resolves. Drop the redundant
  inner ResolvePath at TestFolderWritable.
- file_util.go: ResolvePath now accepts filepath.Separator as a
  separator after the tilde, so "~\\data" works on Windows. Other
  platforms keep current behaviour (backslash stays literal because
  it is a valid filename character in usernames and paths).
- file_util_test.go: add two cases using filepath.Separator that
  exercise the new code path on Windows and remain a no-op on Unix.
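
The Windows change only affects which characters may end the name that follows the tilde. The sketch below isolates that rule in a hypothetical `splitTilde` helper. The helper takes the platform separator as a parameter so that both platforms' behaviour can be exercised on any host. In the real code, that second separator would be `filepath.Separator`.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// splitTilde is a hypothetical helper showing only the separator rule: after
// a leading "~", '/' always ends the user name, and so does sep (callers would
// pass filepath.Separator). On Unix sep is '/', so a backslash stays part of
// the name, as the commit describes.
func splitTilde(path string, sep byte) (name, rest string, ok bool) {
	if len(path) == 0 || path[0] != '~' {
		return "", "", false
	}
	for i := 1; i < len(path); i++ {
		if path[i] == '/' || path[i] == sep {
			return path[1:i], path[i+1:], true
		}
	}
	return path[1:], "", true
}

func main() {
	fmt.Println(splitTilde(`~\data`, '\\'))               // Windows rule
	fmt.Println(splitTilde(`~\data`, '/'))                // Unix rule
	fmt.Println(splitTilde(`~/data`, filepath.Separator)) // host rule
}
```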

* address PR review: resolve "~" in remaining command path flags

Comprehensive sweep of path-bearing flags across every weed
subcommand, applying util.ResolvePath in-place at the top of each
run* function so all downstream consumers see expanded paths.

- webdav.go: resolve *wo.cacheDir at the top of startWebDav so
  mini/server/filer/standalone callers all inherit it.
- mount_std.go: cpu/mem profile paths.
- filer_sync.go: cpu/mem profile paths.
- mq_broker.go: cpu/mem profile paths.
- benchmark.go: cpuprofile output path.
- backup.go: -dir resolved once at runBackup; drop the duplicated
  inline ResolvePath in NewVolume calls.
- compact.go: -dir resolved at runCompact; drop inline ResolvePath.
- export.go: -dir and -o resolved at runExport; drop inline
  ResolvePath in LoadFromIdx and ScanVolumeFile.
- download.go: -dir resolved at runDownload; drop inline.
- update.go: -dir resolved at runUpdate so filepath.Join uses the
  expanded path; drop inline ResolvePath in TestFolderWritable.
- scaffold.go: -output expanded before filepath.Join.
- worker.go: -workingDir expanded before being passed to runtime.

* address PR review: resolve option-struct paths at run* entry points

server.go:381 propagates s3Options.config to filerOptions.s3ConfigFile
*before* startS3Server runs, which meant the filer-side code saw the
unresolved tilde-prefixed pointer. Same pattern for webdavOptions and
sftpOptions (and equivalent in mini.go / filer.go).

The fix: hoist resolution from the shared start* functions up to the
run* entry points, where every shared pointer is set up before any
propagation happens.

- s3.go, webdav.go, sftp.go: extract a resolvePaths() method on each
  Options struct that runs every path field through util.ResolvePath
  in-place. Idempotent.
- runS3, runWebDav, runSftp: call the standalone struct's resolvePaths
  before starting metrics / loading security config.
- runServer, runMini, runFiler: call resolvePaths on every embedded
  options struct, plus resolve loose flags (serverIamConfig,
  miniS3Config, miniIamConfig, miniMasterOptions.metaFolder, and
  filer's defaultLevelDbDirectory) so they're expanded before any
  pointer copy or use.
- Drop the now-redundant inline ResolvePath at filer's
  defaultLevelDbDirectory composition.
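
Trimmed-down option structs show why resolution had to move up to the run* entry points. `s3Opts`, `filerOpts`, their fields, and the fixed `/home/alice` home directory are hypothetical, and `expand` stands in for `util.ResolvePath`. Once a value has been copied or read elsewhere, resolving the original afterwards does not help that consumer, so `resolvePaths` runs first. The method is idempotent because an expanded path no longer begins with `~`.

```go
package main

import (
	"fmt"
	"strings"
)

// home is a made-up home directory so the sketch is deterministic.
const home = "/home/alice"

// expand stands in for util.ResolvePath in this sketch.
func expand(p string) string {
	if p == "~" || strings.HasPrefix(p, "~/") {
		return home + p[1:]
	}
	return p
}

// s3Opts and filerOpts are hypothetical option structs whose fields are
// pointers, like weed's flag-backed options.
type s3Opts struct {
	config    *string
	iamConfig *string
}

type filerOpts struct {
	s3ConfigFile *string
}

// resolvePaths rewrites every path field in place; nil fields are skipped.
func (o *s3Opts) resolvePaths() {
	for _, p := range []*string{o.config, o.iamConfig} {
		if p != nil {
			*p = expand(*p)
		}
	}
}

func main() {
	cfg, iam := "~/s3.json", "~/iam.json"
	s3 := s3Opts{config: &cfg, iamConfig: &iam}

	// Resolve at the entry point, before any value is propagated.
	s3.resolvePaths()
	s3.resolvePaths() // a second call changes nothing

	// Propagation now copies the already-expanded value.
	copied := *s3.config
	filer := filerOpts{s3ConfigFile: &copied}
	fmt.Println(*filer.s3ConfigFile)
}
```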

* address PR review: re-resolve mini -dir post-config, cover misc paths

- mini.go: applyConfigFileOptions can overwrite -dir with a literal
  ~/data from mini.options. Re-resolve *miniDataFolders after the
  config-file apply, alongside the other path resolves, so the mini
  filer no longer ends up with a literal ~/data/filerldb2.
- benchmark.go: resolve *b.idListFile (-list).
- filer_sync.go: resolve *syncOptions.aSecurity / .bSecurity
  (-a.security / -b.security) before LoadClientTLSFromFile.
- filer_cat.go: resolve *filerCat.output (-o) before os.OpenFile.
- admin.go: drop trailing blank line at EOF (git diff --check).

* address PR review: resolve -a.security/-b.security/-config before use

Four follow-up fixes:

- filer_sync.go: the -a.security / -b.security resolves were placed
  *after* LoadClientTLSFromFile / LoadHTTPClientFromFile were called,
  so weed filer.sync -a.security=~/a.toml still passed the literal
  tilde path. Hoist the resolves above the security-loading block so
  TLS clients see expanded paths.
- filer_sync_verify.go: same flag pair was never resolved at all in
  the verify command; resolve at the top of runFilerSyncVerify.
- filer_meta_backup.go: -config (the backup_filer.toml path) was
  passed directly to viper. Resolve at the top of runFilerMetaBackup.
- mini.go: master.dir defaulted to the entire comma-joined
  miniDataFolders. With weed mini -dir=~/d1,~/d2 (or any multi-dir
  setup), TestFolderWritable then stat'd the joined string instead
  of a single directory. Default to the first entry via StringSplit
  to mirror the disk-space calculation a few lines below, and drop
  the now-redundant ResolvePath in TestFolderWritable.
2026-05-03 21:46:21 -07:00
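
The mini.go master.dir fix amounts to taking one directory out of the comma-separated -dir value rather than using the joined string. Here is a minimal sketch with a hypothetical `firstDataFolder` helper. It skips empty entries, which is a simplifying assumption and not a claim about how `util.StringSplit` treats them.

```go
package main

import (
	"fmt"
	"strings"
)

// firstDataFolder is a hypothetical helper: it returns the first non-empty
// entry of a comma-separated directory list, or "" if there is none.
func firstDataFolder(dirs string) string {
	for _, d := range strings.Split(dirs, ",") {
		if d != "" {
			return d
		}
	}
	return ""
}

func main() {
	// A writability check would stat "/mnt/d1", not "/mnt/d1,/mnt/d2".
	fmt.Println(firstDataFolder("/mnt/d1,/mnt/d2"))
}
```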

package command

import (
	"context"
	"errors"
	"fmt"
	"os"
	"regexp"
	"strings"
	"sync/atomic"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/replication"
	"github.com/seaweedfs/seaweedfs/weed/replication/sink"
	"github.com/seaweedfs/seaweedfs/weed/replication/sink/filersink"
	"github.com/seaweedfs/seaweedfs/weed/replication/source"
	"github.com/seaweedfs/seaweedfs/weed/security"
	statsCollect "github.com/seaweedfs/seaweedfs/weed/stats"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"github.com/seaweedfs/seaweedfs/weed/util/grace"
	util_http_client "github.com/seaweedfs/seaweedfs/weed/util/http/client"
	"github.com/seaweedfs/seaweedfs/weed/util/wildcard"
	"google.golang.org/grpc"
)

type SyncOptions struct {
	isActivePassive  *bool
	filerA           *string
	filerB           *string
	aPath            *string
	aExcludePaths    *string
	bPath            *string
	bExcludePaths    *string
	aReplication     *string
	bReplication     *string
	aCollection      *string
	bCollection      *string
	aTtlSec          *int
	bTtlSec          *int
	aDiskType        *string
	bDiskType        *string
	aDebug           *bool
	bDebug           *bool
	aFromTsMs        *int64
	bFromTsMs        *int64
	aProxyByFiler    *bool
	bProxyByFiler    *bool
	metricsHttpIp    *string
	metricsHttpPort  *int
	concurrency      *int
	chunkConcurrency *int
	aDoDeleteFiles   *bool
	bDoDeleteFiles   *bool
	aSecurity        *string
	bSecurity        *string
	clientId         int32
	clientEpoch      atomic.Int32
	debug            *bool
	debugPort        *int
}

const (
	SyncKeyPrefix           = "sync."
	DefaultConcurrencyLimit = 32
)

// syncState tracks the current sync state for graceful shutdown checkpoint saving
type syncState struct {
	processor            *MetadataProcessor
	grpcDialOption       grpc.DialOption
	targetFiler          pb.ServerAddress
	sourcePath           string
	sourceFilerSignature int32
}

var (
	syncOptions    SyncOptions
	syncCpuProfile *string
	syncMemProfile *string
	// atomic pointers to current sync states for graceful shutdown
	syncStateA2B atomic.Pointer[syncState]
	syncStateB2A atomic.Pointer[syncState]
)

func init() {
	cmdFilerSynchronize.Run = runFilerSynchronize // break init cycle
	syncOptions.isActivePassive = cmdFilerSynchronize.Flag.Bool("isActivePassive", false, "one directional follow from A to B if true")
	syncOptions.filerA = cmdFilerSynchronize.Flag.String("a", "", "filer A in one SeaweedFS cluster")
	syncOptions.filerB = cmdFilerSynchronize.Flag.String("b", "", "filer B in the other SeaweedFS cluster")
	syncOptions.aPath = cmdFilerSynchronize.Flag.String("a.path", "/", "directory to sync on filer A")
	syncOptions.aExcludePaths = cmdFilerSynchronize.Flag.String("a.excludePaths", "", "exclude directories to sync on filer A")
	syncOptions.bPath = cmdFilerSynchronize.Flag.String("b.path", "/", "directory to sync on filer B")
	syncOptions.bExcludePaths = cmdFilerSynchronize.Flag.String("b.excludePaths", "", "exclude directories to sync on filer B")
	syncOptions.aReplication = cmdFilerSynchronize.Flag.String("a.replication", "", "replication on filer A")
	syncOptions.bReplication = cmdFilerSynchronize.Flag.String("b.replication", "", "replication on filer B")
	syncOptions.aCollection = cmdFilerSynchronize.Flag.String("a.collection", "", "collection on filer A")
	syncOptions.bCollection = cmdFilerSynchronize.Flag.String("b.collection", "", "collection on filer B")
	syncOptions.aTtlSec = cmdFilerSynchronize.Flag.Int("a.ttlSec", 0, "ttl in seconds on filer A")
	syncOptions.bTtlSec = cmdFilerSynchronize.Flag.Int("b.ttlSec", 0, "ttl in seconds on filer B")
	syncOptions.aDiskType = cmdFilerSynchronize.Flag.String("a.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag on filer A")
	syncOptions.bDiskType = cmdFilerSynchronize.Flag.String("b.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag on filer B")
	syncOptions.aProxyByFiler = cmdFilerSynchronize.Flag.Bool("a.filerProxy", false, "read and write file chunks by filer A instead of volume servers")
	syncOptions.bProxyByFiler = cmdFilerSynchronize.Flag.Bool("b.filerProxy", false, "read and write file chunks by filer B instead of volume servers")
	syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files")
	syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files")
	syncOptions.aFromTsMs = cmdFilerSynchronize.Flag.Int64("a.fromTsMs", 0, "synchronization from timestamp on filer A. The unit is millisecond")
	syncOptions.bFromTsMs = cmdFilerSynchronize.Flag.Int64("b.fromTsMs", 0, "synchronization from timestamp on filer B. The unit is millisecond")
	syncOptions.concurrency = cmdFilerSynchronize.Flag.Int("concurrency", DefaultConcurrencyLimit, "The maximum number of files that will be synced concurrently.")
	syncOptions.chunkConcurrency = cmdFilerSynchronize.Flag.Int("chunkConcurrency", 32, "The maximum number of chunks that will be replicated concurrently per file.")
	syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file")
	syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file")
	syncOptions.metricsHttpIp = cmdFilerSynchronize.Flag.String("metricsIp", "", "metrics listen ip")
	syncOptions.metricsHttpPort = cmdFilerSynchronize.Flag.Int("metricsPort", 0, "metrics listen port")
	syncOptions.aDoDeleteFiles = cmdFilerSynchronize.Flag.Bool("a.doDeleteFiles", true, "delete and update files when synchronizing on filer A")
	syncOptions.bDoDeleteFiles = cmdFilerSynchronize.Flag.Bool("b.doDeleteFiles", true, "delete and update files when synchronizing on filer B")
	syncOptions.aSecurity = cmdFilerSynchronize.Flag.String("a.security", "", "security.toml file for filer A when clusters use different certificates")
	syncOptions.bSecurity = cmdFilerSynchronize.Flag.String("b.security", "", "security.toml file for filer B when clusters use different certificates")
	syncOptions.debug = cmdFilerSynchronize.Flag.Bool("debug", false, "serves runtime profiling data via pprof on the port specified by -debug.port")
	syncOptions.debugPort = cmdFilerSynchronize.Flag.Int("debug.port", 6060, "http port for debugging")
	syncOptions.clientId = util.RandomInt32()
}

var cmdFilerSynchronize = &Command{
	UsageLine: "filer.sync -a=<oneFilerHost>:<oneFilerPort> -b=<otherFilerHost>:<otherFilerPort>",
	Short:     "resumable continuous synchronization between two active-active or active-passive SeaweedFS clusters",
	Long: `resumable continuous synchronization for file changes between two active-active or active-passive filers
filer.sync listens on filer notifications. If any file is updated, it will fetch the updated content,
and write to the other destination. Different from filer.replicate:
* filer.sync only works between two filers.
* filer.sync does not need any special message queue setup.
* filer.sync supports both active-active and active-passive modes.
If restarted, the synchronization will resume from the previous checkpoints, persisted every minute.
A fresh sync will start from the earliest metadata logs.
`,
}

func runFilerSynchronize(cmd *Command, args []string) bool {
	if *syncOptions.debug {
		grace.StartDebugServer(*syncOptions.debugPort)
	}
	*syncCpuProfile = util.ResolvePath(*syncCpuProfile)
	*syncMemProfile = util.ResolvePath(*syncMemProfile)
	*syncOptions.aSecurity = util.ResolvePath(*syncOptions.aSecurity)
	*syncOptions.bSecurity = util.ResolvePath(*syncOptions.bSecurity)
	util.LoadSecurityConfiguration()
	grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
	// per-filer TLS when clusters use different certificates
	grpcDialOptionA := grpcDialOption
	grpcDialOptionB := grpcDialOption
	if *syncOptions.aSecurity != "" {
		var err error
		if grpcDialOptionA, err = security.LoadClientTLSFromFile(*syncOptions.aSecurity, "grpc.client"); err != nil {
			glog.Fatalf("load security config for filer A: %v", err)
		}
	}
	if *syncOptions.bSecurity != "" {
		var err error
		if grpcDialOptionB, err = security.LoadClientTLSFromFile(*syncOptions.bSecurity, "grpc.client"); err != nil {
			glog.Fatalf("load security config for filer B: %v", err)
		}
	}
	// per-cluster HTTPS clients for volume server connections
	var httpClientA, httpClientB *util_http_client.HTTPClient
	if *syncOptions.aSecurity != "" {
		var err error
		if httpClientA, err = security.LoadHTTPClientFromFile(*syncOptions.aSecurity); err != nil {
			glog.Fatalf("load HTTPS client config for filer A: %v", err)
		}
	}
	if *syncOptions.bSecurity != "" {
		var err error
		if httpClientB, err = security.LoadHTTPClientFromFile(*syncOptions.bSecurity); err != nil {
			glog.Fatalf("load HTTPS client config for filer B: %v", err)
		}
	}
	grace.SetupProfiling(*syncCpuProfile, *syncMemProfile)
	filerA := pb.ServerAddress(*syncOptions.filerA)
	filerB := pb.ServerAddress(*syncOptions.filerB)
	// start filer.sync metrics server
	go statsCollect.StartMetricsServer(*syncOptions.metricsHttpIp, *syncOptions.metricsHttpPort)
	// read a filer signature
	aFilerSignature, aFilerErr := replication.ReadFilerSignature(grpcDialOptionA, filerA)
	if aFilerErr != nil {
		glog.Errorf("get filer 'a' signature %d error from %s to %s: %v", aFilerSignature, *syncOptions.filerA, *syncOptions.filerB, aFilerErr)
		return true
	}
	// read b filer signature
	bFilerSignature, bFilerErr := replication.ReadFilerSignature(grpcDialOptionB, filerB)
	if bFilerErr != nil {
		glog.Errorf("get filer 'b' signature %d error from %s to %s: %v", bFilerSignature, *syncOptions.filerA, *syncOptions.filerB, bFilerErr)
		return true
	}
	// register graceful shutdown hook to save checkpoints
	grace.OnInterrupt(func() {
		saveCheckpoint := func(name string, state *syncState) {
			if state == nil || state.processor == nil {
				return
			}
			offsetTsNs := state.processor.processedTsWatermark.Load()
			if offsetTsNs == 0 {
				return
			}
			if err := setOffset(state.grpcDialOption, state.targetFiler, getSignaturePrefixByPath(state.sourcePath), state.sourceFilerSignature, offsetTsNs); err != nil {
				glog.Errorf("failed to save checkpoint for %s on shutdown: %v", name, err)
			} else {
				glog.V(0).Infof("saved checkpoint for %s on shutdown: %v", name, time.Unix(0, offsetTsNs))
			}
		}
		saveCheckpoint("A->B", syncStateA2B.Load())
		saveCheckpoint("B->A", syncStateB2A.Load())
	})
	go func() {
		// a->b
		// set synchronization start timestamp to offset
		initOffsetError := initOffsetFromTsMs(grpcDialOptionB, filerB, aFilerSignature, *syncOptions.aFromTsMs, getSignaturePrefixByPath(*syncOptions.aPath))
		if initOffsetError != nil {
			glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.aFromTsMs, *syncOptions.filerA, *syncOptions.filerB, initOffsetError)
			os.Exit(2)
		}
		for {
			syncOptions.clientEpoch.Add(1)
			err := doSubscribeFilerMetaChanges(
				syncOptions.clientId,
				syncOptions.clientEpoch.Load(),
				grpcDialOptionA,
				filerA,
				*syncOptions.aPath,
				util.StringSplit(*syncOptions.aExcludePaths, ","),
				*syncOptions.aProxyByFiler,
				grpcDialOptionB,
				filerB,
				*syncOptions.bPath,
				*syncOptions.bReplication,
				*syncOptions.bCollection,
				*syncOptions.bTtlSec,
				*syncOptions.bProxyByFiler,
				*syncOptions.bDiskType,
				*syncOptions.bDebug,
				*syncOptions.concurrency,
				*syncOptions.chunkConcurrency,
				*syncOptions.bDoDeleteFiles,
				aFilerSignature,
				bFilerSignature,
				&syncStateA2B,
				httpClientA,
				httpClientB)
			if err != nil {
				glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err)
				time.Sleep(1747 * time.Millisecond)
			}
		}
	}()
	if !*syncOptions.isActivePassive {
		// b->a
		// set synchronization start timestamp to offset
		initOffsetError := initOffsetFromTsMs(grpcDialOptionA, filerA, bFilerSignature, *syncOptions.bFromTsMs, getSignaturePrefixByPath(*syncOptions.bPath))
		if initOffsetError != nil {
			glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.bFromTsMs, *syncOptions.filerB, *syncOptions.filerA, initOffsetError)
			os.Exit(2)
		}
		go func() {
			for {
				syncOptions.clientEpoch.Add(1)
				err := doSubscribeFilerMetaChanges(
					syncOptions.clientId,
					syncOptions.clientEpoch.Load(),
					grpcDialOptionB,
					filerB,
					*syncOptions.bPath,
					util.StringSplit(*syncOptions.bExcludePaths, ","),
					*syncOptions.bProxyByFiler,
					grpcDialOptionA,
					filerA,
					*syncOptions.aPath,
					*syncOptions.aReplication,
					*syncOptions.aCollection,
					*syncOptions.aTtlSec,
					*syncOptions.aProxyByFiler,
					*syncOptions.aDiskType,
					*syncOptions.aDebug,
					*syncOptions.concurrency,
					*syncOptions.chunkConcurrency,
					*syncOptions.aDoDeleteFiles,
					bFilerSignature,
					aFilerSignature,
					&syncStateB2A,
					httpClientB,
					httpClientA)
				if err != nil {
					glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err)
					time.Sleep(2147 * time.Millisecond)
				}
			}
		}()
	}
	select {}
}

// initOffsetFromTsMs stores fromTsMs, converted to nanoseconds, as the sync
// offset on targetFiler. A non-positive fromTsMs leaves the offset untouched.
func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAddress, sourceFilerSignature int32, fromTsMs int64, signaturePrefix string) error {
	if fromTsMs <= 0 {
		return nil
	}
	// convert to nanosecond
	fromTsNs := fromTsMs * 1000_000
	// On failure the error is returned; callers treat it as fatal and exit.
	setOffsetErr := setOffset(grpcDialOption, targetFiler, signaturePrefix, sourceFilerSignature, fromTsNs)
	if setOffsetErr != nil {
		return setOffsetErr
	}
	glog.Infof("setOffset from timestamp ms success! start offset: %d from %s to %s", fromTsNs, *syncOptions.filerA, *syncOptions.filerB)
	return nil
}

func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, sourceGrpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceExcludePaths []string, sourceReadChunkFromFiler bool, targetGrpcDialOption grpc.DialOption, targetFiler pb.ServerAddress, targetPath string,
	replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, concurrency int, chunkConcurrency int, doDeleteFiles bool, sourceFilerSignature int32, targetFilerSignature int32, statePtr *atomic.Pointer[syncState],
	sourceHttpClient *util_http_client.HTTPClient, sinkHttpClient *util_http_client.HTTPClient) error {
	// With no stored offset (first run), getOffset returns 0 and the sync starts
	// from the earliest metadata logs; otherwise it resumes from the stored offset.
	sourceFilerOffsetTsNs, err := getOffset(targetGrpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature)
	if err != nil {
		return err
	}
	glog.V(0).Infof("start sync %s(%d) => %s(%d) from %v(%d)", sourceFiler, sourceFilerSignature, targetFiler, targetFilerSignature, time.Unix(0, sourceFilerOffsetTsNs), sourceFilerOffsetTsNs)
	// create filer source and sink
	filerSource := &source.FilerSource{}
	filerSource.DoInitialize(sourceFiler.ToHttpAddress(), sourceFiler.ToGrpcAddress(), sourcePath, sourceReadChunkFromFiler)
	filerSource.SetGrpcDialOption(sourceGrpcDialOption)
	if sourceHttpClient != nil {
		filerSource.SetHttpClient(sourceHttpClient)
	}
	filerSink := &filersink.FilerSink{}
	filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, targetGrpcDialOption, sinkWriteChunkByFiler)
	filerSink.SetChunkConcurrency(chunkConcurrency)
	if sinkHttpClient != nil {
		filerSink.SetUploader(operation.NewUploaderWithHttpClient(sinkHttpClient))
	}
	filerSink.SetSourceFiler(filerSource)
	persistEventFn := genProcessFunction(sourcePath, targetPath, sourceExcludePaths, nil, nil, nil, filerSink, doDeleteFiles, debug)
	processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
		message := resp.EventNotification
		for _, sig := range message.Signatures {
			if sig == targetFilerSignature && targetFilerSignature != 0 {
				fmt.Printf("%s skipping %s change to %v\n", targetFiler, sourceFiler, message)
				return nil
			}
		}
		return persistEventFn(resp)
	}
	if concurrency < 0 || concurrency > 1024 {
		glog.Warningf("invalid concurrency value, using default: %d", DefaultConcurrencyLimit)
		concurrency = DefaultConcurrencyLimit
	}
	processor := NewMetadataProcessor(processEventFn, concurrency, sourceFilerOffsetTsNs)
	// update sync state for graceful shutdown checkpoint saving
	if statePtr != nil {
		statePtr.Store(&syncState{
			processor:            processor,
			grpcDialOption:       targetGrpcDialOption,
			targetFiler:          targetFiler,
			sourcePath:           sourcePath,
			sourceFilerSignature: sourceFilerSignature,
		})
	}
	var lastLogTsNs = time.Now().UnixNano()
	var lastProgressedTsNs int64
	var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler))
	processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error {
		processor.AddSyncJob(resp)
		return nil
	}, 3*time.Second, func(counter int64, lastTsNs int64) error {
		offsetTsNs := processor.processedTsWatermark.Load()
		if offsetTsNs == 0 {
			return nil
		}
		// use processor.processedTsWatermark instead of the lastTsNs from the most recent job
		now := time.Now().UnixNano()
		glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, offsetTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9))
		lastLogTsNs = now
		if offsetTsNs == lastProgressedTsNs {
			for _, t := range filerSink.ActiveTransfers() {
				if t.LastErr != "" {
					glog.V(0).Infof("  %s %s: %d bytes received, %s, last error: %s",
						t.ChunkFileId, t.Path, t.BytesReceived, t.Status, t.LastErr)
				} else {
					glog.V(0).Infof("  %s %s: %d bytes received, %s",
						t.ChunkFileId, t.Path, t.BytesReceived, t.Status)
				}
			}
		}
		lastProgressedTsNs = offsetTsNs
		// export the sync offset as a metric
		statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(offsetTsNs))
		return setOffset(targetGrpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, offsetTsNs)
	})
	prefix := sourcePath
	if !strings.HasSuffix(prefix, "/") {
		prefix = prefix + "/"
	}
	metadataFollowOption := &pb.MetadataFollowOption{
		ClientName:             clientName,
		ClientId:               clientId,
		ClientEpoch:            clientEpoch,
		SelfSignature:          targetFilerSignature,
		PathPrefix:             prefix,
		AdditionalPathPrefixes: nil,
		DirectoriesToWatch:     nil,
		StartTsNs:              sourceFilerOffsetTsNs,
		StopTsNs:               0,
		EventErrorType:         pb.RetryForeverOnError,
	}
	return pb.FollowMetadata(sourceFiler, sourceGrpcDialOption, metadataFollowOption, processEventFnWithOffset)
}

// getSignaturePrefixByPath returns the KV key prefix under which the sync
// offset is stored, so that syncs of different source paths keep separate offsets.
func getSignaturePrefixByPath(path string) string {
	// "/" keeps the historical prefix for backward compatibility
	if path == "/" {
		return SyncKeyPrefix
	} else {
		return SyncKeyPrefix + path
	}
}

func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) {
	readErr = pb.WithFilerClient(false, signature, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
		syncKey := []byte(signaturePrefix + "____")
		util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
		resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: syncKey})
		if err != nil {
			return err
		}
		if len(resp.Error) != 0 {
			return errors.New(resp.Error)
		}
		if len(resp.Value) < 8 {
			return nil
		}
		lastOffsetTsNs = int64(util.BytesToUint64(resp.Value))
		return nil
	})
	return
}

func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32, offsetTsNs int64) error {
	return pb.WithFilerClient(false, signature, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
		syncKey := []byte(signaturePrefix + "____")
		util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
		valueBuf := make([]byte, 8)
		util.Uint64toBytes(valueBuf, uint64(offsetTsNs))
		resp, err := client.KvPut(context.Background(), &filer_pb.KvPutRequest{
			Key:   syncKey,
			Value: valueBuf,
		})
		if err != nil {
			return err
		}
		if len(resp.Error) != 0 {
			return errors.New(resp.Error)
		}
		return nil
	})
}

func genProcessFunction(sourcePath string, targetPath string, excludePaths []string, reExcludeFileName *regexp.Regexp, excludeFileNames []*wildcard.WildcardMatcher, excludePathPatterns []*wildcard.WildcardMatcher, dataSink sink.ReplicationSink, doDeleteFiles bool, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error {
	// process function
	processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
		message := resp.EventNotification
		// Derive the target (new-side) directory once. MetadataEventTargetDirectory
		// returns NewParentPath when set, falling back to resp.Directory for
		// delete events or legacy events with an empty NewParentPath.
		targetDir := filer_pb.MetadataEventTargetDirectory(resp)
		var sourceOldKey, sourceNewKey util.FullPath
		if message.OldEntry != nil {
			sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name)
		}
		if message.NewEntry != nil {
			sourceNewKey = util.FullPath(targetDir).Child(message.NewEntry.Name)
		}
		if debug {
			glog.V(0).Infof("received %v", resp)
		}
		if isMultipartUploadDir(resp.Directory + "/") {
			return nil
		}
		// For rename events the key/directory is the old (source) path.
		// Check both old and new directories so cross-boundary renames
		// are not silently dropped. The downstream old/new key handling
		// (lines below) already converts these to create or delete.
		oldDirExcluded := matchesExcludePath(resp.Directory, excludePaths)
		newDirExcluded := matchesExcludePath(targetDir, excludePaths)
		oldDirInScope := util.IsEqualOrUnder(resp.Directory, sourcePath) && !oldDirExcluded
		newDirInScope := message.NewEntry != nil &&
			util.IsEqualOrUnder(targetDir, sourcePath) &&
			!newDirExcluded
		if !oldDirInScope && !newDirInScope {
			return nil
		}
		// Compute per-side exclusion so that rename events crossing an
		// exclude boundary are handled as delete + create rather than
		// being entirely skipped.
		oldExcluded := oldDirExcluded || isEntryExcluded(resp.Directory, message.OldEntry, reExcludeFileName, excludeFileNames, excludePathPatterns)
		newExcluded := newDirExcluded || isEntryExcluded(targetDir, message.NewEntry, reExcludeFileName, excludeFileNames, excludePathPatterns)
		if oldExcluded && newExcluded {
			return nil
		}
		if oldExcluded {
			// Old side is excluded — treat as pure create of new entry.
			message.OldEntry = nil
		}
		if newExcluded {
			// New side is excluded — treat as pure delete of old entry.
			message.NewEntry = nil
			sourceNewKey = ""
		}
		if dataSink.IsIncremental() {
			doDeleteFiles = false
		}
		// handle deletions
		if filer_pb.IsDelete(resp) {
			if !doDeleteFiles {
				return nil
			}
			if !util.IsEqualOrUnder(string(sourceOldKey), sourcePath) {
				return nil
			}
			key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
			return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
		}
		// handle new entries
		if filer_pb.IsCreate(resp) {
			if !util.IsEqualOrUnder(string(sourceNewKey), sourcePath) {
				return nil
			}
			key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
			if err := dataSink.CreateEntry(key, message.NewEntry, message.Signatures); err != nil {
				return fmt.Errorf("create entry1 : %w", err)
			} else {
				return nil
			}
		}
		// this is something special?
		if filer_pb.IsEmpty(resp) {
			return nil
		}
		// handle updates
		if util.IsEqualOrUnder(string(sourceOldKey), sourcePath) {
			// old key is in the watched directory
			if util.IsEqualOrUnder(string(sourceNewKey), sourcePath) {
				// new key is also in the watched directory
				if doDeleteFiles {
					oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
					var sinkNewParentPath string
					if strings.HasSuffix(sourcePath, "/") {
						sinkNewParentPath = util.Join(targetPath, targetDir[len(sourcePath)-1:])
					} else {
						sinkNewParentPath = util.Join(targetPath, targetDir[len(sourcePath):])
					}
					foundExisting, err := dataSink.UpdateEntry(string(oldKey), message.OldEntry, sinkNewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
					if foundExisting {
						return err
					}
					// not able to find old entry
					if err = dataSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil {
						return fmt.Errorf("delete old entry %v: %w", oldKey, err)
					}
				}
				// create the new entry
				newKey := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
				if err := dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures); err != nil {
					return fmt.Errorf("create entry2 : %w", err)
				} else {
					return nil
				}
			} else {
				// new key is outside the watched directory
				if doDeleteFiles {
					key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
					return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
				}
			}
		} else {
			// old key is outside the watched directory
			if util.IsEqualOrUnder(string(sourceNewKey), sourcePath) {
				// new key is in the watched directory
				key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
				if err := dataSink.CreateEntry(key, message.NewEntry, message.Signatures); err != nil {
					return fmt.Errorf("create entry3 : %w", err)
				} else {
					return nil
				}
			} else {
				// new key is also outside the watched directory
				// skip
			}
		}
		return nil
	}
	return processEventFn
}

func buildKey(dataSink sink.ReplicationSink, message *filer_pb.EventNotification, targetPath string, sourceKey util.FullPath, sourcePath string) string {
	var mTime int64
	if message.NewEntry != nil && message.NewEntry.Attributes != nil {
		mTime = message.NewEntry.Attributes.Mtime
	} else if message.OldEntry != nil && message.OldEntry.Attributes != nil {
		mTime = message.OldEntry.Attributes.Mtime
	}
	return destKey(dataSink, targetPath, sourcePath, sourceKey, mTime)
}

// destKey derives the sink-side key for a source entry. Shared between the
// event-log path (buildKey) and the initialSnapshot walk (both paths need the
// same target layout so a walk-seeded file and an event-replayed file resolve
// to the same destination key). Normalizing to a trailing-slash base avoids
// indexing past the end of sourceKey when callers differ on trailing-slash
// conventions or when sourceKey equals sourcePath exactly.
func destKey(dataSink sink.ReplicationSink, targetPath, sourcePath string, sourceKey util.FullPath, mTime int64) string {
	base := strings.TrimSuffix(sourcePath, "/") + "/"
	sk := string(sourceKey)
	var relative string
	switch {
	case strings.HasPrefix(sk, base):
		relative = sk[len(base):]
	case sk == strings.TrimSuffix(sourcePath, "/"):
		relative = ""
	default:
		relative = strings.TrimPrefix(sk, "/")
	}
	if !dataSink.IsIncremental() {
		return escapeKey(util.Join(targetPath, relative))
	}
	dateKey := time.Unix(mTime, 0).Format("2006-01-02")
	return escapeKey(util.Join(targetPath, dateKey, relative))
}

// isEntryExcluded checks whether a single side (old or new) of an event is excluded
// by the deprecated filename regexp, the wildcard file-name matchers, or the
// wildcard path-pattern matchers.
func isEntryExcluded(dir string, entry *filer_pb.Entry, reExcludeFileName *regexp.Regexp, excludeFileNames []*wildcard.WildcardMatcher, excludePathPatterns []*wildcard.WildcardMatcher) bool {
	if entry == nil {
		return false
	}
	// deprecated regexp-based filename exclusion
	if reExcludeFileName != nil && reExcludeFileName.MatchString(entry.Name) {
		return true
	}
	// wildcard-based filename exclusion
	if len(excludeFileNames) > 0 && matchesAnyWildcard(excludeFileNames, entry.Name) {
		return true
	}
	// wildcard-based path-pattern exclusion: match against each directory
	// component and the entry name itself
	if len(excludePathPatterns) > 0 {
		if pathContainsWildcardMatch(dir, excludePathPatterns) {
			return true
		}
		if matchesAnyWildcard(excludePathPatterns, entry.Name) {
			return true
		}
	}
	return false
}

func matchesExcludePath(dir string, excludePaths []string) bool {
	for _, excludePath := range excludePaths {
		if util.IsEqualOrUnder(dir, excludePath) {
			return true
		}
	}
	return false
}

// compileExcludePattern compiles a regexp pattern string, returning nil if empty.
func compileExcludePattern(pattern string, label string) (*regexp.Regexp, error) {
	if pattern == "" {
		return nil, nil
	}
	re, err := regexp.Compile(pattern)
	if err != nil {
		return nil, fmt.Errorf("error compile regexp %v for %s: %+v", pattern, label, err)
	}
	return re, nil
}

// matchesAnyWildcard returns true if any matcher matches the value.
// Returns false when matchers is empty (unlike wildcard.MatchesAnyWildcard
// which returns true for empty matchers).
func matchesAnyWildcard(matchers []*wildcard.WildcardMatcher, value string) bool {
	for _, m := range matchers {
		if m != nil && m.Match(value) {
			return true
		}
	}
	return false
}

// pathContainsWildcardMatch checks if any component of the given path matches
// any of the wildcard matchers, without allocating a slice.
func pathContainsWildcardMatch(path string, matchers []*wildcard.WildcardMatcher) bool {
	for path != "" {
		i := strings.IndexByte(path, '/')
		var component string
		if i < 0 {
			component = path
			path = ""
		} else {
			component = path[:i]
			path = path[i+1:]
		}
		if component != "" && matchesAnyWildcard(matchers, component) {
			return true
		}
	}
	return false
}