Merge branch 'main' into data-mover-ms-smoking-test

This commit is contained in:
Lyndon-Li
2024-08-14 00:15:56 +08:00
33 changed files with 1276 additions and 616 deletions

View File

@@ -41,6 +41,11 @@ type BackupRepositorySpec struct {
// MaintenanceFrequency is how often maintenance should be run.
MaintenanceFrequency metav1.Duration `json:"maintenanceFrequency"`
// RepositoryConfig is for repository-specific configuration fields.
// +optional
// +nullable
RepositoryConfig map[string]string `json:"repositoryConfig,omitempty"`
}
// BackupRepositoryPhase represents the lifecycle phase of a BackupRepository.

View File

@@ -111,7 +111,7 @@ func (in *BackupRepository) DeepCopyInto(out *BackupRepository) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
out.Spec = in.Spec
in.Spec.DeepCopyInto(&out.Spec)
in.Status.DeepCopyInto(&out.Status)
}
@@ -169,6 +169,13 @@ func (in *BackupRepositoryList) DeepCopyObject() runtime.Object {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out.
// in must be non-nil.
func (in *BackupRepositorySpec) DeepCopyInto(out *BackupRepositorySpec) {
	*out = *in
	out.MaintenanceFrequency = in.MaintenanceFrequency
	// RepositoryConfig is a map; the shallow *out = *in above would alias it,
	// so allocate a new map and copy entry by entry.
	if in.RepositoryConfig != nil {
		in, out := &in.RepositoryConfig, &out.RepositoryConfig
		*out = make(map[string]string, len(*in))
		for key, val := range *in {
			(*out)[key] = val
		}
	}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BackupRepositorySpec.

View File

@@ -19,6 +19,7 @@ package builder
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/vmware-tanzu/velero/pkg/apis/velero/shared"
velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
)
@@ -122,3 +123,21 @@ func (d *DataDownloadBuilder) StartTimestamp(startTime *metav1.Time) *DataDownlo
d.object.Status.StartTimestamp = startTime
return d
}
// CompletionTimestamp sets the DataDownload's CompletionTimestamp.
func (d *DataDownloadBuilder) CompletionTimestamp(completionTimestamp *metav1.Time) *DataDownloadBuilder {
	d.object.Status.CompletionTimestamp = completionTimestamp
	return d
}
// Progress stores the given data-move progress in the DataDownload's status
// and returns the builder for chaining.
func (d *DataDownloadBuilder) Progress(p shared.DataMoveOperationProgress) *DataDownloadBuilder {
	d.object.Status.Progress = p
	return d
}
// Node records the name of the node handling this DataDownload in its status
// and returns the builder for chaining.
func (d *DataDownloadBuilder) Node(nodeName string) *DataDownloadBuilder {
	d.object.Status.Node = nodeName
	return d
}

View File

@@ -133,7 +133,14 @@ func (d *DataUploadBuilder) Labels(labels map[string]string) *DataUploadBuilder
return d
}
// Progress stores the given data-move progress in the DataUpload's status
// and returns the builder for chaining.
func (d *DataUploadBuilder) Progress(p shared.DataMoveOperationProgress) *DataUploadBuilder {
	d.object.Status.Progress = p
	return d
}
// Node records the name of the node handling this DataUpload in its status
// and returns the builder for chaining.
func (d *DataUploadBuilder) Node(nodeName string) *DataUploadBuilder {
	d.object.Status.Node = nodeName
	return d
}

View File

