From a523d10802a00122104c4f050dc7101e40647f3c Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Mon, 5 Aug 2024 19:40:27 +0800 Subject: [PATCH] data mover ms node agent resume Signed-off-by: Lyndon-Li --- changelogs/unreleased/8085-Lyndon-Li | 1 + pkg/builder/data_download_builder.go | 25 ++ pkg/builder/data_upload_builder.go | 7 + pkg/cmd/cli/nodeagent/server.go | 43 +-- pkg/cmd/server/server.go | 20 +- pkg/controller/data_download_controller.go | 247 ++++++------ .../data_download_controller_test.go | 361 +++++++++++------- pkg/controller/data_upload_controller.go | 253 ++++++------ pkg/controller/data_upload_controller_test.go | 358 ++++++++++------- 9 files changed, 806 insertions(+), 509 deletions(-) create mode 100644 changelogs/unreleased/8085-Lyndon-Li diff --git a/changelogs/unreleased/8085-Lyndon-Li b/changelogs/unreleased/8085-Lyndon-Li new file mode 100644 index 000000000..f063cdfc1 --- /dev/null +++ b/changelogs/unreleased/8085-Lyndon-Li @@ -0,0 +1 @@ +According to design #7576, after node-agent restarts, if a DU/DD is in InProgress status, re-capture the data mover ms pod and continue the execution \ No newline at end of file diff --git a/pkg/builder/data_download_builder.go b/pkg/builder/data_download_builder.go index 9a85c7905..09bd498a0 100644 --- a/pkg/builder/data_download_builder.go +++ b/pkg/builder/data_download_builder.go @@ -19,6 +19,7 @@ package builder import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/vmware-tanzu/velero/pkg/apis/velero/shared" velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" ) @@ -116,3 +117,27 @@ func (d *DataDownloadBuilder) StartTimestamp(startTime *metav1.Time) *DataDownlo d.object.Status.StartTimestamp = startTime return d } + +// CompletionTimestamp sets the DataDownload's StartTimestamp. 
+func (d *DataDownloadBuilder) CompletionTimestamp(completionTimestamp *metav1.Time) *DataDownloadBuilder { + d.object.Status.CompletionTimestamp = completionTimestamp + return d +} + +// Labels sets the DataDownload's Labels. +func (d *DataDownloadBuilder) Labels(labels map[string]string) *DataDownloadBuilder { + d.object.Labels = labels + return d +} + +// Progress sets the DataDownload's Progress. +func (d *DataDownloadBuilder) Progress(progress shared.DataMoveOperationProgress) *DataDownloadBuilder { + d.object.Status.Progress = progress + return d +} + +// Node sets the DataDownload's Node. +func (d *DataDownloadBuilder) Node(node string) *DataDownloadBuilder { + d.object.Status.Node = node + return d +} diff --git a/pkg/builder/data_upload_builder.go b/pkg/builder/data_upload_builder.go index 7ff33dcb0..0bc28b860 100644 --- a/pkg/builder/data_upload_builder.go +++ b/pkg/builder/data_upload_builder.go @@ -133,7 +133,14 @@ func (d *DataUploadBuilder) Labels(labels map[string]string) *DataUploadBuilder return d } +// Progress sets the DataUpload's Progress. func (d *DataUploadBuilder) Progress(progress shared.DataMoveOperationProgress) *DataUploadBuilder { d.object.Status.Progress = progress return d } + +// Node sets the DataUpload's Node. 
+func (d *DataUploadBuilder) Node(node string) *DataUploadBuilder { + d.object.Status.Node = node + return d +} diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go index 61dd0b006..b6e364523 100644 --- a/pkg/cmd/cli/nodeagent/server.go +++ b/pkg/cmd/cli/nodeagent/server.go @@ -292,18 +292,28 @@ func (s *nodeAgentServer) run() { if s.dataPathConfigs != nil && len(s.dataPathConfigs.LoadAffinity) > 0 { loadAffinity = s.dataPathConfigs.LoadAffinity[0] } - dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) - s.attemptDataUploadResume(dataUploadReconciler) + dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil { s.logger.WithError(err).Fatal("Unable to create the data upload controller") } - dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) - s.attemptDataDownloadResume(dataDownloadReconciler) + dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil { s.logger.WithError(err).Fatal("Unable to create the data download controller") } + go func() { + s.mgr.GetCache().WaitForCacheSync(s.ctx) + + if err := 
dataUploadReconciler.AttemptDataUploadResume(s.ctx, s.mgr.GetClient(), s.logger.WithField("node", s.nodeName), s.namespace); err != nil { + s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data upload resume") + } + + if err := dataDownloadReconciler.AttemptDataDownloadResume(s.ctx, s.mgr.GetClient(), s.logger.WithField("node", s.nodeName), s.namespace); err != nil { + s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data download resume") + } + }() + s.logger.Info("Controllers starting...") if err := s.mgr.Start(ctrl.SetupSignalHandler()); err != nil { @@ -373,31 +383,6 @@ func (s *nodeAgentServer) markInProgressCRsFailed() { s.markInProgressPVRsFailed(client) } -func (s *nodeAgentServer) attemptDataUploadResume(r *controller.DataUploadReconciler) { - // the function is called before starting the controller manager, the embedded client isn't ready to use, so create a new one here - client, err := ctrlclient.New(s.mgr.GetConfig(), ctrlclient.Options{Scheme: s.mgr.GetScheme()}) - if err != nil { - s.logger.WithError(errors.WithStack(err)).Error("failed to create client") - return - } - if err := r.AttemptDataUploadResume(s.ctx, client, s.logger.WithField("node", s.nodeName), s.namespace); err != nil { - s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data upload resume") - } -} - -func (s *nodeAgentServer) attemptDataDownloadResume(r *controller.DataDownloadReconciler) { - // the function is called before starting the controller manager, the embedded client isn't ready to use, so create a new one here - client, err := ctrlclient.New(s.mgr.GetConfig(), ctrlclient.Options{Scheme: s.mgr.GetScheme()}) - if err != nil { - s.logger.WithError(errors.WithStack(err)).Error("failed to create client") - return - } - - if err := r.AttemptDataDownloadResume(s.ctx, client, s.logger.WithField("node", s.nodeName), s.namespace); err != nil { - s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data download 
resume") - } -} - func (s *nodeAgentServer) markInProgressPVBsFailed(client ctrlclient.Client) { pvbs := &velerov1api.PodVolumeBackupList{} if err := client.List(s.ctx, pvbs, &ctrlclient.ListOptions{Namespace: s.namespace}); err != nil { diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index d8938ca56..5d1c2b76b 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -1148,9 +1148,15 @@ func markDataUploadsCancel(ctx context.Context, client ctrlclient.Client, backup du.Status.Phase == velerov2alpha1api.DataUploadPhaseNew || du.Status.Phase == "" { err := controller.UpdateDataUploadWithRetry(ctx, client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, log.WithField("dataupload", du.Name), - func(dataUpload *velerov2alpha1api.DataUpload) { + func(dataUpload *velerov2alpha1api.DataUpload) bool { + if dataUpload.Spec.Cancel { + return false + } + dataUpload.Spec.Cancel = true - dataUpload.Status.Message = fmt.Sprintf("found a dataupload with status %q during the velero server starting, mark it as cancel", du.Status.Phase) + dataUpload.Status.Message = fmt.Sprintf("Dataupload is in status %q during the velero server starting, mark it as cancel", du.Status.Phase) + + return true }) if err != nil { @@ -1183,9 +1189,15 @@ func markDataDownloadsCancel(ctx context.Context, client ctrlclient.Client, rest dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseNew || dd.Status.Phase == "" { err := controller.UpdateDataDownloadWithRetry(ctx, client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, log.WithField("datadownload", dd.Name), - func(dataDownload *velerov2alpha1api.DataDownload) { + func(dataDownload *velerov2alpha1api.DataDownload) bool { + if dataDownload.Spec.Cancel { + return false + } + dataDownload.Spec.Cancel = true - dataDownload.Status.Message = fmt.Sprintf("found a datadownload with status %q during the velero server starting, mark it as cancel", dd.Status.Phase) + dataDownload.Status.Message = 
fmt.Sprintf("Datadownload is in status %q during the velero server starting, mark it as cancel", dd.Status.Phase) + + return true }) if err != nil { diff --git a/pkg/controller/data_download_controller.go b/pkg/controller/data_download_controller.go index c8e8cca50..c4becaa52 100644 --- a/pkg/controller/data_download_controller.go +++ b/pkg/controller/data_download_controller.go @@ -35,6 +35,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -56,6 +57,7 @@ import ( type DataDownloadReconciler struct { client client.Client kubeClient kubernetes.Interface + mgr manager.Manager logger logrus.FieldLogger credentialGetter *credentials.CredentialGetter fileSystem filesystem.Interface @@ -68,11 +70,12 @@ type DataDownloadReconciler struct { metrics *metrics.ServerMetrics } -func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager, +func NewDataDownloadReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager, repoEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler { return &DataDownloadReconciler{ client: client, kubeClient: kubeClient, + mgr: mgr, logger: logger.WithField("controller", "DataDownload"), credentialGetter: credentialGetter, fileSystem: filesystem.NewFileSystem(), @@ -137,9 +140,17 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request } else if controllerutil.ContainsFinalizer(dd, DataUploadDownloadFinalizer) && !dd.Spec.Cancel && !isDataDownloadInFinalState(dd) { // when delete cr we need to clear up 
internal resources created by Velero, here we use the cancel mechanism // to help clear up resources instead of clear them directly in case of some conflict with Expose action - if err := UpdateDataDownloadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataDownload *velerov2alpha1api.DataDownload) { + log.Warnf("Cancel dd under phase %s because it is being deleted", dd.Status.Phase) + + if err := UpdateDataDownloadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataDownload *velerov2alpha1api.DataDownload) bool { + if dataDownload.Spec.Cancel { + return false + } + dataDownload.Spec.Cancel = true - dataDownload.Status.Message = fmt.Sprintf("found a datadownload %s/%s is being deleted, mark it as cancel", dd.Namespace, dd.Name) + dataDownload.Status.Message = "Cancel datadownload because it is being deleted" + + return true }); err != nil { log.Errorf("failed to set cancel flag with error %s for %s/%s", err.Error(), dd.Namespace, dd.Name) return ctrl.Result{}, err @@ -552,9 +563,15 @@ func (r *DataDownloadReconciler) findSnapshotRestoreForPod(ctx context.Context, } } else if unrecoverable, reason := kube.IsPodUnrecoverable(pod, log); unrecoverable { err := UpdateDataDownloadWithRetry(context.Background(), r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, r.logger.WithField("datadownlad", dd.Name), - func(dataDownload *velerov2alpha1api.DataDownload) { + func(dataDownload *velerov2alpha1api.DataDownload) bool { + if dataDownload.Spec.Cancel { + return false + } + dataDownload.Spec.Cancel = true - dataDownload.Status.Message = fmt.Sprintf("datadownload mark as cancel to failed early for exposing pod %s/%s is in abnormal status for %s", pod.Namespace, pod.Name, reason) + dataDownload.Status.Message = fmt.Sprintf("Cancel datadownload because the exposing pod %s/%s is in abnormal status for reason %s", pod.Namespace, pod.Name, reason) + + return true }) if err != nil { @@ -575,75 +592,6 @@ func (r *DataDownloadReconciler) 
findSnapshotRestoreForPod(ctx context.Context, return []reconcile.Request{request} } -func (r *DataDownloadReconciler) FindDataDownloads(ctx context.Context, cli client.Client, ns string) ([]*velerov2alpha1api.DataDownload, error) { - pods := &v1.PodList{} - var dataDownloads []*velerov2alpha1api.DataDownload - if err := cli.List(ctx, pods, &client.ListOptions{Namespace: ns}); err != nil { - r.logger.WithError(errors.WithStack(err)).Error("failed to list pods on current node") - return nil, errors.Wrapf(err, "failed to list pods on current node") - } - - for _, pod := range pods.Items { - if pod.Spec.NodeName != r.nodeName { - r.logger.Debugf("Pod %s related data download will not handled by %s nodes", pod.GetName(), r.nodeName) - continue - } - dd, err := findDataDownloadByPod(cli, pod) - if err != nil { - r.logger.WithError(errors.WithStack(err)).Error("failed to get dataDownload by pod") - continue - } else if dd != nil { - dataDownloads = append(dataDownloads, dd) - } - } - return dataDownloads, nil -} - -func (r *DataDownloadReconciler) findAcceptDataDownloadsByNodeLabel(ctx context.Context, cli client.Client, ns string) ([]velerov2alpha1api.DataDownload, error) { - dataDownloads := &velerov2alpha1api.DataDownloadList{} - if err := cli.List(ctx, dataDownloads, &client.ListOptions{Namespace: ns}); err != nil { - r.logger.WithError(errors.WithStack(err)).Error("failed to list datauploads") - return nil, errors.Wrapf(err, "failed to list datauploads") - } - - var result []velerov2alpha1api.DataDownload - for _, dd := range dataDownloads.Items { - if dd.Status.Phase != velerov2alpha1api.DataDownloadPhaseAccepted { - continue - } - if dd.Labels[acceptNodeLabelKey] == r.nodeName { - result = append(result, dd) - } - } - return result, nil -} - -// CancelAcceptedDataDownload will cancel the accepted data download -func (r *DataDownloadReconciler) CancelAcceptedDataDownload(ctx context.Context, cli client.Client, ns string) { - r.logger.Infof("Canceling accepted data 
for node %s", r.nodeName) - dataDownloads, err := r.findAcceptDataDownloadsByNodeLabel(ctx, cli, ns) - if err != nil { - r.logger.WithError(err).Error("failed to find data downloads") - return - } - - for _, dd := range dataDownloads { - if dd.Spec.Cancel { - continue - } - err = UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, - r.logger.WithField("dataupload", dd.Name), func(dataDownload *velerov2alpha1api.DataDownload) { - dataDownload.Spec.Cancel = true - dataDownload.Status.Message = fmt.Sprintf("found a datadownload with status %q during the node-agent starting, mark it as cancel", dd.Status.Phase) - }) - - r.logger.Warn(dd.Status.Message) - if err != nil { - r.logger.WithError(err).Errorf("failed to set cancel flag with error %s", err.Error()) - } - } -} - func (r *DataDownloadReconciler) prepareDataDownload(ssb *velerov2alpha1api.DataDownload) { ssb.Status.Phase = velerov2alpha1api.DataDownloadPhasePrepared ssb.Status.Node = r.nodeName @@ -795,56 +743,139 @@ func isDataDownloadInFinalState(dd *velerov2alpha1api.DataDownload) bool { dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseCompleted } -func UpdateDataDownloadWithRetry(ctx context.Context, client client.Client, namespacedName types.NamespacedName, log *logrus.Entry, updateFunc func(dataDownload *velerov2alpha1api.DataDownload)) error { - return wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (done bool, err error) { +func UpdateDataDownloadWithRetry(ctx context.Context, client client.Client, namespacedName types.NamespacedName, log *logrus.Entry, updateFunc func(*velerov2alpha1api.DataDownload) bool) error { + return wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) { dd := &velerov2alpha1api.DataDownload{} if err := client.Get(ctx, namespacedName, dd); err != nil { return false, errors.Wrap(err, "getting DataDownload") } - updateFunc(dd) - updateErr := client.Update(ctx, dd) 
- if updateErr != nil { - if apierrors.IsConflict(updateErr) { - log.Warnf("failed to update datadownload for %s/%s and will retry it", dd.Namespace, dd.Name) - return false, nil + if updateFunc(dd) { + err := client.Update(ctx, dd) + if err != nil { + if apierrors.IsConflict(err) { + log.Warnf("failed to update datadownload for %s/%s and will retry it", dd.Namespace, dd.Name) + return false, nil + } else { + return false, errors.Wrapf(err, "error updating datadownload %s/%s", dd.Namespace, dd.Name) + } } - log.Errorf("failed to update datadownload with error %s for %s/%s", updateErr.Error(), dd.Namespace, dd.Name) - return false, err } return true, nil }) } -func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context, cli client.Client, logger *logrus.Entry, ns string) error { - if dataDownloads, err := r.FindDataDownloads(ctx, cli, ns); err != nil { - return errors.Wrapf(err, "failed to find data downloads") - } else { - for i := range dataDownloads { - dd := dataDownloads[i] - if dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared { - // keep doing nothing let controller re-download the data - // the Prepared CR could be still handled by datadownload controller after node-agent restart - logger.WithField("datadownload", dd.GetName()).Debug("find a datadownload with status prepared") - } else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress { - err = UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, logger.WithField("datadownload", dd.Name), - func(dataDownload *velerov2alpha1api.DataDownload) { - dataDownload.Spec.Cancel = true - dataDownload.Status.Message = fmt.Sprintf("found a datadownload with status %q during the node-agent starting, mark it as cancel", dd.Status.Phase) - }) +var funcResumeCancellableDataRestore = (*DataDownloadReconciler).resumeCancellableDataPath - if err != nil { - logger.WithError(errors.WithStack(err)).Errorf("failed to mark datadownload 
%q into canceled", dd.GetName()) - continue - } - logger.WithField("datadownload", dd.GetName()).Debug("mark datadownload into canceled") +func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context, cli client.Client, logger *logrus.Entry, ns string) error { + dataDownloads := &velerov2alpha1api.DataDownloadList{} + if err := cli.List(ctx, dataDownloads, &client.ListOptions{Namespace: ns}); err != nil { + r.logger.WithError(errors.WithStack(err)).Error("failed to list datadownloads") + return errors.Wrapf(err, "error to list datadownloads") + } + + for i := range dataDownloads.Items { + dd := &dataDownloads.Items[i] + if dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared { + // keep doing nothing let controller re-download the data + // the Prepared CR could be still handled by datadownload controller after node-agent restart + logger.WithField("datadownload", dd.GetName()).Debug("find a datadownload with status prepared") + } else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress { + if dd.Status.Node != r.nodeName { + logger.WithField("dd", dd.Name).WithField("current node", r.nodeName).Infof("DD should be resumed by another node %s", dd.Status.Node) + continue + } + + err := funcResumeCancellableDataRestore(r, ctx, dd, logger) + if err == nil { + continue + } + + logger.WithField("datadownload", dd.GetName()).WithError(err).Warn("Failed to resume data path for dd, have to cancel it") + + resumeErr := err + err = UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, logger.WithField("datadownload", dd.Name), + func(dataDownload *velerov2alpha1api.DataDownload) bool { + if dataDownload.Spec.Cancel { + return false + } + + dataDownload.Spec.Cancel = true + dataDownload.Status.Message = fmt.Sprintf("Resume InProgress datadownload failed with error %v, mark it as cancel", resumeErr) + + return true + }) + if err != nil { + 
logger.WithError(errors.WithStack(err)).Error("Failed to trigger datadownload cancel") + } + } else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted { + r.logger.WithField("datadownload", dd.GetName()).Warn("Cancel dd under Accepted phase") + + err := UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, + r.logger.WithField("datadownload", dd.Name), func(dataDownload *velerov2alpha1api.DataDownload) bool { + if dataDownload.Spec.Cancel { + return false + } + + dataDownload.Spec.Cancel = true + dataDownload.Status.Message = "Datadownload is in Accepted status during the node-agent starting, mark it as cancel" + + return true + }) + if err != nil { + r.logger.WithField("datadownload", dd.GetName()).WithError(err).Errorf("Failed to trigger datadownload cancel") + } + } + } - //If the data download is in Accepted status, the expoded PVC may be not created - // so we need to mark the data download as canceled for it may not be recoverable - r.CancelAcceptedDataDownload(ctx, cli, ns) + return nil +} + +func (r *DataDownloadReconciler) resumeCancellableDataPath(ctx context.Context, dd *velerov2alpha1api.DataDownload, log logrus.FieldLogger) error { + log.Info("Resume cancelable dataDownload") + + res, err := r.restoreExposer.GetExposed(ctx, getDataDownloadOwnerObject(dd), r.client, r.nodeName, dd.Spec.OperationTimeout.Duration) + if err != nil { + return errors.Wrapf(err, "error to get exposed volume for dd %s", dd.Name) + } + + if res == nil { + return errors.Errorf("expose info missed for dd %s", dd.Name) + } + + callbacks := datapath.Callbacks{ + OnCompleted: r.OnDataDownloadCompleted, + OnFailed: r.OnDataDownloadFailed, + OnCancelled: r.OnDataDownloadCancelled, + OnProgress: r.OnDataDownloadProgress, + } + + asyncBR, err := r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, r.kubeClient, r.mgr, datapath.TaskTypeRestore, dd.Name, dd.Namespace, res.ByPod.HostingPod.Name, 
res.ByPod.HostingContainer, dd.Name, callbacks, true, log) + if err != nil { + return errors.Wrapf(err, "error to create asyncBR watcher for dd %s", dd.Name) + } + + resumeComplete := false + defer func() { + if !resumeComplete { + r.closeDataPath(ctx, dd.Name) + } + }() + + if err := asyncBR.Init(ctx, nil); err != nil { + return errors.Wrapf(err, "error to init asyncBR watcher for dd %s", dd.Name) + } + + if err := asyncBR.StartRestore(dd.Spec.SnapshotID, datapath.AccessPoint{ + ByPath: res.ByPod.VolumeName, + }, nil); err != nil { + return errors.Wrapf(err, "error to resume asyncBR watcher for dd %s", dd.Name) + } + + resumeComplete = true + + log.Infof("asyncBR is resumed for dd %s", dd.Name) + return nil } diff --git a/pkg/controller/data_download_controller_test.go b/pkg/controller/data_download_controller_test.go index 7c7c5dbef..356e7f49a 100644 --- a/pkg/controller/data_download_controller_test.go +++ b/pkg/controller/data_download_controller_test.go @@ -33,10 +33,12 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" clientgofake "k8s.io/client-go/kubernetes/fake" ctrl "sigs.k8s.io/controller-runtime" kbclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/client/fake" @@ -149,7 +151,7 @@ func initDataDownloadReconcilerWithError(objects []runtime.Object, needError ... 
dataPathMgr := datapath.NewManager(1) - return NewDataDownloadReconciler(fakeClient, fakeKubeClient, dataPathMgr, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil + return NewDataDownloadReconciler(fakeClient, nil, fakeKubeClient, dataPathMgr, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil } func TestDataDownloadReconcile(t *testing.T) { @@ -869,12 +871,11 @@ func TestUpdateDataDownloadWithRetry(t *testing.T) { testCases := []struct { Name string needErrs []bool + noChange bool ExpectErr bool }{ { - Name: "SuccessOnFirstAttempt", - needErrs: []bool{false, false, false, false}, - ExpectErr: false, + Name: "SuccessOnFirstAttempt", }, { Name: "Error get", @@ -886,6 +887,11 @@ func TestUpdateDataDownloadWithRetry(t *testing.T) { needErrs: []bool{false, false, true, false, false}, ExpectErr: true, }, + { + Name: "no change", + noChange: true, + needErrs: []bool{false, false, true, false, false}, + }, { Name: "Conflict with error timeout", needErrs: []bool{false, false, false, false, true}, @@ -901,8 +907,14 @@ func TestUpdateDataDownloadWithRetry(t *testing.T) { require.NoError(t, err) err = r.client.Create(ctx, dataDownloadBuilder().Result()) require.NoError(t, err) - updateFunc := func(dataDownload *velerov2alpha1api.DataDownload) { + updateFunc := func(dataDownload *velerov2alpha1api.DataDownload) bool { + if tc.noChange { + return false + } + dataDownload.Spec.Cancel = true + + return true } err = UpdateDataDownloadWithRetry(ctx, r.client, namespacedName, velerotest.NewLogger().WithField("name", tc.Name), updateFunc) if tc.ExpectErr { @@ -914,136 +926,115 @@ func TestUpdateDataDownloadWithRetry(t *testing.T) { } } -func TestFindDataDownloads(t *testing.T) { - tests := []struct { - name string - pod corev1.Pod - du *velerov2alpha1api.DataDownload - 
expectedUploads []velerov2alpha1api.DataDownload - expectedError bool - }{ - // Test case 1: Pod with matching nodeName and DataDownload label - { - name: "MatchingPod", - pod: corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "velero", - Name: "pod-1", - Labels: map[string]string{ - velerov1api.DataDownloadLabel: dataDownloadName, - }, - }, - Spec: corev1.PodSpec{ - NodeName: "node-1", - }, - }, - du: dataDownloadBuilder().Result(), - expectedUploads: []velerov2alpha1api.DataDownload{ - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "velero", - Name: dataDownloadName, - }, - }, - }, - expectedError: false, - }, - // Test case 2: Pod with non-matching nodeName - { - name: "NonMatchingNodePod", - pod: corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "velero", - Name: "pod-2", - Labels: map[string]string{ - velerov1api.DataDownloadLabel: dataDownloadName, - }, - }, - Spec: corev1.PodSpec{ - NodeName: "node-2", - }, - }, - du: dataDownloadBuilder().Result(), - expectedUploads: []velerov2alpha1api.DataDownload{}, - expectedError: false, - }, - } +type ddResumeTestHelper struct { + resumeErr error + getExposeErr error + exposeResult *exposer.ExposeResult + asyncBR datapath.AsyncBR +} - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - r, err := initDataDownloadReconcilerWithError(nil) - require.NoError(t, err) - r.nodeName = "node-1" - err = r.client.Create(ctx, test.du) - require.NoError(t, err) - err = r.client.Create(ctx, &test.pod) - require.NoError(t, err) - uploads, err := r.FindDataDownloads(context.Background(), r.client, "velero") +func (dt *ddResumeTestHelper) resumeCancellableDataPath(_ *DataUploadReconciler, _ context.Context, _ *velerov2alpha1api.DataUpload, _ logrus.FieldLogger) error { + return dt.resumeErr +} - if test.expectedError { - assert.Error(t, err) - } else { - assert.NoError(t, err) - assert.Equal(t, len(test.expectedUploads), len(uploads)) - } - }) - } +func (dt *ddResumeTestHelper) 
Expose(context.Context, corev1.ObjectReference, string, string, map[string]string, time.Duration) error { + return nil +} + +func (dt *ddResumeTestHelper) GetExposed(context.Context, corev1.ObjectReference, kbclient.Client, string, time.Duration) (*exposer.ExposeResult, error) { + return dt.exposeResult, dt.getExposeErr +} + +func (dt *ddResumeTestHelper) PeekExposed(context.Context, corev1.ObjectReference) error { + return nil +} + +func (dt *ddResumeTestHelper) RebindVolume(context.Context, corev1.ObjectReference, string, string, time.Duration) error { + return nil +} + +func (dt *ddResumeTestHelper) CleanUp(context.Context, corev1.ObjectReference) {} + +func (dt *ddResumeTestHelper) newMicroServiceBRWatcher(kbclient.Client, kubernetes.Interface, manager.Manager, string, string, string, string, string, string, + datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR { + return dt.asyncBR } func TestAttemptDataDownloadResume(t *testing.T) { tests := []struct { - name string - dataUploads []velerov2alpha1api.DataDownload - du *velerov2alpha1api.DataDownload - pod *corev1.Pod - needErrs []bool - acceptedDataDownloads []string - prepareddDataDownloads []string - cancelledDataDownloads []string - expectedError bool + name string + dataUploads []velerov2alpha1api.DataDownload + dd *velerov2alpha1api.DataDownload + needErrs []bool + resumeErr error + acceptedDataDownloads []string + prepareddDataDownloads []string + cancelledDataDownloads []string + inProgressDataDownloads []string + expectedError string }{ - // Test case 1: Process Accepted DataDownload { - name: "AcceptedDataDownload", - pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Volumes(&corev1.Volume{Name: dataDownloadName}).NodeName("node-1").Labels(map[string]string{ - velerov1api.DataDownloadLabel: dataDownloadName, + name: "accepted DataDownload with no dd label", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(), + cancelledDataDownloads: 
[]string{dataDownloadName}, + acceptedDataDownloads: []string{dataDownloadName}, + }, + { + name: "accepted DataDownload in the current node", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Labels(map[string]string{acceptNodeLabelKey: "node-1"}).Result(), + cancelledDataDownloads: []string{dataDownloadName}, + acceptedDataDownloads: []string{dataDownloadName}, + }, + { + name: "accepted DataDownload with dd label but is canceled", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Cancel(true).Labels(map[string]string{ + acceptNodeLabelKey: "node-1", }).Result(), - du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(), + acceptedDataDownloads: []string{dataDownloadName}, + cancelledDataDownloads: []string{dataDownloadName}, + }, + { + name: "accepted DataDownload with dd label but cancel fail", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Labels(map[string]string{ + acceptNodeLabelKey: "node-1", + }).Result(), + needErrs: []bool{false, false, true, false, false, false}, acceptedDataDownloads: []string{dataDownloadName}, - expectedError: false, }, - // Test case 2: Cancel an Accepted DataDownload { - name: "CancelAcceptedDataDownload", - du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(), - }, - // Test case 3: Process Accepted Prepared DataDownload - { - name: "PreparedDataDownload", - pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Volumes(&corev1.Volume{Name: dataDownloadName}).NodeName("node-1").Labels(map[string]string{ - velerov1api.DataDownloadLabel: dataDownloadName, - }).Result(), - du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), + name: "prepared DataDownload", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), prepareddDataDownloads: []string{dataDownloadName}, }, - // Test case 4: Process Accepted 
InProgress DataDownload { - name: "InProgressDataDownload", - pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Volumes(&corev1.Volume{Name: dataDownloadName}).NodeName("node-1").Labels(map[string]string{ - velerov1api.DataDownloadLabel: dataDownloadName, - }).Result(), - du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), - prepareddDataDownloads: []string{dataDownloadName}, + name: "InProgress DataDownload, not the current node", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Result(), + inProgressDataDownloads: []string{dataDownloadName}, }, - // Test case 5: get resume error { - name: "ResumeError", - pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Volumes(&corev1.Volume{Name: dataDownloadName}).NodeName("node-1").Labels(map[string]string{ - velerov1api.DataDownloadLabel: dataDownloadName, - }).Result(), + name: "InProgress DataDownload, no resume error", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Node("node-1").Result(), + inProgressDataDownloads: []string{dataDownloadName}, + }, + { + name: "InProgress DataDownload, resume error, cancel error", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Node("node-1").Result(), + resumeErr: errors.New("fake-resume-error"), + needErrs: []bool{false, false, true, false, false, false}, + inProgressDataDownloads: []string{dataDownloadName}, + }, + { + name: "InProgress DataDownload, resume error, cancel succeed", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Node("node-1").Result(), + resumeErr: errors.New("fake-resume-error"), + cancelledDataDownloads: []string{dataDownloadName}, + inProgressDataDownloads: []string{dataDownloadName}, + }, + { + name: "Error", needErrs: []bool{false, false, false, false, false, true}, - du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), - 
expectedError: true, + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), + expectedError: "error to list datadownloads: List error", }, } @@ -1054,30 +1045,31 @@ func TestAttemptDataDownloadResume(t *testing.T) { r.nodeName = "node-1" require.NoError(t, err) defer func() { - r.client.Delete(ctx, test.du, &kbclient.DeleteOptions{}) - if test.pod != nil { - r.client.Delete(ctx, test.pod, &kbclient.DeleteOptions{}) - } + r.client.Delete(ctx, test.dd, &kbclient.DeleteOptions{}) }() - assert.NoError(t, r.client.Create(ctx, test.du)) - if test.pod != nil { - assert.NoError(t, r.client.Create(ctx, test.pod)) - } - // Run the test - err = r.AttemptDataDownloadResume(ctx, r.client, r.logger.WithField("name", test.name), test.du.Namespace) + assert.NoError(t, r.client.Create(ctx, test.dd)) - if test.expectedError { - assert.Error(t, err) + dt := &duResumeTestHelper{ + resumeErr: test.resumeErr, + } + + funcResumeCancellableDataBackup = dt.resumeCancellableDataPath + + // Run the test + err = r.AttemptDataDownloadResume(ctx, r.client, r.logger.WithField("name", test.name), test.dd.Namespace) + + if test.expectedError != "" { + assert.EqualError(t, err, test.expectedError) } else { assert.NoError(t, err) // Verify DataDownload marked as Canceled for _, duName := range test.cancelledDataDownloads { - dataUpload := &velerov2alpha1api.DataDownload{} - err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: duName}, dataUpload) + dataDownload := &velerov2alpha1api.DataDownload{} + err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: duName}, dataDownload) require.NoError(t, err) - assert.Equal(t, velerov2alpha1api.DataDownloadPhaseCanceled, dataUpload.Status.Phase) + assert.True(t, dataDownload.Spec.Cancel) } // Verify DataDownload marked as Accepted for _, duName := range test.acceptedDataDownloads { @@ -1097,3 +1089,108 @@ func TestAttemptDataDownloadResume(t *testing.T) { 
}) } } + +func TestResumeCancellableRestore(t *testing.T) { + tests := []struct { + name string + dataDownloads []velerov2alpha1api.DataDownload + dd *velerov2alpha1api.DataDownload + getExposeErr error + exposeResult *exposer.ExposeResult + createWatcherErr error + initWatcherErr error + startWatcherErr error + mockInit bool + mockStart bool + mockClose bool + expectedError string + }{ + { + name: "get expose failed", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Result(), + getExposeErr: errors.New("fake-expose-error"), + expectedError: fmt.Sprintf("error to get exposed volume for dd %s: fake-expose-error", dataDownloadName), + }, + { + name: "no expose", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Node("node-1").Result(), + expectedError: fmt.Sprintf("expose info missed for dd %s", dataDownloadName), + }, + { + name: "watcher init error", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Node("node-1").Result(), + exposeResult: &exposer.ExposeResult{ + ByPod: exposer.ExposeByPod{ + HostingPod: &corev1.Pod{}, + }, + }, + mockInit: true, + mockClose: true, + initWatcherErr: errors.New("fake-init-watcher-error"), + expectedError: fmt.Sprintf("error to init asyncBR watcher for dd %s: fake-init-watcher-error", dataDownloadName), + }, + { + name: "start watcher error", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Node("node-1").Result(), + exposeResult: &exposer.ExposeResult{ + ByPod: exposer.ExposeByPod{ + HostingPod: &corev1.Pod{}, + }, + }, + mockInit: true, + mockStart: true, + mockClose: true, + startWatcherErr: errors.New("fake-start-watcher-error"), + expectedError: fmt.Sprintf("error to resume asyncBR watche for dd %s: fake-start-watcher-error", dataDownloadName), + }, + { + name: "succeed", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Node("node-1").Result(), + exposeResult: 
&exposer.ExposeResult{ + ByPod: exposer.ExposeByPod{ + HostingPod: &corev1.Pod{}, + }, + }, + mockInit: true, + mockStart: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctx := context.TODO() + r, err := initDataDownloadReconciler(nil, false) + r.nodeName = "node-1" + require.NoError(t, err) + + mockAsyncBR := datapathmockes.NewAsyncBR(t) + + if test.mockInit { + mockAsyncBR.On("Init", mock.Anything, mock.Anything).Return(test.initWatcherErr) + } + + if test.mockStart { + mockAsyncBR.On("StartRestore", mock.Anything, mock.Anything, mock.Anything).Return(test.startWatcherErr) + } + + if test.mockClose { + mockAsyncBR.On("Close", mock.Anything).Return() + } + + dt := &ddResumeTestHelper{ + getExposeErr: test.getExposeErr, + exposeResult: test.exposeResult, + asyncBR: mockAsyncBR, + } + + r.restoreExposer = dt + + datapath.MicroServiceBRWatcherCreator = dt.newMicroServiceBRWatcher + + err = r.resumeCancellableDataPath(ctx, test.dd, velerotest.NewLogger()) + if test.expectedError != "" { + assert.EqualError(t, err, test.expectedError) + } + }) + } +} diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go index 4781f04f4..d9025f668 100644 --- a/pkg/controller/data_upload_controller.go +++ b/pkg/controller/data_upload_controller.go @@ -35,6 +35,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -67,6 +68,7 @@ type DataUploadReconciler struct { client client.Client kubeClient kubernetes.Interface csiSnapshotClient snapshotter.SnapshotV1Interface + mgr manager.Manager repoEnsurer *repository.Ensurer Clock clocks.WithTickerAndDelayedExecution credentialGetter *credentials.CredentialGetter @@ -80,11 +82,12 @@ type DataUploadReconciler 
struct { metrics *metrics.ServerMetrics } -func NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interface, csiSnapshotClient snapshotter.SnapshotV1Interface, +func NewDataUploadReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, csiSnapshotClient snapshotter.SnapshotV1Interface, dataPathMgr *datapath.Manager, loadAffinity *nodeagent.LoadAffinity, repoEnsurer *repository.Ensurer, clock clocks.WithTickerAndDelayedExecution, cred *credentials.CredentialGetter, nodeName string, fs filesystem.Interface, preparingTimeout time.Duration, log logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataUploadReconciler { return &DataUploadReconciler{ client: client, + mgr: mgr, kubeClient: kubeClient, csiSnapshotClient: csiSnapshotClient, Clock: clock, @@ -150,9 +153,17 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) } else if controllerutil.ContainsFinalizer(du, DataUploadDownloadFinalizer) && !du.Spec.Cancel && !isDataUploadInFinalState(du) { // when delete cr we need to clear up internal resources created by Velero, here we use the cancel mechanism // to help clear up resources instead of clear them directly in case of some conflict with Expose action - if err := UpdateDataUploadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataUpload *velerov2alpha1api.DataUpload) { + log.Warnf("Cancel du under phase %s because it is being deleted", du.Status.Phase) + + if err := UpdateDataUploadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataUpload *velerov2alpha1api.DataUpload) bool { + if dataUpload.Spec.Cancel { + return false + } + dataUpload.Spec.Cancel = true - dataUpload.Status.Message = fmt.Sprintf("found a dataupload %s/%s is being deleted, mark it as cancel", du.Namespace, du.Name) + dataUpload.Status.Message = "Cancel dataupload because it is being deleted" + + return true }); err != nil { log.Errorf("failed to set cancel flag with error %s for %s/%s", err.Error(), 
du.Namespace, du.Name) return ctrl.Result{}, err @@ -599,9 +610,15 @@ func (r *DataUploadReconciler) findDataUploadForPod(ctx context.Context, podObj } } else if unrecoverable, reason := kube.IsPodUnrecoverable(pod, log); unrecoverable { // let the abnormal backup pod failed early err := UpdateDataUploadWithRetry(context.Background(), r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, r.logger.WithField("dataupload", du.Name), - func(dataUpload *velerov2alpha1api.DataUpload) { + func(dataUpload *velerov2alpha1api.DataUpload) bool { + if dataUpload.Spec.Cancel { + return false + } + dataUpload.Spec.Cancel = true - dataUpload.Status.Message = fmt.Sprintf("dataupload mark as cancel to failed early for exposing pod %s/%s is in abnormal status for reason %s", pod.Namespace, pod.Name, reason) + dataUpload.Status.Message = fmt.Sprintf("Cancel dataupload because the exposing pod %s/%s is in abnormal status for reason %s", pod.Namespace, pod.Name, reason) + + return true }) if err != nil { @@ -622,75 +639,6 @@ func (r *DataUploadReconciler) findDataUploadForPod(ctx context.Context, podObj return []reconcile.Request{request} } -func (r *DataUploadReconciler) FindDataUploadsByPod(ctx context.Context, cli client.Client, ns string) ([]velerov2alpha1api.DataUpload, error) { - pods := &corev1.PodList{} - var dataUploads []velerov2alpha1api.DataUpload - if err := cli.List(ctx, pods, &client.ListOptions{Namespace: ns}); err != nil { - r.logger.WithError(errors.WithStack(err)).Error("failed to list pods on current node") - return nil, errors.Wrapf(err, "failed to list pods on current node") - } - - for _, pod := range pods.Items { - if pod.Spec.NodeName != r.nodeName { - r.logger.Debugf("Pod %s related data upload will not handled by %s nodes", pod.GetName(), r.nodeName) - continue - } - du, err := findDataUploadByPod(cli, pod) - if err != nil { - r.logger.WithError(errors.WithStack(err)).Error("failed to get dataUpload by pod") - continue - } else if du != nil 
{ - dataUploads = append(dataUploads, *du) - } - } - return dataUploads, nil -} - -func (r *DataUploadReconciler) findAcceptDataUploadsByNodeLabel(ctx context.Context, cli client.Client, ns string) ([]velerov2alpha1api.DataUpload, error) { - dataUploads := &velerov2alpha1api.DataUploadList{} - if err := cli.List(ctx, dataUploads, &client.ListOptions{Namespace: ns}); err != nil { - r.logger.WithError(errors.WithStack(err)).Error("failed to list datauploads") - return nil, errors.Wrapf(err, "failed to list datauploads") - } - - var result []velerov2alpha1api.DataUpload - for _, du := range dataUploads.Items { - if du.Status.Phase != velerov2alpha1api.DataUploadPhaseAccepted { - continue - } - if du.Labels[acceptNodeLabelKey] == r.nodeName { - result = append(result, du) - } - } - return result, nil -} - -func (r *DataUploadReconciler) CancelAcceptedDataupload(ctx context.Context, cli client.Client, ns string) { - r.logger.Infof("Reset accepted dataupload for node %s", r.nodeName) - dataUploads, err := r.findAcceptDataUploadsByNodeLabel(ctx, cli, ns) - if err != nil { - r.logger.WithError(err).Error("failed to find dataupload") - return - } - - for _, du := range dataUploads { - if du.Spec.Cancel { - continue - } - err = UpdateDataUploadWithRetry(ctx, cli, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, r.logger.WithField("dataupload", du.Name), - func(dataUpload *velerov2alpha1api.DataUpload) { - dataUpload.Spec.Cancel = true - dataUpload.Status.Message = fmt.Sprintf("found a dataupload with status %q during the node-agent starting, mark it as cancel", du.Status.Phase) - }) - - r.logger.WithField("dataupload", du.GetName()).Warn(du.Status.Message) - if err != nil { - r.logger.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", du.GetName()) - continue - } - } -} - func (r *DataUploadReconciler) prepareDataUpload(du *velerov2alpha1api.DataUpload) { du.Status.Phase = velerov2alpha1api.DataUploadPhasePrepared du.Status.Node = 
r.nodeName @@ -902,54 +850,145 @@ func isDataUploadInFinalState(du *velerov2alpha1api.DataUpload) bool { du.Status.Phase == velerov2alpha1api.DataUploadPhaseCompleted } -func UpdateDataUploadWithRetry(ctx context.Context, client client.Client, namespacedName types.NamespacedName, log *logrus.Entry, updateFunc func(dataUpload *velerov2alpha1api.DataUpload)) error { - return wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (done bool, err error) { +func UpdateDataUploadWithRetry(ctx context.Context, client client.Client, namespacedName types.NamespacedName, log *logrus.Entry, updateFunc func(*velerov2alpha1api.DataUpload) bool) error { + return wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) { du := &velerov2alpha1api.DataUpload{} if err := client.Get(ctx, namespacedName, du); err != nil { return false, errors.Wrap(err, "getting DataUpload") } - updateFunc(du) - updateErr := client.Update(ctx, du) - if updateErr != nil { - if apierrors.IsConflict(updateErr) { - log.Warnf("failed to update dataupload for %s/%s and will retry it", du.Namespace, du.Name) - return false, nil + if updateFunc(du) { + err := client.Update(ctx, du) + if err != nil { + if apierrors.IsConflict(err) { + log.Warnf("failed to update dataupload for %s/%s and will retry it", du.Namespace, du.Name) + return false, nil + } else { + return false, errors.Wrapf(err, "error updating dataupload with error %s/%s", du.Namespace, du.Name) + } } - log.Errorf("failed to update dataupload with error %s for %s/%s", updateErr.Error(), du.Namespace, du.Name) - return false, err } + return true, nil }) } -func (r *DataUploadReconciler) AttemptDataUploadResume(ctx context.Context, cli client.Client, logger *logrus.Entry, ns string) error { - if dataUploads, err := r.FindDataUploadsByPod(ctx, cli, ns); err != nil { - return errors.Wrap(err, "failed to find data uploads") - } else { - for _, du := range dataUploads { - if du.Status.Phase == 
velerov2alpha1api.DataUploadPhasePrepared { - // keep doing nothing let controller re-download the data - // the Prepared CR could be still handled by dataupload controller after node-agent restart - logger.WithField("dataupload", du.GetName()).Debug("find a dataupload with status prepared") - } else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress { - err = UpdateDataUploadWithRetry(ctx, cli, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, logger.WithField("dataupload", du.Name), - func(dataUpload *velerov2alpha1api.DataUpload) { - dataUpload.Spec.Cancel = true - dataUpload.Status.Message = fmt.Sprintf("found a dataupload with status %q during the node-agent starting, mark it as cancel", du.Status.Phase) - }) +var funcResumeCancellableDataBackup = (*DataUploadReconciler).resumeCancellableDataPath - if err != nil { - logger.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q into canceled", du.GetName()) - continue - } - logger.WithField("dataupload", du.GetName()).Debug("mark dataupload into canceled") +func (r *DataUploadReconciler) AttemptDataUploadResume(ctx context.Context, cli client.Client, logger *logrus.Entry, ns string) error { + dataUploads := &velerov2alpha1api.DataUploadList{} + if err := cli.List(ctx, dataUploads, &client.ListOptions{Namespace: ns}); err != nil { + r.logger.WithError(errors.WithStack(err)).Error("failed to list datauploads") + return errors.Wrapf(err, "error to list datauploads") + } + + for i := range dataUploads.Items { + du := &dataUploads.Items[i] + if du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared { + // keep doing nothing let controller re-download the data + // the Prepared CR could be still handled by dataupload controller after node-agent restart + logger.WithField("dataupload", du.GetName()).Debug("find a dataupload with status prepared") + } else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress { + if du.Status.Node != r.nodeName { + 
logger.WithField("du", du.Name).WithField("current node", r.nodeName).Infof("DU should be resumed by another node %s", du.Status.Node) + continue + } + + err := funcResumeCancellableDataBackup(r, ctx, du, logger) + if err == nil { + continue + } + + logger.WithField("dataupload", du.GetName()).WithError(err).Warn("Failed to resume data path for du, have to cancel it") + + resumeErr := err + err = UpdateDataUploadWithRetry(ctx, cli, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, logger.WithField("dataupload", du.Name), + func(dataUpload *velerov2alpha1api.DataUpload) bool { + if dataUpload.Spec.Cancel { + return false + } + + dataUpload.Spec.Cancel = true + dataUpload.Status.Message = fmt.Sprintf("Resume InProgress dataupload failed with error %v, mark it as cancel", resumeErr) + + return true + }) + if err != nil { + logger.WithField("dataupload", du.GetName()).WithError(errors.WithStack(err)).Error("Failed to trigger dataupload cancel") + } + } else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted { + r.logger.WithField("dataupload", du.GetName()).Warn("Cancel du under Accepted phase") + + err := UpdateDataUploadWithRetry(ctx, cli, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, r.logger.WithField("dataupload", du.Name), + func(dataUpload *velerov2alpha1api.DataUpload) bool { + if dataUpload.Spec.Cancel { + return false + } + + dataUpload.Spec.Cancel = true + dataUpload.Status.Message = "Dataupload is in Accepted status during the node-agent starting, mark it as cancel" + + return true + }) + if err != nil { + r.logger.WithField("dataupload", du.GetName()).WithError(errors.WithStack(err)).Error("Failed to trigger dataupload cancel") } } } - //If the data upload is in Accepted status, the volume snapshot may be deleted and the exposed pod may not be created - // so we need to mark the data upload as canceled for it may not be recoverable - r.CancelAcceptedDataupload(ctx, cli, ns) + return nil +} + +func (r 
*DataUploadReconciler) resumeCancellableDataPath(ctx context.Context, du *velerov2alpha1api.DataUpload, log logrus.FieldLogger) error { + log.Info("Resume cancelable dataUpload") + + ep, ok := r.snapshotExposerList[du.Spec.SnapshotType] + if !ok { + return errors.Errorf("error to find exposer for du %s", du.Name) + } + + waitExposePara := r.setupWaitExposePara(du) + res, err := ep.GetExposed(ctx, getOwnerObject(du), du.Spec.OperationTimeout.Duration, waitExposePara) + if err != nil { + return errors.Wrapf(err, "error to get exposed snapshot for du %s", du.Name) + } + + if res == nil { + return errors.Errorf("expose info missed for du %s", du.Name) + } + + callbacks := datapath.Callbacks{ + OnCompleted: r.OnDataUploadCompleted, + OnFailed: r.OnDataUploadFailed, + OnCancelled: r.OnDataUploadCancelled, + OnProgress: r.OnDataUploadProgress, + } + + asyncBR, err := r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, r.kubeClient, r.mgr, datapath.TaskTypeBackup, du.Name, du.Namespace, res.ByPod.HostingPod.Name, res.ByPod.HostingContainer, du.Name, callbacks, true, log) + if err != nil { + return errors.Wrapf(err, "error to create asyncBR watcher for du %s", du.Name) + } + + resumeComplete := false + defer func() { + if !resumeComplete { + r.closeDataPath(ctx, du.Name) + } + }() + + if err := asyncBR.Init(ctx, nil); err != nil { + return errors.Wrapf(err, "error to init asyncBR watcher for du %s", du.Name) + } + + if err := asyncBR.StartBackup(datapath.AccessPoint{ + ByPath: res.ByPod.VolumeName, + }, du.Spec.DataMoverConfig, nil); err != nil { + return errors.Wrapf(err, "error to resume asyncBR watche for du %s", du.Name) + } + + resumeComplete = true + + log.Infof("asyncBR is resumed for du %s", du.Name) + return nil } diff --git a/pkg/controller/data_upload_controller_test.go b/pkg/controller/data_upload_controller_test.go index 2bca1d5b9..0584065b1 100644 --- a/pkg/controller/data_upload_controller_test.go +++ b/pkg/controller/data_upload_controller_test.go @@ 
-27,6 +27,7 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -35,6 +36,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" clientgofake "k8s.io/client-go/kubernetes/fake" "k8s.io/utils/clock" testclocks "k8s.io/utils/clock/testing" @@ -42,6 +44,7 @@ import ( kbclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/vmware-tanzu/velero/internal/credentials" @@ -49,6 +52,7 @@ import ( velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" "github.com/vmware-tanzu/velero/pkg/builder" "github.com/vmware-tanzu/velero/pkg/datapath" + datapathmocks "github.com/vmware-tanzu/velero/pkg/datapath/mocks" "github.com/vmware-tanzu/velero/pkg/exposer" "github.com/vmware-tanzu/velero/pkg/metrics" velerotest "github.com/vmware-tanzu/velero/pkg/test" @@ -241,7 +245,7 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci if err != nil { return nil, err } - return NewDataUploadReconciler(fakeClient, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), dataPathMgr, nil, nil, + return NewDataUploadReconciler(fakeClient, nil, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), dataPathMgr, nil, nil, testclocks.NewFakeClock(now), &credentials.CredentialGetter{FromFile: credentialFileStore}, "test-node", fakeFS, time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil } @@ -944,12 +948,11 @@ func TestUpdateDataUploadWithRetry(t *testing.T) { testCases := []struct { Name string needErrs []bool + noChange bool ExpectErr bool }{ { - Name: 
"SuccessOnFirstAttempt", - needErrs: []bool{false, false, false, false}, - ExpectErr: false, + Name: "SuccessOnFirstAttempt", }, { Name: "Error get", @@ -961,6 +964,11 @@ func TestUpdateDataUploadWithRetry(t *testing.T) { needErrs: []bool{false, false, true, false, false}, ExpectErr: true, }, + { + Name: "no change", + noChange: true, + needErrs: []bool{false, false, true, false, false}, + }, { Name: "Conflict with error timeout", needErrs: []bool{false, false, false, false, true}, @@ -976,8 +984,13 @@ func TestUpdateDataUploadWithRetry(t *testing.T) { require.NoError(t, err) err = r.client.Create(ctx, dataUploadBuilder().Result()) require.NoError(t, err) - updateFunc := func(dataDownload *velerov2alpha1api.DataUpload) { + updateFunc := func(dataDownload *velerov2alpha1api.DataUpload) bool { + if tc.noChange { + return false + } + dataDownload.Spec.Cancel = true + return true } err = UpdateDataUploadWithRetry(ctx, r.client, namespacedName, velerotest.NewLogger().WithField("name", tc.Name), updateFunc) if tc.ExpectErr { @@ -989,135 +1002,107 @@ func TestUpdateDataUploadWithRetry(t *testing.T) { } } -func TestFindDataUploads(t *testing.T) { - tests := []struct { - name string - pod corev1.Pod - du *velerov2alpha1api.DataUpload - expectedUploads []velerov2alpha1api.DataUpload - expectedError bool - }{ - // Test case 1: Pod with matching nodeName and DataUpload label - { - name: "MatchingPod", - pod: corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "velero", - Name: "pod-1", - Labels: map[string]string{ - velerov1api.DataUploadLabel: dataUploadName, - }, - }, - Spec: corev1.PodSpec{ - NodeName: "node-1", - }, - }, - du: dataUploadBuilder().Result(), - expectedUploads: []velerov2alpha1api.DataUpload{ - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "velero", - Name: dataUploadName, - }, - }, - }, - expectedError: false, - }, - // Test case 2: Pod with non-matching nodeName - { - name: "NonMatchingNodePod", - pod: corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - 
Namespace: "velero", - Name: "pod-2", - Labels: map[string]string{ - velerov1api.DataUploadLabel: dataUploadName, - }, - }, - Spec: corev1.PodSpec{ - NodeName: "node-2", - }, - }, - du: dataUploadBuilder().Result(), - expectedUploads: []velerov2alpha1api.DataUpload{}, - expectedError: false, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - r, err := initDataUploaderReconcilerWithError() - require.NoError(t, err) - r.nodeName = "node-1" - err = r.client.Create(ctx, test.du) - require.NoError(t, err) - err = r.client.Create(ctx, &test.pod) - require.NoError(t, err) - uploads, err := r.FindDataUploadsByPod(context.Background(), r.client, "velero") - - if test.expectedError { - assert.Error(t, err) - } else { - assert.NoError(t, err) - assert.Equal(t, len(test.expectedUploads), len(uploads)) - } - }) - } +type duResumeTestHelper struct { + resumeErr error + getExposeErr error + exposeResult *exposer.ExposeResult + asyncBR datapath.AsyncBR } + +func (dt *duResumeTestHelper) resumeCancellableDataPath(_ *DataUploadReconciler, _ context.Context, _ *velerov2alpha1api.DataUpload, _ logrus.FieldLogger) error { + return dt.resumeErr +} + +func (dt *duResumeTestHelper) Expose(context.Context, corev1.ObjectReference, interface{}) error { + return nil +} + +func (dt *duResumeTestHelper) GetExposed(context.Context, corev1.ObjectReference, time.Duration, interface{}) (*exposer.ExposeResult, error) { + return dt.exposeResult, dt.getExposeErr +} + +func (dt *duResumeTestHelper) PeekExposed(context.Context, corev1.ObjectReference) error { + return nil +} + +func (dt *duResumeTestHelper) CleanUp(context.Context, corev1.ObjectReference, string, string) {} + +func (dt *duResumeTestHelper) newMicroServiceBRWatcher(kbclient.Client, kubernetes.Interface, manager.Manager, string, string, string, string, string, string, + datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR { + return dt.asyncBR +} + func TestAttemptDataUploadResume(t *testing.T) { tests 
:= []struct { - name string - dataUploads []velerov2alpha1api.DataUpload - du *velerov2alpha1api.DataUpload - pod *corev1.Pod - needErrs []bool - acceptedDataUploads []string - prepareddDataUploads []string - cancelledDataUploads []string - expectedError bool + name string + dataUploads []velerov2alpha1api.DataUpload + du *velerov2alpha1api.DataUpload + needErrs []bool + acceptedDataUploads []string + prepareddDataUploads []string + cancelledDataUploads []string + inProgressDataUploads []string + resumeErr error + expectedError string }{ - // Test case 1: Process Accepted DataUpload { - name: "AcceptedDataUpload", - pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).NodeName("node-1").Labels(map[string]string{ - velerov1api.DataUploadLabel: dataUploadName, - }).Result(), - du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(), + name: "accepted DataUpload in other node", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(), + cancelledDataUploads: []string{dataUploadName}, + acceptedDataUploads: []string{dataUploadName}, + }, + { + name: "accepted DataUpload in the current node", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Labels(map[string]string{acceptNodeLabelKey: "node-1"}).Result(), + cancelledDataUploads: []string{dataUploadName}, + acceptedDataUploads: []string{dataUploadName}, + }, + { + name: "accepted DataUpload in the current node but canceled", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Labels(map[string]string{acceptNodeLabelKey: "node-1"}).Cancel(true).Result(), + cancelledDataUploads: []string{dataUploadName}, + acceptedDataUploads: []string{dataUploadName}, + }, + { + name: "accepted DataUpload in the current node but update error", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Labels(map[string]string{acceptNodeLabelKey: 
"node-1"}).Result(), + needErrs: []bool{false, false, true, false, false, false}, acceptedDataUploads: []string{dataUploadName}, - expectedError: false, }, - // Test case 2: Cancel an Accepted DataUpload { - name: "CancelAcceptedDataUpload", - du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(), - }, - // Test case 3: Process Accepted Prepared DataUpload - { - name: "PreparedDataUpload", - pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).NodeName("node-1").Labels(map[string]string{ - velerov1api.DataUploadLabel: dataUploadName, - }).Result(), + name: "prepared DataUpload", du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), prepareddDataUploads: []string{dataUploadName}, }, - // Test case 4: Process Accepted InProgress DataUpload { - name: "InProgressDataUpload", - pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).NodeName("node-1").Labels(map[string]string{ - velerov1api.DataUploadLabel: dataUploadName, - }).Result(), - du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), - prepareddDataUploads: []string{dataUploadName}, + name: "InProgress DataUpload, not the current node", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), + inProgressDataUploads: []string{dataUploadName}, }, - // Test case 5: get resume error { - name: "ResumeError", - pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).NodeName("node-1").Labels(map[string]string{ - velerov1api.DataUploadLabel: dataUploadName, - }).Result(), + name: "InProgress DataUpload, resume error and update error", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Node("node-1").Result(), + needErrs: []bool{false, false, true, false, false, false}, + resumeErr: 
errors.New("fake-resume-error"), + inProgressDataUploads: []string{dataUploadName}, + }, + { + name: "InProgress DataUpload, resume error and update succeed", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Node("node-1").Result(), + resumeErr: errors.New("fake-resume-error"), + cancelledDataUploads: []string{dataUploadName}, + inProgressDataUploads: []string{dataUploadName}, + }, + { + name: "InProgress DataUpload and resume succeed", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Node("node-1").Result(), + inProgressDataUploads: []string{dataUploadName}, + }, + { + name: "Error", needErrs: []bool{false, false, false, false, false, true}, du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), - expectedError: true, + expectedError: "error to list datauploads: List error", }, } @@ -1127,22 +1112,20 @@ func TestAttemptDataUploadResume(t *testing.T) { r, err := initDataUploaderReconciler(test.needErrs...) 
r.nodeName = "node-1" require.NoError(t, err) - defer func() { - r.client.Delete(ctx, test.du, &kbclient.DeleteOptions{}) - if test.pod != nil { - r.client.Delete(ctx, test.pod, &kbclient.DeleteOptions{}) - } - }() assert.NoError(t, r.client.Create(ctx, test.du)) - if test.pod != nil { - assert.NoError(t, r.client.Create(ctx, test.pod)) + + dt := &duResumeTestHelper{ + resumeErr: test.resumeErr, } + + funcResumeCancellableDataBackup = dt.resumeCancellableDataPath + // Run the test err = r.AttemptDataUploadResume(ctx, r.client, r.logger.WithField("name", test.name), test.du.Namespace) - if test.expectedError { - assert.Error(t, err) + if test.expectedError != "" { + assert.EqualError(t, err, test.expectedError) } else { assert.NoError(t, err) @@ -1151,7 +1134,7 @@ func TestAttemptDataUploadResume(t *testing.T) { dataUpload := &velerov2alpha1api.DataUpload{} err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: duName}, dataUpload) require.NoError(t, err) - assert.Equal(t, velerov2alpha1api.DataUploadPhaseCanceled, dataUpload.Status.Phase) + assert.True(t, dataUpload.Spec.Cancel) } // Verify DataUploads marked as Accepted for _, duName := range test.acceptedDataUploads { @@ -1167,6 +1150,123 @@ func TestAttemptDataUploadResume(t *testing.T) { require.NoError(t, err) assert.Equal(t, velerov2alpha1api.DataUploadPhasePrepared, dataUpload.Status.Phase) } + // Verify DataUploads marked as InProgress + for _, duName := range test.inProgressDataUploads { + dataUpload := &velerov2alpha1api.DataUpload{} + err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: duName}, dataUpload) + require.NoError(t, err) + assert.Equal(t, velerov2alpha1api.DataUploadPhaseInProgress, dataUpload.Status.Phase) + } + } + }) + } +} + +func TestResumeCancellableBackup(t *testing.T) { + tests := []struct { + name string + dataUploads []velerov2alpha1api.DataUpload + du *velerov2alpha1api.DataUpload + getExposeErr error + 
exposeResult *exposer.ExposeResult + createWatcherErr error + initWatcherErr error + startWatcherErr error + mockInit bool + mockStart bool + mockClose bool + expectedError string + }{ + { + name: "not find exposer", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType("").Result(), + expectedError: fmt.Sprintf("error to find exposer for du %s", dataUploadName), + }, + { + name: "get expose failed", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(velerov2alpha1api.SnapshotTypeCSI).Result(), + getExposeErr: errors.New("fake-expose-error"), + expectedError: fmt.Sprintf("error to get exposed snapshot for du %s: fake-expose-error", dataUploadName), + }, + { + name: "no expose", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Node("node-1").Result(), + expectedError: fmt.Sprintf("expose info missed for du %s", dataUploadName), + }, + { + name: "watcher init error", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Node("node-1").Result(), + exposeResult: &exposer.ExposeResult{ + ByPod: exposer.ExposeByPod{ + HostingPod: &corev1.Pod{}, + }, + }, + mockInit: true, + mockClose: true, + initWatcherErr: errors.New("fake-init-watcher-error"), + expectedError: fmt.Sprintf("error to init asyncBR watcher for du %s: fake-init-watcher-error", dataUploadName), + }, + { + name: "start watcher error", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Node("node-1").Result(), + exposeResult: &exposer.ExposeResult{ + ByPod: exposer.ExposeByPod{ + HostingPod: &corev1.Pod{}, + }, + }, + mockInit: true, + mockStart: true, + mockClose: true, + startWatcherErr: errors.New("fake-start-watcher-error"), + expectedError: fmt.Sprintf("error to resume asyncBR watche for du %s: fake-start-watcher-error", dataUploadName), + }, + { + name: "succeed", + du: 
dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Node("node-1").Result(), + exposeResult: &exposer.ExposeResult{ + ByPod: exposer.ExposeByPod{ + HostingPod: &corev1.Pod{}, + }, + }, + mockInit: true, + mockStart: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctx := context.TODO() + r, err := initDataUploaderReconciler() + r.nodeName = "node-1" + require.NoError(t, err) + + mockAsyncBR := datapathmocks.NewAsyncBR(t) + + if test.mockInit { + mockAsyncBR.On("Init", mock.Anything, mock.Anything).Return(test.initWatcherErr) + } + + if test.mockStart { + mockAsyncBR.On("StartBackup", mock.Anything, mock.Anything, mock.Anything).Return(test.startWatcherErr) + } + + if test.mockClose { + mockAsyncBR.On("Close", mock.Anything).Return() + } + + dt := &duResumeTestHelper{ + getExposeErr: test.getExposeErr, + exposeResult: test.exposeResult, + asyncBR: mockAsyncBR, + } + + r.snapshotExposerList[velerov2alpha1api.SnapshotTypeCSI] = dt + + datapath.MicroServiceBRWatcherCreator = dt.newMicroServiceBRWatcher + + err = r.resumeCancellableDataPath(ctx, test.du, velerotest.NewLogger()) + if test.expectedError != "" { + assert.EqualError(t, err, test.expectedError) } }) }