mirror of
https://github.com/samuelncui/yatm.git
synced 2026-01-05 04:55:23 +00:00
fix: large file make ltfs into readonly mode
This commit is contained in:
@@ -3,6 +3,7 @@ package executor
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
@@ -188,6 +189,7 @@ func (a *jobArchiveExecutor) makeTape(ctx context.Context, device, barcode, name
|
||||
a.progress = newProgress()
|
||||
defer func() { a.progress = nil }()
|
||||
|
||||
var dropToReadonly bool
|
||||
opts = append(opts, acp.WithEventHandler(func(ev acp.Event) {
|
||||
switch e := ev.(type) {
|
||||
case *acp.EventUpdateCount:
|
||||
@@ -207,19 +209,30 @@ func (a *jobArchiveExecutor) makeTape(ctx context.Context, device, barcode, name
|
||||
|
||||
var targetStatus entity.CopyStatus
|
||||
switch job.Status {
|
||||
case "pending":
|
||||
case acp.JobStatusPending, acp.JobStatusPreparing:
|
||||
targetStatus = entity.CopyStatus_PENDING
|
||||
case "preparing":
|
||||
case acp.JobStatusCopying:
|
||||
targetStatus = entity.CopyStatus_RUNNING
|
||||
case "finished":
|
||||
a.logger.WithContext(ctx).Infof("file '%s' copy finished, size= %d", src.RealPath(), job.Size)
|
||||
targetStatus = entity.CopyStatus_STAGED
|
||||
case acp.JobStatusFinished:
|
||||
targetStatus = entity.CopyStatus_FAILED
|
||||
if len(job.SuccessTargets) > 0 {
|
||||
a.logger.WithContext(ctx).Infof("file '%s' copy success, size= %d", src.RealPath(), job.Size)
|
||||
targetStatus = entity.CopyStatus_STAGED
|
||||
break // break from switch
|
||||
}
|
||||
|
||||
for dst, err := range job.FailTargets {
|
||||
if err == nil {
|
||||
continue
|
||||
}
|
||||
if errors.Is(err, acp.ErrTargetNoSpace) {
|
||||
continue
|
||||
}
|
||||
|
||||
a.logger.WithContext(ctx).WithError(err).Errorf("file '%s' copy fail, dst= '%s'", src.RealPath(), dst)
|
||||
if errors.Is(err, acp.ErrTargetDropToReadonly) {
|
||||
dropToReadonly = true
|
||||
}
|
||||
}
|
||||
default:
|
||||
return
|
||||
@@ -254,6 +267,14 @@ func (a *jobArchiveExecutor) makeTape(ctx context.Context, device, barcode, name
|
||||
defer func() {
|
||||
ctx := tools.WithoutTimeout(ctx)
|
||||
|
||||
// if tape drop to readonly, ltfs cannot write index to partition a.
|
||||
// rollback sources for next try.
|
||||
if dropToReadonly {
|
||||
a.logger.WithContext(ctx).Errorf("tape filesystem had droped to readonly, rollback, barcode= '%s'", barcode)
|
||||
a.rollbackSources(ctx)
|
||||
return
|
||||
}
|
||||
|
||||
report := reportGetter()
|
||||
sort.Slice(report.Jobs, func(i, j int) bool {
|
||||
return entity.NewSourceFromACPJob(report.Jobs[i]).Compare(entity.NewSourceFromACPJob(report.Jobs[j])) < 0
|
||||
@@ -373,6 +394,23 @@ func (a *jobArchiveExecutor) markSourcesAsSubmited(ctx context.Context, jobs []*
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *jobArchiveExecutor) rollbackSources(ctx context.Context) error {
|
||||
a.stateLock.Lock()
|
||||
defer a.stateLock.Unlock()
|
||||
|
||||
for _, source := range a.state.Sources {
|
||||
if source.Status == entity.CopyStatus_SUBMITED {
|
||||
continue
|
||||
}
|
||||
source.Status = entity.CopyStatus_PENDING
|
||||
}
|
||||
|
||||
if _, err := a.exe.SaveJob(ctx, a.job); err != nil {
|
||||
return fmt.Errorf("mark sources as submited, save job, %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *jobArchiveExecutor) getTodoSources() int {
|
||||
a.stateLock.Lock()
|
||||
defer a.stateLock.Unlock()
|
||||
|
||||
Reference in New Issue
Block a user