diff --git a/acp.go b/acp.go index a3ffa36..293db97 100644 --- a/acp.go +++ b/acp.go @@ -26,6 +26,8 @@ type Copyer struct { totalFiles int64 updateProgressBar func(func(bar *progressbar.ProgressBar)) + updateCopying func(func(set map[int64]struct{})) + logf func(l logrus.Level, format string, args ...any) jobsLock sync.Mutex jobs []*baseJob @@ -36,8 +38,9 @@ type Copyer struct { badDstsLock sync.Mutex badDsts map[string]error - writePipe chan *writeJob - postPipe chan *baseJob + readingFiles chan struct{} + writePipe chan *writeJob + postPipe chan *baseJob running sync.WaitGroup } @@ -55,12 +58,18 @@ func New(ctx context.Context, opts ...Option) (*Copyer, error) { } c := &Copyer{ - option: opt, - stage: StageIndex, + option: opt, + stage: StageIndex, + updateProgressBar: func(f func(bar *progressbar.ProgressBar)) {}, - badDsts: make(map[string]error), - writePipe: make(chan *writeJob, 32), - postPipe: make(chan *baseJob, 8), + updateCopying: func(f func(set map[int64]struct{})) {}, + logf: func(l logrus.Level, format string, args ...any) { + logrus.StandardLogger().Logf(l, format, args...) + }, + + badDsts: make(map[string]error), + writePipe: make(chan *writeJob, 32), + postPipe: make(chan *baseJob, 8), } c.createFlag = os.O_WRONLY | os.O_CREATE @@ -70,6 +79,10 @@ func New(ctx context.Context, opts ...Option) (*Copyer, error) { c.createFlag |= os.O_EXCL } + if c.fromDevice.linear { + c.readingFiles = make(chan struct{}, 1) + } + c.running.Add(1) go c.run(ctx) return c, nil @@ -100,7 +113,7 @@ func (c *Copyer) run(ctx context.Context) { func (c *Copyer) reportError(file string, err error) { e := &Error{Path: file, Err: err} - logrus.Errorf(e.Error()) + c.logf(logrus.ErrorLevel, e.Error()) c.errsLock.Lock() defer c.errsLock.Unlock() diff --git a/copy.go b/copy.go index 23e979e..f734dff 100644 --- a/copy.go +++ b/copy.go @@ -13,7 +13,7 @@ import ( "github.com/abc950309/acp/mmap" "github.com/hashicorp/go-multierror" "github.com/minio/sha256-simd" - "github.com/schollz/progressbar/v3" + "github.com/sirupsen/logrus" ) const ( @@ -67,20 +67,21 @@ func (c *Copyer) copy(ctx context.Context) { } go func() { - for { - select { - case job, ok := <-c.postPipe: - if !ok { - return - } - c.post(wg, job) - case <-ctx.Done(): - return - } + for job := range c.postPipe { + c.post(wg, job) } }() - wg.Wait() + finished := make(chan struct{}, 1) + go func() { + wg.Wait() + finished <- struct{}{} + }() + + select { + case <-finished: + case <-ctx.Done(): + } } func (c *Copyer) prepare(ctx context.Context, job *baseJob) { @@ -89,7 +90,7 @@ func (c *Copyer) prepare(ctx context.Context, job *baseJob) { switch job.typ { case jobTypeDir: for _, d := range c.dst { - target := d + job.source.target(d) + target := job.source.target(d) if err := os.MkdirAll(target, job.mode&os.ModePerm); err != nil && !os.IsExist(err) { c.reportError(target, fmt.Errorf("mkdir fail, %w", err)) job.fail(target, fmt.Errorf("mkdir fail, %w", err)) @@ -102,6 +103,10 @@ func (c *Copyer) prepare(ctx context.Context, job *baseJob) { return } + if c.readingFiles != nil { + c.readingFiles <- struct{}{} + } + file, err := mmap.Open(job.source.path()) if err != nil { c.reportError(job.source.path(), fmt.Errorf("open src file fail, %w", err)) @@ -120,14 +125,19 @@ func (c *Copyer) write(ctx context.Context, job *writeJob) { var wg sync.WaitGroup defer func() { wg.Wait() + job.src.Close() + if c.readingFiles != nil { + <-c.readingFiles + } + c.postPipe <- job.baseJob }() num := atomic.AddInt64(&c.copyedFiles, 1) - c.updateProgressBar(func(bar *progressbar.ProgressBar) { - bar.Describe(fmt.Sprintf("[%d/%d] %s", num, c.totalFiles, job.source.relativePath)) - }) + c.logf(logrus.InfoLevel, "[%d/%d] copying: %s", num, c.totalFiles, job.source.relativePath) + c.updateCopying(func(set map[int64]struct{}) { set[num] = struct{}{} }) + defer c.updateCopying(func(set map[int64]struct{}) { delete(set, num) }) chans := make([]chan []byte, 0, len(c.dst)+1) defer func() { diff --git a/opt.go b/opt.go index 8bc790a..56937a3 100644 --- a/opt.go +++ b/opt.go @@ -166,12 +166,12 @@ func WithProgressBar(b bool) Option { } } -// func WithThreads(threads int) Option { -// return func(o *option) *option { -// o.threads = threads -// return o -// } -// } +func WithThreads(threads int) Option { + return func(o *option) *option { + o.threads = threads + return o + } +} func WithHash(b bool) Option { return func(o *option) *option { diff --git a/progress_bar.go b/progress_bar.go index 01ca079..05f95a2 100644 --- a/progress_bar.go +++ b/progress_bar.go @@ -4,10 +4,13 @@ import ( "context" "fmt" "os" + "sort" + "strings" "sync/atomic" "time" "github.com/schollz/progressbar/v3" + "github.com/sirupsen/logrus" ) const ( @@ -30,17 +33,49 @@ func (c *Copyer) startProgressBar(ctx context.Context) { progressbar.OptionSetRenderBlankState(true), ) - ch := make(chan func(bar *progressbar.ProgressBar), 8) + ch := make(chan interface{}, 8) c.updateProgressBar = func(f func(bar *progressbar.ProgressBar)) { ch <- f } + c.updateCopying = func(f func(set map[int64]struct{})) { + ch <- f + } + c.logf = func(l logrus.Level, format string, args ...any) { + ch <- func() { logrus.StandardLogger().Logf(l, format, args...) } + } go func() { + copying := make(map[int64]struct{}, c.threads) + for f := range ch { if progressBar == nil { continue } - f(progressBar) + + switch v := f.(type) { + case func(bar *progressbar.ProgressBar): + v(progressBar) + case func(set map[int64]struct{}): + v(copying) + + idxs := make([]int64, 0, len(copying)) + for idx := range copying { + idxs = append(idxs, idx) + } + sort.Slice(idxs, func(i, j int) bool { + return idxs[i] < idxs[j] + }) + + strs := make([]string, 0, len(idxs)) + for _, idx := range idxs { + strs = append(strs, fmt.Sprintf("file %d", idx)) + } + + copyedFiles, totalFiles := atomic.LoadInt64(&c.copyedFiles), atomic.LoadInt64(&c.totalFiles) + progressBar.Describe(fmt.Sprintf("[%d/%d] copying... %s", copyedFiles, totalFiles, strings.Join(strs, ", "))) + case func(): + v() + } } }()