diff --git a/acp.go b/acp.go
index f43ef79..51d9ef9 100644
--- a/acp.go
+++ b/acp.go
@@ -2,48 +2,15 @@ package acp
 
 import (
     "context"
-    "os"
     "sync"
 
-    "github.com/schollz/progressbar/v3"
     "github.com/sirupsen/logrus"
 )
 
-const (
-    StageIndex = iota
-    StageCopy
-    StageFinished
-)
-
 type Copyer struct {
     *option
 
-    createFlag  int
-    stage       int64
-    copyedBytes int64
-    totalBytes  int64
-    copyedFiles int64
-    totalFiles  int64
-
-    updateProgressBar func(func(bar *progressbar.ProgressBar))
-    updateCopying     func(func(set map[int64]struct{}))
-    logf              func(l logrus.Level, format string, args ...any)
-
-    jobsLock      sync.Mutex
-    jobs          []*baseJob
-    noSpaceSource []*source
-
-    errsLock sync.Mutex
-    errors   []*Error
-
-    badDstsLock sync.Mutex
-    badDsts     map[string]error
-
-    readingFiles chan struct{}
-    writePipe    chan *writeJob
-    postPipe     chan *baseJob
-    running      sync.WaitGroup
+    running sync.WaitGroup
+    eventCh chan Event
 }
 
 func New(ctx context.Context, opts ...Option) (*Copyer, error) {
@@ -59,90 +26,92 @@ func New(ctx context.Context, opts ...Option) (*Copyer, error) {
     }
 
     c := &Copyer{
-        option: opt,
-        stage:  StageIndex,
-
-        updateProgressBar: func(f func(bar *progressbar.ProgressBar)) {},
-        updateCopying:     func(f func(set map[int64]struct{})) {},
-
-        badDsts:   make(map[string]error),
-        writePipe: make(chan *writeJob, 32),
-        postPipe:  make(chan *baseJob, 8),
-    }
-
-    c.createFlag = os.O_WRONLY | os.O_CREATE
-    if c.overwrite {
-        c.createFlag |= os.O_TRUNC
-    } else {
-        c.createFlag |= os.O_EXCL
-    }
-
-    if c.fromDevice.linear {
-        c.readingFiles = make(chan struct{}, 1)
-    }
-
-    if opt.logger != nil {
-        c.logf = func(l logrus.Level, format string, args ...any) { opt.logger.Logf(l, format, args...) }
-    } else {
-        c.logf = func(l logrus.Level, format string, args ...any) { logrus.StandardLogger().Logf(l, format, args...) }
+        option:  opt,
+        eventCh: make(chan Event, 128),
     }
 
     c.running.Add(1)
     go wrap(ctx, func() { c.run(ctx) })
+
     return c, nil
 }
 
-func (c *Copyer) Wait() *Report {
+func (c *Copyer) Wait() {
     c.running.Wait()
-    return c.Report()
 }
 
-func (c *Copyer) run(ctx context.Context) {
+func (c *Copyer) run(ctx context.Context) error {
     defer c.running.Done()
 
     ctx, cancel := context.WithCancel(ctx)
     defer cancel()
 
-    if c.withProgressBar {
-        c.startProgressBar(ctx)
+    go wrap(ctx, func() { c.eventLoop(ctx) })
+
+    indexed, err := c.index(ctx)
+    if err != nil {
+        return err
     }
 
-    c.index(ctx)
-    if !c.checkJobs() {
-        return
-    }
+    prepared := c.prepare(ctx, indexed)
+    copied := c.copy(ctx, prepared)
+    c.cleanupJob(ctx, copied)
 
-    if err := c.applyAutoFillLimit(); err != nil {
-        c.reportError("_autofill", err)
-        return
-    }
-
-    c.copy(ctx)
+    return nil
 }
 
-func (c *Copyer) reportError(file string, err error) {
-    e := &Error{Path: file, Err: err}
-    c.logf(logrus.ErrorLevel, e.Error())
-
-    c.errsLock.Lock()
-    defer c.errsLock.Unlock()
-    c.errors = append(c.errors, e)
-}
-
-func (c *Copyer) getErrors() []*Error {
-    c.errsLock.Lock()
-    defer c.errsLock.Unlock()
-    return c.errors
-}
-
-func (c *Copyer) addBadDsts(dst string, err error) {
-    c.badDstsLock.Lock()
-    defer c.badDstsLock.Unlock()
-    c.badDsts[dst] = err
-}
-
-func (c *Copyer) getBadDsts() map[string]error {
-    c.errsLock.Lock()
-    defer c.errsLock.Unlock()
-    return c.badDsts
+func (c *Copyer) eventLoop(ctx context.Context) {
+    chans := make([]chan Event, len(c.eventHandlers))
+    for idx := range chans {
+        chans[idx] = make(chan Event, 128)
+    }
+
+    for idx, ch := range chans {
+        handler := c.eventHandlers[idx]
+        events := ch
+
+        go wrap(ctx, func() {
+            for {
+                e, ok := <-events
+                if !ok {
+                    handler(&EventFinished{})
+                    return
+                }
+                handler(e)
+            }
+        })
+    }
+
+    defer func() {
+        for _, ch := range chans {
+            close(ch)
+        }
+    }()
+    for {
+        select {
+        case e, ok := <-c.eventCh:
+            if !ok {
+                return
+            }
+            for _, ch := range chans {
+                ch <- e
+            }
+        case <-ctx.Done():
+            return
+        }
+    }
+}
+
+func (c *Copyer) logf(l logrus.Level, format string, args ...any) {
+    c.logger.Logf(l, format, args...)
+}
+
+func (c *Copyer) submit(e Event) {
+    c.eventCh <- e
+}
+
+func (c *Copyer) reportError(src, dst string, err error) {
+    e := &Error{Src: src, Dst: dst, Err: err}
+    c.logf(logrus.ErrorLevel, e.Error())
+    c.submit(&EventReportError{Error: e})
 }
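
For orientation, a minimal sketch of driving the reworked event-based API. The constructor, options, and event types are the ones introduced in this diff; the paths and logging are illustrative:

// Hypothetical usage sketch of the event-driven Copyer; option and event
// names come from this diff, everything else is made up for illustration.
package main

import (
    "context"
    "log"

    "github.com/abc950309/acp"
)

func main() {
    ctx := context.Background()

    c, err := acp.New(
        ctx,
        acp.Source("/data/src"),
        acp.Target("/mnt/backup"),
        // Progress and errors now arrive as events instead of shared counters.
        acp.WithEventHandler(func(ev acp.Event) {
            if e, ok := ev.(*acp.EventReportError); ok {
                log.Printf("copy error: %s", e.Error)
            }
        }),
    )
    if err != nil {
        log.Fatal(err)
    }

    // Wait no longer returns a *Report; reports are collected by a handler.
    c.Wait()
}
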
diff --git a/autofill.go b/autofill.go
deleted file mode 100644
index 33a6827..0000000
--- a/autofill.go
+++ /dev/null
@@ -1,86 +0,0 @@
-package acp
-
-import (
-    "fmt"
-    "math"
-    "strings"
-)
-
-func (c *Copyer) applyAutoFillLimit() error {
-    if !c.autoFill {
-        return nil
-    }
-
-    counts := make(map[string]int, len(c.dst))
-    infos := make(map[string]*fileSystem, len(c.dst))
-    for _, d := range c.dst {
-        fsInfo, err := getFileSystem(d)
-        if err != nil {
-            return fmt.Errorf("get file system fail, %s, %w", d, err)
-        }
-
-        infos[d] = fsInfo
-        counts[d] = counts[d] + 1
-    }
-
-    min := int64(math.MaxInt64)
-    for mp, info := range infos {
-        size := info.AvailableSize / int64(counts[mp])
-        if size < min {
-            min = size
-        }
-    }
-
-    c.jobsLock.Lock()
-    defer c.jobsLock.Unlock()
-
-    idx := c.getAutoFillCutoffIdx(min)
-    if idx < 0 {
-        return nil
-    }
-    if idx == 0 {
-        return fmt.Errorf("cannot found available auto fill slice, filesystem_size= %d", min)
-    }
-
-    cutoff := c.jobs[idx:]
-    c.jobs = c.jobs[:idx]
-    last := ""
-    for _, job := range cutoff {
-        if job.parent != nil {
-            job.parent.done(job)
-        }
-        if strings.HasPrefix(job.source.relativePath, last) {
-            continue
-        }
-
-        c.noSpaceSource = append(c.noSpaceSource, job.source)
-        last = job.source.relativePath + "/"
-    }
-    return nil
-}
-
-func (c *Copyer) getAutoFillCutoffIdx(limit int64) int {
-    left := limit
-    targetIdx := -1
-    for idx, job := range c.jobs {
-        left -= job.size
-        if left < 0 {
-            targetIdx = idx
-        }
-    }
-    if targetIdx < 0 {
-        return -1
-    }
-
-    if c.autoFillSplitDepth <= 0 {
-        return targetIdx
-    }
-
-    for idx := targetIdx; idx >= 0; idx-- {
-        if strings.Count(c.jobs[idx].source.relativePath, "/") < c.autoFillSplitDepth {
-            return idx
-        }
-    }
-
-    return 0
-}
file") - continueReport = flag.String("c", "", "continue with previous report, for auto fill circumstances") - noTarget = flag.Bool("notarget", false, "do not have target, use as dir index tool") - reportPath = flag.String("report", "", "json report storage path") - reportIndent = flag.Bool("report-indent", false, "json report with indent") - fromLinear = flag.Bool("from-linear", false, "copy from linear device, such like tape drive") - toLinear = flag.Bool("to-linear", false, "copy to linear device, such like tape drive") - autoFillDepth = flag.Int("auto-fill-depth", 0, "auto fill the whole filesystem, split by specify filepath depth, and report will output left filepaths") + // continueReport = flag.String("c", "", "continue with previous report, for auto fill circumstances") + noTarget = flag.Bool("notarget", false, "do not have target, use as dir index tool") + reportPath = flag.String("report", "", "json report storage path") + reportIndent = flag.Bool("report-indent", false, "json report with indent") + fromLinear = flag.Bool("from-linear", false, "copy from linear device, such like tape drive") + toLinear = flag.Bool("to-linear", false, "copy to linear device, such like tape drive") targetPaths []string ) @@ -61,29 +63,31 @@ func main() { }() opts := make([]acp.Option, 0, 8) - if *continueReport != "" { - f, err := os.Open(*continueReport) - if err != nil { - logrus.Fatalf("cannot open continue report file, %s", err) - } + opts = append(opts, acp.Source(sources...)) + // if *continueReport != "" { + // f, err := os.Open(*continueReport) + // if err != nil { + // logrus.Fatalf("cannot open continue report file, %s", err) + // } - r := new(acp.Report) - if err := json.NewDecoder(f).Decode(r); err != nil { - logrus.Fatalf("decode continue report file, %s", err) - } + // r := new(acp.Report) + // if err := json.NewDecoder(f).Decode(r); err != nil { + // logrus.Fatalf("decode continue report file, %s", err) + // } - for _, s := range r.NoSpaceSources { - logrus.Infof("restore unfinished: base= '%s' relative_path= '%v'", s.Base, s.RelativePaths) - opts = append(opts, acp.AccurateSource(s.Base, s.RelativePaths...)) - } - } else { - opts = append(opts, acp.Source(sources...)) - } + // for _, s := range r.NoSpaceSources { + // logrus.Infof("restore unfinished: base= '%s' relative_path= '%v'", s.Base, s.RelativePaths) + // opts = append(opts, acp.AccurateSource(s.Base, s.RelativePaths...)) + // } + // } opts = append(opts, acp.Target(targetPaths...)) opts = append(opts, acp.WithHash(*reportPath != "")) opts = append(opts, acp.Overwrite(!*notOverwrite)) - opts = append(opts, acp.WithProgressBar(*withProgressBar)) + + if *withProgressBar { + opts = append(opts, acp.WithProgressBar()) + } if *fromLinear { opts = append(opts, acp.SetFromDevice(acp.LinearDevice(true))) @@ -91,8 +95,33 @@ func main() { if *toLinear { opts = append(opts, acp.SetToDevice(acp.LinearDevice(true))) } - if *autoFillDepth != 0 { - opts = append(opts, acp.WithAutoFill(true, *autoFillDepth)) + + if *reportPath != "" { + handler, getter := acp.NewReportGetter() + opts = append(opts, acp.WithEventHandler(handler)) + defer func() { + if *reportPath == "" { + return + } + + report := getter() + r, err := os.Create(*reportPath) + if err != nil { + logrus.Warnf("open report fail, path= '%s', err= %w", *reportPath, err) + logrus.Infof("report: %s", report) + return + } + defer r.Close() + + var buf []byte + if *reportIndent { + buf, _ = reportJSON.MarshalIndent(report, "", "\t") + } else { + buf, _ = reportJSON.Marshal(report) + } 
diff --git a/cmd/acp/main.go b/cmd/acp/main.go
index 0d8f579..456bc77 100644
--- a/cmd/acp/main.go
+++ b/cmd/acp/main.go
@@ -2,25 +2,27 @@ package main
 
 import (
     "context"
-    "encoding/json"
+    "errors"
     "flag"
     "os"
     "os/signal"
+    "unsafe"
 
     "github.com/abc950309/acp"
+    jsoniter "github.com/json-iterator/go"
+    "github.com/modern-go/reflect2"
     "github.com/sirupsen/logrus"
 )
 
 var (
     withProgressBar = flag.Bool("p", true, "display progress bar")
     notOverwrite    = flag.Bool("n", false, "not overwrite exist file")
-    continueReport  = flag.String("c", "", "continue with previous report, for auto fill circumstances")
-    noTarget        = flag.Bool("notarget", false, "do not have target, use as dir index tool")
-    reportPath      = flag.String("report", "", "json report storage path")
-    reportIndent    = flag.Bool("report-indent", false, "json report with indent")
-    fromLinear      = flag.Bool("from-linear", false, "copy from linear device, such like tape drive")
-    toLinear        = flag.Bool("to-linear", false, "copy to linear device, such like tape drive")
-    autoFillDepth   = flag.Int("auto-fill-depth", 0, "auto fill the whole filesystem, split by specify filepath depth, and report will output left filepaths")
+    // continueReport = flag.String("c", "", "continue with previous report, for auto fill circumstances")
+    noTarget     = flag.Bool("notarget", false, "do not have target, use as dir index tool")
+    reportPath   = flag.String("report", "", "json report storage path")
+    reportIndent = flag.Bool("report-indent", false, "json report with indent")
+    fromLinear   = flag.Bool("from-linear", false, "copy from a linear device, such as a tape drive")
+    toLinear     = flag.Bool("to-linear", false, "copy to a linear device, such as a tape drive")
 
     targetPaths []string
 )
@@ -61,29 +63,31 @@ func main() {
     }()
 
     opts := make([]acp.Option, 0, 8)
-    if *continueReport != "" {
-        f, err := os.Open(*continueReport)
-        if err != nil {
-            logrus.Fatalf("cannot open continue report file, %s", err)
-        }
+    opts = append(opts, acp.Source(sources...))
+    // if *continueReport != "" {
+    //     f, err := os.Open(*continueReport)
+    //     if err != nil {
+    //         logrus.Fatalf("cannot open continue report file, %s", err)
+    //     }
 
-        r := new(acp.Report)
-        if err := json.NewDecoder(f).Decode(r); err != nil {
-            logrus.Fatalf("decode continue report file, %s", err)
-        }
+    //     r := new(acp.Report)
+    //     if err := json.NewDecoder(f).Decode(r); err != nil {
+    //         logrus.Fatalf("decode continue report file, %s", err)
+    //     }
 
-        for _, s := range r.NoSpaceSources {
-            logrus.Infof("restore unfinished: base= '%s' relative_path= '%v'", s.Base, s.RelativePaths)
-            opts = append(opts, acp.AccurateSource(s.Base, s.RelativePaths...))
-        }
-    } else {
-        opts = append(opts, acp.Source(sources...))
-    }
+    //     for _, s := range r.NoSpaceSources {
+    //         logrus.Infof("restore unfinished: base= '%s' relative_path= '%v'", s.Base, s.RelativePaths)
+    //         opts = append(opts, acp.AccurateSource(s.Base, s.RelativePaths...))
+    //     }
+    // }
 
     opts = append(opts, acp.Target(targetPaths...))
     opts = append(opts, acp.WithHash(*reportPath != ""))
     opts = append(opts, acp.Overwrite(!*notOverwrite))
-    opts = append(opts, acp.WithProgressBar(*withProgressBar))
+
+    if *withProgressBar {
+        opts = append(opts, acp.WithProgressBar())
+    }
 
     if *fromLinear {
         opts = append(opts, acp.SetFromDevice(acp.LinearDevice(true)))
@@ -91,8 +95,29 @@ func main() {
     if *toLinear {
         opts = append(opts, acp.SetToDevice(acp.LinearDevice(true)))
     }
-    if *autoFillDepth != 0 {
-        opts = append(opts, acp.WithAutoFill(true, *autoFillDepth))
+
+    if *reportPath != "" {
+        handler, getter := acp.NewReportGetter()
+        opts = append(opts, acp.WithEventHandler(handler))
+        defer func() {
+            report := getter()
+            r, err := os.Create(*reportPath)
+            if err != nil {
+                logrus.Warnf("open report fail, path= '%s', err= %v", *reportPath, err)
+                logrus.Infof("report: %s", report)
+                return
+            }
+            defer r.Close()
+
+            var buf []byte
+            if *reportIndent {
+                buf, _ = reportJSON.MarshalIndent(report, "", "\t")
+            } else {
+                buf, _ = reportJSON.Marshal(report)
+            }
+
+            r.Write(buf)
+        }()
     }
 
     c, err := acp.New(ctx, opts...)
@@ -100,28 +125,42 @@ func main() {
         logrus.Fatalf("unexpected exit: %s", err)
     }
 
-    defer func() {
-        if *reportPath == "" {
-            return
-        }
-
-        report := c.Report()
-        r, err := os.Create(*reportPath)
-        if err != nil {
-            logrus.Warnf("open report fail, path= '%s', err= %w", *reportPath, err)
-            logrus.Infof("report: %s", report)
-            return
-        }
-        defer r.Close()
-
-        var buf []byte
-        if *reportIndent {
-            buf, _ = json.MarshalIndent(report, "", "\t")
-        } else {
-            buf, _ = json.Marshal(report)
-        }
-
-        r.Write(buf)
-    }()
     c.Wait()
 }
+
+var (
+    reportJSON jsoniter.API
+)
+
+type errValCoder struct{}
+
+func (*errValCoder) IsEmpty(ptr unsafe.Pointer) bool {
+    val := (*error)(ptr)
+    return *val == nil
+}
+
+func (*errValCoder) Encode(ptr unsafe.Pointer, stream *jsoniter.Stream) {
+    val := (*error)(ptr)
+    stream.WriteString((*val).Error())
+}
+
+func (*errValCoder) Decode(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
+    val := (*error)(ptr)
+    *val = errors.New(iter.ReadString())
+}
+
+func init() {
+    reportJSON = jsoniter.Config{
+        EscapeHTML:             true,
+        SortMapKeys:            true,
+        ValidateJsonRawMessage: true,
+    }.Froze()
+
+    errType := reflect2.TypeOfPtr((*error)(nil)).Elem()
+    reportJSON.RegisterExtension(jsoniter.EncoderExtension{
+        errType: &errValCoder{},
+    })
+    reportJSON.RegisterExtension(jsoniter.DecoderExtension{
+        errType: &errValCoder{},
+    })
+}
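
The init above registers a jsoniter codec so that error values (for example inside Job.FailTargets) marshal as plain strings. A self-contained sketch of the same idea; the sample map is illustrative:

// Mini-demo of registering a jsoniter codec for the error interface so that
// map[string]error fields marshal as strings; mirrors the CLI's extension.
package main

import (
    "errors"
    "fmt"
    "unsafe"

    jsoniter "github.com/json-iterator/go"
    "github.com/modern-go/reflect2"
)

type errCoder struct{}

func (*errCoder) IsEmpty(ptr unsafe.Pointer) bool { return *(*error)(ptr) == nil }
func (*errCoder) Encode(ptr unsafe.Pointer, stream *jsoniter.Stream) {
    stream.WriteString((*(*error)(ptr)).Error())
}

func main() {
    api := jsoniter.Config{SortMapKeys: true}.Froze()
    api.RegisterExtension(jsoniter.EncoderExtension{
        reflect2.TypeOfPtr((*error)(nil)).Elem(): &errCoder{},
    })

    fails := map[string]error{"/dst/a": errors.New("no space left")}
    buf, _ := api.Marshal(fails)
    fmt.Println(string(buf)) // {"/dst/a":"no space left"}
}
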
diff --git a/copy.go b/copy.go
index 2e30a8d..cd1d5f5 100644
--- a/copy.go
+++ b/copy.go
@@ -6,14 +6,16 @@ import (
     "fmt"
     "hash"
     "os"
+    "path"
     "sync"
     "sync/atomic"
     "syscall"
+    "time"
 
     "github.com/abc950309/acp/mmap"
+    mapset "github.com/deckarep/golang-set/v2"
     "github.com/hashicorp/go-multierror"
     "github.com/minio/sha256-simd"
-    "github.com/sirupsen/logrus"
 )
 
 const (
@@ -24,121 +26,70 @@ var (
     sha256Pool = &sync.Pool{New: func() interface{} { return sha256.New() }}
 )
 
-func (c *Copyer) copy(ctx context.Context) {
-    atomic.StoreInt64(&c.stage, StageCopy)
-    defer atomic.StoreInt64(&c.stage, StageFinished)
-    wg := new(sync.WaitGroup)
+func (c *Copyer) copy(ctx context.Context, prepared <-chan *writeJob) <-chan *baseJob {
+    ch := make(chan *baseJob, 128)
 
-    wg.Add(1)
+    var copying sync.WaitGroup
+    done := make(chan struct{})
+    defer func() {
+        go wrap(ctx, func() {
+            defer close(done)
+            defer close(ch)
+
+            copying.Wait()
+        })
+    }()
+
+    cntr := new(counter)
     go wrap(ctx, func() {
-        defer wg.Done()
-        defer close(c.writePipe)
-
-        for _, job := range c.getJobs() {
-            c.prepare(ctx, job)
-
+        ticker := time.NewTicker(time.Second)
+        for {
             select {
-            case <-ctx.Done():
+            case <-ticker.C:
+                c.submit(&EventUpdateProgress{Bytes: atomic.LoadInt64(&cntr.bytes), Files: atomic.LoadInt64(&cntr.files)})
+            case <-done:
+                c.submit(&EventUpdateProgress{Bytes: atomic.LoadInt64(&cntr.bytes), Files: atomic.LoadInt64(&cntr.files), Finished: true})
                 return
-            default:
             }
         }
     })
 
-    for i := 0; i < c.threads; i++ {
-        wg.Add(1)
+    badDsts := mapset.NewSet[string]()
+    for idx := 0; idx < c.toDevice.threads; idx++ {
+        copying.Add(1)
        go wrap(ctx, func() {
-            defer wg.Done()
+            defer copying.Done()
 
             for {
                 select {
-                case job, ok := <-c.writePipe:
+                case <-ctx.Done():
+                    return
+                case job, ok := <-prepared:
                     if !ok {
                         return
                     }
 
-                    wg.Add(1)
-                    c.write(ctx, job)
-                case <-ctx.Done():
-                    return
+                    wrap(ctx, func() { c.write(ctx, job, ch, cntr, badDsts) })
                 }
             }
         })
     }
 
-    go wrap(ctx, func() {
-        for job := range c.postPipe {
-            c.post(wg, job)
-        }
-    })
-
-    finished := make(chan struct{}, 1)
-    go wrap(ctx, func() {
-        wg.Wait()
-        finished <- struct{}{}
-    })
-
-    select {
-    case <-finished:
-    case <-ctx.Done():
-    }
+    return ch
 }
 
-func (c *Copyer) prepare(ctx context.Context, job *baseJob) {
-    job.setStatus(jobStatusPreparing)
-
-    switch job.typ {
-    case jobTypeDir:
-        for _, d := range c.dst {
-            target := job.source.target(d)
-            if err := os.MkdirAll(target, job.mode&os.ModePerm); err != nil && !os.IsExist(err) {
-                c.reportError(target, fmt.Errorf("mkdir fail, %w", err))
-                job.fail(target, fmt.Errorf("mkdir fail, %w", err))
-                continue
-            }
-            job.succes(target)
-        }
-
-        c.writePipe <- &writeJob{baseJob: job}
-        return
-    }
-
-    if c.readingFiles != nil {
-        c.readingFiles <- struct{}{}
-    }
-
-    file, err := mmap.Open(job.source.path())
-    if err != nil {
-        c.reportError(job.source.path(), fmt.Errorf("open src file fail, %w", err))
-        return
-    }
-
-    c.writePipe <- &writeJob{baseJob: job, src: file}
-}
-
-func (c *Copyer) write(ctx context.Context, job *writeJob) {
+func (c *Copyer) write(ctx context.Context, job *writeJob, ch chan<- *baseJob, cntr *counter, badDsts mapset.Set[string]) {
     job.setStatus(jobStatusCopying)
-    if job.typ != jobTypeNormal {
-        return
-    }
+    defer job.setStatus(jobStatusFinishing)
 
     var wg sync.WaitGroup
     defer func() {
         wg.Wait()
-
-        job.src.Close()
-        if c.readingFiles != nil {
-            <-c.readingFiles
-        }
-
-        c.postPipe <- job.baseJob
+        job.done()
+        ch <- job.baseJob
     }()
 
-    num := atomic.AddInt64(&c.copyedFiles, 1)
-    c.logf(logrus.InfoLevel, "[%d/%d] copying: %s", num, c.totalFiles, job.source.relativePath)
-    c.updateCopying(func(set map[int64]struct{}) { set[num] = struct{}{} })
-    defer c.updateCopying(func(set map[int64]struct{}) { delete(set, num) })
-
+    atomic.AddInt64(&cntr.files, 1)
     chans := make([]chan []byte, 0, len(c.dst)+1)
     defer func() {
         for _, ch := range chans {
@@ -167,18 +118,21 @@
     }
 
     var readErr error
-    badDsts := c.getBadDsts()
     for _, d := range c.dst {
         dst := d
+        name := job.source.dst(dst)
 
-        name := job.source.target(dst)
-        if e, has := badDsts[dst]; has && e != nil {
-            job.fail(name, fmt.Errorf("bad target path, %w", e))
+        if badDsts.Contains(dst) {
+            job.fail(name, fmt.Errorf("bad target path"))
+            continue
+        }
+        if err := os.MkdirAll(path.Dir(name), os.ModePerm); err != nil {
+            job.fail(name, fmt.Errorf("mkdir dst dir fail, %w", err))
             continue
         }
 
         file, err := os.OpenFile(name, c.createFlag, job.mode)
         if err != nil {
-            c.reportError(name, fmt.Errorf("open dst file fail, %w", err))
             job.fail(name, fmt.Errorf("open dst file fail, %w", err))
             continue
         }
@@ -206,11 +160,11 @@
         }
 
         // if no space
-        if errors.Is(err, syscall.ENOSPC) {
-            c.addBadDsts(dst, err)
+        if errors.Is(err, syscall.ENOSPC) || errors.Is(err, syscall.EROFS) {
+            badDsts.Add(dst)
         }
 
-        c.reportError(name, rerr)
+        c.reportError(job.source.src(), name, rerr)
         job.fail(name, rerr)
     }()
 
@@ -236,10 +190,10 @@
     if len(chans) == 0 {
         return
     }
-    readErr = c.streamCopy(ctx, chans, job.src)
+    readErr = c.streamCopy(ctx, chans, job.src, &cntr.bytes)
 }
 
-func (c *Copyer) streamCopy(ctx context.Context, dsts []chan []byte, src *mmap.ReaderAt) error {
+func (c *Copyer) streamCopy(ctx context.Context, dsts []chan []byte, src *mmap.ReaderAt, bytes *int64) error {
     if src.Len() == 0 {
         return nil
     }
@@ -255,7 +209,7 @@
         }
 
         nr := len(buf)
-        atomic.AddInt64(&c.copyedBytes, int64(nr))
+        atomic.AddInt64(bytes, int64(nr))
         if nr < batchSize {
             return nil
         }
@@ -267,24 +221,3 @@
         }
     }
 }
-
-func (c *Copyer) post(wg *sync.WaitGroup, job *baseJob) {
-    defer wg.Done()
-
-    job.setStatus(jobStatusFinishing)
-    for _, name := range job.successTargets {
-        if err := os.Chtimes(name, job.modTime, job.modTime); err != nil {
-            c.reportError(name, fmt.Errorf("change info, chtimes fail, %w", err))
-        }
-    }
-
-    job.setStatus(jobStatusFinished)
-    if job.parent == nil {
-        return
-    }
-
-    left := job.parent.done(job)
-    if left == 0 {
-        c.postPipe <- job.parent
-    }
-}
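
streamCopy fans a single mmap-backed read out to one channel per destination, with a goroutine per destination draining its channel into a writer. A stripped-down illustration of that fan-out shape (not acp's actual code; buffer handling and errors are simplified):

// Stripped-down illustration of the fan-out used by streamCopy: one reader
// pushes each chunk to a channel per destination, and a goroutine per
// destination drains its channel into a writer.
package main

import (
    "io"
    "os"
    "strings"
    "sync"
)

func fanOut(src io.Reader, dsts []io.Writer) {
    chans := make([]chan []byte, len(dsts))
    var wg sync.WaitGroup

    for i, w := range dsts {
        chans[i] = make(chan []byte, 4)
        wg.Add(1)
        go func(w io.Writer, ch <-chan []byte) {
            defer wg.Done()
            for buf := range ch {
                w.Write(buf) // error handling elided
            }
        }(w, chans[i])
    }

    for {
        buf := make([]byte, 32*1024) // fresh buffer per chunk: consumers keep it
        n, err := src.Read(buf)
        if n > 0 {
            for _, ch := range chans {
                ch <- buf[:n]
            }
        }
        if err != nil { // io.EOF or a real error ends the stream
            break
        }
    }
    for _, ch := range chans {
        close(ch)
    }
    wg.Wait()
}

func main() {
    fanOut(strings.NewReader("hello fan-out\n"), []io.Writer{os.Stdout, os.Stdout})
}
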
diff --git a/error.go b/error.go
new file mode 100644
index 0000000..a2eba58
--- /dev/null
+++ b/error.go
@@ -0,0 +1,43 @@
+package acp
+
+import (
+    "encoding/json"
+    "errors"
+    "fmt"
+)
+
+var (
+    _ = error(new(Error))
+    _ = json.Marshaler(new(Error))
+    _ = json.Unmarshaler(new(Error))
+)
+
+type Error struct {
+    Src string `json:"src,omitempty"`
+    Dst string `json:"dst,omitempty"`
+    Err error  `json:"error,omitempty"`
+}
+
+type jsonError struct {
+    Src string `json:"src,omitempty"`
+    Dst string `json:"dst,omitempty"`
+    Err string `json:"error,omitempty"`
+}
+
+func (e *Error) Error() string {
+    return fmt.Sprintf("[%s => %s]: %s", e.Src, e.Dst, e.Err)
+}
+
+func (e *Error) MarshalJSON() ([]byte, error) {
+    return json.Marshal(&jsonError{Src: e.Src, Dst: e.Dst, Err: e.Err.Error()})
+}
+
+func (e *Error) UnmarshalJSON(buf []byte) error {
+    m := new(jsonError)
+    if err := json.Unmarshal(buf, &m); err != nil {
+        return err
+    }
+
+    e.Src, e.Dst, e.Err = m.Src, m.Dst, errors.New(m.Err)
+    return nil
+}
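
A short round-trip sketch for the new Error type. Note that the error value is flattened to a string on encode and rebuilt with errors.New on decode, so wrapped error chains do not survive serialization:

// Round-trip demo for acp.Error; the paths and message are illustrative.
package main

import (
    "encoding/json"
    "fmt"

    "github.com/abc950309/acp"
)

func main() {
    e := &acp.Error{Src: "/src/a.txt", Dst: "/dst/a.txt", Err: fmt.Errorf("disk full")}

    buf, _ := json.Marshal(e)
    fmt.Println(string(buf)) // {"src":"/src/a.txt","dst":"/dst/a.txt","error":"disk full"}

    var back acp.Error
    _ = json.Unmarshal(buf, &back)
    fmt.Println(back.Err) // disk full
}
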
diff --git a/event.go b/event.go
new file mode 100644
index 0000000..3b99a1f
--- /dev/null
+++ b/event.go
@@ -0,0 +1,37 @@
+package acp
+
+type EventHandler func(Event)
+
+type Event interface {
+    iEvent()
+}
+
+type EventUpdateCount struct {
+    Bytes, Files int64
+    Finished     bool
+}
+
+func (*EventUpdateCount) iEvent() {}
+
+type EventUpdateProgress struct {
+    Bytes, Files int64
+    Finished     bool
+}
+
+func (*EventUpdateProgress) iEvent() {}
+
+type EventUpdateJob struct {
+    Job *Job
+}
+
+func (*EventUpdateJob) iEvent() {}
+
+type EventReportError struct {
+    Error *Error
+}
+
+func (*EventReportError) iEvent() {}
+
+type EventFinished struct{}
+
+func (*EventFinished) iEvent() {}
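
Handlers receive every event, so a type switch over the concrete event structs is the expected consumption pattern. A minimal sketch; the handler would be passed in via acp.WithEventHandler:

// Minimal EventHandler sketch; the printed messages are illustrative.
package main

import (
    "fmt"

    "github.com/abc950309/acp"
)

func handler(ev acp.Event) {
    switch e := ev.(type) {
    case *acp.EventUpdateCount: // indexing progress
        fmt.Printf("indexed %d files, %d bytes\n", e.Files, e.Bytes)
    case *acp.EventUpdateProgress: // copying progress
        fmt.Printf("copied %d files, %d bytes\n", e.Files, e.Bytes)
    case *acp.EventReportError:
        fmt.Println("error:", e.Error)
    case *acp.EventFinished: // sent once per handler when the event stream closes
        fmt.Println("done")
    }
}

func main() {
    opt := acp.WithEventHandler(handler)
    _ = opt // would be passed to acp.New(ctx, opt, ...)
}
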
diff --git a/go.mod b/go.mod
index d7bedac..f91776f 100644
--- a/go.mod
+++ b/go.mod
@@ -10,10 +10,14 @@ require (
 )
 
 require (
+    github.com/deckarep/golang-set/v2 v2.1.0 // indirect
     github.com/hashicorp/errwrap v1.0.0 // indirect
+    github.com/json-iterator/go v1.1.12 // indirect
     github.com/klauspost/cpuid/v2 v2.0.4 // indirect
     github.com/mattn/go-runewidth v0.0.13 // indirect
     github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect
+    github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
+    github.com/modern-go/reflect2 v1.0.2 // indirect
     github.com/rivo/uniseg v0.3.4 // indirect
     golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 // indirect
     golang.org/x/term v0.0.0-20220722155259-a9ba230a4035 // indirect
diff --git a/index.go b/index.go
index 1b83c08..93b35d3 100644
--- a/index.go
+++ b/index.go
@@ -6,139 +6,121 @@ import (
     "os"
     "sort"
     "sync/atomic"
-
-    "github.com/schollz/progressbar/v3"
+    "time"
 )
 
 const (
     unexpectFileMode = os.ModeType &^ os.ModeDir
 )
 
-func (c *Copyer) index(ctx context.Context) {
-    for _, s := range c.src {
-        c.walk(nil, s)
-    }
-
-    c.updateProgressBar(func(bar *progressbar.ProgressBar) {
-        bar.ChangeMax64(atomic.LoadInt64(&c.totalBytes))
-        bar.Describe(fmt.Sprintf("[0/%d] index finished...", atomic.LoadInt64(&c.totalFiles)))
-    })
+type counter struct {
+    bytes, files int64
 }
 
-func (c *Copyer) walk(parent *baseJob, src *source) *baseJob {
-    path := src.path()
-
-    stat, err := os.Stat(path)
+func (c *Copyer) index(ctx context.Context) (<-chan *baseJob, error) {
+    jobs := c.walk(ctx)
+    filtered, err := c.joinJobs(jobs)
     if err != nil {
-        c.reportError(path, fmt.Errorf("walk get stat, %w", err))
-        return nil
-    }
-    if stat.Mode()&unexpectFileMode != 0 {
-        return nil
+        return nil, err
     }
 
-    job, err := newJobFromFileInfo(parent, src, stat)
-    if err != nil {
-        c.reportError(path, fmt.Errorf("make job fail, %w", err))
-        return nil
-    }
+    ch := make(chan *baseJob, 128)
+    go wrap(ctx, func() {
+        defer close(ch)
 
-    c.appendJobs(job)
-    if job.typ == jobTypeNormal {
-        totalBytes := atomic.AddInt64(&c.totalBytes, job.size)
-        totalFiles := atomic.AddInt64(&c.totalFiles, 1)
-
-        c.updateProgressBar(func(bar *progressbar.ProgressBar) {
-            bar.ChangeMax64(totalBytes)
-            bar.Describe(fmt.Sprintf("[0/%d] indexing...", totalFiles))
-        })
-
-        return job
-    }
-
-    files, err := os.ReadDir(path)
-    if err != nil {
-        c.reportError(path, fmt.Errorf("walk read dir, %w", err))
-        return nil
-    }
-
-    job.children = make(map[*baseJob]struct{}, len(files))
-    for _, file := range files {
-        id := c.walk(job, &source{base: src.base, relativePath: src.relativePath + "/" + file.Name()})
-        if id == nil {
-            continue
+        for _, job := range filtered {
+            select {
+            case <-ctx.Done():
+                return
+            case ch <- job:
+            }
         }
-        job.children[id] = struct{}{}
-    }
+    })
 
-    return job
+    return ch, nil
 }
 
-func (c *Copyer) checkJobs() bool {
-    c.jobsLock.Lock()
-    defer c.jobsLock.Unlock()
+func (c *Copyer) walk(ctx context.Context) []*baseJob {
+    done := make(chan struct{})
+    defer close(done)
 
-    if len(c.jobs) == 0 {
-        c.reportError("", fmt.Errorf("cannot found available jobs"))
-        return false
+    cntr := new(counter)
+    go wrap(ctx, func() {
+        ticker := time.NewTicker(time.Second)
+        for {
+            select {
+            case <-ticker.C:
+                c.submit(&EventUpdateCount{Bytes: atomic.LoadInt64(&cntr.bytes), Files: atomic.LoadInt64(&cntr.files)})
+            case <-done:
+                c.submit(&EventUpdateCount{Bytes: atomic.LoadInt64(&cntr.bytes), Files: atomic.LoadInt64(&cntr.files), Finished: true})
+                return
+            }
+        }
+    })
+
+    jobs := make([]*baseJob, 0, 64)
+    appendJob := func(job *baseJob) {
+        jobs = append(jobs, job)
+        atomic.AddInt64(&cntr.files, 1)
+        atomic.AddInt64(&cntr.bytes, job.size)
     }
 
-    sort.Slice(c.jobs, func(i int, j int) bool {
-        return c.jobs[i].comparableRelativePath < c.jobs[j].comparableRelativePath
+    var walk func(src *source)
+    walk = func(src *source) {
+        path := src.src()
+
+        stat, err := os.Stat(path)
+        if err != nil {
+            c.reportError(path, "", fmt.Errorf("walk get stat, %w", err))
+            return
+        }
+
+        mode := stat.Mode()
+        if mode&unexpectFileMode != 0 {
+            return
+        }
+        if !mode.IsDir() {
+            job, err := c.newJobFromFileInfo(src, stat)
+            if err != nil {
+                c.reportError(path, "", fmt.Errorf("make job fail, %w", err))
+                return
+            }
+
+            appendJob(job)
+            return
+        }
+
+        files, err := os.ReadDir(path)
+        if err != nil {
+            c.reportError(path, "", fmt.Errorf("walk read dir, %w", err))
+            return
+        }
+        for _, file := range files {
+            walk(src.append(file.Name()))
+        }
+    }
+    for _, s := range c.src {
+        walk(s)
+    }
+    return jobs
+}
+
+func (c *Copyer) joinJobs(jobs []*baseJob) ([]*baseJob, error) {
+    sort.Slice(jobs, func(i int, j int) bool {
+        return comparePath(jobs[i].source.path, jobs[j].source.path) < 0
     })
 
     var last *baseJob
-    filtered := make([]*baseJob, 0, len(c.jobs))
-    for _, job := range c.jobs {
-        if last == nil {
-            filtered = append(filtered, job)
-            last = job
-            continue
-        }
-        if last.source.relativePath != job.source.relativePath {
-            filtered = append(filtered, job)
-            last = job
+    filtered := make([]*baseJob, 0, len(jobs))
+    for _, job := range jobs {
+        if last != nil && comparePath(last.source.path, job.source.path) == 0 {
+            c.reportError(last.source.src(), "", fmt.Errorf("same relative path, ignored, '%s'", job.source.src()))
             continue
         }
-        if last.typ != job.typ {
-            c.reportError(job.source.path(), fmt.Errorf("same relative path with different type, '%s' and '%s'", job.source.path(), last.source.path()))
-            return false
-        }
-        if last.typ == jobTypeNormal {
-            c.reportError(job.source.path(), fmt.Errorf("same relative path as normal file, ignored, '%s'", job.source.path()))
-            continue
-        }
-
-        func() {
-            last.lock.Lock()
-            defer last.lock.Unlock()
-
-            for n := range job.children {
-                last.children[n] = struct{}{}
-                n.parent = last
-            }
-        }()
+
+        filtered = append(filtered, job)
+        last = job
     }
 
-    c.jobs = filtered
-    return true
-}
-
-func (c *Copyer) appendJobs(jobs ...*baseJob) {
-    c.jobsLock.Lock()
-    defer c.jobsLock.Unlock()
-    c.jobs = append(c.jobs, jobs...)
-}
-
-func (c *Copyer) getJobs() []*baseJob {
-    c.jobsLock.Lock()
-    defer c.jobsLock.Unlock()
-    return c.jobs
-}
-
-func (c *Copyer) getJobsAndNoSpaceSource() ([]*baseJob, []*source) {
-    c.jobsLock.Lock()
-    defer c.jobsLock.Unlock()
-    return c.jobs, c.noSpaceSource
+    return filtered, nil
 }
diff --git a/job.go b/job.go
index 1d029d4..db38965 100644
--- a/job.go
+++ b/job.go
@@ -2,21 +2,14 @@ package acp
 
 import (
     "encoding/hex"
+    "fmt"
     "os"
-    "strings"
     "sync"
     "time"
 
     "github.com/abc950309/acp/mmap"
 )
 
-type jobType uint8
-
-const (
-    jobTypeNormal = jobType(iota)
-    jobTypeDir
-)
-
 type jobStatus uint8
 
 const (
@@ -38,9 +31,8 @@ var (
 )
 
 type baseJob struct {
-    parent *baseJob
+    copyer *Copyer
     source *source
-    typ    jobType
 
     name    string // base name of the file
     size    int64  // length in bytes for regular files; system-dependent for others
@@ -50,32 +42,27 @@ type baseJob struct {
     lock           sync.Mutex
     writeTime      time.Time
     status         jobStatus
-    children       map[*baseJob]struct{}
     successTargets []string
     failedTargets  map[string]error
     hash           []byte
-
-    // utils
-    comparableRelativePath string
 }
 
-func newJobFromFileInfo(parent *baseJob, source *source, info os.FileInfo) (*baseJob, error) {
+func (c *Copyer) newJobFromFileInfo(source *source, info os.FileInfo) (*baseJob, error) {
     job := &baseJob{
-        parent: parent,
+        copyer: c,
         source: source,
 
         name:    info.Name(),
         size:    info.Size(),
        mode:    info.Mode(),
         modTime: info.ModTime(),
-
-        comparableRelativePath: strings.ReplaceAll(source.relativePath, "/", "\x00"),
     }
 
-    if job.mode.IsDir() {
-        job.typ = jobTypeDir
+    if job.mode.IsDir() || job.mode&unexpectFileMode != 0 {
+        return nil, fmt.Errorf("unexpected file, path= %s", source.src())
     }
 
+    c.submit(&EventUpdateJob{job.report()})
     return job, nil
 }
 
@@ -87,30 +74,24 @@ func (j *baseJob) setStatus(s jobStatus) {
     if s == jobStatusCopying {
         j.writeTime = time.Now()
     }
+
+    j.copyer.submit(&EventUpdateJob{j.report()})
 }
 
 func (j *baseJob) setHash(h []byte) {
     j.lock.Lock()
     defer j.lock.Unlock()
+
     j.hash = h
-}
-
-func (j *baseJob) done(child *baseJob) int {
-    if j.typ == jobTypeNormal {
-        return 0
-    }
-
-    j.lock.Lock()
-    defer j.lock.Unlock()
-
-    delete(j.children, child)
-    return len(j.children)
+    j.copyer.submit(&EventUpdateJob{j.report()})
 }
 
 func (j *baseJob) succes(path string) {
     j.lock.Lock()
     defer j.lock.Unlock()
+
     j.successTargets = append(j.successTargets, path)
+    j.copyer.submit(&EventUpdateJob{j.report()})
 }
 
 func (j *baseJob) fail(path string, err error) {
@@ -121,24 +102,17 @@ func (j *baseJob) fail(path string, err error) {
         j.failedTargets = make(map[string]error, 1)
     }
     j.failedTargets[path] = err
+    j.copyer.submit(&EventUpdateJob{j.report()})
 }
 
-func (j *baseJob) report() *File {
-    j.lock.Lock()
-    defer j.lock.Unlock()
-
-    fails := make(map[string]string, len(j.failedTargets))
-    for n, e := range j.failedTargets {
-        fails[n] = e.Error()
-    }
-
-    return &File{
-        Source:       j.source.path(),
-        RelativePath: j.source.relativePath,
+func (j *baseJob) report() *Job {
+    return &Job{
+        Base: j.source.base,
+        Path: j.source.path,
 
         Status:         statusMapping[j.status],
         SuccessTargets: j.successTargets,
-        FailTargets:    fails,
+        FailTargets:    j.failedTargets,
 
         Size: j.size,
         Mode: j.mode,
@@ -151,4 +125,25 @@ func (j *baseJob) report() *File {
 type writeJob struct {
     *baseJob
     src *mmap.ReaderAt
+    ch  chan struct{}
+}
+
+func (wj *writeJob) done() {
+    wj.src.Close()
+    close(wj.ch)
+}
+
+type Job struct {
+    Base string   `json:"base"`
+    Path []string `json:"path"`
+
+    Status         string           `json:"status"`
+    SuccessTargets []string         `json:"success_target,omitempty"`
+    FailTargets    map[string]error `json:"fail_target,omitempty"`
+
+    Size      int64       `json:"size"`
+    Mode      os.FileMode `json:"mode"`
+    ModTime   time.Time   `json:"mod_time"`
+    WriteTime time.Time   `json:"write_time"`
+    SHA256    string      `json:"sha256"`
 }
diff --git a/opt.go b/opt.go
index cb1fa61..6eeae5d 100644
--- a/opt.go
+++ b/opt.go
@@ -4,22 +4,31 @@ import (
     "fmt"
     "os"
     "path"
+    "sort"
     "strings"
 
     "github.com/sirupsen/logrus"
 )
 
 type source struct {
-    base         string
-    relativePath string
+    base string
+    path []string
 }
 
-func (s *source) path() string {
-    return s.base + s.relativePath
+func (s *source) src() string {
+    return s.base + path.Join(s.path...)
 }
 
-func (s *source) target(dst string) string {
-    return dst + s.relativePath
+func (s *source) dst(dst string) string {
+    return dst + path.Join(s.path...)
+}
+
+func (s *source) append(next ...string) *source {
+    copied := make([]string, len(s.path)+len(next))
+    copy(copied, s.path)
+    copy(copied[len(s.path):], next)
+
+    return &source{base: s.base, path: copied}
 }
 
 type option struct {
@@ -29,21 +38,18 @@ type option struct {
     fromDevice *deviceOption
     toDevice   *deviceOption
 
-    overwrite       bool
-    withProgressBar bool
-    threads         int
-    withHash        bool
+    createFlag int
+    withHash   bool
 
-    autoFill           bool
-    autoFillSplitDepth int
-
-    logger *logrus.Logger
+    logger        *logrus.Logger
+    eventHandlers []EventHandler
 }
 
 func newOption() *option {
     return &option{
         fromDevice: new(deviceOption),
         toDevice:   new(deviceOption),
+        createFlag: os.O_WRONLY | os.O_CREATE | os.O_EXCL,
     }
 }
 
@@ -73,17 +79,20 @@ func (o *option) check() error {
     if len(o.src) == 0 {
         return fmt.Errorf("source path not found")
     }
+    sort.Slice(o.src, func(i, j int) bool {
+        return comparePath(o.src[i].path, o.src[j].path) < 0
+    })
     for _, s := range o.src {
-        if _, err := os.Stat(s.path()); err != nil {
-            return fmt.Errorf("check src path '%s', %w", s.path(), err)
+        src := s.src()
+        if _, err := os.Stat(src); err != nil {
+            return fmt.Errorf("check src path '%s', %w", src, err)
         }
     }
 
-    if o.threads < 1 {
-        o.threads = 4
-    }
-    if o.fromDevice.linear || o.toDevice.linear {
-        o.threads = 1
+    o.fromDevice.check()
+    o.toDevice.check()
+    if o.logger == nil {
+        o.logger = logrus.StandardLogger()
     }
     return nil
 }
@@ -99,23 +108,17 @@ func Source(paths ...string) Option {
         }
 
         base, name := path.Split(p)
-        o.src = append(o.src, &source{base: base, relativePath: name})
+        o.src = append(o.src, &source{base: base, path: []string{name}})
     }
     return o
     }
 }
 
-func AccurateSource(base string, relativePaths ...string) Option {
+func AccurateSource(base string, paths ...[]string) Option {
     return func(o *option) *option {
         base = path.Clean(base)
-
-        for _, p := range relativePaths {
-            p = path.Clean(p)
-            if p[len(p)-1] == '/' {
-                p = p[:len(p)-1]
-            }
-
-            o.src = append(o.src, &source{base: base, relativePath: p})
+        for _, p := range paths {
+            o.src = append(o.src, &source{base: base, path: p})
         }
         return o
     }
}
@@ -154,23 +157,18 @@ func SetToDevice(opts ...DeviceOption) Option {
 
 func Overwrite(b bool) Option {
     return func(o *option) *option {
-        o.overwrite = b
+        if b {
+            o.createFlag = os.O_WRONLY | os.O_CREATE | os.O_TRUNC
+            return o
+        }
+
+        o.createFlag = os.O_WRONLY | os.O_CREATE | os.O_EXCL
         return o
     }
 }
 
-func WithProgressBar(b bool) Option {
-    return func(o *option) *option {
-        o.withProgressBar = b
-        return o
-    }
-}
-
-func WithThreads(threads int) Option {
-    return func(o *option) *option {
-        o.threads = threads
-        return o
-    }
+func WithProgressBar() Option {
+    return WithEventHandler(NewProgressBar())
 }
 
 func WithHash(b bool) Option {
@@ -180,17 +178,61 @@ func WithHash(b bool) Option {
     }
 }
 
-func WithAutoFill(on bool, depth int) Option {
-    return func(o *option) *option {
-        o.autoFill = on
-        o.autoFillSplitDepth = depth
-        return o
-    }
-}
-
 func WithLogger(logger *logrus.Logger) Option {
     return func(o *option) *option {
         o.logger = logger
         return o
     }
 }
+
+func WithEventHandler(h EventHandler) Option {
+    return func(o *option) *option {
+        o.eventHandlers = append(o.eventHandlers, h)
+        return o
+    }
+}
+
+func comparePath(a, b []string) int {
+    al, bl := len(a), len(b)
+
+    l := al
+    if bl < al {
+        l = bl
+    }
+
+    for idx := 0; idx < l; idx++ {
+        if a[idx] < b[idx] {
+            return -1
+        }
+        if a[idx] > b[idx] {
+            return 1
+        }
+    }
+
+    if al < bl {
+        return -1
+    }
+    if al > bl {
+        return 1
+    }
+    return 0
+}
+
+// isChild returns -1 (not a child), 0 (equal), or 1 (child).
+func isChild(parent, child []string) int {
+    pl, cl := len(parent), len(child)
+    if pl > cl {
+        return -1
+    }
+
+    for idx := 0; idx < pl; idx++ {
+        if parent[idx] != child[idx] {
+            return -1
+        }
+    }
+
+    if pl == cl {
+        return 0
+    }
+    return 1
+}
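
comparePath orders paths segment by segment, and a shorter (parent) path sorts before its children on a shared prefix, which is the property joinJobs relies on to detect duplicates in a single pass. A standalone illustration, with a condensed but equivalent copy of the unexported function:

// Standalone demo of the ordering comparePath defines; the sample paths are
// illustrative.
package main

import (
    "fmt"
    "sort"
)

func comparePath(a, b []string) int {
    l := len(a)
    if len(b) < l {
        l = len(b)
    }
    for i := 0; i < l; i++ { // compare shared segments first
        if a[i] != b[i] {
            if a[i] < b[i] {
                return -1
            }
            return 1
        }
    }
    switch { // shared prefix: the shorter (parent) path sorts first
    case len(a) < len(b):
        return -1
    case len(a) > len(b):
        return 1
    }
    return 0
}

func main() {
    paths := [][]string{{"b"}, {"a", "sub", "file"}, {"a"}, {"a", "sub"}}
    sort.Slice(paths, func(i, j int) bool { return comparePath(paths[i], paths[j]) < 0 })
    fmt.Println(paths) // [[a] [a sub] [a sub file] [b]]
}
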
diff --git a/opt_device.go b/opt_device.go
index 8ab75c8..8903170 100644
--- a/opt_device.go
+++ b/opt_device.go
@@ -1,7 +1,17 @@
 package acp
 
 type deviceOption struct {
-    linear bool
+    linear  bool
+    threads int
+}
+
+func (do *deviceOption) check() {
+    if do.threads == 0 {
+        do.threads = 8
+    }
+    if do.linear {
+        do.threads = 1
+    }
 }
 
 type DeviceOption func(*deviceOption) *deviceOption
@@ -12,3 +22,10 @@ func LinearDevice(b bool) DeviceOption {
         return d
     }
 }
+
+func DeviceThreads(threads int) DeviceOption {
+    return func(d *deviceOption) *deviceOption {
+        d.threads = threads
+        return d
+    }
+}
diff --git a/prepare.go b/prepare.go
new file mode 100644
index 0000000..b4f891d
--- /dev/null
+++ b/prepare.go
@@ -0,0 +1,57 @@
+package acp
+
+import (
+    "context"
+    "fmt"
+    "sync"
+
+    "github.com/abc950309/acp/mmap"
+)
+
+func (c *Copyer) prepare(ctx context.Context, indexed <-chan *baseJob) <-chan *writeJob {
+    chanLen := 32
+    if c.fromDevice.linear {
+        chanLen = 0
+    }
+
+    var wg sync.WaitGroup
+    ch := make(chan *writeJob, chanLen)
+    defer func() {
+        go wrap(ctx, func() {
+            defer close(ch)
+            wg.Wait()
+        })
+    }()
+
+    for idx := 0; idx < c.fromDevice.threads; idx++ {
+        wg.Add(1)
+        go wrap(ctx, func() {
+            defer wg.Done()
+
+            for {
+                select {
+                case <-ctx.Done():
+                    return
+                case job, ok := <-indexed:
+                    if !ok {
+                        return
+                    }
+
+                    job.setStatus(jobStatusPreparing)
+
+                    file, err := mmap.Open(job.source.src())
+                    if err != nil {
+                        c.reportError(job.source.src(), "", fmt.Errorf("open src file fail, %w", err))
+                        continue
+                    }
+
+                    wj := &writeJob{baseJob: job, src: file, ch: make(chan struct{})}
+                    ch <- wj
+                    <-wj.ch
+                }
+            }
+        })
+    }
+
+    return ch
+}
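
prepare sends each writeJob and then parks on wj.ch until the writer calls done(), so a reader thread keeps at most one file mapped at a time; with a single thread and an unbuffered channel, this is what keeps linear (tape-like) devices reading sequentially. A reduced sketch of that handshake:

// Reduced sketch of the producer/consumer handshake in prepare.go; names are
// illustrative.
package main

import "fmt"

type job struct {
    name string
    ch   chan struct{}
}

func (j *job) done() { close(j.ch) }

func main() {
    jobs := make(chan *job) // unbuffered, like the linear-device path

    go func() {
        for _, name := range []string{"a", "b", "c"} {
            j := &job{name: name, ch: make(chan struct{})}
            jobs <- j
            <-j.ch // wait until the consumer finishes before opening the next
        }
        close(jobs)
    }()

    for j := range jobs {
        fmt.Println("processing", j.name)
        j.done()
    }
}
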
diff --git a/progress_bar.go b/progress_bar.go
index 7089273..121e3d6 100644
--- a/progress_bar.go
+++ b/progress_bar.go
@@ -1,25 +1,16 @@
 package acp
 
 import (
-    "context"
     "fmt"
     "os"
-    "sort"
-    "strings"
-    "sync/atomic"
     "time"
 
     "github.com/schollz/progressbar/v3"
-    "github.com/sirupsen/logrus"
 )
 
-const (
-    barUpdateInterval = time.Nanosecond * 255002861
-)
-
-func (c *Copyer) startProgressBar(ctx context.Context) {
+func NewProgressBar() EventHandler {
     // progressBar := progressbar.DefaultBytes(1, "[0/0] indexing...")
-    progressBar := progressbar.NewOptions64(
+    bar := progressbar.NewOptions64(
         1,
         progressbar.OptionSetDescription("[0/0] indexing..."),
         progressbar.OptionSetWriter(os.Stderr),
@@ -33,84 +24,26 @@ func NewProgressBar() EventHandler {
         progressbar.OptionSetRenderBlankState(true),
     )
 
-    ch := make(chan interface{}, 8)
-    c.updateProgressBar = func(f func(bar *progressbar.ProgressBar)) {
-        ch <- f
-    }
-    c.updateCopying = func(f func(set map[int64]struct{})) {
-        ch <- f
-    }
-    c.logf = func(l logrus.Level, format string, args ...any) {
-        ch <- func() { logrus.StandardLogger().Logf(l, format, args...) }
-    }
+    var totalFiles int64
+    return func(ev Event) {
+        switch e := ev.(type) {
+        case *EventUpdateCount:
+            totalFiles = e.Files
+            bar.Describe(fmt.Sprintf("[0/%d] indexing...", totalFiles))
+            bar.ChangeMax64(e.Bytes)
+            return
+        case *EventUpdateProgress:
+            bar.Set64(e.Bytes)
 
-    go wrap(ctx, func() {
-        copying := make(map[int64]struct{}, c.threads)
-
-        for f := range ch {
-            if progressBar == nil {
-                continue
-            }
-
-            switch v := f.(type) {
-            case func(bar *progressbar.ProgressBar):
-                v(progressBar)
-            case func(set map[int64]struct{}):
-                v(copying)
-
-                idxs := make([]int64, 0, len(copying))
-                for idx := range copying {
-                    idxs = append(idxs, idx)
-                }
-                sort.Slice(idxs, func(i, j int) bool {
-                    return idxs[i] < idxs[j]
-                })
-
-                strs := make([]string, 0, len(idxs))
-                for _, idx := range idxs {
-                    strs = append(strs, fmt.Sprintf("file %d", idx))
-                }
-
-                copyedFiles, totalFiles := atomic.LoadInt64(&c.copyedFiles), atomic.LoadInt64(&c.totalFiles)
-                progressBar.Describe(fmt.Sprintf("[%d/%d] copying... %s", copyedFiles, totalFiles, strings.Join(strs, ", ")))
-            case func():
-                v()
-            }
-        }
-    })
-
-    go wrap(ctx, func() {
-        defer close(ch)
-
-        ticker := time.NewTicker(barUpdateInterval) // around 255ms, avoid conflict with progress bar fresh by second
-        defer ticker.Stop()
-
-        var lastCopyedBytes int64
-        for {
-            select {
-            case <-ticker.C:
-                switch atomic.LoadInt64(&c.stage) {
-                case StageCopy:
-                    currentCopyedBytes := atomic.LoadInt64(&c.copyedBytes)
-                    diff := currentCopyedBytes - lastCopyedBytes
-                    lastCopyedBytes = currentCopyedBytes
-
-                    c.updateProgressBar(func(bar *progressbar.ProgressBar) {
-                        bar.Add64(diff)
-                    })
-                }
-            case <-ctx.Done():
-                currentCopyedBytes := atomic.LoadInt64(&c.copyedBytes)
-                diff := int(currentCopyedBytes - lastCopyedBytes)
-                lastCopyedBytes = currentCopyedBytes
-
-                copyedFiles, totalFiles := atomic.LoadInt64(&c.copyedFiles), atomic.LoadInt64(&c.totalFiles)
-                c.updateProgressBar(func(bar *progressbar.ProgressBar) {
-                    bar.Add(diff)
-                    bar.Describe(fmt.Sprintf("[%d/%d] finished!", copyedFiles, totalFiles))
-                })
+            if !e.Finished {
+                bar.Describe(fmt.Sprintf("[%d/%d] copying...", e.Files, totalFiles))
                 return
             }
+
+            bar.Describe(fmt.Sprintf("[%d/%d] finishing...", e.Files, totalFiles))
+            return
+        default:
+            return
         }
-    })
+    }
 }
diff --git a/report.go b/report.go
index b4a1272..1953f6b 100644
--- a/report.go
+++ b/report.go
@@ -1,94 +1,54 @@
 package acp
 
 import (
-    "encoding/json"
-    "fmt"
-    "os"
-    "time"
+    "path"
+    "sync"
 )
 
-func (c *Copyer) Report() *Report {
-    jobs, nss := c.getJobsAndNoSpaceSource()
-    errs := c.getErrors()
+type ReportGetter func() *Report
 
-    files := make([]*File, 0, len(jobs))
-    for _, job := range jobs {
-        files = append(files, job.report())
+func NewReportGetter() (EventHandler, ReportGetter) {
+    var lock sync.Mutex
+    jobs := make(map[string]*Job, 8)
+    errors := make([]*Error, 0)
+
+    handler := func(ev Event) {
+        switch e := ev.(type) {
+        case *EventUpdateJob:
+            lock.Lock()
+            defer lock.Unlock()
+
+            key := path.Join(e.Job.Path...)
+            jobs[key] = e.Job
+        case *EventReportError:
+            lock.Lock()
+            defer lock.Unlock()
+
+            errors = append(errors, e.Error)
+        }
     }
+    getter := func() *Report {
+        lock.Lock()
+        defer lock.Unlock()
 
-    noSpaceSources := make([]*FilePath, 0, len(nss))
-    for _, s := range nss {
-        if len(noSpaceSources) == 0 {
-            noSpaceSources = append(noSpaceSources, &FilePath{Base: s.base, RelativePaths: []string{s.relativePath}})
-            continue
+        jobsCopied := make([]*Job, 0, len(jobs))
+        for _, j := range jobs {
+            jobsCopied = append(jobsCopied, j)
+        }
+        errorsCopied := make([]*Error, 0, len(errors))
+        for _, e := range errors {
+            errorsCopied = append(errorsCopied, e)
         }
 
-        if last := noSpaceSources[len(noSpaceSources)-1]; last.Base == s.base {
-            last.RelativePaths = append(last.RelativePaths, s.relativePath)
-            continue
+        return &Report{
+            Jobs:   jobsCopied,
+            Errors: errorsCopied,
         }
-
-        noSpaceSources = append(noSpaceSources, &FilePath{Base: s.base, RelativePaths: []string{s.relativePath}})
     }
-
-    return &Report{
-        Files:          files,
-        NoSpaceSources: noSpaceSources,
-        Errors:         errs,
-    }
-}
-
-var (
-    _ = error(new(Error))
-    _ = json.Marshaler(new(Error))
-    _ = json.Unmarshaler(new(Error))
-)
-
-type Error struct {
-    Path string `json:"path,omitempty"`
-    Err  error  `json:"error,omitempty"`
-}
-
-func (e *Error) Error() string {
-    return fmt.Sprintf("[%s]: %s", e.Path, e.Err)
-}
-
-func (e *Error) MarshalJSON() ([]byte, error) {
-    return json.Marshal(map[string]string{"path": e.Path, "error": e.Err.Error()})
-}
-
-func (e *Error) UnmarshalJSON(buf []byte) error {
-    m := make(map[string]string, 2)
-    if err := json.Unmarshal(buf, &m); err != nil {
-        return err
-    }
-
-    e.Path, e.Err = m["path"], fmt.Errorf(m["error"])
-    return nil
-}
-
-type File struct {
-    Source       string `json:"source"`
-    RelativePath string `json:"relative_path"`
-
-    Status         string            `json:"status"`
-    SuccessTargets []string          `json:"success_target,omitempty"`
-    FailTargets    map[string]string `json:"fail_target,omitempty"`
-
-    Size      int64       `json:"size"`
-    Mode      os.FileMode `json:"mode"`
-    ModTime   time.Time   `json:"mod_time"`
-    WriteTime time.Time   `json:"write_time"`
-    SHA256    string      `json:"sha256"`
-}
-
-type FilePath struct {
-    Base          string   `json:"base,omitempty"`
-    RelativePaths []string `json:"relative_paths,omitempty"`
+    return handler, getter
 }
 
 type Report struct {
-    Files          []*File     `json:"files,omitempty"`
-    NoSpaceSources []*FilePath `json:"no_space_sources,omitempty"`
-    Errors         []*Error    `json:"errors,omitempty"`
+    Jobs   []*Job   `json:"files,omitempty"`
+    Errors []*Error `json:"errors,omitempty"`
 }
diff --git a/status.go b/status.go.bak
similarity index 100%
rename from status.go
rename to status.go.bak