Files
yatm/executor/job_restore_init.go
2023-10-12 22:54:07 +08:00

250 lines
5.8 KiB
Go

package executor
import (
"context"
"fmt"
"io/fs"
"sort"
"strings"
mapset "github.com/deckarep/golang-set/v2"
"github.com/samber/lo"
"github.com/samuelncui/yatm/entity"
"github.com/samuelncui/yatm/library"
"github.com/samuelncui/yatm/tools"
"github.com/sirupsen/logrus"
)
// Initialize applies the restore parameters carried by param (via applyParam)
// and, on success, dispatches the job with a WaitForTape step parameter.
// Any error from applyParam is returned unchanged and nothing is dispatched.
func (a *jobRestoreExecutor) Initialize(ctx context.Context, param *entity.JobParam) error {
	restoreParam := param.GetRestore()
	if err := a.applyParam(ctx, restoreParam); err != nil {
		return err
	}

	waitForTape := &entity.JobRestoreDispatchParam_WaitForTape{
		WaitForTape: &entity.JobRestoreWaitForTapeParam{},
	}
	return a.dispatch(ctx, &entity.JobRestoreDispatchParam{Param: waitForTape})
}
// restoreFile pairs a library file with the path it will be restored under.
type restoreFile struct {
	*library.File
	// target is the restore path: the selected root's name, followed by the
	// names of intermediate directories and the file itself, joined with "/".
	target string
}
// applyParam resolves the files selected by param into a per-tape restore
// plan and writes it into the job state through updateJob.
//
// The selected IDs are expanded (directories recursively) by getRestoreFiles,
// and each resulting file's tape positions are looked up. Tapes are then
// chosen greedily:
//   - the candidate tape holding the most still-unassigned files is taken
//     first, with ties broken by the smaller tape ID;
//   - its files are removed from every other candidate;
//   - this repeats until no candidate covers any file.
//
// This aims to keep the number of tapes to load small without solving the
// full set-cover problem. Within a tape, files are ordered by tape path.
//
// Files without any recorded position are logged and skipped. Tapes that
// MGetTape does not return are logged and dropped, and files found only on
// such tapes are left out of the plan.
func (a *jobRestoreExecutor) applyParam(ctx context.Context, param *entity.JobRestoreParam) error {
	if param == nil {
		return fmt.Errorf("restore param is nil")
	}

	return a.updateJob(ctx, func(_ *Job, state *entity.JobRestoreState) error {
		exe := a.exe
		files, err := exe.getRestoreFiles(ctx, param.FileIds...)
		if err != nil {
			return fmt.Errorf("get restore files fail, ids= %v, %w", param.FileIds, err)
		}

		fileIDs := make([]int64, 0, len(files))
		for _, file := range files {
			fileIDs = append(fileIDs, file.ID)
		}
		positions, err := exe.lib.MGetPositionByFileID(ctx, fileIDs...)
		if err != nil {
			return err
		}

		// tapeMapping: tape ID -> IDs of files that have a copy on that tape.
		tapeMapping := make(map[int64]mapset.Set[int64], 4)
		for _, file := range files {
			filePositions := positions[file.ID]
			if len(filePositions) == 0 {
				// Without a position the file cannot be read from any tape.
				logrus.WithContext(ctx).Infof("file position not found, file_id= %d", file.ID)
				continue
			}
			for _, posi := range filePositions {
				set := tapeMapping[posi.TapeID]
				if set == nil {
					tapeMapping[posi.TapeID] = mapset.NewThreadUnsafeSet(file.ID)
					continue
				}
				set.Add(file.ID)
			}
		}

		tapes, err := exe.lib.MGetTape(ctx, lo.Keys(tapeMapping)...)
		if err != nil {
			return err
		}
		for tapeID := range tapeMapping {
			if tape, has := tapes[tapeID]; has && tape != nil {
				continue
			}
			logrus.WithContext(ctx).Infof("tape not found, tape_id= %d", tapeID)
			delete(tapeMapping, tapeID)
		}

		// convertPath maps '/' to NUL so the string comparison below orders
		// paths component by component ("a/b" sorts before "a.b"), keeping
		// the entries of one directory together. One cache serves all tapes.
		convertPath := tools.ThreadUnsafeCache(func(p string) string { return strings.ReplaceAll(p, "/", "\x00") })

		restoreTapes := make([]*entity.RestoreTape, 0, len(tapeMapping))
		for len(tapeMapping) > 0 {
			// Pick the tape covering the most files; ties go to the smaller ID.
			// maxCount starts below any cardinality so the first tape always
			// seeds the choice (no sentinel tape ID is needed).
			var (
				maxTapeID int64
				maxCount  = -1
			)
			for tapeID, ids := range tapeMapping {
				count := ids.Cardinality()
				if count > maxCount || (count == maxCount && tapeID < maxTapeID) {
					maxTapeID, maxCount = tapeID, count
				}
			}

			selected := tapeMapping[maxTapeID]
			delete(tapeMapping, maxTapeID)
			if selected.Cardinality() == 0 {
				// The best candidate covers nothing, so every remaining
				// candidate is empty as well.
				break
			}

			// Files restored from the selected tape no longer need any other tape.
			for tapeID, ids := range tapeMapping {
				tapeMapping[tapeID] = ids.Difference(selected)
			}

			targets := make([]*entity.RestoreFile, 0, selected.Cardinality())
			for _, fileID := range selected.ToSlice() {
				file := files[fileID]
				if file == nil {
					continue
				}
				for _, p := range positions[fileID] {
					if p.TapeID != maxTapeID {
						continue
					}
					// If a file has several positions on this tape, the first
					// one is used.
					targets = append(targets, &entity.RestoreFile{
						FileId:     file.ID,
						TapeId:     p.TapeID,
						PositionId: p.ID,
						Status:     entity.CopyStatus_PENDING,
						Size:       file.Size,
						Hash:       file.Hash,
						TapePath:   p.Path,
						TargetPath: file.target,
					})
					break
				}
			}

			sort.Slice(targets, func(i, j int) bool {
				return convertPath(targets[i].TapePath) < convertPath(targets[j].TapePath)
			})
			restoreTapes = append(restoreTapes, &entity.RestoreTape{
				TapeId:  maxTapeID,
				Barcode: tapes[maxTapeID].Barcode,
				Status:  entity.CopyStatus_PENDING,
				Files:   targets,
			})
		}

		state.Step = entity.JobRestoreStep_PENDING
		state.Tapes = restoreTapes
		return nil
	})
}
// getRestoreFiles expands rootIDs into the non-directory files to restore,
// keyed by file ID.
//
// A root whose ancestor is also in rootIDs is dropped first, because the
// ancestor's traversal already covers it. Only roots returned by MGetFile are
// expanded:
//   - a non-directory root is restored under its own name;
//   - a directory root is walked recursively by visitFiles, and its
//     descendants get targets of the form "root/dir/.../name".
//
// Errors from the library are wrapped with the ID being processed.
func (e *Executor) getRestoreFiles(ctx context.Context, rootIDs ...int64) (map[int64]*restoreFile, error) {
	rootIDSet := mapset.NewThreadUnsafeSet(rootIDs...)
	// Iterate a snapshot of the set so duplicate input IDs are checked once.
	for _, id := range rootIDSet.ToSlice() {
		parents, err := e.lib.ListParents(ctx, id)
		if err != nil {
			return nil, fmt.Errorf("list parents fail, id= %d, %w", id, err)
		}
		if len(parents) <= 1 {
			continue
		}
		// NOTE(review): assumes ListParents ends with the file itself, which is
		// why the last element is excluded from the ancestor check.
		for _, parent := range parents[:len(parents)-1] {
			if rootIDSet.Contains(parent.ID) {
				rootIDSet.Remove(id)
				break
			}
		}
	}

	rootIDs = rootIDSet.ToSlice()
	mapping, err := e.lib.MGetFile(ctx, rootIDs...)
	if err != nil {
		return nil, fmt.Errorf("mget file fail, ids= %v, %w", rootIDs, err)
	}

	results := make(map[int64]*restoreFile, len(rootIDs)*8)
	// visited is shared with visitFiles so no entry is collected twice.
	visited := mapset.NewThreadUnsafeSet[int64]()
	for _, root := range mapping {
		if visited.Contains(root.ID) {
			continue
		}
		visited.Add(root.ID)

		if !fs.FileMode(root.Mode).IsDir() {
			results[root.ID] = &restoreFile{File: root, target: root.Name}
			continue
		}

		found, err := e.visitFiles(ctx, root.Name, nil, visited, root.ID)
		if err != nil {
			return nil, fmt.Errorf("visit files fail, root_id= %d, %w", root.ID, err)
		}
		for _, f := range found {
			results[f.ID] = f
		}
	}
	return results, nil
}
// visitFiles walks the directory parentID depth-first and appends every
// non-directory descendant to files, returning the extended slice.
//
// path is the restore target of parentID. Each child's target is
// path + "/" + child.Name. visited is shared with the caller: IDs already
// in it are skipped, and every child seen here is added to it, so an entry
// reachable more than once is emitted only once.
//
// A List failure is wrapped with the parent ID at the point it occurs. Errors
// from nested calls are propagated as-is, so they are not re-wrapped at every
// level.
func (e *Executor) visitFiles(ctx context.Context, path string, files []*restoreFile, visited mapset.Set[int64], parentID int64) ([]*restoreFile, error) {
	children, err := e.lib.List(ctx, parentID)
	if err != nil {
		return nil, fmt.Errorf("list children fail, parent_id= %d, %w", parentID, err)
	}

	for _, child := range children {
		if visited.Contains(child.ID) {
			continue
		}
		visited.Add(child.ID)

		// Plain concatenation keeps names verbatim (path.Join would clean them).
		target := path + "/" + child.Name
		if !fs.FileMode(child.Mode).IsDir() {
			files = append(files, &restoreFile{File: child, target: target})
			continue
		}

		if files, err = e.visitFiles(ctx, target, files, visited, child.ID); err != nil {
			return nil, err
		}
	}
	return files, nil
}