* refactor(command): expand "~" in all path-style CLI flags

  Many of weed's path-bearing flags (-s3.config, -s3.iam.config, -admin.dataDir, -webdav.cacheDir, -volume.dir.idx, TLS cert/key files, profile output paths, mount cache dirs, sftp key files, ...) were never run through util.ResolvePath, so a value like "~/iam.json" was used literally. Tilde only worked when the shell expanded it, which silently fails for the common -flag=~/path form (bash leaves the tilde literal in --opt=~/path).

  - Extend util.ResolvePath to also handle "~user" / "~user/rest", matching shell tilde expansion. Add unit tests.
  - Apply util.ResolvePath at the top of each shared start* function (s3, webdav, sftp) so mini/server/filer/standalone callers all inherit it; resolve at the few one-off use sites (mount cache dirs, volume idx folder, mini admin.dataDir, profile paths).
  - Drop the duplicate expandHomeDir helper from admin.go in favor of the now-equivalent util.ResolvePath.

* fixup: handle comma-separated -dir flags for tilde expansion

  `weed mini -dir`, `weed server -dir`, and `weed volume -dir` accept comma-separated paths (`dir[,dir]...`). Calling util.ResolvePath on the whole string mishandled multi-folder values with tilde, e.g. "~/d1,~/d2" would resolve as if "d1,~/d2" were a single subpath.

  - Add util.ResolveCommaSeparatedPaths: split on ",", run each entry through ResolvePath, rejoin. Short-circuits when no "~" present.
  - Use it for *miniDataFolders (mini.go), *volumeDataFolders (server.go), and resolve each entry of v.folders in-place (volume.go) so all downstream consumers see resolved paths.
  - Add 7-case TestResolveCommaSeparatedPaths covering empty, single, multiple, and mixed inputs.

* address PR review: metaFolder + Windows backslash

  - master.go: resolve *m.metaFolder at the top of runMaster so util.FullPath(*m.metaFolder) on the next line sees an expanded path. Drop the now-redundant ResolvePath in TestFolderWritable.
  - server.go: same treatment for *masterOptions.metaFolder, paired with the existing cpu/mem profile resolves. Drop the redundant inner ResolvePath at TestFolderWritable.
  - file_util.go: ResolvePath now accepts filepath.Separator as a separator after the tilde, so "~\\data" works on Windows. Other platforms keep current behaviour (backslash stays literal because it is a valid filename character in usernames and paths).
  - file_util_test.go: add two cases using filepath.Separator that exercise the new code path on Windows and remain a no-op on Unix.

* address PR review: resolve "~" in remaining command path flags

  Comprehensive sweep of path-bearing flags across every weed subcommand, applying util.ResolvePath in-place at the top of each run* function so all downstream consumers see expanded paths.

  - webdav.go: resolve *wo.cacheDir at the top of startWebDav so mini/server/filer/standalone callers all inherit it.
  - mount_std.go: cpu/mem profile paths.
  - filer_sync.go: cpu/mem profile paths.
  - mq_broker.go: cpu/mem profile paths.
  - benchmark.go: cpuprofile output path.
  - backup.go: -dir resolved once at runBackup; drop the duplicated inline ResolvePath in NewVolume calls.
  - compact.go: -dir resolved at runCompact; drop inline ResolvePath.
  - export.go: -dir and -o resolved at runExport; drop inline ResolvePath in LoadFromIdx and ScanVolumeFile.
  - download.go: -dir resolved at runDownload; drop inline.
  - update.go: -dir resolved at runUpdate so filepath.Join uses the expanded path; drop inline ResolvePath in TestFolderWritable.
  - scaffold.go: -output expanded before filepath.Join.
  - worker.go: -workingDir expanded before being passed to runtime.

* address PR review: resolve option-struct paths at run* entry points

  server.go:381 propagates s3Options.config to filerOptions.s3ConfigFile *before* startS3Server runs, which meant the filer-side code saw the unresolved tilde-prefixed pointer. Same pattern for webdavOptions and sftpOptions (and equivalent in mini.go / filer.go). The fix: hoist resolution from the shared start* functions up to the run* entry points, where every shared pointer is set up before any propagation happens.

  - s3.go, webdav.go, sftp.go: extract a resolvePaths() method on each Options struct that runs every path field through util.ResolvePath in-place. Idempotent.
  - runS3, runWebDav, runSftp: call the standalone struct's resolvePaths before starting metrics / loading security config.
  - runServer, runMini, runFiler: call resolvePaths on every embedded options struct, plus resolve loose flags (serverIamConfig, miniS3Config, miniIamConfig, miniMasterOptions.metaFolder, and filer's defaultLevelDbDirectory) so they're expanded before any pointer copy or use.
  - Drop the now-redundant inline ResolvePath at filer's defaultLevelDbDirectory composition.

* address PR review: re-resolve mini -dir post-config, cover misc paths

  - mini.go: applyConfigFileOptions can overwrite -dir with a literal ~/data from mini.options. Re-resolve *miniDataFolders after the config-file apply, alongside the other path resolves, so the mini filer no longer ends up with a literal ~/data/filerldb2.
  - benchmark.go: resolve *b.idListFile (-list).
  - filer_sync.go: resolve *syncOptions.aSecurity / .bSecurity (-a.security / -b.security) before LoadClientTLSFromFile.
  - filer_cat.go: resolve *filerCat.output (-o) before os.OpenFile.
  - admin.go: drop trailing blank line at EOF (git diff --check).

* address PR review: resolve -a.security/-b.security/-config before use

  Four follow-up fixes:

  - filer_sync.go: the -a.security / -b.security resolves were placed *after* LoadClientTLSFromFile / LoadHTTPClientFromFile were called, so weed filer.sync -a.security=~/a.toml still passed the literal tilde path. Hoist the resolves above the security-loading block so TLS clients see expanded paths.
  - filer_sync_verify.go: same flag pair was never resolved at all in the verify command; resolve at the top of runFilerSyncVerify.
  - filer_meta_backup.go: -config (the backup_filer.toml path) was passed directly to viper. Resolve at the top of runFilerMetaBackup.
  - mini.go: master.dir defaulted to the entire comma-joined miniDataFolders. With weed mini -dir=~/d1,~/d2 (or any multi-dir setup), TestFolderWritable then stat'd the joined string instead of a single directory. Default to the first entry via StringSplit to mirror the disk-space calculation a few lines below, and drop the now-redundant ResolvePath in TestFolderWritable.
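
For reference, a minimal sketch of the tilde expansion these commits describe, assuming os/user for the "~user" lookup; the real util.ResolvePath and util.ResolveCommaSeparatedPaths may differ in details (for example the Windows filepath.Separator handling mentioned above):

package util_sketch // hypothetical package, for illustration only

import (
	"os"
	"os/user"
	"path/filepath"
	"strings"
)

// expandTilde resolves "~", "~/rest", "~user", and "~user/rest";
// anything else is returned unchanged.
func expandTilde(path string) string {
	if !strings.HasPrefix(path, "~") {
		return path
	}
	rest := path[1:]
	if rest == "" || rest[0] == '/' {
		// "~" or "~/rest": current user's home directory
		if home, err := os.UserHomeDir(); err == nil {
			return filepath.Join(home, rest)
		}
		return path
	}
	// "~user" or "~user/rest": look up that user's home directory
	name, tail := rest, ""
	if i := strings.IndexByte(rest, '/'); i >= 0 {
		name, tail = rest[:i], rest[i:]
	}
	if u, err := user.Lookup(name); err == nil {
		return filepath.Join(u.HomeDir, tail)
	}
	return path
}

// expandCommaSeparated applies expandTilde to each entry of a
// comma-separated list, as ResolveCommaSeparatedPaths does for -dir,
// short-circuiting when no "~" is present.
func expandCommaSeparated(paths string) string {
	if !strings.Contains(paths, "~") {
		return paths
	}
	parts := strings.Split(paths, ",")
	for i, p := range parts {
		parts[i] = expandTilde(p)
	}
	return strings.Join(parts, ",")
}

With this shape, expandCommaSeparated("~/d1,~/d2") resolves each entry independently instead of treating "d1,~/d2" as a single subpath, which is the bug the fixup commit describes.
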
package command

import (
	"context"
	"errors"
	"fmt"
	"os"
	"regexp"
	"strings"
	"sync/atomic"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/replication"
	"github.com/seaweedfs/seaweedfs/weed/replication/sink"
	"github.com/seaweedfs/seaweedfs/weed/replication/sink/filersink"
	"github.com/seaweedfs/seaweedfs/weed/replication/source"
	"github.com/seaweedfs/seaweedfs/weed/security"
	statsCollect "github.com/seaweedfs/seaweedfs/weed/stats"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"github.com/seaweedfs/seaweedfs/weed/util/grace"
	util_http_client "github.com/seaweedfs/seaweedfs/weed/util/http/client"
	"github.com/seaweedfs/seaweedfs/weed/util/wildcard"
	"google.golang.org/grpc"
)

type SyncOptions struct {
	isActivePassive  *bool
	filerA           *string
	filerB           *string
	aPath            *string
	aExcludePaths    *string
	bPath            *string
	bExcludePaths    *string
	aReplication     *string
	bReplication     *string
	aCollection      *string
	bCollection      *string
	aTtlSec          *int
	bTtlSec          *int
	aDiskType        *string
	bDiskType        *string
	aDebug           *bool
	bDebug           *bool
	aFromTsMs        *int64
	bFromTsMs        *int64
	aProxyByFiler    *bool
	bProxyByFiler    *bool
	metricsHttpIp    *string
	metricsHttpPort  *int
	concurrency      *int
	chunkConcurrency *int
	aDoDeleteFiles   *bool
	bDoDeleteFiles   *bool
	aSecurity        *string
	bSecurity        *string
	clientId         int32
	clientEpoch      atomic.Int32
	debug            *bool
	debugPort        *int
}

const (
	SyncKeyPrefix           = "sync."
	DefaultConcurrencyLimit = 32
)

// syncState tracks the current sync state for graceful shutdown checkpoint saving
type syncState struct {
	processor            *MetadataProcessor
	grpcDialOption       grpc.DialOption
	targetFiler          pb.ServerAddress
	sourcePath           string
	sourceFilerSignature int32
}

var (
	syncOptions    SyncOptions
	syncCpuProfile *string
	syncMemProfile *string
	// atomic pointers to current sync states for graceful shutdown
	syncStateA2B atomic.Pointer[syncState]
	syncStateB2A atomic.Pointer[syncState]
)

func init() {
	cmdFilerSynchronize.Run = runFilerSynchronize // break init cycle
	syncOptions.isActivePassive = cmdFilerSynchronize.Flag.Bool("isActivePassive", false, "one directional follow from A to B if true")
	syncOptions.filerA = cmdFilerSynchronize.Flag.String("a", "", "filer A in one SeaweedFS cluster")
	syncOptions.filerB = cmdFilerSynchronize.Flag.String("b", "", "filer B in the other SeaweedFS cluster")
	syncOptions.aPath = cmdFilerSynchronize.Flag.String("a.path", "/", "directory to sync on filer A")
	syncOptions.aExcludePaths = cmdFilerSynchronize.Flag.String("a.excludePaths", "", "exclude directories to sync on filer A")
	syncOptions.bPath = cmdFilerSynchronize.Flag.String("b.path", "/", "directory to sync on filer B")
	syncOptions.bExcludePaths = cmdFilerSynchronize.Flag.String("b.excludePaths", "", "exclude directories to sync on filer B")
	syncOptions.aReplication = cmdFilerSynchronize.Flag.String("a.replication", "", "replication on filer A")
	syncOptions.bReplication = cmdFilerSynchronize.Flag.String("b.replication", "", "replication on filer B")
	syncOptions.aCollection = cmdFilerSynchronize.Flag.String("a.collection", "", "collection on filer A")
	syncOptions.bCollection = cmdFilerSynchronize.Flag.String("b.collection", "", "collection on filer B")
	syncOptions.aTtlSec = cmdFilerSynchronize.Flag.Int("a.ttlSec", 0, "ttl in seconds on filer A")
	syncOptions.bTtlSec = cmdFilerSynchronize.Flag.Int("b.ttlSec", 0, "ttl in seconds on filer B")
	syncOptions.aDiskType = cmdFilerSynchronize.Flag.String("a.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag on filer A")
	syncOptions.bDiskType = cmdFilerSynchronize.Flag.String("b.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag on filer B")
	syncOptions.aProxyByFiler = cmdFilerSynchronize.Flag.Bool("a.filerProxy", false, "read and write file chunks by filer A instead of volume servers")
	syncOptions.bProxyByFiler = cmdFilerSynchronize.Flag.Bool("b.filerProxy", false, "read and write file chunks by filer B instead of volume servers")
	syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files")
	syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files")
	syncOptions.aFromTsMs = cmdFilerSynchronize.Flag.Int64("a.fromTsMs", 0, "synchronization from timestamp on filer A. The unit is millisecond")
	syncOptions.bFromTsMs = cmdFilerSynchronize.Flag.Int64("b.fromTsMs", 0, "synchronization from timestamp on filer B. The unit is millisecond")
	syncOptions.concurrency = cmdFilerSynchronize.Flag.Int("concurrency", DefaultConcurrencyLimit, "The maximum number of files that will be synced concurrently.")
	syncOptions.chunkConcurrency = cmdFilerSynchronize.Flag.Int("chunkConcurrency", 32, "The maximum number of chunks that will be replicated concurrently per file.")
	syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file")
	syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file")
	syncOptions.metricsHttpIp = cmdFilerSynchronize.Flag.String("metricsIp", "", "metrics listen ip")
	syncOptions.metricsHttpPort = cmdFilerSynchronize.Flag.Int("metricsPort", 0, "metrics listen port")
	syncOptions.aDoDeleteFiles = cmdFilerSynchronize.Flag.Bool("a.doDeleteFiles", true, "delete and update files when synchronizing on filer A")
	syncOptions.bDoDeleteFiles = cmdFilerSynchronize.Flag.Bool("b.doDeleteFiles", true, "delete and update files when synchronizing on filer B")
	syncOptions.aSecurity = cmdFilerSynchronize.Flag.String("a.security", "", "security.toml file for filer A when clusters use different certificates")
	syncOptions.bSecurity = cmdFilerSynchronize.Flag.String("b.security", "", "security.toml file for filer B when clusters use different certificates")
	syncOptions.debug = cmdFilerSynchronize.Flag.Bool("debug", false, "serves runtime profiling data via pprof on the port specified by -debug.port")
	syncOptions.debugPort = cmdFilerSynchronize.Flag.Int("debug.port", 6060, "http port for debugging")
	syncOptions.clientId = util.RandomInt32()
}

var cmdFilerSynchronize = &Command{
	UsageLine: "filer.sync -a=<oneFilerHost>:<oneFilerPort> -b=<otherFilerHost>:<otherFilerPort>",
	Short:     "resumable continuous synchronization between two active-active or active-passive SeaweedFS clusters",
	Long: `resumable continuous synchronization for file changes between two active-active or active-passive filers

	filer.sync listens on filer notifications. If any file is updated, it will fetch the updated content,
	and write to the other destination. Different from filer.replicate:

	* filer.sync only works between two filers.
	* filer.sync does not need any special message queue setup.
	* filer.sync supports both active-active and active-passive modes.

	If restarted, the synchronization will resume from the previous checkpoints, persisted every minute.
	A fresh sync will start from the earliest metadata logs.

`,
}
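
// Example invocation (hypothetical hosts), one-way sync of /buckets from A to B:
//
//	weed filer.sync -a=filer-east:8888 -b=filer-west:8888 -a.path=/buckets -isActivePassive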

func runFilerSynchronize(cmd *Command, args []string) bool {
	if *syncOptions.debug {
		grace.StartDebugServer(*syncOptions.debugPort)
	}

	*syncCpuProfile = util.ResolvePath(*syncCpuProfile)
	*syncMemProfile = util.ResolvePath(*syncMemProfile)
	*syncOptions.aSecurity = util.ResolvePath(*syncOptions.aSecurity)
	*syncOptions.bSecurity = util.ResolvePath(*syncOptions.bSecurity)

	util.LoadSecurityConfiguration()
	grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")

	// per-filer TLS when clusters use different certificates
	grpcDialOptionA := grpcDialOption
	grpcDialOptionB := grpcDialOption
	if *syncOptions.aSecurity != "" {
		var err error
		if grpcDialOptionA, err = security.LoadClientTLSFromFile(*syncOptions.aSecurity, "grpc.client"); err != nil {
			glog.Fatalf("load security config for filer A: %v", err)
		}
	}
	if *syncOptions.bSecurity != "" {
		var err error
		if grpcDialOptionB, err = security.LoadClientTLSFromFile(*syncOptions.bSecurity, "grpc.client"); err != nil {
			glog.Fatalf("load security config for filer B: %v", err)
		}
	}

	// per-cluster HTTPS clients for volume server connections
	var httpClientA, httpClientB *util_http_client.HTTPClient
	if *syncOptions.aSecurity != "" {
		var err error
		if httpClientA, err = security.LoadHTTPClientFromFile(*syncOptions.aSecurity); err != nil {
			glog.Fatalf("load HTTPS client config for filer A: %v", err)
		}
	}
	if *syncOptions.bSecurity != "" {
		var err error
		if httpClientB, err = security.LoadHTTPClientFromFile(*syncOptions.bSecurity); err != nil {
			glog.Fatalf("load HTTPS client config for filer B: %v", err)
		}
	}

	grace.SetupProfiling(*syncCpuProfile, *syncMemProfile)

	filerA := pb.ServerAddress(*syncOptions.filerA)
	filerB := pb.ServerAddress(*syncOptions.filerB)

	// start filer.sync metrics server
	go statsCollect.StartMetricsServer(*syncOptions.metricsHttpIp, *syncOptions.metricsHttpPort)

	// read filer A's signature
	aFilerSignature, aFilerErr := replication.ReadFilerSignature(grpcDialOptionA, filerA)
	if aFilerErr != nil {
		glog.Errorf("get filer 'a' signature %d error from %s to %s: %v", aFilerSignature, *syncOptions.filerA, *syncOptions.filerB, aFilerErr)
		return true
	}
	// read filer B's signature
	bFilerSignature, bFilerErr := replication.ReadFilerSignature(grpcDialOptionB, filerB)
	if bFilerErr != nil {
		glog.Errorf("get filer 'b' signature %d error from %s to %s: %v", bFilerSignature, *syncOptions.filerA, *syncOptions.filerB, bFilerErr)
		return true
	}

	// register graceful shutdown hook to save checkpoints
	grace.OnInterrupt(func() {
		saveCheckpoint := func(name string, state *syncState) {
			if state == nil || state.processor == nil {
				return
			}
			offsetTsNs := state.processor.processedTsWatermark.Load()
			if offsetTsNs == 0 {
				return
			}
			if err := setOffset(state.grpcDialOption, state.targetFiler, getSignaturePrefixByPath(state.sourcePath), state.sourceFilerSignature, offsetTsNs); err != nil {
				glog.Errorf("failed to save checkpoint for %s on shutdown: %v", name, err)
			} else {
				glog.V(0).Infof("saved checkpoint for %s on shutdown: %v", name, time.Unix(0, offsetTsNs))
			}
		}

		saveCheckpoint("A->B", syncStateA2B.Load())
		saveCheckpoint("B->A", syncStateB2A.Load())
	})

	go func() {
		// a->b
		// set synchronization start timestamp to offset
		initOffsetError := initOffsetFromTsMs(grpcDialOptionB, filerB, aFilerSignature, *syncOptions.aFromTsMs, getSignaturePrefixByPath(*syncOptions.aPath))
		if initOffsetError != nil {
			glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.aFromTsMs, *syncOptions.filerA, *syncOptions.filerB, initOffsetError)
			os.Exit(2)
		}
		for {
			syncOptions.clientEpoch.Add(1)
			err := doSubscribeFilerMetaChanges(
				syncOptions.clientId,
				syncOptions.clientEpoch.Load(),
				grpcDialOptionA,
				filerA,
				*syncOptions.aPath,
				util.StringSplit(*syncOptions.aExcludePaths, ","),
				*syncOptions.aProxyByFiler,
				grpcDialOptionB,
				filerB,
				*syncOptions.bPath,
				*syncOptions.bReplication,
				*syncOptions.bCollection,
				*syncOptions.bTtlSec,
				*syncOptions.bProxyByFiler,
				*syncOptions.bDiskType,
				*syncOptions.bDebug,
				*syncOptions.concurrency,
				*syncOptions.chunkConcurrency,
				*syncOptions.bDoDeleteFiles,
				aFilerSignature,
				bFilerSignature,
				&syncStateA2B,
				httpClientA,
				httpClientB)
			if err != nil {
				glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err)
				time.Sleep(1747 * time.Millisecond)
			}
		}
	}()

	if !*syncOptions.isActivePassive {
		// b->a
		// set synchronization start timestamp to offset
		initOffsetError := initOffsetFromTsMs(grpcDialOptionA, filerA, bFilerSignature, *syncOptions.bFromTsMs, getSignaturePrefixByPath(*syncOptions.bPath))
		if initOffsetError != nil {
			glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.bFromTsMs, *syncOptions.filerB, *syncOptions.filerA, initOffsetError)
			os.Exit(2)
		}
		go func() {
			for {
				syncOptions.clientEpoch.Add(1)
				err := doSubscribeFilerMetaChanges(
					syncOptions.clientId,
					syncOptions.clientEpoch.Load(),
					grpcDialOptionB,
					filerB,
					*syncOptions.bPath,
					util.StringSplit(*syncOptions.bExcludePaths, ","),
					*syncOptions.bProxyByFiler,
					grpcDialOptionA,
					filerA,
					*syncOptions.aPath,
					*syncOptions.aReplication,
					*syncOptions.aCollection,
					*syncOptions.aTtlSec,
					*syncOptions.aProxyByFiler,
					*syncOptions.aDiskType,
					*syncOptions.aDebug,
					*syncOptions.concurrency,
					*syncOptions.chunkConcurrency,
					*syncOptions.aDoDeleteFiles,
					bFilerSignature,
					aFilerSignature,
					&syncStateB2A,
					httpClientB,
					httpClientA)
				if err != nil {
					glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err)
					time.Sleep(2147 * time.Millisecond)
				}
			}
		}()
	}

	select {}
}

// initOffsetFromTsMs initializes the sync offset from a millisecond timestamp.
func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAddress, sourceFilerSignature int32, fromTsMs int64, signaturePrefix string) error {
	if fromTsMs <= 0 {
		return nil
	}
	// convert milliseconds to nanoseconds
	fromTsNs := fromTsMs * 1000_000
	// on failure, the caller logs the error and exits the program
	setOffsetErr := setOffset(grpcDialOption, targetFiler, signaturePrefix, sourceFilerSignature, fromTsNs)
	if setOffsetErr != nil {
		return setOffsetErr
	}
	glog.Infof("setOffset from timestamp ms success! start offset: %d from %s to %s", fromTsNs, *syncOptions.filerA, *syncOptions.filerB)
	return nil
}

func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, sourceGrpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceExcludePaths []string, sourceReadChunkFromFiler bool, targetGrpcDialOption grpc.DialOption, targetFiler pb.ServerAddress, targetPath string,
	replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, concurrency int, chunkConcurrency int, doDeleteFiles bool, sourceFilerSignature int32, targetFilerSignature int32, statePtr *atomic.Pointer[syncState],
	sourceHttpClient *util_http_client.HTTPClient, sinkHttpClient *util_http_client.HTTPClient) error {

	// if this is the first time, start from now;
	// if it has previously synced, resume from that point in time
	sourceFilerOffsetTsNs, err := getOffset(targetGrpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature)
	if err != nil {
		return err
	}

	glog.V(0).Infof("start sync %s(%d) => %s(%d) from %v(%d)", sourceFiler, sourceFilerSignature, targetFiler, targetFilerSignature, time.Unix(0, sourceFilerOffsetTsNs), sourceFilerOffsetTsNs)

	// create filer source and sink
	filerSource := &source.FilerSource{}
	filerSource.DoInitialize(sourceFiler.ToHttpAddress(), sourceFiler.ToGrpcAddress(), sourcePath, sourceReadChunkFromFiler)
	filerSource.SetGrpcDialOption(sourceGrpcDialOption)
	if sourceHttpClient != nil {
		filerSource.SetHttpClient(sourceHttpClient)
	}
	filerSink := &filersink.FilerSink{}
	filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, targetGrpcDialOption, sinkWriteChunkByFiler)
	filerSink.SetChunkConcurrency(chunkConcurrency)
	if sinkHttpClient != nil {
		filerSink.SetUploader(operation.NewUploaderWithHttpClient(sinkHttpClient))
	}
	filerSink.SetSourceFiler(filerSource)

	persistEventFn := genProcessFunction(sourcePath, targetPath, sourceExcludePaths, nil, nil, nil, filerSink, doDeleteFiles, debug)

	processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
		message := resp.EventNotification
		for _, sig := range message.Signatures {
			if sig == targetFilerSignature && targetFilerSignature != 0 {
				fmt.Printf("%s skipping %s change to %v\n", targetFiler, sourceFiler, message)
				return nil
			}
		}
		return persistEventFn(resp)
	}

	if concurrency < 0 || concurrency > 1024 {
		glog.Warningf("invalid concurrency value, using default: %d", DefaultConcurrencyLimit)
		concurrency = DefaultConcurrencyLimit
	}
	processor := NewMetadataProcessor(processEventFn, concurrency, sourceFilerOffsetTsNs)

	// update sync state for graceful shutdown checkpoint saving
	if statePtr != nil {
		statePtr.Store(&syncState{
			processor:            processor,
			grpcDialOption:       targetGrpcDialOption,
			targetFiler:          targetFiler,
			sourcePath:           sourcePath,
			sourceFilerSignature: sourceFilerSignature,
		})
	}

	var lastLogTsNs = time.Now().UnixNano()
	var lastProgressedTsNs int64
	var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler))
	processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error {
		processor.AddSyncJob(resp)
		return nil
	}, 3*time.Second, func(counter int64, lastTsNs int64) error {
		offsetTsNs := processor.processedTsWatermark.Load()
		if offsetTsNs == 0 {
			return nil
		}
		// use processor.processedTsWatermark instead of the lastTsNs from the most recent job
		now := time.Now().UnixNano()
		glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, offsetTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9))
		lastLogTsNs = now
		if offsetTsNs == lastProgressedTsNs {
			for _, t := range filerSink.ActiveTransfers() {
				if t.LastErr != "" {
					glog.V(0).Infof(" %s %s: %d bytes received, %s, last error: %s",
						t.ChunkFileId, t.Path, t.BytesReceived, t.Status, t.LastErr)
				} else {
					glog.V(0).Infof(" %s %s: %d bytes received, %s",
						t.ChunkFileId, t.Path, t.BytesReceived, t.Status)
				}
			}
		}
		lastProgressedTsNs = offsetTsNs
		// record the sync offset metric
		statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(offsetTsNs))
		return setOffset(targetGrpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, offsetTsNs)
	})

	prefix := sourcePath
	if !strings.HasSuffix(prefix, "/") {
		prefix = prefix + "/"
	}

	metadataFollowOption := &pb.MetadataFollowOption{
		ClientName:             clientName,
		ClientId:               clientId,
		ClientEpoch:            clientEpoch,
		SelfSignature:          targetFilerSignature,
		PathPrefix:             prefix,
		AdditionalPathPrefixes: nil,
		DirectoriesToWatch:     nil,
		StartTsNs:              sourceFilerOffsetTsNs,
		StopTsNs:               0,
		EventErrorType:         pb.RetryForeverOnError,
	}

	return pb.FollowMetadata(sourceFiler, sourceGrpcDialOption, metadataFollowOption, processEventFnWithOffset)
}

// getSignaturePrefixByPath returns the sync offset key prefix for a path, so
// that offsets are maintained separately when different sync jobs are
// distinguished by path.
func getSignaturePrefixByPath(path string) string {
	// keep the historical key format for the root path
	if path == "/" {
		return SyncKeyPrefix
	}
	return SyncKeyPrefix + path
}
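
// Offset bookkeeping: each (source path, source filer signature) pair gets its
// own key in the target filer's KV store. getOffset and setOffset below build
// the key as the signature prefix plus a 4-byte "____" placeholder, then
// overwrite the placeholder in place with util.Uint32toBytes. As an
// illustration (hypothetical values), sourcePath "/buckets" and signature 7
// produce the key "sync./buckets" followed by the 4-byte encoding of 7.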

func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) {
	readErr = pb.WithFilerClient(false, signature, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
		syncKey := []byte(signaturePrefix + "____")
		util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))

		resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: syncKey})
		if err != nil {
			return err
		}

		if len(resp.Error) != 0 {
			return errors.New(resp.Error)
		}
		if len(resp.Value) < 8 {
			return nil
		}

		lastOffsetTsNs = int64(util.BytesToUint64(resp.Value))

		return nil
	})

	return
}

func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32, offsetTsNs int64) error {
	return pb.WithFilerClient(false, signature, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
		syncKey := []byte(signaturePrefix + "____")
		util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))

		valueBuf := make([]byte, 8)
		util.Uint64toBytes(valueBuf, uint64(offsetTsNs))

		resp, err := client.KvPut(context.Background(), &filer_pb.KvPutRequest{
			Key:   syncKey,
			Value: valueBuf,
		})
		if err != nil {
			return err
		}

		if len(resp.Error) != 0 {
			return errors.New(resp.Error)
		}

		return nil
	})
}

func genProcessFunction(sourcePath string, targetPath string, excludePaths []string, reExcludeFileName *regexp.Regexp, excludeFileNames []*wildcard.WildcardMatcher, excludePathPatterns []*wildcard.WildcardMatcher, dataSink sink.ReplicationSink, doDeleteFiles bool, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error {
	// process function
	processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
		message := resp.EventNotification

		// Derive the target (new-side) directory once. MetadataEventTargetDirectory
		// returns NewParentPath when set, falling back to resp.Directory for
		// delete events or legacy events with an empty NewParentPath.
		targetDir := filer_pb.MetadataEventTargetDirectory(resp)

		var sourceOldKey, sourceNewKey util.FullPath
		if message.OldEntry != nil {
			sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name)
		}
		if message.NewEntry != nil {
			sourceNewKey = util.FullPath(targetDir).Child(message.NewEntry.Name)
		}

		if debug {
			glog.V(0).Infof("received %v", resp)
		}

		if isMultipartUploadDir(resp.Directory + "/") {
			return nil
		}

		// For rename events the key/directory is the old (source) path.
		// Check both old and new directories so cross-boundary renames
		// are not silently dropped. The downstream old/new key handling
		// below already converts these to create or delete.
		oldDirExcluded := matchesExcludePath(resp.Directory, excludePaths)
		newDirExcluded := matchesExcludePath(targetDir, excludePaths)
		oldDirInScope := util.IsEqualOrUnder(resp.Directory, sourcePath) && !oldDirExcluded
		newDirInScope := message.NewEntry != nil &&
			util.IsEqualOrUnder(targetDir, sourcePath) &&
			!newDirExcluded
		if !oldDirInScope && !newDirInScope {
			return nil
		}
		// Compute per-side exclusion so that rename events crossing an
		// exclude boundary are handled as delete + create rather than
		// being entirely skipped.
		oldExcluded := oldDirExcluded || isEntryExcluded(resp.Directory, message.OldEntry, reExcludeFileName, excludeFileNames, excludePathPatterns)
		newExcluded := newDirExcluded || isEntryExcluded(targetDir, message.NewEntry, reExcludeFileName, excludeFileNames, excludePathPatterns)

		if oldExcluded && newExcluded {
			return nil
		}
		if oldExcluded {
			// The old side is excluded: treat as a pure create of the new entry.
			message.OldEntry = nil
		}
		if newExcluded {
			// The new side is excluded: treat as a pure delete of the old entry.
			message.NewEntry = nil
			sourceNewKey = ""
		}
		if dataSink.IsIncremental() {
			doDeleteFiles = false
		}
		// handle deletions
		if filer_pb.IsDelete(resp) {
			if !doDeleteFiles {
				return nil
			}
			if !util.IsEqualOrUnder(string(sourceOldKey), sourcePath) {
				return nil
			}
			key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
			return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
		}

		// handle new entries
		if filer_pb.IsCreate(resp) {
			if !util.IsEqualOrUnder(string(sourceNewKey), sourcePath) {
				return nil
			}
			key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
			if err := dataSink.CreateEntry(key, message.NewEntry, message.Signatures); err != nil {
				return fmt.Errorf("create entry1 : %w", err)
			}
			return nil
		}

		// an empty event carries neither an old nor a new entry; nothing to do
		if filer_pb.IsEmpty(resp) {
			return nil
		}

		// handle updates
		if util.IsEqualOrUnder(string(sourceOldKey), sourcePath) {
			// old key is in the watched directory
			if util.IsEqualOrUnder(string(sourceNewKey), sourcePath) {
				// new key is also in the watched directory
				if doDeleteFiles {
					oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
					var sinkNewParentPath string
					if strings.HasSuffix(sourcePath, "/") {
						sinkNewParentPath = util.Join(targetPath, targetDir[len(sourcePath)-1:])
					} else {
						sinkNewParentPath = util.Join(targetPath, targetDir[len(sourcePath):])
					}
					foundExisting, err := dataSink.UpdateEntry(string(oldKey), message.OldEntry, sinkNewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
					if foundExisting {
						return err
					}

					// not able to find the old entry: delete it, then recreate below
					if err = dataSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil {
						return fmt.Errorf("delete old entry %v: %w", oldKey, err)
					}
				}
				// create the new entry
				newKey := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
				if err := dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures); err != nil {
					return fmt.Errorf("create entry2 : %w", err)
				}
				return nil

			} else {
				// new key is outside the watched directory
				if doDeleteFiles {
					key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
					return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
				}
			}
		} else {
			// old key is outside the watched directory
			if util.IsEqualOrUnder(string(sourceNewKey), sourcePath) {
				// new key is in the watched directory
				key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
				if err := dataSink.CreateEntry(key, message.NewEntry, message.Signatures); err != nil {
					return fmt.Errorf("create entry3 : %w", err)
				}
				return nil
			}
			// new key is also outside the watched directory: skip
		}

		return nil
	}
	return processEventFn
}
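
// Worked example of the per-side exclusion handling above (hypothetical
// paths): with -a.excludePaths=/data/tmp, a rename of /data/tmp/f to
// /data/keep/f has oldExcluded=true and newExcluded=false, so OldEntry is
// cleared and the event replays as a pure create of /data/keep/f; the
// reverse rename clears NewEntry and replays as a pure delete.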

func buildKey(dataSink sink.ReplicationSink, message *filer_pb.EventNotification, targetPath string, sourceKey util.FullPath, sourcePath string) string {
	var mTime int64
	if message.NewEntry != nil && message.NewEntry.Attributes != nil {
		mTime = message.NewEntry.Attributes.Mtime
	} else if message.OldEntry != nil && message.OldEntry.Attributes != nil {
		mTime = message.OldEntry.Attributes.Mtime
	}
	return destKey(dataSink, targetPath, sourcePath, sourceKey, mTime)
}

// destKey derives the sink-side key for a source entry. Shared between the
// event-log path (buildKey) and the initialSnapshot walk (both paths need the
// same target layout so a walk-seeded file and an event-replayed file resolve
// to the same destination key). Normalizing to a trailing-slash base avoids
// indexing past the end of sourceKey when callers differ on trailing-slash
// conventions or when sourceKey equals sourcePath exactly.
func destKey(dataSink sink.ReplicationSink, targetPath, sourcePath string, sourceKey util.FullPath, mTime int64) string {
	base := strings.TrimSuffix(sourcePath, "/") + "/"
	sk := string(sourceKey)
	var relative string
	switch {
	case strings.HasPrefix(sk, base):
		relative = sk[len(base):]
	case sk == strings.TrimSuffix(sourcePath, "/"):
		relative = ""
	default:
		relative = strings.TrimPrefix(sk, "/")
	}
	if !dataSink.IsIncremental() {
		return escapeKey(util.Join(targetPath, relative))
	}
	dateKey := time.Unix(mTime, 0).Format("2006-01-02")
	return escapeKey(util.Join(targetPath, dateKey, relative))
}
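
// Illustrative mapping (hypothetical paths): with sourcePath "/a" and
// targetPath "/b", sourceKey "/a/sub/f.txt" maps to "/b/sub/f.txt" for a
// regular sink; an incremental sink with an mTime on 2024-01-02 maps it to
// "/b/2024-01-02/sub/f.txt" instead.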

// isEntryExcluded checks whether a single side (old or new) of an event is excluded
// by the deprecated filename regexp, the wildcard file-name matchers, or the
// wildcard path-pattern matchers.
func isEntryExcluded(dir string, entry *filer_pb.Entry, reExcludeFileName *regexp.Regexp, excludeFileNames []*wildcard.WildcardMatcher, excludePathPatterns []*wildcard.WildcardMatcher) bool {
	if entry == nil {
		return false
	}
	// deprecated regexp-based filename exclusion
	if reExcludeFileName != nil && reExcludeFileName.MatchString(entry.Name) {
		return true
	}
	// wildcard-based filename exclusion
	if len(excludeFileNames) > 0 && matchesAnyWildcard(excludeFileNames, entry.Name) {
		return true
	}
	// wildcard-based path-pattern exclusion: match against each directory
	// component and the entry name itself
	if len(excludePathPatterns) > 0 {
		if pathContainsWildcardMatch(dir, excludePathPatterns) {
			return true
		}
		if matchesAnyWildcard(excludePathPatterns, entry.Name) {
			return true
		}
	}
	return false
}

func matchesExcludePath(dir string, excludePaths []string) bool {
	for _, excludePath := range excludePaths {
		if util.IsEqualOrUnder(dir, excludePath) {
			return true
		}
	}
	return false
}

// compileExcludePattern compiles a regexp pattern string, returning nil if empty.
func compileExcludePattern(pattern string, label string) (*regexp.Regexp, error) {
	if pattern == "" {
		return nil, nil
	}
	re, err := regexp.Compile(pattern)
	if err != nil {
		return nil, fmt.Errorf("error compile regexp %v for %s: %+v", pattern, label, err)
	}
	return re, nil
}

// matchesAnyWildcard returns true if any matcher matches the value.
// Returns false when matchers is empty (unlike wildcard.MatchesAnyWildcard
// which returns true for empty matchers).
func matchesAnyWildcard(matchers []*wildcard.WildcardMatcher, value string) bool {
	for _, m := range matchers {
		if m != nil && m.Match(value) {
			return true
		}
	}
	return false
}

// pathContainsWildcardMatch checks if any component of the given path matches
// any of the wildcard matchers, without allocating a slice.
func pathContainsWildcardMatch(path string, matchers []*wildcard.WildcardMatcher) bool {
	for path != "" {
		i := strings.IndexByte(path, '/')
		var component string
		if i < 0 {
			component = path
			path = ""
		} else {
			component = path[:i]
			path = path[i+1:]
		}
		if component != "" && matchesAnyWildcard(matchers, component) {
			return true
		}
	}
	return false
}
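
// For example (hypothetical input), checking "/backup/tmp/file" against a
// matcher for "tmp" walks the components "backup", "tmp", "file" in turn and
// returns true on the second, without building a slice of components first.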