data mover micro service restore

Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
This commit is contained in:
Lyndon-Li
2024-07-30 17:14:11 +08:00
parent 8e0f4d17f7
commit 86e54801c5
7 changed files with 1047 additions and 12 deletions

View File

@@ -0,0 +1 @@
Data mover micro service restore according to design #7576

View File

@@ -14,16 +14,37 @@ limitations under the License.
package datamover
import (
"context"
"fmt"
"os"
"strings"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
"github.com/vmware-tanzu/velero/pkg/buildinfo"
"github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/cmd/util/signals"
"github.com/vmware-tanzu/velero/pkg/datamover"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/logging"
ctlcache "sigs.k8s.io/controller-runtime/pkg/cache"
ctlclient "sigs.k8s.io/controller-runtime/pkg/client"
)
type dataMoverRestoreConfig struct {
@@ -52,7 +73,10 @@ func NewRestoreCommand(f client.Factory) *cobra.Command {
logger.Infof("Starting Velero data-mover restore %s (%s)", buildinfo.Version, buildinfo.FormattedGitSHA())
f.SetBasename(fmt.Sprintf("%s-%s", c.Parent().Name(), c.Name()))
s := newdataMoverRestore(logger, config)
s, err := newdataMoverRestore(logger, f, config)
if err != nil {
exitWithMessage(logger, false, "Failed to create data mover restore, %v", err)
}
s.run()
},
@@ -74,19 +98,174 @@ func NewRestoreCommand(f client.Factory) *cobra.Command {
}
type dataMoverRestore struct {
logger logrus.FieldLogger
config dataMoverRestoreConfig
logger logrus.FieldLogger
ctx context.Context
cancelFunc context.CancelFunc
client ctlclient.Client
cache ctlcache.Cache
namespace string
nodeName string
config dataMoverRestoreConfig
kubeClient kubernetes.Interface
dataPathMgr *datapath.Manager
}
func newdataMoverRestore(logger logrus.FieldLogger, config dataMoverRestoreConfig) *dataMoverRestore {
s := &dataMoverRestore{
logger: logger,
config: config,
func newdataMoverRestore(logger logrus.FieldLogger, factory client.Factory, config dataMoverRestoreConfig) (*dataMoverRestore, error) {
ctx, cancelFunc := context.WithCancel(context.Background())
clientConfig, err := factory.ClientConfig()
if err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to create client config")
}
return s
ctrl.SetLogger(zap.New(zap.UseDevMode(true)))
scheme := runtime.NewScheme()
if err := velerov1api.AddToScheme(scheme); err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to add velero v1 scheme")
}
if err := velerov2alpha1api.AddToScheme(scheme); err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to add velero v2alpha1 scheme")
}
if err := v1.AddToScheme(scheme); err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to add core v1 scheme")
}
nodeName := os.Getenv("NODE_NAME")
// use a field selector to filter to only pods scheduled on this node.
cacheOption := ctlcache.Options{
Scheme: scheme,
ByObject: map[ctlclient.Object]ctlcache.ByObject{
&v1.Pod{}: {
Field: fields.Set{"spec.nodeName": nodeName}.AsSelector(),
},
&velerov2alpha1api.DataDownload{}: {
Field: fields.Set{"metadata.namespace": factory.Namespace()}.AsSelector(),
},
},
}
cli, err := ctlclient.New(clientConfig, ctlclient.Options{
Scheme: scheme,
})
if err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to create client")
}
cache, err := ctlcache.New(clientConfig, cacheOption)
if err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to create client cache")
}
s := &dataMoverRestore{
logger: logger,
ctx: ctx,
cancelFunc: cancelFunc,
client: cli,
cache: cache,
config: config,
namespace: factory.Namespace(),
nodeName: nodeName,
}
s.kubeClient, err = factory.KubeClient()
if err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to create kube client")
}
s.dataPathMgr = datapath.NewManager(1)
return s, nil
}
var funcCreateDataPathRestore = (*dataMoverRestore).createDataPathService
func (s *dataMoverRestore) run() {
signals.CancelOnShutdown(s.cancelFunc, s.logger)
go func() {
if err := s.cache.Start(s.ctx); err != nil {
s.logger.WithError(err).Warn("error starting cache")
}
}()
// TODOOO: call s.runDataPath()
time.Sleep(time.Duration(1<<63 - 1))
}
func (s *dataMoverRestore) runDataPath() {
s.logger.Infof("Starting micro service in node %s for dd %s", s.nodeName, s.config.ddName)
dpService, err := funcCreateDataPathRestore(s)
if err != nil {
s.cancelFunc()
funcExitWithMessage(s.logger, false, "Failed to create data path service for DataDownload %s: %v", s.config.ddName, err)
return
}
s.logger.Infof("Starting data path service %s", s.config.ddName)
err = dpService.Init()
if err != nil {
s.cancelFunc()
funcExitWithMessage(s.logger, false, "Failed to init data path service for DataDownload %s: %v", s.config.ddName, err)
return
}
result, err := dpService.RunCancelableDataPath(s.ctx)
if err != nil {
s.cancelFunc()
funcExitWithMessage(s.logger, false, "Failed to run data path service for DataDownload %s: %v", s.config.ddName, err)
return
}
s.logger.WithField("dd", s.config.ddName).Info("Data path service completed")
dpService.Shutdown()
s.logger.WithField("dd", s.config.ddName).Info("Data path service is shut down")
s.cancelFunc()
funcExitWithMessage(s.logger, true, result)
}
func (s *dataMoverRestore) createDataPathService() (dataPathService, error) {
credentialFileStore, err := funcNewCredentialFileStore(
s.client,
s.namespace,
defaultCredentialsDirectory,
filesystem.NewFileSystem(),
)
if err != nil {
return nil, errors.Wrapf(err, "error to create credential file store")
}
credSecretStore, err := funcNewCredentialSecretStore(s.client, s.namespace)
if err != nil {
return nil, errors.Wrapf(err, "error to create credential secret store")
}
credGetter := &credentials.CredentialGetter{FromFile: credentialFileStore, FromSecret: credSecretStore}
duInformer, err := s.cache.GetInformer(s.ctx, &velerov2alpha1api.DataDownload{})
if err != nil {
return nil, errors.Wrap(err, "error to get controller-runtime informer from manager")
}
repoEnsurer := repository.NewEnsurer(s.client, s.logger, s.config.resourceTimeout)
return datamover.NewRestoreMicroService(s.ctx, s.client, s.kubeClient, s.config.ddName, s.namespace, s.nodeName, datapath.AccessPoint{
ByPath: s.config.volumePath,
VolMode: uploader.PersistentVolumeMode(s.config.volumeMode),
}, s.dataPathMgr, repoEnsurer, credGetter, duInformer, s.logger), nil
}

