Merge pull request #7955 from Lyndon-Li/data-mover-ms-new-data-path

New data path for data mover ms
This commit is contained in:
Wenkai Yin(尹文开)
2024-07-08 18:34:22 +08:00
committed by GitHub
16 changed files with 205 additions and 139 deletions

View File

@@ -334,10 +334,19 @@ func (r *DataDownloadReconciler) runCancelableDataPath(ctx context.Context, fsRe
}
log.WithField("path", path.ByPath).Debug("Found host path")
if err := fsRestore.Init(ctx, dd.Spec.BackupStorageLocation, dd.Spec.SourceNamespace, datamover.GetUploaderType(dd.Spec.DataMover),
velerov1api.BackupRepositoryTypeKopia, "", r.repositoryEnsurer, r.credentialGetter); err != nil {
if err := fsRestore.Init(ctx, &datapath.FSBRInitParam{
BSLName: dd.Spec.BackupStorageLocation,
SourceNamespace: dd.Spec.SourceNamespace,
UploaderType: datamover.GetUploaderType(dd.Spec.DataMover),
RepositoryType: velerov1api.BackupRepositoryTypeKopia,
RepoIdentifier: "",
RepositoryEnsurer: r.repositoryEnsurer,
CredentialGetter: r.credentialGetter,
}); err != nil {
return r.errorOut(ctx, dd, err, "error to initialize data path", log)
}
log.WithField("path", path.ByPath).Info("fs init")
if err := fsRestore.StartRestore(dd.Spec.SnapshotID, path, dd.Spec.DataMoverConfig); err != nil {

View File

@@ -338,23 +338,38 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
func (r *DataUploadReconciler) runCancelableDataUpload(ctx context.Context, fsBackup datapath.AsyncBR, du *velerov2alpha1api.DataUpload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) {
log.Info("Run cancelable dataUpload")
path, err := exposer.GetPodVolumeHostPath(ctx, res.ByPod.HostingPod, res.ByPod.VolumeName, r.client, r.fileSystem, log)
if err != nil {
return r.errorOut(ctx, du, err, "error exposing host path for pod volume", log)
}
log.WithField("path", path.ByPath).Debug("Found host path")
if err := fsBackup.Init(ctx, du.Spec.BackupStorageLocation, du.Spec.SourceNamespace, datamover.GetUploaderType(du.Spec.DataMover),
velerov1api.BackupRepositoryTypeKopia, "", r.repoEnsurer, r.credentialGetter); err != nil {
if err := fsBackup.Init(ctx, &datapath.FSBRInitParam{
BSLName: du.Spec.BackupStorageLocation,
SourceNamespace: du.Spec.SourceNamespace,
UploaderType: datamover.GetUploaderType(du.Spec.DataMover),
RepositoryType: velerov1api.BackupRepositoryTypeKopia,
RepoIdentifier: "",
RepositoryEnsurer: r.repoEnsurer,
CredentialGetter: r.credentialGetter,
}); err != nil {
return r.errorOut(ctx, du, err, "error to initialize data path", log)
}
log.WithField("path", path.ByPath).Info("fs init")
tags := map[string]string{
velerov1api.AsyncOperationIDLabel: du.Labels[velerov1api.AsyncOperationIDLabel],
}
if err := fsBackup.StartBackup(path, datamover.GetRealSource(du.Spec.SourceNamespace, du.Spec.SourcePVC), "", false, tags, du.Spec.DataMoverConfig); err != nil {
if err := fsBackup.StartBackup(path, du.Spec.DataMoverConfig, &datapath.FSBRStartParam{
RealSource: datamover.GetRealSource(du.Spec.SourceNamespace, du.Spec.SourcePVC),
ParentSnapshot: "",
ForceFull: false,
Tags: tags,
}); err != nil {
return r.errorOut(ctx, du, err, "error starting data path backup", log)
}

View File

@@ -51,7 +51,6 @@ import (
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/exposer"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/repository"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
@@ -297,11 +296,11 @@ type fakeDataUploadFSBR struct {
clock clock.WithTickerAndDelayedExecution
}
func (f *fakeDataUploadFSBR) Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string, repoIdentifier string, repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error {
func (f *fakeDataUploadFSBR) Init(ctx context.Context, param interface{}) error {
return nil
}
func (f *fakeDataUploadFSBR) StartBackup(source datapath.AccessPoint, realSource string, parentSnapshot string, forceFull bool, tags map[string]string, uploaderConfigs map[string]string) error {
func (f *fakeDataUploadFSBR) StartBackup(source datapath.AccessPoint, uploaderConfigs map[string]string, param interface{}) error {
du := f.du
original := f.du.DeepCopy()
du.Status.Phase = velerov2alpha1api.DataUploadPhaseCompleted

View File

@@ -159,8 +159,15 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
log.WithField("path", path.ByPath).Debugf("Found host path")
if err := fsBackup.Init(ctx, pvb.Spec.BackupStorageLocation, pvb.Spec.Pod.Namespace, pvb.Spec.UploaderType,
podvolume.GetPvbRepositoryType(&pvb), pvb.Spec.RepoIdentifier, r.repositoryEnsurer, r.credentialGetter); err != nil {
if err := fsBackup.Init(ctx, &datapath.FSBRInitParam{
BSLName: pvb.Spec.BackupStorageLocation,
SourceNamespace: pvb.Spec.Pod.Namespace,
UploaderType: pvb.Spec.UploaderType,
RepositoryType: podvolume.GetPvbRepositoryType(&pvb),
RepoIdentifier: pvb.Spec.RepoIdentifier,
RepositoryEnsurer: r.repositoryEnsurer,
CredentialGetter: r.credentialGetter,
}); err != nil {
return r.errorOut(ctx, &pvb, err, "error to initialize data path", log)
}
@@ -179,7 +186,12 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}
}
if err := fsBackup.StartBackup(path, "", parentSnapshotID, false, pvb.Spec.Tags, pvb.Spec.UploaderSettings); err != nil {
if err := fsBackup.StartBackup(path, pvb.Spec.UploaderSettings, &datapath.FSBRStartParam{
RealSource: "",
ParentSnapshot: parentSnapshotID,
ForceFull: false,
Tags: pvb.Spec.Tags,
}); err != nil {
return r.errorOut(ctx, &pvb, err, "error starting data path backup", log)
}

View File

@@ -41,7 +41,6 @@ import (
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/repository"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
)
@@ -99,11 +98,11 @@ type fakeFSBR struct {
clock clock.WithTickerAndDelayedExecution
}
func (b *fakeFSBR) Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string, repoIdentifier string, repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error {
func (b *fakeFSBR) Init(ctx context.Context, param interface{}) error {
return nil
}
func (b *fakeFSBR) StartBackup(source datapath.AccessPoint, realSource string, parentSnapshot string, forceFull bool, tags map[string]string, uploaderConfigs map[string]string) error {
func (b *fakeFSBR) StartBackup(source datapath.AccessPoint, uploaderConfigs map[string]string, param interface{}) error {
pvb := b.pvb
original := b.pvb.DeepCopy()

View File

@@ -141,8 +141,15 @@ func (c *PodVolumeRestoreReconciler) Reconcile(ctx context.Context, req ctrl.Req
log.WithField("path", volumePath.ByPath).Debugf("Found host path")
if err := fsRestore.Init(ctx, pvr.Spec.BackupStorageLocation, pvr.Spec.SourceNamespace, pvr.Spec.UploaderType,
podvolume.GetPvrRepositoryType(pvr), pvr.Spec.RepoIdentifier, c.repositoryEnsurer, c.credentialGetter); err != nil {
if err := fsRestore.Init(ctx, &datapath.FSBRInitParam{
BSLName: pvr.Spec.BackupStorageLocation,
SourceNamespace: pvr.Spec.SourceNamespace,
UploaderType: pvr.Spec.UploaderType,
RepositoryType: podvolume.GetPvrRepositoryType(pvr),
RepoIdentifier: pvr.Spec.RepoIdentifier,
RepositoryEnsurer: c.repositoryEnsurer,
CredentialGetter: c.credentialGetter,
}); err != nil {
return c.errorOut(ctx, pvr, err, "error to initialize data path", log)
}