mirror of
https://github.com/samuelncui/yatm.git
synced 2025-12-23 06:15:22 +00:00
250 lines
5.8 KiB
Go
250 lines
5.8 KiB
Go
package executor
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io/fs"
|
|
"sort"
|
|
"strings"
|
|
|
|
mapset "github.com/deckarep/golang-set/v2"
|
|
"github.com/samber/lo"
|
|
"github.com/samuelncui/yatm/entity"
|
|
"github.com/samuelncui/yatm/library"
|
|
"github.com/samuelncui/yatm/tools"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
func (a *jobRestoreExecutor) Initialize(ctx context.Context, param *entity.JobParam) error {
|
|
if err := a.applyParam(ctx, param.GetRestore()); err != nil {
|
|
return err
|
|
}
|
|
|
|
return a.dispatch(ctx, &entity.JobRestoreDispatchParam{Param: &entity.JobRestoreDispatchParam_WaitForTape{
|
|
WaitForTape: &entity.JobRestoreWaitForTapeParam{},
|
|
}})
|
|
}
|
|
|
|
type restoreFile struct {
|
|
*library.File
|
|
target string
|
|
}
|
|
|
|
func (a *jobRestoreExecutor) applyParam(ctx context.Context, param *entity.JobRestoreParam) error {
|
|
if param == nil {
|
|
return fmt.Errorf("restore param is nil")
|
|
}
|
|
|
|
return a.updateJob(ctx, func(_ *Job, state *entity.JobRestoreState) error {
|
|
exe := a.exe
|
|
|
|
files, err := exe.getRestoreFiles(ctx, param.FileIds...)
|
|
if err != nil {
|
|
return fmt.Errorf("get restore files fail, ids= %v, %w", param.FileIds, err)
|
|
}
|
|
|
|
fileIDs := make([]int64, 0, len(files))
|
|
for _, file := range files {
|
|
fileIDs = append(fileIDs, file.ID)
|
|
}
|
|
|
|
positions, err := exe.lib.MGetPositionByFileID(ctx, fileIDs...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
tapeMapping := make(map[int64]mapset.Set[int64], 4)
|
|
for _, file := range files {
|
|
for _, posi := range positions[file.ID] {
|
|
set := tapeMapping[posi.TapeID]
|
|
if set == nil {
|
|
tapeMapping[posi.TapeID] = mapset.NewThreadUnsafeSet(file.ID)
|
|
continue
|
|
}
|
|
set.Add(file.ID)
|
|
}
|
|
}
|
|
|
|
tapes, err := exe.lib.MGetTape(ctx, lo.Keys(tapeMapping)...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for tapeID := range tapeMapping {
|
|
if tape, has := tapes[tapeID]; has && tape != nil {
|
|
continue
|
|
}
|
|
|
|
logrus.WithContext(ctx).Infof("tape not found, tape_id= %d", tapeID)
|
|
delete(tapeMapping, tapeID)
|
|
}
|
|
|
|
restoreTapes := make([]*entity.RestoreTape, 0, len(tapeMapping))
|
|
for len(tapeMapping) > 0 {
|
|
var maxTapeID int64
|
|
for tapeID, files := range tapeMapping {
|
|
if maxTapeID == 0 {
|
|
maxTapeID = tapeID
|
|
continue
|
|
}
|
|
|
|
diff := files.Cardinality() - tapeMapping[maxTapeID].Cardinality()
|
|
if diff > 0 {
|
|
maxTapeID = tapeID
|
|
continue
|
|
}
|
|
if diff < 0 {
|
|
continue
|
|
}
|
|
if tapeID < maxTapeID {
|
|
maxTapeID = tapeID
|
|
continue
|
|
}
|
|
}
|
|
if maxTapeID == 0 {
|
|
return fmt.Errorf("max tape not found, tape_ids= %v", lo.Keys(tapeMapping))
|
|
}
|
|
|
|
fileIDs := tapeMapping[maxTapeID]
|
|
delete(tapeMapping, maxTapeID)
|
|
if fileIDs.Cardinality() == 0 {
|
|
continue
|
|
}
|
|
for i, f := range tapeMapping {
|
|
tapeMapping[i] = f.Difference(fileIDs)
|
|
}
|
|
|
|
targets := make([]*entity.RestoreFile, 0, fileIDs.Cardinality())
|
|
for _, fileID := range fileIDs.ToSlice() {
|
|
file := files[fileID]
|
|
if file == nil {
|
|
continue
|
|
}
|
|
|
|
posi := positions[fileID]
|
|
if len(posi) == 0 {
|
|
logrus.WithContext(ctx).Infof("file position not found, file_id= %d", fileID)
|
|
continue
|
|
}
|
|
|
|
for _, p := range posi {
|
|
if p.TapeID != maxTapeID {
|
|
continue
|
|
}
|
|
|
|
targets = append(targets, &entity.RestoreFile{
|
|
FileId: file.ID,
|
|
TapeId: p.TapeID,
|
|
PositionId: p.ID,
|
|
Status: entity.CopyStatus_PENDING,
|
|
Size: file.Size,
|
|
Hash: file.Hash,
|
|
TapePath: p.Path,
|
|
TargetPath: file.target,
|
|
})
|
|
break
|
|
}
|
|
}
|
|
|
|
convertPath := tools.ThreadUnsafeCache(func(p string) string { return strings.ReplaceAll(p, "/", "\x00") })
|
|
sort.Slice(targets, func(i, j int) bool {
|
|
return convertPath(targets[i].TapePath) < convertPath(targets[j].TapePath)
|
|
})
|
|
|
|
restoreTapes = append(restoreTapes, &entity.RestoreTape{
|
|
TapeId: maxTapeID,
|
|
Barcode: tapes[maxTapeID].Barcode,
|
|
Status: entity.CopyStatus_PENDING,
|
|
Files: targets,
|
|
})
|
|
}
|
|
|
|
state.Step = entity.JobRestoreStep_PENDING
|
|
state.Tapes = restoreTapes
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (e *Executor) getRestoreFiles(ctx context.Context, rootIDs ...int64) (map[int64]*restoreFile, error) {
|
|
rootIDSet := mapset.NewThreadUnsafeSet(rootIDs...)
|
|
for _, id := range rootIDs {
|
|
parents, err := e.lib.ListParents(ctx, id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(parents) <= 1 {
|
|
continue
|
|
}
|
|
|
|
for _, parent := range parents[:len(parents)-1] {
|
|
if !rootIDSet.Contains(parent.ID) {
|
|
continue
|
|
}
|
|
|
|
rootIDSet.Remove(id)
|
|
break
|
|
}
|
|
}
|
|
|
|
rootIDs = rootIDSet.ToSlice()
|
|
mapping, err := e.lib.MGetFile(ctx, rootIDs...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("mget file fail, ids= %v, %w", rootIDs, err)
|
|
}
|
|
|
|
files := make([]*restoreFile, 0, len(rootIDs)*8)
|
|
visited := mapset.NewThreadUnsafeSet[int64]()
|
|
for _, root := range mapping {
|
|
if visited.Contains(root.ID) {
|
|
continue
|
|
}
|
|
|
|
visited.Add(root.ID)
|
|
if !fs.FileMode(root.Mode).IsDir() {
|
|
files = append(files, &restoreFile{File: root, target: root.Name})
|
|
continue
|
|
}
|
|
|
|
found, err := e.visitFiles(ctx, root.Name, nil, visited, root.ID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
files = append(files, found...)
|
|
}
|
|
|
|
results := make(map[int64]*restoreFile, len(files))
|
|
for _, f := range files {
|
|
results[f.ID] = f
|
|
}
|
|
|
|
return results, nil
|
|
}
|
|
|
|
func (e *Executor) visitFiles(ctx context.Context, path string, files []*restoreFile, visited mapset.Set[int64], parentID int64) ([]*restoreFile, error) {
|
|
children, err := e.lib.List(ctx, parentID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, child := range children {
|
|
if visited.Contains(child.ID) {
|
|
continue
|
|
}
|
|
|
|
visited.Add(child.ID)
|
|
|
|
target := path + "/" + child.Name
|
|
if !fs.FileMode(child.Mode).IsDir() {
|
|
files = append(files, &restoreFile{File: child, target: target})
|
|
continue
|
|
}
|
|
|
|
files, err = e.visitFiles(ctx, target, files, visited, child.ID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return files, nil
|
|
}
|