feat: job list

This commit is contained in:
Samuel N Cui
2023-08-29 18:02:57 +08:00
parent cda9244e8e
commit 852cf8212e
80 changed files with 6173 additions and 1854 deletions

View File

@@ -21,27 +21,35 @@ type Executor struct {
devicesLock sync.Mutex
availableDevices mapset.Set[string]
workDirectory string
encryptScript string
mkfsScript string
mountScript string
umountScript string
paths Paths
scripts Scripts
}
type Paths struct {
Work string `yaml:"work"`
Source string `yaml:"source"`
Target string `yaml:"target"`
}
type Scripts struct {
Encrypt string `yaml:"encrypt"`
Mkfs string `yaml:"mkfs"`
Mount string `yaml:"mount"`
Umount string `yaml:"umount"`
ReadInfo string `yaml:"read_info"`
}
func New(
db *gorm.DB, lib *library.Library,
devices []string, workDirectory string,
encryptScript, mkfsScript, mountScript, umountScript string,
devices []string, paths Paths, scripts Scripts,
) *Executor {
return &Executor{
db: db,
lib: lib,
devices: devices,
availableDevices: mapset.NewThreadUnsafeSet(devices...),
encryptScript: encryptScript,
mkfsScript: mkfsScript,
mountScript: mountScript,
umountScript: umountScript,
paths: paths,
scripts: scripts,
}
}
@@ -80,7 +88,7 @@ func (e *Executor) releaseDevice(dev string) {
}
func (e *Executor) Start(ctx context.Context, job *Job) error {
job.Status = entity.JobStatus_Processing
job.Status = entity.JobStatus_PROCESSING
if _, err := e.SaveJob(ctx, job); err != nil {
return err
}
@@ -91,12 +99,18 @@ func (e *Executor) Start(ctx context.Context, job *Job) error {
}
return nil
}
if state := job.State.GetRestore(); state != nil {
if err := e.startRestore(ctx, job); err != nil {
return err
}
return nil
}
return fmt.Errorf("unexpected state type, %T", job.State.State)
}
func (e *Executor) Submit(ctx context.Context, job *Job, param *entity.JobNextParam) error {
if job.Status != entity.JobStatus_Processing {
if job.Status != entity.JobStatus_PROCESSING {
return fmt.Errorf("target job is not on processing, status= %s", job.Status)
}
@@ -109,12 +123,21 @@ func (e *Executor) Submit(ctx context.Context, job *Job, param *entity.JobNextPa
exe.submit(ctx, param.GetArchive())
return nil
}
if state := job.State.GetRestore(); state != nil {
exe, err := e.newRestoreExecutor(ctx, job)
if err != nil {
return err
}
exe.submit(ctx, param.GetRestore())
return nil
}
return fmt.Errorf("unexpected state type, %T", job.State.State)
}
func (e *Executor) Display(ctx context.Context, job *Job) (*entity.JobDisplay, error) {
if job.Status != entity.JobStatus_Processing {
if job.Status != entity.JobStatus_PROCESSING {
return nil, fmt.Errorf("target job is not on processing, status= %s", job.Status)
}
@@ -126,6 +149,14 @@ func (e *Executor) Display(ctx context.Context, job *Job) (*entity.JobDisplay, e
return &entity.JobDisplay{Display: &entity.JobDisplay_Archive{Archive: display}}, nil
}
if state := job.State.GetRestore(); state != nil {
display, err := e.getRestoreDisplay(ctx, job)
if err != nil {
return nil, err
}
return &entity.JobDisplay{Display: &entity.JobDisplay_Restore{Restore: display}}, nil
}
return nil, fmt.Errorf("unexpected state type, %T", job.State.State)
}

View File

@@ -35,7 +35,10 @@ func (j *Job) BeforeUpdate(tx *gorm.DB) error {
func (e *Executor) initJob(ctx context.Context, job *Job, param *entity.JobParam) error {
if p := param.GetArchive(); p != nil {
return e.initArchive(ctx, job, p)
return e.createArchive(ctx, job, p)
}
if p := param.GetRestore(); p != nil {
return e.createRestore(ctx, job, p)
}
return fmt.Errorf("unexpected param type, %T", param.Param)
}
@@ -52,6 +55,13 @@ func (e *Executor) CreateJob(ctx context.Context, job *Job, param *entity.JobPar
return job, nil
}
func (e *Executor) DeleteJobs(ctx context.Context, ids ...int64) error {
if r := e.db.WithContext(ctx).Delete(ModelJob, ids); r.Error != nil {
return fmt.Errorf("delete job fail, err= %w", r.Error)
}
return nil
}
func (e *Executor) SaveJob(ctx context.Context, job *Job) (*Job, error) {
if r := e.db.WithContext(ctx).Save(job); r.Error != nil {
return nil, fmt.Errorf("save job fail, err= %w", r.Error)

View File

@@ -7,8 +7,8 @@ import (
"github.com/abc950309/tapewriter/entity"
)
func (e *Executor) getArchiveDisplay(ctx context.Context, job *Job) (*entity.JobDisplayArchive, error) {
display := new(entity.JobDisplayArchive)
func (e *Executor) getArchiveDisplay(ctx context.Context, job *Job) (*entity.JobArchiveDisplay, error) {
display := new(entity.JobArchiveDisplay)
if exe := e.getArchiveExecutor(ctx, job); exe != nil && exe.progress != nil {
display.CopyedBytes = atomic.LoadInt64(&exe.progress.bytes)

View File

@@ -64,7 +64,7 @@ type jobArchiveExecutor struct {
job *Job
stateLock sync.Mutex
state *entity.JobStateArchive
state *entity.JobArchiveState
progress *progress
logFile *os.File
@@ -79,7 +79,10 @@ func (a *jobArchiveExecutor) submit(ctx context.Context, param *entity.JobArchiv
func (a *jobArchiveExecutor) handle(ctx context.Context, param *entity.JobArchiveNextParam) error {
if p := param.GetCopying(); p != nil {
if err := a.switchStep(ctx, entity.JobArchiveStep_Copying, entity.JobStatus_Processing, mapset.NewThreadUnsafeSet(entity.JobArchiveStep_WaitForTape)); err != nil {
if err := a.switchStep(
ctx, entity.JobArchiveStep_COPYING, entity.JobStatus_PROCESSING,
mapset.NewThreadUnsafeSet(entity.JobArchiveStep_WAIT_FOR_TAPE),
); err != nil {
return err
}
@@ -87,7 +90,7 @@ func (a *jobArchiveExecutor) handle(ctx context.Context, param *entity.JobArchiv
go tools.WrapWithLogger(ctx, a.logger, func() {
defer tools.Done()
if err := a.makeTape(tools.ShutdownContext, p.Device, p.Barcode, p.Name); err != nil {
a.logger.WithContext(ctx).WithError(err).Errorf("make type has error, barcode= '%s' name= '%s'", p.Barcode, p.Name)
a.logger.WithContext(ctx).WithError(err).Errorf("make tape has error, barcode= '%s' name= '%s'", p.Barcode, p.Name)
}
})
@@ -95,11 +98,17 @@ func (a *jobArchiveExecutor) handle(ctx context.Context, param *entity.JobArchiv
}
if p := param.GetWaitForTape(); p != nil {
return a.switchStep(ctx, entity.JobArchiveStep_WaitForTape, entity.JobStatus_Processing, mapset.NewThreadUnsafeSet(entity.JobArchiveStep_Pending, entity.JobArchiveStep_Copying))
return a.switchStep(
ctx, entity.JobArchiveStep_WAIT_FOR_TAPE, entity.JobStatus_PROCESSING,
mapset.NewThreadUnsafeSet(entity.JobArchiveStep_PENDING, entity.JobArchiveStep_COPYING),
)
}
if p := param.GetFinished(); p != nil {
if err := a.switchStep(ctx, entity.JobArchiveStep_Finished, entity.JobStatus_Completed, mapset.NewThreadUnsafeSet(entity.JobArchiveStep_Copying)); err != nil {
if err := a.switchStep(
ctx, entity.JobArchiveStep_FINISHED, entity.JobStatus_COMPLETED,
mapset.NewThreadUnsafeSet(entity.JobArchiveStep_COPYING),
); err != nil {
return err
}
@@ -128,7 +137,7 @@ func (a *jobArchiveExecutor) makeTape(ctx context.Context, device, barcode, name
return fmt.Errorf("run encrypt script fail, %w", err)
}
mkfsCmd := exec.CommandContext(ctx, a.exe.mkfsScript)
mkfsCmd := exec.CommandContext(ctx, a.exe.scripts.Mkfs)
mkfsCmd.Env = append(mkfsCmd.Env, fmt.Sprintf("DEVICE=%s", device), fmt.Sprintf("TAPE_BARCODE=%s", barcode), fmt.Sprintf("TAPE_NAME=%s", name))
if err := runCmd(a.logger, mkfsCmd); err != nil {
return fmt.Errorf("run mkfs script fail, %w", err)
@@ -139,13 +148,13 @@ func (a *jobArchiveExecutor) makeTape(ctx context.Context, device, barcode, name
return fmt.Errorf("create temp mountpoint, %w", err)
}
mountCmd := exec.CommandContext(ctx, a.exe.mountScript)
mountCmd := exec.CommandContext(ctx, a.exe.scripts.Mount)
mountCmd.Env = append(mountCmd.Env, fmt.Sprintf("DEVICE=%s", device), fmt.Sprintf("MOUNT_POINT=%s", mountPoint))
if err := runCmd(a.logger, mountCmd); err != nil {
return fmt.Errorf("run mount script fail, %w", err)
}
defer func() {
umountCmd := exec.CommandContext(tools.WithoutTimeout(ctx), a.exe.umountScript)
umountCmd := exec.CommandContext(tools.WithoutTimeout(ctx), a.exe.scripts.Umount)
umountCmd.Env = append(umountCmd.Env, fmt.Sprintf("MOUNT_POINT=%s", mountPoint))
if err := runCmd(a.logger, umountCmd); err != nil {
a.logger.WithContext(ctx).WithError(err).Errorf("run umount script fail, %s", mountPoint)
@@ -157,15 +166,17 @@ func (a *jobArchiveExecutor) makeTape(ctx context.Context, device, barcode, name
}
}()
opts := make([]acp.Option, 0, 4)
wildcardJobOpts := make([]acp.WildcardJobOption, 0, 6)
wildcardJobOpts = append(wildcardJobOpts, acp.Target(mountPoint))
for _, source := range a.state.Sources {
if source.Status == entity.CopyStatus_Submited {
if source.Status == entity.CopyStatus_SUBMITED {
continue
}
opts = append(opts, acp.AccurateSource(source.Source.Base, source.Source.Path))
wildcardJobOpts = append(wildcardJobOpts, acp.AccurateSource(source.Source.Base, source.Source.Path))
}
opts = append(opts, acp.Target(mountPoint))
opts := make([]acp.Option, 0, 4)
opts = append(opts, acp.WildcardJob(wildcardJobOpts...))
opts = append(opts, acp.WithHash(true))
opts = append(opts, acp.SetToDevice(acp.LinearDevice(true)))
opts = append(opts, acp.WithLogger(a.logger))
@@ -196,12 +207,12 @@ func (a *jobArchiveExecutor) makeTape(ctx context.Context, device, barcode, name
var targetStatus entity.CopyStatus
switch job.Status {
case "pending":
targetStatus = entity.CopyStatus_Pending
targetStatus = entity.CopyStatus_PENDING
case "preparing":
targetStatus = entity.CopyStatus_Running
targetStatus = entity.CopyStatus_RUNNING
case "finished":
a.logger.WithContext(ctx).Infof("file '%s' copy finished, size= %d", src.RealPath(), job.Size)
targetStatus = entity.CopyStatus_Staged
targetStatus = entity.CopyStatus_STAGED
for dst, err := range job.FailTargets {
if err == nil {
@@ -346,7 +357,7 @@ func (a *jobArchiveExecutor) markSourcesAsSubmited(ctx context.Context, jobs []*
continue
}
target.Status = entity.CopyStatus_Submited
target.Status = entity.CopyStatus_SUBMITED
}
if _, err := a.exe.SaveJob(ctx, a.job); err != nil {
@@ -361,7 +372,7 @@ func (a *jobArchiveExecutor) getTodoSources() int {
var todo int
for _, s := range a.state.Sources {
if s.Status == entity.CopyStatus_Submited {
if s.Status == entity.CopyStatus_SUBMITED {
continue
}
todo++

View File

@@ -4,16 +4,23 @@ import (
"context"
"fmt"
"os"
"path"
"sort"
"strings"
"github.com/abc950309/acp"
"github.com/abc950309/tapewriter/entity"
)
func (e *Executor) initArchive(ctx context.Context, job *Job, param *entity.JobParamArchive) error {
func (e *Executor) createArchive(ctx context.Context, job *Job, param *entity.JobArchiveParam) error {
var err error
sources := make([]*entity.SourceState, 0, len(param.Sources)*8)
for _, src := range param.Sources {
src.Base = strings.TrimSpace(src.Base)
if src.Base[0] != '/' {
src.Base = path.Join(e.paths.Source, src.Base) + "/"
}
sources, err = walk(ctx, src, sources)
if err != nil {
return err
@@ -29,8 +36,8 @@ func (e *Executor) initArchive(ctx context.Context, job *Job, param *entity.JobP
}
}
job.State = &entity.JobState{State: &entity.JobState_Archive{Archive: &entity.JobStateArchive{
Step: entity.JobArchiveStep_Pending,
job.State = &entity.JobState{State: &entity.JobState_Archive{Archive: &entity.JobArchiveState{
Step: entity.JobArchiveStep_PENDING,
Sources: sources,
}}}
return nil
@@ -52,7 +59,7 @@ func walk(ctx context.Context, src *entity.Source, sources []*entity.SourceState
return append(sources, &entity.SourceState{
Source: src,
Size: stat.Size(),
Status: entity.CopyStatus_Pending,
Status: entity.CopyStatus_PENDING,
}), nil
}
if mode&acp.UnexpectFileMode != 0 {

View File

@@ -11,13 +11,13 @@ import (
"github.com/sirupsen/logrus"
)
func (e *Executor) RestoreLoadTape(ctx context.Context, device string, tape *library.Tape) error {
if !e.occupyDevice(device) {
func (e *jobRestoreExecutor) loadTape(ctx context.Context, device string, tape *library.Tape) error {
if !e.exe.occupyDevice(device) {
return fmt.Errorf("device is using, device= %s", device)
}
defer e.releaseDevice(device)
defer e.exe.releaseDevice(device)
keyPath, keyRecycle, err := e.restoreKey(tape.Encryption)
keyPath, keyRecycle, err := e.exe.restoreKey(tape.Encryption)
if err != nil {
return err
}
@@ -28,7 +28,7 @@ func (e *Executor) RestoreLoadTape(ctx context.Context, device string, tape *lib
logger := logrus.StandardLogger()
if err := runCmd(logger, e.makeEncryptCmd(ctx, device, keyPath, tape.Barcode, tape.Name)); err != nil {
if err := runCmd(logger, e.exe.makeEncryptCmd(ctx, device, keyPath, tape.Barcode, tape.Name)); err != nil {
return fmt.Errorf("run encrypt script fail, %w", err)
}
@@ -37,7 +37,7 @@ func (e *Executor) RestoreLoadTape(ctx context.Context, device string, tape *lib
return fmt.Errorf("create temp mountpoint, %w", err)
}
mountCmd := exec.CommandContext(ctx, e.mountScript)
mountCmd := exec.CommandContext(ctx, e.exe.scripts.Mount)
mountCmd.Env = append(mountCmd.Env, fmt.Sprintf("DEVICE=%s", device), fmt.Sprintf("MOUNT_POINT=%s", mountPoint))
if err := runCmd(logger, mountCmd); err != nil {
return fmt.Errorf("run mount script fail, %w", err)

View File

@@ -0,0 +1,25 @@
package executor
import (
"context"
"sync/atomic"
"github.com/abc950309/tapewriter/entity"
)
func (e *Executor) getRestoreDisplay(ctx context.Context, job *Job) (*entity.JobRestoreDisplay, error) {
display := new(entity.JobRestoreDisplay)
if exe := e.getRestoreExecutor(ctx, job); exe != nil && exe.progress != nil {
display.CopyedBytes = atomic.LoadInt64(&exe.progress.bytes)
display.CopyedFiles = atomic.LoadInt64(&exe.progress.files)
display.TotalBytes = atomic.LoadInt64(&exe.progress.totalBytes)
display.TotalFiles = atomic.LoadInt64(&exe.progress.totalFiles)
display.StartTime = exe.progress.startTime.Unix()
speed := atomic.LoadInt64(&exe.progress.speed)
display.Speed = &speed
}
return display, nil
}

321
executor/job_restore_exe.go Normal file
View File

@@ -0,0 +1,321 @@
package executor
import (
"context"
"fmt"
"io"
"os"
"os/exec"
"path"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/abc950309/acp"
"github.com/abc950309/tapewriter/entity"
"github.com/abc950309/tapewriter/tools"
mapset "github.com/deckarep/golang-set/v2"
jsoniter "github.com/json-iterator/go"
"github.com/samber/lo"
"github.com/sirupsen/logrus"
)
var (
runningRestores sync.Map
)
func (e *Executor) getRestoreExecutor(ctx context.Context, job *Job) *jobRestoreExecutor {
if running, has := runningRestores.Load(job.ID); has {
return running.(*jobRestoreExecutor)
}
return nil
}
func (e *Executor) newRestoreExecutor(ctx context.Context, job *Job) (*jobRestoreExecutor, error) {
if exe := e.getRestoreExecutor(ctx, job); exe != nil {
return exe, nil
}
logFile, err := e.newLogWriter(job.ID)
if err != nil {
return nil, fmt.Errorf("get log writer fail, %w", err)
}
logger := logrus.New()
logger.SetOutput(io.MultiWriter(os.Stderr, logFile))
exe := &jobRestoreExecutor{
exe: e,
job: job,
state: job.State.GetRestore(),
logFile: logFile,
logger: logger,
}
runningRestores.Store(job.ID, exe)
return exe, nil
}
type jobRestoreExecutor struct {
exe *Executor
job *Job
stateLock sync.Mutex
state *entity.JobRestoreState
progress *progress
logFile *os.File
logger *logrus.Logger
}
func (a *jobRestoreExecutor) submit(ctx context.Context, param *entity.JobRestoreNextParam) {
if err := a.handle(ctx, param); err != nil {
a.logger.WithContext(ctx).Infof("handler param fail, err= %w", err)
}
}
func (a *jobRestoreExecutor) handle(ctx context.Context, param *entity.JobRestoreNextParam) error {
if p := param.GetCopying(); p != nil {
if err := a.switchStep(
ctx, entity.JobRestoreStep_COPYING, entity.JobStatus_PROCESSING,
mapset.NewThreadUnsafeSet(entity.JobRestoreStep_WAIT_FOR_TAPE),
); err != nil {
return err
}
tools.Working()
go tools.WrapWithLogger(ctx, a.logger, func() {
defer tools.Done()
if err := a.restoreTape(tools.ShutdownContext, p.Device); err != nil {
a.logger.WithContext(ctx).WithError(err).Errorf("restore tape has error, device= '%s'", p.Device)
}
})
return nil
}
if p := param.GetWaitForTape(); p != nil {
return a.switchStep(
ctx, entity.JobRestoreStep_WAIT_FOR_TAPE, entity.JobStatus_PROCESSING,
mapset.NewThreadUnsafeSet(entity.JobRestoreStep_PENDING, entity.JobRestoreStep_COPYING),
)
}
if p := param.GetFinished(); p != nil {
if err := a.switchStep(
ctx, entity.JobRestoreStep_FINISHED, entity.JobStatus_COMPLETED,
mapset.NewThreadUnsafeSet(entity.JobRestoreStep_COPYING),
); err != nil {
return err
}
a.logFile.Close()
runningRestores.Delete(a.job.ID)
return nil
}
return nil
}
func (a *jobRestoreExecutor) restoreTape(ctx context.Context, device string) (rerr error) {
if !a.exe.occupyDevice(device) {
return fmt.Errorf("device is using, device= %s", device)
}
defer a.exe.releaseDevice(device)
defer func() {
if _, found := lo.Find(a.state.Tapes, func(item *entity.RestoreTape) bool {
return item.Status != entity.CopyStatus_SUBMITED
}); found {
a.submit(tools.WithoutTimeout(ctx), &entity.JobRestoreNextParam{
Param: &entity.JobRestoreNextParam_WaitForTape{WaitForTape: &entity.JobRestoreWaitForTapeParam{}},
})
return
}
a.submit(tools.WithoutTimeout(ctx), &entity.JobRestoreNextParam{
Param: &entity.JobRestoreNextParam_Finished{Finished: &entity.JobRestoreFinishedParam{}},
})
}()
readInfoCmd := exec.CommandContext(ctx, a.exe.scripts.ReadInfo)
readInfoCmd.Env = append(readInfoCmd.Env, fmt.Sprintf("DEVICE=%s", device))
infoBuf, err := runCmdWithReturn(a.logger, readInfoCmd)
if err != nil {
return fmt.Errorf("run read info script fail, %w", err)
}
barcode := jsoniter.Get(infoBuf, "barcode").ToString()
if len(barcode) > 6 {
barcode = barcode[:6]
}
restoreTape, found := lo.Find(a.state.Tapes, func(t *entity.RestoreTape) bool {
return t.Barcode == barcode
})
if !found || restoreTape == nil {
expects := lo.Map(a.state.Tapes, func(t *entity.RestoreTape, _ int) string { return t.Barcode })
return fmt.Errorf("unexpected tape barcode in library, has= '%s' expect= %v", barcode, expects)
}
if restoreTape.Status != entity.CopyStatus_PENDING {
return fmt.Errorf("unexpected restore tape state status, has= '%s' expect= '%s'", restoreTape.Status, entity.CopyStatus_PENDING)
}
tape, err := a.exe.lib.GetTape(ctx, restoreTape.TapeId)
if err != nil {
return fmt.Errorf("get tape info fail, barcode= '%s' id= %d, %w", restoreTape.Barcode, restoreTape.TapeId, err)
}
keyPath, keyRecycle, err := a.exe.restoreKey(tape.Encryption)
if err != nil {
return err
}
defer func() {
time.Sleep(time.Second)
keyRecycle()
}()
if err := runCmd(a.logger, a.exe.makeEncryptCmd(ctx, device, keyPath, barcode, tape.Name)); err != nil {
return fmt.Errorf("run encrypt script fail, %w", err)
}
mountPoint, err := os.MkdirTemp("", "*.ltfs")
if err != nil {
return fmt.Errorf("create temp mountpoint, %w", err)
}
sourcePath := tools.Cache(func(p string) string { return path.Join(mountPoint, p) })
mountCmd := exec.CommandContext(ctx, a.exe.scripts.Mount)
mountCmd.Env = append(mountCmd.Env, fmt.Sprintf("DEVICE=%s", device), fmt.Sprintf("MOUNT_POINT=%s", mountPoint))
if err := runCmd(a.logger, mountCmd); err != nil {
return fmt.Errorf("run mount script fail, %w", err)
}
defer func() {
umountCmd := exec.CommandContext(tools.WithoutTimeout(ctx), a.exe.scripts.Umount)
umountCmd.Env = append(umountCmd.Env, fmt.Sprintf("MOUNT_POINT=%s", mountPoint))
if err := runCmd(a.logger, umountCmd); err != nil {
a.logger.WithContext(ctx).WithError(err).Errorf("run umount script fail, %s", mountPoint)
return
}
if err := os.Remove(mountPoint); err != nil {
a.logger.WithContext(ctx).WithError(err).Errorf("remove mount point fail, %s", mountPoint)
return
}
}()
opts := make([]acp.Option, 0, 4)
for _, f := range restoreTape.Files {
if f.Status == entity.CopyStatus_SUBMITED {
continue
}
opts = append(opts, acp.AccurateJob(sourcePath(f.TapePath), []string{path.Join(a.exe.paths.Target, f.TargetPath)}))
}
opts = append(opts, acp.WithHash(true))
opts = append(opts, acp.SetFromDevice(acp.LinearDevice(true)))
opts = append(opts, acp.WithLogger(a.logger))
a.progress = newProgress()
defer func() { a.progress = nil }()
convertPath := tools.Cache(func(p string) string { return strings.ReplaceAll(p, "/", "\x00") })
opts = append(opts, acp.WithEventHandler(func(ev acp.Event) {
switch e := ev.(type) {
case *acp.EventUpdateCount:
atomic.StoreInt64(&a.progress.totalBytes, e.Bytes)
atomic.StoreInt64(&a.progress.totalFiles, e.Files)
return
case *acp.EventUpdateProgress:
a.progress.setBytes(e.Bytes)
atomic.StoreInt64(&a.progress.files, e.Files)
return
case *acp.EventReportError:
a.logger.WithContext(ctx).Errorf("acp report error, src= '%s' dst= '%s' err= '%s'", e.Error.Src, e.Error.Dst, e.Error.Err)
return
case *acp.EventUpdateJob:
job := e.Job
src := entity.NewSourceFromACPJob(job)
var targetStatus entity.CopyStatus
switch job.Status {
case "pending":
targetStatus = entity.CopyStatus_PENDING
case "preparing":
targetStatus = entity.CopyStatus_RUNNING
case "finished":
a.logger.WithContext(ctx).Infof("file '%s' copy finished, size= %d", src.RealPath(), job.Size)
targetStatus = entity.CopyStatus_SUBMITED
if len(job.SuccessTargets) > 0 {
targetStatus = entity.CopyStatus_FAILED
}
for dst, err := range job.FailTargets {
if err == nil {
continue
}
a.logger.WithContext(ctx).WithError(err).Errorf("file '%s' copy fail, dst= '%s'", src.RealPath(), dst)
}
default:
return
}
a.stateLock.Lock()
defer a.stateLock.Unlock()
realPath := src.RealPath()
idx := sort.Search(len(restoreTape.Files), func(idx int) bool {
return convertPath(realPath) < convertPath(sourcePath(restoreTape.Files[idx].TapePath))
})
target := restoreTape.Files[idx]
if target == nil || realPath != sourcePath(target.TapePath) {
return
}
target.Status = targetStatus
if _, err := a.exe.SaveJob(ctx, a.job); err != nil {
logrus.WithContext(ctx).Infof("save job for update file fail, name= %s", job.Base+path.Join(job.Path...))
}
return
}
}))
defer func() {
restoreTape.Status = entity.CopyStatus_SUBMITED
if _, err := a.exe.SaveJob(tools.WithoutTimeout(ctx), a.job); err != nil {
logrus.WithContext(ctx).Infof("save job for submit tape fail, barcode= %s", restoreTape.Barcode)
}
}()
copyer, err := acp.New(ctx, opts...)
if err != nil {
rerr = fmt.Errorf("start copy fail, %w", err)
return
}
copyer.Wait()
return
}
func (a *jobRestoreExecutor) switchStep(ctx context.Context, target entity.JobRestoreStep, status entity.JobStatus, expect mapset.Set[entity.JobRestoreStep]) error {
a.stateLock.Lock()
defer a.stateLock.Unlock()
if !expect.Contains(a.state.Step) {
return fmt.Errorf("unexpected current step, target= '%s' expect= '%s' has= '%s'", target, expect, a.state.Step)
}
a.state.Step = target
a.job.Status = status
if _, err := a.exe.SaveJob(ctx, a.job); err != nil {
return fmt.Errorf("switch to step copying, save job fail, %w", err)
}
return nil
}

View File

@@ -0,0 +1,232 @@
package executor
import (
"context"
"fmt"
"io/fs"
"sort"
"strings"
"github.com/abc950309/tapewriter/entity"
"github.com/abc950309/tapewriter/library"
"github.com/abc950309/tapewriter/tools"
mapset "github.com/deckarep/golang-set/v2"
"github.com/samber/lo"
"github.com/sirupsen/logrus"
)
type restoreFile struct {
*library.File
target string
}
func (e *Executor) createRestore(ctx context.Context, job *Job, param *entity.JobRestoreParam) error {
files, err := e.getRestoreFiles(ctx, param.FileIds...)
if err != nil {
return fmt.Errorf("get restore files fail, ids= %v, %w", param.FileIds, err)
}
fileIDs := make([]int64, 0, len(files))
for _, file := range files {
fileIDs = append(fileIDs, file.ID)
}
positions, err := e.lib.MGetPositionByFileID(ctx, fileIDs...)
if err != nil {
return err
}
tapeMapping := make(map[int64]mapset.Set[int64], 4)
for _, file := range files {
for _, posi := range positions[file.ID] {
set := tapeMapping[posi.TapeID]
if set == nil {
tapeMapping[posi.TapeID] = mapset.NewThreadUnsafeSet(file.ID)
continue
}
set.Add(file.ID)
}
}
tapeMap, err := e.lib.MGetTape(ctx, lo.Keys(tapeMapping)...)
if err != nil {
return err
}
for tapeID := range tapeMapping {
if tape, has := tapeMap[tapeID]; has && tape != nil {
continue
}
logrus.WithContext(ctx).Infof("tape not found, tape_id= %d", tapeID)
delete(tapeMap, tapeID)
}
restoreTapes := make([]*entity.RestoreTape, 0, len(tapeMapping))
for len(tapeMapping) > 0 {
var maxTapeID int64
for tapeID, files := range tapeMapping {
if maxTapeID == 0 {
maxTapeID = tapeID
continue
}
diff := files.Cardinality() - tapeMapping[maxTapeID].Cardinality()
if diff > 0 {
maxTapeID = tapeID
continue
}
if diff < 0 {
continue
}
if tapeID < maxTapeID {
maxTapeID = tapeID
continue
}
}
if maxTapeID == 0 {
return fmt.Errorf("max tape not found, tape_ids= %v", lo.Keys(tapeMapping))
}
fileIDs := tapeMapping[maxTapeID]
delete(tapeMapping, maxTapeID)
if fileIDs.Cardinality() == 0 {
continue
}
for i, f := range tapeMapping {
tapeMapping[i] = f.Difference(fileIDs)
}
targets := make([]*entity.RestoreFile, 0, fileIDs.Cardinality())
for _, fileID := range fileIDs.ToSlice() {
file := files[fileID]
if file == nil {
continue
}
posi := positions[fileID]
if len(posi) == 0 {
logrus.WithContext(ctx).Infof("file position not found, file_id= %d", fileID)
continue
}
for _, p := range posi {
if p.TapeID != maxTapeID {
continue
}
targets = append(targets, &entity.RestoreFile{
FileId: file.ID,
TapeId: p.TapeID,
PositionId: p.ID,
Status: entity.CopyStatus_PENDING,
Size: file.Size,
TapePath: p.Path,
TargetPath: file.target,
})
break
}
}
convertPath := tools.Cache(func(p string) string { return strings.ReplaceAll(p, "/", "\x00") })
sort.Slice(targets, func(i, j int) bool {
return convertPath(targets[i].TapePath) < convertPath(targets[j].TapePath)
})
restoreTapes = append(restoreTapes, &entity.RestoreTape{
TapeId: maxTapeID,
Barcode: tapeMap[maxTapeID].Barcode,
Status: entity.CopyStatus_PENDING,
Files: targets,
})
}
job.State = &entity.JobState{State: &entity.JobState_Restore{Restore: &entity.JobRestoreState{
Step: entity.JobRestoreStep_PENDING,
Tapes: restoreTapes,
}}}
return nil
}
func (e *Executor) getRestoreFiles(ctx context.Context, rootIDs ...int64) (map[int64]*restoreFile, error) {
rootIDSet := mapset.NewThreadUnsafeSet(rootIDs...)
for _, id := range rootIDs {
parents, err := e.lib.ListParents(ctx, id)
if err != nil {
return nil, err
}
if len(parents) <= 1 {
continue
}
for _, parent := range parents[:len(parents)-1] {
if !rootIDSet.Contains(parent.ID) {
continue
}
rootIDSet.Remove(id)
break
}
}
rootIDs = rootIDSet.ToSlice()
mapping, err := e.lib.MGetFile(ctx, rootIDs...)
if err != nil {
return nil, fmt.Errorf("mget file fail, ids= %v, %w", rootIDs, err)
}
files := make([]*restoreFile, 0, len(rootIDs)*8)
visited := mapset.NewThreadUnsafeSet[int64]()
for _, root := range mapping {
if visited.Contains(root.ID) {
continue
}
visited.Add(root.ID)
if !fs.FileMode(root.Mode).IsDir() {
files = append(files, &restoreFile{File: root, target: root.Name})
continue
}
found, err := e.visitFiles(ctx, root.Name, nil, visited, root.ID)
if err != nil {
return nil, err
}
files = append(files, found...)
}
results := make(map[int64]*restoreFile, len(files))
for _, f := range files {
results[f.ID] = f
}
return results, nil
}
func (e *Executor) visitFiles(ctx context.Context, path string, files []*restoreFile, visited mapset.Set[int64], parentID int64) ([]*restoreFile, error) {
children, err := e.lib.List(ctx, parentID)
if err != nil {
return nil, err
}
for _, child := range children {
if visited.Contains(child.ID) {
continue
}
visited.Add(child.ID)
target := path + "/" + child.Name
if !fs.FileMode(child.Mode).IsDir() {
files = append(files, &restoreFile{File: child, target: target})
continue
}
files, err = e.visitFiles(ctx, target, files, visited, child.ID)
if err != nil {
return nil, err
}
}
return files, nil
}

View File

@@ -0,0 +1,15 @@
package executor
import (
"context"
"github.com/abc950309/tapewriter/entity"
)
func (e *Executor) startRestore(ctx context.Context, job *Job) error {
return e.Submit(ctx, job, &entity.JobNextParam{Param: &entity.JobNextParam_Restore{
Restore: &entity.JobRestoreNextParam{Param: &entity.JobRestoreNextParam_WaitForTape{
WaitForTape: &entity.JobRestoreWaitForTapeParam{},
}},
}})
}

View File

@@ -48,7 +48,7 @@ func (e *Executor) newKey() (string, string, func(), error) {
}
func (e *Executor) makeEncryptCmd(ctx context.Context, device, keyPath, barcode, name string) *exec.Cmd {
cmd := exec.CommandContext(ctx, e.encryptScript)
cmd := exec.CommandContext(ctx, e.scripts.Encrypt)
cmd.Env = append(cmd.Env, fmt.Sprintf("DEVICE=%s", device), fmt.Sprintf("KEY_FILE=%s", keyPath), fmt.Sprintf("TAPE_BARCODE=%s", barcode), fmt.Sprintf("TAPE_NAME=%s", name))
return cmd
}

View File

@@ -3,6 +3,7 @@ package executor
import (
"errors"
"fmt"
"io/fs"
"os"
"os/exec"
"path"
@@ -11,7 +12,7 @@ import (
)
func (e *Executor) logPath(jobID int64) (string, string) {
return path.Join(e.workDirectory, "job-logs"), fmt.Sprintf("%d.log", jobID)
return path.Join(e.paths.Work, "job-logs"), fmt.Sprintf("%d.log", jobID)
}
func (e *Executor) newLogWriter(jobID int64) (*os.File, error) {
@@ -41,6 +42,28 @@ func (e *Executor) NewLogReader(jobID int64) (*os.File, error) {
return file, nil
}
func runCmdWithReturn(logger *logrus.Logger, cmd *exec.Cmd) ([]byte, error) {
out, err := os.CreateTemp("", "*.out")
if err != nil {
return nil, fmt.Errorf("create cmd out fail, %w", err)
}
out.Chmod(fs.ModePerm)
out.Close()
defer os.Remove(out.Name())
cmd.Env = append(cmd.Env, fmt.Sprintf("OUT=%s", out.Name()))
if err := runCmd(logger, cmd); err != nil {
return nil, err
}
buf, err := os.ReadFile(out.Name())
if err != nil {
return nil, fmt.Errorf("read cmd out fail, %w", err)
}
return buf, nil
}
func runCmd(logger *logrus.Logger, cmd *exec.Cmd) error {
writer := logger.WriterLevel(logrus.InfoLevel)
cmd.Stdout = writer
@@ -50,7 +73,7 @@ func runCmd(logger *logrus.Logger, cmd *exec.Cmd) error {
}
func (e *Executor) reportPath(barcode string) (string, string) {
return path.Join(e.workDirectory, "write-reports"), fmt.Sprintf("%s.log", barcode)
return path.Join(e.paths.Work, "write-reports"), fmt.Sprintf("%s.log", barcode)
}
func (e *Executor) newReportWriter(barcode string) (*os.File, error) {