@@ -364,8 +364,10 @@ func (o *Options) Validate(c *cobra.Command, args []string, f client.Factory) er
return err
}
if err := uploader.ValidateUploaderType(o.UploaderType); err != nil {
if msg, err := uploader.ValidateUploaderType(o.UploaderType); err != nil {
return err
} else if msg != "" {
fmt.Printf("⚠️ %s\n", msg)
}
// If we're only installing CRDs, we can skip the rest of the validation.

View File

@@ -293,17 +293,29 @@ func (s *nodeAgentServer) run() {
loadAffinity = s.dataPathConfigs.LoadAffinity[0]
}
dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
s.attemptDataUploadResume(dataUploadReconciler)
if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data upload controller")
}
dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
s.attemptDataDownloadResume(dataDownloadReconciler)
if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data download controller")
}
go func() {
s.mgr.GetCache().WaitForCacheSync(s.ctx)
if err := dataUploadReconciler.AttemptDataUploadResume(s.ctx, s.mgr.GetClient(), s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data upload resume")
}
if err := dataDownloadReconciler.AttemptDataDownloadResume(s.ctx, s.mgr.GetClient(), s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data download resume")
}
s.logger.Info("Attempt complete to resume dataUploads and dataDownloads")
}()
s.logger.Info("Controllers starting...")
if err := s.mgr.Start(ctrl.SetupSignalHandler()); err != nil {
@@ -373,31 +385,6 @@ func (s *nodeAgentServer) markInProgressCRsFailed() {
s.markInProgressPVRsFailed(client)
}
// attemptDataUploadResume builds a standalone controller-runtime client and asks the
// DataUploadReconciler to resume DataUploads left over from a previous node-agent run.
// Errors are logged rather than returned so that server startup is never blocked.
func (s *nodeAgentServer) attemptDataUploadResume(r *controller.DataUploadReconciler) {
	// the function is called before starting the controller manager, the embedded client isn't ready to use, so create a new one here
	client, err := ctrlclient.New(s.mgr.GetConfig(), ctrlclient.Options{Scheme: s.mgr.GetScheme()})
	if err != nil {
		s.logger.WithError(errors.WithStack(err)).Error("failed to create client")
		return
	}
	if err := r.AttemptDataUploadResume(s.ctx, client, s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
		s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data upload resume")
	}
}
// attemptDataDownloadResume builds a standalone controller-runtime client and asks the
// DataDownloadReconciler to resume DataDownloads left over from a previous node-agent run.
// Errors are logged rather than returned so that server startup is never blocked.
func (s *nodeAgentServer) attemptDataDownloadResume(r *controller.DataDownloadReconciler) {
	// the function is called before starting the controller manager, the embedded client isn't ready to use, so create a new one here
	client, err := ctrlclient.New(s.mgr.GetConfig(), ctrlclient.Options{Scheme: s.mgr.GetScheme()})
	if err != nil {
		s.logger.WithError(errors.WithStack(err)).Error("failed to create client")
		return
	}
	if err := r.AttemptDataDownloadResume(s.ctx, client, s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
		s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data download resume")
	}
}
func (s *nodeAgentServer) markInProgressPVBsFailed(client ctrlclient.Client) {
pvbs := &velerov1api.PodVolumeBackupList{}
if err := client.List(s.ctx, pvbs, &ctrlclient.ListOptions{Namespace: s.namespace}); err != nil {

View File

@@ -140,6 +140,7 @@ type serverConfig struct {
disableInformerCache bool
scheduleSkipImmediately bool
maintenanceCfg repository.MaintenanceConfig
backukpRepoConfig string
}
func NewCommand(f client.Factory) *cobra.Command {
@@ -253,6 +254,8 @@ func NewCommand(f client.Factory) *cobra.Command {
command.Flags().StringVar(&config.maintenanceCfg.CPULimit, "maintenance-job-cpu-limit", config.maintenanceCfg.CPULimit, "CPU limit for maintenance job. Default is no limit.")
command.Flags().StringVar(&config.maintenanceCfg.MemLimit, "maintenance-job-mem-limit", config.maintenanceCfg.MemLimit, "Memory limit for maintenance job. Default is no limit.")
command.Flags().StringVar(&config.backukpRepoConfig, "backup-repository-config", config.backukpRepoConfig, "The name of configMap containing backup repository configurations.")
// maintenance job log setting inherited from velero server
config.maintenanceCfg.FormatFlag = config.formatFlag
config.maintenanceCfg.LogLevelFlag = logLevelFlag
@@ -288,8 +291,10 @@ type server struct {
}
func newServer(f client.Factory, config serverConfig, logger *logrus.Logger) (*server, error) {
if err := uploader.ValidateUploaderType(config.uploaderType); err != nil {
if msg, err := uploader.ValidateUploaderType(config.uploaderType); err != nil {
return nil, err
} else if msg != "" {
logger.Warn(msg)
}
if config.clientQPS < 0.0 {
@@ -876,7 +881,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
}
if _, ok := enabledRuntimeControllers[controller.BackupRepo]; ok {
if err := controller.NewBackupRepoReconciler(s.namespace, s.logger, s.mgr.GetClient(), s.config.repoMaintenanceFrequency, s.repoManager).SetupWithManager(s.mgr); err != nil {
if err := controller.NewBackupRepoReconciler(s.namespace, s.logger, s.mgr.GetClient(), s.config.repoMaintenanceFrequency, s.config.backukpRepoConfig, s.repoManager).SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.BackupRepo)
}
}
@@ -1148,9 +1153,15 @@ func markDataUploadsCancel(ctx context.Context, client ctrlclient.Client, backup
du.Status.Phase == velerov2alpha1api.DataUploadPhaseNew ||
du.Status.Phase == "" {
err := controller.UpdateDataUploadWithRetry(ctx, client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, log.WithField("dataupload", du.Name),
func(dataUpload *velerov2alpha1api.DataUpload) {
func(dataUpload *velerov2alpha1api.DataUpload) bool {
if dataUpload.Spec.Cancel {
return false
}
dataUpload.Spec.Cancel = true
dataUpload.Status.Message = fmt.Sprintf("found a dataupload with status %q during the velero server starting, mark it as cancel", du.Status.Phase)
dataUpload.Status.Message = fmt.Sprintf("Dataupload is in status %q during the velero server starting, mark it as cancel", du.Status.Phase)
return true
})
if err != nil {
@@ -1183,9 +1194,15 @@ func markDataDownloadsCancel(ctx context.Context, client ctrlclient.Client, rest
dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseNew ||
dd.Status.Phase == "" {
err := controller.UpdateDataDownloadWithRetry(ctx, client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, log.WithField("datadownload", dd.Name),
func(dataDownload *velerov2alpha1api.DataDownload) {
func(dataDownload *velerov2alpha1api.DataDownload) bool {
if dataDownload.Spec.Cancel {
return false
}
dataDownload.Spec.Cancel = true
dataDownload.Status.Message = fmt.Sprintf("found a datadownload with status %q during the velero server starting, mark it as cancel", dd.Status.Phase)
dataDownload.Status.Message = fmt.Sprintf("Datadownload is in status %q during the velero server starting, mark it as cancel", dd.Status.Phase)
return true
})
if err != nil {

View File

@@ -203,6 +203,13 @@ func Test_newServer(t *testing.T) {
}, logger)
assert.Error(t, err)
// invalid clientQPS Restic uploader
_, err = newServer(factory, serverConfig{
uploaderType: uploader.ResticType,
clientQPS: -1,
}, logger)
assert.Error(t, err)
// invalid clientBurst
factory.On("SetClientQPS", mock.Anything).Return()
_, err = newServer(factory, serverConfig{

View File

@@ -19,6 +19,8 @@ package controller
import (
"bytes"
"context"
"encoding/json"
"fmt"
"reflect"
"time"
@@ -38,6 +40,8 @@ import (
"github.com/vmware-tanzu/velero/pkg/repository"
repoconfig "github.com/vmware-tanzu/velero/pkg/repository/config"
"github.com/vmware-tanzu/velero/pkg/util/kube"
corev1api "k8s.io/api/core/v1"
)
const (
@@ -51,17 +55,19 @@ type BackupRepoReconciler struct {
logger logrus.FieldLogger
clock clocks.WithTickerAndDelayedExecution
maintenanceFrequency time.Duration
backukpRepoConfig string
repositoryManager repository.Manager
}
func NewBackupRepoReconciler(namespace string, logger logrus.FieldLogger, client client.Client,
maintenanceFrequency time.Duration, repositoryManager repository.Manager) *BackupRepoReconciler {
maintenanceFrequency time.Duration, backukpRepoConfig string, repositoryManager repository.Manager) *BackupRepoReconciler {
c := &BackupRepoReconciler{
client,
namespace,
logger,
clocks.RealClock{},
maintenanceFrequency,
backukpRepoConfig,
repositoryManager,
}
@@ -223,7 +229,7 @@ func (r *BackupRepoReconciler) getIdentiferByBSL(ctx context.Context, req *veler
}
func (r *BackupRepoReconciler) initializeRepo(ctx context.Context, req *velerov1api.BackupRepository, log logrus.FieldLogger) error {
log.Info("Initializing backup repository")
log.WithField("repoConfig", r.backukpRepoConfig).Info("Initializing backup repository")
// confirm the repo's BackupStorageLocation is valid
repoIdentifier, err := r.getIdentiferByBSL(ctx, req)
@@ -238,6 +244,13 @@ func (r *BackupRepoReconciler) initializeRepo(ctx context.Context, req *velerov1
})
}
config, err := getBackupRepositoryConfig(ctx, r, r.backukpRepoConfig, r.namespace, req.Name, req.Spec.RepositoryType, log)
if err != nil {
log.WithError(err).Warn("Failed to get repo config, repo config is ignored")
} else if config != nil {
log.Infof("Init repo with config %v", config)
}
// defaulting - if the patch fails, return an error so the item is returned to the queue
if err := r.patchBackupRepository(ctx, req, func(rr *velerov1api.BackupRepository) {
rr.Spec.ResticIdentifier = repoIdentifier
@@ -245,6 +258,8 @@ func (r *BackupRepoReconciler) initializeRepo(ctx context.Context, req *velerov1
if rr.Spec.MaintenanceFrequency.Duration <= 0 {
rr.Spec.MaintenanceFrequency = metav1.Duration{Duration: r.getRepositoryMaintenanceFrequency(req)}
}
rr.Spec.RepositoryConfig = config
}); err != nil {
return err
}
@@ -366,3 +381,35 @@ func (r *BackupRepoReconciler) patchBackupRepository(ctx context.Context, req *v
}
return nil
}
// getBackupRepositoryConfig loads the repository configuration for repoType from the
// named configMap in namespace. It returns (nil, nil) when configName is empty or when
// the configMap has no entry for repoType. The entry is expected to be a JSON object;
// each value is flattened to a string via fmt.Sprintf("%v", ...).
func getBackupRepositoryConfig(ctx context.Context, ctrlClient client.Client, configName, namespace, repoName, repoType string, log logrus.FieldLogger) (map[string]string, error) {
	// No config map configured at all is a normal, silent case.
	if configName == "" {
		return nil, nil
	}

	loc := &corev1api.ConfigMap{}
	if err := ctrlClient.Get(ctx, client.ObjectKey{
		Namespace: namespace,
		Name:      configName,
	}, loc); err != nil {
		return nil, errors.Wrapf(err, "error getting configMap %s", configName)
	}

	// Each repo type stores its config as a JSON document under its own data key.
	jsonData, found := loc.Data[repoType]
	if !found {
		// Infof, not Info: the message carries printf verbs, and logrus.Info would
		// print them literally instead of substituting the arguments.
		log.Infof("No data for repo type %s in config map %s", repoType, configName)
		return nil, nil
	}

	var unmarshalled map[string]interface{}
	if err := json.Unmarshal([]byte(jsonData), &unmarshalled); err != nil {
		return nil, errors.Wrapf(err, "error unmarshalling config data from %s for repo %s, repo type %s", configName, repoName, repoType)
	}

	result := map[string]string{}
	for k, v := range unmarshalled {
		result[k] = fmt.Sprintf("%v", v)
	}

	return result, nil
}

View File

@@ -21,7 +21,9 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
@@ -29,6 +31,8 @@ import (
"github.com/vmware-tanzu/velero/pkg/repository"
repomokes "github.com/vmware-tanzu/velero/pkg/repository/mocks"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake"
)
const testMaintenanceFrequency = 10 * time.Minute
@@ -43,6 +47,7 @@ func mockBackupRepoReconciler(t *testing.T, mockOn string, arg interface{}, ret
velerotest.NewLogger(),
velerotest.NewFakeControllerRuntimeClient(t),
testMaintenanceFrequency,
"fake-repo-config",
mgr,
)
}
@@ -243,6 +248,7 @@ func TestGetRepositoryMaintenanceFrequency(t *testing.T) {
velerotest.NewLogger(),
velerotest.NewFakeControllerRuntimeClient(t),
test.userDefinedFreq,
"",
&mgr,
)
@@ -370,10 +376,112 @@ func TestNeedInvalidBackupRepo(t *testing.T) {
velerov1api.DefaultNamespace,
velerotest.NewLogger(),
velerotest.NewFakeControllerRuntimeClient(t),
time.Duration(0), nil)
time.Duration(0), "", nil)
need := reconciler.needInvalidBackupRepo(test.oldBSL, test.newBSL)
assert.Equal(t, test.expect, need)
})
}
}
// TestGetBackupRepositoryConfig exercises getBackupRepositoryConfig with a missing
// configMap, a configMap without data for the repo type, malformed JSON data, and
// valid data.
func TestGetBackupRepositoryConfig(t *testing.T) {
	configWithNoData := &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "config-1",
			Namespace: velerov1api.DefaultNamespace,
		},
	}

	configWithWrongData := &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "config-1",
			Namespace: velerov1api.DefaultNamespace,
		},
		Data: map[string]string{
			"fake-repo-type": "",
		},
	}

	configWithData := &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "config-1",
			Namespace: velerov1api.DefaultNamespace,
		},
		Data: map[string]string{
			"fake-repo-type":   "{\"cacheLimitMB\": 1000, \"enableCompression\": true}",
			"fake-repo-type-1": "{\"cacheLimitMB\": 1, \"enableCompression\": false}",
		},
	}

	tests := []struct {
		name           string
		configName     string
		repoName       string
		repoType       string
		kubeClientObj  []runtime.Object
		expectedErr    string
		expectedResult map[string]string
	}{
		{
			name: "empty configName",
		},
		{
			name:        "get error",
			configName:  "config-1",
			expectedErr: "error getting configMap config-1: configmaps \"config-1\" not found",
		},
		{
			name:       "no config for repo",
			configName: "config-1",
			repoName:   "fake-repo",
			repoType:   "fake-repo-type",
			kubeClientObj: []runtime.Object{
				configWithNoData,
			},
		},
		{
			name:       "unmarshall error",
			configName: "config-1",
			repoName:   "fake-repo",
			repoType:   "fake-repo-type",
			kubeClientObj: []runtime.Object{
				configWithWrongData,
			},
			expectedErr: "error unmarshalling config data from config-1 for repo fake-repo, repo type fake-repo-type: unexpected end of JSON input",
		},
		{
			name:       "succeed",
			configName: "config-1",
			repoName:   "fake-repo",
			repoType:   "fake-repo-type",
			kubeClientObj: []runtime.Object{
				configWithData,
			},
			expectedResult: map[string]string{
				"cacheLimitMB":      "1000",
				"enableCompression": "true",
			},
		},
	}

	scheme := runtime.NewScheme()
	// AddToScheme can fail; don't discard the error silently.
	if err := corev1.AddToScheme(scheme); err != nil {
		t.Fatalf("failed to add corev1 to scheme: %v", err)
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			fakeClientBuilder := clientFake.NewClientBuilder()
			fakeClientBuilder = fakeClientBuilder.WithScheme(scheme)
			fakeClient := fakeClientBuilder.WithRuntimeObjects(test.kubeClientObj...).Build()

			result, err := getBackupRepositoryConfig(context.Background(), fakeClient, test.configName, velerov1api.DefaultNamespace, test.repoName, test.repoType, velerotest.NewLogger())

			if test.expectedErr != "" {
				assert.EqualError(t, err, test.expectedErr)
			} else {
				assert.NoError(t, err)
				assert.Equal(t, test.expectedResult, result)
			}
		})
	}
}

View File

@@ -140,9 +140,17 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
} else if controllerutil.ContainsFinalizer(dd, DataUploadDownloadFinalizer) && !dd.Spec.Cancel && !isDataDownloadInFinalState(dd) {
// when delete cr we need to clear up internal resources created by Velero, here we use the cancel mechanism
// to help clear up resources instead of clear them directly in case of some conflict with Expose action
if err := UpdateDataDownloadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataDownload *velerov2alpha1api.DataDownload) {
log.Warnf("Cancel dd under phase %s because it is being deleted", dd.Status.Phase)
if err := UpdateDataDownloadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataDownload *velerov2alpha1api.DataDownload) bool {
if dataDownload.Spec.Cancel {
return false
}
dataDownload.Spec.Cancel = true
dataDownload.Status.Message = fmt.Sprintf("found a datadownload %s/%s is being deleted, mark it as cancel", dd.Namespace, dd.Name)
dataDownload.Status.Message = "Cancel datadownload because it is being deleted"
return true
}); err != nil {
log.Errorf("failed to set cancel flag with error %s for %s/%s", err.Error(), dd.Namespace, dd.Name)
return ctrl.Result{}, err
@@ -563,9 +571,15 @@ func (r *DataDownloadReconciler) findSnapshotRestoreForPod(ctx context.Context,
}
} else if unrecoverable, reason := kube.IsPodUnrecoverable(pod, log); unrecoverable {
err := UpdateDataDownloadWithRetry(context.Background(), r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, r.logger.WithField("datadownlad", dd.Name),
func(dataDownload *velerov2alpha1api.DataDownload) {
func(dataDownload *velerov2alpha1api.DataDownload) bool {
if dataDownload.Spec.Cancel {
return false
}
dataDownload.Spec.Cancel = true
dataDownload.Status.Message = fmt.Sprintf("datadownload mark as cancel to failed early for exposing pod %s/%s is in abnormal status for %s", pod.Namespace, pod.Name, reason)
dataDownload.Status.Message = fmt.Sprintf("Cancel datadownload because the exposing pod %s/%s is in abnormal status for reason %s", pod.Namespace, pod.Name, reason)
return true
})
if err != nil {
@@ -586,75 +600,6 @@ func (r *DataDownloadReconciler) findSnapshotRestoreForPod(ctx context.Context,
return []reconcile.Request{request}
}
// FindDataDownloads returns the DataDownloads whose hosting pods are scheduled on this
// reconciler's node, by listing pods in ns and mapping each one back to its DataDownload.
// Per-pod lookup failures are logged and skipped rather than failing the whole scan.
func (r *DataDownloadReconciler) FindDataDownloads(ctx context.Context, cli client.Client, ns string) ([]*velerov2alpha1api.DataDownload, error) {
	pods := &v1.PodList{}
	var dataDownloads []*velerov2alpha1api.DataDownload
	if err := cli.List(ctx, pods, &client.ListOptions{Namespace: ns}); err != nil {
		r.logger.WithError(errors.WithStack(err)).Error("failed to list pods on current node")
		return nil, errors.Wrapf(err, "failed to list pods on current node")
	}

	for _, pod := range pods.Items {
		if pod.Spec.NodeName != r.nodeName {
			// Pods on other nodes are handled by the node-agent instances on those nodes.
			r.logger.Debugf("Pod %s related data download will not handled by %s nodes", pod.GetName(), r.nodeName)
			continue
		}
		dd, err := findDataDownloadByPod(cli, pod)
		if err != nil {
			r.logger.WithError(errors.WithStack(err)).Error("failed to get dataDownload by pod")
			continue
		} else if dd != nil {
			dataDownloads = append(dataDownloads, dd)
		}
	}
	return dataDownloads, nil
}
// findAcceptDataDownloadsByNodeLabel lists DataDownloads in ns and returns those in the
// Accepted phase whose accept-node label matches this reconciler's node.
func (r *DataDownloadReconciler) findAcceptDataDownloadsByNodeLabel(ctx context.Context, cli client.Client, ns string) ([]velerov2alpha1api.DataDownload, error) {
	dataDownloads := &velerov2alpha1api.DataDownloadList{}
	// NOTE(review): the log/error text says "datauploads" but this lists datadownloads —
	// looks like a copy-paste from the dataupload controller; confirm before changing.
	if err := cli.List(ctx, dataDownloads, &client.ListOptions{Namespace: ns}); err != nil {
		r.logger.WithError(errors.WithStack(err)).Error("failed to list datauploads")
		return nil, errors.Wrapf(err, "failed to list datauploads")
	}

	var result []velerov2alpha1api.DataDownload
	for _, dd := range dataDownloads.Items {
		if dd.Status.Phase != velerov2alpha1api.DataDownloadPhaseAccepted {
			continue
		}
		if dd.Labels[acceptNodeLabelKey] == r.nodeName {
			result = append(result, dd)
		}
	}
	return result, nil
}
// CancelAcceptedDataDownload will cancel the accepted data download
// for this node: every Accepted DataDownload labeled for this node gets
// Spec.Cancel set (with a status message) via UpdateDataDownloadWithRetry.
// Failures are logged and do not stop processing of the remaining items.
func (r *DataDownloadReconciler) CancelAcceptedDataDownload(ctx context.Context, cli client.Client, ns string) {
	r.logger.Infof("Canceling accepted data for node %s", r.nodeName)
	dataDownloads, err := r.findAcceptDataDownloadsByNodeLabel(ctx, cli, ns)
	if err != nil {
		r.logger.WithError(err).Error("failed to find data downloads")
		return
	}

	for _, dd := range dataDownloads {
		if dd.Spec.Cancel {
			// Already being canceled; nothing to do.
			continue
		}
		err = UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name},
			r.logger.WithField("dataupload", dd.Name), func(dataDownload *velerov2alpha1api.DataDownload) {
				dataDownload.Spec.Cancel = true
				dataDownload.Status.Message = fmt.Sprintf("found a datadownload with status %q during the node-agent starting, mark it as cancel", dd.Status.Phase)
			})

		// NOTE(review): this logs the local (stale) copy's message and runs before the
		// error check below — dd.Status.Message here is whatever was cached, not the
		// message set by the update func; confirm whether this ordering is intended.
		r.logger.Warn(dd.Status.Message)
		if err != nil {
			r.logger.WithError(err).Errorf("failed to set cancel flag with error %s", err.Error())
		}
	}
}
func (r *DataDownloadReconciler) prepareDataDownload(ssb *velerov2alpha1api.DataDownload) {
ssb.Status.Phase = velerov2alpha1api.DataDownloadPhasePrepared
ssb.Status.Node = r.nodeName
@@ -806,56 +751,139 @@ func isDataDownloadInFinalState(dd *velerov2alpha1api.DataDownload) bool {
dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseCompleted
}
func UpdateDataDownloadWithRetry(ctx context.Context, client client.Client, namespacedName types.NamespacedName, log *logrus.Entry, updateFunc func(dataDownload *velerov2alpha1api.DataDownload)) error {
return wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (done bool, err error) {
func UpdateDataDownloadWithRetry(ctx context.Context, client client.Client, namespacedName types.NamespacedName, log *logrus.Entry, updateFunc func(*velerov2alpha1api.DataDownload) bool) error {
return wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
dd := &velerov2alpha1api.DataDownload{}
if err := client.Get(ctx, namespacedName, dd); err != nil {
return false, errors.Wrap(err, "getting DataDownload")
}
updateFunc(dd)
updateErr := client.Update(ctx, dd)
if updateErr != nil {
if apierrors.IsConflict(updateErr) {
log.Warnf("failed to update datadownload for %s/%s and will retry it", dd.Namespace, dd.Name)
return false, nil
if updateFunc(dd) {
err := client.Update(ctx, dd)
if err != nil {
if apierrors.IsConflict(err) {
log.Warnf("failed to update datadownload for %s/%s and will retry it", dd.Namespace, dd.Name)
return false, nil
} else {
return false, errors.Wrapf(err, "error updating datadownload %s/%s", dd.Namespace, dd.Name)
}
}
log.Errorf("failed to update datadownload with error %s for %s/%s", updateErr.Error(), dd.Namespace, dd.Name)
return false, err
}
return true, nil
})
}
func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context, cli client.Client, logger *logrus.Entry, ns string) error {
if dataDownloads, err := r.FindDataDownloads(ctx, cli, ns); err != nil {
return errors.Wrapf(err, "failed to find data downloads")
} else {
for i := range dataDownloads {
dd := dataDownloads[i]
if dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared {
// keep doing nothing let controller re-download the data
// the Prepared CR could be still handled by datadownload controller after node-agent restart
logger.WithField("datadownload", dd.GetName()).Debug("find a datadownload with status prepared")
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
err = UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, logger.WithField("datadownload", dd.Name),
func(dataDownload *velerov2alpha1api.DataDownload) {
dataDownload.Spec.Cancel = true
dataDownload.Status.Message = fmt.Sprintf("found a datadownload with status %q during the node-agent starting, mark it as cancel", dd.Status.Phase)
})
var funcResumeCancellableDataRestore = (*DataDownloadReconciler).resumeCancellableDataPath
if err != nil {
logger.WithError(errors.WithStack(err)).Errorf("failed to mark datadownload %q into canceled", dd.GetName())
continue
}
logger.WithField("datadownload", dd.GetName()).Debug("mark datadownload into canceled")
func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context, cli client.Client, logger *logrus.Entry, ns string) error {
dataDownloads := &velerov2alpha1api.DataDownloadList{}
if err := cli.List(ctx, dataDownloads, &client.ListOptions{Namespace: ns}); err != nil {
r.logger.WithError(errors.WithStack(err)).Error("failed to list datadownloads")
return errors.Wrapf(err, "error to list datadownloads")
}
for i := range dataDownloads.Items {
dd := &dataDownloads.Items[i]
if dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared {
// keep doing nothing let controller re-download the data
// the Prepared CR could be still handled by datadownload controller after node-agent restart
logger.WithField("datadownload", dd.GetName()).Debug("find a datadownload with status prepared")
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
if dd.Status.Node != r.nodeName {
logger.WithField("dd", dd.Name).WithField("current node", r.nodeName).Infof("DD should be resumed by another node %s", dd.Status.Node)
continue
}
err := funcResumeCancellableDataRestore(r, ctx, dd, logger)
if err == nil {
continue
}
logger.WithField("datadownload", dd.GetName()).WithError(err).Warn("Failed to resume data path for dd, have to cancel it")
resumeErr := err
err = UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, logger.WithField("datadownload", dd.Name),
func(dataDownload *velerov2alpha1api.DataDownload) bool {
if dataDownload.Spec.Cancel {
return false
}
dataDownload.Spec.Cancel = true
dataDownload.Status.Message = fmt.Sprintf("Resume InProgress datadownload failed with error %v, mark it as cancel", resumeErr)
return true
})
if err != nil {
logger.WithError(errors.WithStack(err)).WithError(errors.WithStack(err)).Error("Failed to trigger dataupload cancel")
}
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted {
r.logger.WithField("datadownload", dd.GetName()).Warn("Cancel dd under Accepted phase")
err := UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name},
r.logger.WithField("datadownload", dd.Name), func(dataDownload *velerov2alpha1api.DataDownload) bool {
if dataDownload.Spec.Cancel {
return false
}
dataDownload.Spec.Cancel = true
dataDownload.Status.Message = "Datadownload is in Accepted status during the node-agent starting, mark it as cancel"
return true
})
if err != nil {
r.logger.WithField("datadownload", dd.GetName()).WithError(err).Errorf("Failed to trigger dataupload cancel")
}
}
}
//If the data download is in Accepted status, the expoded PVC may be not created
// so we need to mark the data download as canceled for it may not be recoverable
r.CancelAcceptedDataDownload(ctx, cli, ns)
return nil
}
// resumeCancellableDataPath re-attaches a micro-service BR watcher to an InProgress
// DataDownload after a node-agent restart, so the restore continues instead of being
// canceled. It returns an error when the exposed volume or the watcher cannot be
// recovered; in that case the caller is expected to cancel the DataDownload.
func (r *DataDownloadReconciler) resumeCancellableDataPath(ctx context.Context, dd *velerov2alpha1api.DataDownload, log logrus.FieldLogger) error {
	log.Info("Resume cancelable dataDownload")

	res, err := r.restoreExposer.GetExposed(ctx, getDataDownloadOwnerObject(dd), r.client, r.nodeName, dd.Spec.OperationTimeout.Duration)
	if err != nil {
		return errors.Wrapf(err, "error to get exposed volume for dd %s", dd.Name)
	}
	if res == nil {
		return errors.Errorf("expose info missed for dd %s", dd.Name)
	}

	callbacks := datapath.Callbacks{
		OnCompleted: r.OnDataDownloadCompleted,
		OnFailed:    r.OnDataDownloadFailed,
		OnCancelled: r.OnDataDownloadCancelled,
		OnProgress:  r.OnDataDownloadProgress,
	}

	// DataDownload drives a restore, so the watcher must be created with
	// TaskTypeRestore; TaskTypeBackup here would watch the wrong task kind.
	asyncBR, err := r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, r.kubeClient, r.mgr, datapath.TaskTypeRestore, dd.Name, dd.Namespace, res.ByPod.HostingPod.Name, res.ByPod.HostingContainer, dd.Name, callbacks, true, log)
	if err != nil {
		return errors.Wrapf(err, "error to create asyncBR watcher for dd %s", dd.Name)
	}

	resumeComplete := false
	defer func() {
		// If any step below fails, tear the data path down so the dd can be
		// retried or canceled cleanly.
		if !resumeComplete {
			r.closeDataPath(ctx, dd.Name)
		}
	}()

	if err := asyncBR.Init(ctx, nil); err != nil {
		return errors.Wrapf(err, "error to init asyncBR watcher for dd %s", dd.Name)
	}

	if err := asyncBR.StartRestore(dd.Spec.SnapshotID, datapath.AccessPoint{
		ByPath: res.ByPod.VolumeName,
	}, nil); err != nil {
		return errors.Wrapf(err, "error to resume asyncBR watcher for dd %s", dd.Name)
	}

	resumeComplete = true

	log.Infof("asyncBR is resumed for dd %s", dd.Name)

	return nil
}

View File

@@ -904,12 +904,11 @@ func TestUpdateDataDownloadWithRetry(t *testing.T) {
testCases := []struct {
Name string
needErrs []bool
noChange bool
ExpectErr bool
}{
{
Name: "SuccessOnFirstAttempt",
needErrs: []bool{false, false, false, false},
ExpectErr: false,
Name: "SuccessOnFirstAttempt",
},
{
Name: "Error get",
@@ -921,6 +920,11 @@ func TestUpdateDataDownloadWithRetry(t *testing.T) {
needErrs: []bool{false, false, true, false, false},
ExpectErr: true,
},
{
Name: "no change",
noChange: true,
needErrs: []bool{false, false, true, false, false},
},
{
Name: "Conflict with error timeout",
needErrs: []bool{false, false, false, false, true},
@@ -936,8 +940,14 @@ func TestUpdateDataDownloadWithRetry(t *testing.T) {
require.NoError(t, err)
err = r.client.Create(ctx, dataDownloadBuilder().Result())
require.NoError(t, err)
updateFunc := func(dataDownload *velerov2alpha1api.DataDownload) {
updateFunc := func(dataDownload *velerov2alpha1api.DataDownload) bool {
if tc.noChange {
return false
}
dataDownload.Spec.Cancel = true
return true
}
err = UpdateDataDownloadWithRetry(ctx, r.client, namespacedName, velerotest.NewLogger().WithField("name", tc.Name), updateFunc)
if tc.ExpectErr {
@@ -949,136 +959,115 @@ func TestUpdateDataDownloadWithRetry(t *testing.T) {
}
}
func TestFindDataDownloads(t *testing.T) {
tests := []struct {
name string
pod corev1.Pod
du *velerov2alpha1api.DataDownload
expectedUploads []velerov2alpha1api.DataDownload
expectedError bool
}{
// Test case 1: Pod with matching nodeName and DataDownload label
{
name: "MatchingPod",
pod: corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "velero",
Name: "pod-1",
Labels: map[string]string{
velerov1api.DataDownloadLabel: dataDownloadName,
},
},
Spec: corev1.PodSpec{
NodeName: "node-1",
},
},
du: dataDownloadBuilder().Result(),
expectedUploads: []velerov2alpha1api.DataDownload{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "velero",
Name: dataDownloadName,
},
},
},
expectedError: false,
},
// Test case 2: Pod with non-matching nodeName
{
name: "NonMatchingNodePod",
pod: corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "velero",
Name: "pod-2",
Labels: map[string]string{
velerov1api.DataDownloadLabel: dataDownloadName,
},
},
Spec: corev1.PodSpec{
NodeName: "node-2",
},
},
du: dataDownloadBuilder().Result(),
expectedUploads: []velerov2alpha1api.DataDownload{},
expectedError: false,
},
}
// ddResumeTestHelper stubs the restore exposer and the data path watcher
// factory for DataDownload resume tests; each field holds the canned value
// the corresponding stubbed method returns.
type ddResumeTestHelper struct {
	resumeErr    error                 // returned by resumeCancellableDataPath
	getExposeErr error                 // returned by GetExposed
	exposeResult *exposer.ExposeResult // returned by GetExposed
	asyncBR      datapath.AsyncBR      // returned by newMicroServiceBRWatcher
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
r, err := initDataDownloadReconcilerWithError(nil)
require.NoError(t, err)
r.nodeName = "node-1"
err = r.client.Create(ctx, test.du)
require.NoError(t, err)
err = r.client.Create(ctx, &test.pod)
require.NoError(t, err)
uploads, err := r.FindDataDownloads(context.Background(), r.client, "velero")
// resumeCancellableDataPath is the test seam substituted for the reconciler's
// resume function; it ignores all arguments and reports the configured error.
// NOTE(review): the signature uses *DataUploadReconciler / *DataUpload even
// though this helper lives with the DataDownload tests — confirm that is intended.
func (dt *ddResumeTestHelper) resumeCancellableDataPath(_ *DataUploadReconciler, _ context.Context, _ *velerov2alpha1api.DataUpload, _ logrus.FieldLogger) error {
	return dt.resumeErr // canned result configured per test case
}
if test.expectedError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, len(test.expectedUploads), len(uploads))
}
})
}
// Expose is a no-op stub satisfying the restore exposer interface; the resume
// tests only exercise GetExposed.
func (dt *ddResumeTestHelper) Expose(context.Context, corev1.ObjectReference, string, string, map[string]string, time.Duration) error {
	return nil
}
// GetExposed returns the canned expose result and error configured by the test case.
func (dt *ddResumeTestHelper) GetExposed(context.Context, corev1.ObjectReference, kbclient.Client, string, time.Duration) (*exposer.ExposeResult, error) {
	return dt.exposeResult, dt.getExposeErr
}
// PeekExposed is a no-op stub satisfying the restore exposer interface.
func (dt *ddResumeTestHelper) PeekExposed(context.Context, corev1.ObjectReference) error {
	return nil
}
// RebindVolume is a no-op stub satisfying the restore exposer interface.
func (dt *ddResumeTestHelper) RebindVolume(context.Context, corev1.ObjectReference, string, string, time.Duration) error {
	return nil
}
// CleanUp is a no-op stub satisfying the restore exposer interface.
func (dt *ddResumeTestHelper) CleanUp(context.Context, corev1.ObjectReference) {}
// newMicroServiceBRWatcher is substituted for datapath.MicroServiceBRWatcherCreator
// in tests; it returns the pre-built mock AsyncBR instead of creating a real watcher.
func (dt *ddResumeTestHelper) newMicroServiceBRWatcher(kbclient.Client, kubernetes.Interface, manager.Manager, string, string, string, string, string, string,
	datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
	return dt.asyncBR
}
func TestAttemptDataDownloadResume(t *testing.T) {
tests := []struct {
name string
dataUploads []velerov2alpha1api.DataDownload
du *velerov2alpha1api.DataDownload
pod *corev1.Pod
needErrs []bool
acceptedDataDownloads []string
prepareddDataDownloads []string
cancelledDataDownloads []string
expectedError bool
name string
dataUploads []velerov2alpha1api.DataDownload
dd *velerov2alpha1api.DataDownload
needErrs []bool
resumeErr error
acceptedDataDownloads []string
prepareddDataDownloads []string
cancelledDataDownloads []string
inProgressDataDownloads []string
expectedError string
}{
// Test case 1: Process Accepted DataDownload
{
name: "AcceptedDataDownload",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Volumes(&corev1.Volume{Name: dataDownloadName}).NodeName("node-1").Labels(map[string]string{
velerov1api.DataDownloadLabel: dataDownloadName,
name: "accepted DataDownload with no dd label",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(),
cancelledDataDownloads: []string{dataDownloadName},
acceptedDataDownloads: []string{dataDownloadName},
},
{
name: "accepted DataDownload in the current node",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Labels(map[string]string{acceptNodeLabelKey: "node-1"}).Result(),
cancelledDataDownloads: []string{dataDownloadName},
acceptedDataDownloads: []string{dataDownloadName},
},
{
name: "accepted DataDownload with dd label but is canceled",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Cancel(true).Labels(map[string]string{
acceptNodeLabelKey: "node-1",
}).Result(),
du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(),
acceptedDataDownloads: []string{dataDownloadName},
cancelledDataDownloads: []string{dataDownloadName},
},
{
name: "accepted DataDownload with dd label but cancel fail",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Labels(map[string]string{
acceptNodeLabelKey: "node-1",
}).Result(),
needErrs: []bool{false, false, true, false, false, false},
acceptedDataDownloads: []string{dataDownloadName},
expectedError: false,
},
// Test case 2: Cancel an Accepted DataDownload
{
name: "CancelAcceptedDataDownload",
du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(),
},
// Test case 3: Process Accepted Prepared DataDownload
{
name: "PreparedDataDownload",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Volumes(&corev1.Volume{Name: dataDownloadName}).NodeName("node-1").Labels(map[string]string{
velerov1api.DataDownloadLabel: dataDownloadName,
}).Result(),
du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
name: "prepared DataDownload",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
prepareddDataDownloads: []string{dataDownloadName},
},
// Test case 4: Process Accepted InProgress DataDownload
{
name: "InProgressDataDownload",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Volumes(&corev1.Volume{Name: dataDownloadName}).NodeName("node-1").Labels(map[string]string{
velerov1api.DataDownloadLabel: dataDownloadName,
}).Result(),
du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
prepareddDataDownloads: []string{dataDownloadName},
name: "InProgress DataDownload, not the current node",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Result(),
inProgressDataDownloads: []string{dataDownloadName},
},
// Test case 5: get resume error
{
name: "ResumeError",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Volumes(&corev1.Volume{Name: dataDownloadName}).NodeName("node-1").Labels(map[string]string{
velerov1api.DataDownloadLabel: dataDownloadName,
}).Result(),
name: "InProgress DataDownload, no resume error",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Node("node-1").Result(),
inProgressDataDownloads: []string{dataDownloadName},
},
{
name: "InProgress DataDownload, resume error, cancel error",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Node("node-1").Result(),
resumeErr: errors.New("fake-resume-error"),
needErrs: []bool{false, false, true, false, false, false},
inProgressDataDownloads: []string{dataDownloadName},
},
{
name: "InProgress DataDownload, resume error, cancel succeed",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Node("node-1").Result(),
resumeErr: errors.New("fake-resume-error"),
cancelledDataDownloads: []string{dataDownloadName},
inProgressDataDownloads: []string{dataDownloadName},
},
{
name: "Error",
needErrs: []bool{false, false, false, false, false, true},
du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
expectedError: true,
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
expectedError: "error to list datadownloads: List error",
},
}
@@ -1089,30 +1078,31 @@ func TestAttemptDataDownloadResume(t *testing.T) {
r.nodeName = "node-1"
require.NoError(t, err)
defer func() {
r.client.Delete(ctx, test.du, &kbclient.DeleteOptions{})
if test.pod != nil {
r.client.Delete(ctx, test.pod, &kbclient.DeleteOptions{})
}
r.client.Delete(ctx, test.dd, &kbclient.DeleteOptions{})
}()
assert.NoError(t, r.client.Create(ctx, test.du))
if test.pod != nil {
assert.NoError(t, r.client.Create(ctx, test.pod))
}
// Run the test
err = r.AttemptDataDownloadResume(ctx, r.client, r.logger.WithField("name", test.name), test.du.Namespace)
assert.NoError(t, r.client.Create(ctx, test.dd))
if test.expectedError {
assert.Error(t, err)
dt := &duResumeTestHelper{
resumeErr: test.resumeErr,
}
funcResumeCancellableDataBackup = dt.resumeCancellableDataPath
// Run the test
err = r.AttemptDataDownloadResume(ctx, r.client, r.logger.WithField("name", test.name), test.dd.Namespace)
if test.expectedError != "" {
assert.EqualError(t, err, test.expectedError)
} else {
assert.NoError(t, err)
// Verify DataDownload marked as Canceled
for _, duName := range test.cancelledDataDownloads {
dataUpload := &velerov2alpha1api.DataDownload{}
err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: duName}, dataUpload)
dataDownload := &velerov2alpha1api.DataDownload{}
err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: duName}, dataDownload)
require.NoError(t, err)
assert.Equal(t, velerov2alpha1api.DataDownloadPhaseCanceled, dataUpload.Status.Phase)
assert.True(t, dataDownload.Spec.Cancel)
}
// Verify DataDownload marked as Accepted
for _, duName := range test.acceptedDataDownloads {
@@ -1132,3 +1122,108 @@ func TestAttemptDataDownloadResume(t *testing.T) {
})
}
}
// TestResumeCancellableRestore covers DataDownloadReconciler.resumeCancellableDataPath:
// expose-lookup failure, missing expose info, asyncBR watcher Init/StartRestore
// failures, and the success path. The restore exposer and the AsyncBR watcher
// are replaced by stubs/mocks so no real data path is created.
func TestResumeCancellableRestore(t *testing.T) {
	// Unused table fields from the original (dataDownloads, createWatcherErr)
	// have been removed; no case ever set or read them.
	tests := []struct {
		name            string
		dd              *velerov2alpha1api.DataDownload
		getExposeErr    error
		exposeResult    *exposer.ExposeResult
		initWatcherErr  error
		startWatcherErr error
		mockInit        bool
		mockStart       bool
		mockClose       bool
		expectedError   string
	}{
		{
			name:          "get expose failed",
			dd:            dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Result(),
			getExposeErr:  errors.New("fake-expose-error"),
			expectedError: fmt.Sprintf("error to get exposed volume for dd %s: fake-expose-error", dataDownloadName),
		},
		{
			name:          "no expose",
			dd:            dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Node("node-1").Result(),
			expectedError: fmt.Sprintf("expose info missed for dd %s", dataDownloadName),
		},
		{
			name: "watcher init error",
			dd:   dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Node("node-1").Result(),
			exposeResult: &exposer.ExposeResult{
				ByPod: exposer.ExposeByPod{
					HostingPod: &corev1.Pod{},
				},
			},
			mockInit:       true,
			mockClose:      true,
			initWatcherErr: errors.New("fake-init-watcher-error"),
			expectedError:  fmt.Sprintf("error to init asyncBR watcher for dd %s: fake-init-watcher-error", dataDownloadName),
		},
		{
			name: "start watcher error",
			dd:   dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Node("node-1").Result(),
			exposeResult: &exposer.ExposeResult{
				ByPod: exposer.ExposeByPod{
					HostingPod: &corev1.Pod{},
				},
			},
			mockInit:        true,
			mockStart:       true,
			mockClose:       true,
			startWatcherErr: errors.New("fake-start-watcher-error"),
			expectedError:   fmt.Sprintf("error to resume asyncBR watcher for dd %s: fake-start-watcher-error", dataDownloadName),
		},
		{
			name: "succeed",
			dd:   dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Node("node-1").Result(),
			exposeResult: &exposer.ExposeResult{
				ByPod: exposer.ExposeByPod{
					HostingPod: &corev1.Pod{},
				},
			},
			mockInit:  true,
			mockStart: true,
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			ctx := context.TODO()

			r, err := initDataDownloadReconciler(nil, false)
			// Fix: assert the setup error BEFORE touching r. The original set
			// r.nodeName first, which would panic on a nil r if init failed.
			require.NoError(t, err)
			r.nodeName = "node-1"

			// Register only the expectations this case will trigger; mockery
			// fails the test on unexpected or unfulfilled calls.
			mockAsyncBR := datapathmockes.NewAsyncBR(t)
			if test.mockInit {
				mockAsyncBR.On("Init", mock.Anything, mock.Anything).Return(test.initWatcherErr)
			}
			if test.mockStart {
				mockAsyncBR.On("StartRestore", mock.Anything, mock.Anything, mock.Anything).Return(test.startWatcherErr)
			}
			if test.mockClose {
				mockAsyncBR.On("Close", mock.Anything).Return()
			}

			dt := &ddResumeTestHelper{
				getExposeErr: test.getExposeErr,
				exposeResult: test.exposeResult,
				asyncBR:      mockAsyncBR,
			}
			r.restoreExposer = dt
			datapath.MicroServiceBRWatcherCreator = dt.newMicroServiceBRWatcher

			err = r.resumeCancellableDataPath(ctx, test.dd, velerotest.NewLogger())
			if test.expectedError != "" {
				assert.EqualError(t, err, test.expectedError)
			} else {
				// Fix: the original left the success path unasserted.
				assert.NoError(t, err)
			}
		})
	}
}

