From 8502bf37dc002d24deddd754b243c3f94a3c16cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B4=94=E7=AB=9E=E5=AE=81?= Date: Sun, 7 May 2023 16:19:11 +0800 Subject: [PATCH] feat: add wildcard job --- acp.go | 15 +++++-- cache.go | 15 +++++++ cleanup.go | 2 +- cmd/acp/main.go | 3 +- copy.go | 38 ++++++++---------- fs.go | 56 +++++++++++++------------- fs_test.go | 12 ++++++ go.mod | 5 ++- index.go | 92 ++++++++++++++++++++++++++++++++++--------- job.go | 32 ++++----------- opt.go | 72 ++++++---------------------------- opt_wildcard.go | 102 ++++++++++++++++++++++++++++++++++++++++++++++++ prepare.go | 4 +- 13 files changed, 287 insertions(+), 161 deletions(-) create mode 100644 cache.go create mode 100644 fs_test.go create mode 100644 opt_wildcard.go diff --git a/acp.go b/acp.go index 63811c1..321e070 100644 --- a/acp.go +++ b/acp.go @@ -9,8 +9,9 @@ import ( type Copyer struct { *option - running sync.WaitGroup - eventCh chan Event + running sync.WaitGroup + eventCh chan Event + getDevice func(in string) string } func New(ctx context.Context, opts ...Option) (*Copyer, error) { @@ -25,9 +26,15 @@ func New(ctx context.Context, opts ...Option) (*Copyer, error) { return nil, err } + getDevice, err := getMountpointCache() + if err != nil { + return nil, err + } + c := &Copyer{ - option: opt, - eventCh: make(chan Event, 128), + option: opt, + eventCh: make(chan Event, 128), + getDevice: getDevice, } c.running.Add(1) diff --git a/cache.go b/cache.go new file mode 100644 index 0000000..7da638a --- /dev/null +++ b/cache.go @@ -0,0 +1,15 @@ +package acp + +func Cache[i comparable, o any](f func(in i) o) func(in i) o { + cache := make(map[i]o, 0) + return func(in i) o { + cached, has := cache[in] + if has { + return cached + } + + out := f(in) + cache[in] = out + return out + } +} diff --git a/cleanup.go b/cleanup.go index 26fa873..401ef59 100644 --- a/cleanup.go +++ b/cleanup.go @@ -15,7 +15,7 @@ func (c *Copyer) cleanupJob(ctx context.Context, copyed <-chan *baseJob) { } for _, name := range job.successTargets { if err := os.Chtimes(name, job.modTime, job.modTime); err != nil { - c.reportError(job.source.src(), name, fmt.Errorf("change info, chtimes fail, %w", err)) + c.reportError(job.path, name, fmt.Errorf("change info, chtimes fail, %w", err)) } } diff --git a/cmd/acp/main.go b/cmd/acp/main.go index d41430a..01c90c3 100644 --- a/cmd/acp/main.go +++ b/cmd/acp/main.go @@ -63,7 +63,7 @@ func main() { }() opts := make([]acp.Option, 0, 8) - opts = append(opts, acp.Source(sources...)) + opts = append(opts, acp.WildcardJob(acp.Source(sources...), acp.Target(targetPaths...))) // if *continueReport != "" { // f, err := os.Open(*continueReport) // if err != nil { @@ -81,7 +81,6 @@ func main() { // } // } - opts = append(opts, acp.Target(targetPaths...)) opts = append(opts, acp.WithHash(*reportPath != "")) opts = append(opts, acp.Overwrite(!*notOverwrite)) diff --git a/copy.go b/copy.go index 594a108..3cf7b75 100644 --- a/copy.go +++ b/copy.go @@ -68,9 +68,6 @@ func (c *Copyer) copy(ctx context.Context, prepared <-chan *writeJob) <-chan *ba if !ok { return } - if badDsts.Cardinality() >= len(c.dst) { - return - } wrap(ctx, func() { c.write(ctx, job, ch, cntr, badDsts) }) } @@ -93,7 +90,7 @@ func (c *Copyer) write(ctx context.Context, job *writeJob, ch chan<- *baseJob, c }() atomic.AddInt64(&cntr.files, 1) - chans := make([]chan []byte, 0, len(c.dst)+1) + chans := make([]chan []byte, 0, len(job.targets)+1) defer func() { for _, ch := range chans { close(ch) @@ -121,32 +118,31 @@ func (c *Copyer) write(ctx context.Context, job *writeJob, ch chan<- *baseJob, c } var readErr error - for _, d := range c.dst { - dst := d - name := job.source.dst(dst) - - if badDsts.Contains(dst) { - job.fail(name, fmt.Errorf("bad target path")) + for _, target := range job.targets { + dev := c.getDevice(target) + if badDsts.Contains(dev) { + job.fail(target, fmt.Errorf("bad target path")) continue } - if err := os.MkdirAll(path.Dir(name), os.ModePerm); err != nil { + + if err := os.MkdirAll(path.Dir(target), os.ModePerm); err != nil { // if no space if errors.Is(err, syscall.ENOSPC) || errors.Is(err, syscall.EROFS) { - badDsts.Add(dst) + badDsts.Add(dev) } - job.fail(name, fmt.Errorf("mkdir dst dir fail, %w", err)) + job.fail(target, fmt.Errorf("mkdir dst dir fail, %w", err)) continue } - file, err := os.OpenFile(name, c.createFlag, job.mode) + file, err := os.OpenFile(target, c.createFlag, job.mode) if err != nil { // if no space if errors.Is(err, syscall.ENOSPC) || errors.Is(err, syscall.EROFS) { - badDsts.Add(dst) + badDsts.Add(dev) } - job.fail(name, fmt.Errorf("open dst file fail, %w", err)) + job.fail(target, fmt.Errorf("open dst file fail, %w", err)) continue } @@ -160,7 +156,7 @@ func (c *Copyer) write(ctx context.Context, job *writeJob, ch chan<- *baseJob, c var rerr error defer func() { if rerr == nil { - job.succes(name) + job.succes(target) return } @@ -170,15 +166,15 @@ func (c *Copyer) write(ctx context.Context, job *writeJob, ch chan<- *baseJob, c // if no space if errors.Is(err, syscall.ENOSPC) || errors.Is(err, syscall.EROFS) { - badDsts.Add(dst) + badDsts.Add(dev) } - if re := os.Remove(name); re != nil { + if re := os.Remove(target); re != nil { rerr = multierror.Append(rerr, re) } - c.reportError(job.source.src(), name, rerr) - job.fail(name, rerr) + c.reportError(job.path, target, rerr) + job.fail(target, rerr) }() defer file.Close() diff --git a/fs.go b/fs.go index edfcf15..53eecf9 100644 --- a/fs.go +++ b/fs.go @@ -2,40 +2,42 @@ package acp import ( "fmt" + "strings" - "golang.org/x/sys/unix" + mapset "github.com/deckarep/golang-set/v2" + "github.com/moby/sys/mountinfo" ) -type fileSystem struct { - // TypeName string - // MountPoint string - TotalSize int64 - AvailableSize int64 -} - -func getFileSystem(path string) (*fileSystem, error) { - stat := new(unix.Statfs_t) - if err := unix.Statfs(path, stat); err != nil { - return nil, fmt.Errorf("read statfs fail, err= %w", err) +func getMountpointCache() (func(string) string, error) { + mounts, err := mountinfo.GetMounts(nil) + if err != nil { + return nil, fmt.Errorf("get mounts fail, %w", err) } - return &fileSystem{ - // TypeName: unpaddingInt8s(stat.Fstypename[:]), - // MountPoint: unpaddingInt8s(stat.Mntonname[:]), - TotalSize: int64(stat.Blocks) * int64(stat.Bsize), - AvailableSize: int64(stat.Bavail) * int64(stat.Bsize), - }, nil -} - -func unpaddingInt8s(buf []int8) string { - result := make([]byte, 0, len(buf)) - for _, c := range buf { - if c == 0x00 { - break + mountPoints := mapset.NewThreadUnsafeSet[string]() + for _, mount := range mounts { + if mount == nil { + continue + } + if mount.Mountpoint == "" { + continue } - result = append(result, byte(c)) + mp := mount.Mountpoint + if !strings.HasSuffix(mp, "/") { + mp = mp + "/" + } + + mountPoints.Add(mp) } - return string(result) + mps := mountPoints.ToSlice() + return Cache(func(path string) string { + for _, mp := range mps { + if strings.HasPrefix(path, mp) { + return mp + } + } + return "" + }), nil } diff --git a/fs_test.go b/fs_test.go new file mode 100644 index 0000000..cad9c41 --- /dev/null +++ b/fs_test.go @@ -0,0 +1,12 @@ +package acp + +import "testing" + +func TestFS(t *testing.T) { + mpCache, err := getMountpointCache() + if err != nil { + panic(err) + } + + t.Log("mp cahce", mpCache("/Users/cuijingning/go/src/github.com/abc950309/acp")) +} diff --git a/go.mod b/go.mod index 48a6da6..c9a83ec 100644 --- a/go.mod +++ b/go.mod @@ -9,10 +9,11 @@ require ( github.com/json-iterator/go v1.1.12 github.com/klauspost/cpuid/v2 v2.0.4 github.com/minio/sha256-simd v1.0.0 + github.com/moby/sys/mountinfo v0.6.2 github.com/modern-go/reflect2 v1.0.2 + github.com/samber/lo v1.38.1 github.com/schollz/progressbar/v3 v3.10.1 github.com/sirupsen/logrus v1.9.0 - golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 ) require ( @@ -21,5 +22,7 @@ require ( github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect github.com/rivo/uniseg v0.3.4 // indirect + golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect + golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 // indirect golang.org/x/term v0.0.0-20220722155259-a9ba230a4035 // indirect ) diff --git a/index.go b/index.go index 59e74a6..f04ef66 100644 --- a/index.go +++ b/index.go @@ -5,8 +5,11 @@ import ( "fmt" "os" "sort" + "strings" "sync/atomic" "time" + + "github.com/samber/lo" ) const ( @@ -18,8 +21,7 @@ type counter struct { } func (c *Copyer) index(ctx context.Context) (<-chan *baseJob, error) { - jobs := c.walk(ctx) - filtered, err := c.joinJobs(jobs) + jobs, err := c.walk(ctx) if err != nil { return nil, err } @@ -28,7 +30,7 @@ func (c *Copyer) index(ctx context.Context) (<-chan *baseJob, error) { go wrap(ctx, func() { defer close(ch) - for _, job := range filtered { + for _, job := range jobs { select { case <-ctx.Done(): return @@ -40,7 +42,7 @@ func (c *Copyer) index(ctx context.Context) (<-chan *baseJob, error) { return ch, nil } -func (c *Copyer) walk(ctx context.Context) []*baseJob { +func (c *Copyer) walk(ctx context.Context) ([]*baseJob, error) { done := make(chan struct{}) defer close(done) @@ -60,13 +62,19 @@ func (c *Copyer) walk(ctx context.Context) []*baseJob { jobs := make([]*baseJob, 0, 64) appendJob := func(job *baseJob) { + if !job.mode.IsRegular() { + c.reportError(job.path, "", fmt.Errorf("unexpected file mode, not regular file, mode= %s", job.mode)) + return + } + + c.submit(&EventUpdateJob{job.report()}) jobs = append(jobs, job) atomic.AddInt64(&cntr.files, 1) atomic.AddInt64(&cntr.bytes, job.size) } - var walk func(src *source) - walk = func(src *source) { + var walk func(src *source, dsts []string) + walk = func(src *source, dsts []string) { path := src.src() stat, err := os.Stat(path) @@ -77,13 +85,21 @@ func (c *Copyer) walk(ctx context.Context) []*baseJob { mode := stat.Mode() if mode.IsRegular() { - job, err := c.newJobFromFileInfo(src, stat) - if err != nil { - c.reportError(path, "", fmt.Errorf("make job fail, %w", err)) - return + targets := make([]string, 0, len(dsts)) + for _, d := range dsts { + targets = append(targets, src.dst(d)) } - appendJob(job) + appendJob(&baseJob{ + copyer: c, + path: path, + + size: stat.Size(), + mode: stat.Mode(), + modTime: stat.ModTime(), + + targets: targets, + }) return } if mode&UnexpectFileMode != 0 { @@ -96,25 +112,65 @@ func (c *Copyer) walk(ctx context.Context) []*baseJob { return } for _, file := range files { - walk(src.append(file.Name())) + walk(src.append(file.Name()), dsts) } } - for _, s := range c.src { - walk(s) + + results := make([]*baseJob, 0, 64) + for _, j := range c.wildcardJobs { + for _, s := range j.src { + walk(s, j.dst) + } + + if len(jobs) == 0 { + continue + } + + joined, err := c.joinJobs(jobs) + if err != nil { + return nil, err + } + + results = append(results, joined...) + jobs = jobs[:0] } - return jobs + + for _, j := range c.accurateJobs { + stat, err := os.Stat(j.src) + if err != nil { + c.reportError(j.src, "", fmt.Errorf("accurate job get stat, %w", err)) + continue + } + if !stat.Mode().IsRegular() { + continue + } + + appendJob(&baseJob{ + copyer: c, + src: &source{base: "/", path: lo.Filter(strings.Split(j.src, "/"), func(s string, _ int) bool { return s != "" })}, + path: j.src, + + size: stat.Size(), + mode: stat.Mode(), + modTime: stat.ModTime(), + + targets: j.dsts, + }) + } + + return results, nil } func (c *Copyer) joinJobs(jobs []*baseJob) ([]*baseJob, error) { sort.Slice(jobs, func(i int, j int) bool { - return comparePath(jobs[i].source.path, jobs[j].source.path) < 0 + return comparePath(jobs[i].src.path, jobs[j].src.path) < 0 }) var last *baseJob filtered := make([]*baseJob, 0, len(jobs)) for _, job := range jobs { - if last != nil && comparePath(last.source.path, job.source.path) == 0 { - c.reportError(last.source.src(), "", fmt.Errorf("same relative path, ignored, '%s'", job.source.src())) + if last != nil && comparePath(last.src.path, job.src.path) == 0 { + c.reportError(last.path, "", fmt.Errorf("same relative path, ignored, '%s'", job.path)) continue } diff --git a/job.go b/job.go index 8f8331f..71d73a6 100644 --- a/job.go +++ b/job.go @@ -2,8 +2,7 @@ package acp import ( "encoding/hex" - "fmt" - "os" + "io/fs" "sync" "time" @@ -32,38 +31,23 @@ var ( type baseJob struct { copyer *Copyer - source *source + src *source + path string size int64 // length in bytes for regular files; system-dependent for others - mode os.FileMode // file mode bits + mode fs.FileMode // file mode bits modTime time.Time // modification time lock sync.Mutex writeTime time.Time status jobStatus + targets []string successTargets []string failedTargets map[string]error hash []byte } -func (c *Copyer) newJobFromFileInfo(source *source, info os.FileInfo) (*baseJob, error) { - job := &baseJob{ - copyer: c, - source: source, - - size: info.Size(), - mode: info.Mode(), - modTime: info.ModTime(), - } - if !job.mode.IsRegular() { - return nil, fmt.Errorf("unexpected file, path= %s", source.src()) - } - - c.submit(&EventUpdateJob{job.report()}) - return job, nil -} - func (j *baseJob) setStatus(s jobStatus) { j.lock.Lock() defer j.lock.Unlock() @@ -105,8 +89,8 @@ func (j *baseJob) fail(path string, err error) { func (j *baseJob) report() *Job { return &Job{ - Base: j.source.base, - Path: j.source.path, + Base: j.src.base, + Path: j.src.path, Status: statusMapping[j.status], SuccessTargets: j.successTargets, @@ -161,7 +145,7 @@ type Job struct { FailTargets map[string]error `json:"fail_target,omitempty"` Size int64 `json:"size"` - Mode os.FileMode `json:"mode"` + Mode fs.FileMode `json:"mode"` ModTime time.Time `json:"mod_time"` WriteTime time.Time `json:"write_time"` SHA256 string `json:"sha256"` diff --git a/opt.go b/opt.go index f6aaeb9..3dd5989 100644 --- a/opt.go +++ b/opt.go @@ -1,11 +1,8 @@ package acp import ( - "fmt" "os" "path" - "sort" - "strings" "github.com/sirupsen/logrus" ) @@ -32,8 +29,8 @@ func (s *source) append(next ...string) *source { } type option struct { - src []*source - dst []string + accurateJobs []*accurateJob + wildcardJobs []*wildcardJob fromDevice *deviceOption toDevice *deviceOption @@ -54,38 +51,9 @@ func newOption() *option { } func (o *option) check() error { - filteredDst := make([]string, 0, len(o.dst)) - for _, p := range o.dst { - p = strings.TrimSpace(p) - if p == "" { - continue - } - if p[len(p)-1] != '/' { - p = p + "/" - } - - dstStat, err := os.Stat(p) - if err != nil { - return fmt.Errorf("check dst path '%s', %w", p, err) - } - if !dstStat.IsDir() { - return fmt.Errorf("dst path is not a dir") - } - - filteredDst = append(filteredDst, p) - } - o.dst = filteredDst - - if len(o.src) == 0 { - return fmt.Errorf("source path not found") - } - sort.Slice(o.src, func(i, j int) bool { - return comparePath(o.src[i].path, o.src[j].path) < 0 - }) - for _, s := range o.src { - src := s.src() - if _, err := os.Stat(src); err != nil { - return fmt.Errorf("check src path '%s', %w", src, err) + for _, job := range o.wildcardJobs { + if err := job.check(); err != nil { + return err } } @@ -98,38 +66,20 @@ func (o *option) check() error { if o.logger == nil { o.logger = logrus.StandardLogger() } + return nil } type Option func(*option) *option -func Source(paths ...string) Option { - return func(o *option) *option { - for _, p := range paths { - p = path.Clean(p) - if p[len(p)-1] == '/' { - p = p[:len(p)-1] - } - - base, name := path.Split(p) - o.src = append(o.src, &source{base: base, path: []string{name}}) - } - return o - } +type accurateJob struct { + src string + dsts []string } -func AccurateSource(base string, paths ...[]string) Option { +func AccurateJob(src string, dsts []string) Option { return func(o *option) *option { - for _, path := range paths { - o.src = append(o.src, &source{base: base, path: path}) - } - return o - } -} - -func Target(paths ...string) Option { - return func(o *option) *option { - o.dst = append(o.dst, paths...) + o.accurateJobs = append(o.accurateJobs, &accurateJob{src: src, dsts: dsts}) return o } } diff --git a/opt_wildcard.go b/opt_wildcard.go new file mode 100644 index 0000000..b2781d7 --- /dev/null +++ b/opt_wildcard.go @@ -0,0 +1,102 @@ +package acp + +import ( + "fmt" + "os" + "path" + "sort" + "strings" +) + +type wildcardJob struct { + src []*source + dst []string +} + +func (job *wildcardJob) check() error { + filteredDst := make([]string, 0, len(job.dst)) + for _, p := range job.dst { + p = strings.TrimSpace(p) + if p == "" { + continue + } + if p[len(p)-1] != '/' { + p = p + "/" + } + + dstStat, err := os.Stat(p) + if err != nil { + return fmt.Errorf("check dst path '%s', %w", p, err) + } + if !dstStat.IsDir() { + return fmt.Errorf("dst path is not a dir") + } + + filteredDst = append(filteredDst, p) + } + job.dst = filteredDst + + if len(job.src) == 0 { + return fmt.Errorf("source path not found") + } + sort.Slice(job.src, func(i, j int) bool { + return comparePath(job.src[i].path, job.src[j].path) < 0 + }) + for _, s := range job.src { + src := s.src() + if _, err := os.Stat(src); err != nil { + return fmt.Errorf("check src path '%s', %w", src, err) + } + } + + return nil +} + +func WildcardJob(opts ...WildcardJobOption) Option { + return func(o *option) *option { + j := new(wildcardJob) + for _, opt := range opts { + j = opt(j) + } + + if len(j.src) == 0 { + return o + } + + o.wildcardJobs = append(o.wildcardJobs, j) + return o + } +} + +type WildcardJobOption func(*wildcardJob) *wildcardJob + +func Source(paths ...string) WildcardJobOption { + return func(j *wildcardJob) *wildcardJob { + for _, p := range paths { + p = path.Clean(p) + if p[len(p)-1] == '/' { + p = p[:len(p)-1] + } + + base, name := path.Split(p) + j.src = append(j.src, &source{base: base, path: []string{name}}) + } + return j + } +} + +func AccurateSource(base string, paths ...[]string) WildcardJobOption { + return func(j *wildcardJob) *wildcardJob { + for _, path := range paths { + j.src = append(j.src, &source{base: base, path: path}) + } + return j + } +} + +func Target(paths ...string) WildcardJobOption { + return func(j *wildcardJob) *wildcardJob { + j.dst = append(j.dst, paths...) + return j + } +} diff --git a/prepare.go b/prepare.go index ff2aee7..df3a035 100644 --- a/prepare.go +++ b/prepare.go @@ -39,9 +39,9 @@ func (c *Copyer) prepare(ctx context.Context, indexed <-chan *baseJob) <-chan *w job.setStatus(jobStatusPreparing) - file, err := mmap.Open(job.source.src()) + file, err := mmap.Open(job.path) if err != nil { - c.reportError(job.source.src(), "", fmt.Errorf("open src file fail, %w", err)) + c.reportError(job.path, "", fmt.Errorf("open src file fail, %w", err)) return }