mirror of
https://github.com/vmware-tanzu/velero.git
synced 2026-06-10 00:03:10 +00:00
Merge remote-tracking branch 'origin/main' into copilot/replace-vmware-tanzu-velero
Co-authored-by: kaovilai <11228024+kaovilai@users.noreply.github.com>
This commit is contained in:
1
changelogs/unreleased/9819-Lyndon-Li
Normal file
1
changelogs/unreleased/9819-Lyndon-Li
Normal file
@@ -0,0 +1 @@
|
||||
Data path naming adjustment for block data mover
|
||||
@@ -170,14 +170,14 @@ func (r *BackupMicroService) RunCancelableDataPath(ctx context.Context) (string,
|
||||
OnProgress: r.OnDataUploadProgress,
|
||||
}
|
||||
|
||||
fsBackup, err := r.dataPathMgr.CreateFileSystemBR(du.Name, dataUploadDownloadRequestor, ctx, r.client, du.Namespace, callbacks, log)
|
||||
dp, err := r.dataPathMgr.CreateGenericDataPath(du.Name, dataUploadDownloadRequestor, ctx, r.client, du.Namespace, callbacks, log)
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, "error to create data path")
|
||||
}
|
||||
|
||||
log.Debug("Async fs br created")
|
||||
|
||||
if err := fsBackup.Init(ctx, &datapath.FSBRInitParam{
|
||||
if err := dp.Init(ctx, &datapath.InitParam{
|
||||
BSLName: du.Spec.BackupStorageLocation,
|
||||
SourceNamespace: du.Spec.SourceNamespace,
|
||||
UploaderType: GetUploaderType(du.Spec.DataMover),
|
||||
@@ -195,7 +195,7 @@ func (r *BackupMicroService) RunCancelableDataPath(ctx context.Context) (string,
|
||||
velerov1api.AsyncOperationIDLabel: du.Labels[velerov1api.AsyncOperationIDLabel],
|
||||
}
|
||||
|
||||
if err := fsBackup.StartBackup(r.sourceTargetPath, du.Spec.DataMoverConfig, &datapath.FSBRStartParam{
|
||||
if err := dp.StartBackup(r.sourceTargetPath, du.Spec.DataMoverConfig, &datapath.BackupStartParam{
|
||||
RealSource: GetRealSource(du.Spec.SourceNamespace, du.Spec.SourcePVC),
|
||||
ParentSnapshot: "",
|
||||
ForceFull: false,
|
||||
|
||||
@@ -403,7 +403,7 @@ func TestRunCancelableDataPath(t *testing.T) {
|
||||
bs.dataPathMgr = test.dataPathMgr
|
||||
}
|
||||
|
||||
datapath.FSBRCreator = func(string, string, kbclient.Client, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
|
||||
datapath.VGDPCreator = func(string, string, kbclient.Client, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
|
||||
fsBR := datapathmockes.NewAsyncBR(t)
|
||||
if test.initErr != nil {
|
||||
fsBR.On("Init", mock.Anything, mock.Anything).Return(test.initErr)
|
||||
|
||||
@@ -159,14 +159,14 @@ func (r *RestoreMicroService) RunCancelableDataPath(ctx context.Context) (string
|
||||
OnProgress: r.OnDataDownloadProgress,
|
||||
}
|
||||
|
||||
fsRestore, err := r.dataPathMgr.CreateFileSystemBR(dd.Name, dataUploadDownloadRequestor, ctx, r.client, dd.Namespace, callbacks, log)
|
||||
dp, err := r.dataPathMgr.CreateGenericDataPath(dd.Name, dataUploadDownloadRequestor, ctx, r.client, dd.Namespace, callbacks, log)
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, "error to create data path")
|
||||
}
|
||||
|
||||
log.Debug("Found volume path")
|
||||
if err := fsRestore.Init(ctx,
|
||||
&datapath.FSBRInitParam{
|
||||
if err := dp.Init(ctx,
|
||||
&datapath.InitParam{
|
||||
BSLName: dd.Spec.BackupStorageLocation,
|
||||
SourceNamespace: dd.Spec.SourceNamespace,
|
||||
UploaderType: GetUploaderType(dd.Spec.DataMover),
|
||||
@@ -180,7 +180,7 @@ func (r *RestoreMicroService) RunCancelableDataPath(ctx context.Context) (string
|
||||
}
|
||||
log.Info("fs init")
|
||||
|
||||
if err := fsRestore.StartRestore(dd.Spec.SnapshotID, r.sourceTargetPath, dd.Spec.DataMoverConfig); err != nil {
|
||||
if err := dp.StartRestore(dd.Spec.SnapshotID, r.sourceTargetPath, dd.Spec.DataMoverConfig); err != nil {
|
||||
return "", errors.Wrap(err, "error starting data path restore")
|
||||
}
|
||||
|
||||
|
||||
@@ -347,7 +347,7 @@ func TestRunCancelableRestore(t *testing.T) {
|
||||
rs.dataPathMgr = test.dataPathMgr
|
||||
}
|
||||
|
||||
datapath.FSBRCreator = func(string, string, kbclient.Client, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
|
||||
datapath.VGDPCreator = func(string, string, kbclient.Client, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
|
||||
fsBR := datapathmockes.NewAsyncBR(t)
|
||||
if test.initErr != nil {
|
||||
fsBR.On("Init", mock.Anything, mock.Anything).Return(test.initErr)
|
||||
|
||||
@@ -34,8 +34,8 @@ import (
|
||||
"github.com/velero-io/velero/pkg/util/filesystem"
|
||||
)
|
||||
|
||||
// FSBRInitParam define the input param for FSBR init
|
||||
type FSBRInitParam struct {
|
||||
// InitParam define the input param for data path init
|
||||
type InitParam struct {
|
||||
BSLName string
|
||||
SourceNamespace string
|
||||
UploaderType string
|
||||
@@ -47,15 +47,15 @@ type FSBRInitParam struct {
|
||||
CacheDir string
|
||||
}
|
||||
|
||||
// FSBRStartParam define the input param for FSBR start
|
||||
type FSBRStartParam struct {
|
||||
// BackupStartParam define the input param for backup start
|
||||
type BackupStartParam struct {
|
||||
RealSource string
|
||||
ParentSnapshot string
|
||||
ForceFull bool
|
||||
Tags map[string]string
|
||||
}
|
||||
|
||||
type fileSystemBR struct {
|
||||
type generalDataPath struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
backupRepo *velerov1api.BackupRepository
|
||||
@@ -72,8 +72,8 @@ type fileSystemBR struct {
|
||||
dataPathLock sync.Mutex
|
||||
}
|
||||
|
||||
func newFileSystemBR(jobName string, requestorType string, client client.Client, namespace string, callbacks Callbacks, log logrus.FieldLogger) AsyncBR {
|
||||
fs := &fileSystemBR{
|
||||
func newGeneralDataPath(jobName string, requestorType string, client client.Client, namespace string, callbacks Callbacks, log logrus.FieldLogger) AsyncBR {
|
||||
dp := &generalDataPath{
|
||||
jobName: jobName,
|
||||
requestorType: requestorType,
|
||||
client: client,
|
||||
@@ -83,151 +83,151 @@ func newFileSystemBR(jobName string, requestorType string, client client.Client,
|
||||
log: log,
|
||||
}
|
||||
|
||||
return fs
|
||||
return dp
|
||||
}
|
||||
|
||||
func (fs *fileSystemBR) Init(ctx context.Context, param any) error {
|
||||
initParam := param.(*FSBRInitParam)
|
||||
func (dp *generalDataPath) Init(ctx context.Context, param any) error {
|
||||
initParam := param.(*InitParam)
|
||||
|
||||
var err error
|
||||
defer func() {
|
||||
if err != nil {
|
||||
fs.Close(ctx)
|
||||
dp.Close(ctx)
|
||||
}
|
||||
}()
|
||||
|
||||
fs.ctx, fs.cancel = context.WithCancel(ctx)
|
||||
dp.ctx, dp.cancel = context.WithCancel(ctx)
|
||||
|
||||
backupLocation := &velerov1api.BackupStorageLocation{}
|
||||
if err = fs.client.Get(ctx, client.ObjectKey{
|
||||
Namespace: fs.namespace,
|
||||
if err = dp.client.Get(ctx, client.ObjectKey{
|
||||
Namespace: dp.namespace,
|
||||
Name: initParam.BSLName,
|
||||
}, backupLocation); err != nil {
|
||||
return errors.Wrapf(err, "error getting backup storage location %s", initParam.BSLName)
|
||||
}
|
||||
|
||||
fs.backupLocation = backupLocation
|
||||
dp.backupLocation = backupLocation
|
||||
|
||||
fs.backupRepo, err = initParam.RepositoryEnsurer.EnsureRepo(ctx, fs.namespace, initParam.SourceNamespace, initParam.BSLName, initParam.RepositoryType)
|
||||
dp.backupRepo, err = initParam.RepositoryEnsurer.EnsureRepo(ctx, dp.namespace, initParam.SourceNamespace, initParam.BSLName, initParam.RepositoryType)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "error to ensure backup repository %s-%s-%s", initParam.BSLName, initParam.SourceNamespace, initParam.RepositoryType)
|
||||
}
|
||||
|
||||
err = fs.boostRepoConnect(ctx, initParam.RepositoryType, initParam.CredentialGetter, initParam.CacheDir)
|
||||
err = dp.boostRepoConnect(ctx, initParam.RepositoryType, initParam.CredentialGetter, initParam.CacheDir)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "error to boost backup repository connection %s-%s-%s", initParam.BSLName, initParam.SourceNamespace, initParam.RepositoryType)
|
||||
}
|
||||
|
||||
fs.uploaderProv, err = provider.NewUploaderProvider(ctx, fs.client, initParam.UploaderType, fs.requestorType, initParam.RepoIdentifier,
|
||||
fs.backupLocation, fs.backupRepo, initParam.CredentialGetter, repokey.RepoKeySelector(), fs.log)
|
||||
dp.uploaderProv, err = provider.NewUploaderProvider(ctx, dp.client, initParam.UploaderType, dp.requestorType, initParam.RepoIdentifier,
|
||||
dp.backupLocation, dp.backupRepo, initParam.CredentialGetter, repokey.RepoKeySelector(), dp.log)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "error creating uploader %s", initParam.UploaderType)
|
||||
}
|
||||
|
||||
fs.initialized = true
|
||||
dp.initialized = true
|
||||
|
||||
fs.log.WithFields(
|
||||
dp.log.WithFields(
|
||||
logrus.Fields{
|
||||
"jobName": fs.jobName,
|
||||
"jobName": dp.jobName,
|
||||
"bsl": initParam.BSLName,
|
||||
"source namespace": initParam.SourceNamespace,
|
||||
"uploader": initParam.UploaderType,
|
||||
"repository": initParam.RepositoryType,
|
||||
}).Info("FileSystemBR is initialized")
|
||||
}).Info("Data path is initialized")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fs *fileSystemBR) Close(ctx context.Context) {
|
||||
if fs.cancel != nil {
|
||||
fs.cancel()
|
||||
func (dp *generalDataPath) Close(ctx context.Context) {
|
||||
if dp.cancel != nil {
|
||||
dp.cancel()
|
||||
}
|
||||
|
||||
fs.log.WithField("user", fs.jobName).Info("Closing FileSystemBR")
|
||||
dp.log.WithField("user", dp.jobName).Info("Closing data path")
|
||||
|
||||
fs.wgDataPath.Wait()
|
||||
dp.wgDataPath.Wait()
|
||||
|
||||
fs.close(ctx)
|
||||
dp.close(ctx)
|
||||
|
||||
fs.log.WithField("user", fs.jobName).Info("FileSystemBR is closed")
|
||||
dp.log.WithField("user", dp.jobName).Info("Data path is closed")
|
||||
}
|
||||
|
||||
func (fs *fileSystemBR) close(ctx context.Context) {
|
||||
fs.dataPathLock.Lock()
|
||||
defer fs.dataPathLock.Unlock()
|
||||
func (dp *generalDataPath) close(ctx context.Context) {
|
||||
dp.dataPathLock.Lock()
|
||||
defer dp.dataPathLock.Unlock()
|
||||
|
||||
if fs.uploaderProv != nil {
|
||||
if err := fs.uploaderProv.Close(ctx); err != nil {
|
||||
fs.log.Errorf("failed to close uploader provider with error %v", err)
|
||||
if dp.uploaderProv != nil {
|
||||
if err := dp.uploaderProv.Close(ctx); err != nil {
|
||||
dp.log.Errorf("failed to close uploader provider with error %v", err)
|
||||
}
|
||||
|
||||
fs.uploaderProv = nil
|
||||
dp.uploaderProv = nil
|
||||
}
|
||||
}
|
||||
|
||||
func (fs *fileSystemBR) StartBackup(source AccessPoint, uploaderConfig map[string]string, param any) error {
|
||||
if !fs.initialized {
|
||||
func (dp *generalDataPath) StartBackup(source AccessPoint, uploaderConfig map[string]string, param any) error {
|
||||
if !dp.initialized {
|
||||
return errors.New("file system data path is not initialized")
|
||||
}
|
||||
|
||||
fs.wgDataPath.Add(1)
|
||||
dp.wgDataPath.Add(1)
|
||||
|
||||
backupParam := param.(*FSBRStartParam)
|
||||
backupParam := param.(*BackupStartParam)
|
||||
|
||||
go func() {
|
||||
fs.log.Info("Start data path backup")
|
||||
dp.log.Info("Start data path backup")
|
||||
|
||||
defer func() {
|
||||
fs.close(context.Background())
|
||||
fs.wgDataPath.Done()
|
||||
dp.close(context.Background())
|
||||
dp.wgDataPath.Done()
|
||||
}()
|
||||
|
||||
snapshotID, emptySnapshot, totalBytes, incrementalBytes, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, backupParam.RealSource, backupParam.Tags, backupParam.ForceFull,
|
||||
backupParam.ParentSnapshot, provider.CBTParam{}, source.VolMode, uploaderConfig, fs)
|
||||
snapshotID, emptySnapshot, totalBytes, incrementalBytes, err := dp.uploaderProv.RunBackup(dp.ctx, source.ByPath, backupParam.RealSource, backupParam.Tags, backupParam.ForceFull,
|
||||
backupParam.ParentSnapshot, provider.CBTParam{}, source.VolMode, uploaderConfig, dp)
|
||||
|
||||
if err == provider.ErrorCanceled {
|
||||
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)
|
||||
dp.callbacks.OnCancelled(context.Background(), dp.namespace, dp.jobName)
|
||||
} else if err != nil {
|
||||
dataPathErr := DataPathError{
|
||||
snapshotID: snapshotID,
|
||||
err: err,
|
||||
}
|
||||
fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, dataPathErr)
|
||||
dp.callbacks.OnFailed(context.Background(), dp.namespace, dp.jobName, dataPathErr)
|
||||
} else {
|
||||
fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Backup: BackupResult{snapshotID, emptySnapshot, source, totalBytes, incrementalBytes}})
|
||||
dp.callbacks.OnCompleted(context.Background(), dp.namespace, dp.jobName, Result{Backup: BackupResult{snapshotID, emptySnapshot, source, totalBytes, incrementalBytes}})
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint, uploaderConfigs map[string]string) error {
|
||||
if !fs.initialized {
|
||||
return errors.New("file system data path is not initialized")
|
||||
func (dp *generalDataPath) StartRestore(snapshotID string, target AccessPoint, uploaderConfigs map[string]string) error {
|
||||
if !dp.initialized {
|
||||
return errors.New("data path is not initialized")
|
||||
}
|
||||
|
||||
fs.wgDataPath.Add(1)
|
||||
dp.wgDataPath.Add(1)
|
||||
|
||||
go func() {
|
||||
fs.log.Info("Start data path restore")
|
||||
dp.log.Info("Start data path restore")
|
||||
|
||||
defer func() {
|
||||
fs.close(context.Background())
|
||||
fs.wgDataPath.Done()
|
||||
dp.close(context.Background())
|
||||
dp.wgDataPath.Done()
|
||||
}()
|
||||
|
||||
totalBytes, err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, target.VolMode, uploaderConfigs, fs)
|
||||
totalBytes, err := dp.uploaderProv.RunRestore(dp.ctx, snapshotID, target.ByPath, target.VolMode, uploaderConfigs, dp)
|
||||
|
||||
if err == provider.ErrorCanceled {
|
||||
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)
|
||||
dp.callbacks.OnCancelled(context.Background(), dp.namespace, dp.jobName)
|
||||
} else if err != nil {
|
||||
dataPathErr := DataPathError{
|
||||
snapshotID: snapshotID,
|
||||
err: err,
|
||||
}
|
||||
fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, dataPathErr)
|
||||
dp.callbacks.OnFailed(context.Background(), dp.namespace, dp.jobName, dataPathErr)
|
||||
} else {
|
||||
fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Restore: RestoreResult{Target: target, TotalBytes: totalBytes}})
|
||||
dp.callbacks.OnCompleted(context.Background(), dp.namespace, dp.jobName, Result{Restore: RestoreResult{Target: target, TotalBytes: totalBytes}})
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -235,20 +235,20 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint, uplo
|
||||
}
|
||||
|
||||
// UpdateProgress which implement ProgressUpdater interface to update progress status
|
||||
func (fs *fileSystemBR) UpdateProgress(p *uploader.Progress) {
|
||||
if fs.callbacks.OnProgress != nil {
|
||||
fs.callbacks.OnProgress(context.Background(), fs.namespace, fs.jobName, &uploader.Progress{TotalBytes: p.TotalBytes, BytesDone: p.BytesDone})
|
||||
func (dp *generalDataPath) UpdateProgress(p *uploader.Progress) {
|
||||
if dp.callbacks.OnProgress != nil {
|
||||
dp.callbacks.OnProgress(context.Background(), dp.namespace, dp.jobName, &uploader.Progress{TotalBytes: p.TotalBytes, BytesDone: p.BytesDone})
|
||||
}
|
||||
}
|
||||
|
||||
func (fs *fileSystemBR) Cancel() {
|
||||
fs.cancel()
|
||||
fs.log.WithField("user", fs.jobName).Info("FileSystemBR is canceled")
|
||||
func (dp *generalDataPath) Cancel() {
|
||||
dp.cancel()
|
||||
dp.log.WithField("user", dp.jobName).Info("FileSystemBR is canceled")
|
||||
}
|
||||
|
||||
func (fs *fileSystemBR) boostRepoConnect(ctx context.Context, repositoryType string, credentialGetter *credentials.CredentialGetter, cacheDir string) error {
|
||||
func (dp *generalDataPath) boostRepoConnect(ctx context.Context, repositoryType string, credentialGetter *credentials.CredentialGetter, cacheDir string) error {
|
||||
if repositoryType == velerov1api.BackupRepositoryTypeKopia {
|
||||
if err := repoProvider.NewUnifiedRepoProvider(*credentialGetter, repositoryType, fs.log).BoostRepoConnect(ctx, repoProvider.RepoParam{BackupLocation: fs.backupLocation, BackupRepo: fs.backupRepo, CacheDir: cacheDir}); err != nil {
|
||||
if err := repoProvider.NewUnifiedRepoProvider(*credentialGetter, repositoryType, dp.log).BoostRepoConnect(ctx, repoProvider.RepoParam{BackupLocation: dp.backupLocation, BackupRepo: dp.backupRepo, CacheDir: cacheDir}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -94,21 +94,21 @@ func TestAsyncBackup(t *testing.T) {
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
fs := newFileSystemBR("job-1", "test", nil, "velero", Callbacks{}, velerotest.NewLogger()).(*fileSystemBR)
|
||||
dp := newGeneralDataPath("job-1", "test", nil, "velero", Callbacks{}, velerotest.NewLogger()).(*generalDataPath)
|
||||
mockProvider := providerMock.NewProvider(t)
|
||||
mockProvider.On("RunBackup", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.result.Backup.SnapshotID, test.result.Backup.EmptySnapshot, test.result.Backup.TotalBytes, test.result.Backup.IncrementalBytes, test.err)
|
||||
mockProvider.On("Close", mock.Anything).Return(nil)
|
||||
fs.uploaderProv = mockProvider
|
||||
fs.initialized = true
|
||||
fs.callbacks = test.callbacks
|
||||
dp.uploaderProv = mockProvider
|
||||
dp.initialized = true
|
||||
dp.callbacks = test.callbacks
|
||||
|
||||
err := fs.StartBackup(AccessPoint{ByPath: test.path}, map[string]string{}, &FSBRStartParam{})
|
||||
err := dp.StartBackup(AccessPoint{ByPath: test.path}, map[string]string{}, &BackupStartParam{})
|
||||
require.NoError(t, err)
|
||||
|
||||
<-finish
|
||||
|
||||
// Ensure the goroutine finishes so deferred fs.close executes, satisfying mock expectations.
|
||||
fs.wgDataPath.Wait()
|
||||
dp.wgDataPath.Wait()
|
||||
|
||||
assert.Equal(t, test.err, asyncErr)
|
||||
assert.Equal(t, test.result, asyncResult)
|
||||
@@ -182,21 +182,21 @@ func TestAsyncRestore(t *testing.T) {
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
fs := newFileSystemBR("job-1", "test", nil, "velero", Callbacks{}, velerotest.NewLogger()).(*fileSystemBR)
|
||||
dp := newGeneralDataPath("job-1", "test", nil, "velero", Callbacks{}, velerotest.NewLogger()).(*generalDataPath)
|
||||
mockProvider := providerMock.NewProvider(t)
|
||||
mockProvider.On("RunRestore", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.result.Restore.TotalBytes, test.err)
|
||||
mockProvider.On("Close", mock.Anything).Return(nil)
|
||||
fs.uploaderProv = mockProvider
|
||||
fs.initialized = true
|
||||
fs.callbacks = test.callbacks
|
||||
dp.uploaderProv = mockProvider
|
||||
dp.initialized = true
|
||||
dp.callbacks = test.callbacks
|
||||
|
||||
err := fs.StartRestore(test.snapshot, AccessPoint{ByPath: test.path}, map[string]string{})
|
||||
err := dp.StartRestore(test.snapshot, AccessPoint{ByPath: test.path}, map[string]string{})
|
||||
require.NoError(t, err)
|
||||
|
||||
<-finish
|
||||
|
||||
// Ensure the goroutine finishes so deferred fs.close executes, satisfying mock expectations.
|
||||
fs.wgDataPath.Wait()
|
||||
dp.wgDataPath.Wait()
|
||||
|
||||
assert.Equal(t, asyncErr, test.err)
|
||||
assert.Equal(t, asyncResult, test.result)
|
||||
@@ -28,7 +28,7 @@ import (
|
||||
)
|
||||
|
||||
var ConcurrentLimitExceed error = errors.New("Concurrent number exceeds")
|
||||
var FSBRCreator = newFileSystemBR
|
||||
var VGDPCreator = newGeneralDataPath
|
||||
var MicroServiceBRWatcherCreator = newMicroServiceBRWatcher
|
||||
|
||||
type Manager struct {
|
||||
@@ -45,8 +45,8 @@ func NewManager(cocurrentNum int) *Manager {
|
||||
}
|
||||
}
|
||||
|
||||
// CreateFileSystemBR creates a new file system backup/restore data path instance
|
||||
func (m *Manager) CreateFileSystemBR(jobName string, requestorType string, ctx context.Context, client client.Client, namespace string, callbacks Callbacks, log logrus.FieldLogger) (AsyncBR, error) {
|
||||
// CreateGenericDataPath creates a new generic data path instance
|
||||
func (m *Manager) CreateGenericDataPath(jobName string, requestorType string, ctx context.Context, client client.Client, namespace string, callbacks Callbacks, log logrus.FieldLogger) (AsyncBR, error) {
|
||||
m.trackerLock.Lock()
|
||||
defer m.trackerLock.Unlock()
|
||||
|
||||
@@ -54,7 +54,7 @@ func (m *Manager) CreateFileSystemBR(jobName string, requestorType string, ctx c
|
||||
return nil, ConcurrentLimitExceed
|
||||
}
|
||||
|
||||
m.tracker[jobName] = FSBRCreator(jobName, requestorType, client, namespace, callbacks, log)
|
||||
m.tracker[jobName] = VGDPCreator(jobName, requestorType, client, namespace, callbacks, log)
|
||||
|
||||
return m.tracker[jobName], nil
|
||||
}
|
||||
|
||||
@@ -23,16 +23,16 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestCreateFileSystemBR(t *testing.T) {
|
||||
func TestCreateGenericDataPath(t *testing.T) {
|
||||
m := NewManager(2)
|
||||
|
||||
async_job_1, err := m.CreateFileSystemBR("job-1", "test", t.Context(), nil, "velero", Callbacks{}, nil)
|
||||
async_job_1, err := m.CreateGenericDataPath("job-1", "test", t.Context(), nil, "velero", Callbacks{}, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = m.CreateFileSystemBR("job-2", "test", t.Context(), nil, "velero", Callbacks{}, nil)
|
||||
_, err = m.CreateGenericDataPath("job-2", "test", t.Context(), nil, "velero", Callbacks{}, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = m.CreateFileSystemBR("job-3", "test", t.Context(), nil, "velero", Callbacks{}, nil)
|
||||
_, err = m.CreateGenericDataPath("job-3", "test", t.Context(), nil, "velero", Callbacks{}, nil)
|
||||
assert.Equal(t, ConcurrentLimitExceed, err)
|
||||
|
||||
ret := m.GetAsyncBR("job-0")
|
||||
|
||||
@@ -169,14 +169,14 @@ func (r *BackupMicroService) RunCancelableDataPath(ctx context.Context) (string,
|
||||
OnProgress: r.OnDataPathProgress,
|
||||
}
|
||||
|
||||
fsBackup, err := r.dataPathMgr.CreateFileSystemBR(pvb.Name, podVolumeRequestor, ctx, r.client, pvb.Namespace, callbacks, log)
|
||||
fsBackup, err := r.dataPathMgr.CreateGenericDataPath(pvb.Name, podVolumeRequestor, ctx, r.client, pvb.Namespace, callbacks, log)
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, "error to create data path")
|
||||
}
|
||||
|
||||
log.Debug("Async fs br created")
|
||||
|
||||
if err := fsBackup.Init(ctx, &datapath.FSBRInitParam{
|
||||
if err := fsBackup.Init(ctx, &datapath.InitParam{
|
||||
BSLName: pvb.Spec.BackupStorageLocation,
|
||||
SourceNamespace: pvb.Spec.Pod.Namespace,
|
||||
UploaderType: pvb.Spec.UploaderType,
|
||||
@@ -192,7 +192,7 @@ func (r *BackupMicroService) RunCancelableDataPath(ctx context.Context) (string,
|
||||
|
||||
tags := map[string]string{}
|
||||
|
||||
if err := fsBackup.StartBackup(r.sourceTargetPath, pvb.Spec.UploaderSettings, &datapath.FSBRStartParam{
|
||||
if err := fsBackup.StartBackup(r.sourceTargetPath, pvb.Spec.UploaderSettings, &datapath.BackupStartParam{
|
||||
RealSource: GetRealSource(pvb),
|
||||
ParentSnapshot: "",
|
||||
ForceFull: false,
|
||||
|
||||
@@ -402,7 +402,7 @@ func TestRunCancelableDataPath(t *testing.T) {
|
||||
bs.dataPathMgr = test.dataPathMgr
|
||||
}
|
||||
|
||||
datapath.FSBRCreator = func(string, string, kbclient.Client, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
|
||||
datapath.VGDPCreator = func(string, string, kbclient.Client, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
|
||||
fsBR := datapathmockes.NewAsyncBR(t)
|
||||
if test.initErr != nil {
|
||||
fsBR.On("Init", mock.Anything, mock.Anything).Return(test.initErr)
|
||||
|
||||
@@ -161,7 +161,7 @@ func (r *RestoreMicroService) RunCancelableDataPath(ctx context.Context) (string
|
||||
OnProgress: r.OnPvrProgress,
|
||||
}
|
||||
|
||||
fsRestore, err := r.dataPathMgr.CreateFileSystemBR(pvr.Name, podVolumeRequestor, ctx, r.client, pvr.Namespace, callbacks, log)
|
||||
fsRestore, err := r.dataPathMgr.CreateGenericDataPath(pvr.Name, podVolumeRequestor, ctx, r.client, pvr.Namespace, callbacks, log)
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, "error to create data path")
|
||||
}
|
||||
@@ -169,7 +169,7 @@ func (r *RestoreMicroService) RunCancelableDataPath(ctx context.Context) (string
|
||||
log.Debug("Async fs br created")
|
||||
|
||||
if err := fsRestore.Init(ctx,
|
||||
&datapath.FSBRInitParam{
|
||||
&datapath.InitParam{
|
||||
BSLName: pvr.Spec.BackupStorageLocation,
|
||||
SourceNamespace: pvr.Spec.SourceNamespace,
|
||||
UploaderType: pvr.Spec.UploaderType,
|
||||
|
||||
@@ -428,7 +428,7 @@ func TestRunCancelableDataPathRestore(t *testing.T) {
|
||||
rs.dataPathMgr = test.dataPathMgr
|
||||
}
|
||||
|
||||
datapath.FSBRCreator = func(string, string, kbclient.Client, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
|
||||
datapath.VGDPCreator = func(string, string, kbclient.Client, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
|
||||
fsBR := datapathmockes.NewAsyncBR(t)
|
||||
if test.initErr != nil {
|
||||
fsBR.On("Init", mock.Anything, mock.Anything).Return(test.initErr)
|
||||
|
||||
@@ -36,9 +36,8 @@ import (
|
||||
"github.com/velero-io/velero/pkg/repository/udmrepo/service"
|
||||
)
|
||||
|
||||
// BackupFunc mainly used to make testing more convenient
|
||||
var BackupFunc = kopia.Backup
|
||||
var RestoreFunc = kopia.Restore
|
||||
var kopiaBackupFunc = kopia.Backup
|
||||
var kopiaRestoreFunc = kopia.Restore
|
||||
var BackupRepoServiceCreateFunc = service.Create
|
||||
|
||||
// kopiaProvider recorded info related with kopiaProvider
|
||||
@@ -166,7 +165,7 @@ func (kp *kopiaProvider) RunBackup(
|
||||
uploaderCfg[kopia.UploaderConfigMultipartKey] = "true"
|
||||
}
|
||||
|
||||
snapshotInfo, _, err := BackupFunc(ctx, kpUploader, repoWriter, path, realSource, forceFull, parentSnapshot, volMode, uploaderCfg, tags, log)
|
||||
snapshotInfo, _, err := kopiaBackupFunc(ctx, kpUploader, repoWriter, path, realSource, forceFull, parentSnapshot, volMode, uploaderCfg, tags, log)
|
||||
if err != nil {
|
||||
snapshotID := ""
|
||||
if snapshotInfo != nil {
|
||||
@@ -234,7 +233,7 @@ func (kp *kopiaProvider) RunRestore(
|
||||
// We use the cancel channel to control the restore cancel, so don't pass a context with cancel to Kopia restore.
|
||||
// Otherwise, Kopia restore will not response to the cancel control but return an arbitrary error.
|
||||
// Kopia restore cancel is not designed as well as Kopia backup which uses the context to control backup cancel all the way.
|
||||
size, fileCount, err := RestoreFunc(context.Background(), repoWriter, progress, snapshotID, volumePath, volMode, uploaderCfg, log, restoreCancel)
|
||||
size, fileCount, err := kopiaRestoreFunc(context.Background(), repoWriter, progress, snapshotID, volumePath, volMode, uploaderCfg, log, restoreCancel)
|
||||
|
||||
if err != nil {
|
||||
return 0, errors.Wrapf(err, "Failed to run kopia restore")
|
||||
|
||||
@@ -105,7 +105,7 @@ func TestRunBackup(t *testing.T) {
|
||||
if tc.volMode == "" {
|
||||
tc.volMode = uploader.PersistentVolumeFilesystem
|
||||
}
|
||||
BackupFunc = tc.hookBackupFunc
|
||||
kopiaBackupFunc = tc.hookBackupFunc
|
||||
_, _, _, _, err := kp.RunBackup(t.Context(), "var", "", nil, false, "", CBTParam{}, tc.volMode, map[string]string{}, &updater)
|
||||
if tc.notError {
|
||||
assert.NoError(t, err)
|
||||
@@ -156,7 +156,7 @@ func TestRunRestore(t *testing.T) {
|
||||
if tc.volMode == "" {
|
||||
tc.volMode = uploader.PersistentVolumeFilesystem
|
||||
}
|
||||
RestoreFunc = tc.hookRestoreFunc
|
||||
kopiaRestoreFunc = tc.hookRestoreFunc
|
||||
_, err := kp.RunRestore(t.Context(), "", "/var", tc.volMode, map[string]string{}, &updater)
|
||||
if tc.notError {
|
||||
assert.NoError(t, err)
|
||||
|
||||
Reference in New Issue
Block a user