View File

@@ -153,9 +153,17 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
} else if controllerutil.ContainsFinalizer(du, DataUploadDownloadFinalizer) && !du.Spec.Cancel && !isDataUploadInFinalState(du) {
// when delete cr we need to clear up internal resources created by Velero, here we use the cancel mechanism
// to help clear up resources instead of clear them directly in case of some conflict with Expose action
if err := UpdateDataUploadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataUpload *velerov2alpha1api.DataUpload) {
log.Warnf("Cancel du under phase %s because it is being deleted", du.Status.Phase)
if err := UpdateDataUploadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataUpload *velerov2alpha1api.DataUpload) bool {
if dataUpload.Spec.Cancel {
return false
}
dataUpload.Spec.Cancel = true
dataUpload.Status.Message = fmt.Sprintf("found a dataupload %s/%s is being deleted, mark it as cancel", du.Namespace, du.Name)
dataUpload.Status.Message = "Cancel dataupload because it is being deleted"
return true
}); err != nil {
log.Errorf("failed to set cancel flag with error %s for %s/%s", err.Error(), du.Namespace, du.Name)
return ctrl.Result{}, err
@@ -600,9 +608,15 @@ func (r *DataUploadReconciler) findDataUploadForPod(ctx context.Context, podObj
}
} else if unrecoverable, reason := kube.IsPodUnrecoverable(pod, log); unrecoverable { // let the abnormal backup pod failed early
err := UpdateDataUploadWithRetry(context.Background(), r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, r.logger.WithField("dataupload", du.Name),
func(dataUpload *velerov2alpha1api.DataUpload) {
func(dataUpload *velerov2alpha1api.DataUpload) bool {
if dataUpload.Spec.Cancel {
return false
}
dataUpload.Spec.Cancel = true
dataUpload.Status.Message = fmt.Sprintf("dataupload mark as cancel to failed early for exposing pod %s/%s is in abnormal status for reason %s", pod.Namespace, pod.Name, reason)
dataUpload.Status.Message = fmt.Sprintf("Cancel dataupload because the exposing pod %s/%s is in abnormal status for reason %s", pod.Namespace, pod.Name, reason)
return true
})
if err != nil {
@@ -623,75 +637,6 @@ func (r *DataUploadReconciler) findDataUploadForPod(ctx context.Context, podObj
return []reconcile.Request{request}
}
// FindDataUploadsByPod returns the DataUploads backed by pods scheduled on this
// reconciler's node within the given namespace. Pods on other nodes are skipped,
// and per-pod lookup failures are logged and skipped rather than aborting the scan.
func (r *DataUploadReconciler) FindDataUploadsByPod(ctx context.Context, cli client.Client, ns string) ([]velerov2alpha1api.DataUpload, error) {
	podList := &corev1.PodList{}
	if err := cli.List(ctx, podList, &client.ListOptions{Namespace: ns}); err != nil {
		r.logger.WithError(errors.WithStack(err)).Error("failed to list pods on current node")
		return nil, errors.Wrapf(err, "failed to list pods on current node")
	}

	var found []velerov2alpha1api.DataUpload
	for i := range podList.Items {
		pod := podList.Items[i]
		if pod.Spec.NodeName != r.nodeName {
			// Another node-agent instance is responsible for this pod's upload.
			r.logger.Debugf("Pod %s related data upload will not handled by %s nodes", pod.GetName(), r.nodeName)
			continue
		}

		du, err := findDataUploadByPod(cli, pod)
		switch {
		case err != nil:
			// Best-effort: log and keep scanning the remaining pods.
			r.logger.WithError(errors.WithStack(err)).Error("failed to get dataUpload by pod")
		case du != nil:
			found = append(found, *du)
		}
	}

	return found, nil
}
// findAcceptDataUploadsByNodeLabel lists the DataUploads in ns and returns the
// ones in the Accepted phase whose accept-node label matches this reconciler's node.
func (r *DataUploadReconciler) findAcceptDataUploadsByNodeLabel(ctx context.Context, cli client.Client, ns string) ([]velerov2alpha1api.DataUpload, error) {
	duList := &velerov2alpha1api.DataUploadList{}
	if err := cli.List(ctx, duList, &client.ListOptions{Namespace: ns}); err != nil {
		r.logger.WithError(errors.WithStack(err)).Error("failed to list datauploads")
		return nil, errors.Wrapf(err, "failed to list datauploads")
	}

	var accepted []velerov2alpha1api.DataUpload
	for _, item := range duList.Items {
		// Keep only Accepted uploads claimed by this node.
		if item.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted && item.Labels[acceptNodeLabelKey] == r.nodeName {
			accepted = append(accepted, item)
		}
	}

	return accepted, nil
}
// CancelAcceptedDataupload marks every Accepted DataUpload labeled for this
// node as cancelled. It is best-effort: lookup and update failures are logged
// and the loop proceeds to the next item; nothing is returned to the caller.
func (r *DataUploadReconciler) CancelAcceptedDataupload(ctx context.Context, cli client.Client, ns string) {
	r.logger.Infof("Reset accepted dataupload for node %s", r.nodeName)
	dataUploads, err := r.findAcceptDataUploadsByNodeLabel(ctx, cli, ns)
	if err != nil {
		r.logger.WithError(err).Error("failed to find dataupload")
		return
	}
	for _, du := range dataUploads {
		// Skip items that already carry the cancel flag.
		if du.Spec.Cancel {
			continue
		}
		err = UpdateDataUploadWithRetry(ctx, cli, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, r.logger.WithField("dataupload", du.Name),
			func(dataUpload *velerov2alpha1api.DataUpload) {
				dataUpload.Spec.Cancel = true
				dataUpload.Status.Message = fmt.Sprintf("found a dataupload with status %q during the node-agent starting, mark it as cancel", du.Status.Phase)
			})
		// NOTE(review): this logs the listed copy's Status.Message; the update
		// closure above mutates a freshly fetched object, not this copy, so the
		// message logged here is likely stale or empty — confirm intent.
		r.logger.WithField("dataupload", du.GetName()).Warn(du.Status.Message)
		if err != nil {
			r.logger.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", du.GetName())
			// continue at the end of the loop body is redundant but kept as-is.
			continue
		}
	}
}
func (r *DataUploadReconciler) prepareDataUpload(du *velerov2alpha1api.DataUpload) {
du.Status.Phase = velerov2alpha1api.DataUploadPhasePrepared
du.Status.Node = r.nodeName
@@ -903,54 +848,145 @@ func isDataUploadInFinalState(du *velerov2alpha1api.DataUpload) bool {
du.Status.Phase == velerov2alpha1api.DataUploadPhaseCompleted
}
func UpdateDataUploadWithRetry(ctx context.Context, client client.Client, namespacedName types.NamespacedName, log *logrus.Entry, updateFunc func(dataUpload *velerov2alpha1api.DataUpload)) error {
return wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (done bool, err error) {
func UpdateDataUploadWithRetry(ctx context.Context, client client.Client, namespacedName types.NamespacedName, log *logrus.Entry, updateFunc func(*velerov2alpha1api.DataUpload) bool) error {
return wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
du := &velerov2alpha1api.DataUpload{}
if err := client.Get(ctx, namespacedName, du); err != nil {
return false, errors.Wrap(err, "getting DataUpload")
}
updateFunc(du)
updateErr := client.Update(ctx, du)
if updateErr != nil {
if apierrors.IsConflict(updateErr) {
log.Warnf("failed to update dataupload for %s/%s and will retry it", du.Namespace, du.Name)
return false, nil
if updateFunc(du) {
err := client.Update(ctx, du)
if err != nil {
if apierrors.IsConflict(err) {
log.Warnf("failed to update dataupload for %s/%s and will retry it", du.Namespace, du.Name)
return false, nil
} else {
return false, errors.Wrapf(err, "error updating dataupload with error %s/%s", du.Namespace, du.Name)
}
}
log.Errorf("failed to update dataupload with error %s for %s/%s", updateErr.Error(), du.Namespace, du.Name)
return false, err
}
return true, nil
})
}
func (r *DataUploadReconciler) AttemptDataUploadResume(ctx context.Context, cli client.Client, logger *logrus.Entry, ns string) error {
if dataUploads, err := r.FindDataUploadsByPod(ctx, cli, ns); err != nil {
return errors.Wrap(err, "failed to find data uploads")
} else {
for _, du := range dataUploads {
if du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared {
// keep doing nothing let controller re-download the data
// the Prepared CR could be still handled by dataupload controller after node-agent restart
logger.WithField("dataupload", du.GetName()).Debug("find a dataupload with status prepared")
} else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress {
err = UpdateDataUploadWithRetry(ctx, cli, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, logger.WithField("dataupload", du.Name),
func(dataUpload *velerov2alpha1api.DataUpload) {
dataUpload.Spec.Cancel = true
dataUpload.Status.Message = fmt.Sprintf("found a dataupload with status %q during the node-agent starting, mark it as cancel", du.Status.Phase)
})
var funcResumeCancellableDataBackup = (*DataUploadReconciler).resumeCancellableDataPath
if err != nil {
logger.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q into canceled", du.GetName())
continue
}
logger.WithField("dataupload", du.GetName()).Debug("mark dataupload into canceled")
func (r *DataUploadReconciler) AttemptDataUploadResume(ctx context.Context, cli client.Client, logger *logrus.Entry, ns string) error {
dataUploads := &velerov2alpha1api.DataUploadList{}
if err := cli.List(ctx, dataUploads, &client.ListOptions{Namespace: ns}); err != nil {
r.logger.WithError(errors.WithStack(err)).Error("failed to list datauploads")
return errors.Wrapf(err, "error to list datauploads")
}
for i := range dataUploads.Items {
du := &dataUploads.Items[i]
if du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared {
// keep doing nothing let controller re-download the data
// the Prepared CR could be still handled by dataupload controller after node-agent restart
logger.WithField("dataupload", du.GetName()).Debug("find a dataupload with status prepared")
} else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress {
if du.Status.Node != r.nodeName {
logger.WithField("du", du.Name).WithField("current node", r.nodeName).Infof("DU should be resumed by another node %s", du.Status.Node)
continue
}
err := funcResumeCancellableDataBackup(r, ctx, du, logger)
if err == nil {
continue
}
logger.WithField("dataupload", du.GetName()).WithError(err).Warn("Failed to resume data path for du, have to cancel it")
resumeErr := err
err = UpdateDataUploadWithRetry(ctx, cli, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, logger.WithField("dataupload", du.Name),
func(dataUpload *velerov2alpha1api.DataUpload) bool {
if dataUpload.Spec.Cancel {
return false
}
dataUpload.Spec.Cancel = true
dataUpload.Status.Message = fmt.Sprintf("Resume InProgress dataupload failed with error %v, mark it as cancel", resumeErr)
return true
})
if err != nil {
logger.WithField("dataupload", du.GetName()).WithError(errors.WithStack(err)).Error("Failed to trigger dataupload cancel")
}
} else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted {
r.logger.WithField("dataupload", du.GetName()).Warn("Cancel du under Accepted phase")
err := UpdateDataUploadWithRetry(ctx, cli, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, r.logger.WithField("dataupload", du.Name),
func(dataUpload *velerov2alpha1api.DataUpload) bool {
if dataUpload.Spec.Cancel {
return false
}
dataUpload.Spec.Cancel = true
dataUpload.Status.Message = "Dataupload is in Accepted status during the node-agent starting, mark it as cancel"
return true
})
if err != nil {
r.logger.WithField("dataupload", du.GetName()).WithError(errors.WithStack(err)).Error("Failed to trigger dataupload cancel")
}
}
}
//If the data upload is in Accepted status, the volume snapshot may be deleted and the exposed pod may not be created
// so we need to mark the data upload as canceled for it may not be recoverable
r.CancelAcceptedDataupload(ctx, cli, ns)
return nil
}
// resumeCancellableDataPath re-attaches a data path watcher to an in-progress
// DataUpload after a node-agent restart, so the running backup continues to be
// monitored instead of being cancelled. On any failure before the watcher is
// fully started, the deferred cleanup tears the data path down again.
func (r *DataUploadReconciler) resumeCancellableDataPath(ctx context.Context, du *velerov2alpha1api.DataUpload, log logrus.FieldLogger) error {
	log.Info("Resume cancelable dataUpload")

	// Locate the exposer that handled this snapshot type when the backup started.
	ep, ok := r.snapshotExposerList[du.Spec.SnapshotType]
	if !ok {
		return errors.Errorf("error to find exposer for du %s", du.Name)
	}

	waitExposePara := r.setupWaitExposePara(du)
	res, err := ep.GetExposed(ctx, getOwnerObject(du), du.Spec.OperationTimeout.Duration, waitExposePara)
	if err != nil {
		return errors.Wrapf(err, "error to get exposed snapshot for du %s", du.Name)
	}
	// A nil result without an error means the expose info is gone — nothing to resume.
	if res == nil {
		return errors.Errorf("expose info missed for du %s", du.Name)
	}

	callbacks := datapath.Callbacks{
		OnCompleted: r.OnDataUploadCompleted,
		OnFailed:    r.OnDataUploadFailed,
		OnCancelled: r.OnDataUploadCancelled,
		OnProgress:  r.OnDataUploadProgress,
	}

	asyncBR, err := r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, r.kubeClient, r.mgr, datapath.TaskTypeBackup, du.Name, du.Namespace, res.ByPod.HostingPod.Name, res.ByPod.HostingContainer, du.Name, callbacks, true, log)
	if err != nil {
		return errors.Wrapf(err, "error to create asyncBR watcher for du %s", du.Name)
	}

	// Close the data path on any early return below; disarmed once resume succeeds.
	resumeComplete := false
	defer func() {
		if !resumeComplete {
			r.closeDataPath(ctx, du.Name)
		}
	}()

	if err := asyncBR.Init(ctx, nil); err != nil {
		return errors.Wrapf(err, "error to init asyncBR watcher for du %s", du.Name)
	}

	if err := asyncBR.StartBackup(datapath.AccessPoint{
		ByPath: res.ByPod.VolumeName,
	}, du.Spec.DataMoverConfig, nil); err != nil {
		return errors.Wrapf(err, "error to resume asyncBR watcher for du %s", du.Name)
	}

	resumeComplete = true
	log.Infof("asyncBR is resumed for du %s", du.Name)
	return nil
}

View File

@@ -27,6 +27,7 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
@@ -51,6 +52,7 @@ import (
velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/datapath"
datapathmocks "github.com/vmware-tanzu/velero/pkg/datapath/mocks"
"github.com/vmware-tanzu/velero/pkg/exposer"
"github.com/vmware-tanzu/velero/pkg/metrics"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
@@ -966,12 +968,11 @@ func TestUpdateDataUploadWithRetry(t *testing.T) {
testCases := []struct {
Name string
needErrs []bool
noChange bool
ExpectErr bool
}{
{
Name: "SuccessOnFirstAttempt",
needErrs: []bool{false, false, false, false},
ExpectErr: false,
Name: "SuccessOnFirstAttempt",
},
{
Name: "Error get",
@@ -983,6 +984,11 @@ func TestUpdateDataUploadWithRetry(t *testing.T) {
needErrs: []bool{false, false, true, false, false},
ExpectErr: true,
},
{
Name: "no change",
noChange: true,
needErrs: []bool{false, false, true, false, false},
},
{
Name: "Conflict with error timeout",
needErrs: []bool{false, false, false, false, true},
@@ -998,8 +1004,13 @@ func TestUpdateDataUploadWithRetry(t *testing.T) {
require.NoError(t, err)
err = r.client.Create(ctx, dataUploadBuilder().Result())
require.NoError(t, err)
updateFunc := func(dataDownload *velerov2alpha1api.DataUpload) {
updateFunc := func(dataDownload *velerov2alpha1api.DataUpload) bool {
if tc.noChange {
return false
}
dataDownload.Spec.Cancel = true
return true
}
err = UpdateDataUploadWithRetry(ctx, r.client, namespacedName, velerotest.NewLogger().WithField("name", tc.Name), updateFunc)
if tc.ExpectErr {
@@ -1011,135 +1022,107 @@ func TestUpdateDataUploadWithRetry(t *testing.T) {
}
}
func TestFindDataUploads(t *testing.T) {
tests := []struct {
name string
pod corev1.Pod
du *velerov2alpha1api.DataUpload
expectedUploads []velerov2alpha1api.DataUpload
expectedError bool
}{
// Test case 1: Pod with matching nodeName and DataUpload label
{
name: "MatchingPod",
pod: corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "velero",
Name: "pod-1",
Labels: map[string]string{
velerov1api.DataUploadLabel: dataUploadName,
},
},
Spec: corev1.PodSpec{
NodeName: "node-1",
},
},
du: dataUploadBuilder().Result(),
expectedUploads: []velerov2alpha1api.DataUpload{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "velero",
Name: dataUploadName,
},
},
},
expectedError: false,
},
// Test case 2: Pod with non-matching nodeName
{
name: "NonMatchingNodePod",
pod: corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "velero",
Name: "pod-2",
Labels: map[string]string{
velerov1api.DataUploadLabel: dataUploadName,
},
},
Spec: corev1.PodSpec{
NodeName: "node-2",
},
},
du: dataUploadBuilder().Result(),
expectedUploads: []velerov2alpha1api.DataUpload{},
expectedError: false,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
r, err := initDataUploaderReconcilerWithError()
require.NoError(t, err)
r.nodeName = "node-1"
err = r.client.Create(ctx, test.du)
require.NoError(t, err)
err = r.client.Create(ctx, &test.pod)
require.NoError(t, err)
uploads, err := r.FindDataUploadsByPod(context.Background(), r.client, "velero")
if test.expectedError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, len(test.expectedUploads), len(uploads))
}
})
}
// duResumeTestHelper stubs the snapshot exposer and the data path watcher
// factory for DataUpload resume tests; each field holds the canned value the
// corresponding stubbed method returns.
type duResumeTestHelper struct {
	resumeErr    error                 // returned by resumeCancellableDataPath
	getExposeErr error                 // returned by GetExposed
	exposeResult *exposer.ExposeResult // returned by GetExposed
	asyncBR      datapath.AsyncBR      // returned by newMicroServiceBRWatcher
}
// resumeCancellableDataPath is the test seam substituted for
// funcResumeCancellableDataBackup; it reports the configured error.
func (dt *duResumeTestHelper) resumeCancellableDataPath(_ *DataUploadReconciler, _ context.Context, _ *velerov2alpha1api.DataUpload, _ logrus.FieldLogger) error {
	return dt.resumeErr // canned result configured per test case
}
// Expose is a no-op stub satisfying the snapshot exposer interface; the resume
// tests only exercise GetExposed.
func (dt *duResumeTestHelper) Expose(context.Context, corev1.ObjectReference, interface{}) error {
	return nil
}
// GetExposed returns the canned expose result and error configured by the test case.
func (dt *duResumeTestHelper) GetExposed(context.Context, corev1.ObjectReference, time.Duration, interface{}) (*exposer.ExposeResult, error) {
	return dt.exposeResult, dt.getExposeErr
}
// PeekExposed is a no-op stub satisfying the snapshot exposer interface.
func (dt *duResumeTestHelper) PeekExposed(context.Context, corev1.ObjectReference) error {
	return nil
}
// CleanUp is a no-op stub satisfying the snapshot exposer interface.
func (dt *duResumeTestHelper) CleanUp(context.Context, corev1.ObjectReference, string, string) {}
// newMicroServiceBRWatcher is substituted for datapath.MicroServiceBRWatcherCreator
// in tests; it returns the pre-built mock AsyncBR instead of creating a real watcher.
func (dt *duResumeTestHelper) newMicroServiceBRWatcher(kbclient.Client, kubernetes.Interface, manager.Manager, string, string, string, string, string, string,
	datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
	return dt.asyncBR
}
func TestAttemptDataUploadResume(t *testing.T) {
tests := []struct {
name string
dataUploads []velerov2alpha1api.DataUpload
du *velerov2alpha1api.DataUpload
pod *corev1.Pod
needErrs []bool
acceptedDataUploads []string
prepareddDataUploads []string
cancelledDataUploads []string
expectedError bool
name string
dataUploads []velerov2alpha1api.DataUpload
du *velerov2alpha1api.DataUpload
needErrs []bool
acceptedDataUploads []string
prepareddDataUploads []string
cancelledDataUploads []string
inProgressDataUploads []string
resumeErr error
expectedError string
}{
// Test case 1: Process Accepted DataUpload
{
name: "AcceptedDataUpload",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).NodeName("node-1").Labels(map[string]string{
velerov1api.DataUploadLabel: dataUploadName,
}).Result(),
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(),
name: "accepted DataUpload in other node",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(),
cancelledDataUploads: []string{dataUploadName},
acceptedDataUploads: []string{dataUploadName},
},
{
name: "accepted DataUpload in the current node",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Labels(map[string]string{acceptNodeLabelKey: "node-1"}).Result(),
cancelledDataUploads: []string{dataUploadName},
acceptedDataUploads: []string{dataUploadName},
},
{
name: "accepted DataUpload in the current node but canceled",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Labels(map[string]string{acceptNodeLabelKey: "node-1"}).Cancel(true).Result(),
cancelledDataUploads: []string{dataUploadName},
acceptedDataUploads: []string{dataUploadName},
},
{
name: "accepted DataUpload in the current node but update error",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Labels(map[string]string{acceptNodeLabelKey: "node-1"}).Result(),
needErrs: []bool{false, false, true, false, false, false},
acceptedDataUploads: []string{dataUploadName},
expectedError: false,
},
// Test case 2: Cancel an Accepted DataUpload
{
name: "CancelAcceptedDataUpload",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(),
},
// Test case 3: Process Accepted Prepared DataUpload
{
name: "PreparedDataUpload",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).NodeName("node-1").Labels(map[string]string{
velerov1api.DataUploadLabel: dataUploadName,
}).Result(),
name: "prepared DataUpload",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(),
prepareddDataUploads: []string{dataUploadName},
},
// Test case 4: Process Accepted InProgress DataUpload
{
name: "InProgressDataUpload",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).NodeName("node-1").Labels(map[string]string{
velerov1api.DataUploadLabel: dataUploadName,
}).Result(),
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(),
prepareddDataUploads: []string{dataUploadName},
name: "InProgress DataUpload, not the current node",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(),
inProgressDataUploads: []string{dataUploadName},
},
// Test case 5: get resume error
{
name: "ResumeError",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).NodeName("node-1").Labels(map[string]string{
velerov1api.DataUploadLabel: dataUploadName,
}).Result(),
name: "InProgress DataUpload, resume error and update error",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Node("node-1").Result(),
needErrs: []bool{false, false, true, false, false, false},
resumeErr: errors.New("fake-resume-error"),
inProgressDataUploads: []string{dataUploadName},
},
{
name: "InProgress DataUpload, resume error and update succeed",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Node("node-1").Result(),
resumeErr: errors.New("fake-resume-error"),
cancelledDataUploads: []string{dataUploadName},
inProgressDataUploads: []string{dataUploadName},
},
{
name: "InProgress DataUpload and resume succeed",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Node("node-1").Result(),
inProgressDataUploads: []string{dataUploadName},
},
{
name: "Error",
needErrs: []bool{false, false, false, false, false, true},
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(),
expectedError: true,
expectedError: "error to list datauploads: List error",
},
}
@@ -1149,22 +1132,20 @@ func TestAttemptDataUploadResume(t *testing.T) {
r, err := initDataUploaderReconciler(test.needErrs...)
r.nodeName = "node-1"
require.NoError(t, err)
defer func() {
r.client.Delete(ctx, test.du, &kbclient.DeleteOptions{})
if test.pod != nil {
r.client.Delete(ctx, test.pod, &kbclient.DeleteOptions{})
}
}()
assert.NoError(t, r.client.Create(ctx, test.du))
if test.pod != nil {
assert.NoError(t, r.client.Create(ctx, test.pod))
dt := &duResumeTestHelper{
resumeErr: test.resumeErr,
}
funcResumeCancellableDataBackup = dt.resumeCancellableDataPath
// Run the test
err = r.AttemptDataUploadResume(ctx, r.client, r.logger.WithField("name", test.name), test.du.Namespace)
if test.expectedError {
assert.Error(t, err)
if test.expectedError != "" {
assert.EqualError(t, err, test.expectedError)
} else {
assert.NoError(t, err)
@@ -1173,7 +1154,7 @@ func TestAttemptDataUploadResume(t *testing.T) {
dataUpload := &velerov2alpha1api.DataUpload{}
err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: duName}, dataUpload)
require.NoError(t, err)
assert.Equal(t, velerov2alpha1api.DataUploadPhaseCanceled, dataUpload.Status.Phase)
assert.True(t, dataUpload.Spec.Cancel)
}
// Verify DataUploads marked as Accepted
for _, duName := range test.acceptedDataUploads {
@@ -1189,6 +1170,123 @@ func TestAttemptDataUploadResume(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, velerov2alpha1api.DataUploadPhasePrepared, dataUpload.Status.Phase)
}
// Verify DataUploads marked as InProgress
for _, duName := range test.inProgressDataUploads {
dataUpload := &velerov2alpha1api.DataUpload{}
err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: duName}, dataUpload)
require.NoError(t, err)
assert.Equal(t, velerov2alpha1api.DataUploadPhaseInProgress, dataUpload.Status.Phase)
}
}
})
}
}
func TestResumeCancellableBackup(t *testing.T) {
tests := []struct {
name string
dataUploads []velerov2alpha1api.DataUpload
du *velerov2alpha1api.DataUpload
getExposeErr error
exposeResult *exposer.ExposeResult
createWatcherErr error
initWatcherErr error
startWatcherErr error
mockInit bool
mockStart bool
mockClose bool
expectedError string
}{
{
name: "not find exposer",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType("").Result(),
expectedError: fmt.Sprintf("error to find exposer for du %s", dataUploadName),
},
{
name: "get expose failed",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(velerov2alpha1api.SnapshotTypeCSI).Result(),
getExposeErr: errors.New("fake-expose-error"),
expectedError: fmt.Sprintf("error to get exposed snapshot for du %s: fake-expose-error", dataUploadName),
},
{
name: "no expose",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Node("node-1").Result(),
expectedError: fmt.Sprintf("expose info missed for du %s", dataUploadName),
},
{
name: "watcher init error",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Node("node-1").Result(),
exposeResult: &exposer.ExposeResult{
ByPod: exposer.ExposeByPod{
HostingPod: &corev1.Pod{},
},
},
mockInit: true,
mockClose: true,
initWatcherErr: errors.New("fake-init-watcher-error"),
expectedError: fmt.Sprintf("error to init asyncBR watcher for du %s: fake-init-watcher-error", dataUploadName),
},
{
name: "start watcher error",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Node("node-1").Result(),
exposeResult: &exposer.ExposeResult{
ByPod: exposer.ExposeByPod{
HostingPod: &corev1.Pod{},
},
},
mockInit: true,
mockStart: true,
mockClose: true,
startWatcherErr: errors.New("fake-start-watcher-error"),
expectedError: fmt.Sprintf("error to resume asyncBR watcher for du %s: fake-start-watcher-error", dataUploadName),
},
{
name: "succeed",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Node("node-1").Result(),
exposeResult: &exposer.ExposeResult{
ByPod: exposer.ExposeByPod{
HostingPod: &corev1.Pod{},
},
},
mockInit: true,
mockStart: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx := context.TODO()
r, err := initDataUploaderReconciler()
r.nodeName = "node-1"
require.NoError(t, err)
mockAsyncBR := datapathmocks.NewAsyncBR(t)
if test.mockInit {
mockAsyncBR.On("Init", mock.Anything, mock.Anything).Return(test.initWatcherErr)
}
if test.mockStart {
mockAsyncBR.On("StartBackup", mock.Anything, mock.Anything, mock.Anything).Return(test.startWatcherErr)
}
if test.mockClose {
mockAsyncBR.On("Close", mock.Anything).Return()
}
dt := &duResumeTestHelper{
getExposeErr: test.getExposeErr,
exposeResult: test.exposeResult,
asyncBR: mockAsyncBR,
}
r.snapshotExposerList[velerov2alpha1api.SnapshotTypeCSI] = dt
datapath.MicroServiceBRWatcherCreator = dt.newMicroServiceBRWatcher
err = r.resumeCancellableDataPath(ctx, test.du, velerotest.NewLogger())
if test.expectedError != "" {
assert.EqualError(t, err, test.expectedError)
}
})
}

