new data path for data mover ms

Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
This commit is contained in:
Lyndon-Li
2024-06-06 15:52:28 +08:00
parent a8d77eae95
commit 20676c1ae7
16 changed files with 205 additions and 139 deletions

View File

@@ -33,6 +33,26 @@ import (
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
)
// FSBRInitParam define the input param for FSBR init
type FSBRInitParam struct {
BSLName string
SourceNamespace string
UploaderType string
RepositoryType string
RepoIdentifier string
RepositoryEnsurer *repository.Ensurer
CredentialGetter *credentials.CredentialGetter
Filesystem filesystem.Interface
}
// FSBRStartParam define the input param for FSBR start
type FSBRStartParam struct {
RealSource string
ParentSnapshot string
ForceFull bool
Tags map[string]string
}
type fileSystemBR struct {
ctx context.Context
cancel context.CancelFunc
@@ -61,8 +81,9 @@ func newFileSystemBR(jobName string, requestorType string, client client.Client,
return fs
}
func (fs *fileSystemBR) Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string,
repoIdentifier string, repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error {
func (fs *fileSystemBR) Init(ctx context.Context, param interface{}) error {
initParam := param.(*FSBRInitParam)
var err error
defer func() {
if err != nil {
@@ -75,27 +96,27 @@ func (fs *fileSystemBR) Init(ctx context.Context, bslName string, sourceNamespac
backupLocation := &velerov1api.BackupStorageLocation{}
if err = fs.client.Get(ctx, client.ObjectKey{
Namespace: fs.namespace,
Name: bslName,
Name: initParam.BSLName,
}, backupLocation); err != nil {
return errors.Wrapf(err, "error getting backup storage location %s", bslName)
return errors.Wrapf(err, "error getting backup storage location %s", initParam.BSLName)
}
fs.backupLocation = backupLocation
fs.backupRepo, err = repositoryEnsurer.EnsureRepo(ctx, fs.namespace, sourceNamespace, bslName, repositoryType)
fs.backupRepo, err = initParam.RepositoryEnsurer.EnsureRepo(ctx, fs.namespace, initParam.SourceNamespace, initParam.BSLName, initParam.RepositoryType)
if err != nil {
return errors.Wrapf(err, "error to ensure backup repository %s-%s-%s", bslName, sourceNamespace, repositoryType)
return errors.Wrapf(err, "error to ensure backup repository %s-%s-%s", initParam.BSLName, initParam.SourceNamespace, initParam.RepositoryType)
}
err = fs.boostRepoConnect(ctx, repositoryType, credentialGetter)
err = fs.boostRepoConnect(ctx, initParam.RepositoryType, initParam.CredentialGetter)
if err != nil {
return errors.Wrapf(err, "error to boost backup repository connection %s-%s-%s", bslName, sourceNamespace, repositoryType)
return errors.Wrapf(err, "error to boost backup repository connection %s-%s-%s", initParam.BSLName, initParam.SourceNamespace, initParam.RepositoryType)
}
fs.uploaderProv, err = provider.NewUploaderProvider(ctx, fs.client, uploaderType, fs.requestorType, repoIdentifier,
fs.backupLocation, fs.backupRepo, credentialGetter, repokey.RepoKeySelector(), fs.log)
fs.uploaderProv, err = provider.NewUploaderProvider(ctx, fs.client, initParam.UploaderType, fs.requestorType, initParam.RepoIdentifier,
fs.backupLocation, fs.backupRepo, initParam.CredentialGetter, repokey.RepoKeySelector(), fs.log)
if err != nil {
return errors.Wrapf(err, "error creating uploader %s", uploaderType)
return errors.Wrapf(err, "error creating uploader %s", initParam.UploaderType)
}
fs.initialized = true
@@ -103,10 +124,10 @@ func (fs *fileSystemBR) Init(ctx context.Context, bslName string, sourceNamespac
fs.log.WithFields(
logrus.Fields{
"jobName": fs.jobName,
"bsl": bslName,
"source namespace": sourceNamespace,
"uploader": uploaderType,
"repository": repositoryType,
"bsl": initParam.BSLName,
"source namespace": initParam.SourceNamespace,
"uploader": initParam.UploaderType,
"repository": initParam.RepositoryType,
}).Info("FileSystemBR is initialized")
return nil
@@ -129,14 +150,16 @@ func (fs *fileSystemBR) Close(ctx context.Context) {
fs.log.WithField("user", fs.jobName).Info("FileSystemBR is closed")
}
func (fs *fileSystemBR) StartBackup(source AccessPoint, realSource string, parentSnapshot string, forceFull bool, tags map[string]string, uploaderConfig map[string]string) error {
func (fs *fileSystemBR) StartBackup(source AccessPoint, uploaderConfig map[string]string, param interface{}) error {
if !fs.initialized {
return errors.New("file system data path is not initialized")
}
backupParam := param.(*FSBRStartParam)
go func() {
snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, realSource, tags, forceFull,
parentSnapshot, source.VolMode, uploaderConfig, fs)
snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, backupParam.RealSource, backupParam.Tags, backupParam.ForceFull,
backupParam.ParentSnapshot, source.VolMode, uploaderConfig, fs)
if err == provider.ErrorCanceled {
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)
@@ -192,7 +215,7 @@ func (fs *fileSystemBR) Cancel() {
func (fs *fileSystemBR) boostRepoConnect(ctx context.Context, repositoryType string, credentialGetter *credentials.CredentialGetter) error {
if repositoryType == velerov1api.BackupRepositoryTypeKopia {
if err := repoProvider.NewUnifiedRepoProvider(*credentialGetter, repositoryType, fs.client, fs.log).BoostRepoConnect(ctx, repoProvider.RepoParam{BackupLocation: fs.backupLocation, BackupRepo: fs.backupRepo}); err != nil {
if err := repoProvider.NewUnifiedRepoProvider(*credentialGetter, repositoryType, fs.log).BoostRepoConnect(ctx, repoProvider.RepoParam{BackupLocation: fs.backupLocation, BackupRepo: fs.backupRepo}); err != nil {
return err
}
} else {