flush volume after restore

Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
This commit is contained in:
Lyndon-Li
2026-01-26 11:34:37 +08:00
parent e2bbace03b
commit 1e6f02dc24
4 changed files with 43 additions and 11 deletions

View File

@@ -19,3 +19,7 @@ package kopia
import "github.com/pkg/errors"
var errFlushUnsupported = errors.New("flush is not supported")
type Flusher interface {
Flush() error
}

View File

@@ -43,5 +43,5 @@ func flushVolume(dirPath string) error {
}
})
return errors.Wrapf(err, "error syncing dir %v", dirPath)
return errors.Wrapf(err, "error syncing fs from %v", dirPath)
}

View File

@@ -53,6 +53,7 @@ var loadSnapshotFunc = snapshot.LoadSnapshot
var listSnapshotsFunc = snapshot.ListSnapshots
var filesystemEntryFunc = snapshotfs.FilesystemEntryFromIDWithPath
var restoreEntryFunc = restore.Entry
var flushVolumeFunc = flushVolume
const UploaderConfigMultipartKey = "uploader-multipart"
const MaxErrorReported = 10
@@ -375,16 +376,12 @@ func findPreviousSnapshotManifest(ctx context.Context, rep repo.Repository, sour
return result, nil
}
type flusher interface {
Flush() error
}
type fileSystemRestoreOutput struct {
*restore.FilesystemOutput
}
func (o *fileSystemRestoreOutput) Flush() error {
return flushVolume(o.TargetPath)
return flushVolumeFunc(o.TargetPath)
}
// Restore restore specific sourcePath with given snapshotID and update progress
@@ -446,22 +443,22 @@ func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress,
return 0, 0, errors.Wrap(err, "error to init output")
}
var output restore.Output = fsOutput
var fls flusher
var output restore.Output
var flusher Flusher
if volMode == uploader.PersistentVolumeBlock {
o := &BlockOutput{
FilesystemOutput: fsOutput,
}
output = o
fls = o
flusher = o
} else {
o := &fileSystemRestoreOutput{
FilesystemOutput: fsOutput,
}
output = o
fls = o
flusher = o
}
stat, err := restoreEntryFunc(kopiaCtx, rep, output, rootEntry, restore.Options{
@@ -477,12 +474,14 @@ func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress,
return 0, 0, errors.Wrapf(err, "Failed to copy snapshot data to the target")
}
if err := fls.Flush(); err != nil {
if err := flusher.Flush(); err != nil {
if err == errFlushUnsupported {
log.Warnf("Skip flushing data for %v under the current OS %v", path, runtime.GOOS)
} else {
return 0, 0, errors.Wrapf(err, "Failed to flush data to target")
}
} else {
log.Warnf("Flush done for volume dir %v", path)
}
return stat.RestoredTotalFileSize, stat.RestoredFileCount, nil

View File

@@ -675,6 +675,7 @@ func TestRestore(t *testing.T) {
invalidManifestType bool
filesystemEntryFunc func(ctx context.Context, rep repo.Repository, rootID string, consistentAttributes bool) (fs.Entry, error)
restoreEntryFunc func(ctx context.Context, rep repo.Repository, output restore.Output, rootEntry fs.Entry, options restore.Options) (restore.Stats, error)
flushVolumeFunc func(string) error
dest string
expectedBytes int64
expectedCount int32
@@ -757,6 +758,30 @@ func TestRestore(t *testing.T) {
volMode: uploader.PersistentVolumeBlock,
dest: "/tmp",
},
{
name: "Flush is not supported",
filesystemEntryFunc: func(ctx context.Context, rep repo.Repository, rootID string, consistentAttributes bool) (fs.Entry, error) {
return snapshotfs.EntryFromDirEntry(rep, &snapshot.DirEntry{Type: snapshot.EntryTypeFile}), nil
},
restoreEntryFunc: func(ctx context.Context, rep repo.Repository, output restore.Output, rootEntry fs.Entry, options restore.Options) (restore.Stats, error) {
return restore.Stats{}, nil
},
flushVolumeFunc: func(string) error { return errFlushUnsupported },
snapshotID: "snapshot-123",
expectedError: nil,
},
{
name: "Flush fails",
filesystemEntryFunc: func(ctx context.Context, rep repo.Repository, rootID string, consistentAttributes bool) (fs.Entry, error) {
return snapshotfs.EntryFromDirEntry(rep, &snapshot.DirEntry{Type: snapshot.EntryTypeFile}), nil
},
restoreEntryFunc: func(ctx context.Context, rep repo.Repository, output restore.Output, rootEntry fs.Entry, options restore.Options) (restore.Stats, error) {
return restore.Stats{}, nil
},
flushVolumeFunc: func(string) error { return errors.New("fake-flush-error") },
snapshotID: "snapshot-123",
expectedError: errors.New("fake-flush-error"),
},
}
em := &manifest.EntryMetadata{
@@ -784,6 +809,10 @@ func TestRestore(t *testing.T) {
restoreEntryFunc = tc.restoreEntryFunc
}
if tc.flushVolumeFunc != nil {
flushVolumeFunc = tc.flushVolumeFunc
}
repoWriterMock := &repomocks.RepositoryWriter{}
repoWriterMock.On("GetManifest", mock.Anything, mock.Anything, mock.Anything).Return(em, nil)
repoWriterMock.On("OpenObject", mock.Anything, mock.Anything).Return(em, nil)