From 9ab85892a7bddd821d1cb17c2cf74e506bcc5366 Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Wed, 17 May 2023 12:13:23 +0800 Subject: [PATCH] add shared generic data path 02 Signed-off-by: Lyndon-Li --- .../pod_volume_backup_controller.go | 12 +++++------ .../pod_volume_backup_controller_test.go | 2 +- .../pod_volume_restore_controller.go | 12 +++++------ pkg/datapath/file_system.go | 21 ++++++++++--------- pkg/datapath/types.go | 2 +- 5 files changed, 25 insertions(+), 24 deletions(-) diff --git a/pkg/controller/pod_volume_backup_controller.go b/pkg/controller/pod_volume_backup_controller.go index 9b4a4cb8c..71fb4850f 100644 --- a/pkg/controller/pod_volume_backup_controller.go +++ b/pkg/controller/pod_volume_backup_controller.go @@ -159,7 +159,7 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ log.WithField("path", path.ByPath).Debugf("Found host path") if err := fsBackup.Init(ctx, pvb.Spec.BackupStorageLocation, pvb.Spec.Pod.Namespace, pvb.Spec.UploaderType, - podvolume.GetPvbRepositoryType(&pvb), r.repositoryEnsurer, r.credentialGetter); err != nil { + podvolume.GetPvbRepositoryType(&pvb), pvb.Spec.RepoIdentifier, r.repositoryEnsurer, r.credentialGetter); err != nil { return r.errorOut(ctx, &pvb, err, "error to initialize data path", log) } @@ -233,8 +233,8 @@ func (r *PodVolumeBackupReconciler) OnDataPathFailed(ctx context.Context, namesp log.WithError(err).Error("Async fs backup data path failed") var pvb velerov1api.PodVolumeBackup - if err := r.Client.Get(ctx, types.NamespacedName{Name: pvbName, Namespace: namespace}, &pvb); err != nil { - log.WithError(err).Warn("Failed to get PVB on failure") + if getErr := r.Client.Get(ctx, types.NamespacedName{Name: pvbName, Namespace: namespace}, &pvb); getErr != nil { + log.WithError(getErr).Warn("Failed to get PVB on failure") } else { _, _ = r.errorOut(ctx, &pvb, err, "data path backup failed", log) } @@ -248,10 +248,10 @@ func (r *PodVolumeBackupReconciler) OnDataPathCancelled(ctx context.Context, nam log.Warn("Async fs backup data path canceled") var pvb velerov1api.PodVolumeBackup - if err := r.Client.Get(ctx, types.NamespacedName{Name: pvbName, Namespace: namespace}, &pvb); err != nil { - log.WithError(err).Warn("Failed to get PVB on cancel") + if getErr := r.Client.Get(ctx, types.NamespacedName{Name: pvbName, Namespace: namespace}, &pvb); getErr != nil { + log.WithError(getErr).Warn("Failed to get PVB on cancel") } else { - _, _ = r.errorOut(ctx, &pvb, err, "data path backup canceled", log) + _, _ = r.errorOut(ctx, &pvb, errors.New("PVB is canceled"), "data path backup canceled", log) } } diff --git a/pkg/controller/pod_volume_backup_controller_test.go b/pkg/controller/pod_volume_backup_controller_test.go index ede8aa9d1..1f1e9e543 100644 --- a/pkg/controller/pod_volume_backup_controller_test.go +++ b/pkg/controller/pod_volume_backup_controller_test.go @@ -99,7 +99,7 @@ type fakeFSBR struct { clock clock.WithTickerAndDelayedExecution } -func (b *fakeFSBR) Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string, repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error { +func (b *fakeFSBR) Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string, repoIdentifier string, repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error { return nil } diff --git a/pkg/controller/pod_volume_restore_controller.go b/pkg/controller/pod_volume_restore_controller.go index 3e75422d3..a2db1ba49 100644 --- a/pkg/controller/pod_volume_restore_controller.go +++ b/pkg/controller/pod_volume_restore_controller.go @@ -143,7 +143,7 @@ func (c *PodVolumeRestoreReconciler) Reconcile(ctx context.Context, req ctrl.Req log.WithField("path", volumePath.ByPath).Debugf("Found host path") if err := fsRestore.Init(ctx, pvr.Spec.BackupStorageLocation, pvr.Spec.Pod.Namespace, pvr.Spec.UploaderType, - podvolume.GetPvrRepositoryType(pvr), c.repositoryEnsurer, c.credentialGetter); err != nil { + podvolume.GetPvrRepositoryType(pvr), pvr.Spec.RepoIdentifier, c.repositoryEnsurer, c.credentialGetter); err != nil { return c.errorOut(ctx, pvr, err, "error to initialize data path", log) } @@ -326,8 +326,8 @@ func (c *PodVolumeRestoreReconciler) OnDataPathFailed(ctx context.Context, names log.WithError(err).Info("Async fs restore data path failed") var pvr velerov1api.PodVolumeRestore - if err := c.Client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); err != nil { - log.WithError(err).Warn("Failed to get PVR on failure") + if getErr := c.Client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); getErr != nil { + log.WithError(getErr).Warn("Failed to get PVR on failure") } else { _, _ = c.errorOut(ctx, &pvr, err, "data path restore failed", log) } @@ -341,10 +341,10 @@ func (c *PodVolumeRestoreReconciler) OnDataPathCancelled(ctx context.Context, na log.Info("Async fs restore data path canceled") var pvr velerov1api.PodVolumeRestore - if err := c.Client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); err != nil { - log.WithError(err).Warn("Failed to get PVR on cancel") + if getErr := c.Client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); getErr != nil { + log.WithError(getErr).Warn("Failed to get PVR on cancel") } else { - _, _ = c.errorOut(ctx, &pvr, err, "data path restore canceled", log) + _, _ = c.errorOut(ctx, &pvr, errors.New("PVR is canceled"), "data path restore canceled", log) } } diff --git a/pkg/datapath/file_system.go b/pkg/datapath/file_system.go index 6d7d915bb..ce0e8dc0a 100644 --- a/pkg/datapath/file_system.go +++ b/pkg/datapath/file_system.go @@ -30,6 +30,7 @@ import ( repoProvider "github.com/vmware-tanzu/velero/pkg/repository/provider" "github.com/vmware-tanzu/velero/pkg/uploader" "github.com/vmware-tanzu/velero/pkg/uploader/provider" + "github.com/vmware-tanzu/velero/pkg/util/filesystem" ) type fileSystemBR struct { @@ -61,7 +62,7 @@ func newFileSystemBR(jobName string, requestorType string, client client.Client, } func (fs *fileSystemBR) Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string, - repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error { + repoIdentifier string, repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error { var err error defer func() { if err != nil { @@ -91,7 +92,7 @@ func (fs *fileSystemBR) Init(ctx context.Context, bslName string, sourceNamespac return errors.Wrapf(err, "error to boost backup repository connection %s-%s-%s", bslName, sourceNamespace, repositoryType) } - fs.uploaderProv, err = provider.NewUploaderProvider(ctx, fs.client, uploaderType, fs.requestorType, "", + fs.uploaderProv, err = provider.NewUploaderProvider(ctx, fs.client, uploaderType, fs.requestorType, repoIdentifier, fs.backupLocation, fs.backupRepo, credentialGetter, repokey.RepoKeySelector(), fs.log) if err != nil { return errors.Wrapf(err, "error creating uploader %s", uploaderType) @@ -137,11 +138,11 @@ func (fs *fileSystemBR) StartBackup(source AccessPoint, parentSnapshot string, f snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, tags, forceFull, parentSnapshot, fs) if err == provider.ErrorCanceled { - fs.callbacks.OnCancelled(fs.ctx, fs.namespace, fs.jobName) + fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName) } else if err != nil { - fs.callbacks.OnFailed(fs.ctx, fs.namespace, fs.jobName, err) + fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, err) } else { - fs.callbacks.OnCompleted(fs.ctx, fs.namespace, fs.jobName, Result{Backup: BackupResult{snapshotID, emptySnapshot, source}}) + fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Backup: BackupResult{snapshotID, emptySnapshot, source}}) } }() @@ -157,11 +158,11 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint) erro err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, fs) if err == provider.ErrorCanceled { - fs.callbacks.OnCancelled(fs.ctx, fs.namespace, fs.jobName) + fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName) } else if err != nil { - fs.callbacks.OnFailed(fs.ctx, fs.namespace, fs.jobName, err) + fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, err) } else { - fs.callbacks.OnCompleted(fs.ctx, fs.namespace, fs.jobName, Result{Restore: RestoreResult{Target: target}}) + fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Restore: RestoreResult{Target: target}}) } }() @@ -171,7 +172,7 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint) erro // UpdateProgress which implement ProgressUpdater interface to update progress status func (fs *fileSystemBR) UpdateProgress(p *uploader.Progress) { if fs.callbacks.OnProgress != nil { - fs.callbacks.OnProgress(fs.ctx, fs.namespace, fs.jobName, &uploader.Progress{TotalBytes: p.TotalBytes, BytesDone: p.BytesDone}) + fs.callbacks.OnProgress(context.Background(), fs.namespace, fs.jobName, &uploader.Progress{TotalBytes: p.TotalBytes, BytesDone: p.BytesDone}) } } @@ -186,7 +187,7 @@ func (fs *fileSystemBR) boostRepoConnect(ctx context.Context, repositoryType str return err } } else { - if err := repoProvider.NewUnifiedRepoProvider(*credentialGetter, repositoryType, fs.log).BoostRepoConnect(ctx, repoProvider.RepoParam{BackupLocation: fs.backupLocation, BackupRepo: fs.backupRepo}); err != nil { + if err := repoProvider.NewResticRepositoryProvider(credentialGetter.FromFile, filesystem.NewFileSystem(), fs.log).BoostRepoConnect(ctx, repoProvider.RepoParam{BackupLocation: fs.backupLocation, BackupRepo: fs.backupRepo}); err != nil { return err } } diff --git a/pkg/datapath/types.go b/pkg/datapath/types.go index 97926ca15..0b1e47c12 100644 --- a/pkg/datapath/types.go +++ b/pkg/datapath/types.go @@ -58,7 +58,7 @@ type AccessPoint struct { // AsyncBR is the interface for asynchronous data path methods type AsyncBR interface { // Init initializes an asynchronous data path instance - Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string, repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error + Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string, repoIdentifier string, repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error // StartBackup starts an asynchronous data path instance for backup StartBackup(source AccessPoint, parentSnapshot string, forceFull bool, tags map[string]string) error