feat: add rollback and virtual list

Samuel N Cui
2023-10-12 22:54:07 +08:00
parent 7a2fb83d36
commit 59f626cef5
44 changed files with 3170 additions and 2560 deletions

executor/device.go Normal file (33 lines added)

@@ -0,0 +1,33 @@
package executor
import "sort"
func (e *Executor) ListAvailableDevices() []string {
e.devicesLock.Lock()
defer e.devicesLock.Unlock()
devices := e.availableDevices.ToSlice()
sort.Slice(devices, func(i, j int) bool {
return devices[i] < devices[j]
})
return devices
}
func (e *Executor) OccupyDevice(dev string) bool {
e.devicesLock.Lock()
defer e.devicesLock.Unlock()
if !e.availableDevices.Contains(dev) {
return false
}
e.availableDevices.Remove(dev)
return true
}
func (e *Executor) ReleaseDevice(dev string) {
e.devicesLock.Lock()
defer e.devicesLock.Unlock()
e.availableDevices.Add(dev)
}
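Taken together, these three helpers give a job executor exclusive use of a single tape drive. A minimal usage sketch in the same package (withDevice and doWork are illustrative names, not part of the commit; assumes "fmt" is imported):
// Sketch: bracket exclusive use of a drive. OccupyDevice fails fast when the
// device is already taken; ReleaseDevice puts it back into the available set.
func withDevice(exe *Executor, dev string, doWork func() error) error {
	if !exe.OccupyDevice(dev) {
		return fmt.Errorf("device is using, device= %s", dev)
	}
	defer exe.ReleaseDevice(dev)
	return doWork()
}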

View File

@@ -3,12 +3,13 @@ package executor
import (
"context"
"fmt"
"sort"
"sync"
mapset "github.com/deckarep/golang-set/v2"
"github.com/modern-go/reflect2"
"github.com/samuelncui/yatm/entity"
"github.com/samuelncui/yatm/library"
"github.com/samuelncui/yatm/tools"
"gorm.io/gorm"
)
@@ -16,13 +17,14 @@ type Executor struct {
db *gorm.DB
lib *library.Library
devices []string
devicesLock sync.Mutex
devices []string
availableDevices mapset.Set[string]
paths Paths
scripts Scripts
jobExecutors *tools.CacheOnce[int64, JobExecutor]
}
type Paths struct {
@@ -43,7 +45,7 @@ func New(
db *gorm.DB, lib *library.Library,
devices []string, paths Paths, scripts Scripts,
) *Executor {
return &Executor{
e := &Executor{
db: db,
lib: lib,
devices: devices,
@@ -51,89 +53,76 @@ func New(
paths: paths,
scripts: scripts,
}
e.jobExecutors = tools.NewCacheOnce(e.newJobExecutor)
return e
}
func (e *Executor) AutoMigrate() error {
return e.db.AutoMigrate(ModelJob)
}
func (e *Executor) ListAvailableDevices() []string {
e.devicesLock.Lock()
defer e.devicesLock.Unlock()
func (e *Executor) CreateJob(ctx context.Context, job *Job, param *entity.JobParam) (*Job, error) {
job, err := e.SaveJob(ctx, job)
if err != nil {
return nil, fmt.Errorf("save job fail, err= %w", err)
}
devices := e.availableDevices.ToSlice()
sort.Slice(devices, func(i, j int) bool {
return devices[i] < devices[j]
})
typ, found := jobTypes[jobParamToTypes[reflect2.RTypeOf(param.GetParam())]]
if !found || typ == nil {
return nil, fmt.Errorf("job type unexpected, state_type= %T", param.GetParam())
}
return devices
executor, err := typ.GetExecutor(ctx, e, job)
if err != nil {
return nil, fmt.Errorf("get job executor fail, job_id= %d, %w", job.ID, err)
}
if err := executor.Initialize(ctx, param); err != nil {
executor.Logger().WithContext(ctx).WithError(err).Errorf("initialize failed, param= %s", param)
return nil, fmt.Errorf("executor initialize fail, job_id= %d param= %s, %w", job.ID, param, err)
}
if err := executor.Close(ctx); err != nil {
executor.Logger().WithContext(ctx).WithError(err).Errorf("close executor failed, param= %s", param)
return nil, fmt.Errorf("close executor failed, job_id= %d param= %s, %w", job.ID, param, err)
}
return job, nil
}
func (e *Executor) occupyDevice(dev string) bool {
e.devicesLock.Lock()
defer e.devicesLock.Unlock()
if !e.availableDevices.Contains(dev) {
return false
}
e.availableDevices.Remove(dev)
return true
func (e *Executor) GetJobExecutor(ctx context.Context, id int64) (JobExecutor, error) {
return e.jobExecutors.Get(ctx, id)
}
func (e *Executor) releaseDevice(dev string) {
e.devicesLock.Lock()
defer e.devicesLock.Unlock()
e.availableDevices.Add(dev)
func (e *Executor) RemoveJobExecutor(ctx context.Context, id int64) {
e.jobExecutors.Remove(id)
}
func (e *Executor) Start(ctx context.Context, job *Job) error {
job.Status = entity.JobStatus_PROCESSING
if _, err := e.SaveJob(ctx, job); err != nil {
return err
func (e *Executor) newJobExecutor(ctx context.Context, id int64) (JobExecutor, error) {
job, err := e.GetJob(ctx, id)
if err != nil {
return nil, fmt.Errorf("get job fail, id= %d, %w", id, err)
}
if state := job.State.GetArchive(); state != nil {
if err := e.startArchive(ctx, job); err != nil {
return err
}
return nil
}
if state := job.State.GetRestore(); state != nil {
if err := e.startRestore(ctx, job); err != nil {
return err
}
return nil
factory, has := jobTypes[reflect2.RTypeOf(job.State.GetState())]
if !has {
return nil, fmt.Errorf("job type unexpected, state_type= %T", job.State.GetState())
}
return fmt.Errorf("unexpected state type, %T", job.State.State)
return factory.GetExecutor(ctx, e, job)
}
func (e *Executor) Submit(ctx context.Context, job *Job, param *entity.JobNextParam) error {
if job.Status != entity.JobStatus_PROCESSING {
return fmt.Errorf("target job is not on processing, status= %s", job.Status)
func (e *Executor) Dispatch(ctx context.Context, jobID int64, param *entity.JobDispatchParam) error {
executor, err := e.GetJobExecutor(ctx, jobID)
if err != nil {
return fmt.Errorf("get job executor fail, job_id= %d, %w", jobID, err)
}
if state := job.State.GetArchive(); state != nil {
exe, err := e.newArchiveExecutor(ctx, job)
if err != nil {
return err
}
exe.submit(ctx, param.GetArchive())
return nil
}
if state := job.State.GetRestore(); state != nil {
exe, err := e.newRestoreExecutor(ctx, job)
if err != nil {
return err
}
exe.submit(ctx, param.GetRestore())
return nil
if err := executor.Dispatch(ctx, param); err != nil {
executor.Logger().WithContext(ctx).WithError(err).Errorf("dispatch request fail, req= %s", param)
return fmt.Errorf("dispatch request fail, job_id= %d, req= %s, %w", jobID, param, err)
}
return fmt.Errorf("unexpected state type, %T", job.State.State)
return nil
}
func (e *Executor) Display(ctx context.Context, job *Job) (*entity.JobDisplay, error) {
@@ -141,22 +130,16 @@ func (e *Executor) Display(ctx context.Context, job *Job) (*entity.JobDisplay, e
return nil, fmt.Errorf("target job is not on processing, status= %s", job.Status)
}
if state := job.State.GetArchive(); state != nil {
display, err := e.getArchiveDisplay(ctx, job)
if err != nil {
return nil, err
}
return &entity.JobDisplay{Display: &entity.JobDisplay_Archive{Archive: display}}, nil
}
if state := job.State.GetRestore(); state != nil {
display, err := e.getRestoreDisplay(ctx, job)
if err != nil {
return nil, err
}
return &entity.JobDisplay{Display: &entity.JobDisplay_Restore{Restore: display}}, nil
executor, err := e.GetJobExecutor(ctx, job.ID)
if err != nil {
return nil, fmt.Errorf("get job executor fail, job_id= %d, %w", job.ID, err)
}
return nil, fmt.Errorf("unexpected state type, %T", job.State.State)
result, err := executor.Display(ctx)
if err != nil {
executor.Logger().WithContext(ctx).WithError(err).Errorf("get display failed")
return nil, err
}
return result, nil
}
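The net effect of this rewrite is that executor.go no longer branches on archive vs. restore: CreateJob, Dispatch and Display all go through a per-job JobExecutor built by a job-type factory and cached in tools.CacheOnce (jobExecutors). The interface itself is defined elsewhere in the commit; the sketch below is inferred from the calls above and may differ in detail:
// Inferred sketch of the JobExecutor contract used by CreateJob, Dispatch and
// Display; not copied from the commit.
type JobExecutor interface {
	Initialize(ctx context.Context, param *entity.JobParam) error
	Dispatch(ctx context.Context, param *entity.JobDispatchParam) error
	Display(ctx context.Context) (*entity.JobDisplay, error)
	Close(ctx context.Context) error
	Logger() *logrus.Logger
}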

View File

@@ -33,28 +33,6 @@ func (j *Job) BeforeUpdate(tx *gorm.DB) error {
return nil
}
func (e *Executor) initJob(ctx context.Context, job *Job, param *entity.JobParam) error {
if p := param.GetArchive(); p != nil {
return e.createArchive(ctx, job, p)
}
if p := param.GetRestore(); p != nil {
return e.createRestore(ctx, job, p)
}
return fmt.Errorf("unexpected param type, %T", param.Param)
}
func (e *Executor) CreateJob(ctx context.Context, job *Job, param *entity.JobParam) (*Job, error) {
if err := e.initJob(ctx, job, param); err != nil {
return nil, err
}
if r := e.db.WithContext(ctx).Create(job); r.Error != nil {
return nil, fmt.Errorf("save job fail, err= %w", r.Error)
}
return job, nil
}
func (e *Executor) DeleteJobs(ctx context.Context, ids ...int64) error {
jobs, err := e.MGetJob(ctx, ids...)
if err != nil {

executor/job_archive.go Normal file (3 lines added)

@@ -0,0 +1,3 @@
package executor
type jobTypeArchive struct{}
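jobTypeArchive (and the jobTypeRestore counterpart further down) is a stateless marker whose GetExecutor method builds the matching executor. These factories are looked up through the jobTypes and jobParamToTypes maps keyed by reflect2.RTypeOf(...) in executor.go; the map declarations themselves are not part of the hunks shown here. An inferred sketch of the factory interface, matching the GetExecutor signatures added in this commit:
// Inferred sketch; the real declaration lives elsewhere in the commit.
type jobType interface {
	GetExecutor(ctx context.Context, exe *Executor, job *Job) (JobExecutor, error)
}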

View File

@@ -1,25 +0,0 @@
package executor
import (
"context"
"sync/atomic"
"github.com/samuelncui/yatm/entity"
)
func (e *Executor) getArchiveDisplay(ctx context.Context, job *Job) (*entity.JobArchiveDisplay, error) {
display := new(entity.JobArchiveDisplay)
if exe := e.getArchiveExecutor(ctx, job); exe != nil && exe.progress != nil {
display.CopiedBytes = atomic.LoadInt64(&exe.progress.bytes)
display.CopiedFiles = atomic.LoadInt64(&exe.progress.files)
display.TotalBytes = atomic.LoadInt64(&exe.progress.totalBytes)
display.TotalFiles = atomic.LoadInt64(&exe.progress.totalFiles)
display.StartTime = exe.progress.startTime.Unix()
speed := atomic.LoadInt64(&exe.progress.speed)
display.Speed = &speed
}
return display, nil
}

View File

@@ -2,44 +2,30 @@ package executor
import (
"context"
"encoding/hex"
"errors"
"fmt"
"io"
"os"
"os/exec"
"path"
"sort"
"sync"
"sync/atomic"
"time"
mapset "github.com/deckarep/golang-set/v2"
"github.com/samber/lo"
"github.com/samuelncui/acp"
"github.com/samuelncui/yatm/entity"
"github.com/samuelncui/yatm/library"
"github.com/samuelncui/yatm/tools"
"github.com/sirupsen/logrus"
)
var (
runningArchives sync.Map
)
type jobArchiveExecutor struct {
lock sync.Mutex
exe *Executor
job *Job
func (e *Executor) getArchiveExecutor(ctx context.Context, job *Job) *jobArchiveExecutor {
if running, has := runningArchives.Load(job.ID); has {
return running.(*jobArchiveExecutor)
}
return nil
progress *progress
logFile *os.File
logger *logrus.Logger
}
func (e *Executor) newArchiveExecutor(ctx context.Context, job *Job) (*jobArchiveExecutor, error) {
if exe := e.getArchiveExecutor(ctx, job); exe != nil {
return exe, nil
}
logFile, err := e.newLogWriter(job.ID)
func (*jobTypeArchive) GetExecutor(ctx context.Context, exe *Executor, job *Job) (JobExecutor, error) {
logFile, err := exe.newLogWriter(job.ID)
if err != nil {
return nil, fmt.Errorf("get log writer fail, %w", err)
}
@@ -47,39 +33,27 @@ func (e *Executor) newArchiveExecutor(ctx context.Context, job *Job) (*jobArchiv
logger := logrus.New()
logger.SetOutput(io.MultiWriter(os.Stderr, logFile))
exe := &jobArchiveExecutor{
exe: e,
e := &jobArchiveExecutor{
exe: exe,
job: job,
state: job.State.GetArchive(),
logFile: logFile,
logger: logger,
}
runningArchives.Store(job.ID, exe)
return exe, nil
return e, nil
}
type jobArchiveExecutor struct {
exe *Executor
job *Job
stateLock sync.Mutex
state *entity.JobArchiveState
progress *progress
logFile *os.File
logger *logrus.Logger
}
func (a *jobArchiveExecutor) submit(ctx context.Context, param *entity.JobArchiveNextParam) {
if err := a.handle(ctx, param); err != nil {
a.logger.WithContext(ctx).WithError(err).Infof("handler param fail, param= %s", param)
func (a *jobArchiveExecutor) Dispatch(ctx context.Context, next *entity.JobDispatchParam) error {
param := next.GetArchive()
if param == nil {
return fmt.Errorf("unexpected next param type, unexpected= JobArchiveDispatchParam, has= %s", next)
}
return a.dispatch(ctx, param)
}
func (a *jobArchiveExecutor) handle(ctx context.Context, param *entity.JobArchiveNextParam) error {
func (a *jobArchiveExecutor) dispatch(ctx context.Context, param *entity.JobArchiveDispatchParam) error {
if p := param.GetCopying(); p != nil {
if err := a.switchStep(
ctx, entity.JobArchiveStep_COPYING, entity.JobStatus_PROCESSING,
@@ -91,6 +65,7 @@ func (a *jobArchiveExecutor) handle(ctx context.Context, param *entity.JobArchiv
tools.Working()
go tools.WrapWithLogger(ctx, a.logger, func() {
defer tools.Done()
if err := a.makeTape(tools.ShutdownContext, p.Device, p.Barcode, p.Name); err != nil {
a.logger.WithContext(ctx).WithError(err).Errorf("make tape has error, barcode= '%s' name= '%s'", p.Barcode, p.Name)
}
@@ -101,7 +76,7 @@ func (a *jobArchiveExecutor) handle(ctx context.Context, param *entity.JobArchiv
if p := param.GetWaitForTape(); p != nil {
return a.switchStep(
ctx, entity.JobArchiveStep_WAIT_FOR_TAPE, entity.JobStatus_PROCESSING,
ctx, entity.JobArchiveStep_WAIT_FOR_TAPE, entity.JobStatus_PENDING,
mapset.NewThreadUnsafeSet(entity.JobArchiveStep_PENDING, entity.JobArchiveStep_COPYING),
)
}
@@ -114,322 +89,80 @@ func (a *jobArchiveExecutor) handle(ctx context.Context, param *entity.JobArchiv
return err
}
a.logFile.Close()
runningArchives.Delete(a.job.ID)
return nil
return a.Close(ctx)
}
return nil
}
func (a *jobArchiveExecutor) makeTape(ctx context.Context, device, barcode, name string) (rerr error) {
if !a.exe.occupyDevice(device) {
return fmt.Errorf("device is using, device= %s", device)
}
defer a.exe.releaseDevice(device)
defer a.makeTapeFinished(tools.WithoutTimeout(ctx))
encryption, keyPath, keyRecycle, err := a.exe.newKey()
if err != nil {
return err
}
defer keyRecycle()
if err := runCmd(a.logger, a.exe.makeEncryptCmd(ctx, device, keyPath, barcode, name)); err != nil {
return fmt.Errorf("run encrypt script fail, %w", err)
func (a *jobArchiveExecutor) Display(ctx context.Context) (*entity.JobDisplay, error) {
p := a.progress
if p == nil {
return nil, nil
}
mkfsCmd := exec.CommandContext(ctx, a.exe.scripts.Mkfs)
mkfsCmd.Env = append(mkfsCmd.Env, fmt.Sprintf("DEVICE=%s", device), fmt.Sprintf("TAPE_BARCODE=%s", barcode), fmt.Sprintf("TAPE_NAME=%s", name))
if err := runCmd(a.logger, mkfsCmd); err != nil {
return fmt.Errorf("run mkfs script fail, %w", err)
display := new(entity.JobArchiveDisplay)
display.CopiedBytes = atomic.LoadInt64(&p.bytes)
display.CopiedFiles = atomic.LoadInt64(&p.files)
display.TotalBytes = atomic.LoadInt64(&p.totalBytes)
display.TotalFiles = atomic.LoadInt64(&p.totalFiles)
display.StartTime = p.startTime.Unix()
speed := atomic.LoadInt64(&p.speed)
display.Speed = &speed
return &entity.JobDisplay{Display: &entity.JobDisplay_Archive{Archive: display}}, nil
}
func (a *jobArchiveExecutor) Close(ctx context.Context) error {
a.logFile.Close()
a.exe.RemoveJobExecutor(ctx, a.job.ID)
return nil
}
func (a *jobArchiveExecutor) Logger() *logrus.Logger {
return a.logger
}
func (a *jobArchiveExecutor) getState() *entity.JobArchiveState {
a.lock.Lock()
defer a.lock.Unlock()
if a.job.State == nil || a.job.State.GetArchive() == nil {
a.job.State = &entity.JobState{State: &entity.JobState_Archive{Archive: &entity.JobArchiveState{}}}
}
mountPoint, err := os.MkdirTemp("", "*.ltfs")
if err != nil {
return fmt.Errorf("create temp mountpoint, %w", err)
return a.job.State.GetArchive()
}
func (a *jobArchiveExecutor) updateJob(ctx context.Context, change func(*Job, *entity.JobArchiveState) error) error {
a.lock.Lock()
defer a.lock.Unlock()
if a.job.State == nil || a.job.State.GetArchive() == nil {
a.job.State = &entity.JobState{State: &entity.JobState_Archive{Archive: &entity.JobArchiveState{}}}
}
mountCmd := exec.CommandContext(ctx, a.exe.scripts.Mount)
mountCmd.Env = append(mountCmd.Env, fmt.Sprintf("DEVICE=%s", device), fmt.Sprintf("MOUNT_POINT=%s", mountPoint))
if err := runCmd(a.logger, mountCmd); err != nil {
return fmt.Errorf("run mount script fail, %w", err)
if err := change(a.job, a.job.State.GetArchive()); err != nil {
a.logger.WithContext(ctx).WithError(err).Warnf("update state failed while exec change callback")
return fmt.Errorf("update state failed while exec change callback, %w", err)
}
defer func() {
umountCmd := exec.CommandContext(tools.WithoutTimeout(ctx), a.exe.scripts.Umount)
umountCmd.Env = append(umountCmd.Env, fmt.Sprintf("MOUNT_POINT=%s", mountPoint))
if err := runCmd(a.logger, umountCmd); err != nil {
a.logger.WithContext(ctx).WithError(err).Errorf("run umount script fail, %s", mountPoint)
return
}
if err := os.Remove(mountPoint); err != nil {
a.logger.WithContext(ctx).WithError(err).Errorf("remove mount point fail, %s", mountPoint)
return
}
}()
wildcardJobOpts := make([]acp.WildcardJobOption, 0, 6)
wildcardJobOpts = append(wildcardJobOpts, acp.Target(mountPoint))
for _, source := range a.state.Sources {
if source.Status == entity.CopyStatus_SUBMITED {
continue
}
wildcardJobOpts = append(wildcardJobOpts, acp.AccurateSource(source.Source.Base, source.Source.Path))
if _, err := a.exe.SaveJob(ctx, a.job); err != nil {
a.logger.WithContext(ctx).WithError(err).Warnf("update state failed while save job")
return fmt.Errorf("update state failed while save job, %w", err)
}
opts := make([]acp.Option, 0, 4)
opts = append(opts, acp.WildcardJob(wildcardJobOpts...))
opts = append(opts, acp.WithHash(true))
opts = append(opts, acp.SetToDevice(acp.LinearDevice(true)))
opts = append(opts, acp.WithLogger(a.logger))
reportHander, reportGetter := acp.NewReportGetter()
opts = append(opts, acp.WithEventHandler(reportHander))
a.progress = newProgress()
defer func() { a.progress = nil }()
var dropToReadonly bool
opts = append(opts, acp.WithEventHandler(func(ev acp.Event) {
switch e := ev.(type) {
case *acp.EventUpdateCount:
atomic.StoreInt64(&a.progress.totalBytes, e.Bytes)
atomic.StoreInt64(&a.progress.totalFiles, e.Files)
return
case *acp.EventUpdateProgress:
a.progress.setBytes(e.Bytes)
atomic.StoreInt64(&a.progress.files, e.Files)
return
case *acp.EventReportError:
a.logger.WithContext(ctx).Errorf("acp report error, src= '%s' dst= '%s' err= '%s'", e.Error.Src, e.Error.Dst, e.Error.Err)
return
case *acp.EventUpdateJob:
job := e.Job
src := entity.NewSourceFromACPJob(job)
var targetStatus entity.CopyStatus
switch job.Status {
case acp.JobStatusPending, acp.JobStatusPreparing:
targetStatus = entity.CopyStatus_PENDING
case acp.JobStatusCopying:
targetStatus = entity.CopyStatus_RUNNING
case acp.JobStatusFinished:
targetStatus = entity.CopyStatus_FAILED
if len(job.SuccessTargets) > 0 {
a.logger.WithContext(ctx).Infof("file '%s' copy success, size= %d", src.RealPath(), job.Size)
targetStatus = entity.CopyStatus_STAGED
break // break from switch
}
for dst, err := range job.FailTargets {
if err == nil {
continue
}
if errors.Is(err, acp.ErrTargetNoSpace) {
continue
}
a.logger.WithContext(ctx).WithError(err).Errorf("file '%s' copy fail, dst= '%s'", src.RealPath(), dst)
if errors.Is(err, acp.ErrTargetDropToReadonly) {
dropToReadonly = true
}
}
default:
return
}
a.stateLock.Lock()
defer a.stateLock.Unlock()
idx := sort.Search(len(a.state.Sources), func(idx int) bool {
return src.Compare(a.state.Sources[idx].Source) <= 0
})
if idx < 0 || idx >= len(a.state.Sources) || src.Compare(a.state.Sources[idx].Source) != 0 {
a.logger.Warnf(
"cannot found target file, real_path= %s found_index= %d tape_file_path= %v", src.RealPath(), idx,
lo.Map(a.state.Sources, func(source *entity.SourceState, _ int) string { return source.Source.RealPath() }))
return
}
target := a.state.Sources[idx]
if target == nil || !src.Equal(target.Source) {
return
}
target.Status = targetStatus
if _, err := a.exe.SaveJob(ctx, a.job); err != nil {
logrus.WithContext(ctx).Infof("save job for update file fail, name= %s", job.Base+path.Join(job.Path...))
}
return
}
}))
defer func() {
ctx := tools.WithoutTimeout(ctx)
// if tape drop to readonly, ltfs cannot write index to partition a.
// rollback sources for next try.
if dropToReadonly {
a.logger.WithContext(ctx).Errorf("tape filesystem had droped to readonly, rollback, barcode= '%s'", barcode)
a.rollbackSources(ctx)
return
}
report := reportGetter()
sort.Slice(report.Jobs, func(i, j int) bool {
return entity.NewSourceFromACPJob(report.Jobs[i]).Compare(entity.NewSourceFromACPJob(report.Jobs[j])) < 0
})
reportFile, err := a.exe.newReportWriter(barcode)
if err != nil {
a.logger.WithContext(ctx).WithError(err).Warnf("open report file fail, barcode= '%s'", barcode)
} else {
defer reportFile.Close()
tools.WrapWithLogger(ctx, a.logger, func() {
reportFile.Write([]byte(report.ToJSONString(false)))
})
}
filteredJobs := make([]*acp.Job, 0, len(report.Jobs))
files := make([]*library.TapeFile, 0, len(report.Jobs))
for _, job := range report.Jobs {
if len(job.SuccessTargets) == 0 {
continue
}
if !job.Mode.IsRegular() {
continue
}
hash, err := hex.DecodeString(job.SHA256)
if err != nil {
a.logger.WithContext(ctx).WithError(err).Warnf("decode sha256 fail, path= '%s'", entity.NewSourceFromACPJob(job).RealPath())
continue
}
files = append(files, &library.TapeFile{
Path: path.Join(job.Path...),
Size: job.Size,
Mode: job.Mode,
ModTime: job.ModTime,
WriteTime: job.WriteTime,
Hash: hash,
})
filteredJobs = append(filteredJobs, job)
}
tape, err := a.exe.lib.CreateTape(ctx, &library.Tape{
Barcode: barcode,
Name: name,
Encryption: encryption,
CreateTime: time.Now(),
}, files)
if err != nil {
rerr = tools.AppendError(rerr, fmt.Errorf("create tape fail, barcode= '%s' name= '%s', %w", barcode, name, err))
return
}
a.logger.Infof("create tape success, tape_id= %d", tape.ID)
if err := a.exe.lib.TrimFiles(ctx); err != nil {
a.logger.WithError(err).Warnf("trim library files fail")
}
if err := a.markSourcesAsSubmited(ctx, filteredJobs); err != nil {
rerr = tools.AppendError(rerr, fmt.Errorf("mark source as submited fail, %w", err))
return
}
}()
copyer, err := acp.New(ctx, opts...)
if err != nil {
rerr = fmt.Errorf("start copy fail, %w", err)
return
}
copyer.Wait()
return
return nil
}
func (a *jobArchiveExecutor) switchStep(ctx context.Context, target entity.JobArchiveStep, status entity.JobStatus, expect mapset.Set[entity.JobArchiveStep]) error {
a.stateLock.Lock()
defer a.stateLock.Unlock()
if !expect.Contains(a.state.Step) {
return fmt.Errorf("unexpected current step, target= '%s' expect= '%s' has= '%s'", target, expect, a.state.Step)
}
a.state.Step = target
a.job.Status = status
if _, err := a.exe.SaveJob(ctx, a.job); err != nil {
return fmt.Errorf("switch to step copying, save job fail, %w", err)
}
return nil
}
func (a *jobArchiveExecutor) markSourcesAsSubmited(ctx context.Context, jobs []*acp.Job) error {
a.stateLock.Lock()
defer a.stateLock.Unlock()
searchableSource := a.state.Sources[:]
for _, job := range jobs {
src := entity.NewSourceFromACPJob(job)
for idx, testSrc := range searchableSource {
if src.Compare(testSrc.Source) <= 0 {
searchableSource = searchableSource[idx:]
break
}
return a.updateJob(ctx, func(job *Job, state *entity.JobArchiveState) error {
if !expect.Contains(state.Step) {
return fmt.Errorf("unexpected current step, target= '%s' expect= '%s' has= '%s'", target, expect, state.Step)
}
target := searchableSource[0]
if target == nil || !src.Equal(target.Source) {
continue
}
target.Status = entity.CopyStatus_SUBMITED
}
if _, err := a.exe.SaveJob(ctx, a.job); err != nil {
return fmt.Errorf("mark sources as submited, save job, %w", err)
}
return nil
}
func (a *jobArchiveExecutor) rollbackSources(ctx context.Context) error {
a.stateLock.Lock()
defer a.stateLock.Unlock()
for _, source := range a.state.Sources {
if source.Status == entity.CopyStatus_SUBMITED {
continue
}
source.Status = entity.CopyStatus_PENDING
}
if _, err := a.exe.SaveJob(ctx, a.job); err != nil {
return fmt.Errorf("mark sources as submited, save job, %w", err)
}
return nil
}
func (a *jobArchiveExecutor) getTodoSources() int {
a.stateLock.Lock()
defer a.stateLock.Unlock()
var todo int
for _, s := range a.state.Sources {
if s.Status == entity.CopyStatus_SUBMITED {
continue
}
todo++
}
return todo
}
func (a *jobArchiveExecutor) makeTapeFinished(ctx context.Context) {
if a.getTodoSources() > 0 {
a.submit(ctx, &entity.JobArchiveNextParam{Param: &entity.JobArchiveNextParam_WaitForTape{WaitForTape: &entity.JobArchiveWaitForTapeParam{}}})
} else {
a.submit(ctx, &entity.JobArchiveNextParam{Param: &entity.JobArchiveNextParam_Finished{Finished: &entity.JobArchiveFinishedParam{}}})
}
state.Step = target
job.Status = status
return nil
})
}
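The switchStep rewrite above is the pattern this commit applies across the archive executor: instead of taking stateLock and calling SaveJob at every call site, each state change goes through updateJob, which locks, runs a callback against the job's archive state, and then persists the job. A hypothetical caller, for illustration only:
// Hypothetical helper (not in the commit) showing how updateJob is used: the
// callback runs under the executor's lock and SaveJob is called afterwards.
func (a *jobArchiveExecutor) failPendingSources(ctx context.Context) error {
	return a.updateJob(ctx, func(_ *Job, state *entity.JobArchiveState) error {
		for _, src := range state.Sources {
			if src.Status == entity.CopyStatus_PENDING {
				src.Status = entity.CopyStatus_FAILED
			}
		}
		return nil
	})
}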

View File

@@ -0,0 +1,95 @@
package executor
import (
"context"
"fmt"
"os"
"path"
"sort"
"strings"
"github.com/samuelncui/acp"
"github.com/samuelncui/yatm/entity"
)
func (a *jobArchiveExecutor) Initialize(ctx context.Context, param *entity.JobParam) error {
if err := a.applyParam(ctx, param.GetArchive()); err != nil {
return err
}
return a.dispatch(ctx, &entity.JobArchiveDispatchParam{Param: &entity.JobArchiveDispatchParam_WaitForTape{
WaitForTape: &entity.JobArchiveWaitForTapeParam{},
}})
}
func (a *jobArchiveExecutor) applyParam(ctx context.Context, param *entity.JobArchiveParam) error {
if param == nil {
return fmt.Errorf("archive param is nil")
}
return a.updateJob(ctx, func(_ *Job, state *entity.JobArchiveState) error {
var err error
sources := make([]*entity.SourceState, 0, len(param.Sources)*8)
for _, src := range param.Sources {
src.Base = strings.TrimSpace(src.Base)
if src.Base[0] != '/' {
src.Base = path.Join(a.exe.paths.Source, src.Base) + "/"
}
sources, err = a.walk(ctx, src, sources)
if err != nil {
return err
}
}
sort.Slice(sources, func(i, j int) bool {
return sources[i].Source.Compare(sources[j].Source) < 0
})
for idx, src := range sources {
if idx > 0 && sources[idx-1].Source.Equal(src.Source) {
return fmt.Errorf("have multi file with same path, path= %s", src.Source.RealPath())
}
}
state.Step = entity.JobArchiveStep_PENDING
state.Sources = sources
return nil
})
}
func (a *jobArchiveExecutor) walk(ctx context.Context, src *entity.Source, sources []*entity.SourceState) ([]*entity.SourceState, error) {
path := src.RealPath()
stat, err := os.Stat(path)
if err != nil {
return nil, fmt.Errorf("walk get stat, path= '%s', %w", path, err)
}
mode := stat.Mode()
if mode.IsRegular() {
if stat.Name() == ".DS_Store" {
return sources, nil
}
return append(sources, &entity.SourceState{
Source: src,
Size: stat.Size(),
Status: entity.CopyStatus_PENDING,
}), nil
}
if mode&acp.UnexpectFileMode != 0 {
return sources, nil
}
files, err := os.ReadDir(path)
if err != nil {
return nil, fmt.Errorf("walk read dir, path= '%s', %w", path, err)
}
for _, file := range files {
sources, err = a.walk(ctx, src.Append(file.Name()), sources)
if err != nil {
return nil, err
}
}
return sources, nil
}

View File

@@ -1,81 +0,0 @@
package executor
import (
"context"
"fmt"
"os"
"path"
"sort"
"strings"
"github.com/samuelncui/acp"
"github.com/samuelncui/yatm/entity"
)
func (e *Executor) createArchive(ctx context.Context, job *Job, param *entity.JobArchiveParam) error {
var err error
sources := make([]*entity.SourceState, 0, len(param.Sources)*8)
for _, src := range param.Sources {
src.Base = strings.TrimSpace(src.Base)
if src.Base[0] != '/' {
src.Base = path.Join(e.paths.Source, src.Base) + "/"
}
sources, err = walk(ctx, src, sources)
if err != nil {
return err
}
}
sort.Slice(sources, func(i, j int) bool {
return sources[i].Source.Compare(sources[j].Source) < 0
})
for idx, src := range sources {
if idx > 0 && sources[idx-1].Source.Equal(src.Source) {
return fmt.Errorf("have multi file with same path, path= %s", src.Source.RealPath())
}
}
job.State = &entity.JobState{State: &entity.JobState_Archive{Archive: &entity.JobArchiveState{
Step: entity.JobArchiveStep_PENDING,
Sources: sources,
}}}
return nil
}
func walk(ctx context.Context, src *entity.Source, sources []*entity.SourceState) ([]*entity.SourceState, error) {
path := src.RealPath()
stat, err := os.Stat(path)
if err != nil {
return nil, fmt.Errorf("walk get stat, path= '%s', %w", path, err)
}
mode := stat.Mode()
if mode.IsRegular() {
if stat.Name() == ".DS_Store" {
return sources, nil
}
return append(sources, &entity.SourceState{
Source: src,
Size: stat.Size(),
Status: entity.CopyStatus_PENDING,
}), nil
}
if mode&acp.UnexpectFileMode != 0 {
return sources, nil
}
files, err := os.ReadDir(path)
if err != nil {
return nil, fmt.Errorf("walk read dir, path= '%s', %w", path, err)
}
for _, file := range files {
sources, err = walk(ctx, src.Append(file.Name()), sources)
if err != nil {
return nil, err
}
}
return sources, nil
}

View File

@@ -1,15 +0,0 @@
package executor
import (
"context"
"github.com/samuelncui/yatm/entity"
)
func (e *Executor) startArchive(ctx context.Context, job *Job) error {
return e.Submit(ctx, job, &entity.JobNextParam{Param: &entity.JobNextParam_Archive{
Archive: &entity.JobArchiveNextParam{Param: &entity.JobArchiveNextParam_WaitForTape{
WaitForTape: &entity.JobArchiveWaitForTapeParam{},
}},
}})
}

View File

@@ -0,0 +1,313 @@
package executor
import (
"context"
"encoding/hex"
"errors"
"fmt"
"os"
"os/exec"
"path"
"sort"
"strings"
"sync/atomic"
"time"
"github.com/samber/lo"
"github.com/samuelncui/acp"
"github.com/samuelncui/yatm/entity"
"github.com/samuelncui/yatm/library"
"github.com/samuelncui/yatm/tools"
)
func (a *jobArchiveExecutor) makeTape(ctx context.Context, device, barcode, name string) (rerr error) {
barcode = strings.ToUpper(barcode)
state := a.getState()
if state == nil {
return fmt.Errorf("cannot found archive state, abort")
}
if !a.exe.OccupyDevice(device) {
return fmt.Errorf("device is using, device= %s", device)
}
defer a.exe.ReleaseDevice(device)
defer a.makeTapeFinished(tools.WithoutTimeout(ctx))
encryption, keyPath, keyRecycle, err := a.exe.newKey()
if err != nil {
return err
}
defer keyRecycle()
if err := runCmd(a.logger, a.exe.makeEncryptCmd(ctx, device, keyPath, barcode, name)); err != nil {
return fmt.Errorf("run encrypt script fail, %w", err)
}
mkfsCmd := exec.CommandContext(ctx, a.exe.scripts.Mkfs)
mkfsCmd.Env = append(mkfsCmd.Env, fmt.Sprintf("DEVICE=%s", device), fmt.Sprintf("TAPE_BARCODE=%s", barcode), fmt.Sprintf("TAPE_NAME=%s", name))
if err := runCmd(a.logger, mkfsCmd); err != nil {
return fmt.Errorf("run mkfs script fail, %w", err)
}
mountPoint, err := os.MkdirTemp("", "*.ltfs")
if err != nil {
return fmt.Errorf("create temp mountpoint, %w", err)
}
mountCmd := exec.CommandContext(ctx, a.exe.scripts.Mount)
mountCmd.Env = append(mountCmd.Env, fmt.Sprintf("DEVICE=%s", device), fmt.Sprintf("MOUNT_POINT=%s", mountPoint))
if err := runCmd(a.logger, mountCmd); err != nil {
return fmt.Errorf("run mount script fail, %w", err)
}
defer func() {
umountCmd := exec.CommandContext(tools.WithoutTimeout(ctx), a.exe.scripts.Umount)
umountCmd.Env = append(umountCmd.Env, fmt.Sprintf("MOUNT_POINT=%s", mountPoint))
if err := runCmd(a.logger, umountCmd); err != nil {
a.logger.WithContext(ctx).WithError(err).Errorf("run umount script fail, %s", mountPoint)
return
}
if err := os.Remove(mountPoint); err != nil {
a.logger.WithContext(ctx).WithError(err).Errorf("remove mount point fail, %s", mountPoint)
return
}
}()
wildcardJobOpts := make([]acp.WildcardJobOption, 0, 6)
wildcardJobOpts = append(wildcardJobOpts, acp.Target(mountPoint))
for _, source := range state.Sources {
if source.Status == entity.CopyStatus_SUBMITED {
continue
}
wildcardJobOpts = append(wildcardJobOpts, acp.AccurateSource(source.Source.Base, source.Source.Path))
}
opts := make([]acp.Option, 0, 4)
opts = append(opts, acp.WildcardJob(wildcardJobOpts...))
opts = append(opts, acp.WithHash(true))
opts = append(opts, acp.SetToDevice(acp.LinearDevice(true)))
opts = append(opts, acp.WithLogger(a.logger))
reportHander, reportGetter := acp.NewReportGetter()
opts = append(opts, acp.WithEventHandler(reportHander))
a.progress = newProgress()
defer func() { a.progress = nil }()
var dropToReadonly bool
opts = append(opts, acp.WithEventHandler(func(ev acp.Event) {
switch e := ev.(type) {
case *acp.EventUpdateCount:
atomic.StoreInt64(&a.progress.totalBytes, e.Bytes)
atomic.StoreInt64(&a.progress.totalFiles, e.Files)
return
case *acp.EventUpdateProgress:
a.progress.setBytes(e.Bytes)
atomic.StoreInt64(&a.progress.files, e.Files)
return
case *acp.EventReportError:
a.logger.WithContext(ctx).Errorf("acp report error, src= '%s' dst= '%s' err= '%s'", e.Error.Src, e.Error.Dst, e.Error.Err)
return
case *acp.EventUpdateJob:
job := e.Job
src := entity.NewSourceFromACPJob(job)
var targetStatus entity.CopyStatus
switch job.Status {
case acp.JobStatusPending, acp.JobStatusPreparing:
targetStatus = entity.CopyStatus_PENDING
case acp.JobStatusCopying:
targetStatus = entity.CopyStatus_RUNNING
case acp.JobStatusFinished:
targetStatus = entity.CopyStatus_FAILED
if len(job.SuccessTargets) > 0 {
a.logger.WithContext(ctx).Infof("file '%s' copy success, size= %d", src.RealPath(), job.Size)
targetStatus = entity.CopyStatus_STAGED
break // break from switch
}
for dst, err := range job.FailTargets {
if err == nil {
continue
}
if errors.Is(err, acp.ErrTargetNoSpace) {
continue
}
a.logger.WithContext(ctx).WithError(err).Errorf("file '%s' copy fail, dst= '%s'", src.RealPath(), dst)
if errors.Is(err, acp.ErrTargetDropToReadonly) {
dropToReadonly = true
}
}
default:
return
}
a.updateJob(ctx, func(_ *Job, state *entity.JobArchiveState) error {
idx := sort.Search(len(state.Sources), func(idx int) bool {
return src.Compare(state.Sources[idx].Source) <= 0
})
if idx < 0 || idx >= len(state.Sources) || src.Compare(state.Sources[idx].Source) != 0 {
return fmt.Errorf(
"cannot found target file, real_path= %s found_index= %d tape_file_path= %v", src.RealPath(), idx,
lo.Map(state.Sources, func(source *entity.SourceState, _ int) string { return source.Source.RealPath() }),
)
}
founded := state.Sources[idx]
if founded == nil || !src.Equal(founded.Source) {
return fmt.Errorf(
"founded file not match, real_path= %s found_path= %s tape_file_path= %v", src.RealPath(), founded.Source.RealPath(),
lo.Map(state.Sources, func(source *entity.SourceState, _ int) string { return source.Source.RealPath() }),
)
}
founded.Status = targetStatus
return nil
})
}
}))
defer func() {
ctx := tools.WithoutTimeout(ctx)
// if tape drop to readonly, ltfs cannot write index to partition a.
// rollback sources for next try.
if dropToReadonly {
a.logger.WithContext(ctx).Errorf("tape filesystem had droped to readonly, rollback, barcode= '%s'", barcode)
a.rollbackSources(ctx)
return
}
report := reportGetter()
sort.Slice(report.Jobs, func(i, j int) bool {
return entity.NewSourceFromACPJob(report.Jobs[i]).Compare(entity.NewSourceFromACPJob(report.Jobs[j])) < 0
})
reportFile, err := a.exe.newReportWriter(barcode)
if err != nil {
a.logger.WithContext(ctx).WithError(err).Warnf("open report file fail, barcode= '%s'", barcode)
} else {
defer reportFile.Close()
tools.WrapWithLogger(ctx, a.logger, func() {
reportFile.Write([]byte(report.ToJSONString(false)))
})
}
filteredJobs := make([]*acp.Job, 0, len(report.Jobs))
files := make([]*library.TapeFile, 0, len(report.Jobs))
for _, job := range report.Jobs {
if len(job.SuccessTargets) == 0 {
continue
}
if !job.Mode.IsRegular() {
continue
}
hash, err := hex.DecodeString(job.SHA256)
if err != nil {
a.logger.WithContext(ctx).WithError(err).Warnf("decode sha256 fail, path= '%s'", entity.NewSourceFromACPJob(job).RealPath())
continue
}
files = append(files, &library.TapeFile{
Path: path.Join(job.Path...),
Size: job.Size,
Mode: job.Mode,
ModTime: job.ModTime,
WriteTime: job.WriteTime,
Hash: hash,
})
filteredJobs = append(filteredJobs, job)
}
tape, err := a.exe.lib.CreateTape(ctx, &library.Tape{
Barcode: barcode,
Name: name,
Encryption: encryption,
CreateTime: time.Now(),
}, files)
if err != nil {
rerr = tools.AppendError(rerr, fmt.Errorf("create tape fail, barcode= '%s' name= '%s', %w", barcode, name, err))
return
}
a.logger.Infof("create tape success, tape_id= %d", tape.ID)
if err := a.exe.lib.TrimFiles(ctx); err != nil {
a.logger.WithError(err).Warnf("trim library files fail")
}
if err := a.markSourcesAsSubmited(ctx, filteredJobs); err != nil {
rerr = tools.AppendError(rerr, fmt.Errorf("mark source as submited fail, %w", err))
return
}
}()
copyer, err := acp.New(ctx, opts...)
if err != nil {
rerr = fmt.Errorf("start copy fail, %w", err)
return
}
copyer.Wait()
return
}
func (a *jobArchiveExecutor) markSourcesAsSubmited(ctx context.Context, jobs []*acp.Job) error {
return a.updateJob(ctx, func(_ *Job, state *entity.JobArchiveState) error {
searchableSource := state.Sources[:]
for _, job := range jobs {
src := entity.NewSourceFromACPJob(job)
for idx, testSrc := range searchableSource {
if src.Compare(testSrc.Source) <= 0 {
searchableSource = searchableSource[idx:]
break
}
}
target := searchableSource[0]
if target == nil || !src.Equal(target.Source) {
continue
}
target.Status = entity.CopyStatus_SUBMITED
}
return nil
})
}
func (a *jobArchiveExecutor) rollbackSources(ctx context.Context) error {
return a.updateJob(ctx, func(_ *Job, state *entity.JobArchiveState) error {
for _, source := range state.Sources {
if source.Status == entity.CopyStatus_SUBMITED {
continue
}
source.Status = entity.CopyStatus_PENDING
}
return nil
})
}
func (a *jobArchiveExecutor) getTodoSources() int {
state := a.getState()
var todo int
for _, s := range state.Sources {
if s.Status == entity.CopyStatus_SUBMITED {
continue
}
todo++
}
return todo
}
func (a *jobArchiveExecutor) makeTapeFinished(ctx context.Context) {
if a.getTodoSources() > 0 {
a.dispatch(ctx, &entity.JobArchiveDispatchParam{Param: &entity.JobArchiveDispatchParam_WaitForTape{WaitForTape: &entity.JobArchiveWaitForTapeParam{}}})
} else {
a.dispatch(ctx, &entity.JobArchiveDispatchParam{Param: &entity.JobArchiveDispatchParam_Finished{Finished: &entity.JobArchiveFinishedParam{}}})
}
}

View File

@@ -1,59 +1,3 @@
package executor
import (
"context"
"fmt"
"os"
"os/exec"
"time"
"github.com/samuelncui/yatm/library"
"github.com/sirupsen/logrus"
)
func (e *jobRestoreExecutor) loadTape(ctx context.Context, device string, tape *library.Tape) error {
if !e.exe.occupyDevice(device) {
return fmt.Errorf("device is using, device= %s", device)
}
defer e.exe.releaseDevice(device)
keyPath, keyRecycle, err := e.exe.restoreKey(tape.Encryption)
if err != nil {
return err
}
defer func() {
time.Sleep(time.Second)
keyRecycle()
}()
logger := logrus.StandardLogger()
if err := runCmd(logger, e.exe.makeEncryptCmd(ctx, device, keyPath, tape.Barcode, tape.Name)); err != nil {
return fmt.Errorf("run encrypt script fail, %w", err)
}
mountPoint, err := os.MkdirTemp("", "*.ltfs")
if err != nil {
return fmt.Errorf("create temp mountpoint, %w", err)
}
mountCmd := exec.CommandContext(ctx, e.exe.scripts.Mount)
mountCmd.Env = append(mountCmd.Env, fmt.Sprintf("DEVICE=%s", device), fmt.Sprintf("MOUNT_POINT=%s", mountPoint))
if err := runCmd(logger, mountCmd); err != nil {
return fmt.Errorf("run mount script fail, %w", err)
}
// defer func() {
// umountCmd := exec.CommandContext(ctx, e.umountScript)
// umountCmd.Env = append(umountCmd.Env, fmt.Sprintf("MOUNT_POINT=%s", mountPoint))
// if err := runCmd(logger, umountCmd); err != nil {
// logger.WithContext(ctx).WithError(err).Errorf("run umount script fail, %s", mountPoint)
// return
// }
// if err := os.Remove(mountPoint); err != nil {
// logger.WithContext(ctx).WithError(err).Errorf("remove mount point fail, %s", mountPoint)
// return
// }
// }()
return nil
}
type jobTypeRestore struct{}

View File

@@ -1,25 +0,0 @@
package executor
import (
"context"
"sync/atomic"
"github.com/samuelncui/yatm/entity"
)
func (e *Executor) getRestoreDisplay(ctx context.Context, job *Job) (*entity.JobRestoreDisplay, error) {
display := new(entity.JobRestoreDisplay)
if exe := e.getRestoreExecutor(ctx, job); exe != nil && exe.progress != nil {
display.CopiedBytes = atomic.LoadInt64(&exe.progress.bytes)
display.CopiedFiles = atomic.LoadInt64(&exe.progress.files)
display.TotalBytes = atomic.LoadInt64(&exe.progress.totalBytes)
display.TotalFiles = atomic.LoadInt64(&exe.progress.totalFiles)
display.StartTime = exe.progress.startTime.Unix()
speed := atomic.LoadInt64(&exe.progress.speed)
display.Speed = &speed
}
return display, nil
}

View File

@@ -2,44 +2,30 @@ package executor
import (
"context"
"encoding/hex"
"fmt"
"io"
"os"
"os/exec"
"path"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
mapset "github.com/deckarep/golang-set/v2"
jsoniter "github.com/json-iterator/go"
"github.com/samber/lo"
"github.com/samuelncui/acp"
"github.com/samuelncui/yatm/entity"
"github.com/samuelncui/yatm/tools"
"github.com/sirupsen/logrus"
)
var (
runningRestores sync.Map
)
type jobRestoreExecutor struct {
lock sync.Mutex
exe *Executor
job *Job
func (e *Executor) getRestoreExecutor(ctx context.Context, job *Job) *jobRestoreExecutor {
if running, has := runningRestores.Load(job.ID); has {
return running.(*jobRestoreExecutor)
}
return nil
progress *progress
logFile *os.File
logger *logrus.Logger
}
func (e *Executor) newRestoreExecutor(ctx context.Context, job *Job) (*jobRestoreExecutor, error) {
if exe := e.getRestoreExecutor(ctx, job); exe != nil {
return exe, nil
}
logFile, err := e.newLogWriter(job.ID)
func (*jobTypeRestore) GetExecutor(ctx context.Context, exe *Executor, job *Job) (JobExecutor, error) {
logFile, err := exe.newLogWriter(job.ID)
if err != nil {
return nil, fmt.Errorf("get log writer fail, %w", err)
}
@@ -47,39 +33,27 @@ func (e *Executor) newRestoreExecutor(ctx context.Context, job *Job) (*jobRestor
logger := logrus.New()
logger.SetOutput(io.MultiWriter(os.Stderr, logFile))
exe := &jobRestoreExecutor{
exe: e,
e := &jobRestoreExecutor{
exe: exe,
job: job,
state: job.State.GetRestore(),
logFile: logFile,
logger: logger,
}
runningRestores.Store(job.ID, exe)
return exe, nil
return e, nil
}
type jobRestoreExecutor struct {
exe *Executor
job *Job
stateLock sync.Mutex
state *entity.JobRestoreState
progress *progress
logFile *os.File
logger *logrus.Logger
}
func (a *jobRestoreExecutor) submit(ctx context.Context, param *entity.JobRestoreNextParam) {
if err := a.handle(ctx, param); err != nil {
a.logger.WithContext(ctx).WithError(err).Infof("handler param fail, param= %s", param)
func (a *jobRestoreExecutor) Dispatch(ctx context.Context, next *entity.JobDispatchParam) error {
param := next.GetRestore()
if param == nil {
return fmt.Errorf("unexpected next param type, unexpected= JobRestoreDispatchParam, has= %s", next)
}
return a.dispatch(ctx, param)
}
func (a *jobRestoreExecutor) handle(ctx context.Context, param *entity.JobRestoreNextParam) error {
func (a *jobRestoreExecutor) dispatch(ctx context.Context, param *entity.JobRestoreDispatchParam) error {
if p := param.GetCopying(); p != nil {
if err := a.switchStep(
ctx, entity.JobRestoreStep_COPYING, entity.JobStatus_PROCESSING,
@@ -102,7 +76,7 @@ func (a *jobRestoreExecutor) handle(ctx context.Context, param *entity.JobRestor
if p := param.GetWaitForTape(); p != nil {
return a.switchStep(
ctx, entity.JobRestoreStep_WAIT_FOR_TAPE, entity.JobStatus_PROCESSING,
ctx, entity.JobRestoreStep_WAIT_FOR_TAPE, entity.JobStatus_PENDING,
mapset.NewThreadUnsafeSet(entity.JobRestoreStep_PENDING, entity.JobRestoreStep_COPYING),
)
}
@@ -115,258 +89,80 @@ func (a *jobRestoreExecutor) handle(ctx context.Context, param *entity.JobRestor
return err
}
a.logFile.Close()
runningRestores.Delete(a.job.ID)
return nil
return a.Close(ctx)
}
return nil
}
func (a *jobRestoreExecutor) restoreTape(ctx context.Context, device string) (rerr error) {
if !a.exe.occupyDevice(device) {
return fmt.Errorf("device is using, device= %s", device)
}
defer a.exe.releaseDevice(device)
defer func() {
if _, found := lo.Find(a.state.Tapes, func(item *entity.RestoreTape) bool {
return item.Status != entity.CopyStatus_SUBMITED
}); found {
a.submit(tools.WithoutTimeout(ctx), &entity.JobRestoreNextParam{
Param: &entity.JobRestoreNextParam_WaitForTape{WaitForTape: &entity.JobRestoreWaitForTapeParam{}},
})
return
}
a.submit(tools.WithoutTimeout(ctx), &entity.JobRestoreNextParam{
Param: &entity.JobRestoreNextParam_Finished{Finished: &entity.JobRestoreFinishedParam{}},
})
}()
readInfoCmd := exec.CommandContext(ctx, a.exe.scripts.ReadInfo)
readInfoCmd.Env = append(readInfoCmd.Env, fmt.Sprintf("DEVICE=%s", device))
infoBuf, err := runCmdWithReturn(a.logger, readInfoCmd)
if err != nil {
return fmt.Errorf("run read info script fail, %w", err)
func (a *jobRestoreExecutor) Display(ctx context.Context) (*entity.JobDisplay, error) {
p := a.progress
if p == nil {
return nil, nil
}
barcode := jsoniter.Get(infoBuf, "barcode").ToString()
if len(barcode) > 6 {
barcode = barcode[:6]
display := new(entity.JobRestoreDisplay)
display.CopiedBytes = atomic.LoadInt64(&p.bytes)
display.CopiedFiles = atomic.LoadInt64(&p.files)
display.TotalBytes = atomic.LoadInt64(&p.totalBytes)
display.TotalFiles = atomic.LoadInt64(&p.totalFiles)
display.StartTime = p.startTime.Unix()
speed := atomic.LoadInt64(&p.speed)
display.Speed = &speed
return &entity.JobDisplay{Display: &entity.JobDisplay_Restore{Restore: display}}, nil
}
func (a *jobRestoreExecutor) Close(ctx context.Context) error {
a.logFile.Close()
a.exe.RemoveJobExecutor(ctx, a.job.ID)
return nil
}
func (a *jobRestoreExecutor) Logger() *logrus.Logger {
return a.logger
}
func (a *jobRestoreExecutor) getState() *entity.JobRestoreState {
a.lock.Lock()
defer a.lock.Unlock()
if a.job.State == nil || a.job.State.GetRestore() == nil {
a.job.State = &entity.JobState{State: &entity.JobState_Restore{Restore: &entity.JobRestoreState{}}}
}
restoreTape, found := lo.Find(a.state.Tapes, func(t *entity.RestoreTape) bool {
return t.Barcode == barcode
})
if !found || restoreTape == nil {
expects := lo.Map(a.state.Tapes, func(t *entity.RestoreTape, _ int) string { return t.Barcode })
return fmt.Errorf("unexpected tape barcode in library, has= '%s' expect= %v", barcode, expects)
}
if restoreTape.Status == entity.CopyStatus_SUBMITED {
return fmt.Errorf("unexpected restore tape state status, tape is restored, status= '%s'", restoreTape.Status)
return a.job.State.GetRestore()
}
func (a *jobRestoreExecutor) updateJob(ctx context.Context, change func(*Job, *entity.JobRestoreState) error) error {
a.lock.Lock()
defer a.lock.Unlock()
if a.job.State == nil || a.job.State.GetRestore() == nil {
a.job.State = &entity.JobState{State: &entity.JobState_Restore{Restore: &entity.JobRestoreState{}}}
}
tape, err := a.exe.lib.GetTape(ctx, restoreTape.TapeId)
if err != nil {
return fmt.Errorf("get tape info fail, barcode= '%s' id= %d, %w", restoreTape.Barcode, restoreTape.TapeId, err)
if err := change(a.job, a.job.State.GetRestore()); err != nil {
a.logger.WithContext(ctx).WithError(err).Warnf("update state failed while exec change callback")
return fmt.Errorf("update state failed while exec change callback, %w", err)
}
if _, err := a.exe.SaveJob(ctx, a.job); err != nil {
a.logger.WithContext(ctx).WithError(err).Warnf("update state failed while save job")
return fmt.Errorf("update state failed while save job, %w", err)
}
keyPath, keyRecycle, err := a.exe.restoreKey(tape.Encryption)
if err != nil {
return err
}
defer func() {
time.Sleep(time.Second)
keyRecycle()
}()
if err := runCmd(a.logger, a.exe.makeEncryptCmd(ctx, device, keyPath, barcode, tape.Name)); err != nil {
return fmt.Errorf("run encrypt script fail, %w", err)
}
mountPoint, err := os.MkdirTemp("", "*.ltfs")
if err != nil {
return fmt.Errorf("create temp mountpoint, %w", err)
}
sourcePath := tools.Cache(func(p string) string { return path.Join(mountPoint, p) })
mountCmd := exec.CommandContext(ctx, a.exe.scripts.Mount)
mountCmd.Env = append(mountCmd.Env, fmt.Sprintf("DEVICE=%s", device), fmt.Sprintf("MOUNT_POINT=%s", mountPoint))
if err := runCmd(a.logger, mountCmd); err != nil {
return fmt.Errorf("run mount script fail, %w", err)
}
defer func() {
umountCmd := exec.CommandContext(tools.WithoutTimeout(ctx), a.exe.scripts.Umount)
umountCmd.Env = append(umountCmd.Env, fmt.Sprintf("MOUNT_POINT=%s", mountPoint))
if err := runCmd(a.logger, umountCmd); err != nil {
a.logger.WithContext(ctx).WithError(err).Errorf("run umount script fail, %s", mountPoint)
return
}
if err := os.Remove(mountPoint); err != nil {
a.logger.WithContext(ctx).WithError(err).Errorf("remove mount point fail, %s", mountPoint)
return
}
}()
opts := make([]acp.Option, 0, 16)
for _, f := range restoreTape.Files {
if f.Status == entity.CopyStatus_SUBMITED {
continue
}
opts = append(opts, acp.AccurateJob(sourcePath(f.TapePath), []string{path.Join(a.exe.paths.Target, f.TargetPath)}))
}
opts = append(opts, acp.WithHash(true))
opts = append(opts, acp.SetFromDevice(acp.LinearDevice(true)))
opts = append(opts, acp.WithLogger(a.logger))
a.progress = newProgress()
defer func() { a.progress = nil }()
convertPath := tools.Cache(func(p string) string { return strings.ReplaceAll(p, "/", "\x00") })
opts = append(opts, acp.WithEventHandler(func(ev acp.Event) {
switch e := ev.(type) {
case *acp.EventUpdateCount:
atomic.StoreInt64(&a.progress.totalBytes, e.Bytes)
atomic.StoreInt64(&a.progress.totalFiles, e.Files)
return
case *acp.EventUpdateProgress:
a.progress.setBytes(e.Bytes)
atomic.StoreInt64(&a.progress.files, e.Files)
return
case *acp.EventReportError:
a.logger.WithContext(ctx).Errorf("acp report error, src= '%s' dst= '%s' err= '%s'", e.Error.Src, e.Error.Dst, e.Error.Err)
return
case *acp.EventUpdateJob:
job := e.Job
src := entity.NewSourceFromACPJob(job)
var targetStatus entity.CopyStatus
switch job.Status {
case "pending":
targetStatus = entity.CopyStatus_PENDING
case "preparing":
targetStatus = entity.CopyStatus_RUNNING
case "finished":
a.logger.WithContext(ctx).Infof("file '%s' copy finished, size= %d", src.RealPath(), job.Size)
targetStatus = entity.CopyStatus_STAGED
if len(job.FailTargets) > 0 {
targetStatus = entity.CopyStatus_FAILED
}
for dst, err := range job.FailTargets {
if err == nil {
continue
}
a.logger.WithContext(ctx).WithError(err).Errorf("file '%s' copy fail, dst= '%s'", src.RealPath(), dst)
}
default:
return
}
a.stateLock.Lock()
defer a.stateLock.Unlock()
realPath := src.RealPath()
idx := sort.Search(len(restoreTape.Files), func(idx int) bool {
return convertPath(realPath) <= convertPath(sourcePath(restoreTape.Files[idx].TapePath))
})
if idx < 0 || idx >= len(restoreTape.Files) {
a.logger.Warnf(
"cannot found target file, real_path= %s found_index= %d tape_file_path= %v", realPath, idx,
lo.Map(restoreTape.Files, func(file *entity.RestoreFile, _ int) string { return sourcePath(file.TapePath) }),
)
return
}
targetFile := restoreTape.Files[idx]
if targetFile == nil || realPath != sourcePath(targetFile.TapePath) {
a.logger.Warnf(
"cannot match target file, real_path= %s found_index= %d found_file_path= %s",
realPath, idx, sourcePath(targetFile.TapePath),
)
return
}
if targetStatus == entity.CopyStatus_STAGED {
if targetHash := hex.EncodeToString(targetFile.Hash); targetHash != job.SHA256 {
targetStatus = entity.CopyStatus_FAILED
a.logger.Warnf(
"copy checksum do not match target file hash, real_path= %s target_hash= %s copy_hash= %s",
realPath, targetHash, job.SHA256,
)
}
if targetSize := targetFile.Size; targetSize != job.Size {
targetStatus = entity.CopyStatus_FAILED
a.logger.Warnf(
"copy size do not match target file hash, real_path= %s target_size= %d copy_size= %d",
realPath, targetSize, job.Size,
)
}
}
targetFile.Status = targetStatus
if _, err := a.exe.SaveJob(ctx, a.job); err != nil {
a.logger.WithContext(ctx).Infof("save job for update file fail, name= %s", job.Base+path.Join(job.Path...))
}
return
}
}))
restoreTape.Status = entity.CopyStatus_RUNNING
if _, err := a.exe.SaveJob(tools.WithoutTimeout(ctx), a.job); err != nil {
a.logger.WithContext(ctx).Infof("save job for submit tape fail, barcode= %s", restoreTape.Barcode)
}
defer func() {
a.stateLock.Lock()
defer a.stateLock.Unlock()
restoreTape.Status = entity.CopyStatus_SUBMITED
for _, file := range restoreTape.Files {
if file.Status == entity.CopyStatus_STAGED {
file.Status = entity.CopyStatus_SUBMITED
}
if file.Status != entity.CopyStatus_SUBMITED {
restoreTape.Status = entity.CopyStatus_FAILED
}
}
if _, err := a.exe.SaveJob(tools.WithoutTimeout(ctx), a.job); err != nil {
a.logger.WithContext(ctx).Infof("save job for submit tape fail, barcode= %s", restoreTape.Barcode)
}
}()
copyer, err := acp.New(ctx, opts...)
if err != nil {
rerr = fmt.Errorf("start copy fail, %w", err)
return
}
copyer.Wait()
return
return nil
}
func (a *jobRestoreExecutor) switchStep(ctx context.Context, target entity.JobRestoreStep, status entity.JobStatus, expect mapset.Set[entity.JobRestoreStep]) error {
a.stateLock.Lock()
defer a.stateLock.Unlock()
return a.updateJob(ctx, func(job *Job, state *entity.JobRestoreState) error {
if !expect.Contains(state.Step) {
return fmt.Errorf("unexpected current step, target= '%s' expect= '%s' has= '%s'", target, expect, state.Step)
}
if !expect.Contains(a.state.Step) {
return fmt.Errorf("unexpected current step, target= '%s' expect= '%s' has= '%s'", target, expect, a.state.Step)
}
a.state.Step = target
a.job.Status = status
if _, err := a.exe.SaveJob(ctx, a.job); err != nil {
return fmt.Errorf("switch to step copying, save job fail, %w", err)
}
return nil
state.Step = target
job.Status = status
return nil
})
}

View File

@@ -0,0 +1,249 @@
package executor
import (
"context"
"fmt"
"io/fs"
"sort"
"strings"
mapset "github.com/deckarep/golang-set/v2"
"github.com/samber/lo"
"github.com/samuelncui/yatm/entity"
"github.com/samuelncui/yatm/library"
"github.com/samuelncui/yatm/tools"
"github.com/sirupsen/logrus"
)
func (a *jobRestoreExecutor) Initialize(ctx context.Context, param *entity.JobParam) error {
if err := a.applyParam(ctx, param.GetRestore()); err != nil {
return err
}
return a.dispatch(ctx, &entity.JobRestoreDispatchParam{Param: &entity.JobRestoreDispatchParam_WaitForTape{
WaitForTape: &entity.JobRestoreWaitForTapeParam{},
}})
}
type restoreFile struct {
*library.File
target string
}
func (a *jobRestoreExecutor) applyParam(ctx context.Context, param *entity.JobRestoreParam) error {
if param == nil {
return fmt.Errorf("restore param is nil")
}
return a.updateJob(ctx, func(_ *Job, state *entity.JobRestoreState) error {
exe := a.exe
files, err := exe.getRestoreFiles(ctx, param.FileIds...)
if err != nil {
return fmt.Errorf("get restore files fail, ids= %v, %w", param.FileIds, err)
}
fileIDs := make([]int64, 0, len(files))
for _, file := range files {
fileIDs = append(fileIDs, file.ID)
}
positions, err := exe.lib.MGetPositionByFileID(ctx, fileIDs...)
if err != nil {
return err
}
tapeMapping := make(map[int64]mapset.Set[int64], 4)
for _, file := range files {
for _, posi := range positions[file.ID] {
set := tapeMapping[posi.TapeID]
if set == nil {
tapeMapping[posi.TapeID] = mapset.NewThreadUnsafeSet(file.ID)
continue
}
set.Add(file.ID)
}
}
tapes, err := exe.lib.MGetTape(ctx, lo.Keys(tapeMapping)...)
if err != nil {
return err
}
for tapeID := range tapeMapping {
if tape, has := tapes[tapeID]; has && tape != nil {
continue
}
logrus.WithContext(ctx).Infof("tape not found, tape_id= %d", tapeID)
delete(tapeMapping, tapeID)
}
restoreTapes := make([]*entity.RestoreTape, 0, len(tapeMapping))
for len(tapeMapping) > 0 {
var maxTapeID int64
for tapeID, files := range tapeMapping {
if maxTapeID == 0 {
maxTapeID = tapeID
continue
}
diff := files.Cardinality() - tapeMapping[maxTapeID].Cardinality()
if diff > 0 {
maxTapeID = tapeID
continue
}
if diff < 0 {
continue
}
if tapeID < maxTapeID {
maxTapeID = tapeID
continue
}
}
if maxTapeID == 0 {
return fmt.Errorf("max tape not found, tape_ids= %v", lo.Keys(tapeMapping))
}
fileIDs := tapeMapping[maxTapeID]
delete(tapeMapping, maxTapeID)
if fileIDs.Cardinality() == 0 {
continue
}
for i, f := range tapeMapping {
tapeMapping[i] = f.Difference(fileIDs)
}
targets := make([]*entity.RestoreFile, 0, fileIDs.Cardinality())
for _, fileID := range fileIDs.ToSlice() {
file := files[fileID]
if file == nil {
continue
}
posi := positions[fileID]
if len(posi) == 0 {
logrus.WithContext(ctx).Infof("file position not found, file_id= %d", fileID)
continue
}
for _, p := range posi {
if p.TapeID != maxTapeID {
continue
}
targets = append(targets, &entity.RestoreFile{
FileId: file.ID,
TapeId: p.TapeID,
PositionId: p.ID,
Status: entity.CopyStatus_PENDING,
Size: file.Size,
Hash: file.Hash,
TapePath: p.Path,
TargetPath: file.target,
})
break
}
}
convertPath := tools.ThreadUnsafeCache(func(p string) string { return strings.ReplaceAll(p, "/", "\x00") })
sort.Slice(targets, func(i, j int) bool {
return convertPath(targets[i].TapePath) < convertPath(targets[j].TapePath)
})
restoreTapes = append(restoreTapes, &entity.RestoreTape{
TapeId: maxTapeID,
Barcode: tapes[maxTapeID].Barcode,
Status: entity.CopyStatus_PENDING,
Files: targets,
})
}
state.Step = entity.JobRestoreStep_PENDING
state.Tapes = restoreTapes
return nil
})
}
func (e *Executor) getRestoreFiles(ctx context.Context, rootIDs ...int64) (map[int64]*restoreFile, error) {
rootIDSet := mapset.NewThreadUnsafeSet(rootIDs...)
for _, id := range rootIDs {
parents, err := e.lib.ListParents(ctx, id)
if err != nil {
return nil, err
}
if len(parents) <= 1 {
continue
}
for _, parent := range parents[:len(parents)-1] {
if !rootIDSet.Contains(parent.ID) {
continue
}
rootIDSet.Remove(id)
break
}
}
rootIDs = rootIDSet.ToSlice()
mapping, err := e.lib.MGetFile(ctx, rootIDs...)
if err != nil {
return nil, fmt.Errorf("mget file fail, ids= %v, %w", rootIDs, err)
}
files := make([]*restoreFile, 0, len(rootIDs)*8)
visited := mapset.NewThreadUnsafeSet[int64]()
for _, root := range mapping {
if visited.Contains(root.ID) {
continue
}
visited.Add(root.ID)
if !fs.FileMode(root.Mode).IsDir() {
files = append(files, &restoreFile{File: root, target: root.Name})
continue
}
found, err := e.visitFiles(ctx, root.Name, nil, visited, root.ID)
if err != nil {
return nil, err
}
files = append(files, found...)
}
results := make(map[int64]*restoreFile, len(files))
for _, f := range files {
results[f.ID] = f
}
return results, nil
}
func (e *Executor) visitFiles(ctx context.Context, path string, files []*restoreFile, visited mapset.Set[int64], parentID int64) ([]*restoreFile, error) {
children, err := e.lib.List(ctx, parentID)
if err != nil {
return nil, err
}
for _, child := range children {
if visited.Contains(child.ID) {
continue
}
visited.Add(child.ID)
target := path + "/" + child.Name
if !fs.FileMode(child.Mode).IsDir() {
files = append(files, &restoreFile{File: child, target: target})
continue
}
files, err = e.visitFiles(ctx, target, files, visited, child.ID)
if err != nil {
return nil, err
}
}
return files, nil
}

View File

@@ -0,0 +1,59 @@
package executor

import (
"context"
"fmt"
"os"
"os/exec"
"time"

"github.com/samuelncui/yatm/library"
"github.com/sirupsen/logrus"
)
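
// loadTape claims the drive, applies the tape's encryption key, and mounts the
// tape via the LTFS mount script. The unmount path is still commented out below,
// so the mount point is left in place when this call returns.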
func (e *jobRestoreExecutor) loadTape(ctx context.Context, device string, tape *library.Tape) error {
if !e.exe.OccupyDevice(device) {
return fmt.Errorf("device is using, device= %s", device)
}
defer e.exe.ReleaseDevice(device)
keyPath, keyRecycle, err := e.exe.restoreKey(tape.Encryption)
if err != nil {
return err
}
defer func() {
time.Sleep(time.Second)
keyRecycle()
}()
logger := logrus.StandardLogger()
if err := runCmd(logger, e.exe.makeEncryptCmd(ctx, device, keyPath, tape.Barcode, tape.Name)); err != nil {
return fmt.Errorf("run encrypt script fail, %w", err)
}
mountPoint, err := os.MkdirTemp("", "*.ltfs")
if err != nil {
return fmt.Errorf("create temp mountpoint, %w", err)
}
mountCmd := exec.CommandContext(ctx, e.exe.scripts.Mount)
mountCmd.Env = append(mountCmd.Env, fmt.Sprintf("DEVICE=%s", device), fmt.Sprintf("MOUNT_POINT=%s", mountPoint))
if err := runCmd(logger, mountCmd); err != nil {
return fmt.Errorf("run mount script fail, %w", err)
}
// defer func() {
// umountCmd := exec.CommandContext(ctx, e.umountScript)
// umountCmd.Env = append(umountCmd.Env, fmt.Sprintf("MOUNT_POINT=%s", mountPoint))
// if err := runCmd(logger, umountCmd); err != nil {
// logger.WithContext(ctx).WithError(err).Errorf("run umount script fail, %s", mountPoint)
// return
// }
// if err := os.Remove(mountPoint); err != nil {
// logger.WithContext(ctx).WithError(err).Errorf("remove mount point fail, %s", mountPoint)
// return
// }
// }()
return nil
}

View File

@@ -1,233 +0,0 @@
package executor

import (
"context"
"fmt"
"io/fs"
"sort"
"strings"

mapset "github.com/deckarep/golang-set/v2"
"github.com/samber/lo"
"github.com/samuelncui/yatm/entity"
"github.com/samuelncui/yatm/library"
"github.com/samuelncui/yatm/tools"
"github.com/sirupsen/logrus"
)
type restoreFile struct {
*library.File
target string
}
func (e *Executor) createRestore(ctx context.Context, job *Job, param *entity.JobRestoreParam) error {
files, err := e.getRestoreFiles(ctx, param.FileIds...)
if err != nil {
return fmt.Errorf("get restore files fail, ids= %v, %w", param.FileIds, err)
}
fileIDs := make([]int64, 0, len(files))
for _, file := range files {
fileIDs = append(fileIDs, file.ID)
}
positions, err := e.lib.MGetPositionByFileID(ctx, fileIDs...)
if err != nil {
return err
}
tapeMapping := make(map[int64]mapset.Set[int64], 4)
for _, file := range files {
for _, posi := range positions[file.ID] {
set := tapeMapping[posi.TapeID]
if set == nil {
tapeMapping[posi.TapeID] = mapset.NewThreadUnsafeSet(file.ID)
continue
}
set.Add(file.ID)
}
}
tapes, err := e.lib.MGetTape(ctx, lo.Keys(tapeMapping)...)
if err != nil {
return err
}
for tapeID := range tapeMapping {
if tape, has := tapes[tapeID]; has && tape != nil {
continue
}
logrus.WithContext(ctx).Infof("tape not found, tape_id= %d", tapeID)
delete(tapeMapping, tapeID)
}
restoreTapes := make([]*entity.RestoreTape, 0, len(tapeMapping))
for len(tapeMapping) > 0 {
var maxTapeID int64
for tapeID, files := range tapeMapping {
if maxTapeID == 0 {
maxTapeID = tapeID
continue
}
diff := files.Cardinality() - tapeMapping[maxTapeID].Cardinality()
if diff > 0 {
maxTapeID = tapeID
continue
}
if diff < 0 {
continue
}
if tapeID < maxTapeID {
maxTapeID = tapeID
continue
}
}
if maxTapeID == 0 {
return fmt.Errorf("max tape not found, tape_ids= %v", lo.Keys(tapeMapping))
}
fileIDs := tapeMapping[maxTapeID]
delete(tapeMapping, maxTapeID)
if fileIDs.Cardinality() == 0 {
continue
}
for i, f := range tapeMapping {
tapeMapping[i] = f.Difference(fileIDs)
}
targets := make([]*entity.RestoreFile, 0, fileIDs.Cardinality())
for _, fileID := range fileIDs.ToSlice() {
file := files[fileID]
if file == nil {
continue
}
posi := positions[fileID]
if len(posi) == 0 {
logrus.WithContext(ctx).Infof("file position not found, file_id= %d", fileID)
continue
}
for _, p := range posi {
if p.TapeID != maxTapeID {
continue
}
targets = append(targets, &entity.RestoreFile{
FileId: file.ID,
TapeId: p.TapeID,
PositionId: p.ID,
Status: entity.CopyStatus_PENDING,
Size: file.Size,
Hash: file.Hash,
TapePath: p.Path,
TargetPath: file.target,
})
break
}
}
convertPath := tools.Cache(func(p string) string { return strings.ReplaceAll(p, "/", "\x00") })
sort.Slice(targets, func(i, j int) bool {
return convertPath(targets[i].TapePath) < convertPath(targets[j].TapePath)
})
restoreTapes = append(restoreTapes, &entity.RestoreTape{
TapeId: maxTapeID,
Barcode: tapes[maxTapeID].Barcode,
Status: entity.CopyStatus_PENDING,
Files: targets,
})
}
job.State = &entity.JobState{State: &entity.JobState_Restore{Restore: &entity.JobRestoreState{
Step: entity.JobRestoreStep_PENDING,
Tapes: restoreTapes,
}}}
return nil
}
func (e *Executor) getRestoreFiles(ctx context.Context, rootIDs ...int64) (map[int64]*restoreFile, error) {
rootIDSet := mapset.NewThreadUnsafeSet(rootIDs...)
for _, id := range rootIDs {
parents, err := e.lib.ListParents(ctx, id)
if err != nil {
return nil, err
}
if len(parents) <= 1 {
continue
}
for _, parent := range parents[:len(parents)-1] {
if !rootIDSet.Contains(parent.ID) {
continue
}
rootIDSet.Remove(id)
break
}
}
rootIDs = rootIDSet.ToSlice()
mapping, err := e.lib.MGetFile(ctx, rootIDs...)
if err != nil {
return nil, fmt.Errorf("mget file fail, ids= %v, %w", rootIDs, err)
}
files := make([]*restoreFile, 0, len(rootIDs)*8)
visited := mapset.NewThreadUnsafeSet[int64]()
for _, root := range mapping {
if visited.Contains(root.ID) {
continue
}
visited.Add(root.ID)
if !fs.FileMode(root.Mode).IsDir() {
files = append(files, &restoreFile{File: root, target: root.Name})
continue
}
found, err := e.visitFiles(ctx, root.Name, nil, visited, root.ID)
if err != nil {
return nil, err
}
files = append(files, found...)
}
results := make(map[int64]*restoreFile, len(files))
for _, f := range files {
results[f.ID] = f
}
return results, nil
}
func (e *Executor) visitFiles(ctx context.Context, path string, files []*restoreFile, visited mapset.Set[int64], parentID int64) ([]*restoreFile, error) {
children, err := e.lib.List(ctx, parentID)
if err != nil {
return nil, err
}
for _, child := range children {
if visited.Contains(child.ID) {
continue
}
visited.Add(child.ID)
target := path + "/" + child.Name
if !fs.FileMode(child.Mode).IsDir() {
files = append(files, &restoreFile{File: child, target: target})
continue
}
files, err = e.visitFiles(ctx, target, files, visited, child.ID)
if err != nil {
return nil, err
}
}
return files, nil
}

View File

@@ -1,15 +0,0 @@
package executor

import (
"context"

"github.com/samuelncui/yatm/entity"
)
func (e *Executor) startRestore(ctx context.Context, job *Job) error {
return e.Submit(ctx, job, &entity.JobNextParam{Param: &entity.JobNextParam_Restore{
Restore: &entity.JobRestoreNextParam{Param: &entity.JobRestoreNextParam_WaitForTape{
WaitForTape: &entity.JobRestoreWaitForTapeParam{},
}},
}})
}

View File

@@ -0,0 +1,260 @@
package executor

import (
"context"
"encoding/hex"
"fmt"
"os"
"os/exec"
"path"
"sort"
"strings"
"sync/atomic"
"time"

jsoniter "github.com/json-iterator/go"
"github.com/samber/lo"
"github.com/samuelncui/acp"
"github.com/samuelncui/yatm/entity"
"github.com/samuelncui/yatm/tools"
)
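
// restoreTape claims a drive, checks that the loaded tape (identified by barcode)
// is one this job still needs, mounts it via LTFS, and copies every pending file
// to the restore target with acp, verifying size and SHA-256 against the library
// records.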
func (a *jobRestoreExecutor) restoreTape(ctx context.Context, device string) (rerr error) {
if !a.exe.OccupyDevice(device) {
return fmt.Errorf("device is using, device= %s", device)
}
defer a.exe.ReleaseDevice(device)
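// Once this tape is done, either request the next required tape or mark the
// whole restore finished, depending on whether any tape is still not SUBMITED.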
defer func() {
tapes := a.getState().Tapes
if _, found := lo.Find(tapes, func(item *entity.RestoreTape) bool {
return item.Status != entity.CopyStatus_SUBMITED
}); found {
a.dispatch(tools.WithoutTimeout(ctx), &entity.JobRestoreDispatchParam{
Param: &entity.JobRestoreDispatchParam_WaitForTape{WaitForTape: &entity.JobRestoreWaitForTapeParam{}},
})
return
}
a.dispatch(tools.WithoutTimeout(ctx), &entity.JobRestoreDispatchParam{
Param: &entity.JobRestoreDispatchParam_Finished{Finished: &entity.JobRestoreFinishedParam{}},
})
}()
readInfoCmd := exec.CommandContext(ctx, a.exe.scripts.ReadInfo)
readInfoCmd.Env = append(readInfoCmd.Env, fmt.Sprintf("DEVICE=%s", device))
infoBuf, err := runCmdWithReturn(a.logger, readInfoCmd)
if err != nil {
return fmt.Errorf("run read info script fail, %w", err)
}
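// The read-info script prints JSON; keep only the first six characters of the
// reported barcode (ignoring any media-type suffix) and normalize to upper case.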
barcode := jsoniter.Get(infoBuf, "barcode").ToString()
if len(barcode) > 6 {
barcode = barcode[:6]
}
barcode = strings.ToUpper(barcode)
tapes := a.getState().Tapes
tape, found := lo.Find(tapes, func(t *entity.RestoreTape) bool {
return t.Barcode == barcode
})
if !found || tape == nil {
expects := lo.Map(tapes, func(t *entity.RestoreTape, _ int) string { return t.Barcode })
return fmt.Errorf("unexpected tape barcode in library, has= '%s' expect= %v", barcode, expects)
}
if tape.Status == entity.CopyStatus_SUBMITED {
return fmt.Errorf("unexpected restore tape state status, tape is restored, status= '%s'", tape.Status)
}
libTape, err := a.exe.lib.GetTape(ctx, tape.TapeId)
if err != nil {
return fmt.Errorf("get tape info fail, barcode= '%s' id= %d, %w", tape.Barcode, tape.TapeId, err)
}
keyPath, keyRecycle, err := a.exe.restoreKey(libTape.Encryption)
if err != nil {
return err
}
defer func() {
time.Sleep(time.Second)
keyRecycle()
}()
if err := runCmd(a.logger, a.exe.makeEncryptCmd(ctx, device, keyPath, barcode, libTape.Name)); err != nil {
return fmt.Errorf("run encrypt script fail, %w", err)
}
mountPoint, err := os.MkdirTemp("", "*.ltfs")
if err != nil {
return fmt.Errorf("create temp mountpoint, %w", err)
}
sourcePath := tools.ThreadUnsafeCache(func(p string) string { return path.Join(mountPoint, p) })
mountCmd := exec.CommandContext(ctx, a.exe.scripts.Mount)
mountCmd.Env = append(mountCmd.Env, fmt.Sprintf("DEVICE=%s", device), fmt.Sprintf("MOUNT_POINT=%s", mountPoint))
if err := runCmd(a.logger, mountCmd); err != nil {
return fmt.Errorf("run mount script fail, %w", err)
}
defer func() {
umountCmd := exec.CommandContext(tools.WithoutTimeout(ctx), a.exe.scripts.Umount)
umountCmd.Env = append(umountCmd.Env, fmt.Sprintf("MOUNT_POINT=%s", mountPoint))
if err := runCmd(a.logger, umountCmd); err != nil {
a.logger.WithContext(ctx).WithError(err).Errorf("run umount script fail, %s", mountPoint)
return
}
if err := os.Remove(mountPoint); err != nil {
a.logger.WithContext(ctx).WithError(err).Errorf("remove mount point fail, %s", mountPoint)
return
}
}()
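// Queue one copy job per file that is not yet SUBMITED, reading from the LTFS
// mount point and writing to the configured restore target directory.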
opts := make([]acp.Option, 0, 16)
for _, f := range tape.Files {
if f.Status == entity.CopyStatus_SUBMITED {
continue
}
opts = append(opts, acp.AccurateJob(sourcePath(f.TapePath), []string{path.Join(a.exe.paths.Target, f.TargetPath)}))
}
opts = append(opts, acp.WithHash(true))
opts = append(opts, acp.SetFromDevice(acp.LinearDevice(true)))
opts = append(opts, acp.WithLogger(a.logger))
a.progress = newProgress()
defer func() { a.progress = nil }()
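// The acp event handler below mirrors copy progress and per-file results into
// the job state; files are matched with the same '/'-to-NUL path key that was
// used to sort them when the job was created.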
convertPath := tools.ThreadUnsafeCache(func(p string) string { return strings.ReplaceAll(p, "/", "\x00") })
opts = append(opts, acp.WithEventHandler(func(ev acp.Event) {
switch e := ev.(type) {
case *acp.EventUpdateCount:
atomic.StoreInt64(&a.progress.totalBytes, e.Bytes)
atomic.StoreInt64(&a.progress.totalFiles, e.Files)
return
case *acp.EventUpdateProgress:
a.progress.setBytes(e.Bytes)
atomic.StoreInt64(&a.progress.files, e.Files)
return
case *acp.EventReportError:
a.logger.WithContext(ctx).Errorf("acp report error, src= '%s' dst= '%s' err= '%s'", e.Error.Src, e.Error.Dst, e.Error.Err)
return
case *acp.EventUpdateJob:
job := e.Job
src := entity.NewSourceFromACPJob(job)
var targetStatus entity.CopyStatus
switch job.Status {
case "pending":
targetStatus = entity.CopyStatus_PENDING
case "preparing":
targetStatus = entity.CopyStatus_RUNNING
case "finished":
a.logger.WithContext(ctx).Infof("file '%s' copy finished, size= %d", src.RealPath(), job.Size)
targetStatus = entity.CopyStatus_STAGED
if len(job.FailTargets) > 0 {
targetStatus = entity.CopyStatus_FAILED
}
for dst, err := range job.FailTargets {
if err == nil {
continue
}
a.logger.WithContext(ctx).WithError(err).Errorf("file '%s' copy fail, dst= '%s'", src.RealPath(), dst)
}
default:
return
}
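// Locate the matching RestoreFile by binary search; tape.Files was sorted with
// the same convertPath key at job creation time.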
realPath := src.RealPath()
a.updateJob(ctx, func(_ *Job, state *entity.JobRestoreState) error {
tape, has := lo.Find(state.Tapes, func(tape *entity.RestoreTape) bool { return tape.Barcode == barcode })
if !has || tape == nil {
return fmt.Errorf("cannot found tape, barcode= %s", barcode)
}
idx := sort.Search(len(tape.Files), func(idx int) bool {
return convertPath(realPath) <= convertPath(sourcePath(tape.Files[idx].TapePath))
})
if idx < 0 || idx >= len(tape.Files) {
return fmt.Errorf(
"cannot found target file, real_path= %s found_index= %d tape_file_path= %v", realPath, idx,
lo.Map(tape.Files, func(file *entity.RestoreFile, _ int) string { return sourcePath(file.TapePath) }),
)
}
found := tape.Files[idx]
if found == nil || realPath != sourcePath(found.TapePath) {
return fmt.Errorf(
"cannot match found file, real_path= %s found_index= %d found_file_path= %s",
realPath, idx, sourcePath(found.TapePath),
)
}
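// A copy only stays STAGED if both the SHA-256 and the size match the values
// recorded in the library; any mismatch marks the file FAILED.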
if targetStatus == entity.CopyStatus_STAGED {
if targetHash := hex.EncodeToString(found.Hash); targetHash != job.SHA256 {
targetStatus = entity.CopyStatus_FAILED
a.logger.Warnf(
"copy checksum do not match target file hash, real_path= %s target_hash= %s copy_hash= %s",
realPath, targetHash, job.SHA256,
)
}
if targetSize := found.Size; targetSize != job.Size {
targetStatus = entity.CopyStatus_FAILED
a.logger.Warnf(
"copy size do not match target file hash, real_path= %s target_size= %d copy_size= %d",
realPath, targetSize, job.Size,
)
}
}
found.Status = targetStatus
return nil
})
}
}))
a.updateJob(ctx, func(_ *Job, state *entity.JobRestoreState) error {
tape, has := lo.Find(state.Tapes, func(tape *entity.RestoreTape) bool { return tape.Barcode == barcode })
if !has || tape == nil {
return fmt.Errorf("cannot found tape, barcode= %s", barcode)
}
tape.Status = entity.CopyStatus_RUNNING
return nil
})
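// After the copy finishes, roll per-file statuses up into the tape status:
// STAGED files become SUBMITED, and any file left in another state marks the
// whole tape FAILED.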
defer func() {
a.updateJob(ctx, func(job *Job, state *entity.JobRestoreState) error {
tape, has := lo.Find(state.Tapes, func(tape *entity.RestoreTape) bool { return tape.Barcode == barcode })
if !has || tape == nil {
return fmt.Errorf("cannot found tape, barcode= %s", barcode)
}
tape.Status = entity.CopyStatus_SUBMITED
for _, file := range tape.Files {
if file.Status == entity.CopyStatus_STAGED {
file.Status = entity.CopyStatus_SUBMITED
}
if file.Status != entity.CopyStatus_SUBMITED {
tape.Status = entity.CopyStatus_FAILED
}
}
return nil
})
}()
copyer, err := acp.New(ctx, opts...)
if err != nil {
rerr = fmt.Errorf("start copy fail, %w", err)
return
}
copyer.Wait()
return
}

33
executor/job_type.go Normal file
View File

@@ -0,0 +1,33 @@
package executor

import (
"context"

"github.com/modern-go/reflect2"
"github.com/samuelncui/yatm/entity"
"github.com/sirupsen/logrus"
)
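
// Job creation parameters are mapped to their state types, and state types to
// JobType implementations, keyed by reflect2 runtime type IDs.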
var (
jobParamToTypes = map[uintptr]uintptr{
reflect2.RTypeOf(&entity.JobParam_Archive{}): reflect2.RTypeOf(&entity.JobState_Archive{}),
reflect2.RTypeOf(&entity.JobParam_Restore{}): reflect2.RTypeOf(&entity.JobState_Restore{}),
}
jobTypes = map[uintptr]JobType{
reflect2.RTypeOf(&entity.JobState_Archive{}): new(jobTypeArchive),
reflect2.RTypeOf(&entity.JobState_Restore{}): new(jobTypeRestore),
}
)
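
// JobType resolves the JobExecutor that runs jobs of a given state type.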
type JobType interface {
GetExecutor(ctx context.Context, exe *Executor, job *Job) (JobExecutor, error)
}
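
// JobExecutor drives a single job: Initialize consumes the creation parameters,
// Dispatch advances the job, Display returns its current progress, and Close
// releases whatever the executor holds.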
type JobExecutor interface {
Initialize(ctx context.Context, param *entity.JobParam) error
Dispatch(ctx context.Context, param *entity.JobDispatchParam) error
Display(ctx context.Context) (*entity.JobDisplay, error)
Close(ctx context.Context) error
Logger() *logrus.Logger
}
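
// What follows is a minimal, hypothetical sketch of a JobType implementation,
// added purely for illustration and not part of this change; the names
// jobTypeNoop and jobNoopExecutor are invented. It only shows the shape an
// implementation must take to satisfy the two interfaces above.
type jobTypeNoop struct{}

func (jobTypeNoop) GetExecutor(ctx context.Context, exe *Executor, job *Job) (JobExecutor, error) {
return &jobNoopExecutor{logger: logrus.StandardLogger()}, nil
}

type jobNoopExecutor struct {
logger *logrus.Logger
}

func (e *jobNoopExecutor) Initialize(ctx context.Context, param *entity.JobParam) error { return nil }

func (e *jobNoopExecutor) Dispatch(ctx context.Context, param *entity.JobDispatchParam) error { return nil }

func (e *jobNoopExecutor) Display(ctx context.Context) (*entity.JobDisplay, error) { return nil, nil }

func (e *jobNoopExecutor) Close(ctx context.Context) error { return nil }

func (e *jobNoopExecutor) Logger() *logrus.Logger { return e.logger }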