package library import ( "context" "encoding/json" "fmt" "io/fs" "sort" "strings" mapset "github.com/deckarep/golang-set/v2" "github.com/modern-go/reflect2" "github.com/samber/lo" "github.com/samuelncui/yatm/entity" "gorm.io/gorm" ) const ( batchSize = 100 ) type Library struct { db *gorm.DB } func New(db *gorm.DB) *Library { return &Library{db: db} } func (l *Library) AutoMigrate() error { return l.db.AutoMigrate(ModelFile, ModelPosition, ModelTape) } type ExportLibrary struct { Files *[]*File `json:"files,omitempty"` Tapes *[]*Tape `json:"tapes,omitempty"` Positions *[]*Position `json:"positions,omitempty"` } func (l *Library) Export(ctx context.Context, types []entity.LibraryEntityType) ([]byte, error) { results := new(ExportLibrary) for _, t := range lo.Uniq(types) { switch t { case entity.LibraryEntityType_FILE: files, err := listAll(ctx, l, make([]*File, 0, batchSize)) if err != nil { return nil, fmt.Errorf("list all files fail, %w", err) } results.Files = &files case entity.LibraryEntityType_TAPE: tapes, err := listAll(ctx, l, make([]*Tape, 0, batchSize)) if err != nil { return nil, fmt.Errorf("list all tapes fail, %w", err) } results.Tapes = &tapes case entity.LibraryEntityType_POSITION: positions, err := listAll(ctx, l, make([]*Position, 0, batchSize)) if err != nil { return nil, fmt.Errorf("list all positions fail, %w", err) } results.Positions = &positions } } return json.Marshal(results) } func (l *Library) Import(ctx context.Context, buf []byte) error { results := new(ExportLibrary) if err := json.Unmarshal(buf, results); err != nil { return fmt.Errorf("unmarshal import data fail, %w", err) } if results.Files != nil { if r := l.db.WithContext(ctx).Session(&gorm.Session{AllowGlobalUpdate: true}).Delete(ModelFile); r.Error != nil { return fmt.Errorf("cleanup file fail, %w", r.Error) } if r := l.db.WithContext(ctx).CreateInBatches(*results.Files, 100); r.Error != nil { return fmt.Errorf("insert file fail, %w", r.Error) } } if results.Tapes != nil { if r := l.db.WithContext(ctx).Session(&gorm.Session{AllowGlobalUpdate: true}).Delete(ModelTape); r.Error != nil { return fmt.Errorf("cleanup tape fail, %w", r.Error) } if r := l.db.WithContext(ctx).CreateInBatches(*results.Tapes, 100); r.Error != nil { return fmt.Errorf("insert tape fail, %w", r.Error) } } if results.Positions != nil { if r := l.db.WithContext(ctx).Session(&gorm.Session{AllowGlobalUpdate: true}).Delete(ModelPosition); r.Error != nil { return fmt.Errorf("cleanup position fail, %w", r.Error) } if r := l.db.WithContext(ctx).CreateInBatches(*results.Positions, 100); r.Error != nil { return fmt.Errorf("insert position fail, %w", r.Error) } } return nil } func (l *Library) Trim(ctx context.Context, position, file bool) error { if !position { return nil } var current int64 for { positions := make([]*Position, 0, batchSize) if r := l.db.WithContext(ctx).Where("id > ?", current).Order("id ASC").Limit(batchSize).Find(&positions); r.Error != nil { return fmt.Errorf("scan position fail, err= %w", r.Error) } if len(positions) == 0 { break } current = positions[len(positions)-1].ID tapeIDs := mapset.NewThreadUnsafeSetWithSize[int64](1) for _, posi := range positions { tapeIDs.Add(posi.TapeID) } tapes, err := l.MGetTape(ctx, tapeIDs.ToSlice()...) if err != nil { return fmt.Errorf("mget tape fail, %w", err) } needDelete := make([]int64, 0) for _, posi := range positions { if tape, has := tapes[posi.TapeID]; has && tape != nil { continue } needDelete = append(needDelete, posi.ID) } if len(needDelete) == 0 { continue } if err := l.DeletePositions(ctx, needDelete...); err != nil { return fmt.Errorf("delete position fail, %w", err) } } if !file { return nil } current = 0 for { files := make([]*File, 0, batchSize) if r := l.db.WithContext(ctx).Where("id > ?", current).Order("id ASC").Limit(batchSize).Find(&files); r.Error != nil { return fmt.Errorf("scan file fail, err= %w", r.Error) } if len(files) == 0 { break } current = files[len(files)-1].ID fileIDs := lo.Map( lo.Filter(files, func(f *File, _ int) bool { return fs.FileMode(f.Mode).IsRegular() }), func(f *File, _ int) int64 { return f.ID }, ) positions, err := l.MGetPositionByFileID(ctx, fileIDs...) if err != nil { return fmt.Errorf("mget position by file id fail, %w", err) } needDeleteFileIDs := make([]int64, 0) needDeletePositionIDs := make([]int64, 0) for _, fileID := range fileIDs { posis, has := positions[fileID] if !has || len(posis) == 0 { needDeleteFileIDs = append(needDeleteFileIDs, fileID) continue } if len(posis) == 1 { continue } sort.Slice(posis, func(i int, j int) bool { ii, jj := posis[i], posis[j] if ii.TapeID != jj.TapeID { return ii.TapeID < jj.TapeID } if ii.Path != jj.Path { return strings.ReplaceAll(ii.Path, "/", "\x00") < strings.ReplaceAll(jj.Path, "/", "\x00") } return ii.WriteTime.After(jj.WriteTime) }) for idx, posi := range posis { if idx == 0 { continue } if posis[idx-1].TapeID == posi.TapeID && posis[idx-1].Path == posi.Path { needDeletePositionIDs = append(needDeletePositionIDs, posi.ID) } } } if len(needDeleteFileIDs) > 0 { if r := l.db.WithContext(ctx).Where("id IN (?)", needDeleteFileIDs).Delete(ModelFile); r.Error != nil { return fmt.Errorf("delete files fail, err= %w", r.Error) } } if len(needDeletePositionIDs) > 0 { if r := l.db.WithContext(ctx).Where("id IN (?)", needDeletePositionIDs).Delete(ModelPosition); r.Error != nil { return fmt.Errorf("delete positions fail, err= %w", r.Error) } } } return nil } func listAll[T any](ctx context.Context, l *Library, items []T) ([]T, error) { v := new(T) id := reflect2.TypeOfPtr(*v).Elem().(reflect2.StructType).FieldByName("ID") var cursor int64 for { batch := make([]T, 0, batchSize) if r := l.db.WithContext(ctx).Where("id > ?", cursor).Order("id ASC").Limit(batchSize).Find(&batch); r.Error != nil { return nil, fmt.Errorf("list files fail, cursor= %d, %w", cursor, r.Error) } if len(batch) == 0 { return items, nil } c := id.Get(batch[len(batch)-1]).(*int64) cursor = *c items = append(items, batch...) } }