View File

@@ -0,0 +1,166 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package datamover
import (
"errors"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
cacheMock "github.com/vmware-tanzu/velero/pkg/cmd/cli/datamover/mocks"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
)
func fakeCreateDataPathRestoreWithErr(_ *dataMoverRestore) (dataPathService, error) {
return nil, errors.New("fake-create-data-path-error")
}
func fakeCreateDataPathRestore(_ *dataMoverRestore) (dataPathService, error) {
return frHelper, nil
}
func TestRunDataPathRestore(t *testing.T) {
tests := []struct {
name string
ddName string
createDataPathFail bool
initDataPathErr error
runCancelableDataPathErr error
runCancelableDataPathResult string
expectedMessage string
expectedSucceed bool
}{
{
name: "create data path failed",
ddName: "fake-name",
createDataPathFail: true,
expectedMessage: "Failed to create data path service for DataDownload fake-name: fake-create-data-path-error",
},
{
name: "init data path failed",
ddName: "fake-name",
initDataPathErr: errors.New("fake-init-data-path-error"),
expectedMessage: "Failed to init data path service for DataDownload fake-name: fake-init-data-path-error",
},
{
name: "run data path failed",
ddName: "fake-name",
runCancelableDataPathErr: errors.New("fake-run-data-path-error"),
expectedMessage: "Failed to run data path service for DataDownload fake-name: fake-run-data-path-error",
},
{
name: "succeed",
ddName: "fake-name",
runCancelableDataPathResult: "fake-run-data-path-result",
expectedMessage: "fake-run-data-path-result",
expectedSucceed: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
frHelper = &fakeRunHelper{
initErr: test.initDataPathErr,
runCancelableDataPathErr: test.runCancelableDataPathErr,
runCancelableDataPathResult: test.runCancelableDataPathResult,
}
if test.createDataPathFail {
funcCreateDataPathRestore = fakeCreateDataPathRestoreWithErr
} else {
funcCreateDataPathRestore = fakeCreateDataPathRestore
}
funcExitWithMessage = frHelper.ExitWithMessage
s := &dataMoverRestore{
logger: velerotest.NewLogger(),
cancelFunc: func() {},
config: dataMoverRestoreConfig{
ddName: test.ddName,
},
}
s.runDataPath()
assert.Equal(t, test.expectedMessage, frHelper.exitMessage)
assert.Equal(t, test.expectedSucceed, frHelper.succeed)
})
}
}
func TestCreateDataPathRestore(t *testing.T) {
tests := []struct {
name string
fileStoreErr error
secretStoreErr error
mockGetInformer bool
getInformerErr error
expectedError string
}{
{
name: "create credential file store error",
fileStoreErr: errors.New("fake-file-store-error"),
expectedError: "error to create credential file store: fake-file-store-error",
},
{
name: "create credential secret store",
secretStoreErr: errors.New("fake-secret-store-error"),
expectedError: "error to create credential secret store: fake-secret-store-error",
},
{
name: "get informer error",
mockGetInformer: true,
getInformerErr: errors.New("fake-get-informer-error"),
expectedError: "error to get controller-runtime informer from manager: fake-get-informer-error",
},
{
name: "succeed",
mockGetInformer: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fcHelper := &fakeCreateDataPathServiceHelper{
fileStoreErr: test.fileStoreErr,
secretStoreErr: test.secretStoreErr,
}
funcNewCredentialFileStore = fcHelper.NewNamespacedFileStore
funcNewCredentialSecretStore = fcHelper.NewNamespacedSecretStore
cache := cacheMock.NewCache(t)
if test.mockGetInformer {
cache.On("GetInformer", mock.Anything, mock.Anything).Return(nil, test.getInformerErr)
}
funcExitWithMessage = frHelper.ExitWithMessage
s := &dataMoverRestore{
cache: cache,
}
_, err := s.createDataPathService()
if test.expectedError != "" {
assert.EqualError(t, err, test.expectedError)
} else {
assert.NoError(t, err)
}
})
}
}

