diff --git a/pkg/uploader/kopia/flush.go b/pkg/uploader/kopia/flush.go index 8e6c81aa5..04c10c7f6 100644 --- a/pkg/uploader/kopia/flush.go +++ b/pkg/uploader/kopia/flush.go @@ -19,3 +19,7 @@ package kopia import "github.com/pkg/errors" var errFlushUnsupported = errors.New("flush is not supported") + +type Flusher interface { + Flush() error +} diff --git a/pkg/uploader/kopia/flush_volume_linux.go b/pkg/uploader/kopia/flush_volume_linux.go index 708d60f48..3f4901614 100644 --- a/pkg/uploader/kopia/flush_volume_linux.go +++ b/pkg/uploader/kopia/flush_volume_linux.go @@ -43,5 +43,5 @@ func flushVolume(dirPath string) error { } }) - return errors.Wrapf(err, "error syncing dir %v", dirPath) + return errors.Wrapf(err, "error syncing fs from %v", dirPath) } diff --git a/pkg/uploader/kopia/snapshot.go b/pkg/uploader/kopia/snapshot.go index 679440635..ca05f7568 100644 --- a/pkg/uploader/kopia/snapshot.go +++ b/pkg/uploader/kopia/snapshot.go @@ -53,6 +53,7 @@ var loadSnapshotFunc = snapshot.LoadSnapshot var listSnapshotsFunc = snapshot.ListSnapshots var filesystemEntryFunc = snapshotfs.FilesystemEntryFromIDWithPath var restoreEntryFunc = restore.Entry +var flushVolumeFunc = flushVolume const UploaderConfigMultipartKey = "uploader-multipart" const MaxErrorReported = 10 @@ -375,16 +376,12 @@ func findPreviousSnapshotManifest(ctx context.Context, rep repo.Repository, sour return result, nil } -type flusher interface { - Flush() error -} - type fileSystemRestoreOutput struct { *restore.FilesystemOutput } func (o *fileSystemRestoreOutput) Flush() error { - return flushVolume(o.TargetPath) + return flushVolumeFunc(o.TargetPath) } // Restore restore specific sourcePath with given snapshotID and update progress @@ -446,22 +443,22 @@ func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress, return 0, 0, errors.Wrap(err, "error to init output") } - var output restore.Output = fsOutput - var fls flusher + var output restore.Output + var flusher Flusher if volMode == uploader.PersistentVolumeBlock { o := &BlockOutput{ FilesystemOutput: fsOutput, } output = o - fls = o + flusher = o } else { o := &fileSystemRestoreOutput{ FilesystemOutput: fsOutput, } output = o - fls = o + flusher = o } stat, err := restoreEntryFunc(kopiaCtx, rep, output, rootEntry, restore.Options{ @@ -477,12 +474,14 @@ func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress, return 0, 0, errors.Wrapf(err, "Failed to copy snapshot data to the target") } - if err := fls.Flush(); err != nil { + if err := flusher.Flush(); err != nil { if err == errFlushUnsupported { log.Warnf("Skip flushing data for %v under the current OS %v", path, runtime.GOOS) } else { return 0, 0, errors.Wrapf(err, "Failed to flush data to target") } + } else { + log.Warnf("Flush done for volume dir %v", path) } return stat.RestoredTotalFileSize, stat.RestoredFileCount, nil diff --git a/pkg/uploader/kopia/snapshot_test.go b/pkg/uploader/kopia/snapshot_test.go index 2423ba590..984b92af5 100644 --- a/pkg/uploader/kopia/snapshot_test.go +++ b/pkg/uploader/kopia/snapshot_test.go @@ -675,6 +675,7 @@ func TestRestore(t *testing.T) { invalidManifestType bool filesystemEntryFunc func(ctx context.Context, rep repo.Repository, rootID string, consistentAttributes bool) (fs.Entry, error) restoreEntryFunc func(ctx context.Context, rep repo.Repository, output restore.Output, rootEntry fs.Entry, options restore.Options) (restore.Stats, error) + flushVolumeFunc func(string) error dest string expectedBytes int64 expectedCount int32 @@ -757,6 +758,30 @@ func TestRestore(t *testing.T) { volMode: uploader.PersistentVolumeBlock, dest: "/tmp", }, + { + name: "Flush is not supported", + filesystemEntryFunc: func(ctx context.Context, rep repo.Repository, rootID string, consistentAttributes bool) (fs.Entry, error) { + return snapshotfs.EntryFromDirEntry(rep, &snapshot.DirEntry{Type: snapshot.EntryTypeFile}), nil + }, + restoreEntryFunc: func(ctx context.Context, rep repo.Repository, output restore.Output, rootEntry fs.Entry, options restore.Options) (restore.Stats, error) { + return restore.Stats{}, nil + }, + flushVolumeFunc: func(string) error { return errFlushUnsupported }, + snapshotID: "snapshot-123", + expectedError: nil, + }, + { + name: "Flush fails", + filesystemEntryFunc: func(ctx context.Context, rep repo.Repository, rootID string, consistentAttributes bool) (fs.Entry, error) { + return snapshotfs.EntryFromDirEntry(rep, &snapshot.DirEntry{Type: snapshot.EntryTypeFile}), nil + }, + restoreEntryFunc: func(ctx context.Context, rep repo.Repository, output restore.Output, rootEntry fs.Entry, options restore.Options) (restore.Stats, error) { + return restore.Stats{}, nil + }, + flushVolumeFunc: func(string) error { return errors.New("fake-flush-error") }, + snapshotID: "snapshot-123", + expectedError: errors.New("fake-flush-error"), + }, } em := &manifest.EntryMetadata{ @@ -784,6 +809,10 @@ func TestRestore(t *testing.T) { restoreEntryFunc = tc.restoreEntryFunc } + if tc.flushVolumeFunc != nil { + flushVolumeFunc = tc.flushVolumeFunc + } + repoWriterMock := &repomocks.RepositoryWriter{} repoWriterMock.On("GetManifest", mock.Anything, mock.Anything, mock.Anything).Return(em, nil) repoWriterMock.On("OpenObject", mock.Anything, mock.Anything).Return(em, nil)