From 47c9852741020bced4784a54299939a4c32ca5c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B4=94=E7=AB=9E=E5=AE=81?= Date: Thu, 8 Sep 2022 20:49:34 +0800 Subject: [PATCH] feat: with multi threads --- acp.go | 126 ++++++++++++++++++ cmd/acp/main.go | 31 +++-- copy.go | 347 +++++++++++++++--------------------------------- index.go | 138 +++++++++++++++++++ job.go | 163 ++++++++++++++++++----- opt.go | 58 +++++--- progress_bar.go | 61 ++++----- report.go | 37 +++--- 8 files changed, 613 insertions(+), 348 deletions(-) create mode 100644 acp.go create mode 100644 index.go diff --git a/acp.go b/acp.go new file mode 100644 index 0000000..a3ffa36 --- /dev/null +++ b/acp.go @@ -0,0 +1,126 @@ +package acp + +import ( + "context" + "os" + "sync" + + "github.com/schollz/progressbar/v3" + "github.com/sirupsen/logrus" +) + +const ( + StageIndex = iota + StageCopy + StageFinished +) + +type Copyer struct { + *option + + createFlag int + stage int64 + copyedBytes int64 + totalBytes int64 + copyedFiles int64 + totalFiles int64 + + updateProgressBar func(func(bar *progressbar.ProgressBar)) + + jobsLock sync.Mutex + jobs []*baseJob + + errsLock sync.Mutex + errors []*Error + + badDstsLock sync.Mutex + badDsts map[string]error + + writePipe chan *writeJob + postPipe chan *baseJob + + running sync.WaitGroup +} + +func New(ctx context.Context, opts ...Option) (*Copyer, error) { + opt := newOption() + for _, o := range opts { + if o == nil { + continue + } + opt = o(opt) + } + if err := opt.check(); err != nil { + return nil, err + } + + c := &Copyer{ + option: opt, + stage: StageIndex, + updateProgressBar: func(f func(bar *progressbar.ProgressBar)) {}, + badDsts: make(map[string]error), + writePipe: make(chan *writeJob, 32), + postPipe: make(chan *baseJob, 8), + } + + c.createFlag = os.O_WRONLY | os.O_CREATE + if c.overwrite { + c.createFlag |= os.O_TRUNC + } else { + c.createFlag |= os.O_EXCL + } + + c.running.Add(1) + go c.run(ctx) + return c, nil +} + +func (c *Copyer) Wait() *Report { + c.running.Wait() + return c.Report() +} + +func (c *Copyer) run(ctx context.Context) { + defer c.running.Done() + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + if c.withProgressBar { + c.startProgressBar(ctx) + } + + c.index(ctx) + if !c.checkJobs() { + return + } + + c.copy(ctx) +} + +func (c *Copyer) reportError(file string, err error) { + e := &Error{Path: file, Err: err} + logrus.Errorf(e.Error()) + + c.errsLock.Lock() + defer c.errsLock.Unlock() + c.errors = append(c.errors, e) +} + +func (c *Copyer) getErrors() []*Error { + c.errsLock.Lock() + defer c.errsLock.Unlock() + return c.errors +} + +func (c *Copyer) addBadDsts(dst string, err error) { + c.badDstsLock.Lock() + defer c.badDstsLock.Unlock() + c.badDsts[dst] = err +} + +func (c *Copyer) getBadDsts() map[string]error { + c.errsLock.Lock() + defer c.errsLock.Unlock() + return c.badDsts +} diff --git a/cmd/acp/main.go b/cmd/acp/main.go index 29b8f3c..6672221 100644 --- a/cmd/acp/main.go +++ b/cmd/acp/main.go @@ -12,10 +12,14 @@ import ( ) var ( - notOverwrite = flag.Bool("n", false, "not overwrite exist file") - noTarget = flag.Bool("notarget", false, "do not have target, use as dir index tool") - reportPath = flag.String("report", "", "json report storage path") - targetPaths []string + withProgressBar = flag.Bool("p", true, "display progress bar") + notOverwrite = flag.Bool("n", false, "not overwrite exist file") + noTarget = flag.Bool("notarget", false, "do not have target, use as dir index tool") + reportPath = flag.String("report", "", "json report storage path") + fromLinear = flag.Bool("from-linear", false, "json report storage path") + toLinear = flag.Bool("to-linear", false, "json report storage path") + + targetPaths []string ) func init() { @@ -53,10 +57,21 @@ func main() { } }() - c, err := acp.New( - ctx, acp.Source(sources...), acp.Target(targetPaths...), - acp.Overwrite(!*notOverwrite), acp.WithProgressBar(true), acp.WithHash(true), - ) + opts := make([]acp.Option, 0, 8) + opts = append(opts, acp.Source(sources...)) + opts = append(opts, acp.Target(targetPaths...)) + opts = append(opts, acp.WithHash(*reportPath != "")) + opts = append(opts, acp.Overwrite(!*notOverwrite)) + opts = append(opts, acp.WithProgressBar(*withProgressBar)) + + if *fromLinear { + opts = append(opts, acp.SetFromDevice(acp.LinearDevice(true))) + } + if *toLinear { + opts = append(opts, acp.SetToDevice(acp.LinearDevice(true))) + } + + c, err := acp.New(ctx, opts...) if err != nil { panic(err) } diff --git a/copy.go b/copy.go index 304ad0b..23e979e 100644 --- a/copy.go +++ b/copy.go @@ -2,137 +2,39 @@ package acp import ( "context" - "encoding/hex" + "errors" "fmt" "hash" "os" "sync" "sync/atomic" - "time" + "syscall" "github.com/abc950309/acp/mmap" "github.com/hashicorp/go-multierror" "github.com/minio/sha256-simd" "github.com/schollz/progressbar/v3" - "github.com/sirupsen/logrus" ) const ( - unexpectFileMode = os.ModeType &^ os.ModeDir - batchSize = 1024 * 1024 + batchSize = 1024 * 1024 ) var ( sha256Pool = &sync.Pool{New: func() interface{} { return sha256.New() }} ) -const ( - StageIndex = iota - StageCopy - StageFinished -) - -type Copyer struct { - *option - - createFlag int - stage int64 - copyedBytes int64 - totalBytes int64 - copyedFiles int64 - totalFiles int64 - - updateProgressBar func(func(bar *progressbar.ProgressBar)) - - jobs []*Job - writePipe chan *writeJob - metaPipe chan *metaJob - - wg sync.WaitGroup - reportLock sync.Mutex - errors []*Error - files []*File -} - -func New(ctx context.Context, opts ...Option) (*Copyer, error) { - opt := newOption() - for _, o := range opts { - if o == nil { - continue - } - opt = o(opt) - } - if err := opt.check(); err != nil { - return nil, err - } - - c := &Copyer{ - option: opt, - stage: StageIndex, - writePipe: make(chan *writeJob, 32), - metaPipe: make(chan *metaJob, 8), - updateProgressBar: func(f func(bar *progressbar.ProgressBar)) {}, - } - - c.createFlag = os.O_WRONLY | os.O_CREATE - if c.overwrite { - c.createFlag |= os.O_TRUNC - } else { - c.createFlag |= os.O_EXCL - } - - c.wg.Add(1) - go c.run(ctx) - return c, nil -} - -func (c *Copyer) Wait() *Report { - c.wg.Wait() - return c.Report() -} - -func (c *Copyer) reportError(file string, err error) { - e := &Error{Path: file, Err: err} - logrus.Errorf(e.Error()) - - c.reportLock.Lock() - defer c.reportLock.Unlock() - c.errors = append(c.errors, e) -} - -func (c *Copyer) run(ctx context.Context) { - defer c.wg.Done() - - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - if c.withProgressBar { - c.startProgressBar(ctx) - } - - c.index(ctx) - c.copy(ctx) -} - -func (c *Copyer) index(ctx context.Context) { - for _, s := range c.src { - c.walk(s.base, s.path) - } - - c.updateProgressBar(func(bar *progressbar.ProgressBar) { - bar.ChangeMax64(atomic.LoadInt64(&c.totalBytes)) - bar.Describe(fmt.Sprintf("[0/%d] index finished...", atomic.LoadInt64(&c.totalFiles))) - }) -} - func (c *Copyer) copy(ctx context.Context) { atomic.StoreInt64(&c.stage, StageCopy) defer atomic.StoreInt64(&c.stage, StageFinished) + wg := new(sync.WaitGroup) + wg.Add(1) go func() { + defer wg.Done() defer close(c.writePipe) - for _, job := range c.jobs { + for _, job := range c.getJobs() { c.prepare(ctx, job) select { @@ -143,111 +45,96 @@ func (c *Copyer) copy(ctx context.Context) { } }() + for i := 0; i < c.threads; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + for { + select { + case job, ok := <-c.writePipe: + if !ok { + return + } + + wg.Add(1) + c.write(ctx, job) + case <-ctx.Done(): + return + } + } + }() + } + go func() { - defer close(c.metaPipe) - - for job := range c.writePipe { - c.write(ctx, job) - + for { select { + case job, ok := <-c.postPipe: + if !ok { + return + } + c.post(wg, job) case <-ctx.Done(): return - default: } } }() - for file := range c.metaPipe { - c.meta(file) - } + wg.Wait() } -func (c *Copyer) walk(base, path string) { - stat, err := os.Stat(path) - if err != nil { - c.reportError(path, fmt.Errorf("walk get stat, %w", err)) - return - } +func (c *Copyer) prepare(ctx context.Context, job *baseJob) { + job.setStatus(jobStatusPreparing) - job, err := newJobFromFileInfo(base, path, stat) - if err != nil { - c.reportError(path, fmt.Errorf("make job fail, %w", err)) - return - } - if job.Mode&unexpectFileMode != 0 { - return - } - if !job.Mode.IsDir() { - atomic.AddInt64(&c.totalFiles, 1) - atomic.AddInt64(&c.totalBytes, job.Size) - c.jobs = append(c.jobs, job) - return - } - - enterJob := new(Job) - *enterJob = *job - enterJob.Type = JobTypeEnterDir - c.jobs = append(c.jobs, enterJob) - - files, err := os.ReadDir(path) - if err != nil { - c.reportError(path, fmt.Errorf("walk read dir, %w", err)) - return - } - - for _, file := range files { - c.walk(base, path+"/"+file.Name()) - } - - exitJob := new(Job) - *exitJob = *job - exitJob.Type = JobTypeExitDir - c.jobs = append(c.jobs, exitJob) -} - -func (c *Copyer) prepare(ctx context.Context, job *Job) { - switch job.Type { - case JobTypeEnterDir: + switch job.typ { + case jobTypeDir: for _, d := range c.dst { - name := d + job.RelativePath - err := os.Mkdir(name, job.Mode&os.ModePerm) - if err != nil && !os.IsExist(err) { - c.reportError(name, fmt.Errorf("mkdir fail, %w", err)) + target := d + job.source.target(d) + if err := os.MkdirAll(target, job.mode&os.ModePerm); err != nil && !os.IsExist(err) { + c.reportError(target, fmt.Errorf("mkdir fail, %w", err)) + job.fail(target, fmt.Errorf("mkdir fail, %w", err)) + continue } + job.succes(target) } - return - case JobTypeExitDir: - c.writePipe <- &writeJob{Job: job} + + c.writePipe <- &writeJob{baseJob: job} return } - name := job.Source - file, err := mmap.Open(name) + file, err := mmap.Open(job.source.path()) if err != nil { - c.reportError(name, fmt.Errorf("open src file fail, %w", err)) + c.reportError(job.source.path(), fmt.Errorf("open src file fail, %w", err)) return } - c.writePipe <- &writeJob{Job: job, src: file} + c.writePipe <- &writeJob{baseJob: job, src: file} } func (c *Copyer) write(ctx context.Context, job *writeJob) { - if job.src == nil { - c.metaPipe <- &metaJob{Job: job.Job} + job.setStatus(jobStatusCopying) + if job.typ != jobTypeNormal { return } - defer job.src.Close() - - num := atomic.AddInt64(&c.copyedFiles, 1) - go c.updateProgressBar(func(bar *progressbar.ProgressBar) { - bar.Describe(fmt.Sprintf("[%d/%d] %s", num, c.totalFiles, job.RelativePath)) - }) var wg sync.WaitGroup - var lock sync.Mutex - var readErr error + defer func() { + wg.Wait() + job.src.Close() + c.postPipe <- job.baseJob + }() + + num := atomic.AddInt64(&c.copyedFiles, 1) + c.updateProgressBar(func(bar *progressbar.ProgressBar) { + bar.Describe(fmt.Sprintf("[%d/%d] %s", num, c.totalFiles, job.source.relativePath)) + }) + chans := make([]chan []byte, 0, len(c.dst)+1) - next := &metaJob{Job: job.Job, failTarget: make(map[string]string)} + defer func() { + for _, ch := range chans { + close(ch) + } + }() if c.withHash { sha := sha256Pool.Get().(hash.Hash) @@ -265,21 +152,24 @@ func (c *Copyer) write(ctx context.Context, job *writeJob) { sha.Write(buf) } - lock.Lock() - defer lock.Unlock() - next.hash = sha.Sum(nil) + job.setHash(sha.Sum(nil)) }() } + var readErr error + badDsts := c.getBadDsts() for _, d := range c.dst { - name := d + job.RelativePath - file, err := os.OpenFile(name, c.createFlag, job.Mode) + dst := d + + name := job.source.target(dst) + if e, has := badDsts[dst]; has && e != nil { + job.fail(name, fmt.Errorf("bad target path, %w", e)) + } + + file, err := os.OpenFile(name, c.createFlag, job.mode) if err != nil { c.reportError(name, fmt.Errorf("open dst file fail, %w", err)) - - lock.Lock() - defer lock.Unlock() - next.failTarget[name] = fmt.Errorf("open dst file fail, %w", err).Error() + job.fail(name, fmt.Errorf("open dst file fail, %w", err)) continue } @@ -293,9 +183,7 @@ func (c *Copyer) write(ctx context.Context, job *writeJob) { var rerr error defer func() { if rerr == nil { - lock.Lock() - defer lock.Unlock() - next.successTarget = append(next.successTarget, name) + job.succes(name) return } @@ -307,29 +195,24 @@ func (c *Copyer) write(ctx context.Context, job *writeJob) { rerr = multierror.Append(rerr, re) } - c.reportError(name, rerr) + // if no space + if errors.Is(err, syscall.ENOSPC) { + c.addBadDsts(dst, err) + } - lock.Lock() - defer lock.Unlock() - next.failTarget[name] = rerr.Error() + c.reportError(name, rerr) + job.fail(name, rerr) }() defer file.Close() for buf := range ch { - nr := len(buf) - n, err := file.Write(buf) - if n < 0 || nr < n { - if err == nil { - rerr = fmt.Errorf("write fail, unexpected return, byte_num= %d", n) - return - } - + if err != nil { rerr = fmt.Errorf("write fail, %w", err) return } - if nr != n { - rerr = fmt.Errorf("write fail, write and read bytes not equal, read= %d write= %d", nr, n) + if len(buf) != n { + rerr = fmt.Errorf("write fail, unexpected writen bytes return, read= %d write= %d", len(buf), n) return } } @@ -340,15 +223,9 @@ func (c *Copyer) write(ctx context.Context, job *writeJob) { }() } - defer func() { - for _, ch := range chans { - close(ch) - } - - wg.Wait() - c.metaPipe <- next - }() - + if len(chans) == 0 { + return + } readErr = c.streamCopy(ctx, chans, job.src) } @@ -381,31 +258,23 @@ func (c *Copyer) streamCopy(ctx context.Context, dsts []chan []byte, src *mmap.R } } -func (c *Copyer) meta(job *metaJob) { - if job.Mode.IsDir() { - for _, d := range c.dst { - if err := os.Chtimes(d+job.RelativePath, job.ModTime, job.ModTime); err != nil { - c.reportError(d+job.RelativePath, fmt.Errorf("change info, chtimes fail, %w", err)) - } - } - return - } +func (c *Copyer) post(wg *sync.WaitGroup, job *baseJob) { + defer wg.Done() - c.files = append(c.files, &File{ - Source: job.Source, - SuccessTarget: job.successTarget, - FailTarget: job.failTarget, - RelativePath: job.RelativePath, - Size: job.Size, - Mode: job.Mode, - ModTime: job.ModTime, - WriteTime: time.Now(), - SHA256: hex.EncodeToString(job.hash), - }) - - for _, name := range job.successTarget { - if err := os.Chtimes(name, job.ModTime, job.ModTime); err != nil { + job.setStatus(jobStatusFinishing) + for _, name := range job.successTargets { + if err := os.Chtimes(name, job.modTime, job.modTime); err != nil { c.reportError(name, fmt.Errorf("change info, chtimes fail, %w", err)) } } + + job.setStatus(jobStatusFinished) + if job.parent == nil { + return + } + + left := job.parent.done(job) + if left == 0 { + c.postPipe <- job.parent + } } diff --git a/index.go b/index.go new file mode 100644 index 0000000..b4ad4c3 --- /dev/null +++ b/index.go @@ -0,0 +1,138 @@ +package acp + +import ( + "context" + "fmt" + "os" + "sort" + "sync/atomic" + + "github.com/schollz/progressbar/v3" +) + +const ( + unexpectFileMode = os.ModeType &^ os.ModeDir +) + +func (c *Copyer) index(ctx context.Context) { + for _, s := range c.src { + c.walk(nil, s) + } + + c.updateProgressBar(func(bar *progressbar.ProgressBar) { + bar.ChangeMax64(atomic.LoadInt64(&c.totalBytes)) + bar.Describe(fmt.Sprintf("[0/%d] index finished...", atomic.LoadInt64(&c.totalFiles))) + }) +} + +func (c *Copyer) walk(parent *baseJob, src *source) *baseJob { + path := src.path() + + stat, err := os.Stat(path) + if err != nil { + c.reportError(path, fmt.Errorf("walk get stat, %w", err)) + return nil + } + if stat.Mode()&unexpectFileMode != 0 { + return nil + } + + job, err := newJobFromFileInfo(parent, src, stat) + if err != nil { + c.reportError(path, fmt.Errorf("make job fail, %w", err)) + return nil + } + + c.appendJobs(job) + if job.typ == jobTypeNormal { + totalBytes := atomic.AddInt64(&c.totalBytes, job.size) + totalFiles := atomic.AddInt64(&c.totalFiles, 1) + + c.updateProgressBar(func(bar *progressbar.ProgressBar) { + bar.ChangeMax64(totalBytes) + bar.Describe(fmt.Sprintf("[0/%d] indexing...", totalFiles)) + }) + + return job + } + + files, err := os.ReadDir(path) + if err != nil { + c.reportError(path, fmt.Errorf("walk read dir, %w", err)) + return nil + } + + job.children = make(map[*baseJob]struct{}, len(files)) + for _, file := range files { + id := c.walk(job, &source{base: src.base, relativePath: src.relativePath + "/" + file.Name()}) + if id == nil { + continue + } + job.children[id] = struct{}{} + } + + return job +} + +func (c *Copyer) appendJobs(jobs ...*baseJob) { + c.jobsLock.Lock() + defer c.jobsLock.Unlock() + c.jobs = append(c.jobs, jobs...) +} + +func (c *Copyer) getJobs() []*baseJob { + c.jobsLock.Lock() + defer c.jobsLock.Unlock() + return c.jobs +} + +func (c *Copyer) checkJobs() bool { + c.jobsLock.Lock() + defer c.jobsLock.Unlock() + + if len(c.jobs) == 0 { + c.reportError("", fmt.Errorf("cannot found available jobs")) + return false + } + + sort.Slice(c.jobs, func(i int, j int) bool { + return c.jobs[i].comparableRelativePath < c.jobs[j].comparableRelativePath + }) + + var last *baseJob + filtered := make([]*baseJob, 0, len(c.jobs)) + for _, job := range c.jobs { + if last == nil { + filtered = append(filtered, job) + last = job + continue + } + if last.source.relativePath != job.source.relativePath { + filtered = append(filtered, job) + last = job + continue + } + + if last.typ != job.typ { + c.reportError(job.source.path(), fmt.Errorf("same relative path with different type, '%s' and '%s'", job.source.path(), last.source.path())) + return false + } + if last.typ == jobTypeNormal { + c.reportError(job.source.path(), fmt.Errorf("same relative path as normal file, ignored, '%s'", job.source.path())) + continue + } + + func() { + last.lock.Lock() + defer last.lock.Unlock() + + for n := range job.children { + last.children[n] = struct{}{} + n.parent = last + } + }() + } + + c.jobs = filtered + return true +} diff --git a/job.go b/job.go index 7368501..1d029d4 100644 --- a/job.go +++ b/job.go @@ -1,57 +1,154 @@ package acp import ( - "fmt" + "encoding/hex" "os" "strings" + "sync" "time" "github.com/abc950309/acp/mmap" ) -type JobType uint8 +type jobType uint8 const ( - JobTypeNormal = JobType(iota) - JobTypeEnterDir - JobTypeExitDir + jobTypeNormal = jobType(iota) + jobTypeDir ) -type Job struct { - Source string - RelativePath string - Type JobType - Name string // base name of the file - Size int64 // length in bytes for regular files; system-dependent for others - Mode os.FileMode // file mode bits - ModTime time.Time // modification time +type jobStatus uint8 + +const ( + jobStatusPending = jobStatus(iota) + jobStatusPreparing + jobStatusCopying + jobStatusFinishing + jobStatusFinished +) + +var ( + statusMapping = map[jobStatus]string{ + jobStatusPending: "pending", + jobStatusPreparing: "preparing", + jobStatusCopying: "copying", + jobStatusFinishing: "finishing", + jobStatusFinished: "finished", + } +) + +type baseJob struct { + parent *baseJob + source *source + typ jobType + + name string // base name of the file + size int64 // length in bytes for regular files; system-dependent for others + mode os.FileMode // file mode bits + modTime time.Time // modification time + + lock sync.Mutex + writeTime time.Time + status jobStatus + children map[*baseJob]struct{} + + successTargets []string + failedTargets map[string]error + hash []byte + + // utils + comparableRelativePath string } -func newJobFromFileInfo(base, path string, info os.FileInfo) (*Job, error) { - if !strings.HasPrefix(path, base) { - return nil, fmt.Errorf("path do not contains base, path= '%s', base= '%s'", path, base) +func newJobFromFileInfo(parent *baseJob, source *source, info os.FileInfo) (*baseJob, error) { + job := &baseJob{ + parent: parent, + source: source, + + name: info.Name(), + size: info.Size(), + mode: info.Mode(), + modTime: info.ModTime(), + + comparableRelativePath: strings.ReplaceAll(source.relativePath, "/", "\x00"), + } + if job.mode.IsDir() { + job.typ = jobTypeDir } - job := &Job{ - Source: path, - RelativePath: path[len(base):], - Name: info.Name(), - Size: info.Size(), - Mode: info.Mode(), - ModTime: info.ModTime(), - } return job, nil } +func (j *baseJob) setStatus(s jobStatus) { + j.lock.Lock() + defer j.lock.Unlock() + j.status = s + + if s == jobStatusCopying { + j.writeTime = time.Now() + } +} + +func (j *baseJob) setHash(h []byte) { + j.lock.Lock() + defer j.lock.Unlock() + j.hash = h +} + +func (j *baseJob) done(child *baseJob) int { + if j.typ == jobTypeNormal { + return 0 + } + + j.lock.Lock() + defer j.lock.Unlock() + + delete(j.children, child) + return len(j.children) +} + +func (j *baseJob) succes(path string) { + j.lock.Lock() + defer j.lock.Unlock() + j.successTargets = append(j.successTargets, path) +} + +func (j *baseJob) fail(path string, err error) { + j.lock.Lock() + defer j.lock.Unlock() + + if j.failedTargets == nil { + j.failedTargets = make(map[string]error, 1) + } + j.failedTargets[path] = err +} + +func (j *baseJob) report() *File { + j.lock.Lock() + defer j.lock.Unlock() + + fails := make(map[string]string, len(j.failedTargets)) + for n, e := range j.failedTargets { + fails[n] = e.Error() + } + + return &File{ + Source: j.source.path(), + RelativePath: j.source.relativePath, + + Status: statusMapping[j.status], + SuccessTargets: j.successTargets, + FailTargets: fails, + + Size: j.size, + Mode: j.mode, + ModTime: j.modTime, + WriteTime: j.writeTime, + SHA256: hex.EncodeToString(j.hash), + } +} + type writeJob struct { - *Job + *baseJob src *mmap.ReaderAt } - -type metaJob struct { - *Job - - successTarget []string - failTarget map[string]string - hash []byte -} diff --git a/opt.go b/opt.go index cb03c83..8bc790a 100644 --- a/opt.go +++ b/opt.go @@ -4,18 +4,24 @@ import ( "fmt" "os" "path" - "sort" "strings" ) type source struct { - path string - base string - name string + base string + relativePath string +} + +func (s *source) path() string { + return s.base + s.relativePath +} + +func (s *source) target(dst string) string { + return dst + s.relativePath } type option struct { - src []source + src []*source dst []string fromDevice *deviceOption @@ -23,8 +29,8 @@ type option struct { overwrite bool withProgressBar bool - // threads int - withHash bool + threads int + withHash bool } func newOption() *option { @@ -61,20 +67,17 @@ func (o *option) check() error { return fmt.Errorf("source path not found") } for _, s := range o.src { - if _, err := os.Stat(s.path); err != nil { - return fmt.Errorf("check src path '%s', %w", s.path, err) + if _, err := os.Stat(s.path()); err != nil { + return fmt.Errorf("check src path '%s', %w", s.path(), err) } } - sort.Slice(o.src, func(i, j int) bool { - return o.src[i].name < o.src[j].name - }) - for idx := 1; idx < len(o.src); idx++ { - if o.src[idx].name == o.src[idx-1].name { - return fmt.Errorf("have same name source path, '%s' and '%s'", o.src[idx-1].path, o.src[idx].path) - } + if o.threads < 1 { + o.threads = 4 + } + if o.fromDevice.linear || o.toDevice.linear { + o.threads = 1 } - return nil } @@ -87,13 +90,32 @@ func Source(paths ...string) Option { if p == "" { continue } + p = path.Clean(p) if p[len(p)-1] == '/' { p = p[:len(p)-1] } base, name := path.Split(p) - o.src = append(o.src, source{path: p, base: base, name: name}) + o.src = append(o.src, &source{base: base, relativePath: name}) + } + return o + } +} + +func AccurateSource(base string, relativePaths ...string) Option { + return func(o *option) *option { + for _, p := range relativePaths { + p = strings.TrimSpace(p) + if p == "" { + continue + } + + if p[len(p)-1] == '/' { + p = p[:len(p)-1] + } + + o.src = append(o.src, &source{base: base, relativePath: p}) } return o } diff --git a/progress_bar.go b/progress_bar.go index c0bb7e1..01ca079 100644 --- a/progress_bar.go +++ b/progress_bar.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "os" - "sync" "sync/atomic" "time" @@ -16,8 +15,6 @@ const ( ) func (c *Copyer) startProgressBar(ctx context.Context) { - var lock sync.Mutex - // progressBar := progressbar.DefaultBytes(1, "[0/0] indexing...") progressBar := progressbar.NewOptions64( 1, @@ -33,37 +30,41 @@ func (c *Copyer) startProgressBar(ctx context.Context) { progressbar.OptionSetRenderBlankState(true), ) + ch := make(chan func(bar *progressbar.ProgressBar), 8) c.updateProgressBar = func(f func(bar *progressbar.ProgressBar)) { - lock.Lock() - defer lock.Unlock() - - if progressBar == nil { - return - } - f(progressBar) + ch <- f } go func() { + for f := range ch { + if progressBar == nil { + continue + } + f(progressBar) + } + }() + + go func() { + defer close(ch) + ticker := time.NewTicker(barUpdateInterval) // around 255ms, avoid conflict with progress bar fresh by second defer ticker.Stop() var lastCopyedBytes int64 - for range ticker.C { - switch atomic.LoadInt64(&c.stage) { - case StageIndex: - go c.updateProgressBar(func(bar *progressbar.ProgressBar) { - bar.ChangeMax64(atomic.LoadInt64(&c.totalBytes)) - bar.Describe(fmt.Sprintf("[0/%d] indexing...", atomic.LoadInt64(&c.totalFiles))) - }) - case StageCopy: - currentCopyedBytes := atomic.LoadInt64(&c.copyedBytes) - diff := currentCopyedBytes - lastCopyedBytes - lastCopyedBytes = currentCopyedBytes + for { + select { + case <-ticker.C: + switch atomic.LoadInt64(&c.stage) { + case StageCopy: + currentCopyedBytes := atomic.LoadInt64(&c.copyedBytes) + diff := currentCopyedBytes - lastCopyedBytes + lastCopyedBytes = currentCopyedBytes - go c.updateProgressBar(func(bar *progressbar.ProgressBar) { - bar.Add64(diff) - }) - case StageFinished: + c.updateProgressBar(func(bar *progressbar.ProgressBar) { + bar.Add64(diff) + }) + } + case <-ctx.Done(): currentCopyedBytes := atomic.LoadInt64(&c.copyedBytes) diff := int(currentCopyedBytes - lastCopyedBytes) lastCopyedBytes = currentCopyedBytes @@ -73,17 +74,7 @@ func (c *Copyer) startProgressBar(ctx context.Context) { bar.Add(diff) bar.Describe(fmt.Sprintf("[%d/%d] finished!", copyedFiles, totalFiles)) }) - } - - select { - case <-ctx.Done(): - time.Sleep(barUpdateInterval) - c.updateProgressBar(func(bar *progressbar.ProgressBar) { - bar.Close() - progressBar = nil - }) return - default: } } }() diff --git a/report.go b/report.go index b31e6b5..ab97292 100644 --- a/report.go +++ b/report.go @@ -8,12 +8,16 @@ import ( ) func (c *Copyer) Report() *Report { - c.reportLock.Lock() - defer c.reportLock.Unlock() + jobs, errs := c.getJobs(), c.getErrors() + + files := make([]*File, 0, len(jobs)) + for _, job := range jobs { + files = append(files, job.report()) + } return &Report{ - Files: c.files, - Errors: c.errors, + Files: files, + Errors: errs, } } @@ -47,18 +51,21 @@ func (e *Error) UnmarshalJSON(buf []byte) error { } type File struct { - Source string `json:"source"` - SuccessTarget []string `json:"success_target"` - FailTarget map[string]string `json:"fail_target"` - RelativePath string `json:"relative_path"` - Size int64 `json:"size"` - Mode os.FileMode `json:"mode"` - ModTime time.Time `json:"mod_time"` - WriteTime time.Time `json:"write_time"` - SHA256 string `json:"sha256"` + Source string `json:"source"` + RelativePath string `json:"relative_path"` + + Status string `json:"status"` + SuccessTargets []string `json:"success_target"` + FailTargets map[string]string `json:"fail_target"` + + Size int64 `json:"size"` + Mode os.FileMode `json:"mode"` + ModTime time.Time `json:"mod_time"` + WriteTime time.Time `json:"write_time"` + SHA256 string `json:"sha256"` } type Report struct { - Files []*File - Errors []*Error + Files []*File `json:"files,omitempty"` + Errors []*Error `json:"errors,omitempty"` }