View File

@@ -127,7 +127,7 @@ func (r *BackupMicroService) Init() error {
return err
}
var waitDuTimeout time.Duration = time.Minute * 2
var waitControllerTimeout time.Duration = time.Minute * 2
func (r *BackupMicroService) RunCancelableDataPath(ctx context.Context) (string, error) {
log := r.logger.WithFields(logrus.Fields{
@@ -135,7 +135,7 @@ func (r *BackupMicroService) RunCancelableDataPath(ctx context.Context) (string,
})
du := &velerov2alpha1api.DataUpload{}
err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, waitDuTimeout, true, func(ctx context.Context) (bool, error) {
err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, waitControllerTimeout, true, func(ctx context.Context) (bool, error) {
err := r.client.Get(ctx, types.NamespacedName{
Namespace: r.namespace,
Name: r.dataUploadName,
@@ -313,7 +313,7 @@ func (r *BackupMicroService) closeDataPath(ctx context.Context, duName string) {
func (r *BackupMicroService) cancelDataUpload(du *velerov2alpha1api.DataUpload) {
r.logger.WithField("DataUpload", du.Name).Info("Data upload is being canceled")
r.eventRecorder.Event(du, false, "Canceling", "Canceing for data upload %s", du.Name)
r.eventRecorder.Event(du, false, "Canceling", "Canceling for data upload %s", du.Name)
fsBackup := r.dataPathMgr.GetAsyncBR(du.Name)
if fsBackup == nil {

View File

@@ -412,7 +412,7 @@ func TestRunCancelableDataPath(t *testing.T) {
return fsBR
}
waitDuTimeout = time.Second
waitControllerTimeout = time.Second
if test.result != nil {
go func() {

View File

@@ -0,0 +1,295 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package datamover
import (
"context"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/kube"
cachetool "k8s.io/client-go/tools/cache"
)
// RestoreMicroService process data mover restores inside the restore pod
type RestoreMicroService struct {
ctx context.Context
client client.Client
kubeClient kubernetes.Interface
repoEnsurer *repository.Ensurer
credentialGetter *credentials.CredentialGetter
logger logrus.FieldLogger
dataPathMgr *datapath.Manager
eventRecorder kube.EventRecorder
namespace string
dataDownloadName string
dataDownload *velerov2alpha1api.DataDownload
sourceTargetPath datapath.AccessPoint
resultSignal chan dataPathResult
ddInformer cache.Informer
ddHandler cachetool.ResourceEventHandlerRegistration
nodeName string
}
func NewRestoreMicroService(ctx context.Context, client client.Client, kubeClient kubernetes.Interface, dataDownloadName string, namespace string, nodeName string,
sourceTargetPath datapath.AccessPoint, dataPathMgr *datapath.Manager, repoEnsurer *repository.Ensurer, cred *credentials.CredentialGetter,
ddInformer cache.Informer, log logrus.FieldLogger) *RestoreMicroService {
return &RestoreMicroService{
ctx: ctx,
client: client,
kubeClient: kubeClient,
credentialGetter: cred,
logger: log,
repoEnsurer: repoEnsurer,
dataPathMgr: dataPathMgr,
namespace: namespace,
dataDownloadName: dataDownloadName,
sourceTargetPath: sourceTargetPath,
nodeName: nodeName,
resultSignal: make(chan dataPathResult),
ddInformer: ddInformer,
}
}
func (r *RestoreMicroService) Init() error {
r.eventRecorder = kube.NewEventRecorder(r.kubeClient, r.client.Scheme(), r.dataDownloadName, r.nodeName)
handler, err := r.ddInformer.AddEventHandler(
cachetool.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj interface{}, newObj interface{}) {
oldDd := oldObj.(*velerov2alpha1api.DataDownload)
newDd := newObj.(*velerov2alpha1api.DataDownload)
if newDd.Name != r.dataDownloadName {
return
}
if newDd.Status.Phase != velerov2alpha1api.DataDownloadPhaseInProgress {
return
}
if newDd.Spec.Cancel && !oldDd.Spec.Cancel {
r.cancelDataDownload(newDd)
}
},
},
)
if err != nil {
return errors.Wrap(err, "error adding dd handler")
}
r.ddHandler = handler
return err
}
func (r *RestoreMicroService) RunCancelableDataPath(ctx context.Context) (string, error) {
log := r.logger.WithFields(logrus.Fields{
"datadownload": r.dataDownloadName,
})
dd := &velerov2alpha1api.DataDownload{}
err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, waitControllerTimeout, true, func(ctx context.Context) (bool, error) {
err := r.client.Get(ctx, types.NamespacedName{
Namespace: r.namespace,
Name: r.dataDownloadName,
}, dd)
if apierrors.IsNotFound(err) {
return false, nil
}
if err != nil {
return true, errors.Wrapf(err, "error to get dd %s", r.dataDownloadName)
}
if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
return true, nil
} else {
return false, nil
}
})
if err != nil {
log.WithError(err).Error("Failed to wait dd")
return "", errors.Wrap(err, "error waiting for dd")
}
r.dataDownload = dd
log.Info("Run cancelable dataDownload")
callbacks := datapath.Callbacks{
OnCompleted: r.OnDataDownloadCompleted,
OnFailed: r.OnDataDownloadFailed,
OnCancelled: r.OnDataDownloadCancelled,
OnProgress: r.OnDataDownloadProgress,
}
fsRestore, err := r.dataPathMgr.CreateFileSystemBR(dd.Name, dataUploadDownloadRequestor, ctx, r.client, dd.Namespace, callbacks, log)
if err != nil {
return "", errors.Wrap(err, "error to create data path")
}
log.Debug("Found volume path")
if err := fsRestore.Init(ctx,
&datapath.FSBRInitParam{
BSLName: dd.Spec.BackupStorageLocation,
SourceNamespace: dd.Spec.SourceNamespace,
UploaderType: GetUploaderType(dd.Spec.DataMover),
RepositoryType: velerov1api.BackupRepositoryTypeKopia,
RepoIdentifier: "",
RepositoryEnsurer: r.repoEnsurer,
CredentialGetter: r.credentialGetter,
}); err != nil {
return "", errors.Wrap(err, "error to initialize data path")
}
log.Info("fs init")
if err := fsRestore.StartRestore(dd.Spec.SnapshotID, r.sourceTargetPath, dd.Spec.DataMoverConfig); err != nil {
return "", errors.Wrap(err, "error starting data path restore")
}
log.Info("Async fs restore data path started")
r.eventRecorder.Event(dd, false, datapath.EventReasonStarted, "Data path for %s started", dd.Name)
result := ""
select {
case <-ctx.Done():
err = errors.New("timed out waiting for fs restore to complete")
break
case res := <-r.resultSignal:
err = res.err
result = res.result
break
}
if err != nil {
log.WithError(err).Error("Async fs restore was not completed")
}
return result, err
}
func (r *RestoreMicroService) Shutdown() {
r.eventRecorder.Shutdown()
r.closeDataPath(r.ctx, r.dataDownloadName)
if r.ddHandler != nil {
if err := r.ddInformer.RemoveEventHandler(r.ddHandler); err != nil {
r.logger.WithError(err).Warn("Failed to remove pod handler")
}
}
}
func (r *RestoreMicroService) OnDataDownloadCompleted(ctx context.Context, namespace string, ddName string, result datapath.Result) {
defer r.closeDataPath(ctx, ddName)
log := r.logger.WithField("datadownload", ddName)
restoreBytes, err := funcMarshal(result.Restore)
if err != nil {
log.WithError(err).Errorf("Failed to marshal restore result %v", result.Restore)
r.resultSignal <- dataPathResult{
err: errors.Wrapf(err, "Failed to marshal restore result %v", result.Restore),
}
} else {
r.eventRecorder.Event(r.dataDownload, false, datapath.EventReasonCompleted, string(restoreBytes))
r.resultSignal <- dataPathResult{
result: string(restoreBytes),
}
}
log.Info("Async fs restore data path completed")
}
func (r *RestoreMicroService) OnDataDownloadFailed(ctx context.Context, namespace string, ddName string, err error) {
defer r.closeDataPath(ctx, ddName)
log := r.logger.WithField("datadownload", ddName)
log.WithError(err).Error("Async fs restore data path failed")
r.eventRecorder.Event(r.dataDownload, false, datapath.EventReasonFailed, "Data path for data download %s failed, error %v", r.dataDownloadName, err)
r.resultSignal <- dataPathResult{
err: errors.Wrapf(err, "Data path for data download %s failed", r.dataDownloadName),
}
}
func (r *RestoreMicroService) OnDataDownloadCancelled(ctx context.Context, namespace string, ddName string) {
defer r.closeDataPath(ctx, ddName)
log := r.logger.WithField("datadownload", ddName)
log.Warn("Async fs restore data path canceled")
r.eventRecorder.Event(r.dataDownload, false, datapath.EventReasonCancelled, "Data path for data download %s canceled", ddName)
r.resultSignal <- dataPathResult{
err: errors.New(datapath.ErrCancelled),
}
}
func (r *RestoreMicroService) OnDataDownloadProgress(ctx context.Context, namespace string, ddName string, progress *uploader.Progress) {
log := r.logger.WithFields(logrus.Fields{
"datadownload": ddName,
})
progressBytes, err := funcMarshal(progress)
if err != nil {
log.WithError(err).Errorf("Failed to marshal progress %v", progress)
return
}
r.eventRecorder.Event(r.dataDownload, false, datapath.EventReasonProgress, string(progressBytes))
}
func (r *RestoreMicroService) closeDataPath(ctx context.Context, ddName string) {
fsRestore := r.dataPathMgr.GetAsyncBR(ddName)
if fsRestore != nil {
fsRestore.Close(ctx)
}
r.dataPathMgr.RemoveAsyncBR(ddName)
}
func (r *RestoreMicroService) cancelDataDownload(dd *velerov2alpha1api.DataDownload) {
r.logger.WithField("DataDownload", dd.Name).Info("Data download is being canceled")
r.eventRecorder.Event(dd, false, "Canceling", "Canceling for data download %s", dd.Name)
fsBackup := r.dataPathMgr.GetAsyncBR(dd.Name)
if fsBackup == nil {
r.OnDataDownloadCancelled(r.ctx, dd.GetNamespace(), dd.GetName())
} else {
fsBackup.Cancel()
}
}

View File

@@ -0,0 +1,394 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package datamover
import (
"context"
"fmt"
"testing"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"k8s.io/apimachinery/pkg/runtime"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake"
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/datapath"
datapathmockes "github.com/vmware-tanzu/velero/pkg/datapath/mocks"
"github.com/vmware-tanzu/velero/pkg/uploader"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
)
func TestOnDataDownloadFailed(t *testing.T) {
dataDownloadName := "fake-data-download"
bt := &backupMsTestHelper{}
bs := &RestoreMicroService{
dataDownloadName: dataDownloadName,
dataPathMgr: datapath.NewManager(1),
eventRecorder: bt,
resultSignal: make(chan dataPathResult),
logger: velerotest.NewLogger(),
}
expectedErr := "Data path for data download fake-data-download failed: fake-error"
expectedEventReason := datapath.EventReasonFailed
expectedEventMsg := "Data path for data download fake-data-download failed, error fake-error"
go bs.OnDataDownloadFailed(context.TODO(), velerov1api.DefaultNamespace, dataDownloadName, errors.New("fake-error"))
result := <-bs.resultSignal
assert.EqualError(t, result.err, expectedErr)
assert.Equal(t, expectedEventReason, bt.EventReason())
assert.Equal(t, expectedEventMsg, bt.EventMessage())
}
func TestOnDataDownloadCancelled(t *testing.T) {
dataDownloadName := "fake-data-download"
bt := &backupMsTestHelper{}
bs := &RestoreMicroService{
dataDownloadName: dataDownloadName,
dataPathMgr: datapath.NewManager(1),
eventRecorder: bt,
resultSignal: make(chan dataPathResult),
logger: velerotest.NewLogger(),
}
expectedErr := datapath.ErrCancelled
expectedEventReason := datapath.EventReasonCancelled
expectedEventMsg := "Data path for data download fake-data-download canceled"
go bs.OnDataDownloadCancelled(context.TODO(), velerov1api.DefaultNamespace, dataDownloadName)
result := <-bs.resultSignal
assert.EqualError(t, result.err, expectedErr)
assert.Equal(t, expectedEventReason, bt.EventReason())
assert.Equal(t, expectedEventMsg, bt.EventMessage())
}
func TestOnDataDownloadCompleted(t *testing.T) {
tests := []struct {
name string
expectedErr string
expectedEventReason string
expectedEventMsg string
marshalErr error
marshallStr string
}{
{
name: "marshal fail",
marshalErr: errors.New("fake-marshal-error"),
expectedErr: "Failed to marshal restore result {{ }}: fake-marshal-error",
},
{
name: "succeed",
marshallStr: "fake-complete-string",
expectedEventReason: datapath.EventReasonCompleted,
expectedEventMsg: "fake-complete-string",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
dataDownloadName := "fake-data-download"
bt := &backupMsTestHelper{
marshalErr: test.marshalErr,
marshalBytes: []byte(test.marshallStr),
}
bs := &RestoreMicroService{
dataPathMgr: datapath.NewManager(1),
eventRecorder: bt,
resultSignal: make(chan dataPathResult),
logger: velerotest.NewLogger(),
}
funcMarshal = bt.Marshal
go bs.OnDataDownloadCompleted(context.TODO(), velerov1api.DefaultNamespace, dataDownloadName, datapath.Result{})
result := <-bs.resultSignal
if test.marshalErr != nil {
assert.EqualError(t, result.err, test.expectedErr)
} else {
assert.NoError(t, result.err)
assert.Equal(t, test.expectedEventReason, bt.EventReason())
assert.Equal(t, test.expectedEventMsg, bt.EventMessage())
}
})
}
}
func TestOnDataDownloadProgress(t *testing.T) {
tests := []struct {
name string
expectedEventReason string
expectedEventMsg string
marshalErr error
marshallStr string
}{
{
name: "marshal fail",
marshalErr: errors.New("fake-marshal-error"),
},
{
name: "succeed",
marshallStr: "fake-progress-string",
expectedEventReason: datapath.EventReasonProgress,
expectedEventMsg: "fake-progress-string",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
dataDownloadName := "fake-data-download"
bt := &backupMsTestHelper{
marshalErr: test.marshalErr,
marshalBytes: []byte(test.marshallStr),
}
bs := &RestoreMicroService{
dataPathMgr: datapath.NewManager(1),
eventRecorder: bt,
logger: velerotest.NewLogger(),
}
funcMarshal = bt.Marshal
bs.OnDataDownloadProgress(context.TODO(), velerov1api.DefaultNamespace, dataDownloadName, &uploader.Progress{})
if test.marshalErr != nil {
assert.False(t, bt.withEvent)
} else {
assert.True(t, bt.withEvent)
assert.Equal(t, test.expectedEventReason, bt.EventReason())
assert.Equal(t, test.expectedEventMsg, bt.EventMessage())
}
})
}
}
func TestCancelDataDownload(t *testing.T) {
tests := []struct {
name string
expectedEventReason string
expectedEventMsg string
expectedErr string
}{
{
name: "no fs restore",
expectedEventReason: datapath.EventReasonCancelled,
expectedEventMsg: "Data path for data download fake-data-download canceled",
expectedErr: datapath.ErrCancelled,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
dataDownloadName := "fake-data-download"
dd := builder.ForDataDownload(velerov1api.DefaultNamespace, dataDownloadName).Result()
bt := &backupMsTestHelper{}
bs := &RestoreMicroService{
dataPathMgr: datapath.NewManager(1),
eventRecorder: bt,
resultSignal: make(chan dataPathResult),
logger: velerotest.NewLogger(),
}
go bs.cancelDataDownload(dd)
result := <-bs.resultSignal
assert.EqualError(t, result.err, test.expectedErr)
assert.True(t, bt.withEvent)
assert.Equal(t, test.expectedEventReason, bt.EventReason())
assert.Equal(t, test.expectedEventMsg, bt.EventMessage())
})
}
}
func TestRunCancelableRestore(t *testing.T) {
dataDownloadName := "fake-data-download"
dd := builder.ForDataDownload(velerov1api.DefaultNamespace, dataDownloadName).Phase(velerov2alpha1api.DataDownloadPhaseNew).Result()
ddInProgress := builder.ForDataDownload(velerov1api.DefaultNamespace, dataDownloadName).Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Result()
ctxTimeout, cancel := context.WithTimeout(context.Background(), time.Second)
tests := []struct {
name string
ctx context.Context
result *dataPathResult
dataPathMgr *datapath.Manager
kubeClientObj []runtime.Object
initErr error
startErr error
dataPathStarted bool
expectedEventMsg string
expectedErr string
}{
{
name: "no dd",
ctx: context.Background(),
expectedErr: "error waiting for dd: context deadline exceeded",
},
{
name: "dd not in in-progress",
ctx: context.Background(),
kubeClientObj: []runtime.Object{dd},
expectedErr: "error waiting for dd: context deadline exceeded",
},
{
name: "create data path fail",
ctx: context.Background(),
kubeClientObj: []runtime.Object{ddInProgress},
dataPathMgr: datapath.NewManager(0),
expectedErr: "error to create data path: Concurrent number exceeds",
},
{
name: "init data path fail",
ctx: context.Background(),
kubeClientObj: []runtime.Object{ddInProgress},
initErr: errors.New("fake-init-error"),
expectedErr: "error to initialize data path: fake-init-error",
},
{
name: "start data path fail",
ctx: context.Background(),
kubeClientObj: []runtime.Object{ddInProgress},
startErr: errors.New("fake-start-error"),
expectedErr: "error starting data path restore: fake-start-error",
},
{
name: "data path timeout",
ctx: ctxTimeout,
kubeClientObj: []runtime.Object{ddInProgress},
dataPathStarted: true,
expectedEventMsg: fmt.Sprintf("Data path for %s started", dataDownloadName),
expectedErr: "timed out waiting for fs restore to complete",
},
{
name: "data path returns error",
ctx: context.Background(),
kubeClientObj: []runtime.Object{ddInProgress},
dataPathStarted: true,
result: &dataPathResult{
err: errors.New("fake-data-path-error"),
},
expectedEventMsg: fmt.Sprintf("Data path for %s started", dataDownloadName),
expectedErr: "fake-data-path-error",
},
{
name: "succeed",
ctx: context.Background(),
kubeClientObj: []runtime.Object{ddInProgress},
dataPathStarted: true,
result: &dataPathResult{
result: "fake-succeed-result",
},
expectedEventMsg: fmt.Sprintf("Data path for %s started", dataDownloadName),
},
}
scheme := runtime.NewScheme()
velerov2alpha1api.AddToScheme(scheme)
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeClientBuilder := clientFake.NewClientBuilder()
fakeClientBuilder = fakeClientBuilder.WithScheme(scheme)
fakeClient := fakeClientBuilder.WithRuntimeObjects(test.kubeClientObj...).Build()
bt := &backupMsTestHelper{}
rs := &RestoreMicroService{
namespace: velerov1api.DefaultNamespace,
dataDownloadName: dataDownloadName,
ctx: context.Background(),
client: fakeClient,
dataPathMgr: datapath.NewManager(1),
eventRecorder: bt,
resultSignal: make(chan dataPathResult),
logger: velerotest.NewLogger(),
}
if test.ctx != nil {
rs.ctx = test.ctx
}
if test.dataPathMgr != nil {
rs.dataPathMgr = test.dataPathMgr
}
datapath.FSBRCreator = func(string, string, kbclient.Client, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
fsBR := datapathmockes.NewAsyncBR(t)
if test.initErr != nil {
fsBR.On("Init", mock.Anything, mock.Anything).Return(test.initErr)
}
if test.startErr != nil {
fsBR.On("Init", mock.Anything, mock.Anything).Return(nil)
fsBR.On("StartRestore", mock.Anything, mock.Anything, mock.Anything).Return(test.startErr)
}
if test.dataPathStarted {
fsBR.On("Init", mock.Anything, mock.Anything).Return(nil)
fsBR.On("StartRestore", mock.Anything, mock.Anything, mock.Anything).Return(nil)
}
return fsBR
}
waitControllerTimeout = time.Second
if test.result != nil {
go func() {
time.Sleep(time.Millisecond * 500)
rs.resultSignal <- *test.result
}()
}
result, err := rs.RunCancelableDataPath(test.ctx)
if test.expectedErr != "" {
assert.EqualError(t, err, test.expectedErr)
} else {
assert.NoError(t, err)
assert.Equal(t, test.result.result, result)
}
if test.expectedEventMsg != "" {
assert.True(t, bt.withEvent)
assert.Equal(t, test.expectedEventMsg, bt.EventMessage())
}
})
}
cancel()
}