feat: add continue

This commit is contained in:
崔竞宁
2022-09-14 00:53:10 +08:00
parent 11e3201692
commit 7e7d887b2e
7 changed files with 235 additions and 42 deletions

10
acp.go
View File

@@ -29,8 +29,9 @@ type Copyer struct {
updateCopying func(func(set map[int64]struct{}))
logf func(l logrus.Level, format string, args ...any)
jobsLock sync.Mutex
jobs []*baseJob
jobsLock sync.Mutex
jobs []*baseJob
noSpaceSource []*source
errsLock sync.Mutex
errors []*Error
@@ -108,6 +109,11 @@ func (c *Copyer) run(ctx context.Context) {
return
}
if err := c.applyAutoFillLimit(); err != nil {
c.reportError("_autofill", err)
return
}
c.copy(ctx)
}

90
autofill.go Normal file
View File

@@ -0,0 +1,90 @@
package acp
import (
"fmt"
"math"
"strings"
)
// applyAutoFillLimit trims the pending job list so the total copy size fits
// into the tightest destination filesystem, recording the trimmed sources in
// c.noSpaceSource so a later run can continue with them (see the -c flag).
// It returns an error when a destination's filesystem cannot be inspected,
// or when not even the first auto-fill slice fits the available space.
func (c *Copyer) applyAutoFillLimit() error {
	// No-op unless auto-fill mode was enabled via WithAutoFill.
	if !c.autoFill {
		return nil
	}

	// Group destinations by mount point: multiple destinations on the same
	// filesystem must share that filesystem's free space.
	counts := make(map[string]int, len(c.dst))
	infos := make(map[string]*fileSystem, len(c.dst))
	for _, d := range c.dst {
		fsInfo, err := getFileSystem(d)
		if err != nil {
			return fmt.Errorf("get file system fail, %s, %w", d, err)
		}

		infos[fsInfo.MountPoint] = fsInfo
		counts[fsInfo.MountPoint] = counts[fsInfo.MountPoint] + 1
	}

	// The effective budget is the smallest per-destination share of free
	// space across all involved filesystems.
	min := int64(math.MaxInt64)
	for mp, info := range infos {
		size := info.AvailableSize / int64(counts[mp])
		if size < min {
			min = size
		}
	}

	c.jobsLock.Lock()
	defer c.jobsLock.Unlock()

	idx := c.getAutoFillCutoffIdx(min)
	if idx < 0 {
		// Everything fits; nothing to trim.
		return nil
	}
	if idx == 0 {
		// Even the first slice overflows the budget — cannot proceed.
		return fmt.Errorf("cannot found available auto fill slice, filesystem_size= %d", min)
	}

	// Drop everything from the cutoff onward and remember the sources.
	cutoff := c.jobs[idx:]
	c.jobs = c.jobs[:idx]

	last := ""
	for _, job := range cutoff {
		job.parent.done(job)

		// Collapse descendants into the most recently recorded path:
		// a path equal to `last`, or strictly under it (same prefix
		// followed by '/'), is skipped rather than recorded again.
		if strings.HasPrefix(job.source.relativePath, last) {
			if len(job.source.relativePath) == len(last) {
				continue
			}
			if job.source.relativePath[len(last)] == '/' {
				continue
			}
		}

		c.noSpaceSource = append(c.noSpaceSource, job.source)
		last = job.source.relativePath
	}

	return nil
}
// getAutoFillCutoffIdx returns the index of the first job that no longer
// fits when c.jobs are packed, in order, into a budget of limit bytes.
// It returns -1 when every job fits. When autoFillSplitDepth is positive,
// the cutoff is walked back to the nearest job whose relative path contains
// fewer than autoFillSplitDepth '/' separators, so a deep subtree is not
// split in the middle; 0 is returned when no such boundary exists.
// Callers must hold c.jobsLock.
func (c *Copyer) getAutoFillCutoffIdx(limit int64) int {
	left := limit
	targetIdx := -1
	for idx, job := range c.jobs {
		left -= job.size
		if left < 0 {
			// Stop at the FIRST job that overflows the budget.
			// (Bug fix: without the break the loop kept scanning and
			// targetIdx ended up as the last index, cutting only the
			// final job instead of everything past the overflow.)
			targetIdx = idx
			break
		}
	}
	if targetIdx < 0 {
		return -1
	}

	if c.autoFillSplitDepth <= 0 {
		return targetIdx
	}

	// Walk back to a job shallow enough to serve as a split boundary.
	for idx := targetIdx; idx >= 0; idx-- {
		if strings.Count(c.jobs[idx].source.relativePath, "/") < c.autoFillSplitDepth {
			return idx
		}
	}
	return 0
}

View File

