feat: add threads and fix log race

This commit is contained in:
崔竞宁
2022-09-10 21:58:43 +08:00
parent 5ec051f97a
commit 11e3201692
4 changed files with 90 additions and 32 deletions

29
acp.go
View File

@@ -26,6 +26,8 @@ type Copyer struct {
totalFiles int64
updateProgressBar func(func(bar *progressbar.ProgressBar))
updateCopying func(func(set map[int64]struct{}))
logf func(l logrus.Level, format string, args ...any)
jobsLock sync.Mutex
jobs []*baseJob
@@ -36,8 +38,9 @@ type Copyer struct {
badDstsLock sync.Mutex
badDsts map[string]error
writePipe chan *writeJob
postPipe chan *baseJob
readingFiles chan struct{}
writePipe chan *writeJob
postPipe chan *baseJob
running sync.WaitGroup
}
@@ -55,12 +58,18 @@ func New(ctx context.Context, opts ...Option) (*Copyer, error) {
}
c := &Copyer{
option: opt,
stage: StageIndex,
option: opt,
stage: StageIndex,
updateProgressBar: func(f func(bar *progressbar.ProgressBar)) {},
badDsts: make(map[string]error),
writePipe: make(chan *writeJob, 32),
postPipe: make(chan *baseJob, 8),
updateCopying: func(f func(set map[int64]struct{})) {},
logf: func(l logrus.Level, format string, args ...any) {
logrus.StandardLogger().Logf(l, format, args...)
},
badDsts: make(map[string]error),
writePipe: make(chan *writeJob, 32),
postPipe: make(chan *baseJob, 8),
}
c.createFlag = os.O_WRONLY | os.O_CREATE
@@ -70,6 +79,10 @@ func New(ctx context.Context, opts ...Option) (*Copyer, error) {
c.createFlag |= os.O_EXCL
}
if c.fromDevice.linear {
c.readingFiles = make(chan struct{}, 1)
}
c.running.Add(1)
go c.run(ctx)
return c, nil
@@ -100,7 +113,7 @@ func (c *Copyer) run(ctx context.Context) {
func (c *Copyer) reportError(file string, err error) {
e := &Error{Path: file, Err: err}
logrus.Errorf(e.Error())
c.logf(logrus.ErrorLevel, e.Error())
c.errsLock.Lock()
defer c.errsLock.Unlock()

42
copy.go
View File

@@ -13,7 +13,7 @@ import (
"github.com/abc950309/acp/mmap"
"github.com/hashicorp/go-multierror"
"github.com/minio/sha256-simd"
"github.com/schollz/progressbar/v3"
"github.com/sirupsen/logrus"
)
const (
@@ -67,20 +67,21 @@ func (c *Copyer) copy(ctx context.Context) {
}
go func() {
for {
select {
case job, ok := <-c.postPipe:
if !ok {
return
}
c.post(wg, job)
case <-ctx.Done():
return
}
for job := range c.postPipe {
c.post(wg, job)
}
}()
wg.Wait()
finished := make(chan struct{}, 1)
go func() {
wg.Wait()
finished <- struct{}{}
}()
select {
case <-finished:
case <-ctx.Done():
}
}
func (c *Copyer) prepare(ctx context.Context, job *baseJob) {
@@ -89,7 +90,7 @@ func (c *Copyer) prepare(ctx context.Context, job *baseJob) {
switch job.typ {
case jobTypeDir:
for _, d := range c.dst {
target := d + job.source.target(d)
target := job.source.target(d)
if err := os.MkdirAll(target, job.mode&os.ModePerm); err != nil && !os.IsExist(err) {
c.reportError(target, fmt.Errorf("mkdir fail, %w", err))
job.fail(target, fmt.Errorf("mkdir fail, %w", err))
@@ -102,6 +103,10 @@ func (c *Copyer) prepare(ctx context.Context, job *baseJob) {
return
}
if c.readingFiles != nil {
c.readingFiles <- struct{}{}
}
file, err := mmap.Open(job.source.path())
if err != nil {
c.reportError(job.source.path(), fmt.Errorf("open src file fail, %w", err))
@@ -120,14 +125,19 @@ func (c *Copyer) write(ctx context.Context, job *writeJob) {
var wg sync.WaitGroup
defer func() {
wg.Wait()
job.src.Close()
if c.readingFiles != nil {
<-c.readingFiles
}
c.postPipe <- job.baseJob
}()
num := atomic.AddInt64(&c.copyedFiles, 1)
c.updateProgressBar(func(bar *progressbar.ProgressBar) {
bar.Describe(fmt.Sprintf("[%d/%d] %s", num, c.totalFiles, job.source.relativePath))
})
c.logf(logrus.InfoLevel, "[%d/%d] copying: %s", num, c.totalFiles, job.source.relativePath)
c.updateCopying(func(set map[int64]struct{}) { set[num] = struct{}{} })
defer c.updateCopying(func(set map[int64]struct{}) { delete(set, num) })
chans := make([]chan []byte, 0, len(c.dst)+1)
defer func() {

12
opt.go
View File

@@ -166,12 +166,12 @@ func WithProgressBar(b bool) Option {
}
}
// func WithThreads(threads int) Option {
// return func(o *option) *option {
// o.threads = threads
// return o
// }
// }
func WithThreads(threads int) Option {
return func(o *option) *option {
o.threads = threads
return o
}
}
func WithHash(b bool) Option {
return func(o *option) *option {

View File

@@ -4,10 +4,13 @@ import (
"context"
"fmt"
"os"
"sort"
"strings"
"sync/atomic"
"time"
"github.com/schollz/progressbar/v3"
"github.com/sirupsen/logrus"
)
const (
@@ -30,17 +33,49 @@ func (c *Copyer) startProgressBar(ctx context.Context) {
progressbar.OptionSetRenderBlankState(true),
)
ch := make(chan func(bar *progressbar.ProgressBar), 8)
ch := make(chan interface{}, 8)
c.updateProgressBar = func(f func(bar *progressbar.ProgressBar)) {
ch <- f
}
c.updateCopying = func(f func(set map[int64]struct{})) {
ch <- f
}
c.logf = func(l logrus.Level, format string, args ...any) {
ch <- func() { logrus.StandardLogger().Logf(l, format, args...) }
}
go func() {
copying := make(map[int64]struct{}, c.threads)
for f := range ch {
if progressBar == nil {
continue
}
f(progressBar)
switch v := f.(type) {
case func(bar *progressbar.ProgressBar):
v(progressBar)
case func(set map[int64]struct{}):
v(copying)
idxs := make([]int64, 0, len(copying))
for idx := range copying {
idxs = append(idxs, idx)
}
sort.Slice(idxs, func(i, j int) bool {
return idxs[i] < idxs[j]
})
strs := make([]string, 0, len(idxs))
for _, idx := range idxs {
strs = append(strs, fmt.Sprintf("file %d", idx))
}
copyedFiles, totalFiles := atomic.LoadInt64(&c.copyedFiles), atomic.LoadInt64(&c.totalFiles)
progressBar.Describe(fmt.Sprintf("[%d/%d] copying... %s", copyedFiles, totalFiles, strings.Join(strs, ", ")))
case func():
v()
}
}
}()