From 7e7d887b2e9a129cc353d1aaa08640c6fd71e070 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B4=94=E7=AB=9E=E5=AE=81?= Date: Wed, 14 Sep 2022 00:53:10 +0800 Subject: [PATCH] feat: add continue --- acp.go | 10 ++++-- autofill.go | 90 +++++++++++++++++++++++++++++++++++++++++++++++++ cmd/acp/main.go | 66 +++++++++++++++++++++++++----------- fs.go | 41 ++++++++++++++++++++++ index.go | 30 ++++++++++------- opt.go | 11 ++++++ report.go | 29 +++++++++++----- 7 files changed, 235 insertions(+), 42 deletions(-) create mode 100644 autofill.go create mode 100644 fs.go diff --git a/acp.go b/acp.go index 293db97..c7a29ce 100644 --- a/acp.go +++ b/acp.go @@ -29,8 +29,9 @@ type Copyer struct { updateCopying func(func(set map[int64]struct{})) logf func(l logrus.Level, format string, args ...any) - jobsLock sync.Mutex - jobs []*baseJob + jobsLock sync.Mutex + jobs []*baseJob + noSpaceSource []*source errsLock sync.Mutex errors []*Error @@ -108,6 +109,11 @@ func (c *Copyer) run(ctx context.Context) { return } + if err := c.applyAutoFillLimit(); err != nil { + c.reportError("_autofill", err) + return + } + c.copy(ctx) } diff --git a/autofill.go b/autofill.go new file mode 100644 index 0000000..abba816 --- /dev/null +++ b/autofill.go @@ -0,0 +1,90 @@ +package acp + +import ( + "fmt" + "math" + "strings" +) + +func (c *Copyer) applyAutoFillLimit() error { + if !c.autoFill { + return nil + } + + counts := make(map[string]int, len(c.dst)) + infos := make(map[string]*fileSystem, len(c.dst)) + for _, d := range c.dst { + fsInfo, err := getFileSystem(d) + if err != nil { + return fmt.Errorf("get file system fail, %s, %w", d, err) + } + + infos[fsInfo.MountPoint] = fsInfo + counts[fsInfo.MountPoint] = counts[fsInfo.MountPoint] + 1 + } + + min := int64(math.MaxInt64) + for mp, info := range infos { + size := info.AvailableSize / int64(counts[mp]) + if size < min { + min = size + } + } + + c.jobsLock.Lock() + defer c.jobsLock.Unlock() + + idx := c.getAutoFillCutoffIdx(min) + if idx < 0 { + return nil + } + if idx == 0 { + return fmt.Errorf("cannot found available auto fill slice, filesystem_size= %d", min) + } + + cutoff := c.jobs[idx:] + c.jobs = c.jobs[:idx] + last := "" + for _, job := range cutoff { + job.parent.done(job) + + if strings.HasPrefix(job.source.relativePath, last) { + if len(job.source.relativePath) == len(last) { + continue + } + if job.source.relativePath[len(last)] == '/' { + continue + } + } + + c.noSpaceSource = append(c.noSpaceSource, job.source) + last = job.source.relativePath + } + return nil +} + +func (c *Copyer) getAutoFillCutoffIdx(limit int64) int { + left := limit + targetIdx := -1 + for idx, job := range c.jobs { + left -= job.size + if left < 0 { + targetIdx = idx + } + } + if targetIdx < 0 { + return -1 + } + + if c.autoFillSplitDepth <= 0 { + return targetIdx + } + + for idx := targetIdx; idx >= 0; idx-- { + if strings.Count(c.jobs[idx].source.relativePath, "/") < c.autoFillSplitDepth { + return idx + } + } + + return 0 +} diff --git a/cmd/acp/main.go b/cmd/acp/main.go index 0755e49..23cf796 100644 --- a/cmd/acp/main.go +++ b/cmd/acp/main.go @@ -14,11 +14,13 @@ import ( var ( withProgressBar = flag.Bool("p", true, "display progress bar") notOverwrite = flag.Bool("n", false, "not overwrite exist file") + continueReport = flag.String("c", "", "continue with previous report, for auto fill circumstances") noTarget = flag.Bool("notarget", false, "do not have target, use as dir index tool") reportPath = flag.String("report", "", "json report storage path") reportIndent = flag.Bool("report-indent", false, "json report with indent") fromLinear = flag.Bool("from-linear", false, "copy from linear device, such like tape drive") toLinear = flag.Bool("to-linear", false, "copy to linear device, such like tape drive") + autoFillDepth = flag.Int("auto-fill-depth", 0, "auto fill the whole filesystem, split by specify filepath depth, and report will output left filepaths") targetPaths []string ) @@ -59,7 +61,25 @@ func main() { }() opts := make([]acp.Option, 0, 8) - opts = append(opts, acp.Source(sources...)) + if *continueReport != "" { + f, err := os.Open(*continueReport) + if err != nil { + logrus.Fatalf("cannot open continue report file, %s", err) + } + + r := new(acp.Report) + if err := json.NewDecoder(f).Decode(r); err != nil { + logrus.Fatalf("decode continue report file, %s", err) + } + + for _, s := range r.NoSpaceSources { + logrus.Infof("restore unfinished: base= '%s' relative_path= '%s'", s.Base, s.RelativePath) + opts = append(opts, acp.AccurateSource(s.Base, s.RelativePath)) + } + } else { + opts = append(opts, acp.Source(sources...)) + } + opts = append(opts, acp.Target(targetPaths...)) opts = append(opts, acp.WithHash(*reportPath != "")) opts = append(opts, acp.Overwrite(!*notOverwrite)) @@ -71,31 +91,37 @@ func main() { if *toLinear { opts = append(opts, acp.SetToDevice(acp.LinearDevice(true))) } + if *autoFillDepth != 0 { + opts = append(opts, acp.WithAutoFill(true, *autoFillDepth)) + } c, err := acp.New(ctx, opts...) if err != nil { - panic(err) + logrus.Fatalf("unexpected exit: %s", err) } - report := c.Wait() - if *reportPath == "" { - return - } + defer func() { + if *reportPath == "" { + return + } - r, err := os.Create(*reportPath) - if err != nil { - logrus.Warnf("open report fail, path= '%s', err= %w", *reportPath, err) - logrus.Infof("report: %s", report) - return - } - defer r.Close() + report := c.Report() + r, err := os.Create(*reportPath) + if err != nil { + logrus.Warnf("open report fail, path= '%s', err= %w", *reportPath, err) + logrus.Infof("report: %s", report) + return + } + defer r.Close() - var buf []byte - if *reportIndent { - buf, _ = json.MarshalIndent(report, "", "\t") - } else { - buf, _ = json.Marshal(report) - } + var buf []byte + if *reportIndent { + buf, _ = json.MarshalIndent(report, "", "\t") + } else { + buf, _ = json.Marshal(report) + } - r.Write(buf) + r.Write(buf) + }() + c.Wait() } diff --git a/fs.go b/fs.go new file mode 100644 index 0000000..1aeda4e --- /dev/null +++ b/fs.go @@ -0,0 +1,41 @@ +package acp + +import ( + "fmt" + "syscall" +) + +type fileSystem struct { + TypeName string + MountPoint string + TotalSize int64 + AvailableSize int64 +} + +func getFileSystem(path string) (*fileSystem, error) { + stat := new(syscall.Statfs_t) + + if err := syscall.Statfs(path, stat); err != nil { + return nil, fmt.Errorf("read statfs fail, err= %w", err) + } + + return &fileSystem{ + TypeName: unpaddingInt8s(stat.Fstypename[:]), + MountPoint: unpaddingInt8s(stat.Mntonname[:]), + TotalSize: int64(stat.Blocks) * int64(stat.Bsize), + AvailableSize: int64(stat.Bavail) * int64(stat.Bsize), + }, nil +} + +func unpaddingInt8s(buf []int8) string { + result := make([]byte, 0, len(buf)) + for _, c := range buf { + if c == 0x00 { + break + } + + result = append(result, byte(c)) + } + + return string(result) +} diff --git a/index.go b/index.go index b4ad4c3..1b83c08 100644 --- a/index.go +++ b/index.go @@ -74,18 +74,6 @@ func (c *Copyer) walk(parent *baseJob, src *source) *baseJob { return job } -func (c *Copyer) appendJobs(jobs ...*baseJob) { - c.jobsLock.Lock() - defer c.jobsLock.Unlock() - c.jobs = append(c.jobs, jobs...) -} - -func (c *Copyer) getJobs() []*baseJob { - c.jobsLock.Lock() - defer c.jobsLock.Unlock() - return c.jobs -} - func (c *Copyer) checkJobs() bool { c.jobsLock.Lock() defer c.jobsLock.Unlock() @@ -136,3 +124,21 @@ func (c *Copyer) checkJobs() bool { c.jobs = filtered return true } + +func (c *Copyer) appendJobs(jobs ...*baseJob) { + c.jobsLock.Lock() + defer c.jobsLock.Unlock() + c.jobs = append(c.jobs, jobs...) +} + +func (c *Copyer) getJobs() []*baseJob { + c.jobsLock.Lock() + defer c.jobsLock.Unlock() + return c.jobs +} + +func (c *Copyer) getJobsAndNoSpaceSource() ([]*baseJob, []*source) { + c.jobsLock.Lock() + defer c.jobsLock.Unlock() + return c.jobs, c.noSpaceSource +} diff --git a/opt.go b/opt.go index 56937a3..449152f 100644 --- a/opt.go +++ b/opt.go @@ -31,6 +31,9 @@ type option struct { withProgressBar bool threads int withHash bool + + autoFill bool + autoFillSplitDepth int } func newOption() *option { @@ -179,3 +182,11 @@ func WithHash(b bool) Option { return o } } + +func WithAutoFill(on bool, depth int) Option { + return func(o *option) *option { + o.autoFill = on + o.autoFillSplitDepth = depth + return o + } +} diff --git a/report.go b/report.go index ab97292..f259992 100644 --- a/report.go +++ b/report.go @@ -8,16 +8,23 @@ import ( ) func (c *Copyer) Report() *Report { - jobs, errs := c.getJobs(), c.getErrors() + jobs, nss := c.getJobsAndNoSpaceSource() + errs := c.getErrors() files := make([]*File, 0, len(jobs)) for _, job := range jobs { files = append(files, job.report()) } + noSpaceSources := make([]*FilePath, 0, len(nss)) + for _, s := range nss { + noSpaceSources = append(noSpaceSources, &FilePath{Base: s.base, RelativePath: s.relativePath}) + } + return &Report{ - Files: files, - Errors: errs, + Files: files, + NoSpaceSources: noSpaceSources, + Errors: errs, } } @@ -55,8 +62,8 @@ type File struct { RelativePath string `json:"relative_path"` Status string `json:"status"` - SuccessTargets []string `json:"success_target"` - FailTargets map[string]string `json:"fail_target"` + SuccessTargets []string `json:"success_target,omitempty"` + FailTargets map[string]string `json:"fail_target,omitempty"` Size int64 `json:"size"` Mode os.FileMode `json:"mode"` @@ -65,7 +72,13 @@ type File struct { SHA256 string `json:"sha256"` } -type Report struct { - Files []*File `json:"files,omitempty"` - Errors []*Error `json:"errors,omitempty"` +type FilePath struct { + Base string `json:"base,omitempty"` + RelativePath string `json:"relative_path,omitempty"` +} + +type Report struct { + Files []*File `json:"files,omitempty"` + NoSpaceSources []*FilePath `json:"no_space_sources,omitempty"` + Errors []*Error `json:"errors,omitempty"` }