From 5231f1657b3883e6ba7f4fa96c446b3db674a15b Mon Sep 17 00:00:00 2001 From: Samuel N Cui Date: Wed, 27 Sep 2023 17:57:10 +0800 Subject: [PATCH] fix: large file make ltfs into readonly mode --- copy.go | 78 +++++++++++++++++++++++++++++++++++++++++------------- go.mod | 25 +++++++++-------- job.go | 13 +++++---- prepare.go | 18 ++++++------- 4 files changed, 88 insertions(+), 46 deletions(-) diff --git a/copy.go b/copy.go index eb716c4..32517d6 100644 --- a/copy.go +++ b/copy.go @@ -14,8 +14,9 @@ import ( "time" mapset "github.com/deckarep/golang-set/v2" - "github.com/hashicorp/go-multierror" sha256 "github.com/minio/sha256-simd" + "github.com/samber/lo" + "github.com/samuelncui/godf" ) const ( @@ -24,6 +25,9 @@ const ( var ( sha256Pool = &sync.Pool{New: func() interface{} { return sha256.New() }} + + ErrTargetNoSpace = fmt.Errorf("acp: target have no space") + ErrTargetDropToReadonly = fmt.Errorf("acp: target droped into readonly") ) func (c *Copyer) copy(ctx context.Context, prepared <-chan *writeJob) <-chan *baseJob { @@ -54,7 +58,7 @@ func (c *Copyer) copy(ctx context.Context, prepared <-chan *writeJob) <-chan *ba } }) - badDsts := mapset.NewSet[string]() + noSpaceDevices := mapset.NewSet[string]() for idx := 0; idx < c.toDevice.threads; idx++ { copying.Add(1) go wrap(ctx, func() { @@ -69,7 +73,12 @@ func (c *Copyer) copy(ctx context.Context, prepared <-chan *writeJob) <-chan *ba return } - wrap(ctx, func() { c.write(ctx, job, ch, cntr, badDsts) }) + if noSpaceDevices.Contains(lo.Map(job.targets, func(target string, _ int) string { return c.getDevice(target) })...) { + job.fail("", ErrTargetNoSpace) + continue + } + + wrap(ctx, func() { c.write(ctx, job, ch, cntr, noSpaceDevices) }) } } }) @@ -78,7 +87,7 @@ func (c *Copyer) copy(ctx context.Context, prepared <-chan *writeJob) <-chan *ba return ch } -func (c *Copyer) write(ctx context.Context, job *writeJob, ch chan<- *baseJob, cntr *counter, badDsts mapset.Set[string]) { +func (c *Copyer) write(ctx context.Context, job *writeJob, ch chan<- *baseJob, cntr *counter, noSpaceDevices mapset.Set[string]) { job.setStatus(jobStatusCopying) defer job.setStatus(jobStatusFinishing) @@ -102,15 +111,33 @@ func (c *Copyer) write(ctx context.Context, job *writeJob, ch chan<- *baseJob, c target := target dev := c.getDevice(target) - if badDsts.Contains(dev) { - job.fail(target, fmt.Errorf("bad target path")) + if noSpaceDevices.Contains(dev) { + job.fail(target, ErrTargetNoSpace) + continue + } + + diskUsage, err := godf.NewDiskUsage(dev) + if err != nil { + job.fail(target, fmt.Errorf("read disk usage fail, dev= '%s', %w", dev, err)) + continue + } + if int64(diskUsage.Free()) < job.size { + noSpaceDevices.Add(dev) + job.fail(target, fmt.Errorf("%w, want= %d have= %d", ErrTargetNoSpace, job.size, diskUsage.Free())) continue } if err := os.MkdirAll(path.Dir(target), os.ModePerm); err != nil { // if no space - if errors.Is(err, syscall.ENOSPC) || errors.Is(err, syscall.EROFS) { - badDsts.Add(dev) + if errors.Is(err, syscall.ENOSPC) { + noSpaceDevices.Add(dev) + job.fail(target, fmt.Errorf("%w, mkdir dst dir fail", ErrTargetNoSpace)) + continue + } + if errors.Is(err, syscall.EROFS) { + noSpaceDevices.Add(dev) + job.fail(target, fmt.Errorf("%w, mkdir dst dir fail", ErrTargetDropToReadonly)) + continue } job.fail(target, fmt.Errorf("mkdir dst dir fail, %w", err)) @@ -120,8 +147,15 @@ func (c *Copyer) write(ctx context.Context, job *writeJob, ch chan<- *baseJob, c file, err := os.OpenFile(target, c.createFlag, job.mode) if err != nil { // if no space - if errors.Is(err, syscall.ENOSPC) || errors.Is(err, syscall.EROFS) { - badDsts.Add(dev) + if errors.Is(err, syscall.ENOSPC) { + noSpaceDevices.Add(dev) + job.fail(target, fmt.Errorf("%w, open dst file fail", ErrTargetNoSpace)) + continue + } + if errors.Is(err, syscall.EROFS) { + noSpaceDevices.Add(dev) + job.fail(target, fmt.Errorf("%w, open dst file fail", ErrTargetDropToReadonly)) + continue } job.fail(target, fmt.Errorf("open dst file fail, %w", err)) @@ -146,17 +180,23 @@ func (c *Copyer) write(ctx context.Context, job *writeJob, ch chan<- *baseJob, c for range ch { } + if err := os.Remove(target); err != nil { + c.reportError(job.path, target, fmt.Errorf("delete failed file has error, %w", err)) + } + // if no space - if errors.Is(err, syscall.ENOSPC) || errors.Is(err, syscall.EROFS) { - badDsts.Add(dev) + if errors.Is(rerr, syscall.ENOSPC) { + noSpaceDevices.Add(dev) + job.fail(target, fmt.Errorf("%w, write dst file fail", ErrTargetNoSpace)) + return + } + if errors.Is(rerr, syscall.EROFS) { + noSpaceDevices.Add(dev) + job.fail(target, fmt.Errorf("%w, write dst file fail", ErrTargetDropToReadonly)) + return } - if re := os.Remove(target); re != nil { - rerr = multierror.Append(rerr, re) - } - - c.reportError(job.path, target, rerr) - job.fail(target, rerr) + job.fail(target, fmt.Errorf("write dst file fail, %w", rerr)) }() defer file.Close() @@ -200,7 +240,7 @@ func (c *Copyer) write(ctx context.Context, job *writeJob, ch chan<- *baseJob, c job.setHash(sha.Sum(nil)) }) } - readErr = c.streamCopy(ctx, chans, job.src, &cntr.bytes) + readErr = c.streamCopy(ctx, chans, job.reader, &cntr.bytes) } func (c *Copyer) streamCopy(ctx context.Context, dsts []chan []byte, src io.ReadCloser, bytes *int64) error { diff --git a/go.mod b/go.mod index e6cf6dc..241f8dd 100644 --- a/go.mod +++ b/go.mod @@ -4,25 +4,24 @@ go 1.18 require ( github.com/davecgh/go-spew v1.1.1 - github.com/deckarep/golang-set/v2 v2.1.0 - github.com/hashicorp/go-multierror v1.1.1 + github.com/deckarep/golang-set/v2 v2.3.1 github.com/json-iterator/go v1.1.12 - github.com/klauspost/cpuid/v2 v2.0.4 - github.com/minio/sha256-simd v1.0.0 + github.com/klauspost/cpuid/v2 v2.2.5 + github.com/minio/sha256-simd v1.0.1 github.com/moby/sys/mountinfo v0.6.2 github.com/modern-go/reflect2 v1.0.2 github.com/samber/lo v1.38.1 - github.com/schollz/progressbar/v3 v3.10.1 - github.com/sirupsen/logrus v1.9.0 + github.com/samuelncui/godf v0.0.0-20230927093204-37ea5acb9fc1 + github.com/schollz/progressbar/v3 v3.13.1 + github.com/sirupsen/logrus v1.9.3 ) require ( - github.com/hashicorp/errwrap v1.0.0 // indirect - github.com/mattn/go-runewidth v0.0.13 // indirect + github.com/mattn/go-runewidth v0.0.15 // indirect github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect - github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect - github.com/rivo/uniseg v0.3.4 // indirect - golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect - golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 // indirect - golang.org/x/term v0.0.0-20220722155259-a9ba230a4035 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/rivo/uniseg v0.4.4 // indirect + golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect + golang.org/x/sys v0.12.0 // indirect + golang.org/x/term v0.12.0 // indirect ) diff --git a/job.go b/job.go index 0110197..e0c8047 100644 --- a/job.go +++ b/job.go @@ -82,6 +82,7 @@ func (j *baseJob) fail(path string, err error) { if j.failedTargets == nil { j.failedTargets = make(map[string]error, 1) } + j.failedTargets[path] = err j.copyer.submit(&EventUpdateJob{j.report()}) } @@ -105,14 +106,16 @@ func (j *baseJob) report() *Job { type writeJob struct { *baseJob - src io.ReadCloser - ch chan struct{} + reader io.ReadCloser + size int64 + ch chan struct{} } -func newWriteJob(job *baseJob, src io.ReadCloser, needWait bool) *writeJob { +func newWriteJob(job *baseJob, src io.ReadCloser, size int64, needWait bool) *writeJob { j := &writeJob{ baseJob: job, - src: src, + reader: src, + size: size, } if needWait { j.ch = make(chan struct{}) @@ -121,7 +124,7 @@ func newWriteJob(job *baseJob, src io.ReadCloser, needWait bool) *writeJob { } func (wj *writeJob) done() { - wj.src.Close() + wj.reader.Close() if wj.ch != nil { close(wj.ch) diff --git a/prepare.go b/prepare.go index dcf4da2..5959451 100644 --- a/prepare.go +++ b/prepare.go @@ -41,39 +41,39 @@ func (c *Copyer) prepare(ctx context.Context, indexed <-chan *baseJob) <-chan *w job.setStatus(jobStatusPreparing) - file, err := func(path string) (io.ReadCloser, error) { + file, size, err := func(path string) (io.ReadCloser, int64, error) { if c.fromDevice.linear { file, err := os.Open(path) if err != nil { - return nil, fmt.Errorf("open src file fail, %w", err) + return nil, 0, fmt.Errorf("open src file fail, %w", err) } fileInfo, err := file.Stat() if err != nil { - return nil, fmt.Errorf("get src file stat fail, %w", err) + return nil, 0, fmt.Errorf("get src file stat fail, %w", err) } if fileInfo.Size() == 0 { - return nil, fmt.Errorf("get src file, size is zero") + return nil, 0, fmt.Errorf("get src file, size is zero") } - return file, nil + return file, fileInfo.Size(), nil } readerAt, err := mmap.Open(path) if err != nil { - return nil, fmt.Errorf("open src file by mmap fail, %w", err) + return nil, 0, fmt.Errorf("open src file by mmap fail, %w", err) } if readerAt.Len() == 0 { - return nil, fmt.Errorf("get src file by mmap, size is zero") + return nil, 0, fmt.Errorf("get src file by mmap, size is zero") } - return mmap.NewReader(readerAt), nil + return mmap.NewReader(readerAt), int64(readerAt.Len()), nil }(job.path) if err != nil { c.reportError(job.path, "", err) } - wj := newWriteJob(job, file, c.fromDevice.linear) + wj := newWriteJob(job, file, size, c.fromDevice.linear) ch <- wj wj.wait() }