From 411469b90c7752c237b58ab9eb08be1545452c55 Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Tue, 7 Jan 2025 16:01:24 +0800 Subject: [PATCH] update du/dd progress on completion Signed-off-by: Lyndon-Li --- changelogs/unreleased/8608-Lyndon-Li | 1 + pkg/datapath/file_system.go | 8 +-- pkg/datapath/micro_service_watcher.go | 17 ++++++- pkg/datapath/types.go | 4 +- pkg/uploader/provider/kopia.go | 20 ++++---- pkg/uploader/provider/kopia_test.go | 4 +- pkg/uploader/provider/mocks/Provider.go | 66 ++++++++++++++++++------- pkg/uploader/provider/provider.go | 4 +- pkg/uploader/provider/restic.go | 30 +++++------ pkg/uploader/provider/restic_test.go | 8 +-- 10 files changed, 104 insertions(+), 58 deletions(-) create mode 100644 changelogs/unreleased/8608-Lyndon-Li diff --git a/changelogs/unreleased/8608-Lyndon-Li b/changelogs/unreleased/8608-Lyndon-Li new file mode 100644 index 000000000..56b3b9495 --- /dev/null +++ b/changelogs/unreleased/8608-Lyndon-Li @@ -0,0 +1 @@ +Fix issue #8497, update du/dd progress on completion \ No newline at end of file diff --git a/pkg/datapath/file_system.go b/pkg/datapath/file_system.go index 5d3b54f28..277012e9a 100644 --- a/pkg/datapath/file_system.go +++ b/pkg/datapath/file_system.go @@ -181,7 +181,7 @@ func (fs *fileSystemBR) StartBackup(source AccessPoint, uploaderConfig map[strin fs.wgDataPath.Done() }() - snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, backupParam.RealSource, backupParam.Tags, backupParam.ForceFull, + snapshotID, emptySnapshot, totalBytes, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, backupParam.RealSource, backupParam.Tags, backupParam.ForceFull, backupParam.ParentSnapshot, source.VolMode, uploaderConfig, fs) if err == provider.ErrorCanceled { @@ -193,7 +193,7 @@ func (fs *fileSystemBR) StartBackup(source AccessPoint, uploaderConfig map[strin } fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, dataPathErr) } else { - fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Backup: BackupResult{snapshotID, emptySnapshot, source}}) + fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Backup: BackupResult{snapshotID, emptySnapshot, source, totalBytes}}) } }() @@ -215,7 +215,7 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint, uplo fs.wgDataPath.Done() }() - err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, target.VolMode, uploaderConfigs, fs) + totalBytes, err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, target.VolMode, uploaderConfigs, fs) if err == provider.ErrorCanceled { fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName) @@ -226,7 +226,7 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint, uplo } fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, dataPathErr) } else { - fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Restore: RestoreResult{Target: target}}) + fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Restore: RestoreResult{Target: target, TotalBytes: totalBytes}}) } }() diff --git a/pkg/datapath/micro_service_watcher.go b/pkg/datapath/micro_service_watcher.go index 8d0927538..02c99299c 100644 --- a/pkg/datapath/micro_service_watcher.go +++ b/pkg/datapath/micro_service_watcher.go @@ -320,7 +320,9 @@ func (ms *microServiceBRWatcher) startWatch() { logger.Info("Calling callback on data path pod termination") if lastPod.Status.Phase == v1.PodSucceeded { - ms.callbacks.OnCompleted(ms.ctx, ms.namespace, ms.taskName, funcGetResultFromMessage(ms.taskType, terminateMessage, ms.log)) + result := funcGetResultFromMessage(ms.taskType, terminateMessage, ms.log) + ms.callbacks.OnProgress(ms.ctx, ms.namespace, ms.taskName, getCompletionProgressFromResult(ms.taskType, result)) + ms.callbacks.OnCompleted(ms.ctx, ms.namespace, ms.taskName, result) } else { if strings.HasSuffix(terminateMessage, ErrCancelled) { ms.callbacks.OnCancelled(ms.ctx, ms.namespace, ms.taskName) @@ -390,6 +392,19 @@ func getProgressFromMessage(message string, logger logrus.FieldLogger) *uploader return progress } +func getCompletionProgressFromResult(taskType string, result Result) *uploader.Progress { + progress := &uploader.Progress{} + if taskType == TaskTypeBackup { + progress.BytesDone = result.Backup.TotalBytes + progress.TotalBytes = result.Backup.TotalBytes + } else { + progress.BytesDone = result.Restore.TotalBytes + progress.TotalBytes = result.Restore.TotalBytes + } + + return progress +} + func (ms *microServiceBRWatcher) Cancel() { ms.log.WithField("taskType", ms.taskType).WithField("taskName", ms.taskName).Info("MicroServiceBR is canceled") } diff --git a/pkg/datapath/types.go b/pkg/datapath/types.go index a2fac3ed5..f4315b447 100644 --- a/pkg/datapath/types.go +++ b/pkg/datapath/types.go @@ -33,11 +33,13 @@ type BackupResult struct { SnapshotID string `json:"snapshotID"` EmptySnapshot bool `json:"emptySnapshot"` Source AccessPoint `json:"source,omitempty"` + TotalBytes int64 `json:"totalBytes,omitempty"` } // RestoreResult represents the result of a restore type RestoreResult struct { - Target AccessPoint `json:"target,omitempty"` + Target AccessPoint `json:"target,omitempty"` + TotalBytes int64 `json:"totalBytes,omitempty"` } // Callbacks defines the collection of callbacks during backup/restore diff --git a/pkg/uploader/provider/kopia.go b/pkg/uploader/provider/kopia.go index eafac59dc..d8c55fa56 100644 --- a/pkg/uploader/provider/kopia.go +++ b/pkg/uploader/provider/kopia.go @@ -120,13 +120,13 @@ func (kp *kopiaProvider) RunBackup( parentSnapshot string, volMode uploader.PersistentVolumeMode, uploaderCfg map[string]string, - updater uploader.ProgressUpdater) (string, bool, error) { + updater uploader.ProgressUpdater) (string, bool, int64, error) { if updater == nil { - return "", false, errors.New("Need to initial backup progress updater first") + return "", false, 0, errors.New("Need to initial backup progress updater first") } if path == "" { - return "", false, errors.New("path is empty") + return "", false, 0, errors.New("path is empty") } log := kp.log.WithFields(logrus.Fields{ @@ -175,9 +175,9 @@ func (kp *kopiaProvider) RunBackup( if kpUploader.IsCanceled() { log.Warn("Kopia backup is canceled") - return snapshotID, false, ErrorCanceled + return snapshotID, false, 0, ErrorCanceled } - return snapshotID, false, errors.Wrapf(err, "Failed to run kopia backup") + return snapshotID, false, 0, errors.Wrapf(err, "Failed to run kopia backup") } // which ensure that the statistic data of TotalBytes equal to BytesDone when finished @@ -189,7 +189,7 @@ func (kp *kopiaProvider) RunBackup( ) log.Debugf("Kopia backup finished, snapshot ID %s, backup size %d", snapshotInfo.ID, snapshotInfo.Size) - return snapshotInfo.ID, false, nil + return snapshotInfo.ID, false, snapshotInfo.Size, nil } func (kp *kopiaProvider) GetPassword(param interface{}) (string, error) { @@ -211,7 +211,7 @@ func (kp *kopiaProvider) RunRestore( volumePath string, volMode uploader.PersistentVolumeMode, uploaderCfg map[string]string, - updater uploader.ProgressUpdater) error { + updater uploader.ProgressUpdater) (int64, error) { log := kp.log.WithFields(logrus.Fields{ "snapshotID": snapshotID, "volumePath": volumePath, @@ -235,12 +235,12 @@ func (kp *kopiaProvider) RunRestore( size, fileCount, err := RestoreFunc(context.Background(), repoWriter, progress, snapshotID, volumePath, volMode, uploaderCfg, log, restoreCancel) if err != nil { - return errors.Wrapf(err, "Failed to run kopia restore") + return 0, errors.Wrapf(err, "Failed to run kopia restore") } if atomic.LoadInt32(&kp.canceling) == 1 { log.Error("Kopia restore is canceled") - return ErrorCanceled + return 0, ErrorCanceled } // which ensure that the statistic data of TotalBytes equal to BytesDone when finished @@ -253,5 +253,5 @@ func (kp *kopiaProvider) RunRestore( log.Info(output) - return nil + return size, nil } diff --git a/pkg/uploader/provider/kopia_test.go b/pkg/uploader/provider/kopia_test.go index 7a7d5d271..8391d1c7d 100644 --- a/pkg/uploader/provider/kopia_test.go +++ b/pkg/uploader/provider/kopia_test.go @@ -106,7 +106,7 @@ func TestRunBackup(t *testing.T) { tc.volMode = uploader.PersistentVolumeFilesystem } BackupFunc = tc.hookBackupFunc - _, _, err := kp.RunBackup(context.Background(), "var", "", nil, false, "", tc.volMode, map[string]string{}, &updater) + _, _, _, err := kp.RunBackup(context.Background(), "var", "", nil, false, "", tc.volMode, map[string]string{}, &updater) if tc.notError { assert.NoError(t, err) } else { @@ -157,7 +157,7 @@ func TestRunRestore(t *testing.T) { tc.volMode = uploader.PersistentVolumeFilesystem } RestoreFunc = tc.hookRestoreFunc - err := kp.RunRestore(context.Background(), "", "/var", tc.volMode, map[string]string{}, &updater) + _, err := kp.RunRestore(context.Background(), "", "/var", tc.volMode, map[string]string{}, &updater) if tc.notError { assert.NoError(t, err) } else { diff --git a/pkg/uploader/provider/mocks/Provider.go b/pkg/uploader/provider/mocks/Provider.go index 7651431b2..f09472d56 100644 --- a/pkg/uploader/provider/mocks/Provider.go +++ b/pkg/uploader/provider/mocks/Provider.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.20.0. DO NOT EDIT. +// Code generated by mockery v2.39.1. DO NOT EDIT. package mocks @@ -19,6 +19,10 @@ type Provider struct { func (_m *Provider) Close(ctx context.Context) error { ret := _m.Called(ctx) + if len(ret) == 0 { + panic("no return value specified for Close") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context) error); ok { r0 = rf(ctx) @@ -30,13 +34,18 @@ func (_m *Provider) Close(ctx context.Context) error { } // RunBackup provides a mock function with given fields: ctx, path, realSource, tags, forceFull, parentSnapshot, volMode, uploaderCfg, updater -func (_m *Provider) RunBackup(ctx context.Context, path string, realSource string, tags map[string]string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, uploaderCfg map[string]string, updater uploader.ProgressUpdater) (string, bool, error) { +func (_m *Provider) RunBackup(ctx context.Context, path string, realSource string, tags map[string]string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, uploaderCfg map[string]string, updater uploader.ProgressUpdater) (string, bool, int64, error) { ret := _m.Called(ctx, path, realSource, tags, forceFull, parentSnapshot, volMode, uploaderCfg, updater) + if len(ret) == 0 { + panic("no return value specified for RunBackup") + } + var r0 string var r1 bool - var r2 error - if rf, ok := ret.Get(0).(func(context.Context, string, string, map[string]string, bool, string, uploader.PersistentVolumeMode, map[string]string, uploader.ProgressUpdater) (string, bool, error)); ok { + var r2 int64 + var r3 error + if rf, ok := ret.Get(0).(func(context.Context, string, string, map[string]string, bool, string, uploader.PersistentVolumeMode, map[string]string, uploader.ProgressUpdater) (string, bool, int64, error)); ok { return rf(ctx, path, realSource, tags, forceFull, parentSnapshot, volMode, uploaderCfg, updater) } if rf, ok := ret.Get(0).(func(context.Context, string, string, map[string]string, bool, string, uploader.PersistentVolumeMode, map[string]string, uploader.ProgressUpdater) string); ok { @@ -51,36 +60,55 @@ func (_m *Provider) RunBackup(ctx context.Context, path string, realSource strin r1 = ret.Get(1).(bool) } - if rf, ok := ret.Get(2).(func(context.Context, string, string, map[string]string, bool, string, uploader.PersistentVolumeMode, map[string]string, uploader.ProgressUpdater) error); ok { + if rf, ok := ret.Get(2).(func(context.Context, string, string, map[string]string, bool, string, uploader.PersistentVolumeMode, map[string]string, uploader.ProgressUpdater) int64); ok { r2 = rf(ctx, path, realSource, tags, forceFull, parentSnapshot, volMode, uploaderCfg, updater) } else { - r2 = ret.Error(2) + r2 = ret.Get(2).(int64) } - return r0, r1, r2 + if rf, ok := ret.Get(3).(func(context.Context, string, string, map[string]string, bool, string, uploader.PersistentVolumeMode, map[string]string, uploader.ProgressUpdater) error); ok { + r3 = rf(ctx, path, realSource, tags, forceFull, parentSnapshot, volMode, uploaderCfg, updater) + } else { + r3 = ret.Error(3) + } + + return r0, r1, r2, r3 } // RunRestore provides a mock function with given fields: ctx, snapshotID, volumePath, volMode, uploaderConfig, updater -func (_m *Provider) RunRestore(ctx context.Context, snapshotID string, volumePath string, volMode uploader.PersistentVolumeMode, uploaderConfig map[string]string, updater uploader.ProgressUpdater) error { +func (_m *Provider) RunRestore(ctx context.Context, snapshotID string, volumePath string, volMode uploader.PersistentVolumeMode, uploaderConfig map[string]string, updater uploader.ProgressUpdater) (int64, error) { ret := _m.Called(ctx, snapshotID, volumePath, volMode, uploaderConfig, updater) - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, string, uploader.PersistentVolumeMode, map[string]string, uploader.ProgressUpdater) error); ok { - r0 = rf(ctx, snapshotID, volumePath, volMode, uploaderConfig, updater) - } else { - r0 = ret.Error(0) + if len(ret) == 0 { + panic("no return value specified for RunRestore") } - return r0 -} + var r0 int64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, string, uploader.PersistentVolumeMode, map[string]string, uploader.ProgressUpdater) (int64, error)); ok { + return rf(ctx, snapshotID, volumePath, volMode, uploaderConfig, updater) + } + if rf, ok := ret.Get(0).(func(context.Context, string, string, uploader.PersistentVolumeMode, map[string]string, uploader.ProgressUpdater) int64); ok { + r0 = rf(ctx, snapshotID, volumePath, volMode, uploaderConfig, updater) + } else { + r0 = ret.Get(0).(int64) + } -type mockConstructorTestingTNewProvider interface { - mock.TestingT - Cleanup(func()) + if rf, ok := ret.Get(1).(func(context.Context, string, string, uploader.PersistentVolumeMode, map[string]string, uploader.ProgressUpdater) error); ok { + r1 = rf(ctx, snapshotID, volumePath, volMode, uploaderConfig, updater) + } else { + r1 = ret.Error(1) + } + + return r0, r1 } // NewProvider creates a new instance of Provider. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewProvider(t mockConstructorTestingTNewProvider) *Provider { +// The first argument is typically a *testing.T value. +func NewProvider(t interface { + mock.TestingT + Cleanup(func()) +}) *Provider { mock := &Provider{} mock.Mock.Test(t) diff --git a/pkg/uploader/provider/provider.go b/pkg/uploader/provider/provider.go index 20a3dc436..2ce3f5755 100644 --- a/pkg/uploader/provider/provider.go +++ b/pkg/uploader/provider/provider.go @@ -50,7 +50,7 @@ type Provider interface { parentSnapshot string, volMode uploader.PersistentVolumeMode, uploaderCfg map[string]string, - updater uploader.ProgressUpdater) (string, bool, error) + updater uploader.ProgressUpdater) (string, bool, int64, error) // RunRestore which will do restore for one specific volume with given snapshot id and return error // updater is used for updating backup progress which implement by third-party RunRestore( @@ -59,7 +59,7 @@ type Provider interface { volumePath string, volMode uploader.PersistentVolumeMode, uploaderConfig map[string]string, - updater uploader.ProgressUpdater) error + updater uploader.ProgressUpdater) (int64, error) // Close which will close related repository Close(ctx context.Context) error } diff --git a/pkg/uploader/provider/restic.go b/pkg/uploader/provider/restic.go index 5878461f4..92eef84a1 100644 --- a/pkg/uploader/provider/restic.go +++ b/pkg/uploader/provider/restic.go @@ -124,21 +124,21 @@ func (rp *resticProvider) RunBackup( parentSnapshot string, volMode uploader.PersistentVolumeMode, uploaderCfg map[string]string, - updater uploader.ProgressUpdater) (string, bool, error) { + updater uploader.ProgressUpdater) (string, bool, int64, error) { if updater == nil { - return "", false, errors.New("Need to initial backup progress updater first") + return "", false, 0, errors.New("Need to initial backup progress updater first") } if path == "" { - return "", false, errors.New("path is empty") + return "", false, 0, errors.New("path is empty") } if realSource != "" { - return "", false, errors.New("real source is not empty, this is not supported by restic uploader") + return "", false, 0, errors.New("real source is not empty, this is not supported by restic uploader") } if volMode == uploader.PersistentVolumeBlock { - return "", false, errors.New("unable to support block mode") + return "", false, 0, errors.New("unable to support block mode") } log := rp.log.WithFields(logrus.Fields{ @@ -149,7 +149,7 @@ func (rp *resticProvider) RunBackup( if len(uploaderCfg) > 0 { parallelFilesUpload, err := uploaderutil.GetParallelFilesUpload(uploaderCfg) if err != nil { - return "", false, errors.Wrap(err, "failed to get uploader config") + return "", false, 0, errors.Wrap(err, "failed to get uploader config") } if parallelFilesUpload > 0 { log.Warnf("ParallelFilesUpload is set to %d, but restic does not support parallel file uploads. Ignoring.", parallelFilesUpload) @@ -171,9 +171,9 @@ func (rp *resticProvider) RunBackup( if err != nil { if strings.Contains(stderrBuf, "snapshot is empty") { log.Debugf("Restic backup got empty dir with %s path", path) - return "", true, nil + return "", true, 0, nil } - return "", false, errors.WithStack(fmt.Errorf("error running restic backup command %s with error: %v stderr: %v", backupCmd.String(), err, stderrBuf)) + return "", false, 0, errors.WithStack(fmt.Errorf("error running restic backup command %s with error: %v stderr: %v", backupCmd.String(), err, stderrBuf)) } // GetSnapshotID snapshotIDCmd := resticGetSnapshotFunc(rp.repoIdentifier, rp.credentialsFile, tags) @@ -184,10 +184,10 @@ func (rp *resticProvider) RunBackup( } snapshotID, err := resticGetSnapshotIDFunc(snapshotIDCmd) if err != nil { - return "", false, errors.WithStack(fmt.Errorf("error getting snapshot id with error: %v", err)) + return "", false, 0, errors.WithStack(fmt.Errorf("error getting snapshot id with error: %v", err)) } log.Infof("Run command=%s, stdout=%s, stderr=%s", backupCmd.String(), summary, stderrBuf) - return snapshotID, false, nil + return snapshotID, false, 0, nil } // RunRestore runs a `restore` command and monitors the volume size to @@ -198,9 +198,9 @@ func (rp *resticProvider) RunRestore( volumePath string, volMode uploader.PersistentVolumeMode, uploaderCfg map[string]string, - updater uploader.ProgressUpdater) error { + updater uploader.ProgressUpdater) (int64, error) { if updater == nil { - return errors.New("Need to initial backup progress updater first") + return 0, errors.New("Need to initial backup progress updater first") } log := rp.log.WithFields(logrus.Fields{ "snapshotID": snapshotID, @@ -208,7 +208,7 @@ func (rp *resticProvider) RunRestore( }) if volMode == uploader.PersistentVolumeBlock { - return errors.New("unable to support block mode") + return 0, errors.New("unable to support block mode") } restoreCmd := resticRestoreCMDFunc(rp.repoIdentifier, rp.credentialsFile, snapshotID, volumePath) @@ -220,7 +220,7 @@ func (rp *resticProvider) RunRestore( extraFlags, err := rp.parseRestoreExtraFlags(uploaderCfg) if err != nil { - return errors.Wrap(err, "failed to parse uploader config") + return 0, errors.Wrap(err, "failed to parse uploader config") } else if len(extraFlags) != 0 { restoreCmd.ExtraFlags = append(restoreCmd.ExtraFlags, extraFlags...) } @@ -228,7 +228,7 @@ func (rp *resticProvider) RunRestore( stdout, stderr, err := restic.RunRestore(restoreCmd, log, updater) log.Infof("Run command=%v, stdout=%s, stderr=%s", restoreCmd, stdout, stderr) - return err + return 0, err } func (rp *resticProvider) parseRestoreExtraFlags(uploaderCfg map[string]string) ([]string, error) { diff --git a/pkg/uploader/provider/restic_test.go b/pkg/uploader/provider/restic_test.go index 2ef72b134..92abeabcc 100644 --- a/pkg/uploader/provider/restic_test.go +++ b/pkg/uploader/provider/restic_test.go @@ -150,9 +150,9 @@ func TestResticRunBackup(t *testing.T) { } if !tc.nilUpdater { updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: tc.rp.log, Ctx: context.Background(), Cli: fake.NewClientBuilder().WithScheme(util.VeleroScheme).Build()} - _, _, err = tc.rp.RunBackup(context.Background(), "var", "", map[string]string{}, false, parentSnapshot, tc.volMode, map[string]string{}, &updater) + _, _, _, err = tc.rp.RunBackup(context.Background(), "var", "", map[string]string{}, false, parentSnapshot, tc.volMode, map[string]string{}, &updater) } else { - _, _, err = tc.rp.RunBackup(context.Background(), "var", "", map[string]string{}, false, parentSnapshot, tc.volMode, map[string]string{}, nil) + _, _, _, err = tc.rp.RunBackup(context.Background(), "var", "", map[string]string{}, false, parentSnapshot, tc.volMode, map[string]string{}, nil) } tc.rp.log.Infof("test name %v error %v", tc.name, err) @@ -223,9 +223,9 @@ func TestResticRunRestore(t *testing.T) { var err error if !tc.nilUpdater { updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: tc.rp.log, Ctx: context.Background(), Cli: fake.NewClientBuilder().WithScheme(util.VeleroScheme).Build()} - err = tc.rp.RunRestore(context.Background(), "", "var", tc.volMode, map[string]string{}, &updater) + _, err = tc.rp.RunRestore(context.Background(), "", "var", tc.volMode, map[string]string{}, &updater) } else { - err = tc.rp.RunRestore(context.Background(), "", "var", tc.volMode, map[string]string{}, nil) + _, err = tc.rp.RunRestore(context.Background(), "", "var", tc.volMode, map[string]string{}, nil) } tc.rp.log.Infof("test name %v error %v", tc.name, err)