mirror of
https://github.com/samuelncui/acp.git
synced 2026-01-08 03:55:15 +00:00
feat: with multi threads
This commit is contained in:
126
acp.go
Normal file
126
acp.go
Normal file
@@ -0,0 +1,126 @@
|
||||
package acp
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"github.com/schollz/progressbar/v3"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const (
|
||||
StageIndex = iota
|
||||
StageCopy
|
||||
StageFinished
|
||||
)
|
||||
|
||||
type Copyer struct {
|
||||
*option
|
||||
|
||||
createFlag int
|
||||
stage int64
|
||||
copyedBytes int64
|
||||
totalBytes int64
|
||||
copyedFiles int64
|
||||
totalFiles int64
|
||||
|
||||
updateProgressBar func(func(bar *progressbar.ProgressBar))
|
||||
|
||||
jobsLock sync.Mutex
|
||||
jobs []*baseJob
|
||||
|
||||
errsLock sync.Mutex
|
||||
errors []*Error
|
||||
|
||||
badDstsLock sync.Mutex
|
||||
badDsts map[string]error
|
||||
|
||||
writePipe chan *writeJob
|
||||
postPipe chan *baseJob
|
||||
|
||||
running sync.WaitGroup
|
||||
}
|
||||
|
||||
func New(ctx context.Context, opts ...Option) (*Copyer, error) {
|
||||
opt := newOption()
|
||||
for _, o := range opts {
|
||||
if o == nil {
|
||||
continue
|
||||
}
|
||||
opt = o(opt)
|
||||
}
|
||||
if err := opt.check(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c := &Copyer{
|
||||
option: opt,
|
||||
stage: StageIndex,
|
||||
updateProgressBar: func(f func(bar *progressbar.ProgressBar)) {},
|
||||
badDsts: make(map[string]error),
|
||||
writePipe: make(chan *writeJob, 32),
|
||||
postPipe: make(chan *baseJob, 8),
|
||||
}
|
||||
|
||||
c.createFlag = os.O_WRONLY | os.O_CREATE
|
||||
if c.overwrite {
|
||||
c.createFlag |= os.O_TRUNC
|
||||
} else {
|
||||
c.createFlag |= os.O_EXCL
|
||||
}
|
||||
|
||||
c.running.Add(1)
|
||||
go c.run(ctx)
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (c *Copyer) Wait() *Report {
|
||||
c.running.Wait()
|
||||
return c.Report()
|
||||
}
|
||||
|
||||
func (c *Copyer) run(ctx context.Context) {
|
||||
defer c.running.Done()
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
if c.withProgressBar {
|
||||
c.startProgressBar(ctx)
|
||||
}
|
||||
|
||||
c.index(ctx)
|
||||
if !c.checkJobs() {
|
||||
return
|
||||
}
|
||||
|
||||
c.copy(ctx)
|
||||
}
|
||||
|
||||
func (c *Copyer) reportError(file string, err error) {
|
||||
e := &Error{Path: file, Err: err}
|
||||
logrus.Errorf(e.Error())
|
||||
|
||||
c.errsLock.Lock()
|
||||
defer c.errsLock.Unlock()
|
||||
c.errors = append(c.errors, e)
|
||||
}
|
||||
|
||||
func (c *Copyer) getErrors() []*Error {
|
||||
c.errsLock.Lock()
|
||||
defer c.errsLock.Unlock()
|
||||
return c.errors
|
||||
}
|
||||
|
||||
func (c *Copyer) addBadDsts(dst string, err error) {
|
||||
c.badDstsLock.Lock()
|
||||
defer c.badDstsLock.Unlock()
|
||||
c.badDsts[dst] = err
|
||||
}
|
||||
|
||||
func (c *Copyer) getBadDsts() map[string]error {
|
||||
c.errsLock.Lock()
|
||||
defer c.errsLock.Unlock()
|
||||
return c.badDsts
|
||||
}
|
||||
@@ -12,10 +12,14 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
notOverwrite = flag.Bool("n", false, "not overwrite exist file")
|
||||
noTarget = flag.Bool("notarget", false, "do not have target, use as dir index tool")
|
||||
reportPath = flag.String("report", "", "json report storage path")
|
||||
targetPaths []string
|
||||
withProgressBar = flag.Bool("p", true, "display progress bar")
|
||||
notOverwrite = flag.Bool("n", false, "not overwrite exist file")
|
||||
noTarget = flag.Bool("notarget", false, "do not have target, use as dir index tool")
|
||||
reportPath = flag.String("report", "", "json report storage path")
|
||||
fromLinear = flag.Bool("from-linear", false, "json report storage path")
|
||||
toLinear = flag.Bool("to-linear", false, "json report storage path")
|
||||
|
||||
targetPaths []string
|
||||
)
|
||||
|
||||
func init() {
|
||||
@@ -53,10 +57,21 @@ func main() {
|
||||
}
|
||||
}()
|
||||
|
||||
c, err := acp.New(
|
||||
ctx, acp.Source(sources...), acp.Target(targetPaths...),
|
||||
acp.Overwrite(!*notOverwrite), acp.WithProgressBar(true), acp.WithHash(true),
|
||||
)
|
||||
opts := make([]acp.Option, 0, 8)
|
||||
opts = append(opts, acp.Source(sources...))
|
||||
opts = append(opts, acp.Target(targetPaths...))
|
||||
opts = append(opts, acp.WithHash(*reportPath != ""))
|
||||
opts = append(opts, acp.Overwrite(!*notOverwrite))
|
||||
opts = append(opts, acp.WithProgressBar(*withProgressBar))
|
||||
|
||||
if *fromLinear {
|
||||
opts = append(opts, acp.SetFromDevice(acp.LinearDevice(true)))
|
||||
}
|
||||
if *toLinear {
|
||||
opts = append(opts, acp.SetToDevice(acp.LinearDevice(true)))
|
||||
}
|
||||
|
||||
c, err := acp.New(ctx, opts...)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
347
copy.go
347
copy.go
@@ -2,137 +2,39 @@ package acp
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"hash"
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
"syscall"
|
||||
|
||||
"github.com/abc950309/acp/mmap"
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"github.com/minio/sha256-simd"
|
||||
"github.com/schollz/progressbar/v3"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const (
|
||||
unexpectFileMode = os.ModeType &^ os.ModeDir
|
||||
batchSize = 1024 * 1024
|
||||
batchSize = 1024 * 1024
|
||||
)
|
||||
|
||||
var (
|
||||
sha256Pool = &sync.Pool{New: func() interface{} { return sha256.New() }}
|
||||
)
|
||||
|
||||
const (
|
||||
StageIndex = iota
|
||||
StageCopy
|
||||
StageFinished
|
||||
)
|
||||
|
||||
type Copyer struct {
|
||||
*option
|
||||
|
||||
createFlag int
|
||||
stage int64
|
||||
copyedBytes int64
|
||||
totalBytes int64
|
||||
copyedFiles int64
|
||||
totalFiles int64
|
||||
|
||||
updateProgressBar func(func(bar *progressbar.ProgressBar))
|
||||
|
||||
jobs []*Job
|
||||
writePipe chan *writeJob
|
||||
metaPipe chan *metaJob
|
||||
|
||||
wg sync.WaitGroup
|
||||
reportLock sync.Mutex
|
||||
errors []*Error
|
||||
files []*File
|
||||
}
|
||||
|
||||
func New(ctx context.Context, opts ...Option) (*Copyer, error) {
|
||||
opt := newOption()
|
||||
for _, o := range opts {
|
||||
if o == nil {
|
||||
continue
|
||||
}
|
||||
opt = o(opt)
|
||||
}
|
||||
if err := opt.check(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c := &Copyer{
|
||||
option: opt,
|
||||
stage: StageIndex,
|
||||
writePipe: make(chan *writeJob, 32),
|
||||
metaPipe: make(chan *metaJob, 8),
|
||||
updateProgressBar: func(f func(bar *progressbar.ProgressBar)) {},
|
||||
}
|
||||
|
||||
c.createFlag = os.O_WRONLY | os.O_CREATE
|
||||
if c.overwrite {
|
||||
c.createFlag |= os.O_TRUNC
|
||||
} else {
|
||||
c.createFlag |= os.O_EXCL
|
||||
}
|
||||
|
||||
c.wg.Add(1)
|
||||
go c.run(ctx)
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (c *Copyer) Wait() *Report {
|
||||
c.wg.Wait()
|
||||
return c.Report()
|
||||
}
|
||||
|
||||
func (c *Copyer) reportError(file string, err error) {
|
||||
e := &Error{Path: file, Err: err}
|
||||
logrus.Errorf(e.Error())
|
||||
|
||||
c.reportLock.Lock()
|
||||
defer c.reportLock.Unlock()
|
||||
c.errors = append(c.errors, e)
|
||||
}
|
||||
|
||||
func (c *Copyer) run(ctx context.Context) {
|
||||
defer c.wg.Done()
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
if c.withProgressBar {
|
||||
c.startProgressBar(ctx)
|
||||
}
|
||||
|
||||
c.index(ctx)
|
||||
c.copy(ctx)
|
||||
}
|
||||
|
||||
func (c *Copyer) index(ctx context.Context) {
|
||||
for _, s := range c.src {
|
||||
c.walk(s.base, s.path)
|
||||
}
|
||||
|
||||
c.updateProgressBar(func(bar *progressbar.ProgressBar) {
|
||||
bar.ChangeMax64(atomic.LoadInt64(&c.totalBytes))
|
||||
bar.Describe(fmt.Sprintf("[0/%d] index finished...", atomic.LoadInt64(&c.totalFiles)))
|
||||
})
|
||||
}
|
||||
|
||||
func (c *Copyer) copy(ctx context.Context) {
|
||||
atomic.StoreInt64(&c.stage, StageCopy)
|
||||
defer atomic.StoreInt64(&c.stage, StageFinished)
|
||||
wg := new(sync.WaitGroup)
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
defer close(c.writePipe)
|
||||
|
||||
for _, job := range c.jobs {
|
||||
for _, job := range c.getJobs() {
|
||||
c.prepare(ctx, job)
|
||||
|
||||
select {
|
||||
@@ -143,111 +45,96 @@ func (c *Copyer) copy(ctx context.Context) {
|
||||
}
|
||||
}()
|
||||
|
||||
for i := 0; i < c.threads; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case job, ok := <-c.writePipe:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
c.write(ctx, job)
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer close(c.metaPipe)
|
||||
|
||||
for job := range c.writePipe {
|
||||
c.write(ctx, job)
|
||||
|
||||
for {
|
||||
select {
|
||||
case job, ok := <-c.postPipe:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
c.post(wg, job)
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for file := range c.metaPipe {
|
||||
c.meta(file)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (c *Copyer) walk(base, path string) {
|
||||
stat, err := os.Stat(path)
|
||||
if err != nil {
|
||||
c.reportError(path, fmt.Errorf("walk get stat, %w", err))
|
||||
return
|
||||
}
|
||||
func (c *Copyer) prepare(ctx context.Context, job *baseJob) {
|
||||
job.setStatus(jobStatusPreparing)
|
||||
|
||||
job, err := newJobFromFileInfo(base, path, stat)
|
||||
if err != nil {
|
||||
c.reportError(path, fmt.Errorf("make job fail, %w", err))
|
||||
return
|
||||
}
|
||||
if job.Mode&unexpectFileMode != 0 {
|
||||
return
|
||||
}
|
||||
if !job.Mode.IsDir() {
|
||||
atomic.AddInt64(&c.totalFiles, 1)
|
||||
atomic.AddInt64(&c.totalBytes, job.Size)
|
||||
c.jobs = append(c.jobs, job)
|
||||
return
|
||||
}
|
||||
|
||||
enterJob := new(Job)
|
||||
*enterJob = *job
|
||||
enterJob.Type = JobTypeEnterDir
|
||||
c.jobs = append(c.jobs, enterJob)
|
||||
|
||||
files, err := os.ReadDir(path)
|
||||
if err != nil {
|
||||
c.reportError(path, fmt.Errorf("walk read dir, %w", err))
|
||||
return
|
||||
}
|
||||
|
||||
for _, file := range files {
|
||||
c.walk(base, path+"/"+file.Name())
|
||||
}
|
||||
|
||||
exitJob := new(Job)
|
||||
*exitJob = *job
|
||||
exitJob.Type = JobTypeExitDir
|
||||
c.jobs = append(c.jobs, exitJob)
|
||||
}
|
||||
|
||||
func (c *Copyer) prepare(ctx context.Context, job *Job) {
|
||||
switch job.Type {
|
||||
case JobTypeEnterDir:
|
||||
switch job.typ {
|
||||
case jobTypeDir:
|
||||
for _, d := range c.dst {
|
||||
name := d + job.RelativePath
|
||||
err := os.Mkdir(name, job.Mode&os.ModePerm)
|
||||
if err != nil && !os.IsExist(err) {
|
||||
c.reportError(name, fmt.Errorf("mkdir fail, %w", err))
|
||||
target := d + job.source.target(d)
|
||||
if err := os.MkdirAll(target, job.mode&os.ModePerm); err != nil && !os.IsExist(err) {
|
||||
c.reportError(target, fmt.Errorf("mkdir fail, %w", err))
|
||||
job.fail(target, fmt.Errorf("mkdir fail, %w", err))
|
||||
continue
|
||||
}
|
||||
job.succes(target)
|
||||
}
|
||||
return
|
||||
case JobTypeExitDir:
|
||||
c.writePipe <- &writeJob{Job: job}
|
||||
|
||||
c.writePipe <- &writeJob{baseJob: job}
|
||||
return
|
||||
}
|
||||
|
||||
name := job.Source
|
||||
file, err := mmap.Open(name)
|
||||
file, err := mmap.Open(job.source.path())
|
||||
if err != nil {
|
||||
c.reportError(name, fmt.Errorf("open src file fail, %w", err))
|
||||
c.reportError(job.source.path(), fmt.Errorf("open src file fail, %w", err))
|
||||
return
|
||||
}
|
||||
|
||||
c.writePipe <- &writeJob{Job: job, src: file}
|
||||
c.writePipe <- &writeJob{baseJob: job, src: file}
|
||||
}
|
||||
|
||||
func (c *Copyer) write(ctx context.Context, job *writeJob) {
|
||||
if job.src == nil {
|
||||
c.metaPipe <- &metaJob{Job: job.Job}
|
||||
job.setStatus(jobStatusCopying)
|
||||
if job.typ != jobTypeNormal {
|
||||
return
|
||||
}
|
||||
defer job.src.Close()
|
||||
|
||||
num := atomic.AddInt64(&c.copyedFiles, 1)
|
||||
go c.updateProgressBar(func(bar *progressbar.ProgressBar) {
|
||||
bar.Describe(fmt.Sprintf("[%d/%d] %s", num, c.totalFiles, job.RelativePath))
|
||||
})
|
||||
|
||||
var wg sync.WaitGroup
|
||||
var lock sync.Mutex
|
||||
var readErr error
|
||||
defer func() {
|
||||
wg.Wait()
|
||||
job.src.Close()
|
||||
c.postPipe <- job.baseJob
|
||||
}()
|
||||
|
||||
num := atomic.AddInt64(&c.copyedFiles, 1)
|
||||
c.updateProgressBar(func(bar *progressbar.ProgressBar) {
|
||||
bar.Describe(fmt.Sprintf("[%d/%d] %s", num, c.totalFiles, job.source.relativePath))
|
||||
})
|
||||
|
||||
chans := make([]chan []byte, 0, len(c.dst)+1)
|
||||
next := &metaJob{Job: job.Job, failTarget: make(map[string]string)}
|
||||
defer func() {
|
||||
for _, ch := range chans {
|
||||
close(ch)
|
||||
}
|
||||
}()
|
||||
|
||||
if c.withHash {
|
||||
sha := sha256Pool.Get().(hash.Hash)
|
||||
@@ -265,21 +152,24 @@ func (c *Copyer) write(ctx context.Context, job *writeJob) {
|
||||
sha.Write(buf)
|
||||
}
|
||||
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
next.hash = sha.Sum(nil)
|
||||
job.setHash(sha.Sum(nil))
|
||||
}()
|
||||
}
|
||||
|
||||
var readErr error
|
||||
badDsts := c.getBadDsts()
|
||||
for _, d := range c.dst {
|
||||
name := d + job.RelativePath
|
||||
file, err := os.OpenFile(name, c.createFlag, job.Mode)
|
||||
dst := d
|
||||
|
||||
name := job.source.target(dst)
|
||||
if e, has := badDsts[dst]; has && e != nil {
|
||||
job.fail(name, fmt.Errorf("bad target path, %w", e))
|
||||
}
|
||||
|
||||
file, err := os.OpenFile(name, c.createFlag, job.mode)
|
||||
if err != nil {
|
||||
c.reportError(name, fmt.Errorf("open dst file fail, %w", err))
|
||||
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
next.failTarget[name] = fmt.Errorf("open dst file fail, %w", err).Error()
|
||||
job.fail(name, fmt.Errorf("open dst file fail, %w", err))
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -293,9 +183,7 @@ func (c *Copyer) write(ctx context.Context, job *writeJob) {
|
||||
var rerr error
|
||||
defer func() {
|
||||
if rerr == nil {
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
next.successTarget = append(next.successTarget, name)
|
||||
job.succes(name)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -307,29 +195,24 @@ func (c *Copyer) write(ctx context.Context, job *writeJob) {
|
||||
rerr = multierror.Append(rerr, re)
|
||||
}
|
||||
|
||||
c.reportError(name, rerr)
|
||||
// if no space
|
||||
if errors.Is(err, syscall.ENOSPC) {
|
||||
c.addBadDsts(dst, err)
|
||||
}
|
||||
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
next.failTarget[name] = rerr.Error()
|
||||
c.reportError(name, rerr)
|
||||
job.fail(name, rerr)
|
||||
}()
|
||||
|
||||
defer file.Close()
|
||||
for buf := range ch {
|
||||
nr := len(buf)
|
||||
|
||||
n, err := file.Write(buf)
|
||||
if n < 0 || nr < n {
|
||||
if err == nil {
|
||||
rerr = fmt.Errorf("write fail, unexpected return, byte_num= %d", n)
|
||||
return
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
rerr = fmt.Errorf("write fail, %w", err)
|
||||
return
|
||||
}
|
||||
if nr != n {
|
||||
rerr = fmt.Errorf("write fail, write and read bytes not equal, read= %d write= %d", nr, n)
|
||||
if len(buf) != n {
|
||||
rerr = fmt.Errorf("write fail, unexpected writen bytes return, read= %d write= %d", len(buf), n)
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -340,15 +223,9 @@ func (c *Copyer) write(ctx context.Context, job *writeJob) {
|
||||
}()
|
||||
}
|
||||
|
||||
defer func() {
|
||||
for _, ch := range chans {
|
||||
close(ch)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
c.metaPipe <- next
|
||||
}()
|
||||
|
||||
if len(chans) == 0 {
|
||||
return
|
||||
}
|
||||
readErr = c.streamCopy(ctx, chans, job.src)
|
||||
}
|
||||
|
||||
@@ -381,31 +258,23 @@ func (c *Copyer) streamCopy(ctx context.Context, dsts []chan []byte, src *mmap.R
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Copyer) meta(job *metaJob) {
|
||||
if job.Mode.IsDir() {
|
||||
for _, d := range c.dst {
|
||||
if err := os.Chtimes(d+job.RelativePath, job.ModTime, job.ModTime); err != nil {
|
||||
c.reportError(d+job.RelativePath, fmt.Errorf("change info, chtimes fail, %w", err))
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
func (c *Copyer) post(wg *sync.WaitGroup, job *baseJob) {
|
||||
defer wg.Done()
|
||||
|
||||
c.files = append(c.files, &File{
|
||||
Source: job.Source,
|
||||
SuccessTarget: job.successTarget,
|
||||
FailTarget: job.failTarget,
|
||||
RelativePath: job.RelativePath,
|
||||
Size: job.Size,
|
||||
Mode: job.Mode,
|
||||
ModTime: job.ModTime,
|
||||
WriteTime: time.Now(),
|
||||
SHA256: hex.EncodeToString(job.hash),
|
||||
})
|
||||
|
||||
for _, name := range job.successTarget {
|
||||
if err := os.Chtimes(name, job.ModTime, job.ModTime); err != nil {
|
||||
job.setStatus(jobStatusFinishing)
|
||||
for _, name := range job.successTargets {
|
||||
if err := os.Chtimes(name, job.modTime, job.modTime); err != nil {
|
||||
c.reportError(name, fmt.Errorf("change info, chtimes fail, %w", err))
|
||||
}
|
||||
}
|
||||
|
||||
job.setStatus(jobStatusFinished)
|
||||
if job.parent == nil {
|
||||
return
|
||||
}
|
||||
|
||||
left := job.parent.done(job)
|
||||
if left == 0 {
|
||||
c.postPipe <- job.parent
|
||||
}
|
||||
}
|
||||
|
||||
138
index.go
Normal file
138
index.go
Normal file
@@ -0,0 +1,138 @@
|
||||
package acp
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"sort"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/schollz/progressbar/v3"
|
||||
)
|
||||
|
||||
const (
|
||||
unexpectFileMode = os.ModeType &^ os.ModeDir
|
||||
)
|
||||
|
||||
func (c *Copyer) index(ctx context.Context) {
|
||||
for _, s := range c.src {
|
||||
c.walk(nil, s)
|
||||
}
|
||||
|
||||
c.updateProgressBar(func(bar *progressbar.ProgressBar) {
|
||||
bar.ChangeMax64(atomic.LoadInt64(&c.totalBytes))
|
||||
bar.Describe(fmt.Sprintf("[0/%d] index finished...", atomic.LoadInt64(&c.totalFiles)))
|
||||
})
|
||||
}
|
||||
|
||||
func (c *Copyer) walk(parent *baseJob, src *source) *baseJob {
|
||||
path := src.path()
|
||||
|
||||
stat, err := os.Stat(path)
|
||||
if err != nil {
|
||||
c.reportError(path, fmt.Errorf("walk get stat, %w", err))
|
||||
return nil
|
||||
}
|
||||
if stat.Mode()&unexpectFileMode != 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
job, err := newJobFromFileInfo(parent, src, stat)
|
||||
if err != nil {
|
||||
c.reportError(path, fmt.Errorf("make job fail, %w", err))
|
||||
return nil
|
||||
}
|
||||
|
||||
c.appendJobs(job)
|
||||
if job.typ == jobTypeNormal {
|
||||
totalBytes := atomic.AddInt64(&c.totalBytes, job.size)
|
||||
totalFiles := atomic.AddInt64(&c.totalFiles, 1)
|
||||
|
||||
c.updateProgressBar(func(bar *progressbar.ProgressBar) {
|
||||
bar.ChangeMax64(totalBytes)
|
||||
bar.Describe(fmt.Sprintf("[0/%d] indexing...", totalFiles))
|
||||
})
|
||||
|
||||
return job
|
||||
}
|
||||
|
||||
files, err := os.ReadDir(path)
|
||||
if err != nil {
|
||||
c.reportError(path, fmt.Errorf("walk read dir, %w", err))
|
||||
return nil
|
||||
}
|
||||
|
||||
job.children = make(map[*baseJob]struct{}, len(files))
|
||||
for _, file := range files {
|
||||
id := c.walk(job, &source{base: src.base, relativePath: src.relativePath + "/" + file.Name()})
|
||||
if id == nil {
|
||||
continue
|
||||
}
|
||||
job.children[id] = struct{}{}
|
||||
}
|
||||
|
||||
return job
|
||||
}
|
||||
|
||||
func (c *Copyer) appendJobs(jobs ...*baseJob) {
|
||||
c.jobsLock.Lock()
|
||||
defer c.jobsLock.Unlock()
|
||||
c.jobs = append(c.jobs, jobs...)
|
||||
}
|
||||
|
||||
func (c *Copyer) getJobs() []*baseJob {
|
||||
c.jobsLock.Lock()
|
||||
defer c.jobsLock.Unlock()
|
||||
return c.jobs
|
||||
}
|
||||
|
||||
func (c *Copyer) checkJobs() bool {
|
||||
c.jobsLock.Lock()
|
||||
defer c.jobsLock.Unlock()
|
||||
|
||||
if len(c.jobs) == 0 {
|
||||
c.reportError("", fmt.Errorf("cannot found available jobs"))
|
||||
return false
|
||||
}
|
||||
|
||||
sort.Slice(c.jobs, func(i int, j int) bool {
|
||||
return c.jobs[i].comparableRelativePath < c.jobs[j].comparableRelativePath
|
||||
})
|
||||
|
||||
var last *baseJob
|
||||
filtered := make([]*baseJob, 0, len(c.jobs))
|
||||
for _, job := range c.jobs {
|
||||
if last == nil {
|
||||
filtered = append(filtered, job)
|
||||
last = job
|
||||
continue
|
||||
}
|
||||
if last.source.relativePath != job.source.relativePath {
|
||||
filtered = append(filtered, job)
|
||||
last = job
|
||||
continue
|
||||
}
|
||||
|
||||
if last.typ != job.typ {
|
||||
c.reportError(job.source.path(), fmt.Errorf("same relative path with different type, '%s' and '%s'", job.source.path(), last.source.path()))
|
||||
return false
|
||||
}
|
||||
if last.typ == jobTypeNormal {
|
||||
c.reportError(job.source.path(), fmt.Errorf("same relative path as normal file, ignored, '%s'", job.source.path()))
|
||||
continue
|
||||
}
|
||||
|
||||
func() {
|
||||
last.lock.Lock()
|
||||
defer last.lock.Unlock()
|
||||
|
||||
for n := range job.children {
|
||||
last.children[n] = struct{}{}
|
||||
n.parent = last
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
c.jobs = filtered
|
||||
return true
|
||||
}
|
||||
163
job.go
163
job.go
@@ -1,57 +1,154 @@
|
||||
package acp
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"encoding/hex"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/abc950309/acp/mmap"
|
||||
)
|
||||
|
||||
type JobType uint8
|
||||
type jobType uint8
|
||||
|
||||
const (
|
||||
JobTypeNormal = JobType(iota)
|
||||
JobTypeEnterDir
|
||||
JobTypeExitDir
|
||||
jobTypeNormal = jobType(iota)
|
||||
jobTypeDir
|
||||
)
|
||||
|
||||
type Job struct {
|
||||
Source string
|
||||
RelativePath string
|
||||
Type JobType
|
||||
Name string // base name of the file
|
||||
Size int64 // length in bytes for regular files; system-dependent for others
|
||||
Mode os.FileMode // file mode bits
|
||||
ModTime time.Time // modification time
|
||||
type jobStatus uint8
|
||||
|
||||
const (
|
||||
jobStatusPending = jobStatus(iota)
|
||||
jobStatusPreparing
|
||||
jobStatusCopying
|
||||
jobStatusFinishing
|
||||
jobStatusFinished
|
||||
)
|
||||
|
||||
var (
|
||||
statusMapping = map[jobStatus]string{
|
||||
jobStatusPending: "pending",
|
||||
jobStatusPreparing: "preparing",
|
||||
jobStatusCopying: "copying",
|
||||
jobStatusFinishing: "finishing",
|
||||
jobStatusFinished: "finished",
|
||||
}
|
||||
)
|
||||
|
||||
type baseJob struct {
|
||||
parent *baseJob
|
||||
source *source
|
||||
typ jobType
|
||||
|
||||
name string // base name of the file
|
||||
size int64 // length in bytes for regular files; system-dependent for others
|
||||
mode os.FileMode // file mode bits
|
||||
modTime time.Time // modification time
|
||||
|
||||
lock sync.Mutex
|
||||
writeTime time.Time
|
||||
status jobStatus
|
||||
children map[*baseJob]struct{}
|
||||
|
||||
successTargets []string
|
||||
failedTargets map[string]error
|
||||
hash []byte
|
||||
|
||||
// utils
|
||||
comparableRelativePath string
|
||||
}
|
||||
|
||||
func newJobFromFileInfo(base, path string, info os.FileInfo) (*Job, error) {
|
||||
if !strings.HasPrefix(path, base) {
|
||||
return nil, fmt.Errorf("path do not contains base, path= '%s', base= '%s'", path, base)
|
||||
func newJobFromFileInfo(parent *baseJob, source *source, info os.FileInfo) (*baseJob, error) {
|
||||
job := &baseJob{
|
||||
parent: parent,
|
||||
source: source,
|
||||
|
||||
name: info.Name(),
|
||||
size: info.Size(),
|
||||
mode: info.Mode(),
|
||||
modTime: info.ModTime(),
|
||||
|
||||
comparableRelativePath: strings.ReplaceAll(source.relativePath, "/", "\x00"),
|
||||
}
|
||||
if job.mode.IsDir() {
|
||||
job.typ = jobTypeDir
|
||||
}
|
||||
|
||||
job := &Job{
|
||||
Source: path,
|
||||
RelativePath: path[len(base):],
|
||||
Name: info.Name(),
|
||||
Size: info.Size(),
|
||||
Mode: info.Mode(),
|
||||
ModTime: info.ModTime(),
|
||||
}
|
||||
return job, nil
|
||||
}
|
||||
|
||||
func (j *baseJob) setStatus(s jobStatus) {
|
||||
j.lock.Lock()
|
||||
defer j.lock.Unlock()
|
||||
j.status = s
|
||||
|
||||
if s == jobStatusCopying {
|
||||
j.writeTime = time.Now()
|
||||
}
|
||||
}
|
||||
|
||||
func (j *baseJob) setHash(h []byte) {
|
||||
j.lock.Lock()
|
||||
defer j.lock.Unlock()
|
||||
j.hash = h
|
||||
}
|
||||
|
||||
func (j *baseJob) done(child *baseJob) int {
|
||||
if j.typ == jobTypeNormal {
|
||||
return 0
|
||||
}
|
||||
|
||||
j.lock.Lock()
|
||||
defer j.lock.Unlock()
|
||||
|
||||
delete(j.children, child)
|
||||
return len(j.children)
|
||||
}
|
||||
|
||||
func (j *baseJob) succes(path string) {
|
||||
j.lock.Lock()
|
||||
defer j.lock.Unlock()
|
||||
j.successTargets = append(j.successTargets, path)
|
||||
}
|
||||
|
||||
func (j *baseJob) fail(path string, err error) {
|
||||
j.lock.Lock()
|
||||
defer j.lock.Unlock()
|
||||
|
||||
if j.failedTargets == nil {
|
||||
j.failedTargets = make(map[string]error, 1)
|
||||
}
|
||||
j.failedTargets[path] = err
|
||||
}
|
||||
|
||||
func (j *baseJob) report() *File {
|
||||
j.lock.Lock()
|
||||
defer j.lock.Unlock()
|
||||
|
||||
fails := make(map[string]string, len(j.failedTargets))
|
||||
for n, e := range j.failedTargets {
|
||||
fails[n] = e.Error()
|
||||
}
|
||||
|
||||
return &File{
|
||||
Source: j.source.path(),
|
||||
RelativePath: j.source.relativePath,
|
||||
|
||||
Status: statusMapping[j.status],
|
||||
SuccessTargets: j.successTargets,
|
||||
FailTargets: fails,
|
||||
|
||||
Size: j.size,
|
||||
Mode: j.mode,
|
||||
ModTime: j.modTime,
|
||||
WriteTime: j.writeTime,
|
||||
SHA256: hex.EncodeToString(j.hash),
|
||||
}
|
||||
}
|
||||
|
||||
type writeJob struct {
|
||||
*Job
|
||||
*baseJob
|
||||
src *mmap.ReaderAt
|
||||
}
|
||||
|
||||
type metaJob struct {
|
||||
*Job
|
||||
|
||||
successTarget []string
|
||||
failTarget map[string]string
|
||||
hash []byte
|
||||
}
|
||||
|
||||
58
opt.go
58
opt.go
@@ -4,18 +4,24 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"sort"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type source struct {
|
||||
path string
|
||||
base string
|
||||
name string
|
||||
base string
|
||||
relativePath string
|
||||
}
|
||||
|
||||
func (s *source) path() string {
|
||||
return s.base + s.relativePath
|
||||
}
|
||||
|
||||
func (s *source) target(dst string) string {
|
||||
return dst + s.relativePath
|
||||
}
|
||||
|
||||
type option struct {
|
||||
src []source
|
||||
src []*source
|
||||
dst []string
|
||||
|
||||
fromDevice *deviceOption
|
||||
@@ -23,8 +29,8 @@ type option struct {
|
||||
|
||||
overwrite bool
|
||||
withProgressBar bool
|
||||
// threads int
|
||||
withHash bool
|
||||
threads int
|
||||
withHash bool
|
||||
}
|
||||
|
||||
func newOption() *option {
|
||||
@@ -61,20 +67,17 @@ func (o *option) check() error {
|
||||
return fmt.Errorf("source path not found")
|
||||
}
|
||||
for _, s := range o.src {
|
||||
if _, err := os.Stat(s.path); err != nil {
|
||||
return fmt.Errorf("check src path '%s', %w", s.path, err)
|
||||
if _, err := os.Stat(s.path()); err != nil {
|
||||
return fmt.Errorf("check src path '%s', %w", s.path(), err)
|
||||
}
|
||||
}
|
||||
|
||||
sort.Slice(o.src, func(i, j int) bool {
|
||||
return o.src[i].name < o.src[j].name
|
||||
})
|
||||
for idx := 1; idx < len(o.src); idx++ {
|
||||
if o.src[idx].name == o.src[idx-1].name {
|
||||
return fmt.Errorf("have same name source path, '%s' and '%s'", o.src[idx-1].path, o.src[idx].path)
|
||||
}
|
||||
if o.threads < 1 {
|
||||
o.threads = 4
|
||||
}
|
||||
if o.fromDevice.linear || o.toDevice.linear {
|
||||
o.threads = 1
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -87,13 +90,32 @@ func Source(paths ...string) Option {
|
||||
if p == "" {
|
||||
continue
|
||||
}
|
||||
p = path.Clean(p)
|
||||
|
||||
if p[len(p)-1] == '/' {
|
||||
p = p[:len(p)-1]
|
||||
}
|
||||
|
||||
base, name := path.Split(p)
|
||||
o.src = append(o.src, source{path: p, base: base, name: name})
|
||||
o.src = append(o.src, &source{base: base, relativePath: name})
|
||||
}
|
||||
return o
|
||||
}
|
||||
}
|
||||
|
||||
func AccurateSource(base string, relativePaths ...string) Option {
|
||||
return func(o *option) *option {
|
||||
for _, p := range relativePaths {
|
||||
p = strings.TrimSpace(p)
|
||||
if p == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
if p[len(p)-1] == '/' {
|
||||
p = p[:len(p)-1]
|
||||
}
|
||||
|
||||
o.src = append(o.src, &source{base: base, relativePath: p})
|
||||
}
|
||||
return o
|
||||
}
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
@@ -16,8 +15,6 @@ const (
|
||||
)
|
||||
|
||||
func (c *Copyer) startProgressBar(ctx context.Context) {
|
||||
var lock sync.Mutex
|
||||
|
||||
// progressBar := progressbar.DefaultBytes(1, "[0/0] indexing...")
|
||||
progressBar := progressbar.NewOptions64(
|
||||
1,
|
||||
@@ -33,37 +30,41 @@ func (c *Copyer) startProgressBar(ctx context.Context) {
|
||||
progressbar.OptionSetRenderBlankState(true),
|
||||
)
|
||||
|
||||
ch := make(chan func(bar *progressbar.ProgressBar), 8)
|
||||
c.updateProgressBar = func(f func(bar *progressbar.ProgressBar)) {
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
|
||||
if progressBar == nil {
|
||||
return
|
||||
}
|
||||
f(progressBar)
|
||||
ch <- f
|
||||
}
|
||||
|
||||
go func() {
|
||||
for f := range ch {
|
||||
if progressBar == nil {
|
||||
continue
|
||||
}
|
||||
f(progressBar)
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer close(ch)
|
||||
|
||||
ticker := time.NewTicker(barUpdateInterval) // around 255ms, avoid conflict with progress bar fresh by second
|
||||
defer ticker.Stop()
|
||||
|
||||
var lastCopyedBytes int64
|
||||
for range ticker.C {
|
||||
switch atomic.LoadInt64(&c.stage) {
|
||||
case StageIndex:
|
||||
go c.updateProgressBar(func(bar *progressbar.ProgressBar) {
|
||||
bar.ChangeMax64(atomic.LoadInt64(&c.totalBytes))
|
||||
bar.Describe(fmt.Sprintf("[0/%d] indexing...", atomic.LoadInt64(&c.totalFiles)))
|
||||
})
|
||||
case StageCopy:
|
||||
currentCopyedBytes := atomic.LoadInt64(&c.copyedBytes)
|
||||
diff := currentCopyedBytes - lastCopyedBytes
|
||||
lastCopyedBytes = currentCopyedBytes
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
switch atomic.LoadInt64(&c.stage) {
|
||||
case StageCopy:
|
||||
currentCopyedBytes := atomic.LoadInt64(&c.copyedBytes)
|
||||
diff := currentCopyedBytes - lastCopyedBytes
|
||||
lastCopyedBytes = currentCopyedBytes
|
||||
|
||||
go c.updateProgressBar(func(bar *progressbar.ProgressBar) {
|
||||
bar.Add64(diff)
|
||||
})
|
||||
case StageFinished:
|
||||
c.updateProgressBar(func(bar *progressbar.ProgressBar) {
|
||||
bar.Add64(diff)
|
||||
})
|
||||
}
|
||||
case <-ctx.Done():
|
||||
currentCopyedBytes := atomic.LoadInt64(&c.copyedBytes)
|
||||
diff := int(currentCopyedBytes - lastCopyedBytes)
|
||||
lastCopyedBytes = currentCopyedBytes
|
||||
@@ -73,17 +74,7 @@ func (c *Copyer) startProgressBar(ctx context.Context) {
|
||||
bar.Add(diff)
|
||||
bar.Describe(fmt.Sprintf("[%d/%d] finished!", copyedFiles, totalFiles))
|
||||
})
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
time.Sleep(barUpdateInterval)
|
||||
c.updateProgressBar(func(bar *progressbar.ProgressBar) {
|
||||
bar.Close()
|
||||
progressBar = nil
|
||||
})
|
||||
return
|
||||
default:
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
37
report.go
37
report.go
@@ -8,12 +8,16 @@ import (
|
||||
)
|
||||
|
||||
func (c *Copyer) Report() *Report {
|
||||
c.reportLock.Lock()
|
||||
defer c.reportLock.Unlock()
|
||||
jobs, errs := c.getJobs(), c.getErrors()
|
||||
|
||||
files := make([]*File, 0, len(jobs))
|
||||
for _, job := range jobs {
|
||||
files = append(files, job.report())
|
||||
}
|
||||
|
||||
return &Report{
|
||||
Files: c.files,
|
||||
Errors: c.errors,
|
||||
Files: files,
|
||||
Errors: errs,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -47,18 +51,21 @@ func (e *Error) UnmarshalJSON(buf []byte) error {
|
||||
}
|
||||
|
||||
type File struct {
|
||||
Source string `json:"source"`
|
||||
SuccessTarget []string `json:"success_target"`
|
||||
FailTarget map[string]string `json:"fail_target"`
|
||||
RelativePath string `json:"relative_path"`
|
||||
Size int64 `json:"size"`
|
||||
Mode os.FileMode `json:"mode"`
|
||||
ModTime time.Time `json:"mod_time"`
|
||||
WriteTime time.Time `json:"write_time"`
|
||||
SHA256 string `json:"sha256"`
|
||||
Source string `json:"source"`
|
||||
RelativePath string `json:"relative_path"`
|
||||
|
||||
Status string `json:"status"`
|
||||
SuccessTargets []string `json:"success_target"`
|
||||
FailTargets map[string]string `json:"fail_target"`
|
||||
|
||||
Size int64 `json:"size"`
|
||||
Mode os.FileMode `json:"mode"`
|
||||
ModTime time.Time `json:"mod_time"`
|
||||
WriteTime time.Time `json:"write_time"`
|
||||
SHA256 string `json:"sha256"`
|
||||
}
|
||||
|
||||
type Report struct {
|
||||
Files []*File
|
||||
Errors []*Error
|
||||
Files []*File `json:"files,omitempty"`
|
||||
Errors []*Error `json:"errors,omitempty"`
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user