@@ -14,11 +14,13 @@ import (
var (
withProgressBar = flag.Bool("p", true, "display progress bar")
notOverwrite = flag.Bool("n", false, "not overwrite exist file")
continueReport = flag.String("c", "", "continue with previous report, for auto fill circumstances")
noTarget = flag.Bool("notarget", false, "do not have target, use as dir index tool")
reportPath = flag.String("report", "", "json report storage path")
reportIndent = flag.Bool("report-indent", false, "json report with indent")
fromLinear = flag.Bool("from-linear", false, "copy from linear device, such like tape drive")
toLinear = flag.Bool("to-linear", false, "copy to linear device, such like tape drive")
autoFillDepth = flag.Int("auto-fill-depth", 0, "auto fill the whole filesystem, split by specify filepath depth, and report will output left filepaths")
targetPaths []string
)
@@ -59,7 +61,25 @@ func main() {
}()
opts := make([]acp.Option, 0, 8)
opts = append(opts, acp.Source(sources...))
if *continueReport != "" {
f, err := os.Open(*continueReport)
if err != nil {
logrus.Fatalf("cannot open continue report file, %s", err)
}
r := new(acp.Report)
if err := json.NewDecoder(f).Decode(r); err != nil {
logrus.Fatalf("decode continue report file, %s", err)
}
for _, s := range r.NoSpaceSources {
logrus.Infof("restore unfinished: base= '%s' relative_path= '%s'", s.Base, s.RelativePath)
opts = append(opts, acp.AccurateSource(s.Base, s.RelativePath))
}
} else {
opts = append(opts, acp.Source(sources...))
}
opts = append(opts, acp.Target(targetPaths...))
opts = append(opts, acp.WithHash(*reportPath != ""))
opts = append(opts, acp.Overwrite(!*notOverwrite))
@@ -71,31 +91,37 @@ func main() {
if *toLinear {
opts = append(opts, acp.SetToDevice(acp.LinearDevice(true)))
}
if *autoFillDepth != 0 {
opts = append(opts, acp.WithAutoFill(true, *autoFillDepth))
}
c, err := acp.New(ctx, opts...)
if err != nil {
panic(err)
logrus.Fatalf("unexpected exit: %s", err)
}
report := c.Wait()
if *reportPath == "" {
return
}
defer func() {
if *reportPath == "" {
return
}
r, err := os.Create(*reportPath)
if err != nil {
logrus.Warnf("open report fail, path= '%s', err= %w", *reportPath, err)
logrus.Infof("report: %s", report)
return
}
defer r.Close()
report := c.Report()
r, err := os.Create(*reportPath)
if err != nil {
logrus.Warnf("open report fail, path= '%s', err= %w", *reportPath, err)
logrus.Infof("report: %s", report)
return
}
defer r.Close()
var buf []byte
if *reportIndent {
buf, _ = json.MarshalIndent(report, "", "\t")
} else {
buf, _ = json.Marshal(report)
}
var buf []byte
if *reportIndent {
buf, _ = json.MarshalIndent(report, "", "\t")
} else {
buf, _ = json.Marshal(report)
}
r.Write(buf)
r.Write(buf)
}()
c.Wait()
}

41
fs.go Normal file
View File

@@ -0,0 +1,41 @@
package acp
import (
"fmt"
"syscall"
)
// fileSystem describes the filesystem backing a destination path, as
// reported by statfs(2); used by auto-fill to compute free-space budgets.
type fileSystem struct {
	TypeName      string // filesystem type name (from Statfs_t.Fstypename)
	MountPoint    string // mount point the path lives under (from Statfs_t.Mntonname)
	TotalSize     int64  // total size in bytes (Blocks * Bsize)
	AvailableSize int64  // bytes available to unprivileged users (Bavail * Bsize)
}
// getFileSystem stats the filesystem containing path via statfs(2) and
// converts the raw stat into a fileSystem value.
// NOTE(review): Fstypename and Mntonname are BSD/darwin-specific
// Statfs_t fields; this presumably needs a build tag or per-OS variant
// to compile on Linux — confirm against the build setup.
func getFileSystem(path string) (*fileSystem, error) {
	stat := new(syscall.Statfs_t)
	if err := syscall.Statfs(path, stat); err != nil {
		return nil, fmt.Errorf("read statfs fail, err= %w", err)
	}

	return &fileSystem{
		TypeName:      unpaddingInt8s(stat.Fstypename[:]),
		MountPoint:    unpaddingInt8s(stat.Mntonname[:]),
		TotalSize:     int64(stat.Blocks) * int64(stat.Bsize),
		AvailableSize: int64(stat.Bavail) * int64(stat.Bsize),
	}, nil
}
// unpaddingInt8s converts a NUL-padded []int8 (as found in syscall struct
// name fields such as Statfs_t.Fstypename) into a Go string, keeping only
// the bytes before the first NUL.
func unpaddingInt8s(buf []int8) string {
	// Measure the meaningful prefix: everything before the first 0x00.
	n := 0
	for n < len(buf) && buf[n] != 0x00 {
		n++
	}

	// Widen each int8 to a byte and assemble the string in one pass.
	out := make([]byte, n)
	for i := range out {
		out[i] = byte(buf[i])
	}
	return string(out)
}

View File

@@ -74,18 +74,6 @@ func (c *Copyer) walk(parent *baseJob, src *source) *baseJob {
return job
}
func (c *Copyer) appendJobs(jobs ...*baseJob) {
c.jobsLock.Lock()
defer c.jobsLock.Unlock()
c.jobs = append(c.jobs, jobs...)
}
func (c *Copyer) getJobs() []*baseJob {
c.jobsLock.Lock()
defer c.jobsLock.Unlock()
return c.jobs
}
func (c *Copyer) checkJobs() bool {
c.jobsLock.Lock()
defer c.jobsLock.Unlock()
@@ -136,3 +124,21 @@ func (c *Copyer) checkJobs() bool {
c.jobs = filtered
return true
}
// appendJobs adds jobs to the pending job list under jobsLock.
func (c *Copyer) appendJobs(jobs ...*baseJob) {
	c.jobsLock.Lock()
	defer c.jobsLock.Unlock()

	c.jobs = append(c.jobs, jobs...)
}
// getJobs returns the current job slice, read under jobsLock.
// NOTE(review): the returned slice header escapes the lock, so callers
// share its backing array with later appendJobs calls — presumably only
// invoked after job collection has settled; confirm at call sites.
func (c *Copyer) getJobs() []*baseJob {
	c.jobsLock.Lock()
	defer c.jobsLock.Unlock()

	return c.jobs
}
// getJobsAndNoSpaceSource returns the job list together with the sources
// trimmed by auto-fill, both read under a single jobsLock hold so the two
// views are mutually consistent.
func (c *Copyer) getJobsAndNoSpaceSource() ([]*baseJob, []*source) {
	c.jobsLock.Lock()
	defer c.jobsLock.Unlock()

	return c.jobs, c.noSpaceSource
}

11
opt.go
View File

@@ -31,6 +31,9 @@ type option struct {
withProgressBar bool
threads int
withHash bool
autoFill bool
autoFillSplitDepth int
}
func newOption() *option {
@@ -179,3 +182,11 @@ func WithHash(b bool) Option {
return o
}
}
// WithAutoFill enables (or disables) auto-fill mode, in which the copy set
// is trimmed to fit the destination filesystem's free space and the
// leftover sources are reported for a later continued run. depth limits
// where the job list may be cut: only paths containing fewer than depth
// '/' separators are used as cut points; depth <= 0 disables that
// restriction.
func WithAutoFill(on bool, depth int) Option {
	return func(o *option) *option {
		o.autoFill = on
		o.autoFillSplitDepth = depth
		return o
	}
}

View File

@@ -8,16 +8,23 @@ import (
)
func (c *Copyer) Report() *Report {
jobs, errs := c.getJobs(), c.getErrors()
jobs, nss := c.getJobsAndNoSpaceSource()
errs := c.getErrors()
files := make([]*File, 0, len(jobs))
for _, job := range jobs {
files = append(files, job.report())
}
noSpaceSources := make([]*FilePath, 0, len(nss))
for _, s := range nss {
noSpaceSources = append(noSpaceSources, &FilePath{Base: s.base, RelativePath: s.relativePath})
}
return &Report{
Files: files,
Errors: errs,
Files: files,
NoSpaceSources: noSpaceSources,
Errors: errs,
}
}
@@ -55,8 +62,8 @@ type File struct {
RelativePath string `json:"relative_path"`
Status string `json:"status"`
SuccessTargets []string `json:"success_target"`
FailTargets map[string]string `json:"fail_target"`
SuccessTargets []string `json:"success_target,omitempty"`
FailTargets map[string]string `json:"fail_target,omitempty"`
Size int64 `json:"size"`
Mode os.FileMode `json:"mode"`
@@ -65,7 +72,13 @@ type File struct {
SHA256 string `json:"sha256"`
}
type Report struct {
Files []*File `json:"files,omitempty"`
Errors []*Error `json:"errors,omitempty"`
// FilePath identifies a source file in the JSON report by its scan base
// directory and the path relative to that base; consumed by the -c
// (continue) flag to restore sources that did not fit on a previous run.
type FilePath struct {
	Base         string `json:"base,omitempty"`
	RelativePath string `json:"relative_path,omitempty"`
}
type Report struct {
Files []*File `json:"files,omitempty"`
NoSpaceSources []*FilePath `json:"no_space_sources,omitempty"`
Errors []*Error `json:"errors,omitempty"`
}