add shared generic data path 02

Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
This commit is contained in:
Lyndon-Li
2023-05-17 12:13:23 +08:00
parent 5f008d18fa
commit 9ab85892a7
5 changed files with 25 additions and 24 deletions

View File

@@ -159,7 +159,7 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
log.WithField("path", path.ByPath).Debugf("Found host path")
if err := fsBackup.Init(ctx, pvb.Spec.BackupStorageLocation, pvb.Spec.Pod.Namespace, pvb.Spec.UploaderType,
podvolume.GetPvbRepositoryType(&pvb), r.repositoryEnsurer, r.credentialGetter); err != nil {
podvolume.GetPvbRepositoryType(&pvb), pvb.Spec.RepoIdentifier, r.repositoryEnsurer, r.credentialGetter); err != nil {
return r.errorOut(ctx, &pvb, err, "error to initialize data path", log)
}
@@ -233,8 +233,8 @@ func (r *PodVolumeBackupReconciler) OnDataPathFailed(ctx context.Context, namesp
log.WithError(err).Error("Async fs backup data path failed")
var pvb velerov1api.PodVolumeBackup
if err := r.Client.Get(ctx, types.NamespacedName{Name: pvbName, Namespace: namespace}, &pvb); err != nil {
log.WithError(err).Warn("Failed to get PVB on failure")
if getErr := r.Client.Get(ctx, types.NamespacedName{Name: pvbName, Namespace: namespace}, &pvb); getErr != nil {
log.WithError(getErr).Warn("Failed to get PVB on failure")
} else {
_, _ = r.errorOut(ctx, &pvb, err, "data path backup failed", log)
}
@@ -248,10 +248,10 @@ func (r *PodVolumeBackupReconciler) OnDataPathCancelled(ctx context.Context, nam
log.Warn("Async fs backup data path canceled")
var pvb velerov1api.PodVolumeBackup
if err := r.Client.Get(ctx, types.NamespacedName{Name: pvbName, Namespace: namespace}, &pvb); err != nil {
log.WithError(err).Warn("Failed to get PVB on cancel")
if getErr := r.Client.Get(ctx, types.NamespacedName{Name: pvbName, Namespace: namespace}, &pvb); getErr != nil {
log.WithError(getErr).Warn("Failed to get PVB on cancel")
} else {
_, _ = r.errorOut(ctx, &pvb, err, "data path backup canceled", log)
_, _ = r.errorOut(ctx, &pvb, errors.New("PVB is canceled"), "data path backup canceled", log)
}
}

View File

@@ -99,7 +99,7 @@ type fakeFSBR struct {
clock clock.WithTickerAndDelayedExecution
}
func (b *fakeFSBR) Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string, repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error {
func (b *fakeFSBR) Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string, repoIdentifier string, repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error {
return nil
}

View File

@@ -143,7 +143,7 @@ func (c *PodVolumeRestoreReconciler) Reconcile(ctx context.Context, req ctrl.Req
log.WithField("path", volumePath.ByPath).Debugf("Found host path")
if err := fsRestore.Init(ctx, pvr.Spec.BackupStorageLocation, pvr.Spec.Pod.Namespace, pvr.Spec.UploaderType,
podvolume.GetPvrRepositoryType(pvr), c.repositoryEnsurer, c.credentialGetter); err != nil {
podvolume.GetPvrRepositoryType(pvr), pvr.Spec.RepoIdentifier, c.repositoryEnsurer, c.credentialGetter); err != nil {
return c.errorOut(ctx, pvr, err, "error to initialize data path", log)
}
@@ -326,8 +326,8 @@ func (c *PodVolumeRestoreReconciler) OnDataPathFailed(ctx context.Context, names
log.WithError(err).Info("Async fs restore data path failed")
var pvr velerov1api.PodVolumeRestore
if err := c.Client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); err != nil {
log.WithError(err).Warn("Failed to get PVR on failure")
if getErr := c.Client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); getErr != nil {
log.WithError(getErr).Warn("Failed to get PVR on failure")
} else {
_, _ = c.errorOut(ctx, &pvr, err, "data path restore failed", log)
}
@@ -341,10 +341,10 @@ func (c *PodVolumeRestoreReconciler) OnDataPathCancelled(ctx context.Context, na
log.Info("Async fs restore data path canceled")
var pvr velerov1api.PodVolumeRestore
if err := c.Client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); err != nil {
log.WithError(err).Warn("Failed to get PVR on cancel")
if getErr := c.Client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); getErr != nil {
log.WithError(getErr).Warn("Failed to get PVR on cancel")
} else {
_, _ = c.errorOut(ctx, &pvr, err, "data path restore canceled", log)
_, _ = c.errorOut(ctx, &pvr, errors.New("PVR is canceled"), "data path restore canceled", log)
}
}

View File

@@ -30,6 +30,7 @@ import (
repoProvider "github.com/vmware-tanzu/velero/pkg/repository/provider"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/uploader/provider"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
)
type fileSystemBR struct {
@@ -61,7 +62,7 @@ func newFileSystemBR(jobName string, requestorType string, client client.Client,
}
func (fs *fileSystemBR) Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string,
repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error {
repoIdentifier string, repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error {
var err error
defer func() {
if err != nil {
@@ -91,7 +92,7 @@ func (fs *fileSystemBR) Init(ctx context.Context, bslName string, sourceNamespac
return errors.Wrapf(err, "error to boost backup repository connection %s-%s-%s", bslName, sourceNamespace, repositoryType)
}
fs.uploaderProv, err = provider.NewUploaderProvider(ctx, fs.client, uploaderType, fs.requestorType, "",
fs.uploaderProv, err = provider.NewUploaderProvider(ctx, fs.client, uploaderType, fs.requestorType, repoIdentifier,
fs.backupLocation, fs.backupRepo, credentialGetter, repokey.RepoKeySelector(), fs.log)
if err != nil {
return errors.Wrapf(err, "error creating uploader %s", uploaderType)
@@ -137,11 +138,11 @@ func (fs *fileSystemBR) StartBackup(source AccessPoint, parentSnapshot string, f
snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, tags, forceFull, parentSnapshot, fs)
if err == provider.ErrorCanceled {
fs.callbacks.OnCancelled(fs.ctx, fs.namespace, fs.jobName)
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)
} else if err != nil {
fs.callbacks.OnFailed(fs.ctx, fs.namespace, fs.jobName, err)
fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, err)
} else {
fs.callbacks.OnCompleted(fs.ctx, fs.namespace, fs.jobName, Result{Backup: BackupResult{snapshotID, emptySnapshot, source}})
fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Backup: BackupResult{snapshotID, emptySnapshot, source}})
}
}()
@@ -157,11 +158,11 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint) erro
err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, fs)
if err == provider.ErrorCanceled {
fs.callbacks.OnCancelled(fs.ctx, fs.namespace, fs.jobName)
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)
} else if err != nil {
fs.callbacks.OnFailed(fs.ctx, fs.namespace, fs.jobName, err)
fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, err)
} else {
fs.callbacks.OnCompleted(fs.ctx, fs.namespace, fs.jobName, Result{Restore: RestoreResult{Target: target}})
fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Restore: RestoreResult{Target: target}})
}
}()
@@ -171,7 +172,7 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint) erro
// UpdateProgress which implement ProgressUpdater interface to update progress status
func (fs *fileSystemBR) UpdateProgress(p *uploader.Progress) {
if fs.callbacks.OnProgress != nil {
fs.callbacks.OnProgress(fs.ctx, fs.namespace, fs.jobName, &uploader.Progress{TotalBytes: p.TotalBytes, BytesDone: p.BytesDone})
fs.callbacks.OnProgress(context.Background(), fs.namespace, fs.jobName, &uploader.Progress{TotalBytes: p.TotalBytes, BytesDone: p.BytesDone})
}
}
@@ -186,7 +187,7 @@ func (fs *fileSystemBR) boostRepoConnect(ctx context.Context, repositoryType str
return err
}
} else {
if err := repoProvider.NewUnifiedRepoProvider(*credentialGetter, repositoryType, fs.log).BoostRepoConnect(ctx, repoProvider.RepoParam{BackupLocation: fs.backupLocation, BackupRepo: fs.backupRepo}); err != nil {
if err := repoProvider.NewResticRepositoryProvider(credentialGetter.FromFile, filesystem.NewFileSystem(), fs.log).BoostRepoConnect(ctx, repoProvider.RepoParam{BackupLocation: fs.backupLocation, BackupRepo: fs.backupRepo}); err != nil {
return err
}
}

View File

@@ -58,7 +58,7 @@ type AccessPoint struct {
// AsyncBR is the interface for asynchronous data path methods
type AsyncBR interface {
// Init initializes an asynchronous data path instance
Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string, repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error
Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string, repoIdentifier string, repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error
// StartBackup starts an asynchronous data path instance for backup
StartBackup(source AccessPoint, parentSnapshot string, forceFull bool, tags map[string]string) error