feat: add event

Author: 崔竞宁
Date:   2022-12-07 19:50:48 +08:00
Parent: 988889e8bd
Commit: 9fa93b905b
16 changed files with 681 additions and 724 deletions

acp.go (165 lines changed)

@@ -2,48 +2,15 @@ package acp
import (
"context"
"os"
"sync"
"github.com/schollz/progressbar/v3"
"github.com/sirupsen/logrus"
)
const (
StageIndex = iota
StageCopy
StageFinished
)
type Copyer struct {
*option
createFlag int
stage int64
copyedBytes int64
totalBytes int64
copyedFiles int64
totalFiles int64
updateProgressBar func(func(bar *progressbar.ProgressBar))
updateCopying func(func(set map[int64]struct{}))
logf func(l logrus.Level, format string, args ...any)
jobsLock sync.Mutex
jobs []*baseJob
noSpaceSource []*source
errsLock sync.Mutex
errors []*Error
badDstsLock sync.Mutex
badDsts map[string]error
readingFiles chan struct{}
writePipe chan *writeJob
postPipe chan *baseJob
running sync.WaitGroup
eventCh chan Event
}
func New(ctx context.Context, opts ...Option) (*Copyer, error) {
@@ -60,89 +27,91 @@ func New(ctx context.Context, opts ...Option) (*Copyer, error) {
c := &Copyer{
option: opt,
stage: StageIndex,
updateProgressBar: func(f func(bar *progressbar.ProgressBar)) {},
updateCopying: func(f func(set map[int64]struct{})) {},
badDsts: make(map[string]error),
writePipe: make(chan *writeJob, 32),
postPipe: make(chan *baseJob, 8),
}
c.createFlag = os.O_WRONLY | os.O_CREATE
if c.overwrite {
c.createFlag |= os.O_TRUNC
} else {
c.createFlag |= os.O_EXCL
}
if c.fromDevice.linear {
c.readingFiles = make(chan struct{}, 1)
}
if opt.logger != nil {
c.logf = func(l logrus.Level, format string, args ...any) { opt.logger.Logf(l, format, args...) }
} else {
c.logf = func(l logrus.Level, format string, args ...any) { logrus.StandardLogger().Logf(l, format, args...) }
eventCh: make(chan Event, 128),
}
c.running.Add(1)
go wrap(ctx, func() { c.run(ctx) })
return c, nil
}
func (c *Copyer) Wait() *Report {
func (c *Copyer) Wait() {
c.running.Wait()
return c.Report()
}
func (c *Copyer) run(ctx context.Context) {
func (c *Copyer) run(ctx context.Context) error {
defer c.running.Done()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if c.withProgressBar {
c.startProgressBar(ctx)
go wrap(ctx, func() { c.eventLoop(ctx) })
indexed, err := c.index(ctx)
if err != nil {
return err
}
c.index(ctx)
if !c.checkJobs() {
return
}
prepared := c.prepare(ctx, indexed)
copyed := c.copy(ctx, prepared)
c.cleanupJob(ctx, copyed)
if err := c.applyAutoFillLimit(); err != nil {
c.reportError("_autofill", err)
return
}
c.copy(ctx)
return nil
}
func (c *Copyer) reportError(file string, err error) {
e := &Error{Path: file, Err: err}
func (c *Copyer) eventLoop(ctx context.Context) {
chans := make([]chan Event, len(c.eventHanders))
for idx := range chans {
chans[idx] = make(chan Event, 128)
}
for idx, ch := range chans {
handler := c.eventHanders[idx]
events := ch
go wrap(ctx, func() {
for {
e, ok := <-events
if !ok {
handler(&EventFinished{})
return
}
handler(e)
}
})
}
defer func() {
for _, ch := range chans {
close(ch)
}
}()
for {
select {
case e, ok := <-c.eventCh:
if !ok {
return
}
for _, ch := range chans {
ch <- e
}
case <-ctx.Done():
return
}
}
}
func (c *Copyer) logf(l logrus.Level, format string, args ...any) {
c.logger.Logf(l, format, args...)
}
func (c *Copyer) submit(e Event) {
c.eventCh <- e
}
func (c *Copyer) reportError(src, dst string, err error) {
e := &Error{Src: src, Dst: dst, Err: err}
c.logf(logrus.ErrorLevel, e.Error())
c.errsLock.Lock()
defer c.errsLock.Unlock()
c.errors = append(c.errors, e)
}
func (c *Copyer) getErrors() []*Error {
c.errsLock.Lock()
defer c.errsLock.Unlock()
return c.errors
}
func (c *Copyer) addBadDsts(dst string, err error) {
c.badDstsLock.Lock()
defer c.badDstsLock.Unlock()
c.badDsts[dst] = err
}
func (c *Copyer) getBadDsts() map[string]error {
c.errsLock.Lock()
defer c.errsLock.Unlock()
return c.badDsts
c.submit(&EventReportError{Error: e})
}
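
The event channel is the new backbone of acp.go: every state change is submitted as an Event, and eventLoop fans each event out to a per-handler buffered channel so one slow handler cannot stall the others. A minimal usage sketch, assuming placeholder paths (the option and event names are the ones added in this commit):

package main

import (
	"context"

	"github.com/abc950309/acp"
	"github.com/sirupsen/logrus"
)

func main() {
	c, err := acp.New(context.Background(),
		acp.Source("/tmp/src"), // placeholder source
		acp.Target("/tmp/dst"), // placeholder target
		acp.WithEventHandler(func(ev acp.Event) {
			// Runs on its own goroutine; the event loop buffers 128 events per handler.
			if e, ok := ev.(*acp.EventReportError); ok {
				logrus.Errorf("copy error: %s", e.Error)
			}
		}),
	)
	if err != nil {
		logrus.Fatal(err)
	}
	c.Wait() // Wait no longer returns a *Report; see NewReportGetter in report.go.
}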

(deleted file: auto-fill logic)

@@ -1,86 +0,0 @@
package acp
import (
"fmt"
"math"
"strings"
)
func (c *Copyer) applyAutoFillLimit() error {
if !c.autoFill {
return nil
}
counts := make(map[string]int, len(c.dst))
infos := make(map[string]*fileSystem, len(c.dst))
for _, d := range c.dst {
fsInfo, err := getFileSystem(d)
if err != nil {
return fmt.Errorf("get file system fail, %s, %w", d, err)
}
infos[d] = fsInfo
counts[d] = counts[d] + 1
}
min := int64(math.MaxInt64)
for mp, info := range infos {
size := info.AvailableSize / int64(counts[mp])
if size < min {
min = size
}
}
c.jobsLock.Lock()
defer c.jobsLock.Unlock()
idx := c.getAutoFillCutoffIdx(min)
if idx < 0 {
return nil
}
if idx == 0 {
return fmt.Errorf("cannot found available auto fill slice, filesystem_size= %d", min)
}
cutoff := c.jobs[idx:]
c.jobs = c.jobs[:idx]
last := ""
for _, job := range cutoff {
if job.parent != nil {
job.parent.done(job)
}
if strings.HasPrefix(job.source.relativePath, last) {
continue
}
c.noSpaceSource = append(c.noSpaceSource, job.source)
last = job.source.relativePath + "/"
}
return nil
}
func (c *Copyer) getAutoFillCutoffIdx(limit int64) int {
left := limit
targetIdx := -1
for idx, job := range c.jobs {
left -= job.size
if left < 0 {
targetIdx = idx
}
}
if targetIdx < 0 {
return -1
}
if c.autoFillSplitDepth <= 0 {
return targetIdx
}
for idx := targetIdx; idx >= 0; idx-- {
if strings.Count(c.jobs[idx].source.relativePath, "/") < c.autoFillSplitDepth {
return idx
}
}
return 0
}

cleanup.go (new file, 27 lines)

@@ -0,0 +1,27 @@
package acp
import (
"context"
"fmt"
"os"
)
func (c *Copyer) cleanupJob(ctx context.Context, copyed <-chan *baseJob) {
for {
select {
case job, ok := <-copyed:
if !ok {
return
}
for _, name := range job.successTargets {
if err := os.Chtimes(name, job.modTime, job.modTime); err != nil {
c.reportError(job.source.src(), name, fmt.Errorf("change info, chtimes fail, %w", err))
}
}
job.setStatus(jobStatusFinished)
case <-ctx.Done():
return
}
}
}


@@ -2,25 +2,27 @@ package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"os"
"os/signal"
"unsafe"
"github.com/abc950309/acp"
jsoniter "github.com/json-iterator/go"
"github.com/modern-go/reflect2"
"github.com/sirupsen/logrus"
)
var (
withProgressBar = flag.Bool("p", true, "display progress bar")
notOverwrite = flag.Bool("n", false, "do not overwrite existing files")
continueReport = flag.String("c", "", "continue with previous report, for auto fill circumstances")
// continueReport = flag.String("c", "", "continue with previous report, for auto fill circumstances")
noTarget = flag.Bool("notarget", false, "no target; use as a directory index tool")
reportPath = flag.String("report", "", "json report storage path")
reportIndent = flag.Bool("report-indent", false, "json report with indent")
fromLinear = flag.Bool("from-linear", false, "copy from a linear device, such as a tape drive")
toLinear = flag.Bool("to-linear", false, "copy to a linear device, such as a tape drive")
autoFillDepth = flag.Int("auto-fill-depth", 0, "auto fill the whole filesystem, split at the specified file-path depth; leftover file paths are written to the report")
targetPaths []string
)
@@ -61,29 +63,31 @@ func main() {
}()
opts := make([]acp.Option, 0, 8)
if *continueReport != "" {
f, err := os.Open(*continueReport)
if err != nil {
logrus.Fatalf("cannot open continue report file, %s", err)
}
r := new(acp.Report)
if err := json.NewDecoder(f).Decode(r); err != nil {
logrus.Fatalf("decode continue report file, %s", err)
}
for _, s := range r.NoSpaceSources {
logrus.Infof("restore unfinished: base= '%s' relative_path= '%v'", s.Base, s.RelativePaths)
opts = append(opts, acp.AccurateSource(s.Base, s.RelativePaths...))
}
} else {
opts = append(opts, acp.Source(sources...))
}
// if *continueReport != "" {
// f, err := os.Open(*continueReport)
// if err != nil {
// logrus.Fatalf("cannot open continue report file, %s", err)
// }
// r := new(acp.Report)
// if err := json.NewDecoder(f).Decode(r); err != nil {
// logrus.Fatalf("decode continue report file, %s", err)
// }
// for _, s := range r.NoSpaceSources {
// logrus.Infof("restore unfinished: base= '%s' relative_path= '%v'", s.Base, s.RelativePaths)
// opts = append(opts, acp.AccurateSource(s.Base, s.RelativePaths...))
// }
// }
opts = append(opts, acp.Target(targetPaths...))
opts = append(opts, acp.WithHash(*reportPath != ""))
opts = append(opts, acp.Overwrite(!*notOverwrite))
opts = append(opts, acp.WithProgressBar(*withProgressBar))
if *withProgressBar {
opts = append(opts, acp.WithProgressBar())
}
if *fromLinear {
opts = append(opts, acp.SetFromDevice(acp.LinearDevice(true)))
@@ -91,21 +95,16 @@ func main() {
if *toLinear {
opts = append(opts, acp.SetToDevice(acp.LinearDevice(true)))
}
if *autoFillDepth != 0 {
opts = append(opts, acp.WithAutoFill(true, *autoFillDepth))
}
c, err := acp.New(ctx, opts...)
if err != nil {
logrus.Fatalf("unexpected exit: %s", err)
}
if *reportPath != "" {
handler, getter := acp.NewReportGetter()
opts = append(opts, acp.WithEventHandler(handler))
defer func() {
if *reportPath == "" {
return
}
report := c.Report()
report := getter()
r, err := os.Create(*reportPath)
if err != nil {
logrus.Warnf("open report fail, path= '%s', err= %w", *reportPath, err)
@@ -116,12 +115,56 @@ func main() {
var buf []byte
if *reportIndent {
buf, _ = json.MarshalIndent(report, "", "\t")
buf, _ = reportJSON.MarshalIndent(report, "", "\t")
} else {
buf, _ = json.Marshal(report)
buf, _ = reportJSON.Marshal(report)
}
r.Write(buf)
}()
}
c, err := acp.New(ctx, opts...)
if err != nil {
logrus.Fatalf("unexpected exit: %s", err)
}
c.Wait()
}
var (
reportJSON jsoniter.API
)
type errValCoder struct{}
func (*errValCoder) IsEmpty(ptr unsafe.Pointer) bool {
val := (*error)(ptr)
return *val == nil
}
func (*errValCoder) Encode(ptr unsafe.Pointer, stream *jsoniter.Stream) {
val := (*error)(ptr)
stream.WriteString((*val).Error())
}
func (*errValCoder) Decode(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
val := (*error)(ptr)
*val = fmt.Errorf("%s", iter.ReadString())
}
func init() {
reportJSON = jsoniter.Config{
EscapeHTML: true,
SortMapKeys: true,
ValidateJsonRawMessage: true,
}.Froze()
// reflect2.TypeOf on a nil interface yields a nil type key, so resolve the
// error interface type from a typed nil pointer instead.
errType := reflect2.TypeOfPtr((*error)(nil)).Elem()
reportJSON.RegisterExtension(jsoniter.EncoderExtension{
errType: &errValCoder{},
})
reportJSON.RegisterExtension(jsoniter.DecoderExtension{
errType: &errValCoder{},
})
}

copy.go (165 lines changed)

@@ -6,14 +6,16 @@ import (
"fmt"
"hash"
"os"
"path"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/abc950309/acp/mmap"
mapset "github.com/deckarep/golang-set/v2"
"github.com/hashicorp/go-multierror"
"github.com/minio/sha256-simd"
"github.com/sirupsen/logrus"
)
const (
@@ -24,121 +26,70 @@ var (
sha256Pool = &sync.Pool{New: func() interface{} { return sha256.New() }}
)
func (c *Copyer) copy(ctx context.Context) {
atomic.StoreInt64(&c.stage, StageCopy)
defer atomic.StoreInt64(&c.stage, StageFinished)
wg := new(sync.WaitGroup)
func (c *Copyer) copy(ctx context.Context, prepared <-chan *writeJob) <-chan *baseJob {
ch := make(chan *baseJob, 128)
wg.Add(1)
var copying sync.WaitGroup
done := make(chan struct{})
defer func() {
go wrap(ctx, func() {
defer wg.Done()
defer close(c.writePipe)
defer close(done)
defer close(ch)
for _, job := range c.getJobs() {
c.prepare(ctx, job)
copying.Wait()
})
}()
cntr := new(counter)
go wrap(ctx, func() {
ticker := time.NewTicker(time.Second)
for {
select {
case <-ctx.Done():
case <-ticker.C:
c.submit(&EventUpdateProgress{Bytes: atomic.LoadInt64(&cntr.bytes), Files: atomic.LoadInt64(&cntr.files)})
case <-done:
c.submit(&EventUpdateProgress{Bytes: atomic.LoadInt64(&cntr.bytes), Files: atomic.LoadInt64(&cntr.files), Finished: true})
return
default:
}
}
})
for i := 0; i < c.threads; i++ {
wg.Add(1)
badDsts := mapset.NewSet[string]()
for idx := 0; idx < c.toDevice.threads; idx++ {
copying.Add(1)
go wrap(ctx, func() {
defer wg.Done()
defer copying.Done()
for {
select {
case job, ok := <-c.writePipe:
case <-ctx.Done():
return
case job, ok := <-prepared:
if !ok {
return
}
wg.Add(1)
c.write(ctx, job)
case <-ctx.Done():
return
wrap(ctx, func() { c.write(ctx, job, ch, cntr, badDsts) })
}
}
})
}
go wrap(ctx, func() {
for job := range c.postPipe {
c.post(wg, job)
}
})
finished := make(chan struct{}, 1)
go wrap(ctx, func() {
wg.Wait()
finished <- struct{}{}
})
select {
case <-finished:
case <-ctx.Done():
}
return ch
}
func (c *Copyer) prepare(ctx context.Context, job *baseJob) {
job.setStatus(jobStatusPreparing)
switch job.typ {
case jobTypeDir:
for _, d := range c.dst {
target := job.source.target(d)
if err := os.MkdirAll(target, job.mode&os.ModePerm); err != nil && !os.IsExist(err) {
c.reportError(target, fmt.Errorf("mkdir fail, %w", err))
job.fail(target, fmt.Errorf("mkdir fail, %w", err))
continue
}
job.succes(target)
}
c.writePipe <- &writeJob{baseJob: job}
return
}
if c.readingFiles != nil {
c.readingFiles <- struct{}{}
}
file, err := mmap.Open(job.source.path())
if err != nil {
c.reportError(job.source.path(), fmt.Errorf("open src file fail, %w", err))
return
}
c.writePipe <- &writeJob{baseJob: job, src: file}
}
func (c *Copyer) write(ctx context.Context, job *writeJob) {
func (c *Copyer) write(ctx context.Context, job *writeJob, ch chan<- *baseJob, cntr *counter, badDsts mapset.Set[string]) {
job.setStatus(jobStatusCopying)
if job.typ != jobTypeNormal {
return
}
defer job.setStatus(jobStatusFinishing)
var wg sync.WaitGroup
defer func() {
wg.Wait()
job.src.Close()
if c.readingFiles != nil {
<-c.readingFiles
}
c.postPipe <- job.baseJob
job.done()
ch <- job.baseJob
}()
num := atomic.AddInt64(&c.copyedFiles, 1)
c.logf(logrus.InfoLevel, "[%d/%d] copying: %s", num, c.totalFiles, job.source.relativePath)
c.updateCopying(func(set map[int64]struct{}) { set[num] = struct{}{} })
defer c.updateCopying(func(set map[int64]struct{}) { delete(set, num) })
atomic.AddInt64(&cntr.files, 1)
chans := make([]chan []byte, 0, len(c.dst)+1)
defer func() {
for _, ch := range chans {
@@ -167,18 +118,21 @@ func (c *Copyer) write(ctx context.Context, job *writeJob) {
}
var readErr error
badDsts := c.getBadDsts()
for _, d := range c.dst {
dst := d
name := job.source.dst(dst)
name := job.source.target(dst)
if e, has := badDsts[dst]; has && e != nil {
job.fail(name, fmt.Errorf("bad target path, %w", e))
if badDsts.Contains(dst) {
job.fail(name, fmt.Errorf("bad target path"))
continue
}
if err := os.MkdirAll(path.Dir(name), os.ModePerm); err != nil {
job.fail(name, fmt.Errorf("mkdir dst dir fail, %w", err))
continue
}
file, err := os.OpenFile(name, c.createFlag, job.mode)
if err != nil {
c.reportError(name, fmt.Errorf("open dst file fail, %w", err))
job.fail(name, fmt.Errorf("open dst file fail, %w", err))
continue
}
@@ -206,11 +160,11 @@ func (c *Copyer) write(ctx context.Context, job *writeJob) {
}
// if no space
if errors.Is(err, syscall.ENOSPC) {
c.addBadDsts(dst, err)
if errors.Is(err, syscall.ENOSPC) || errors.Is(err, syscall.EROFS) {
badDsts.Add(dst)
}
c.reportError(name, rerr)
c.reportError(job.source.src(), name, rerr)
job.fail(name, rerr)
}()
@@ -236,10 +190,10 @@ func (c *Copyer) write(ctx context.Context, job *writeJob) {
if len(chans) == 0 {
return
}
readErr = c.streamCopy(ctx, chans, job.src)
readErr = c.streamCopy(ctx, chans, job.src, &cntr.bytes)
}
func (c *Copyer) streamCopy(ctx context.Context, dsts []chan []byte, src *mmap.ReaderAt) error {
func (c *Copyer) streamCopy(ctx context.Context, dsts []chan []byte, src *mmap.ReaderAt, bytes *int64) error {
if src.Len() == 0 {
return nil
}
@@ -255,7 +209,7 @@ func (c *Copyer) streamCopy(ctx context.Context, dsts []chan []byte, src *mmap.R
}
nr := len(buf)
atomic.AddInt64(&c.copyedBytes, int64(nr))
atomic.AddInt64(bytes, int64(nr))
if nr < batchSize {
return nil
}
@@ -267,24 +221,3 @@ func (c *Copyer) streamCopy(ctx context.Context, dsts []chan []byte, src *mmap.R
}
}
}
func (c *Copyer) post(wg *sync.WaitGroup, job *baseJob) {
defer wg.Done()
job.setStatus(jobStatusFinishing)
for _, name := range job.successTargets {
if err := os.Chtimes(name, job.modTime, job.modTime); err != nil {
c.reportError(name, fmt.Errorf("change info, chtimes fail, %w", err))
}
}
job.setStatus(jobStatusFinished)
if job.parent == nil {
return
}
left := job.parent.done(job)
if left == 0 {
c.postPipe <- job.parent
}
}
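
streamCopy reads each source file once and pushes every chunk to one channel per destination, so additional targets cost no extra reads. A self-contained sketch of that fan-out shape, using a plain io.Reader instead of mmap and omitting the batch-size and progress-counter details of the real code:

package main

import (
	"fmt"
	"io"
	"strings"
	"sync"
)

// fanOut mirrors the one-reader/many-writers shape of Copyer.streamCopy:
// every chunk read from src is duplicated onto each destination channel.
func fanOut(src io.Reader, dsts []chan []byte) error {
	defer func() {
		for _, ch := range dsts {
			close(ch) // lets the per-destination writers drain and exit
		}
	}()
	buf := make([]byte, 32*1024)
	for {
		n, err := src.Read(buf)
		if n > 0 {
			for _, ch := range dsts {
				chunk := make([]byte, n)
				copy(chunk, buf[:n]) // each writer owns its own copy
				ch <- chunk
			}
		}
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
	}
}

func main() {
	dsts := []chan []byte{make(chan []byte, 4), make(chan []byte, 4)}
	var wg sync.WaitGroup
	for i, ch := range dsts {
		wg.Add(1)
		go func(i int, ch chan []byte) {
			defer wg.Done()
			for chunk := range ch {
				fmt.Printf("writer %d received %d bytes\n", i, len(chunk))
			}
		}(i, ch)
	}
	_ = fanOut(strings.NewReader("hello, acp"), dsts)
	wg.Wait()
}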

error.go (new file, 42 lines)

@@ -0,0 +1,42 @@
package acp
import (
"encoding/json"
"fmt"
)
var (
_ = error(new(Error))
_ = json.Marshaler(new(Error))
_ = json.Unmarshaler(new(Error))
)
type Error struct {
Src string `json:"src,omitempty"`
Dst string `json:"dst,omitempty"`
Err error `json:"error,omitempty"`
}
type jsonError struct {
Src string `json:"src,omitempty"`
Dst string `json:"dst,omitempty"`
Err string `json:"error,omitempty"`
}
func (e *Error) Error() string {
return fmt.Sprintf("[%s => %s]: %s", e.Src, e.Dst, e.Err)
}
func (e *Error) MarshalJSON() ([]byte, error) {
return json.Marshal(&jsonError{Src: e.Src, Dst: e.Dst, Err: e.Err.Error()})
}
func (e *Error) UnmarshalJSON(buf []byte) error {
m := new(jsonError)
if err := json.Unmarshal(buf, &m); err != nil {
return err
}
e.Src, e.Dst, e.Err = m.Src, m.Dst, fmt.Errorf("%s", m.Err)
return nil
}
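
Because Err is an interface value, Error needs the custom (un)marshalers above to flatten it to a plain string. A round-trip with placeholder paths:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/abc950309/acp"
)

func main() {
	e := &acp.Error{Src: "/src/a.txt", Dst: "/dst/a.txt", Err: fmt.Errorf("disk full")}

	buf, _ := json.Marshal(e)
	fmt.Println(string(buf)) // {"src":"/src/a.txt","dst":"/dst/a.txt","error":"disk full"}

	back := new(acp.Error)
	_ = json.Unmarshal(buf, back)
	fmt.Println(back.Error()) // [/src/a.txt => /dst/a.txt]: disk full
}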

event.go (new file, 37 lines)

@@ -0,0 +1,37 @@
package acp
type EventHandler func(Event)
type Event interface {
iEvent()
}
type EventUpdateCount struct {
Bytes, Files int64
Finished bool
}
func (*EventUpdateCount) iEvent() {}
type EventUpdateProgress struct {
Bytes, Files int64
Finished bool
}
func (*EventUpdateProgress) iEvent() {}
type EventUpdateJob struct {
Job *Job
}
func (*EventUpdateJob) iEvent() {}
type EventReportError struct {
Error *Error
}
func (*EventReportError) iEvent() {}
type EventFinished struct{}
func (*EventFinished) iEvent() {}
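
A handler receives every event and type-switches on the concrete type; EventFinished is injected by the event loop once the stream closes. An illustrative logging handler, fed two synthetic events here (wire it in for real via acp.WithEventHandler(logEvents)):

package main

import (
	"fmt"
	"path"

	"github.com/abc950309/acp"
)

// logEvents covers the event types introduced in this commit.
func logEvents(ev acp.Event) {
	switch e := ev.(type) {
	case *acp.EventUpdateCount: // indexing phase
		fmt.Printf("indexed: %d files, %d bytes\n", e.Files, e.Bytes)
	case *acp.EventUpdateProgress: // copying phase
		fmt.Printf("copied: %d files, %d bytes\n", e.Files, e.Bytes)
	case *acp.EventUpdateJob:
		fmt.Printf("job %s: %s\n", path.Join(e.Job.Path...), e.Job.Status)
	case *acp.EventReportError:
		fmt.Printf("error: %s\n", e.Error)
	case *acp.EventFinished:
		fmt.Println("event stream closed")
	}
}

func main() {
	logEvents(&acp.EventUpdateCount{Files: 3, Bytes: 1024})
	logEvents(&acp.EventFinished{})
}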

go.mod (4 lines changed)

@@ -10,10 +10,14 @@ require (
)
require (
github.com/deckarep/golang-set/v2 v2.1.0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.0.4 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/rivo/uniseg v0.3.4 // indirect
golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 // indirect
golang.org/x/term v0.0.0-20220722155259-a9ba230a4035 // indirect

index.go (188 lines changed)

@@ -6,139 +6,123 @@ import (
"os"
"sort"
"sync/atomic"
"github.com/schollz/progressbar/v3"
"time"
)
const (
unexpectFileMode = os.ModeType &^ os.ModeDir
)
func (c *Copyer) index(ctx context.Context) {
for _, s := range c.src {
c.walk(nil, s)
}
c.updateProgressBar(func(bar *progressbar.ProgressBar) {
bar.ChangeMax64(atomic.LoadInt64(&c.totalBytes))
bar.Describe(fmt.Sprintf("[0/%d] index finished...", atomic.LoadInt64(&c.totalFiles)))
})
type counter struct {
bytes, files int64
}
func (c *Copyer) walk(parent *baseJob, src *source) *baseJob {
path := src.path()
func (c *Copyer) index(ctx context.Context) (<-chan *baseJob, error) {
jobs := c.walk(ctx)
filtered, err := c.joinJobs(jobs)
if err != nil {
return nil, err
}
ch := make(chan *baseJob, 128)
go wrap(ctx, func() {
defer close(ch)
for _, job := range filtered {
select {
case <-ctx.Done():
return
case ch <- job:
}
}
})
return ch, nil
}
func (c *Copyer) walk(ctx context.Context) []*baseJob {
done := make(chan struct{})
defer close(done)
cntr := new(counter)
go wrap(ctx, func() {
ticker := time.NewTicker(time.Second)
for {
select {
case <-ticker.C:
c.submit(&EventUpdateCount{Bytes: atomic.LoadInt64(&cntr.bytes), Files: atomic.LoadInt64(&cntr.files)})
case <-done:
c.submit(&EventUpdateCount{Bytes: atomic.LoadInt64(&cntr.bytes), Files: atomic.LoadInt64(&cntr.files), Finished: true})
return
}
}
})
jobs := make([]*baseJob, 0, 64)
appendJob := func(job *baseJob) {
jobs = append(jobs, job)
atomic.AddInt64(&cntr.files, 1)
atomic.AddInt64(&cntr.bytes, job.size)
}
var walk func(src *source)
walk = func(src *source) {
path := src.src()
stat, err := os.Stat(path)
if err != nil {
c.reportError(path, fmt.Errorf("walk get stat, %w", err))
return nil
}
if stat.Mode()&unexpectFileMode != 0 {
return nil
c.reportError(path, "", fmt.Errorf("walk get stat, %w", err))
return
}
job, err := newJobFromFileInfo(parent, src, stat)
mode := stat.Mode()
if mode&unexpectFileMode != 0 {
return
}
if !mode.IsDir() {
job, err := c.newJobFromFileInfo(src, stat)
if err != nil {
c.reportError(path, fmt.Errorf("make job fail, %w", err))
return nil
c.reportError(path, "", fmt.Errorf("make job fail, %w", err))
return
}
c.appendJobs(job)
if job.typ == jobTypeNormal {
totalBytes := atomic.AddInt64(&c.totalBytes, job.size)
totalFiles := atomic.AddInt64(&c.totalFiles, 1)
c.updateProgressBar(func(bar *progressbar.ProgressBar) {
bar.ChangeMax64(totalBytes)
bar.Describe(fmt.Sprintf("[0/%d] indexing...", totalFiles))
})
return job
appendJob(job)
return
}
files, err := os.ReadDir(path)
if err != nil {
c.reportError(path, fmt.Errorf("walk read dir, %w", err))
return nil
c.reportError(path, "", fmt.Errorf("walk read dir, %w", err))
return
}
job.children = make(map[*baseJob]struct{}, len(files))
for _, file := range files {
id := c.walk(job, &source{base: src.base, relativePath: src.relativePath + "/" + file.Name()})
if id == nil {
continue
}
job.children[id] = struct{}{}
walk(src.append(file.Name()))
}
return job
return
}
for _, s := range c.src {
walk(s)
}
return jobs
}
func (c *Copyer) checkJobs() bool {
c.jobsLock.Lock()
defer c.jobsLock.Unlock()
if len(c.jobs) == 0 {
c.reportError("", fmt.Errorf("cannot found available jobs"))
return false
}
sort.Slice(c.jobs, func(i int, j int) bool {
return c.jobs[i].comparableRelativePath < c.jobs[j].comparableRelativePath
func (c *Copyer) joinJobs(jobs []*baseJob) ([]*baseJob, error) {
sort.Slice(jobs, func(i int, j int) bool {
return comparePath(jobs[i].source.path, jobs[j].source.path) < 0
})
var last *baseJob
filtered := make([]*baseJob, 0, len(c.jobs))
for _, job := range c.jobs {
if last == nil {
filtered := make([]*baseJob, 0, len(jobs))
for _, job := range jobs {
if last != nil && comparePath(last.source.path, job.source.path) == 0 {
c.reportError(last.source.src(), "", fmt.Errorf("same relative path, ignored, '%s'", job.source.src()))
continue
}
filtered = append(filtered, job)
last = job
continue
}
if last.source.relativePath != job.source.relativePath {
filtered = append(filtered, job)
last = job
continue
}
if last.typ != job.typ {
c.reportError(job.source.path(), fmt.Errorf("same relative path with different type, '%s' and '%s'", job.source.path(), last.source.path()))
return false
}
if last.typ == jobTypeNormal {
c.reportError(job.source.path(), fmt.Errorf("same relative path as normal file, ignored, '%s'", job.source.path()))
continue
}
func() {
last.lock.Lock()
defer last.lock.Unlock()
for n := range job.children {
last.children[n] = struct{}{}
n.parent = last
}
}()
}
c.jobs = filtered
return true
}
func (c *Copyer) appendJobs(jobs ...*baseJob) {
c.jobsLock.Lock()
defer c.jobsLock.Unlock()
c.jobs = append(c.jobs, jobs...)
}
func (c *Copyer) getJobs() []*baseJob {
c.jobsLock.Lock()
defer c.jobsLock.Unlock()
return c.jobs
}
func (c *Copyer) getJobsAndNoSpaceSource() ([]*baseJob, []*source) {
c.jobsLock.Lock()
defer c.jobsLock.Unlock()
return c.jobs, c.noSpaceSource
return filtered, nil
}
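
Indexing no longer mutates shared progress state on the Copyer; walk keeps a local counter that a ticker goroutine samples once per second and publishes as EventUpdateCount, with a final Finished snapshot when the done channel closes. The sampling loop in miniature:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	var files int64
	done := make(chan struct{})
	go func() { // producer: pretend to index five files
		for i := 0; i < 5; i++ {
			atomic.AddInt64(&files, 1)
			time.Sleep(300 * time.Millisecond)
		}
		close(done)
	}()
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C: // periodic snapshot, like EventUpdateCount
			fmt.Println("indexed so far:", atomic.LoadInt64(&files))
		case <-done: // final snapshot; Finished: true in the real code
			fmt.Println("indexed total:", atomic.LoadInt64(&files))
			return
		}
	}
}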

job.go (85 lines changed)

@@ -2,21 +2,14 @@ package acp
import (
"encoding/hex"
"fmt"
"os"
"strings"
"sync"
"time"
"github.com/abc950309/acp/mmap"
)
type jobType uint8
const (
jobTypeNormal = jobType(iota)
jobTypeDir
)
type jobStatus uint8
const (
@@ -38,9 +31,8 @@ var (
)
type baseJob struct {
parent *baseJob
copyer *Copyer
source *source
typ jobType
name string // base name of the file
size int64 // length in bytes for regular files; system-dependent for others
@@ -50,32 +42,27 @@ type baseJob struct {
lock sync.Mutex
writeTime time.Time
status jobStatus
children map[*baseJob]struct{}
successTargets []string
failedTargets map[string]error
hash []byte
// utils
comparableRelativePath string
}
func newJobFromFileInfo(parent *baseJob, source *source, info os.FileInfo) (*baseJob, error) {
func (c *Copyer) newJobFromFileInfo(source *source, info os.FileInfo) (*baseJob, error) {
job := &baseJob{
parent: parent,
copyer: c,
source: source,
name: info.Name(),
size: info.Size(),
mode: info.Mode(),
modTime: info.ModTime(),
comparableRelativePath: strings.ReplaceAll(source.relativePath, "/", "\x00"),
}
if job.mode.IsDir() {
job.typ = jobTypeDir
if job.mode.IsDir() || job.mode&unexpectFileMode != 0 {
return nil, fmt.Errorf("unexpected file, path= %s", source.src())
}
c.submit(&EventUpdateJob{job.report()})
return job, nil
}
@@ -87,30 +74,24 @@ func (j *baseJob) setStatus(s jobStatus) {
if s == jobStatusCopying {
j.writeTime = time.Now()
}
j.copyer.submit(&EventUpdateJob{j.report()})
}
func (j *baseJob) setHash(h []byte) {
j.lock.Lock()
defer j.lock.Unlock()
j.hash = h
}
func (j *baseJob) done(child *baseJob) int {
if j.typ == jobTypeNormal {
return 0
}
j.lock.Lock()
defer j.lock.Unlock()
delete(j.children, child)
return len(j.children)
j.copyer.submit(&EventUpdateJob{j.report()})
}
func (j *baseJob) succes(path string) {
j.lock.Lock()
defer j.lock.Unlock()
j.successTargets = append(j.successTargets, path)
j.copyer.submit(&EventUpdateJob{j.report()})
}
func (j *baseJob) fail(path string, err error) {
@@ -121,24 +102,17 @@ func (j *baseJob) fail(path string, err error) {
j.failedTargets = make(map[string]error, 1)
}
j.failedTargets[path] = err
j.copyer.submit(&EventUpdateJob{j.report()})
}
func (j *baseJob) report() *File {
j.lock.Lock()
defer j.lock.Unlock()
fails := make(map[string]string, len(j.failedTargets))
for n, e := range j.failedTargets {
fails[n] = e.Error()
}
return &File{
Source: j.source.path(),
RelativePath: j.source.relativePath,
func (j *baseJob) report() *Job {
return &Job{
Base: j.source.base,
Path: j.source.path,
Status: statusMapping[j.status],
SuccessTargets: j.successTargets,
FailTargets: fails,
FailTargets: j.failedTargets,
Size: j.size,
Mode: j.mode,
@@ -151,4 +125,25 @@ func (j *baseJob) report() *File {
type writeJob struct {
*baseJob
src *mmap.ReaderAt
ch chan struct{}
}
func (wj *writeJob) done() {
wj.src.Close()
close(wj.ch)
}
type Job struct {
Base string `json:"base"`
Path []string `json:"path"`
Status string `json:"status"`
SuccessTargets []string `json:"success_target,omitempty"`
FailTargets map[string]error `json:"fail_target,omitempty"`
Size int64 `json:"size"`
Mode os.FileMode `json:"mode"`
ModTime time.Time `json:"mod_time"`
WriteTime time.Time `json:"write_time"`
SHA256 string `json:"sha256"`
}
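
FailTargets now carries raw error values. Plain encoding/json cannot see inside an error (it has no exported fields), which is presumably why main.go marshals reports through the jsoniter API with errValCoder registered. The pitfall, illustrated with the standard library:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	fails := map[string]error{"/dst/a.txt": fmt.Errorf("disk full")}
	buf, _ := json.Marshal(fails)
	fmt.Println(string(buf)) // {"/dst/a.txt":{}}; the message is silently lost
}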

opt.go (140 lines changed)

@@ -4,6 +4,7 @@ import (
"fmt"
"os"
"path"
"sort"
"strings"
"github.com/sirupsen/logrus"
@@ -11,15 +12,23 @@ import (
type source struct {
base string
relativePath string
path []string
}
func (s *source) path() string {
return s.base + s.relativePath
func (s *source) src() string {
return s.base + path.Join(s.path...)
}
func (s *source) target(dst string) string {
return dst + s.relativePath
func (s *source) dst(dst string) string {
return dst + path.Join(s.path...)
}
func (s *source) append(next ...string) *source {
copyed := make([]string, len(s.path)+len(next))
copy(copyed, s.path)
copy(copyed[len(s.path):], next)
return &source{base: s.base, path: copyed}
}
type option struct {
@@ -29,21 +38,18 @@ type option struct {
fromDevice *deviceOption
toDevice *deviceOption
overwrite bool
withProgressBar bool
threads int
createFlag int
withHash bool
autoFill bool
autoFillSplitDepth int
logger *logrus.Logger
eventHanders []EventHandler
}
func newOption() *option {
return &option{
fromDevice: new(deviceOption),
toDevice: new(deviceOption),
createFlag: os.O_WRONLY | os.O_CREATE | os.O_EXCL,
}
}
@@ -73,17 +79,20 @@ func (o *option) check() error {
if len(o.src) == 0 {
return fmt.Errorf("source path not found")
}
sort.Slice(o.src, func(i, j int) bool {
return comparePath(o.src[i].path, o.src[j].path) < 0
})
for _, s := range o.src {
if _, err := os.Stat(s.path()); err != nil {
return fmt.Errorf("check src path '%s', %w", s.path(), err)
src := s.src()
if _, err := os.Stat(src); err != nil {
return fmt.Errorf("check src path '%s', %w", src, err)
}
}
if o.threads < 1 {
o.threads = 4
}
if o.fromDevice.linear || o.toDevice.linear {
o.threads = 1
o.fromDevice.check()
o.toDevice.check()
if o.logger == nil {
o.logger = logrus.StandardLogger()
}
return nil
}
@@ -99,23 +108,17 @@ func Source(paths ...string) Option {
}
base, name := path.Split(p)
o.src = append(o.src, &source{base: base, relativePath: name})
o.src = append(o.src, &source{base: base, path: []string{name}})
}
return o
}
}
func AccurateSource(base string, relativePaths ...string) Option {
func AccurateSource(base string, paths ...[]string) Option {
return func(o *option) *option {
base = path.Clean(base)
for _, p := range relativePaths {
p = path.Clean(p)
if p[len(p)-1] == '/' {
p = p[:len(p)-1]
}
o.src = append(o.src, &source{base: base, relativePath: p})
for _, path := range paths {
o.src = append(o.src, &source{base: base, path: path})
}
return o
}
@@ -154,23 +157,18 @@ func SetToDevice(opts ...DeviceOption) Option {
func Overwrite(b bool) Option {
return func(o *option) *option {
o.overwrite = b
if b {
o.createFlag = os.O_WRONLY | os.O_CREATE | os.O_TRUNC
return o
}
o.createFlag = os.O_WRONLY | os.O_CREATE | os.O_EXCL
return o
}
}
func WithProgressBar(b bool) Option {
return func(o *option) *option {
o.withProgressBar = b
return o
}
}
func WithThreads(threads int) Option {
return func(o *option) *option {
o.threads = threads
return o
}
func WithProgressBar() Option {
return WithEventHandler(NewProgressBar())
}
func WithHash(b bool) Option {
@@ -180,17 +178,61 @@ func WithHash(b bool) Option {
}
}
func WithAutoFill(on bool, depth int) Option {
return func(o *option) *option {
o.autoFill = on
o.autoFillSplitDepth = depth
return o
}
}
func WithLogger(logger *logrus.Logger) Option {
return func(o *option) *option {
o.logger = logger
return o
}
}
func WithEventHandler(h EventHandler) Option {
return func(o *option) *option {
o.eventHanders = append(o.eventHanders, h)
return o
}
}
func comparePath(a, b []string) int {
al, bl := len(a), len(b)
l := al
if bl < al {
l = bl
}
for idx := 0; idx < l; idx++ {
if a[idx] < b[idx] {
return -1
}
if a[idx] > b[idx] {
return 1
}
}
if al < bl {
return -1
}
if al > bl {
return 1
}
return 0
}
// isChild reports the relationship of child to parent: -1 (not a child), 0 (equal), 1 (descendant)
func isChild(parent, child []string) int {
pl, cl := len(parent), len(child)
if pl > cl {
return -1
}
for idx := 0; idx < pl; idx++ {
if parent[idx] != child[idx] {
return -1
}
}
if pl == cl {
return 0
}
return 1
}
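
comparePath orders paths component by component, so a directory sorts immediately before its descendants, and isChild distinguishes unrelated (-1), equal (0), and descendant (1) paths. Both helpers are unexported, so this sketch of their semantics is written as a test inside package acp:

package acp

import "testing"

func TestPathHelpers(t *testing.T) {
	parent := []string{"dir", "a"}
	child := []string{"dir", "a", "file"}

	if comparePath(parent, child) != -1 { // a shorter common prefix sorts first
		t.Fatal("parent should sort before its child")
	}
	if isChild(parent, child) != 1 { // child is a descendant of parent
		t.Fatal("expected descendant")
	}
	if isChild(parent, parent) != 0 { // identical paths
		t.Fatal("expected equal")
	}
	if isChild(child, parent) != -1 { // a shorter path cannot be a child of a longer one
		t.Fatal("expected not a child")
	}
}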


@@ -2,6 +2,16 @@ package acp
type deviceOption struct {
linear bool
threads int
}
func (do *deviceOption) check() {
if do.threads == 0 {
do.threads = 8
}
if do.linear {
do.threads = 1
}
}
type DeviceOption func(*deviceOption) *deviceOption
@@ -12,3 +22,10 @@ func LinearDevice(b bool) DeviceOption {
return d
}
}
func DeviceThreads(threads int) DeviceOption {
return func(d *deviceOption) *deviceOption {
d.threads = threads
return d
}
}
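
Per-device options replace the old global WithThreads: each side defaults to 8 workers, and LinearDevice(true) forces that side down to a single worker regardless of DeviceThreads. A sketch with a hypothetical helper and paths:

package example

import (
	"context"

	"github.com/abc950309/acp"
)

// tapeToDisk is a hypothetical helper: one sequential reader on the linear
// source side, four concurrent writers on the disk side.
func tapeToDisk(ctx context.Context, src, dst string) error {
	c, err := acp.New(ctx,
		acp.Source(src),
		acp.Target(dst),
		acp.SetFromDevice(acp.LinearDevice(true)), // forces 1 reader
		acp.SetToDevice(acp.DeviceThreads(4)),     // 4 writers
	)
	if err != nil {
		return err
	}
	c.Wait()
	return nil
}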

prepare.go (new file, 57 lines)

@@ -0,0 +1,57 @@
package acp
import (
"context"
"fmt"
"sync"
"github.com/abc950309/acp/mmap"
)
func (c *Copyer) prepare(ctx context.Context, indexed <-chan *baseJob) <-chan *writeJob {
chanLen := 32
if c.fromDevice.linear {
chanLen = 0
}
var wg sync.WaitGroup
ch := make(chan *writeJob, chanLen)
defer func() {
go wrap(ctx, func() {
defer close(ch)
wg.Wait()
})
}()
for idx := 0; idx < c.fromDevice.threads; idx++ {
wg.Add(1)
go wrap(ctx, func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case job, ok := <-indexed:
if !ok {
return
}
job.setStatus(jobStatusPreparing)
file, err := mmap.Open(job.source.src())
if err != nil {
c.reportError(job.source.src(), "", fmt.Errorf("open src file fail, %w", err))
return
}
wj := &writeJob{baseJob: job, src: file, ch: make(chan struct{})}
ch <- wj
<-wj.ch
}
}
})
}
return ch
}
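
For a linear source the output channel is unbuffered and each writeJob carries a ch that prepare blocks on until the writer calls done(), so the next file is not mmapped until the current one is fully consumed; that keeps reads strictly sequential on tape-like devices. The same handshake in miniature, as a generic runnable sketch:

package main

import "fmt"

type item struct {
	v    string
	done chan struct{}
}

func main() {
	ch := make(chan item) // unbuffered, like the linear-device path
	go func() {
		for it := range ch {
			fmt.Println("consumed", it.v)
			close(it.done) // releases the producer, like writeJob.done()
		}
	}()
	for _, v := range []string{"a", "b", "c"} {
		it := item{v: v, done: make(chan struct{})}
		ch <- it
		<-it.done // producer stalls until this item is fully consumed
	}
	close(ch)
}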


@@ -1,25 +1,16 @@
package acp
import (
"context"
"fmt"
"os"
"sort"
"strings"
"sync/atomic"
"time"
"github.com/schollz/progressbar/v3"
"github.com/sirupsen/logrus"
)
const (
barUpdateInterval = time.Nanosecond * 255002861
)
func (c *Copyer) startProgressBar(ctx context.Context) {
func NewProgressBar() EventHandler {
// progressBar := progressbar.DefaultBytes(1, "[0/0] indexing...")
progressBar := progressbar.NewOptions64(
bar := progressbar.NewOptions64(
1,
progressbar.OptionSetDescription("[0/0] indexing..."),
progressbar.OptionSetWriter(os.Stderr),
@@ -33,84 +24,26 @@ func (c *Copyer) startProgressBar(ctx context.Context) {
progressbar.OptionSetRenderBlankState(true),
)
ch := make(chan interface{}, 8)
c.updateProgressBar = func(f func(bar *progressbar.ProgressBar)) {
ch <- f
}
c.updateCopying = func(f func(set map[int64]struct{})) {
ch <- f
}
c.logf = func(l logrus.Level, format string, args ...any) {
ch <- func() { logrus.StandardLogger().Logf(l, format, args...) }
var totalFiles int64
return func(ev Event) {
switch e := ev.(type) {
case *EventUpdateCount:
totalFiles = e.Files
bar.Describe(fmt.Sprintf("[0/%d] indexing...", totalFiles))
bar.ChangeMax64(e.Bytes)
return
case *EventUpdateProgress:
bar.Set64(e.Bytes)
if !e.Finished {
bar.Describe(fmt.Sprintf("[%d/%d] copying...", e.Files, totalFiles))
return
}
go wrap(ctx, func() {
copying := make(map[int64]struct{}, c.threads)
for f := range ch {
if progressBar == nil {
continue
}
switch v := f.(type) {
case func(bar *progressbar.ProgressBar):
v(progressBar)
case func(set map[int64]struct{}):
v(copying)
idxs := make([]int64, 0, len(copying))
for idx := range copying {
idxs = append(idxs, idx)
}
sort.Slice(idxs, func(i, j int) bool {
return idxs[i] < idxs[j]
})
strs := make([]string, 0, len(idxs))
for _, idx := range idxs {
strs = append(strs, fmt.Sprintf("file %d", idx))
}
copyedFiles, totalFiles := atomic.LoadInt64(&c.copyedFiles), atomic.LoadInt64(&c.totalFiles)
progressBar.Describe(fmt.Sprintf("[%d/%d] copying... %s", copyedFiles, totalFiles, strings.Join(strs, ", ")))
case func():
v()
}
}
})
go wrap(ctx, func() {
defer close(ch)
ticker := time.NewTicker(barUpdateInterval) // around 255ms, avoid conflict with progress bar fresh by second
defer ticker.Stop()
var lastCopyedBytes int64
for {
select {
case <-ticker.C:
switch atomic.LoadInt64(&c.stage) {
case StageCopy:
currentCopyedBytes := atomic.LoadInt64(&c.copyedBytes)
diff := currentCopyedBytes - lastCopyedBytes
lastCopyedBytes = currentCopyedBytes
c.updateProgressBar(func(bar *progressbar.ProgressBar) {
bar.Add64(diff)
})
}
case <-ctx.Done():
currentCopyedBytes := atomic.LoadInt64(&c.copyedBytes)
diff := int(currentCopyedBytes - lastCopyedBytes)
lastCopyedBytes = currentCopyedBytes
copyedFiles, totalFiles := atomic.LoadInt64(&c.copyedFiles), atomic.LoadInt64(&c.totalFiles)
c.updateProgressBar(func(bar *progressbar.ProgressBar) {
bar.Add(diff)
bar.Describe(fmt.Sprintf("[%d/%d] finished!", copyedFiles, totalFiles))
})
bar.Describe(fmt.Sprintf("[%d/%d] finishing...", e.Files, totalFiles))
return
default:
return
}
}
})
}

report.go (108 lines changed)

@@ -1,94 +1,54 @@
package acp
import (
"encoding/json"
"fmt"
"os"
"time"
"path"
"sync"
)
func (c *Copyer) Report() *Report {
jobs, nss := c.getJobsAndNoSpaceSource()
errs := c.getErrors()
type ReportGetter func() *Report
files := make([]*File, 0, len(jobs))
for _, job := range jobs {
files = append(files, job.report())
func NewReportGetter() (EventHandler, ReportGetter) {
var lock sync.Mutex
jobs := make(map[string]*Job, 8)
errors := make([]*Error, 0)
handler := func(ev Event) {
switch e := ev.(type) {
case *EventUpdateJob:
lock.Lock()
defer lock.Unlock()
key := path.Join(e.Job.Path...)
jobs[key] = e.Job
case *EventReportError:
lock.Lock()
defer lock.Unlock()
errors = append(errors, e.Error)
}
noSpaceSources := make([]*FilePath, 0, len(nss))
for _, s := range nss {
if len(noSpaceSources) == 0 {
noSpaceSources = append(noSpaceSources, &FilePath{Base: s.base, RelativePaths: []string{s.relativePath}})
continue
}
getter := func() *Report {
lock.Lock()
defer lock.Unlock()
if last := noSpaceSources[len(noSpaceSources)-1]; last.Base == s.base {
last.RelativePaths = append(last.RelativePaths, s.relativePath)
continue
jobsCopyed := make([]*Job, 0, len(jobs))
for _, j := range jobs {
jobsCopyed = append(jobsCopyed, j)
}
noSpaceSources = append(noSpaceSources, &FilePath{Base: s.base, RelativePaths: []string{s.relativePath}})
errorsCopyed := make([]*Error, 0, len(errors))
for _, e := range errors {
errorsCopyed = append(errorsCopyed, e)
}
return &Report{
Files: files,
NoSpaceSources: noSpaceSources,
Errors: errs,
Jobs: jobsCopyed,
Errors: errorsCopyed,
}
}
var (
_ = error(new(Error))
_ = json.Marshaler(new(Error))
_ = json.Unmarshaler(new(Error))
)
type Error struct {
Path string `json:"path,omitempty"`
Err error `json:"error,omitempty"`
}
func (e *Error) Error() string {
return fmt.Sprintf("[%s]: %s", e.Path, e.Err)
}
func (e *Error) MarshalJSON() ([]byte, error) {
return json.Marshal(map[string]string{"path": e.Path, "error": e.Err.Error()})
}
func (e *Error) UnmarshalJSON(buf []byte) error {
m := make(map[string]string, 2)
if err := json.Unmarshal(buf, &m); err != nil {
return err
}
e.Path, e.Err = m["path"], fmt.Errorf(m["error"])
return nil
}
type File struct {
Source string `json:"source"`
RelativePath string `json:"relative_path"`
Status string `json:"status"`
SuccessTargets []string `json:"success_target,omitempty"`
FailTargets map[string]string `json:"fail_target,omitempty"`
Size int64 `json:"size"`
Mode os.FileMode `json:"mode"`
ModTime time.Time `json:"mod_time"`
WriteTime time.Time `json:"write_time"`
SHA256 string `json:"sha256"`
}
type FilePath struct {
Base string `json:"base,omitempty"`
RelativePaths []string `json:"relative_paths,omitempty"`
return handler, getter
}
type Report struct {
Files []*File `json:"files,omitempty"`
NoSpaceSources []*FilePath `json:"no_space_sources,omitempty"`
Jobs []*Job `json:"files,omitempty"`
Errors []*Error `json:"errors,omitempty"`
}
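
Report assembly has moved out of the Copyer entirely: NewReportGetter returns an EventHandler that accumulates jobs and errors from the event stream, plus a getter that snapshots them under a lock. Usage mirroring the updated main.go, with placeholder paths:

package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/abc950309/acp"
)

func main() {
	handler, getter := acp.NewReportGetter()
	c, err := acp.New(context.Background(),
		acp.Source("/tmp/src"), // placeholder
		acp.Target("/tmp/dst"), // placeholder
		acp.WithEventHandler(handler),
	)
	if err != nil {
		panic(err)
	}
	c.Wait()

	// Read the report after Wait, as the updated main.go does. Note that
	// main.go marshals through its jsoniter reportJSON so FailTargets errors
	// survive; plain encoding/json is used here only for brevity.
	buf, _ := json.MarshalIndent(getter(), "", "\t")
	fmt.Println(string(buf))
}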