View File

@@ -36,6 +36,7 @@ import (
"github.com/vmware-tanzu/velero/pkg/label"
"github.com/vmware-tanzu/velero/pkg/nodeagent"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/uploader"
uploaderutil "github.com/vmware-tanzu/velero/pkg/uploader/util"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
"github.com/vmware-tanzu/velero/pkg/util/kube"
@@ -163,10 +164,13 @@ func (b *backupper) getMatchAction(resPolicies *resourcepolicies.Policies, pvc *
return nil, errors.Errorf("failed to check resource policies for empty volume")
}
var funcGetRepositoryType = getRepositoryType
func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.Pod, volumesToBackup []string, resPolicies *resourcepolicies.Policies, log logrus.FieldLogger) ([]*velerov1api.PodVolumeBackup, *PVCBackupSummary, []error) {
if len(volumesToBackup) == 0 {
return nil, nil, nil
}
log.Infof("pod %s/%s has volumes to backup: %v", pod.Namespace, pod.Name, volumesToBackup)
var (
@@ -189,6 +193,13 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
}
}
if msg, err := uploader.ValidateUploaderType(b.uploaderType); err != nil {
skipAllPodVolumes(pod, volumesToBackup, err, pvcSummary, log)
return nil, pvcSummary, []error{err}
} else if msg != "" {
log.Warn(msg)
}
if err := kube.IsPodRunning(pod); err != nil {
skipAllPodVolumes(pod, volumesToBackup, err, pvcSummary, log)
return nil, pvcSummary, nil
@@ -196,18 +207,21 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
err := nodeagent.IsRunningInNode(b.ctx, backup.Namespace, pod.Spec.NodeName, b.crClient)
if err != nil {
return nil, nil, []error{err}
skipAllPodVolumes(pod, volumesToBackup, err, pvcSummary, log)
return nil, pvcSummary, []error{err}
}
repositoryType := getRepositoryType(b.uploaderType)
repositoryType := funcGetRepositoryType(b.uploaderType)
if repositoryType == "" {
err := errors.Errorf("empty repository type, uploader %s", b.uploaderType)
return nil, nil, []error{err}
skipAllPodVolumes(pod, volumesToBackup, err, pvcSummary, log)
return nil, pvcSummary, []error{err}
}
repo, err := b.repoEnsurer.EnsureRepo(b.ctx, backup.Namespace, pod.Namespace, backup.Spec.StorageLocation, repositoryType)
if err != nil {
return nil, nil, []error{err}
skipAllPodVolumes(pod, volumesToBackup, err, pvcSummary, log)
return nil, pvcSummary, []error{err}
}
// get a single non-exclusive lock since we'll wait for all individual

View File

@@ -309,22 +309,38 @@ func TestBackupPodVolumes(t *testing.T) {
corev1api.AddToScheme(scheme)
tests := []struct {
name string
bsl string
uploaderType string
volumes []string
sourcePod *corev1api.Pod
kubeClientObj []runtime.Object
ctlClientObj []runtime.Object
veleroClientObj []runtime.Object
veleroReactors []reactor
runtimeScheme *runtime.Scheme
pvbs int
errs []string
name string
bsl string
uploaderType string
volumes []string
sourcePod *corev1api.Pod
kubeClientObj []runtime.Object
ctlClientObj []runtime.Object
veleroClientObj []runtime.Object
veleroReactors []reactor
runtimeScheme *runtime.Scheme
pvbs int
mockGetRepositoryType bool
errs []string
}{
{
name: "empty volume list",
},
{
name: "wrong uploader type",
volumes: []string{
"fake-volume-1",
"fake-volume-2",
},
sourcePod: createPodObj(true, false, false, 2),
kubeClientObj: []runtime.Object{
createNodeAgentPodObj(true),
},
uploaderType: "fake-uploader-type",
errs: []string{
"invalid uploader type 'fake-uploader-type', valid upload types are: 'restic', 'kopia'",
},
},
{
name: "pod is not running",
volumes: []string{
@@ -348,7 +364,8 @@ func TestBackupPodVolumes(t *testing.T) {
"fake-volume-1",
"fake-volume-2",
},
sourcePod: createPodObj(true, false, false, 2),
sourcePod: createPodObj(true, false, false, 2),
uploaderType: "kopia",
errs: []string{
"daemonset pod not found in running state in node fake-node-name",
},
@@ -363,9 +380,10 @@ func TestBackupPodVolumes(t *testing.T) {
kubeClientObj: []runtime.Object{
createNodeAgentPodObj(true),
},
uploaderType: "fake-uploader-type",
uploaderType: "kopia",
mockGetRepositoryType: true,
errs: []string{
"empty repository type, uploader fake-uploader-type",
"empty repository type, uploader kopia",
},
},
{
@@ -542,6 +560,12 @@ func TestBackupPodVolumes(t *testing.T) {
require.NoError(t, err)
if test.mockGetRepositoryType {
funcGetRepositoryType = func(string) string { return "" }
} else {
funcGetRepositoryType = getRepositoryType
}
pvbs, _, errs := bp.BackupPodVolumes(backupObj, test.sourcePod, test.volumes, nil, velerotest.NewLogger())
if errs == nil {

View File

@@ -53,7 +53,7 @@ var getGCPCredentials = repoconfig.GetGCPCredentials
var getS3BucketRegion = repoconfig.GetAWSBucketRegion
type localFuncTable struct {
getStorageVariables func(*velerov1api.BackupStorageLocation, string, string) (map[string]string, error)
getStorageVariables func(*velerov1api.BackupStorageLocation, string, string, map[string]string) (map[string]string, error)
getStorageCredentials func(*velerov1api.BackupStorageLocation, credentials.FileStore) (map[string]string, error)
}
@@ -397,7 +397,7 @@ func (urp *unifiedRepoProvider) GetStoreOptions(param interface{}) (map[string]s
return map[string]string{}, errors.Errorf("invalid parameter, expect %T, actual %T", RepoParam{}, param)
}
storeVar, err := funcTable.getStorageVariables(repoParam.BackupLocation, urp.repoBackend, repoParam.BackupRepo.Spec.VolumeNamespace)
storeVar, err := funcTable.getStorageVariables(repoParam.BackupLocation, urp.repoBackend, repoParam.BackupRepo.Spec.VolumeNamespace, repoParam.BackupRepo.Spec.RepositoryConfig)
if err != nil {
return map[string]string{}, errors.Wrap(err, "error to get storage variables")
}
@@ -498,7 +498,7 @@ func getStorageCredentials(backupLocation *velerov1api.BackupStorageLocation, cr
return result, nil
}
func getStorageVariables(backupLocation *velerov1api.BackupStorageLocation, repoBackend string, repoName string) (map[string]string, error) {
func getStorageVariables(backupLocation *velerov1api.BackupStorageLocation, repoBackend string, repoName string, backupRepoConfig map[string]string) (map[string]string, error) {
result := make(map[string]string)
backendType := repoconfig.GetBackendType(backupLocation.Spec.Provider, backupLocation.Spec.Config)
@@ -568,6 +568,12 @@ func getStorageVariables(backupLocation *velerov1api.BackupStorageLocation, repo
result[udmrepo.StoreOptionOssRegion] = strings.Trim(region, "/")
result[udmrepo.StoreOptionFsPath] = config["fspath"]
if backupRepoConfig != nil {
if v, found := backupRepoConfig[udmrepo.StoreOptionCacheLimit]; found {
result[udmrepo.StoreOptionCacheLimit] = v
}
}
return result, nil
}

View File

@@ -221,6 +221,7 @@ func TestGetStorageVariables(t *testing.T) {
credFileStore *credmock.FileStore
repoName string
repoBackend string
repoConfig map[string]string
getS3BucketRegion func(string) (string, error)
expected map[string]string
expectedErr string
@@ -435,13 +436,36 @@ func TestGetStorageVariables(t *testing.T) {
"region": "",
},
},
{
name: "fs with repo config",
backupLocation: velerov1api.BackupStorageLocation{
Spec: velerov1api.BackupStorageLocationSpec{
Provider: "velero.io/fs",
Config: map[string]string{
"fspath": "fake-path",
"prefix": "fake-prefix",
},
},
},
repoBackend: "fake-repo-type",
repoConfig: map[string]string{
udmrepo.StoreOptionCacheLimit: "1000",
},
expected: map[string]string{
"fspath": "fake-path",
"bucket": "",
"prefix": "fake-prefix/fake-repo-type/",
"region": "",
"cacheLimitMB": "1000",
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
getS3BucketRegion = tc.getS3BucketRegion
actual, err := getStorageVariables(&tc.backupLocation, tc.repoBackend, tc.repoName)
actual, err := getStorageVariables(&tc.backupLocation, tc.repoBackend, tc.repoName, tc.repoConfig)
require.Equal(t, tc.expected, actual)
@@ -530,7 +554,7 @@ func TestGetStoreOptions(t *testing.T) {
BackupRepo: &velerov1api.BackupRepository{},
},
funcTable: localFuncTable{
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string) (map[string]string, error) {
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string, map[string]string) (map[string]string, error) {
return map[string]string{}, errors.New("fake-error-2")
},
},
@@ -544,7 +568,7 @@ func TestGetStoreOptions(t *testing.T) {
BackupRepo: &velerov1api.BackupRepository{},
},
funcTable: localFuncTable{
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string) (map[string]string, error) {
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string, map[string]string) (map[string]string, error) {
return map[string]string{}, nil
},
getStorageCredentials: func(*velerov1api.BackupStorageLocation, velerocredentials.FileStore) (map[string]string, error) {
@@ -604,7 +628,7 @@ func TestPrepareRepo(t *testing.T) {
repoService: new(reposervicenmocks.BackupRepoService),
credStoreReturn: "fake-password",
funcTable: localFuncTable{
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string) (map[string]string, error) {
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string, map[string]string) (map[string]string, error) {
return map[string]string{}, errors.New("fake-store-option-error")
},
},
@@ -615,7 +639,7 @@ func TestPrepareRepo(t *testing.T) {
getter: new(credmock.SecretStore),
credStoreReturn: "fake-password",
funcTable: localFuncTable{
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string) (map[string]string, error) {
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string, map[string]string) (map[string]string, error) {
return map[string]string{}, nil
},
getStorageCredentials: func(*velerov1api.BackupStorageLocation, velerocredentials.FileStore) (map[string]string, error) {
@@ -635,7 +659,7 @@ func TestPrepareRepo(t *testing.T) {
getter: new(credmock.SecretStore),
credStoreReturn: "fake-password",
funcTable: localFuncTable{
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string) (map[string]string, error) {
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string, map[string]string) (map[string]string, error) {
return map[string]string{}, nil
},
getStorageCredentials: func(*velerov1api.BackupStorageLocation, velerocredentials.FileStore) (map[string]string, error) {
@@ -656,7 +680,7 @@ func TestPrepareRepo(t *testing.T) {
getter: new(credmock.SecretStore),
credStoreReturn: "fake-password",
funcTable: localFuncTable{
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string) (map[string]string, error) {
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string, map[string]string) (map[string]string, error) {
return map[string]string{}, nil
},
getStorageCredentials: func(*velerov1api.BackupStorageLocation, velerocredentials.FileStore) (map[string]string, error) {
@@ -733,7 +757,7 @@ func TestForget(t *testing.T) {
getter: new(credmock.SecretStore),
credStoreReturn: "fake-password",
funcTable: localFuncTable{
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string) (map[string]string, error) {
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string, map[string]string) (map[string]string, error) {
return map[string]string{}, nil
},
getStorageCredentials: func(*velerov1api.BackupStorageLocation, velerocredentials.FileStore) (map[string]string, error) {
@@ -757,7 +781,7 @@ func TestForget(t *testing.T) {
getter: new(credmock.SecretStore),
credStoreReturn: "fake-password",
funcTable: localFuncTable{
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string) (map[string]string, error) {
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string, map[string]string) (map[string]string, error) {
return map[string]string{}, nil
},
getStorageCredentials: func(*velerov1api.BackupStorageLocation, velerocredentials.FileStore) (map[string]string, error) {
@@ -785,7 +809,7 @@ func TestForget(t *testing.T) {
getter: new(credmock.SecretStore),
credStoreReturn: "fake-password",
funcTable: localFuncTable{
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string) (map[string]string, error) {
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string, map[string]string) (map[string]string, error) {
return map[string]string{}, nil
},
getStorageCredentials: func(*velerov1api.BackupStorageLocation, velerocredentials.FileStore) (map[string]string, error) {
@@ -883,7 +907,7 @@ func TestBatchForget(t *testing.T) {
getter: new(credmock.SecretStore),
credStoreReturn: "fake-password",
funcTable: localFuncTable{
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string) (map[string]string, error) {
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string, map[string]string) (map[string]string, error) {
return map[string]string{}, nil
},
getStorageCredentials: func(*velerov1api.BackupStorageLocation, velerocredentials.FileStore) (map[string]string, error) {
@@ -907,7 +931,7 @@ func TestBatchForget(t *testing.T) {
getter: new(credmock.SecretStore),
credStoreReturn: "fake-password",
funcTable: localFuncTable{
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string) (map[string]string, error) {
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string, map[string]string) (map[string]string, error) {
return map[string]string{}, nil
},
getStorageCredentials: func(*velerov1api.BackupStorageLocation, velerocredentials.FileStore) (map[string]string, error) {
@@ -936,7 +960,7 @@ func TestBatchForget(t *testing.T) {
getter: new(credmock.SecretStore),
credStoreReturn: "fake-password",
funcTable: localFuncTable{
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string) (map[string]string, error) {
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string, map[string]string) (map[string]string, error) {
return map[string]string{}, nil
},
getStorageCredentials: func(*velerov1api.BackupStorageLocation, velerocredentials.FileStore) (map[string]string, error) {
@@ -1032,7 +1056,7 @@ func TestInitRepo(t *testing.T) {
getter: new(credmock.SecretStore),
credStoreReturn: "fake-password",
funcTable: localFuncTable{
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string) (map[string]string, error) {
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string, map[string]string) (map[string]string, error) {
return map[string]string{}, nil
},
getStorageCredentials: func(*velerov1api.BackupStorageLocation, velerocredentials.FileStore) (map[string]string, error) {
@@ -1050,7 +1074,7 @@ func TestInitRepo(t *testing.T) {
getter: new(credmock.SecretStore),
credStoreReturn: "fake-password",
funcTable: localFuncTable{
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string) (map[string]string, error) {
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string, map[string]string) (map[string]string, error) {
return map[string]string{}, nil
},
getStorageCredentials: func(*velerov1api.BackupStorageLocation, velerocredentials.FileStore) (map[string]string, error) {
@@ -1120,7 +1144,7 @@ func TestConnectToRepo(t *testing.T) {
getter: new(credmock.SecretStore),
credStoreReturn: "fake-password",
funcTable: localFuncTable{
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string) (map[string]string, error) {
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string, map[string]string) (map[string]string, error) {
return map[string]string{}, nil
},
getStorageCredentials: func(*velerov1api.BackupStorageLocation, velerocredentials.FileStore) (map[string]string, error) {
@@ -1138,7 +1162,7 @@ func TestConnectToRepo(t *testing.T) {
getter: new(credmock.SecretStore),
credStoreReturn: "fake-password",
funcTable: localFuncTable{
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string) (map[string]string, error) {
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string, map[string]string) (map[string]string, error) {
return map[string]string{}, nil
},
getStorageCredentials: func(*velerov1api.BackupStorageLocation, velerocredentials.FileStore) (map[string]string, error) {
@@ -1212,7 +1236,7 @@ func TestBoostRepoConnect(t *testing.T) {
getter: new(credmock.SecretStore),
credStoreReturn: "fake-password",
funcTable: localFuncTable{
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string) (map[string]string, error) {
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string, map[string]string) (map[string]string, error) {
return map[string]string{}, nil
},
getStorageCredentials: func(*velerov1api.BackupStorageLocation, velerocredentials.FileStore) (map[string]string, error) {
@@ -1239,7 +1263,7 @@ func TestBoostRepoConnect(t *testing.T) {
getter: new(credmock.SecretStore),
credStoreReturn: "fake-password",
funcTable: localFuncTable{
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string) (map[string]string, error) {
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string, map[string]string) (map[string]string, error) {
return map[string]string{}, nil
},
getStorageCredentials: func(*velerov1api.BackupStorageLocation, velerocredentials.FileStore) (map[string]string, error) {
@@ -1265,7 +1289,7 @@ func TestBoostRepoConnect(t *testing.T) {
getter: new(credmock.SecretStore),
credStoreReturn: "fake-password",
funcTable: localFuncTable{
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string) (map[string]string, error) {
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string, map[string]string) (map[string]string, error) {
return map[string]string{}, nil
},
getStorageCredentials: func(*velerov1api.BackupStorageLocation, velerocredentials.FileStore) (map[string]string, error) {
@@ -1352,7 +1376,7 @@ func TestPruneRepo(t *testing.T) {
getter: new(credmock.SecretStore),
credStoreReturn: "fake-password",
funcTable: localFuncTable{
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string) (map[string]string, error) {
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string, map[string]string) (map[string]string, error) {
return map[string]string{}, nil
},
getStorageCredentials: func(*velerov1api.BackupStorageLocation, velerocredentials.FileStore) (map[string]string, error) {
@@ -1370,7 +1394,7 @@ func TestPruneRepo(t *testing.T) {
getter: new(credmock.SecretStore),
credStoreReturn: "fake-password",
funcTable: localFuncTable{
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string) (map[string]string, error) {
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string, map[string]string) (map[string]string, error) {
return map[string]string{}, nil
},
getStorageCredentials: func(*velerov1api.BackupStorageLocation, velerocredentials.FileStore) (map[string]string, error) {

View File

@@ -33,8 +33,7 @@ import (
)
const (
maxDataCacheMB = 2000
maxMetadataCacheMB = 2000
defaultCacheLimitMB = 5000
maxCacheDurationSecond = 30
)
@@ -67,11 +66,21 @@ func SetupNewRepositoryOptions(ctx context.Context, flags map[string]string) rep
// SetupConnectOptions setups the options when connecting to an existing Kopia repository
func SetupConnectOptions(ctx context.Context, repoOptions udmrepo.RepoOptions) repo.ConnectOptions {
cacheLimit := optionalHaveIntWithDefault(ctx, udmrepo.StoreOptionCacheLimit, repoOptions.StorageOptions, defaultCacheLimitMB) << 20
// 80% for data cache and 20% for metadata cache and align to KB
dataCacheLimit := (cacheLimit / 5 * 4) >> 10
metadataCacheLimit := (cacheLimit / 5) >> 10
return repo.ConnectOptions{
CachingOptions: content.CachingOptions{
ContentCacheSizeBytes: maxDataCacheMB << 20,
MetadataCacheSizeBytes: maxMetadataCacheMB << 20,
MaxListCacheDuration: content.DurationSeconds(time.Duration(maxCacheDurationSecond) * time.Second),
// softLimit 80%
ContentCacheSizeBytes: (dataCacheLimit / 5 * 4) << 10,
MetadataCacheSizeBytes: (metadataCacheLimit / 5 * 4) << 10,
// hardLimit 100%
ContentCacheSizeLimitBytes: dataCacheLimit << 10,
MetadataCacheSizeLimitBytes: metadataCacheLimit << 10,
MaxListCacheDuration: content.DurationSeconds(time.Duration(maxCacheDurationSecond) * time.Second),
},
ClientOptions: repo.ClientOptions{
Hostname: optionalHaveString(udmrepo.GenOptionOwnerDomain, repoOptions.GeneralOptions),

View File

@@ -111,9 +111,11 @@ func TestSetupNewRepositoryOptions(t *testing.T) {
func TestSetupConnectOptions(t *testing.T) {
defaultCacheOption := content.CachingOptions{
ContentCacheSizeBytes: 2000 << 20,
MetadataCacheSizeBytes: 2000 << 20,
MaxListCacheDuration: content.DurationSeconds(time.Duration(30) * time.Second),
ContentCacheSizeBytes: 3200 << 20,
MetadataCacheSizeBytes: 800 << 20,
ContentCacheSizeLimitBytes: 4000 << 20,
MetadataCacheSizeLimitBytes: 1000 << 20,
MaxListCacheDuration: content.DurationSeconds(time.Duration(30) * time.Second),
}
testCases := []struct {

View File

@@ -98,6 +98,21 @@ func optionalHaveBase64(ctx context.Context, key string, flags map[string]string
return nil
}
func optionalHaveIntWithDefault(ctx context.Context, key string, flags map[string]string, defValue int64) int64 {
if value, exist := flags[key]; exist {
if value != "" {
ret, err := strconv.ParseInt(value, 10, 64)
if err == nil {
return ret
}
backendLog()(ctx).Errorf("Ignore %s, value [%s] is invalid, err %v", key, value, err)
}
}
return defValue
}
func backendLog() func(ctx context.Context) logging.Logger {
return logging.Module("kopialib-bd")
}

View File

@@ -90,3 +90,68 @@ func TestOptionalHaveBool(t *testing.T) {
})
}
}
func TestOptionalHaveIntWithDefault(t *testing.T) {
var expectMsg string
testCases := []struct {
name string
key string
flags map[string]string
defaultValue int64
logger *storagemocks.Core
retFuncCheck func(mock.Arguments)
expectMsg string
retValue int64
}{
{
name: "key not exist",
key: "fake-key",
flags: map[string]string{},
defaultValue: 2000,
retValue: 2000,
},
{
name: "value valid",
key: "fake-key",
flags: map[string]string{
"fake-key": "1000",
},
retValue: 1000,
},
{
name: "value invalid",
key: "fake-key",
flags: map[string]string{
"fake-key": "fake-value",
},
logger: new(storagemocks.Core),
retFuncCheck: func(args mock.Arguments) {
ent := args[0].(zapcore.Entry)
if ent.Level == zapcore.ErrorLevel {
expectMsg = ent.Message
}
},
expectMsg: "Ignore fake-key, value [fake-value] is invalid, err strconv.ParseInt: parsing \"fake-value\": invalid syntax",
defaultValue: 2000,
retValue: 2000,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
if tc.logger != nil {
tc.logger.On("Enabled", mock.Anything).Return(true)
tc.logger.On("Check", mock.Anything, mock.Anything).Run(tc.retFuncCheck).Return(&zapcore.CheckedEntry{})
}
ctx := logging.WithLogger(context.Background(), func(module string) logging.Logger {
return zap.New(tc.logger).Sugar()
})
retValue := optionalHaveIntWithDefault(ctx, tc.key, tc.flags, tc.defaultValue)
require.Equal(t, retValue, tc.retValue)
require.Equal(t, tc.expectMsg, expectMsg)
})
}
}

View File

@@ -63,6 +63,8 @@ const (
StoreOptionGenRetentionPeriod = "retentionPeriod"
StoreOptionGenReadOnly = "readOnly"
StoreOptionCacheLimit = "cacheLimitMB"
ThrottleOptionReadOps = "readOPS"
ThrottleOptionWriteOps = "writeOPS"
ThrottleOptionListOps = "listOPS"

View File

@@ -39,12 +39,17 @@ const (
// ValidateUploaderType validates if the input param is a valid uploader type.
// It will return an error if it's invalid.
func ValidateUploaderType(t string) error {
func ValidateUploaderType(t string) (string, error) {
t = strings.TrimSpace(t)
if t != ResticType && t != KopiaType {
return fmt.Errorf("invalid uploader type '%s', valid upload types are: '%s', '%s'", t, ResticType, KopiaType)
return "", fmt.Errorf("invalid uploader type '%s', valid upload types are: '%s', '%s'", t, ResticType, KopiaType)
}
return nil
if t == ResticType {
return fmt.Sprintf("Uploader '%s' is deprecated, don't use it for new backups, otherwise the backups won't be available for restore when this functionality is removed in a future version of Velero", t), nil
}
return "", nil
}
type SnapshotInfo struct {

View File

@@ -1,34 +1,47 @@
package uploader
import "testing"
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestValidateUploaderType(t *testing.T) {
tests := []struct {
name string
input string
wantErr bool
wantErr string
wantMsg string
}{
{
"'restic' is a valid type",
"restic",
false,
"",
"Uploader 'restic' is deprecated, don't use it for new backups, otherwise the backups won't be available for restore when this functionality is removed in a future version of Velero",
},
{
"' kopia ' is a valid type (space will be trimmed)",
" kopia ",
false,
"",
"",
},
{
"'anything_else' is invalid",
"anything_else",
true,
"invalid uploader type 'anything_else', valid upload types are: 'restic', 'kopia'",
"",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := ValidateUploaderType(tt.input); (err != nil) != tt.wantErr {
t.Errorf("ValidateUploaderType(), input = '%s' error = %v, wantErr %v", tt.input, err, tt.wantErr)
msg, err := ValidateUploaderType(tt.input)
if tt.wantErr != "" {
assert.EqualError(t, err, tt.wantErr)
} else {
assert.NoError(t, err)
}
assert.Equal(t, tt.wantMsg, msg)
})
}
}