From 43289e4b94b5d94370e976cfacb29ab10630a308 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B4=94=E7=AB=9E=E5=AE=81?= Date: Wed, 7 Sep 2022 22:40:45 +0800 Subject: [PATCH] feat: init for use --- .gitignore | 3 + cmd/main.go | 80 +++++++ copy.go | 406 ++++++++++++++++++++++++++++++++++++ go.mod | 20 ++ go.sum | 40 ++++ job.go | 57 +++++ mmap/manual_test_program.go | 56 +++++ mmap/mmap_darwin.go | 136 ++++++++++++ mmap/mmap_linux.go | 145 +++++++++++++ mmap/mmap_other.go | 86 ++++++++ mmap/mmap_test.go | 34 +++ mmap/mmap_windows.go | 141 +++++++++++++ opt.go | 159 ++++++++++++++ opt_device.go | 14 ++ progress_bar.go | 72 +++++++ report.go | 64 ++++++ status.go | 21 ++ 17 files changed, 1534 insertions(+) create mode 100644 cmd/main.go create mode 100644 copy.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 job.go create mode 100644 mmap/manual_test_program.go create mode 100644 mmap/mmap_darwin.go create mode 100644 mmap/mmap_linux.go create mode 100644 mmap/mmap_other.go create mode 100644 mmap/mmap_test.go create mode 100644 mmap/mmap_windows.go create mode 100644 opt.go create mode 100644 opt_device.go create mode 100644 progress_bar.go create mode 100644 report.go create mode 100644 status.go diff --git a/.gitignore b/.gitignore index 66fd13c..c74eceb 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,6 @@ # Dependency directories (remove the comment below to include it) # vendor/ + +./go.sum +output/ diff --git a/cmd/main.go b/cmd/main.go new file mode 100644 index 0000000..94848a5 --- /dev/null +++ b/cmd/main.go @@ -0,0 +1,80 @@ +package main + +import ( + "context" + "encoding/json" + "flag" + "os" + "os/signal" + + "github.com/abc950309/acp" + "github.com/sirupsen/logrus" +) + +var ( + notOverwrite = flag.Bool("n", false, "not overwrite exist file") + noTarget = flag.Bool("notarget", false, "do not have target, aka sha256") + reportPath = flag.String("report", "", "json report storage path") + targetPaths []string +) + +func init() { + flag.Func("target", "use target flag to give multi target path", func(s string) error { + targetPaths = append(targetPaths, s) + return nil + }) +} + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + + flag.Parse() + sources := flag.Args() + if len(sources) == 0 { + logrus.Fatalf("cannot found source path") + } + + if !*noTarget && len(targetPaths) == 0 { + targetPaths = append(targetPaths, sources[len(sources)-1]) + sources = sources[:len(sources)-1] + } + if len(sources) == 0 { + logrus.Fatalf("cannot found source path") + } + + signals := make(chan os.Signal, 1) + signal.Notify(signals, os.Interrupt) + go func() { + for sig := range signals { + if sig != os.Interrupt { + continue + } + cancel() + } + }() + + c, err := acp.New( + ctx, acp.Source(sources...), acp.Target(targetPaths...), + acp.Overwrite(!*notOverwrite), acp.WithProgressBar(true), acp.WithHash(true), + ) + if err != nil { + panic(err) + } + + report := c.Wait() + if *reportPath == "" { + return + } + + r, err := os.Create(*reportPath) + if err != nil { + logrus.Warnf("open report fail, path= '%s', err= %w", *reportPath, err) + logrus.Infof("report: %s", report) + return + } + defer r.Close() + + if err := json.NewEncoder(r).Encode(report); err != nil { + logrus.Fatalf("export report fail, err= %s", err) + } +} diff --git a/copy.go b/copy.go new file mode 100644 index 0000000..df35f46 --- /dev/null +++ b/copy.go @@ -0,0 +1,406 @@ +package acp + +import ( + "context" + "encoding/hex" + "fmt" + "hash" + "os" + "sync" + "sync/atomic" + "time" + + "github.com/abc950309/acp/mmap" + "github.com/hashicorp/go-multierror" + "github.com/minio/sha256-simd" + "github.com/schollz/progressbar/v3" + "github.com/sirupsen/logrus" +) + +const ( + unexpectFileMode = os.ModeType &^ os.ModeDir + batchSize = 1024 * 1024 +) + +var ( + sha256Pool = &sync.Pool{New: func() interface{} { return sha256.New() }} +) + +const ( + StageIndex = iota + StageCopy + StageFinished +) + +type Copyer struct { + *option + + createFlag int + stage int64 + copyedBytes int64 + totalBytes int64 + copyedFiles int64 + totalFiles int64 + + progressBarLock sync.Mutex + updateProgressBar func(func(bar *progressbar.ProgressBar)) + + jobs []*Job + writePipe chan *writeJob + metaPipe chan *metaJob + + wg sync.WaitGroup + reportLock sync.Mutex + errors []*Error + files []*File +} + +func New(ctx context.Context, opts ...Option) (*Copyer, error) { + opt := newOption() + for _, o := range opts { + if o == nil { + continue + } + opt = o(opt) + } + if err := opt.check(); err != nil { + return nil, err + } + + c := &Copyer{ + option: opt, + writePipe: make(chan *writeJob, 32), + metaPipe: make(chan *metaJob, 8), + updateProgressBar: func(f func(bar *progressbar.ProgressBar)) {}, + } + + c.createFlag = os.O_WRONLY | os.O_CREATE + if c.overwrite { + c.createFlag |= os.O_TRUNC + } else { + c.createFlag |= os.O_EXCL + } + + c.wg.Add(1) + go c.run(ctx) + return c, nil +} + +func (c *Copyer) Wait() *Report { + c.wg.Wait() + return c.Report() +} + +func (c *Copyer) reportError(file string, err error) { + e := &Error{Path: file, Err: err} + logrus.Errorf(e.Error()) + + c.reportLock.Lock() + defer c.reportLock.Unlock() + c.errors = append(c.errors, e) +} + +func (c *Copyer) run(ctx context.Context) { + defer c.wg.Done() + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + if c.withProgressBar { + c.startProgressBar(ctx) + } + + c.index(ctx) + c.copy(ctx) +} + +func (c *Copyer) index(ctx context.Context) { + for _, s := range c.src { + c.walk(s.base, s.path) + } +} + +func (c *Copyer) copy(ctx context.Context) { + atomic.StoreInt64(&c.stage, StageCopy) + defer atomic.StoreInt64(&c.stage, StageFinished) + + go func() { + defer close(c.writePipe) + + for _, job := range c.jobs { + c.prepare(ctx, job) + + select { + case <-ctx.Done(): + return + default: + } + } + }() + + go func() { + defer close(c.metaPipe) + + for job := range c.writePipe { + c.write(ctx, job) + + select { + case <-ctx.Done(): + return + default: + } + } + }() + + for file := range c.metaPipe { + c.meta(file) + } +} + +func (c *Copyer) walk(base, path string) { + stat, err := os.Stat(path) + if err != nil { + c.reportError(path, fmt.Errorf("walk get stat, %w", err)) + return + } + + job, err := newJobFromFileInfo(base, path, stat) + if err != nil { + c.reportError(path, fmt.Errorf("make job fail, %w", err)) + return + } + if job.Mode&unexpectFileMode != 0 { + return + } + if !job.Mode.IsDir() { + atomic.AddInt64(&c.totalFiles, 1) + atomic.AddInt64(&c.totalBytes, job.Size) + c.jobs = append(c.jobs, job) + return + } + + enterJob := new(Job) + *enterJob = *job + enterJob.Type = JobTypeEnterDir + c.jobs = append(c.jobs, enterJob) + + files, err := os.ReadDir(path) + if err != nil { + c.reportError(path, fmt.Errorf("walk read dir, %w", err)) + return + } + + for _, file := range files { + c.walk(base, path+"/"+file.Name()) + } + + exitJob := new(Job) + *exitJob = *job + exitJob.Type = JobTypeExitDir + c.jobs = append(c.jobs, exitJob) +} + +func (c *Copyer) prepare(ctx context.Context, job *Job) { + switch job.Type { + case JobTypeEnterDir: + for _, d := range c.dst { + name := d + job.RelativePath + err := os.Mkdir(name, job.Mode&os.ModePerm) + if err != nil && !os.IsExist(err) { + c.reportError(name, fmt.Errorf("mkdir fail, %w", err)) + } + } + return + case JobTypeExitDir: + c.writePipe <- &writeJob{Job: job} + return + } + + name := job.Source + file, err := mmap.Open(name) + if err != nil { + c.reportError(name, fmt.Errorf("open src file fail, %w", err)) + return + } + + c.writePipe <- &writeJob{Job: job, src: file} +} + +func (c *Copyer) write(ctx context.Context, job *writeJob) { + if job.src == nil { + c.metaPipe <- &metaJob{Job: job.Job} + return + } + defer job.src.Close() + + num := atomic.AddInt64(&c.copyedFiles, 1) + go c.updateProgressBar(func(bar *progressbar.ProgressBar) { + bar.Describe(fmt.Sprintf("[%d/%d] %s", num, c.totalFiles, job.RelativePath)) + }) + + var wg sync.WaitGroup + var lock sync.Mutex + var readErr error + chans := make([]chan []byte, 0, len(c.dst)+1) + next := &metaJob{Job: job.Job, failTarget: make(map[string]string)} + + if c.withHash { + sha := sha256Pool.Get().(hash.Hash) + sha.Reset() + + ch := make(chan []byte, 4) + chans = append(chans, ch) + + wg.Add(1) + go func() { + defer wg.Done() + defer sha256Pool.Put(sha) + + for buf := range ch { + sha.Write(buf) + } + + lock.Lock() + defer lock.Unlock() + next.hash = sha.Sum(nil) + }() + } + + for _, d := range c.dst { + name := d + job.RelativePath + file, err := os.OpenFile(name, c.createFlag, job.Mode) + if err != nil { + c.reportError(name, fmt.Errorf("open dst file fail, %w", err)) + + lock.Lock() + defer lock.Unlock() + next.failTarget[name] = fmt.Errorf("open dst file fail, %w", err).Error() + continue + } + + ch := make(chan []byte, 4) + chans = append(chans, ch) + + wg.Add(1) + go func() { + defer wg.Done() + + var rerr error + defer func() { + if rerr == nil { + lock.Lock() + defer lock.Unlock() + next.successTarget = append(next.successTarget, name) + return + } + + // avoid block channel + for range ch { + } + + if re := os.Remove(name); re != nil { + rerr = multierror.Append(rerr, re) + } + + c.reportError(name, rerr) + + lock.Lock() + defer lock.Unlock() + next.failTarget[name] = rerr.Error() + }() + + defer file.Close() + for buf := range ch { + nr := len(buf) + + n, err := file.Write(buf) + if n < 0 || nr < n { + if err == nil { + rerr = fmt.Errorf("write fail, unexpected return, byte_num= %d", n) + return + } + + rerr = fmt.Errorf("write fail, %w", err) + return + } + if nr != n { + rerr = fmt.Errorf("write fail, write and read bytes not equal, read= %d write= %d", nr, n) + return + } + } + + if readErr != nil { + rerr = readErr + } + }() + } + + defer func() { + for _, ch := range chans { + close(ch) + } + + wg.Wait() + c.metaPipe <- next + }() + + readErr = c.streamCopy(ctx, chans, job.src) +} + +func (c *Copyer) streamCopy(ctx context.Context, dsts []chan []byte, src *mmap.ReaderAt) error { + if src.Len() == 0 { + return nil + } + + for idx := int64(0); ; idx += batchSize { + buf, err := src.Slice(idx, batchSize) + if err != nil { + return fmt.Errorf("slice mmap fail, %w", err) + } + + for _, ch := range dsts { + ch <- buf + } + + nr := len(buf) + atomic.AddInt64(&c.copyedBytes, int64(nr)) + if nr < batchSize { + return nil + } + + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + } +} + +func (c *Copyer) meta(job *metaJob) { + if job.Mode.IsDir() { + for _, d := range c.dst { + if err := os.Chtimes(d+job.RelativePath, job.ModTime, job.ModTime); err != nil { + c.reportError(d+job.RelativePath, fmt.Errorf("change info, chtimes fail, %w", err)) + } + } + return + } + + c.files = append(c.files, &File{ + Source: job.Source, + SuccessTarget: job.successTarget, + FailTarget: job.failTarget, + RelativePath: job.RelativePath, + Size: job.Size, + Mode: job.Mode, + ModTime: job.ModTime, + WriteTime: time.Now(), + SHA256: hex.EncodeToString(job.hash), + }) + + for _, name := range job.successTarget { + if err := os.Chtimes(name, job.ModTime, job.ModTime); err != nil { + c.reportError(name, fmt.Errorf("change info, chtimes fail, %w", err)) + } + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..d7bedac --- /dev/null +++ b/go.mod @@ -0,0 +1,20 @@ +module github.com/abc950309/acp + +go 1.18 + +require ( + github.com/hashicorp/go-multierror v1.1.1 + github.com/minio/sha256-simd v1.0.0 + github.com/schollz/progressbar/v3 v3.10.1 + github.com/sirupsen/logrus v1.9.0 +) + +require ( + github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/klauspost/cpuid/v2 v2.0.4 // indirect + github.com/mattn/go-runewidth v0.0.13 // indirect + github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect + github.com/rivo/uniseg v0.3.4 // indirect + golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 // indirect + golang.org/x/term v0.0.0-20220722155259-a9ba230a4035 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..70d3968 --- /dev/null +++ b/go.sum @@ -0,0 +1,40 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw= +github.com/klauspost/cpuid/v2 v2.0.4 h1:g0I61F2K2DjRHz1cnxlkNSBIaePVoJIjjnHui8QHbiw= +github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU= +github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g= +github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM= +github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db h1:62I3jR2EmQ4l5rM/4FEfDWcRD+abF5XlKShorW5LRoQ= +github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db/go.mod h1:l0dey0ia/Uv7NcFFVbCLtqEBQbrT4OCwCSKTEv6enCw= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.3.4 h1:3Z3Eu6FGHZWSfNKJTOUiPatWwfc7DzJRU04jFUqJODw= +github.com/rivo/uniseg v0.3.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/schollz/progressbar/v3 v3.10.1 h1:6A8v8TIcCJL4yemlUJS9gdcpZ++Gy6toOh1JzKQkz+U= +github.com/schollz/progressbar/v3 v3.10.1/go.mod h1:R2djRgv58sn00AGysc4fN0ip4piOGd3z88K+zVBjczs= +github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= +github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 h1:v6hYoSR9T5oet+pMXwUWkbiVqx/63mlHjefrHmxwfeY= +golang.org/x/sys v0.0.0-20220829200755-d48e67d00261/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20220722155259-a9ba230a4035 h1:Q5284mrmYTpACcm+eAKjKJH48BBwSyfJqmmGDTtT8Vc= +golang.org/x/term v0.0.0-20220722155259-a9ba230a4035/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/job.go b/job.go new file mode 100644 index 0000000..7368501 --- /dev/null +++ b/job.go @@ -0,0 +1,57 @@ +package acp + +import ( + "fmt" + "os" + "strings" + "time" + + "github.com/abc950309/acp/mmap" +) + +type JobType uint8 + +const ( + JobTypeNormal = JobType(iota) + JobTypeEnterDir + JobTypeExitDir +) + +type Job struct { + Source string + RelativePath string + Type JobType + Name string // base name of the file + Size int64 // length in bytes for regular files; system-dependent for others + Mode os.FileMode // file mode bits + ModTime time.Time // modification time +} + +func newJobFromFileInfo(base, path string, info os.FileInfo) (*Job, error) { + if !strings.HasPrefix(path, base) { + return nil, fmt.Errorf("path do not contains base, path= '%s', base= '%s'", path, base) + } + + job := &Job{ + Source: path, + RelativePath: path[len(base):], + Name: info.Name(), + Size: info.Size(), + Mode: info.Mode(), + ModTime: info.ModTime(), + } + return job, nil +} + +type writeJob struct { + *Job + src *mmap.ReaderAt +} + +type metaJob struct { + *Job + + successTarget []string + failTarget map[string]string + hash []byte +} diff --git a/mmap/manual_test_program.go b/mmap/manual_test_program.go new file mode 100644 index 0000000..a1ab17b --- /dev/null +++ b/mmap/manual_test_program.go @@ -0,0 +1,56 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build ignore +// +build ignore + +// +// This build tag means that "go build" does not build this file. Use "go run +// manual_test_program.go" to run it. +// +// You will also need to change "debug = false" to "debug = true" in mmap_*.go. + +package main + +import ( + "log" + "math/rand" + "time" + + "golang.org/x/exp/mmap" +) + +var garbage []byte + +func main() { + const filename = "manual_test_program.go" + + for _, explicitClose := range []bool{false, true} { + r, err := mmap.Open(filename) + if err != nil { + log.Fatalf("Open: %v", err) + } + if explicitClose { + r.Close() + } else { + // Leak the *mmap.ReaderAt returned by mmap.Open. The finalizer + // should pick it up, if finalizers run at all. + } + } + + println("Finished all explicit Close calls.") + println("Creating and collecting garbage.") + println("Look for two munmap log messages.") + println("Hit Ctrl-C to exit.") + + rng := rand.New(rand.NewSource(1)) + now := time.Now() + for { + garbage = make([]byte, rng.Intn(1<<20)) + if time.Since(now) > 1*time.Second { + now = time.Now() + print(".") + } + } +} diff --git a/mmap/mmap_darwin.go b/mmap/mmap_darwin.go new file mode 100644 index 0000000..1882efb --- /dev/null +++ b/mmap/mmap_darwin.go @@ -0,0 +1,136 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build darwin +// +build darwin + +// Package mmap provides a way to memory-map a file. +package mmap + +import ( + "errors" + "fmt" + "io" + "os" + "runtime" + "syscall" +) + +// debug is whether to print debugging messages for manual testing. +// +// The runtime.SetFinalizer documentation says that, "The finalizer for x is +// scheduled to run at some arbitrary time after x becomes unreachable. There +// is no guarantee that finalizers will run before a program exits", so we +// cannot automatically test that the finalizer runs. Instead, set this to true +// when running the manual test. +const debug = false + +// ReaderAt reads a memory-mapped file. +// +// Like any io.ReaderAt, clients can execute parallel ReadAt calls, but it is +// not safe to call Close and reading methods concurrently. +type ReaderAt struct { + data []byte +} + +// Close closes the reader. +func (r *ReaderAt) Close() error { + if r.data == nil { + return nil + } + data := r.data + r.data = nil + if debug { + var p *byte + if len(data) != 0 { + p = &data[0] + } + println("munmap", r, p) + } + runtime.SetFinalizer(r, nil) + return syscall.Munmap(data) +} + +// Len returns the length of the underlying memory-mapped file. +func (r *ReaderAt) Len() int { + return len(r.data) +} + +// At returns the byte at index i. +func (r *ReaderAt) At(i int) byte { + return r.data[i] +} + +// ReadAt implements the io.ReaderAt interface. +func (r *ReaderAt) ReadAt(p []byte, off int64) (int, error) { + if r.data == nil { + return 0, errors.New("mmap: closed") + } + if off < 0 || int64(len(r.data)) < off { + return 0, fmt.Errorf("mmap: invalid ReadAt offset %d", off) + } + n := copy(p, r.data[off:]) + if n < len(p) { + return n, io.EOF + } + return n, nil +} + +// ReadAt implements the io.ReaderAt interface. +func (r *ReaderAt) Slice(off, limit int64) ([]byte, error) { + if r.data == nil { + return nil, errors.New("mmap: closed") + } + + l := int64(len(r.data)) + if off < 0 || limit < 0 || l < off { + return nil, fmt.Errorf("mmap: invalid ReadAt offset %d", off) + } + + if off+limit > l { + return r.data[off:], nil + } + + return r.data[off : off+limit], nil +} + +// Open memory-maps the named file for reading. +func Open(filename string) (*ReaderAt, error) { + f, err := os.Open(filename) + if err != nil { + return nil, err + } + defer f.Close() + fi, err := f.Stat() + if err != nil { + return nil, err + } + + size := fi.Size() + if size == 0 { + return &ReaderAt{}, nil + } + if size < 0 { + return nil, fmt.Errorf("mmap: file %q has negative size", filename) + } + if size != int64(int(size)) { + return nil, fmt.Errorf("mmap: file %q is too large", filename) + } + + data, err := syscall.Mmap(int(f.Fd()), 0, int(size), syscall.PROT_READ, syscall.MAP_SHARED) + if err != nil { + return nil, fmt.Errorf("create mmap fail, %q, %w", filename, err) + } + + r := &ReaderAt{data} + if debug { + var p *byte + if len(data) != 0 { + p = &data[0] + } + println("mmap", r, p) + } + runtime.SetFinalizer(r, (*ReaderAt).Close) + return r, nil +} diff --git a/mmap/mmap_linux.go b/mmap/mmap_linux.go new file mode 100644 index 0000000..fcaf52f --- /dev/null +++ b/mmap/mmap_linux.go @@ -0,0 +1,145 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build linux +// +build linux + +// Package mmap provides a way to memory-map a file. +package mmap + +import ( + "errors" + "fmt" + "io" + "os" + "runtime" + "syscall" +) + +const ( + prefetchMaxSize = 16 * 1024 * 1024 +) + +// debug is whether to print debugging messages for manual testing. +// +// The runtime.SetFinalizer documentation says that, "The finalizer for x is +// scheduled to run at some arbitrary time after x becomes unreachable. There +// is no guarantee that finalizers will run before a program exits", so we +// cannot automatically test that the finalizer runs. Instead, set this to true +// when running the manual test. +const debug = false + +// ReaderAt reads a memory-mapped file. +// +// Like any io.ReaderAt, clients can execute parallel ReadAt calls, but it is +// not safe to call Close and reading methods concurrently. +type ReaderAt struct { + data []byte +} + +// Close closes the reader. +func (r *ReaderAt) Close() error { + if r.data == nil { + return nil + } + data := r.data + r.data = nil + if debug { + var p *byte + if len(data) != 0 { + p = &data[0] + } + println("munmap", r, p) + } + runtime.SetFinalizer(r, nil) + return syscall.Munmap(data) +} + +// Len returns the length of the underlying memory-mapped file. +func (r *ReaderAt) Len() int { + return len(r.data) +} + +// At returns the byte at index i. +func (r *ReaderAt) At(i int) byte { + return r.data[i] +} + +// ReadAt implements the io.ReaderAt interface. +func (r *ReaderAt) ReadAt(p []byte, off int64) (int, error) { + if r.data == nil { + return 0, errors.New("mmap: closed") + } + if off < 0 || int64(len(r.data)) < off { + return 0, fmt.Errorf("mmap: invalid ReadAt offset %d", off) + } + n := copy(p, r.data[off:]) + if n < len(p) { + return n, io.EOF + } + return n, nil +} + +// ReadAt implements the io.ReaderAt interface. +func (r *ReaderAt) Slice(off, limit int64) ([]byte, error) { + if r.data == nil { + return nil, errors.New("mmap: closed") + } + + l := int64(len(r.data)) + if off < 0 || limit < 0 || l < off { + return nil, fmt.Errorf("mmap: invalid ReadAt offset %d", off) + } + + if off+limit > l { + return r.data[off:], nil + } + + return r.data[off : off+limit], nil +} + +// Open memory-maps the named file for reading. +func Open(filename string) (*ReaderAt, error) { + f, err := os.Open(filename) + if err != nil { + return nil, err + } + defer f.Close() + fi, err := f.Stat() + if err != nil { + return nil, err + } + + size := fi.Size() + if size == 0 { + return &ReaderAt{}, nil + } + if size < 0 { + return nil, fmt.Errorf("mmap: file %q has negative size", filename) + } + if size != int64(int(size)) { + return nil, fmt.Errorf("mmap: file %q is too large", filename) + } + + data, err := syscall.Mmap(int(f.Fd()), 0, int(size), syscall.PROT_READ, syscall.MAP_SHARED) + if err != nil { + return nil, fmt.Errorf("create mmap fail, %q, %w", filename, err) + } + if size <= prefetchMaxSize { + if err := syscall.Madvise(data, syscall.MADV_SEQUENTIAL|syscall.MADV_WILLNEED); err != nil { + return nil, fmt.Errorf("madvise fail, %q, %w", filename, err) + } + } + + r := &ReaderAt{data} + if debug { + var p *byte + if len(data) != 0 { + p = &data[0] + } + println("mmap", r, p) + } + runtime.SetFinalizer(r, (*ReaderAt).Close) + return r, nil +} diff --git a/mmap/mmap_other.go b/mmap/mmap_other.go new file mode 100644 index 0000000..8c4ef04 --- /dev/null +++ b/mmap/mmap_other.go @@ -0,0 +1,86 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build !linux && !windows && !darwin +// +build !linux,!windows,!darwin + +// Package mmap provides a way to memory-map a file. +package mmap + +import ( + "fmt" + "os" +) + +// ReaderAt reads a memory-mapped file. +// +// Like any io.ReaderAt, clients can execute parallel ReadAt calls, but it is +// not safe to call Close and reading methods concurrently. +type ReaderAt struct { + f *os.File + len int +} + +// Close closes the reader. +func (r *ReaderAt) Close() error { + return r.f.Close() +} + +// Len returns the length of the underlying memory-mapped file. +func (r *ReaderAt) Len() int { + return r.len +} + +// At returns the byte at index i. +func (r *ReaderAt) At(i int) byte { + if i < 0 || r.len <= i { + panic("index out of range") + } + var b [1]byte + r.ReadAt(b[:], int64(i)) + return b[0] +} + +// ReadAt implements the io.ReaderAt interface. +func (r *ReaderAt) ReadAt(p []byte, off int64) (int, error) { + return r.f.ReadAt(p, off) +} + +// ReadAt implements the io.ReaderAt interface. +func (r *ReaderAt) Slice(off, limit int64) ([]byte, error) { + buf := make([]byte, limit) + n, err := r.ReadAt(buf, off) + if err != nil { + return nil, err + } + return buf[:n], nil +} + +// Open memory-maps the named file for reading. +func Open(filename string) (*ReaderAt, error) { + f, err := os.Open(filename) + if err != nil { + return nil, err + } + fi, err := f.Stat() + if err != nil { + f.Close() + return nil, err + } + + size := fi.Size() + if size < 0 { + f.Close() + return nil, fmt.Errorf("mmap: file %q has negative size", filename) + } + if size != int64(int(size)) { + f.Close() + return nil, fmt.Errorf("mmap: file %q is too large", filename) + } + + return &ReaderAt{ + f: f, + len: int(fi.Size()), + }, nil +} diff --git a/mmap/mmap_test.go b/mmap/mmap_test.go new file mode 100644 index 0000000..797fc5f --- /dev/null +++ b/mmap/mmap_test.go @@ -0,0 +1,34 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package mmap + +import ( + "bytes" + "io" + "io/ioutil" + "testing" +) + +func TestOpen(t *testing.T) { + const filename = "mmap_test.go" + r, err := Open(filename) + if err != nil { + t.Fatalf("Open: %v", err) + } + got := make([]byte, r.Len()) + if _, err := r.ReadAt(got, 0); err != nil && err != io.EOF { + t.Fatalf("ReadAt: %v", err) + } + want, err := ioutil.ReadFile(filename) + if err != nil { + t.Fatalf("ioutil.ReadFile: %v", err) + } + if len(got) != len(want) { + t.Fatalf("got %d bytes, want %d", len(got), len(want)) + } + if !bytes.Equal(got, want) { + t.Fatalf("\ngot %q\nwant %q", string(got), string(want)) + } +} diff --git a/mmap/mmap_windows.go b/mmap/mmap_windows.go new file mode 100644 index 0000000..e0d3414 --- /dev/null +++ b/mmap/mmap_windows.go @@ -0,0 +1,141 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package mmap provides a way to memory-map a file. +package mmap + +import ( + "errors" + "fmt" + "io" + "os" + "runtime" + "syscall" + "unsafe" +) + +// debug is whether to print debugging messages for manual testing. +// +// The runtime.SetFinalizer documentation says that, "The finalizer for x is +// scheduled to run at some arbitrary time after x becomes unreachable. There +// is no guarantee that finalizers will run before a program exits", so we +// cannot automatically test that the finalizer runs. Instead, set this to true +// when running the manual test. +const debug = false + +// ReaderAt reads a memory-mapped file. +// +// Like any io.ReaderAt, clients can execute parallel ReadAt calls, but it is +// not safe to call Close and reading methods concurrently. +type ReaderAt struct { + data []byte +} + +// Close closes the reader. +func (r *ReaderAt) Close() error { + if r.data == nil { + return nil + } + data := r.data + r.data = nil + if debug { + var p *byte + if len(data) != 0 { + p = &data[0] + } + println("munmap", r, p) + } + runtime.SetFinalizer(r, nil) + return syscall.UnmapViewOfFile(uintptr(unsafe.Pointer(&data[0]))) +} + +// Len returns the length of the underlying memory-mapped file. +func (r *ReaderAt) Len() int { + return len(r.data) +} + +// At returns the byte at index i. +func (r *ReaderAt) At(i int) byte { + return r.data[i] +} + +// ReadAt implements the io.ReaderAt interface. +func (r *ReaderAt) ReadAt(p []byte, off int64) (int, error) { + if r.data == nil { + return 0, errors.New("mmap: closed") + } + if off < 0 || int64(len(r.data)) < off { + return 0, fmt.Errorf("mmap: invalid ReadAt offset %d", off) + } + n := copy(p, r.data[off:]) + if n < len(p) { + return n, io.EOF + } + return n, nil +} + +// ReadAt implements the io.ReaderAt interface. +func (r *ReaderAt) Slice(off, limit int64) ([]byte, error) { + if r.data == nil { + return nil, errors.New("mmap: closed") + } + + l := int64(len(r.data)) + if off < 0 || limit < 0 || l < off { + return nil, fmt.Errorf("mmap: invalid ReadAt offset %d", off) + } + + if off+limit > l { + return r.data[off:], nil + } + + return r.data[off : off+limit], nil +} + +// Open memory-maps the named file for reading. +func Open(filename string) (*ReaderAt, error) { + f, err := os.Open(filename) + if err != nil { + return nil, err + } + defer f.Close() + fi, err := f.Stat() + if err != nil { + return nil, err + } + + size := fi.Size() + if size == 0 { + return &ReaderAt{}, nil + } + if size < 0 { + return nil, fmt.Errorf("mmap: file %q has negative size", filename) + } + if size != int64(int(size)) { + return nil, fmt.Errorf("mmap: file %q is too large", filename) + } + + low, high := uint32(size), uint32(size>>32) + fmap, err := syscall.CreateFileMapping(syscall.Handle(f.Fd()), nil, syscall.PAGE_READONLY, high, low, nil) + if err != nil { + return nil, err + } + defer syscall.CloseHandle(fmap) + ptr, err := syscall.MapViewOfFile(fmap, syscall.FILE_MAP_READ, 0, 0, uintptr(size)) + if err != nil { + return nil, err + } + data := unsafe.Slice((*byte)(unsafe.Pointer(ptr)), size) + + r := &ReaderAt{data: data} + if debug { + var p *byte + if len(data) != 0 { + p = &data[0] + } + println("mmap", r, p) + } + runtime.SetFinalizer(r, (*ReaderAt).Close) + return r, nil +} diff --git a/opt.go b/opt.go new file mode 100644 index 0000000..cb03c83 --- /dev/null +++ b/opt.go @@ -0,0 +1,159 @@ +package acp + +import ( + "fmt" + "os" + "path" + "sort" + "strings" +) + +type source struct { + path string + base string + name string +} + +type option struct { + src []source + dst []string + + fromDevice *deviceOption + toDevice *deviceOption + + overwrite bool + withProgressBar bool + // threads int + withHash bool +} + +func newOption() *option { + return &option{ + fromDevice: new(deviceOption), + toDevice: new(deviceOption), + } +} + +func (o *option) check() error { + filteredDst := make([]string, 0, len(o.dst)) + for _, p := range o.dst { + p = strings.TrimSpace(p) + if p == "" { + continue + } + if p[len(p)-1] != '/' { + p = p + "/" + } + + dstStat, err := os.Stat(p) + if err != nil { + return fmt.Errorf("check dst path '%s', %w", p, err) + } + if !dstStat.IsDir() { + return fmt.Errorf("dst path is not a dir") + } + + filteredDst = append(filteredDst, p) + } + o.dst = filteredDst + + if len(o.src) == 0 { + return fmt.Errorf("source path not found") + } + for _, s := range o.src { + if _, err := os.Stat(s.path); err != nil { + return fmt.Errorf("check src path '%s', %w", s.path, err) + } + } + + sort.Slice(o.src, func(i, j int) bool { + return o.src[i].name < o.src[j].name + }) + for idx := 1; idx < len(o.src); idx++ { + if o.src[idx].name == o.src[idx-1].name { + return fmt.Errorf("have same name source path, '%s' and '%s'", o.src[idx-1].path, o.src[idx].path) + } + } + + return nil +} + +type Option func(*option) *option + +func Source(paths ...string) Option { + return func(o *option) *option { + for _, p := range paths { + p = strings.TrimSpace(p) + if p == "" { + continue + } + + if p[len(p)-1] == '/' { + p = p[:len(p)-1] + } + + base, name := path.Split(p) + o.src = append(o.src, source{path: p, base: base, name: name}) + } + return o + } +} + +func Target(paths ...string) Option { + return func(o *option) *option { + o.dst = append(o.dst, paths...) + return o + } +} + +func SetFromDevice(opts ...DeviceOption) Option { + return func(o *option) *option { + for _, opt := range opts { + if opt == nil { + continue + } + o.fromDevice = opt(o.fromDevice) + } + return o + } +} + +func SetToDevice(opts ...DeviceOption) Option { + return func(o *option) *option { + for _, opt := range opts { + if opt == nil { + continue + } + o.toDevice = opt(o.toDevice) + } + return o + } +} + +func Overwrite(b bool) Option { + return func(o *option) *option { + o.overwrite = b + return o + } +} + +func WithProgressBar(b bool) Option { + return func(o *option) *option { + o.withProgressBar = b + return o + } +} + +// func WithThreads(threads int) Option { +// return func(o *option) *option { +// o.threads = threads +// return o +// } +// } + +func WithHash(b bool) Option { + return func(o *option) *option { + o.withHash = b + return o + } +} diff --git a/opt_device.go b/opt_device.go new file mode 100644 index 0000000..8ab75c8 --- /dev/null +++ b/opt_device.go @@ -0,0 +1,14 @@ +package acp + +type deviceOption struct { + linear bool +} + +type DeviceOption func(*deviceOption) *deviceOption + +func LinearDevice(b bool) DeviceOption { + return func(d *deviceOption) *deviceOption { + d.linear = b + return d + } +} diff --git a/progress_bar.go b/progress_bar.go new file mode 100644 index 0000000..a8b2742 --- /dev/null +++ b/progress_bar.go @@ -0,0 +1,72 @@ +package acp + +import ( + "context" + "fmt" + "sync/atomic" + "time" + + "github.com/schollz/progressbar/v3" +) + +func (c *Copyer) startProgressBar(ctx context.Context) { + progressBar := progressbar.DefaultBytes(0, "[0/0] indexing...") + c.updateProgressBar = func(f func(bar *progressbar.ProgressBar)) { + c.progressBarLock.Lock() + defer c.progressBarLock.Unlock() + + if progressBar == nil { + return + } + f(progressBar) + } + + go func() { + ticker := time.NewTicker(time.Nanosecond * 255002861) // around 255ms, avoid conflict with progress bar fresh by second + defer ticker.Stop() + + var lastCopyedBytes int64 + for range ticker.C { + c.progressBarLock.Lock() + + switch c.stage { + case StageIndex: + go c.updateProgressBar(func(bar *progressbar.ProgressBar) { + bar.ChangeMax64(atomic.LoadInt64(&c.totalBytes)) + bar.Describe(fmt.Sprintf("[0/%d] indexing...", atomic.LoadInt64(&c.totalFiles))) + }) + case StageCopy: + currentCopyedBytes := atomic.LoadInt64(&c.copyedBytes) + diff := int(currentCopyedBytes - lastCopyedBytes) + lastCopyedBytes = currentCopyedBytes + + go c.updateProgressBar(func(bar *progressbar.ProgressBar) { + bar.Add(diff) + }) + case StageFinished: + currentCopyedBytes := atomic.LoadInt64(&c.copyedBytes) + diff := int(currentCopyedBytes - lastCopyedBytes) + lastCopyedBytes = currentCopyedBytes + + copyedFiles, totalFiles := atomic.LoadInt64(&c.copyedFiles), atomic.LoadInt64(&c.totalFiles) + + c.updateProgressBar(func(bar *progressbar.ProgressBar) { + bar.Add(diff) + bar.Describe(fmt.Sprintf("[%d/%d] finished!", copyedFiles, totalFiles)) + }) + } + + select { + case <-ctx.Done(): + c.updateProgressBar(func(bar *progressbar.ProgressBar) { + bar.Close() + progressBar = nil + }) + return + default: + } + + c.progressBarLock.Unlock() + } + }() +} diff --git a/report.go b/report.go new file mode 100644 index 0000000..b31e6b5 --- /dev/null +++ b/report.go @@ -0,0 +1,64 @@ +package acp + +import ( + "encoding/json" + "fmt" + "os" + "time" +) + +func (c *Copyer) Report() *Report { + c.reportLock.Lock() + defer c.reportLock.Unlock() + + return &Report{ + Files: c.files, + Errors: c.errors, + } +} + +var ( + _ = error(new(Error)) + _ = json.Marshaler(new(Error)) + _ = json.Unmarshaler(new(Error)) +) + +type Error struct { + Path string `json:"path,omitempty"` + Err error `json:"error,omitempty"` +} + +func (e *Error) Error() string { + return fmt.Sprintf("[%s]: %s", e.Path, e.Err) +} + +func (e *Error) MarshalJSON() ([]byte, error) { + return json.Marshal(map[string]string{"path": e.Path, "error": e.Err.Error()}) +} + +func (e *Error) UnmarshalJSON(buf []byte) error { + m := make(map[string]string, 2) + if err := json.Unmarshal(buf, &m); err != nil { + return err + } + + e.Path, e.Err = m["path"], fmt.Errorf(m["error"]) + return nil +} + +type File struct { + Source string `json:"source"` + SuccessTarget []string `json:"success_target"` + FailTarget map[string]string `json:"fail_target"` + RelativePath string `json:"relative_path"` + Size int64 `json:"size"` + Mode os.FileMode `json:"mode"` + ModTime time.Time `json:"mod_time"` + WriteTime time.Time `json:"write_time"` + SHA256 string `json:"sha256"` +} + +type Report struct { + Files []*File + Errors []*Error +} diff --git a/status.go b/status.go new file mode 100644 index 0000000..edf40ac --- /dev/null +++ b/status.go @@ -0,0 +1,21 @@ +package acp + +import "sync/atomic" + +type Status struct { + Stage int64 + CopyedBytes int64 + TotalBytes int64 + CopyedFiles int64 + TotalFiles int64 +} + +func (c *Copyer) Status() Status { + return Status{ + Stage: atomic.LoadInt64(&c.stage), + CopyedBytes: atomic.LoadInt64(&c.copyedBytes), + TotalBytes: atomic.LoadInt64(&c.totalBytes), + CopyedFiles: atomic.LoadInt64(&c.copyedFiles), + TotalFiles: atomic.LoadInt64(&c.totalFiles), + } +}