fix: Prevent moving headers from corrupting last indexed record & block

This commit is contained in:
Felicitas Pojtinger
2021-12-07 01:37:16 +01:00
parent aa3b558340
commit f26f899612
3 changed files with 78 additions and 43 deletions

View File

@@ -11,6 +11,8 @@ import (
"github.com/pojntfx/stfs/internal/keys"
"github.com/pojntfx/stfs/internal/pax"
"github.com/pojntfx/stfs/internal/persisters"
"github.com/pojntfx/stfs/pkg/config"
"github.com/pojntfx/stfs/pkg/recovery"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/volatiletech/sqlboiler/v4/boil"
@@ -97,6 +99,11 @@ func move(
return err
}
lastIndexedRecord, lastIndexedBlock, err := metadataPersister.GetLastIndexedRecordAndBlock(context.Background(), viper.GetInt(recordSizeFlag))
if err != nil {
return err
}
headersToMove := []*models.Header{}
dbhdr, err := metadataPersister.GetHeader(context.Background(), src)
if err != nil {
@@ -114,16 +121,12 @@ func move(
headersToMove = append(headersToMove, dbhdrs...)
}
// Move the headers in the index
if err := metadataPersister.MoveHeaders(context.Background(), headersToMove, src, dst); err != nil {
return nil
}
if err := formatting.PrintCSV(formatting.TARHeaderCSV); err != nil {
return err
}
// Append move headers to the tape or tar file
hdrs := []*tar.Header{}
for _, dbhdr := range headersToMove {
hdr, err := converters.DBHeaderToTarHeader(dbhdr)
if err != nil {
@@ -153,9 +156,49 @@ func move(
if err := formatting.PrintCSV(formatting.GetTARHeaderAsCSV(-1, -1, -1, -1, hdr)); err != nil {
return err
}
hdrs = append(hdrs, hdr)
}
return nil
return recovery.Index(
config.StateConfig{
Drive: viper.GetString(driveFlag),
Metadata: viper.GetString(metadataFlag),
},
config.PipeConfig{
Compression: viper.GetString(compressionFlag),
Encryption: viper.GetString(encryptionFlag),
Signature: viper.GetString(signatureFlag),
},
config.CryptoConfig{
Recipient: recipient,
Identity: identity,
Password: viper.GetString(passwordFlag),
},
viper.GetInt(recordSizeFlag),
int(lastIndexedRecord),
int(lastIndexedBlock),
false,
func(hdr *tar.Header, i int) error {
// Ignore the first header, which is the last header which we already indexed
if i == 0 {
return nil
}
if len(hdrs) <= i-1 {
return errMissingTarHeader
}
*hdr = *hdrs[i-1]
return nil
},
func(hdr *tar.Header, isRegular bool) error {
return nil // We sign above, no need to verify
},
)
}
func init() {

View File

@@ -66,43 +66,28 @@ func (p *MetadataPersister) UpdateHeaderMetadata(ctx context.Context, dbhdr *mod
return nil
}
func (p *MetadataPersister) moveHeader(ctx context.Context, tx boil.ContextExecutor, oldName string, newName string) error {
func (p *MetadataPersister) MoveHeader(ctx context.Context, oldName string, newName string, lastknownrecord, lastknownblock int64) error {
// We can't do this with `dbhdr.Update` because we are renaming the primary key
if _, err := queries.Raw(
fmt.Sprintf(
` update %v set %v = ? where %v = ?;`,
` update %v set %v = ?, %v = ?, %v = ? where %v = ?;`,
models.TableNames.Headers,
models.HeaderColumns.Name,
models.HeaderColumns.Lastknownrecord,
models.HeaderColumns.Lastknownblock,
models.HeaderColumns.Name,
),
newName,
lastknownrecord,
lastknownblock,
oldName,
).ExecContext(ctx, tx); err != nil {
).ExecContext(ctx, p.db); err != nil {
return err
}
return nil
}
func (p *MetadataPersister) MoveHeader(ctx context.Context, oldName string, newName string) error {
return p.moveHeader(ctx, p.db, oldName, newName)
}
func (p *MetadataPersister) MoveHeaders(ctx context.Context, hdrs models.HeaderSlice, oldName string, newName string) error {
tx, err := p.db.BeginTx(ctx, nil)
if err != nil {
return err
}
for _, hdr := range hdrs {
if err := p.moveHeader(ctx, tx, hdr.Name, strings.TrimSuffix(newName, "/")+strings.TrimPrefix(hdr.Name, strings.TrimSuffix(oldName, "/"))); err != nil {
return err
}
}
return tx.Commit()
}
func (p *MetadataPersister) GetHeaders(ctx context.Context) (models.HeaderSlice, error) {
return models.Headers(
qm.Where(models.HeaderColumns.Deleted+" != 1"),

View File

@@ -4,6 +4,7 @@ import (
"archive/tar"
"bufio"
"context"
"database/sql"
"io"
"io/ioutil"
"math"
@@ -111,7 +112,6 @@ func Index(
if err != nil {
if err == io.EOF {
// EOF
break
}
@@ -137,7 +137,7 @@ func Index(
}
if err := indexHeader(record, block, hdr, metadataPersister, pipes.Compression, pipes.Encryption); err != nil {
return nil
return err
}
curr, err := f.Seek(0, io.SeekCurrent)
@@ -221,7 +221,7 @@ func Index(
}
if err := indexHeader(record, block, hdr, metadataPersister, pipes.Compression, pipes.Encryption); err != nil {
return nil
return err
}
curr = int64(counter.BytesRead)
@@ -326,28 +326,35 @@ func indexHeader(
}
newHdr = h
if err := metadataPersister.UpdateHeaderMetadata(context.Background(), newHdr); err != nil {
return err
}
} else {
// Metadata-only update; use the old record & block
oldHdr, err := metadataPersister.GetHeader(context.Background(), oldName)
if err != nil {
return err
if err == nil {
h, err := converters.TarHeaderToDBHeader(oldHdr.Record, record, oldHdr.Block, block, hdr)
if err != nil {
return err
}
newHdr = h
if err := metadataPersister.UpdateHeaderMetadata(context.Background(), newHdr); err != nil {
return err
}
}
h, err := converters.TarHeaderToDBHeader(oldHdr.Record, record, oldHdr.Block, block, hdr)
if err != nil {
// To support ignoring previous `Move` operations, we need to ignore non-existent headers here, as moving changes the primary keys
if err != nil && err != sql.ErrNoRows {
return err
}
newHdr = h
}
if err := metadataPersister.UpdateHeaderMetadata(context.Background(), newHdr); err != nil {
return err
}
if moveAfterEdits {
// Move header
if err := metadataPersister.MoveHeader(context.Background(), oldName, hdr.Name); err != nil {
// Move header (will be a no-op if the header has been moved before)
if err := metadataPersister.MoveHeader(context.Background(), oldName, hdr.Name, record, block); err != nil {
return err
}
}