mirror of
https://github.com/samuelncui/yatm.git
synced 2026-01-06 21:36:30 +00:00
feat: useable
This commit is contained in:
131
executor/executor.go
Normal file
131
executor/executor.go
Normal file
@@ -0,0 +1,131 @@
|
||||
package executor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
"github.com/abc950309/tapewriter/entity"
|
||||
"github.com/abc950309/tapewriter/library"
|
||||
mapset "github.com/deckarep/golang-set/v2"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type Executor struct {
|
||||
db *gorm.DB
|
||||
lib *library.Library
|
||||
|
||||
devices []string
|
||||
|
||||
devicesLock sync.Mutex
|
||||
availableDevices mapset.Set[string]
|
||||
|
||||
workDirectory string
|
||||
encryptScript string
|
||||
mkfsScript string
|
||||
mountScript string
|
||||
umountScript string
|
||||
}
|
||||
|
||||
func New(
|
||||
db *gorm.DB, lib *library.Library,
|
||||
devices []string, workDirectory string,
|
||||
encryptScript, mkfsScript, mountScript, umountScript string,
|
||||
) *Executor {
|
||||
return &Executor{
|
||||
db: db,
|
||||
lib: lib,
|
||||
devices: devices,
|
||||
availableDevices: mapset.NewThreadUnsafeSet(devices...),
|
||||
encryptScript: encryptScript,
|
||||
mkfsScript: mkfsScript,
|
||||
mountScript: mountScript,
|
||||
umountScript: umountScript,
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Executor) AutoMigrate() error {
|
||||
return e.db.AutoMigrate(ModelJob)
|
||||
}
|
||||
|
||||
func (e *Executor) ListAvailableDevices() []string {
|
||||
e.devicesLock.Lock()
|
||||
defer e.devicesLock.Unlock()
|
||||
|
||||
devices := e.availableDevices.ToSlice()
|
||||
sort.Slice(devices, func(i, j int) bool {
|
||||
return devices[i] < devices[j]
|
||||
})
|
||||
|
||||
return devices
|
||||
}
|
||||
|
||||
func (e *Executor) occupyDevice(dev string) bool {
|
||||
e.devicesLock.Lock()
|
||||
defer e.devicesLock.Unlock()
|
||||
|
||||
if !e.availableDevices.Contains(dev) {
|
||||
return false
|
||||
}
|
||||
|
||||
e.availableDevices.Remove(dev)
|
||||
return true
|
||||
}
|
||||
|
||||
func (e *Executor) releaseDevice(dev string) {
|
||||
e.devicesLock.Lock()
|
||||
defer e.devicesLock.Unlock()
|
||||
e.availableDevices.Add(dev)
|
||||
}
|
||||
|
||||
func (e *Executor) Start(ctx context.Context, job *Job) error {
|
||||
job.Status = entity.JobStatus_Processing
|
||||
if _, err := e.SaveJob(ctx, job); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if state := job.State.GetArchive(); state != nil {
|
||||
if err := e.startArchive(ctx, job); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
return fmt.Errorf("unexpected state type, %T", job.State.State)
|
||||
}
|
||||
|
||||
func (e *Executor) Submit(ctx context.Context, job *Job, param *entity.JobNextParam) error {
|
||||
if job.Status != entity.JobStatus_Processing {
|
||||
return fmt.Errorf("target job is not on processing, status= %s", job.Status)
|
||||
}
|
||||
|
||||
if state := job.State.GetArchive(); state != nil {
|
||||
exe, err := e.newArchiveExecutor(ctx, job)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
exe.submit(param.GetArchive())
|
||||
return nil
|
||||
}
|
||||
|
||||
return fmt.Errorf("unexpected state type, %T", job.State.State)
|
||||
}
|
||||
|
||||
func (e *Executor) Display(ctx context.Context, job *Job) (*entity.JobDisplay, error) {
|
||||
if job.Status != entity.JobStatus_Processing {
|
||||
return nil, fmt.Errorf("target job is not on processing, status= %s", job.Status)
|
||||
}
|
||||
|
||||
if state := job.State.GetArchive(); state != nil {
|
||||
display, err := e.getArchiveDisplay(ctx, job)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &entity.JobDisplay{Display: &entity.JobDisplay_Archive{Archive: display}}, nil
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("unexpected state type, %T", job.State.State)
|
||||
}
|
||||
132
executor/job.go
Normal file
132
executor/job.go
Normal file
@@ -0,0 +1,132 @@
|
||||
package executor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/abc950309/tapewriter/entity"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
var (
|
||||
ModelJob = &Job{}
|
||||
|
||||
ErrJobNotFound = fmt.Errorf("get job: job not found")
|
||||
)
|
||||
|
||||
type Job struct {
|
||||
ID int64 `gorm:"primaryKey;autoIncrement"`
|
||||
Status entity.JobStatus
|
||||
Priority int64
|
||||
State *entity.JobState
|
||||
|
||||
CreateTime time.Time
|
||||
UpdateTime time.Time
|
||||
}
|
||||
|
||||
func (j *Job) BeforeUpdate(tx *gorm.DB) error {
|
||||
j.UpdateTime = time.Now()
|
||||
if j.CreateTime.IsZero() {
|
||||
j.CreateTime = j.UpdateTime
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *Executor) initJob(ctx context.Context, job *Job, param *entity.JobParam) error {
|
||||
if p := param.GetArchive(); p != nil {
|
||||
return e.initArchive(ctx, job, p)
|
||||
}
|
||||
return fmt.Errorf("unexpected param type, %T", param.Param)
|
||||
}
|
||||
|
||||
func (e *Executor) CreateJob(ctx context.Context, job *Job, param *entity.JobParam) (*Job, error) {
|
||||
if err := e.initJob(ctx, job, param); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if r := e.db.WithContext(ctx).Create(job); r.Error != nil {
|
||||
return nil, fmt.Errorf("save job fail, err= %w", r.Error)
|
||||
}
|
||||
|
||||
return job, nil
|
||||
}
|
||||
|
||||
func (e *Executor) SaveJob(ctx context.Context, job *Job) (*Job, error) {
|
||||
if r := e.db.WithContext(ctx).Save(job); r.Error != nil {
|
||||
return nil, fmt.Errorf("save job fail, err= %w", r.Error)
|
||||
}
|
||||
return job, nil
|
||||
}
|
||||
|
||||
func (e *Executor) MGetJob(ctx context.Context, ids ...int64) (map[int64]*Job, error) {
|
||||
if len(ids) == 0 {
|
||||
return map[int64]*Job{}, nil
|
||||
}
|
||||
|
||||
jobs := make([]*Job, 0, len(ids))
|
||||
if r := e.db.WithContext(ctx).Where("id IN (?)", ids).Find(&jobs); r.Error != nil {
|
||||
return nil, fmt.Errorf("list jobs fail, err= %w", r.Error)
|
||||
}
|
||||
|
||||
result := make(map[int64]*Job, len(jobs))
|
||||
for _, job := range jobs {
|
||||
result[job.ID] = job
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (e *Executor) GetJob(ctx context.Context, id int64) (*Job, error) {
|
||||
jobs, err := e.MGetJob(ctx, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
job, ok := jobs[id]
|
||||
if !ok || job == nil {
|
||||
return nil, ErrJobNotFound
|
||||
}
|
||||
|
||||
return job, nil
|
||||
}
|
||||
|
||||
// func (e *Executor) getNextJob(ctx context.Context) (*Job, error) {
|
||||
// job := new(Job)
|
||||
// if r := e.db.WithContext(ctx).
|
||||
// Where("status = ?", entity.JobStatus_Pending).
|
||||
// Order("priority DESC, create_time ASC").
|
||||
// Limit(1).First(job); r.Error != nil {
|
||||
// if errors.Is(r.Error, gorm.ErrRecordNotFound) {
|
||||
// return nil, nil
|
||||
// }
|
||||
// return nil, r.Error
|
||||
// }
|
||||
|
||||
// return job, nil
|
||||
// }
|
||||
|
||||
func (e *Executor) ListJob(ctx context.Context, filter *entity.JobFilter) ([]*Job, error) {
|
||||
db := e.db.WithContext(ctx)
|
||||
if filter.Status != nil {
|
||||
db.Where("status = ?", *filter.Status)
|
||||
}
|
||||
|
||||
if filter.Limit != nil {
|
||||
db.Limit(int(*filter.Limit))
|
||||
} else {
|
||||
db.Limit(20)
|
||||
}
|
||||
if filter.Offset != nil {
|
||||
db.Offset(int(*filter.Offset))
|
||||
}
|
||||
|
||||
db.Order("create_time DESC")
|
||||
|
||||
jobs := make([]*Job, 0, 20)
|
||||
if r := db.Find(&jobs); r.Error != nil {
|
||||
return nil, fmt.Errorf("list jobs fail, err= %w", r.Error)
|
||||
}
|
||||
|
||||
return jobs, nil
|
||||
}
|
||||
22
executor/job_archive_display.go
Normal file
22
executor/job_archive_display.go
Normal file
@@ -0,0 +1,22 @@
|
||||
package executor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/abc950309/tapewriter/entity"
|
||||
)
|
||||
|
||||
func (e *Executor) getArchiveDisplay(ctx context.Context, job *Job) (*entity.JobDisplayArchive, error) {
|
||||
display := new(entity.JobDisplayArchive)
|
||||
|
||||
if exe := e.getArchiveExecutor(ctx, job); exe != nil && exe.progress != nil {
|
||||
display.CopyedBytes = atomic.LoadInt64(&exe.progress.bytes)
|
||||
display.CopyedFiles = atomic.LoadInt64(&exe.progress.files)
|
||||
display.TotalBytes = atomic.LoadInt64(&exe.progress.totalBytes)
|
||||
display.TotalFiles = atomic.LoadInt64(&exe.progress.totalFiles)
|
||||
display.Speed = atomic.LoadInt64(&exe.progress.speed)
|
||||
}
|
||||
|
||||
return display, nil
|
||||
}
|
||||
357
executor/job_archive_exe.go
Normal file
357
executor/job_archive_exe.go
Normal file
@@ -0,0 +1,357 @@
|
||||
package executor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/abc950309/acp"
|
||||
"github.com/abc950309/tapewriter/entity"
|
||||
"github.com/abc950309/tapewriter/library"
|
||||
"github.com/abc950309/tapewriter/tools"
|
||||
mapset "github.com/deckarep/golang-set/v2"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
var (
|
||||
runningArchives sync.Map
|
||||
)
|
||||
|
||||
func (e *Executor) getArchiveExecutor(ctx context.Context, job *Job) *jobArchiveExecutor {
|
||||
if running, has := runningArchives.Load(job.ID); has {
|
||||
return running.(*jobArchiveExecutor)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *Executor) newArchiveExecutor(ctx context.Context, job *Job) (*jobArchiveExecutor, error) {
|
||||
if exe := e.getArchiveExecutor(ctx, job); exe != nil {
|
||||
return exe, nil
|
||||
}
|
||||
|
||||
logFile, err := e.newLogWriter(job.ID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get log writer fail, %w", err)
|
||||
}
|
||||
|
||||
logger := logrus.New()
|
||||
logger.SetOutput(io.MultiWriter(os.Stderr, logFile))
|
||||
|
||||
exe := &jobArchiveExecutor{
|
||||
ctx: context.Background(),
|
||||
exe: e,
|
||||
job: job,
|
||||
|
||||
state: job.State.GetArchive(),
|
||||
|
||||
progress: new(progress),
|
||||
logFile: logFile,
|
||||
logger: logger,
|
||||
}
|
||||
|
||||
runningArchives.Store(job.ID, exe)
|
||||
return exe, nil
|
||||
}
|
||||
|
||||
type jobArchiveExecutor struct {
|
||||
ctx context.Context
|
||||
exe *Executor
|
||||
job *Job
|
||||
|
||||
stateLock sync.Mutex
|
||||
state *entity.JobStateArchive
|
||||
|
||||
progress *progress
|
||||
logFile *os.File
|
||||
logger *logrus.Logger
|
||||
}
|
||||
|
||||
func (a *jobArchiveExecutor) submit(param *entity.JobArchiveNextParam) {
|
||||
if err := a.handle(param); err != nil {
|
||||
a.logger.WithContext(a.ctx).Infof("handler param fail, err= %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (a *jobArchiveExecutor) handle(param *entity.JobArchiveNextParam) error {
|
||||
if p := param.GetCopying(); p != nil {
|
||||
if err := a.switchStep(entity.JobArchiveStep_Copying, entity.JobStatus_Processing, mapset.NewThreadUnsafeSet(entity.JobArchiveStep_WaitForTape)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go tools.Wrap(a.ctx, func() {
|
||||
_, err := a.makeTape(p.Device, p.Barcode, p.Name)
|
||||
if err != nil {
|
||||
a.logger.WithContext(a.ctx).WithError(err).Errorf("make type has error, barcode= '%s' name= '%s'", p.Barcode, p.Name)
|
||||
}
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
if p := param.GetWaitForTape(); p != nil {
|
||||
return a.switchStep(entity.JobArchiveStep_WaitForTape, entity.JobStatus_Processing, mapset.NewThreadUnsafeSet(entity.JobArchiveStep_Pending, entity.JobArchiveStep_Copying))
|
||||
}
|
||||
|
||||
if p := param.GetFinished(); p != nil {
|
||||
if err := a.switchStep(entity.JobArchiveStep_Finished, entity.JobStatus_Completed, mapset.NewThreadUnsafeSet(entity.JobArchiveStep_Copying)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
a.logFile.Close()
|
||||
runningArchives.Delete(a.job.ID)
|
||||
return nil
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *jobArchiveExecutor) makeTape(device, barcode, name string) (*library.Tape, error) {
|
||||
if !a.exe.occupyDevice(device) {
|
||||
return nil, fmt.Errorf("device is using, device= %s", device)
|
||||
}
|
||||
defer a.exe.releaseDevice(device)
|
||||
defer a.makeTapeFinished()
|
||||
|
||||
encryption, keyPath, keyRecycle, err := a.exe.newKey()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
time.Sleep(time.Second)
|
||||
keyRecycle()
|
||||
}()
|
||||
|
||||
if err := runCmd(a.logger, a.exe.makeEncryptCmd(a.ctx, device, keyPath, barcode, name)); err != nil {
|
||||
return nil, fmt.Errorf("run encrypt script fail, %w", err)
|
||||
}
|
||||
|
||||
mkfsCmd := exec.CommandContext(a.ctx, a.exe.mkfsScript)
|
||||
mkfsCmd.Env = append(mkfsCmd.Env, fmt.Sprintf("DEVICE=%s", device), fmt.Sprintf("TAPE_BARCODE=%s", barcode), fmt.Sprintf("TAPE_NAME=%s", name))
|
||||
if err := runCmd(a.logger, mkfsCmd); err != nil {
|
||||
return nil, fmt.Errorf("run mkfs script fail, %w", err)
|
||||
}
|
||||
|
||||
mountPoint, err := os.MkdirTemp("", "*.ltfs")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create temp mountpoint, %w", err)
|
||||
}
|
||||
|
||||
mountCmd := exec.CommandContext(a.ctx, a.exe.mountScript)
|
||||
mountCmd.Env = append(mountCmd.Env, fmt.Sprintf("DEVICE=%s", device), fmt.Sprintf("MOUNT_POINT=%s", mountPoint))
|
||||
if err := runCmd(a.logger, mountCmd); err != nil {
|
||||
return nil, fmt.Errorf("run mount script fail, %w", err)
|
||||
}
|
||||
defer func() {
|
||||
umountCmd := exec.CommandContext(a.ctx, a.exe.umountScript)
|
||||
umountCmd.Env = append(umountCmd.Env, fmt.Sprintf("MOUNT_POINT=%s", mountPoint))
|
||||
if err := runCmd(a.logger, umountCmd); err != nil {
|
||||
a.logger.WithContext(a.ctx).WithError(err).Errorf("run umount script fail, %s", mountPoint)
|
||||
return
|
||||
}
|
||||
if err := os.Remove(mountPoint); err != nil {
|
||||
a.logger.WithContext(a.ctx).WithError(err).Errorf("remove mount point fail, %s", mountPoint)
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
opts := make([]acp.Option, 0, 4)
|
||||
for _, source := range a.state.Sources {
|
||||
if source.Status == entity.CopyStatus_Submited {
|
||||
continue
|
||||
}
|
||||
opts = append(opts, acp.AccurateSource(source.Source.Base, source.Source.Path))
|
||||
}
|
||||
|
||||
opts = append(opts, acp.Target(mountPoint))
|
||||
opts = append(opts, acp.WithHash(true))
|
||||
opts = append(opts, acp.SetToDevice(acp.LinearDevice(true)))
|
||||
opts = append(opts, acp.WithLogger(a.logger))
|
||||
|
||||
reportHander, reportGetter := acp.NewReportGetter()
|
||||
opts = append(opts, acp.WithEventHandler(reportHander))
|
||||
opts = append(opts, acp.WithEventHandler(func(ev acp.Event) {
|
||||
switch e := ev.(type) {
|
||||
case *acp.EventUpdateCount:
|
||||
atomic.StoreInt64(&a.progress.totalBytes, e.Bytes)
|
||||
atomic.StoreInt64(&a.progress.totalFiles, e.Files)
|
||||
return
|
||||
case *acp.EventUpdateProgress:
|
||||
atomic.StoreInt64(&a.progress.bytes, e.Bytes)
|
||||
atomic.StoreInt64(&a.progress.files, e.Files)
|
||||
return
|
||||
case *acp.EventUpdateJob:
|
||||
job := e.Job
|
||||
src := entity.NewSourceFromACPJob(job)
|
||||
|
||||
var targetStatus entity.CopyStatus
|
||||
switch job.Status {
|
||||
case "pending":
|
||||
targetStatus = entity.CopyStatus_Pending
|
||||
case "preparing":
|
||||
a.logger.Infof("file '%s' starts to prepare for copy, size= %d", src.RealPath(), job.Size)
|
||||
targetStatus = entity.CopyStatus_Running
|
||||
case "finished":
|
||||
a.logger.Infof("file '%s' copy finished, size= %d", src.RealPath(), job.Size)
|
||||
targetStatus = entity.CopyStatus_Staged
|
||||
default:
|
||||
return
|
||||
}
|
||||
|
||||
a.stateLock.Lock()
|
||||
defer a.stateLock.Unlock()
|
||||
|
||||
idx := sort.Search(len(a.state.Sources), func(idx int) bool {
|
||||
return src.Compare(a.state.Sources[idx].Source) <= 0
|
||||
})
|
||||
|
||||
target := a.state.Sources[idx]
|
||||
if target == nil || !src.Equal(target.Source) {
|
||||
return
|
||||
}
|
||||
target.Status = targetStatus
|
||||
|
||||
if _, err := a.exe.SaveJob(a.ctx, a.job); err != nil {
|
||||
logrus.WithContext(a.ctx).Infof("save job for update file fail, name= %s", job.Base+path.Join(job.Path...))
|
||||
}
|
||||
return
|
||||
}
|
||||
}))
|
||||
|
||||
copyer, err := acp.New(a.ctx, opts...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("start copy fail, %w", err)
|
||||
}
|
||||
copyer.Wait()
|
||||
|
||||
report := reportGetter()
|
||||
sort.Slice(report.Jobs, func(i, j int) bool {
|
||||
return entity.NewSourceFromACPJob(report.Jobs[i]).Compare(entity.NewSourceFromACPJob(report.Jobs[j])) < 0
|
||||
})
|
||||
|
||||
filteredJobs := make([]*acp.Job, 0, len(report.Jobs))
|
||||
files := make([]*library.TapeFile, 0, len(report.Jobs))
|
||||
for _, job := range report.Jobs {
|
||||
if len(job.SuccessTargets) == 0 {
|
||||
continue
|
||||
}
|
||||
if !job.Mode.IsRegular() {
|
||||
continue
|
||||
}
|
||||
|
||||
hash, err := hex.DecodeString(job.SHA256)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("decode sha256 fail, err= %w", err)
|
||||
}
|
||||
|
||||
files = append(files, &library.TapeFile{
|
||||
Path: path.Join(job.Path...),
|
||||
Size: job.Size,
|
||||
Mode: job.Mode,
|
||||
ModTime: job.ModTime,
|
||||
WriteTime: job.WriteTime,
|
||||
Hash: hash,
|
||||
})
|
||||
filteredJobs = append(filteredJobs, job)
|
||||
}
|
||||
|
||||
tape, err := a.exe.lib.CreateTape(a.ctx, &library.Tape{
|
||||
Barcode: barcode,
|
||||
Name: name,
|
||||
Encryption: encryption,
|
||||
CreateTime: time.Now(),
|
||||
}, files)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create tape fail, barcode= '%s' name= '%s', %w", barcode, name, err)
|
||||
}
|
||||
if err := a.exe.lib.TrimFiles(a.ctx); err != nil {
|
||||
a.logger.WithError(err).Warnf("trim library files fail")
|
||||
}
|
||||
|
||||
if err := a.markSourcesAsSubmited(filteredJobs); err != nil {
|
||||
a.submit(&entity.JobArchiveNextParam{Param: &entity.JobArchiveNextParam_WaitForTape{WaitForTape: &entity.JobArchiveWaitForTapeParam{}}})
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return tape, nil
|
||||
}
|
||||
|
||||
func (a *jobArchiveExecutor) switchStep(target entity.JobArchiveStep, status entity.JobStatus, expect mapset.Set[entity.JobArchiveStep]) error {
|
||||
a.stateLock.Lock()
|
||||
defer a.stateLock.Unlock()
|
||||
|
||||
if !expect.Contains(a.state.Step) {
|
||||
return fmt.Errorf("unexpected current step, target= '%s' expect= '%s' has= '%s'", target, expect, a.state.Step)
|
||||
}
|
||||
|
||||
a.state.Step = target
|
||||
a.job.Status = status
|
||||
if _, err := a.exe.SaveJob(a.ctx, a.job); err != nil {
|
||||
return fmt.Errorf("switch to step copying, save job fail, %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *jobArchiveExecutor) markSourcesAsSubmited(jobs []*acp.Job) error {
|
||||
a.stateLock.Lock()
|
||||
defer a.stateLock.Unlock()
|
||||
|
||||
searchableSource := a.state.Sources[:]
|
||||
for _, job := range jobs {
|
||||
src := entity.NewSourceFromACPJob(job)
|
||||
for idx, testSrc := range searchableSource {
|
||||
if src.Compare(testSrc.Source) <= 0 {
|
||||
searchableSource = searchableSource[idx:]
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
target := searchableSource[0]
|
||||
if target == nil || !src.Equal(target.Source) {
|
||||
continue
|
||||
}
|
||||
|
||||
target.Status = entity.CopyStatus_Submited
|
||||
}
|
||||
|
||||
if _, err := a.exe.SaveJob(a.ctx, a.job); err != nil {
|
||||
return fmt.Errorf("mark sources as submited, save job, %w", err)
|
||||
}
|
||||
|
||||
atomic.StoreInt64(&a.progress.bytes, 0)
|
||||
atomic.StoreInt64(&a.progress.files, 0)
|
||||
atomic.StoreInt64(&a.progress.totalBytes, 0)
|
||||
atomic.StoreInt64(&a.progress.totalFiles, 0)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *jobArchiveExecutor) getTodoSources() int {
|
||||
a.stateLock.Lock()
|
||||
defer a.stateLock.Unlock()
|
||||
|
||||
var todo int
|
||||
for _, s := range a.state.Sources {
|
||||
if s.Status == entity.CopyStatus_Submited {
|
||||
continue
|
||||
}
|
||||
todo++
|
||||
}
|
||||
|
||||
return todo
|
||||
}
|
||||
|
||||
func (a *jobArchiveExecutor) makeTapeFinished() {
|
||||
if a.getTodoSources() > 0 {
|
||||
a.submit(&entity.JobArchiveNextParam{Param: &entity.JobArchiveNextParam_WaitForTape{WaitForTape: &entity.JobArchiveWaitForTapeParam{}}})
|
||||
} else {
|
||||
a.submit(&entity.JobArchiveNextParam{Param: &entity.JobArchiveNextParam_Finished{Finished: &entity.JobArchiveFinishedParam{}}})
|
||||
}
|
||||
}
|
||||
74
executor/job_archive_param.go
Normal file
74
executor/job_archive_param.go
Normal file
@@ -0,0 +1,74 @@
|
||||
package executor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"sort"
|
||||
|
||||
"github.com/abc950309/acp"
|
||||
"github.com/abc950309/tapewriter/entity"
|
||||
)
|
||||
|
||||
func (e *Executor) initArchive(ctx context.Context, job *Job, param *entity.JobParamArchive) error {
|
||||
var err error
|
||||
sources := make([]*entity.SourceState, 0, len(param.Sources)*8)
|
||||
for _, src := range param.Sources {
|
||||
sources, err = walk(ctx, src, sources)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
sort.Slice(sources, func(i, j int) bool {
|
||||
return sources[i].Source.Compare(sources[j].Source) < 0
|
||||
})
|
||||
|
||||
for idx, src := range sources {
|
||||
if idx > 0 && sources[idx-1].Source.Equal(src.Source) {
|
||||
return fmt.Errorf("have multi file with same path, path= %s", src.Source.RealPath())
|
||||
}
|
||||
}
|
||||
|
||||
job.State = &entity.JobState{State: &entity.JobState_Archive{Archive: &entity.JobStateArchive{
|
||||
Step: entity.JobArchiveStep_Pending,
|
||||
Sources: sources,
|
||||
}}}
|
||||
return nil
|
||||
}
|
||||
|
||||
func walk(ctx context.Context, src *entity.Source, sources []*entity.SourceState) ([]*entity.SourceState, error) {
|
||||
path := src.RealPath()
|
||||
|
||||
stat, err := os.Stat(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("walk get stat, path= '%s', %w", path, err)
|
||||
}
|
||||
|
||||
mode := stat.Mode()
|
||||
if mode.IsRegular() {
|
||||
if stat.Name() == ".DS_Store" {
|
||||
return sources, nil
|
||||
}
|
||||
return append(sources, &entity.SourceState{
|
||||
Source: src,
|
||||
Size: stat.Size(),
|
||||
Status: entity.CopyStatus_Pending,
|
||||
}), nil
|
||||
}
|
||||
if mode&acp.UnexpectFileMode != 0 {
|
||||
return sources, nil
|
||||
}
|
||||
|
||||
files, err := os.ReadDir(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("walk read dir, path= '%s', %w", path, err)
|
||||
}
|
||||
for _, file := range files {
|
||||
sources, err = walk(ctx, src.Append(file.Name()), sources)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return sources, nil
|
||||
}
|
||||
15
executor/job_archive_start.go
Normal file
15
executor/job_archive_start.go
Normal file
@@ -0,0 +1,15 @@
|
||||
package executor
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/abc950309/tapewriter/entity"
|
||||
)
|
||||
|
||||
func (e *Executor) startArchive(ctx context.Context, job *Job) error {
|
||||
return e.Submit(ctx, job, &entity.JobNextParam{Param: &entity.JobNextParam_Archive{
|
||||
Archive: &entity.JobArchiveNextParam{Param: &entity.JobArchiveNextParam_WaitForTape{
|
||||
WaitForTape: &entity.JobArchiveWaitForTapeParam{},
|
||||
}},
|
||||
}})
|
||||
}
|
||||
59
executor/job_restore.go
Normal file
59
executor/job_restore.go
Normal file
@@ -0,0 +1,59 @@
|
||||
package executor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"time"
|
||||
|
||||
"github.com/abc950309/tapewriter/library"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func (e *Executor) RestoreLoadTape(ctx context.Context, device string, tape *library.Tape) error {
|
||||
if !e.occupyDevice(device) {
|
||||
return fmt.Errorf("device is using, device= %s", device)
|
||||
}
|
||||
defer e.releaseDevice(device)
|
||||
|
||||
keyPath, keyRecycle, err := e.restoreKey(tape.Encryption)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
time.Sleep(time.Second)
|
||||
keyRecycle()
|
||||
}()
|
||||
|
||||
logger := logrus.StandardLogger()
|
||||
|
||||
if err := runCmd(logger, e.makeEncryptCmd(ctx, device, keyPath, tape.Barcode, tape.Name)); err != nil {
|
||||
return fmt.Errorf("run encrypt script fail, %w", err)
|
||||
}
|
||||
|
||||
mountPoint, err := os.MkdirTemp("", "*.ltfs")
|
||||
if err != nil {
|
||||
return fmt.Errorf("create temp mountpoint, %w", err)
|
||||
}
|
||||
|
||||
mountCmd := exec.CommandContext(ctx, e.mountScript)
|
||||
mountCmd.Env = append(mountCmd.Env, fmt.Sprintf("DEVICE=%s", device), fmt.Sprintf("MOUNT_POINT=%s", mountPoint))
|
||||
if err := runCmd(logger, mountCmd); err != nil {
|
||||
return fmt.Errorf("run mount script fail, %w", err)
|
||||
}
|
||||
// defer func() {
|
||||
// umountCmd := exec.CommandContext(ctx, e.umountScript)
|
||||
// umountCmd.Env = append(umountCmd.Env, fmt.Sprintf("MOUNT_POINT=%s", mountPoint))
|
||||
// if err := runCmd(logger, umountCmd); err != nil {
|
||||
// logger.WithContext(ctx).WithError(err).Errorf("run umount script fail, %s", mountPoint)
|
||||
// return
|
||||
// }
|
||||
// if err := os.Remove(mountPoint); err != nil {
|
||||
// logger.WithContext(ctx).WithError(err).Errorf("remove mount point fail, %s", mountPoint)
|
||||
// return
|
||||
// }
|
||||
// }()
|
||||
|
||||
return nil
|
||||
}
|
||||
54
executor/key.go
Normal file
54
executor/key.go
Normal file
@@ -0,0 +1,54 @@
|
||||
package executor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const (
|
||||
keySize = 256
|
||||
keyV1Header = "v1:"
|
||||
)
|
||||
|
||||
// restoreKey returns (path, recycle, error)
|
||||
func (e *Executor) restoreKey(str string) (string, func(), error) {
|
||||
file, err := os.CreateTemp("", "*.key")
|
||||
if err != nil {
|
||||
return "", nil, fmt.Errorf("restore key, create temp, %w", err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
if strings.HasPrefix(str, keyV1Header) {
|
||||
if _, err := file.WriteString(str[len(keyV1Header):]); err != nil {
|
||||
return "", nil, fmt.Errorf("restore key, write key, %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return file.Name(), func() { os.Remove(file.Name()) }, nil
|
||||
}
|
||||
|
||||
// newKey returns (key, path, recycle, error)
|
||||
func (e *Executor) newKey() (string, string, func(), error) {
|
||||
keyBuf := make([]byte, keySize/8)
|
||||
if _, err := rand.Reader.Read(keyBuf); err != nil {
|
||||
return "", "", nil, fmt.Errorf("gen key fail, %w", err)
|
||||
}
|
||||
key := keyV1Header + hex.EncodeToString(keyBuf)
|
||||
|
||||
path, recycle, err := e.restoreKey(key)
|
||||
if err != nil {
|
||||
return "", "", nil, err
|
||||
}
|
||||
return key, path, recycle, nil
|
||||
}
|
||||
|
||||
func (e *Executor) makeEncryptCmd(ctx context.Context, device, keyPath, barcode, name string) *exec.Cmd {
|
||||
cmd := exec.CommandContext(ctx, e.encryptScript)
|
||||
cmd.Env = append(cmd.Env, fmt.Sprintf("DEVICE=%s", device), fmt.Sprintf("KEY_FILE=%s", keyPath), fmt.Sprintf("TAPE_BARCODE=%s", barcode), fmt.Sprintf("TAPE_NAME=%s", name))
|
||||
return cmd
|
||||
}
|
||||
50
executor/log.go
Normal file
50
executor/log.go
Normal file
@@ -0,0 +1,50 @@
|
||||
package executor
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func (e *Executor) logPath(jobID int64) (string, string) {
|
||||
return path.Join(e.workDirectory, "job-logs"), fmt.Sprintf("%d.log", jobID)
|
||||
}
|
||||
|
||||
func (e *Executor) newLogWriter(jobID int64) (*os.File, error) {
|
||||
dir, filename := e.logPath(jobID)
|
||||
if err := os.MkdirAll(dir, 0755); err != nil {
|
||||
return nil, fmt.Errorf("make job log dir fail, path= '%s', err= %w", dir, err)
|
||||
}
|
||||
|
||||
file, err := os.OpenFile(path.Join(dir, filename), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create file fail, path= '%s', err= %w", path.Join(dir, filename), err)
|
||||
}
|
||||
|
||||
return file, nil
|
||||
}
|
||||
|
||||
func (e *Executor) NewLogReader(jobID int64) (*os.File, error) {
|
||||
dir, filename := e.logPath(jobID)
|
||||
file, err := os.OpenFile(path.Join(dir, filename), os.O_RDONLY, 0644)
|
||||
if err != nil {
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, fmt.Errorf("create file")
|
||||
}
|
||||
|
||||
return file, nil
|
||||
}
|
||||
|
||||
func runCmd(logger *logrus.Logger, cmd *exec.Cmd) error {
|
||||
writer := logger.WriterLevel(logrus.InfoLevel)
|
||||
cmd.Stdout = writer
|
||||
cmd.Stderr = writer
|
||||
|
||||
return cmd.Run()
|
||||
}
|
||||
8
executor/progress.go
Normal file
8
executor/progress.go
Normal file
@@ -0,0 +1,8 @@
|
||||
package executor
|
||||
|
||||
type progress struct {
|
||||
speed int64
|
||||
|
||||
totalBytes, totalFiles int64
|
||||
bytes, files int64
|
||||
}
|
||||
Reference in New Issue
Block a user