diff --git a/changelogs/unreleased/7130-qiuming-best b/changelogs/unreleased/7130-qiuming-best new file mode 100644 index 000000000..f6f6c6f74 --- /dev/null +++ b/changelogs/unreleased/7130-qiuming-best @@ -0,0 +1 @@ +Node agent restart enhancement diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go index 835b899c3..53c45fb81 100644 --- a/pkg/cmd/cli/nodeagent/server.go +++ b/pkg/cmd/cli/nodeagent/server.go @@ -285,13 +285,13 @@ func (s *nodeAgentServer) run() { } dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) - s.markDataUploadsCancel(dataUploadReconciler) + s.attemptDataUploadResume(dataUploadReconciler) if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil { s.logger.WithError(err).Fatal("Unable to create the data upload controller") } dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) - s.markDataDownloadsCancel(dataDownloadReconciler) + s.attemptDataDownloadResume(dataDownloadReconciler) if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil { s.logger.WithError(err).Fatal("Unable to create the data download controller") } @@ -365,65 +365,28 @@ func (s *nodeAgentServer) markInProgressCRsFailed() { s.markInProgressPVRsFailed(client) } -func (s *nodeAgentServer) markDataUploadsCancel(r *controller.DataUploadReconciler) { +func (s *nodeAgentServer) attemptDataUploadResume(r *controller.DataUploadReconciler) { // the function is called before starting the controller manager, the embedded client isn't ready to use, so create a new one here client, err := ctrlclient.New(s.mgr.GetConfig(), ctrlclient.Options{Scheme: s.mgr.GetScheme()}) if 
err != nil { s.logger.WithError(errors.WithStack(err)).Error("failed to create client") return } - if dataUploads, err := r.FindDataUploads(s.ctx, client, s.namespace); err != nil { - s.logger.WithError(errors.WithStack(err)).Error("failed to find data uploads") - } else { - for i := range dataUploads { - du := dataUploads[i] - if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted || - du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared || - du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress { - err = controller.UpdateDataUploadWithRetry(s.ctx, client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, s.logger.WithField("dataupload", du.Name), - func(dataUpload *velerov2alpha1api.DataUpload) { - dataUpload.Spec.Cancel = true - dataUpload.Status.Message = fmt.Sprintf("found a dataupload with status %q during the node-agent starting, mark it as cancel", du.Status.Phase) - }) - - if err != nil { - s.logger.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", du.GetName()) - continue - } - s.logger.WithField("dataupload", du.GetName()).Warn(du.Status.Message) - } - } + if err := r.AttemptDataUploadResume(s.ctx, client, s.logger.WithField("node", s.nodeName), s.namespace); err != nil { + s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data upload resume") } } -func (s *nodeAgentServer) markDataDownloadsCancel(r *controller.DataDownloadReconciler) { +func (s *nodeAgentServer) attemptDataDownloadResume(r *controller.DataDownloadReconciler) { // the function is called before starting the controller manager, the embedded client isn't ready to use, so create a new one here client, err := ctrlclient.New(s.mgr.GetConfig(), ctrlclient.Options{Scheme: s.mgr.GetScheme()}) if err != nil { s.logger.WithError(errors.WithStack(err)).Error("failed to create client") return } - if dataDownloads, err := r.FindDataDownloads(s.ctx, client, s.namespace); err != nil { - 
s.logger.WithError(errors.WithStack(err)).Error("failed to find data downloads") - } else { - for i := range dataDownloads { - dd := dataDownloads[i] - if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted || - dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared || - dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress { - err = controller.UpdateDataDownloadWithRetry(s.ctx, client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, s.logger.WithField("datadownload", dd.Name), - func(dataDownload *velerov2alpha1api.DataDownload) { - dataDownload.Spec.Cancel = true - dataDownload.Status.Message = fmt.Sprintf("found a datadownload with status %q during the node-agent starting, mark it as cancel", dd.Status.Phase) - }) - if err != nil { - s.logger.WithError(errors.WithStack(err)).Errorf("failed to mark datadownload %q cancel", dd.GetName()) - continue - } - s.logger.WithField("datadownload", dd.GetName()).Warn(dd.Status.Message) - } - } + if err := r.AttemptDataDownloadResume(s.ctx, client, s.logger.WithField("node", s.nodeName), s.namespace); err != nil { + s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data download resume") } } diff --git a/pkg/controller/data_download_controller.go b/pkg/controller/data_download_controller.go index bf4299ea4..065c48a5f 100644 --- a/pkg/controller/data_download_controller.go +++ b/pkg/controller/data_download_controller.go @@ -140,7 +140,7 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request // to help clear up resources instead of clear them directly in case of some conflict with Expose action if err := UpdateDataDownloadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataDownload *velerov2alpha1api.DataDownload) { dataDownload.Spec.Cancel = true - dataDownload.Status.Message = fmt.Sprintf("found a dataupload %s/%s is being deleted, mark it as cancel", dd.Namespace, dd.Name) + dataDownload.Status.Message = fmt.Sprintf("found a 
datadownload %s/%s is being deleted, mark it as cancel", dd.Namespace, dd.Name) }); err != nil { log.Errorf("failed to set cancel flag with error %s for %s/%s", err.Error(), dd.Namespace, dd.Name) return ctrl.Result{}, err @@ -192,7 +192,6 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request return r.errorOut(ctx, dd, err, "error to expose snapshot", log) } } - log.Info("Restore is exposed") // we need to get CR again for it may canceled by datadownload controller on other @@ -205,7 +204,6 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request } return ctrl.Result{}, errors.Wrap(err, "getting datadownload") } - // we need to clean up resources as resources created in Expose it may later than cancel action or prepare time // and need to clean up resources again if isDataDownloadInFinalState(dd) { @@ -267,7 +265,6 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request return r.errorOut(ctx, dd, err, "error to create data path", log) } } - // Update status to InProgress original := dd.DeepCopy() dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress @@ -576,6 +573,51 @@ func (r *DataDownloadReconciler) FindDataDownloads(ctx context.Context, cli clie return dataDownloads, nil } +func (r *DataDownloadReconciler) findAcceptDataDownloadsByNodeLabel(ctx context.Context, cli client.Client, ns string) ([]velerov2alpha1api.DataDownload, error) { + dataDownloads := &velerov2alpha1api.DataDownloadList{} + if err := cli.List(ctx, dataDownloads, &client.ListOptions{Namespace: ns}); err != nil { + r.logger.WithError(errors.WithStack(err)).Error("failed to list datadownloads") + return nil, errors.Wrapf(err, "failed to list datadownloads") + } + + var result []velerov2alpha1api.DataDownload + for _, dd := range dataDownloads.Items { + if dd.Status.Phase != velerov2alpha1api.DataDownloadPhaseAccepted { + continue // only Accepted items are returned + } + if dd.Labels[acceptNodeLabelKey] == r.nodeName { + result = 
append(result, dd) + } + } + return result, nil +} + +// CancelAcceptedDataDownload sets Spec.Cancel on each Accepted DataDownload labeled for r.nodeName; update failures are logged, not returned +func (r *DataDownloadReconciler) CancelAcceptedDataDownload(ctx context.Context, cli client.Client, ns string) { + r.logger.Infof("Canceling accepted data for node %s", r.nodeName) + dataDownloads, err := r.findAcceptDataDownloadsByNodeLabel(ctx, cli, ns) + if err != nil { + r.logger.WithError(err).Error("failed to find data downloads") + return + } + + for _, dd := range dataDownloads { + if dd.Spec.Cancel { + continue + } + err = UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, + r.logger.WithField("datadownload", dd.Name), func(dataDownload *velerov2alpha1api.DataDownload) { + dataDownload.Spec.Cancel = true + dataDownload.Status.Message = fmt.Sprintf("found a datadownload with status %q during the node-agent starting, mark it as cancel", dd.Status.Phase) + }) + if err != nil { + r.logger.WithError(err).Errorf("failed to set cancel flag with error %s", err.Error()) + continue + } + r.logger.WithField("datadownload", dd.GetName()).Warnf("found a datadownload with status %q during the node-agent starting, mark it as cancel", dd.Status.Phase) + } +} + func (r *DataDownloadReconciler) prepareDataDownload(ssb *velerov2alpha1api.DataDownload) { ssb.Status.Phase = velerov2alpha1api.DataDownloadPhasePrepared ssb.Status.Node = r.nodeName @@ -749,3 +791,35 @@ func UpdateDataDownloadWithRetry(ctx context.Context, client client.Client, name return true, nil }) } + +func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context, cli client.Client, logger *logrus.Entry, ns string) error { + if dataDownloads, err := r.FindDataDownloads(ctx, cli, ns); err != nil { + return errors.Wrapf(err, "failed to find data downloads") + } else { + for i := range dataDownloads { + dd := dataDownloads[i] + if dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared { + // keep doing nothing let controller re-download the data + // the Prepared CR could be still handled by datadownload controller after node-agent restart + 
logger.WithField("datadownload", dd.GetName()).Debug("find a datadownload with status prepared") + } else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress { + err = UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, logger.WithField("datadownload", dd.Name), + func(dataDownload *velerov2alpha1api.DataDownload) { + dataDownload.Spec.Cancel = true + dataDownload.Status.Message = fmt.Sprintf("found a datadownload with status %q during the node-agent starting, mark it as cancel", dd.Status.Phase) + }) + + if err != nil { + logger.WithError(errors.WithStack(err)).Errorf("failed to mark datadownload %q into canceled", dd.GetName()) + continue + } + logger.WithField("datadownload", dd.GetName()).Debug("mark datadownload into canceled") + } + } + } + + // If the data download is in Accepted status, the exposed PVC may not have been created yet, + // so mark the data download as canceled because it may not be recoverable + r.CancelAcceptedDataDownload(ctx, cli, ns) + return nil +} diff --git a/pkg/controller/data_download_controller_test.go b/pkg/controller/data_download_controller_test.go index de9fa7516..afdadf61d 100644 --- a/pkg/controller/data_download_controller_test.go +++ b/pkg/controller/data_download_controller_test.go @@ -69,7 +69,7 @@ func dataDownloadBuilder() *builder.DataDownloadBuilder { } func initDataDownloadReconciler(objects []runtime.Object, needError ...bool) (*DataDownloadReconciler, error) { - var errs []error = make([]error, 5) + var errs []error = make([]error, 6) for k, isError := range needError { if k == 0 && isError { errs[0] = fmt.Errorf("Get error") @@ -81,6 +81,8 @@ func initDataDownloadReconciler(objects []runtime.Object, needError ...bool) (*D errs[3] = fmt.Errorf("Patch error") } else if k == 4 && isError { errs[4] = apierrors.NewConflict(velerov2alpha1api.Resource("datadownload"), dataDownloadName, errors.New("conflict")) + } else if k == 5 && isError { + errs[5] = 
fmt.Errorf("List error") } } return initDataDownloadReconcilerWithError(objects, errs...) @@ -116,6 +118,8 @@ func initDataDownloadReconcilerWithError(objects []runtime.Object, needError ... fakeClient.patchError = needError[3] } else if k == 4 { fakeClient.updateConflict = needError[4] + } else if k == 5 { + fakeClient.listError = needError[5] } } @@ -939,3 +943,111 @@ func TestFindDataDownloads(t *testing.T) { }) } } + +func TestAttemptDataDownloadResume(t *testing.T) { + tests := []struct { + name string + dataUploads []velerov2alpha1api.DataDownload + du *velerov2alpha1api.DataDownload + pod *corev1.Pod + needErrs []bool + acceptedDataDownloads []string + prepareddDataDownloads []string + cancelledDataDownloads []string + expectedError bool + }{ + // Test case 1: Process Accepted DataDownload + { + name: "AcceptedDataDownload", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Volumes(&corev1.Volume{Name: dataDownloadName}).NodeName("node-1").Labels(map[string]string{ + velerov1api.DataDownloadLabel: dataDownloadName, + }).Result(), + du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(), + acceptedDataDownloads: []string{dataDownloadName}, + expectedError: false, + }, + // Test case 2: Cancel an Accepted DataDownload + { + name: "CancelAcceptedDataDownload", + du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(), + }, + // Test case 3: Process Accepted Prepared DataDownload + { + name: "PreparedDataDownload", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Volumes(&corev1.Volume{Name: dataDownloadName}).NodeName("node-1").Labels(map[string]string{ + velerov1api.DataDownloadLabel: dataDownloadName, + }).Result(), + du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), + prepareddDataDownloads: []string{dataDownloadName}, + }, + // Test case 4: Process Accepted InProgress DataDownload + { + name: "InProgressDataDownload", + 
pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Volumes(&corev1.Volume{Name: dataDownloadName}).NodeName("node-1").Labels(map[string]string{ + velerov1api.DataDownloadLabel: dataDownloadName, + }).Result(), + du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), + prepareddDataDownloads: []string{dataDownloadName}, + }, + // Test case 5: get resume error + { + name: "ResumeError", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Volumes(&corev1.Volume{Name: dataDownloadName}).NodeName("node-1").Labels(map[string]string{ + velerov1api.DataDownloadLabel: dataDownloadName, + }).Result(), + needErrs: []bool{false, false, false, false, false, true}, + du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), + expectedError: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctx := context.TODO() + r, err := initDataDownloadReconciler(nil, test.needErrs...) 
+ require.NoError(t, err) // stop before r is dereferenced if setup failed + r.nodeName = "node-1" + defer func() { + r.client.Delete(ctx, test.du, &kbclient.DeleteOptions{}) + if test.pod != nil { + r.client.Delete(ctx, test.pod, &kbclient.DeleteOptions{}) + } + }() + + assert.NoError(t, r.client.Create(ctx, test.du)) + if test.pod != nil { + assert.NoError(t, r.client.Create(ctx, test.pod)) + } + // Run the test + err = r.AttemptDataDownloadResume(ctx, r.client, r.logger.WithField("name", test.name), test.du.Namespace) + + if test.expectedError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + + // Verify DataDownload marked as Cancelled + for _, duName := range test.cancelledDataDownloads { + dataUpload := &velerov2alpha1api.DataDownload{} + err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: duName}, dataUpload) + require.NoError(t, err) + assert.Equal(t, velerov2alpha1api.DataDownloadPhaseCanceled, dataUpload.Status.Phase) + } + // Verify DataDownload marked as Accepted + for _, duName := range test.acceptedDataDownloads { + dataUpload := &velerov2alpha1api.DataDownload{} + err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: duName}, dataUpload) + require.NoError(t, err) + assert.Equal(t, velerov2alpha1api.DataDownloadPhaseAccepted, dataUpload.Status.Phase) + } + // Verify DataDownload marked as Prepared + for _, duName := range test.prepareddDataDownloads { + dataUpload := &velerov2alpha1api.DataDownload{} + err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: duName}, dataUpload) + require.NoError(t, err) + assert.Equal(t, velerov2alpha1api.DataDownloadPhasePrepared, dataUpload.Status.Phase) + } + } + }) + } +} diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go index 9465528e3..524d8a057 100644 --- a/pkg/controller/data_upload_controller.go +++ b/pkg/controller/data_upload_controller.go @@ -274,7 +274,6 @@ func (r *DataUploadReconciler) 
Reconcile(ctx context.Context, req ctrl.Request) return r.errorOut(ctx, du, err, "error to create data path", log) } } - // Update status to InProgress original := du.DeepCopy() du.Status.Phase = velerov2alpha1api.DataUploadPhaseInProgress @@ -581,7 +580,7 @@ func (r *DataUploadReconciler) findDataUploadForPod(podObj client.Object) []reco return []reconcile.Request{requests} } -func (r *DataUploadReconciler) FindDataUploads(ctx context.Context, cli client.Client, ns string) ([]velerov2alpha1api.DataUpload, error) { +func (r *DataUploadReconciler) FindDataUploadsByPod(ctx context.Context, cli client.Client, ns string) ([]velerov2alpha1api.DataUpload, error) { pods := &corev1.PodList{} var dataUploads []velerov2alpha1api.DataUpload if err := cli.List(ctx, pods, &client.ListOptions{Namespace: ns}); err != nil { @@ -605,6 +604,51 @@ func (r *DataUploadReconciler) FindDataUploads(ctx context.Context, cli client.C return dataUploads, nil } +func (r *DataUploadReconciler) findAcceptDataUploadsByNodeLabel(ctx context.Context, cli client.Client, ns string) ([]velerov2alpha1api.DataUpload, error) { + dataUploads := &velerov2alpha1api.DataUploadList{} + if err := cli.List(ctx, dataUploads, &client.ListOptions{Namespace: ns}); err != nil { + r.logger.WithError(errors.WithStack(err)).Error("failed to list datauploads") + return nil, errors.Wrapf(err, "failed to list datauploads") + } + + var result []velerov2alpha1api.DataUpload + for _, du := range dataUploads.Items { + if du.Status.Phase != velerov2alpha1api.DataUploadPhaseAccepted { + continue + } + if du.Labels[acceptNodeLabelKey] == r.nodeName { + result = append(result, du) + } + } + return result, nil +} + +func (r *DataUploadReconciler) CancelAcceptedDataupload(ctx context.Context, cli client.Client, ns string) { + r.logger.Infof("Reset accepted dataupload for node %s", r.nodeName) + dataUploads, err := r.findAcceptDataUploadsByNodeLabel(ctx, cli, ns) + if err != nil { + r.logger.WithError(err).Error("failed to find 
dataupload") + return + } + + for _, du := range dataUploads { + if du.Spec.Cancel { + continue // cancel flag is already set + } + err = UpdateDataUploadWithRetry(ctx, cli, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, r.logger.WithField("dataupload", du.Name), + func(dataUpload *velerov2alpha1api.DataUpload) { + dataUpload.Spec.Cancel = true + dataUpload.Status.Message = fmt.Sprintf("found a dataupload with status %q during the node-agent starting, mark it as cancel", du.Status.Phase) + }) + + if err != nil { + r.logger.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", du.GetName()) + continue + } + r.logger.WithField("dataupload", du.GetName()).Warnf("found a dataupload with status %q during the node-agent starting, mark it as cancel", du.Status.Phase) + } +} + func (r *DataUploadReconciler) prepareDataUpload(du *velerov2alpha1api.DataUpload) { du.Status.Phase = velerov2alpha1api.DataUploadPhasePrepared du.Status.Node = r.nodeName @@ -833,3 +877,34 @@ func UpdateDataUploadWithRetry(ctx context.Context, client client.Client, namesp return true, nil }) } + +func (r *DataUploadReconciler) AttemptDataUploadResume(ctx context.Context, cli client.Client, logger *logrus.Entry, ns string) error { + if dataUploads, err := r.FindDataUploadsByPod(ctx, cli, ns); err != nil { + return errors.Wrap(err, "failed to find data uploads") + } else { + for _, du := range dataUploads { + if du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared { + // do nothing here, let the controller handle the data upload again + // the Prepared CR could be still handled by dataupload controller after node-agent restart + logger.WithField("dataupload", du.GetName()).Debug("find a dataupload with status prepared") + } else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress { + err = UpdateDataUploadWithRetry(ctx, cli, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, logger.WithField("dataupload", du.Name), + func(dataUpload *velerov2alpha1api.DataUpload) { + dataUpload.Spec.Cancel = true + dataUpload.Status.Message = fmt.Sprintf("found a 
dataupload with status %q during the node-agent starting, mark it as cancel", du.Status.Phase) + }) + + if err != nil { + logger.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q into canceled", du.GetName()) + continue + } + logger.WithField("dataupload", du.GetName()).Debug("mark dataupload into canceled") + } + } + } + + //If the data upload is in Accepted status, the volume snapshot may be deleted and the exposed pod may not be created + // so we need to mark the data upload as canceled for it may not be recoverable + r.CancelAcceptedDataupload(ctx, cli, ns) + return nil +} diff --git a/pkg/controller/data_upload_controller_test.go b/pkg/controller/data_upload_controller_test.go index b61cd07b3..05ee97430 100644 --- a/pkg/controller/data_upload_controller_test.go +++ b/pkg/controller/data_upload_controller_test.go @@ -68,6 +68,7 @@ type FakeClient struct { updateError error patchError error updateConflict error + listError error } func (c *FakeClient) Get(ctx context.Context, key kbclient.ObjectKey, obj kbclient.Object) error { @@ -106,8 +107,16 @@ func (c *FakeClient) Patch(ctx context.Context, obj kbclient.Object, patch kbcli return c.Client.Patch(ctx, obj, patch, opts...) } +func (c *FakeClient) List(ctx context.Context, list kbclient.ObjectList, opts ...kbclient.ListOption) error { + if c.listError != nil { + return c.listError + } + + return c.Client.List(ctx, list, opts...) 
+} + func initDataUploaderReconciler(needError ...bool) (*DataUploadReconciler, error) { - var errs []error = make([]error, 5) + var errs []error = make([]error, 6) for k, isError := range needError { if k == 0 && isError { errs[0] = fmt.Errorf("Get error") @@ -118,7 +127,9 @@ func initDataUploaderReconciler(needError ...bool) (*DataUploadReconciler, error } else if k == 3 && isError { errs[3] = fmt.Errorf("Patch error") } else if k == 4 && isError { - errs[4] = apierrors.NewConflict(velerov2alpha1api.Resource("datadownload"), dataDownloadName, errors.New("conflict")) + errs[4] = apierrors.NewConflict(velerov2alpha1api.Resource("dataupload"), dataUploadName, errors.New("conflict")) + } else if k == 5 && isError { + errs[5] = fmt.Errorf("List error") } } @@ -198,6 +209,8 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci fakeClient.patchError = needError[3] } else if k == 4 { fakeClient.updateConflict = needError[4] + } else if k == 5 { + fakeClient.listError = needError[5] } } @@ -983,7 +996,7 @@ func TestFindDataUploads(t *testing.T) { require.NoError(t, err) err = r.client.Create(ctx, &test.pod) require.NoError(t, err) - uploads, err := r.FindDataUploads(context.Background(), r.client, "velero") + uploads, err := r.FindDataUploadsByPod(context.Background(), r.client, "velero") if test.expectedError { assert.Error(t, err) @@ -994,3 +1007,110 @@ func TestFindDataUploads(t *testing.T) { }) } } +func TestAttemptDataUploadResume(t *testing.T) { + tests := []struct { + name string + dataUploads []velerov2alpha1api.DataUpload + du *velerov2alpha1api.DataUpload + pod *corev1.Pod + needErrs []bool + acceptedDataUploads []string + prepareddDataUploads []string + cancelledDataUploads []string + expectedError bool + }{ + // Test case 1: Process Accepted DataUpload + { + name: "AcceptedDataUpload", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: 
"dataupload-1"}).NodeName("node-1").Labels(map[string]string{ + velerov1api.DataUploadLabel: dataUploadName, + }).Result(), + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(), + acceptedDataUploads: []string{dataUploadName}, + expectedError: false, + }, + // Test case 2: Cancel an Accepted DataUpload + { + name: "CancelAcceptedDataUpload", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(), + }, + // Test case 3: Process Accepted Prepared DataUpload + { + name: "PreparedDataUpload", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).NodeName("node-1").Labels(map[string]string{ + velerov1api.DataUploadLabel: dataUploadName, + }).Result(), + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), + prepareddDataUploads: []string{dataUploadName}, + }, + // Test case 4: Process Accepted InProgress DataUpload + { + name: "InProgressDataUpload", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).NodeName("node-1").Labels(map[string]string{ + velerov1api.DataUploadLabel: dataUploadName, + }).Result(), + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), + prepareddDataUploads: []string{dataUploadName}, + }, + // Test case 5: get resume error + { + name: "ResumeError", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).NodeName("node-1").Labels(map[string]string{ + velerov1api.DataUploadLabel: dataUploadName, + }).Result(), + needErrs: []bool{false, false, false, false, false, true}, + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), + expectedError: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctx := context.TODO() + r, err := initDataUploaderReconciler(test.needErrs...) 
+ require.NoError(t, err) // stop before r is dereferenced if setup failed + r.nodeName = "node-1" + defer func() { + r.client.Delete(ctx, test.du, &kbclient.DeleteOptions{}) + if test.pod != nil { + r.client.Delete(ctx, test.pod, &kbclient.DeleteOptions{}) + } + }() + + assert.NoError(t, r.client.Create(ctx, test.du)) + if test.pod != nil { + assert.NoError(t, r.client.Create(ctx, test.pod)) + } + // Run the test + err = r.AttemptDataUploadResume(ctx, r.client, r.logger.WithField("name", test.name), test.du.Namespace) + + if test.expectedError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + + // Verify DataUploads marked as Cancelled + for _, duName := range test.cancelledDataUploads { + dataUpload := &velerov2alpha1api.DataUpload{} + err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: duName}, dataUpload) + require.NoError(t, err) + assert.Equal(t, velerov2alpha1api.DataUploadPhaseCanceled, dataUpload.Status.Phase) + } + // Verify DataUploads marked as Accepted + for _, duName := range test.acceptedDataUploads { + dataUpload := &velerov2alpha1api.DataUpload{} + err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: duName}, dataUpload) + require.NoError(t, err) + assert.Equal(t, velerov2alpha1api.DataUploadPhaseAccepted, dataUpload.Status.Phase) + } + // Verify DataUploads marked as Prepared + for _, duName := range test.prepareddDataUploads { + dataUpload := &velerov2alpha1api.DataUpload{} + err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: duName}, dataUpload) + require.NoError(t, err) + assert.Equal(t, velerov2alpha1api.DataUploadPhasePrepared, dataUpload.Status.Phase) + } + } + }) + } +}