diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go
index 158327e06..2c3b8c75e 100644
--- a/pkg/cmd/cli/nodeagent/server.go
+++ b/pkg/cmd/cli/nodeagent/server.go
@@ -276,8 +276,6 @@ func (s *nodeAgentServer) run() {
 	s.metrics.RegisterAllMetrics()
 	s.metrics.InitMetricsForNode(s.nodeName)
 
-	s.markInProgressCRsFailed()
-
 	s.logger.Info("Starting controllers")
 
 	var loadAffinity *kube.LoadAffinity
@@ -307,7 +305,8 @@ func (s *nodeAgentServer) run() {
 		s.logger.Fatal(err, "unable to create controller", "controller", constant.ControllerPodVolumeBackup)
 	}
 
-	if err := controller.NewPodVolumeRestoreReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, s.nodeName, s.config.dataMoverPrepareTimeout, s.config.resourceTimeout, podResources, s.logger).SetupWithManager(s.mgr); err != nil {
+	pvrReconciler := controller.NewPodVolumeRestoreReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, s.nodeName, s.config.dataMoverPrepareTimeout, s.config.resourceTimeout, podResources, s.logger)
+	if err := pvrReconciler.SetupWithManager(s.mgr); err != nil {
 		s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller")
 	}
 
@@ -358,6 +357,16 @@ func (s *nodeAgentServer) run() {
 		if err := dataDownloadReconciler.AttemptDataDownloadResume(s.ctx, s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
 			s.logger.WithError(errors.WithStack(err)).Error("Failed to attempt data download resume")
 		}
+
+		if err := pvbReconciler.AttemptPVBResume(s.ctx, s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
+			s.logger.WithError(errors.WithStack(err)).Error("Failed to attempt PVB resume")
+		}
+
+		if err := pvrReconciler.AttemptPVRResume(s.ctx, s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
+			s.logger.WithError(errors.WithStack(err)).Error("Failed to attempt PVR resume")
+		}
+
+		s.markLegacyPVRsFailed(s.mgr.GetClient())
 	}()
 
 	s.logger.Info("Controllers starting...")
@@ -441,54 +450,18 @@ func (s *nodeAgentServer) validatePodVolumesHostPath(client kubernetes.Interface
 	return nil
 }
 
-// if there is a restarting during the reconciling of pvbs/pvrs/etc, these CRs may be stuck in progress status
-// markInProgressCRsFailed tries to mark the in progress CRs as failed when starting the server to avoid the issue
-func (s *nodeAgentServer) markInProgressCRsFailed() {
-	// the function is called before starting the controller manager, the embedded client isn't ready to use, so create a new one here
-	client, err := ctrlclient.New(s.mgr.GetConfig(), ctrlclient.Options{Scheme: s.mgr.GetScheme()})
-	if err != nil {
-		s.logger.WithError(errors.WithStack(err)).Error("failed to create client")
-		return
-	}
-
-	s.markInProgressPVBsFailed(client)
-
-	s.markInProgressPVRsFailed(client)
-}
-
-func (s *nodeAgentServer) markInProgressPVBsFailed(client ctrlclient.Client) {
-	pvbs := &velerov1api.PodVolumeBackupList{}
-	if err := client.List(s.ctx, pvbs, &ctrlclient.ListOptions{Namespace: s.namespace}); err != nil {
-		s.logger.WithError(errors.WithStack(err)).Error("failed to list podvolumebackups")
-		return
-	}
-	for i, pvb := range pvbs.Items {
-		if pvb.Status.Phase != velerov1api.PodVolumeBackupPhaseInProgress {
-			s.logger.Debugf("the status of podvolumebackup %q is %q, skip", pvb.GetName(), pvb.Status.Phase)
-			continue
-		}
-		if pvb.Spec.Node != s.nodeName {
-			s.logger.Debugf("the node of podvolumebackup %q is %q, not %q, skip", pvb.GetName(), pvb.Spec.Node, s.nodeName)
-			continue
-		}
-
-		if err := controller.UpdatePVBStatusToFailed(s.ctx, client, &pvbs.Items[i],
-			fmt.Errorf("found a podvolumebackup with status %q during the server starting, mark it as %q", velerov1api.PodVolumeBackupPhaseInProgress, velerov1api.PodVolumeBackupPhaseFailed),
-			"", time.Now(), s.logger); err != nil {
-			s.logger.WithError(errors.WithStack(err)).Errorf("failed to patch podvolumebackup %q", pvb.GetName())
-			continue
-		}
-		s.logger.WithField("podvolumebackup", pvb.GetName()).Warn(pvb.Status.Message)
-	}
-}
-
-func (s *nodeAgentServer) markInProgressPVRsFailed(client ctrlclient.Client) {
+func (s *nodeAgentServer) markLegacyPVRsFailed(client ctrlclient.Client) {
 	pvrs := &velerov1api.PodVolumeRestoreList{}
 	if err := client.List(s.ctx, pvrs, &ctrlclient.ListOptions{Namespace: s.namespace}); err != nil {
 		s.logger.WithError(errors.WithStack(err)).Error("failed to list podvolumerestores")
 		return
 	}
+
 	for i, pvr := range pvrs.Items {
+		if !controller.IsLegacyPVR(&pvr) {
+			continue
+		}
+
 		if pvr.Status.Phase != velerov1api.PodVolumeRestorePhaseInProgress {
 			s.logger.Debugf("the status of podvolumerestore %q is %q, skip", pvr.GetName(), pvr.Status.Phase)
 			continue
@@ -509,7 +482,7 @@ func (s *nodeAgentServer) markInProgressPVRsFailed(client ctrlclient.Client) {
 		}
 		if err := controller.UpdatePVRStatusToFailed(s.ctx, client, &pvrs.Items[i],
 			errors.New("cannot survive from node-agent restart"),
-			fmt.Sprintf("get a podvolumerestore with status %q during the server starting, mark it as %q", velerov1api.PodVolumeRestorePhaseInProgress, velerov1api.PodVolumeRestorePhaseFailed),
+			fmt.Sprintf("get a legacy podvolumerestore with status %q during the server starting, mark it as %q", velerov1api.PodVolumeRestorePhaseInProgress, velerov1api.PodVolumeRestorePhaseFailed),
 			time.Now(), s.logger); err != nil {
 			s.logger.WithError(errors.WithStack(err)).Errorf("failed to patch podvolumerestore %q", pvr.GetName())
 			continue
diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go
index 50b91d71b..99dbf6c19 100644
--- a/pkg/cmd/server/server.go
+++ b/pkg/cmd/server/server.go
@@ -961,6 +961,7 @@ func markInProgressBackupsFailed(ctx context.Context, client ctrlclient.Client,
 		}
 		log.WithField("backup", backup.GetName()).Warn(updated.Status.FailureReason)
 		markDataUploadsCancel(ctx, client, backup, log)
+		markPodVolumeBackupsCancel(ctx, client, backup, log)
 	}
 }
 
@@ -985,6 +986,7 @@ func markInProgressRestoresFailed(ctx context.Context, client ctrlclient.Client,
 		}
 		log.WithField("restore", restore.GetName()).Warn(updated.Status.FailureReason)
 		markDataDownloadsCancel(ctx, client, restore, log)
+		markPodVolumeRestoresCancel(ctx, client, restore, log)
 	}
 }
 
@@ -1069,3 +1071,91 @@ func markDataDownloadsCancel(ctx context.Context, client ctrlclient.Client, rest
 		}
 	}
 }
+
+func markPodVolumeBackupsCancel(ctx context.Context, client ctrlclient.Client, backup velerov1api.Backup, log logrus.FieldLogger) {
+	pvbs := &velerov1api.PodVolumeBackupList{}
+
+	if err := client.List(ctx, pvbs, &ctrlclient.ListOptions{
+		Namespace: backup.GetNamespace(),
+		LabelSelector: labels.Set(map[string]string{
+			velerov1api.BackupUIDLabel: string(backup.GetUID()),
+		}).AsSelector(),
+	}); err != nil {
+		log.WithError(errors.WithStack(err)).Error("failed to list pvbs")
+		return
+	}
+
+	for i := range pvbs.Items {
+		pvb := pvbs.Items[i]
+		if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseAccepted ||
+			pvb.Status.Phase == velerov1api.PodVolumeBackupPhasePrepared ||
+			pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseInProgress ||
+			pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseNew ||
+			pvb.Status.Phase == "" {
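+			// The PVB is still in a non-terminal phase, so only request cancellation
+			// here; the node-agent owns the final transition to a terminal status.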
+			err := controller.UpdatePVBWithRetry(ctx, client, types.NamespacedName{Namespace: pvb.Namespace, Name: pvb.Name}, log.WithField("pvb", pvb.Name),
+				func(pvb *velerov1api.PodVolumeBackup) bool {
+					if pvb.Spec.Cancel {
+						return false
+					}
+
+					pvb.Spec.Cancel = true
+					pvb.Status.Message = fmt.Sprintf("pvb is in status %q during the velero server starting, mark it as cancel", pvb.Status.Phase)
+
+					return true
+				})
+
+			if err != nil {
+				log.WithError(errors.WithStack(err)).Errorf("failed to mark pvb %q cancel", pvb.GetName())
+				continue
+			}
+			log.WithField("pvb", pvb.GetName()).Warn(pvb.Status.Message)
+		}
+	}
+}
+
+func markPodVolumeRestoresCancel(ctx context.Context, client ctrlclient.Client, restore velerov1api.Restore, log logrus.FieldLogger) {
+	pvrs := &velerov1api.PodVolumeRestoreList{}
+
+	if err := client.List(ctx, pvrs, &ctrlclient.ListOptions{
+		Namespace: restore.GetNamespace(),
+		LabelSelector: labels.Set(map[string]string{
+			velerov1api.RestoreUIDLabel: string(restore.GetUID()),
+		}).AsSelector(),
+	}); err != nil {
+		log.WithError(errors.WithStack(err)).Error("failed to list pvrs")
+		return
+	}
+
+	for i := range pvrs.Items {
+		pvr := pvrs.Items[i]
+		if pvr.Spec.UploaderType == uploader.ResticType {
+			log.WithField("pvr", pvr.GetName()).Warn("Found a legacy pvr during velero server restart, cannot stop it")
+		} else {
+			if pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseAccepted ||
+				pvr.Status.Phase == velerov1api.PodVolumeRestorePhasePrepared ||
+				pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseInProgress ||
+				pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseNew ||
+				pvr.Status.Phase == "" {
+				err := controller.UpdatePVRWithRetry(ctx, client, types.NamespacedName{Namespace: pvr.Namespace, Name: pvr.Name}, log.WithField("pvr", pvr.Name),
+					func(pvr *velerov1api.PodVolumeRestore) bool {
+						if pvr.Spec.Cancel {
+							return false
+						}
+
+						pvr.Spec.Cancel = true
+						pvr.Status.Message = fmt.Sprintf("pvr is in status %q during the velero server starting, mark it as cancel", pvr.Status.Phase)
+
+						return true
+					})
+
+				if err != nil {
+					log.WithError(errors.WithStack(err)).Errorf("failed to mark pvr %q cancel", pvr.GetName())
+					continue
+				}
+				log.WithField("pvr", pvr.GetName()).Warn(pvr.Status.Message)
+			}
+		}
+	}
+}
diff --git a/pkg/controller/pod_volume_backup_controller.go b/pkg/controller/pod_volume_backup_controller.go
index 6fde05784..c024cdc28 100644
--- a/pkg/controller/pod_volume_backup_controller.go
+++ b/pkg/controller/pod_volume_backup_controller.go
@@ -844,3 +844,99 @@ func UpdatePVBWithRetry(ctx context.Context, client client.Client, namespacedNam
 		return true, nil
 	})
 }
+
+var funcResumeCancellablePVB = (*PodVolumeBackupReconciler).resumeCancellableDataPath
+
+func (r *PodVolumeBackupReconciler) AttemptPVBResume(ctx context.Context, logger *logrus.Entry, ns string) error {
+	pvbs := &velerov1api.PodVolumeBackupList{}
+	if err := r.client.List(ctx, pvbs, &client.ListOptions{Namespace: ns}); err != nil {
+		r.logger.WithError(errors.WithStack(err)).Error("failed to list PVBs")
+		return errors.Wrapf(err, "error to list PVBs")
+	}
+
+	for i := range pvbs.Items {
+		pvb := &pvbs.Items[i]
+		if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseInProgress {
+			if pvb.Spec.Node != r.nodeName {
+				logger.WithField("PVB", pvb.Name).WithField("current node", r.nodeName).Infof("PVB should be resumed by another node %s", pvb.Spec.Node)
+				continue
+			}
+
+			err := funcResumeCancellablePVB(r, ctx, pvb, logger)
+			if err == nil {
+				logger.WithField("PVB", pvb.Name).WithField("current node", r.nodeName).Info("Completed to resume in progress PVB")
+				continue
+			}
+
+			logger.WithField("PVB", pvb.GetName()).WithError(err).Warn("Failed to resume data path for PVB, have to cancel it")
+
+			resumeErr := err
+			err = UpdatePVBWithRetry(ctx, r.client, types.NamespacedName{Namespace: pvb.Namespace, Name: pvb.Name}, logger.WithField("PVB", pvb.Name),
+				func(pvb *velerov1api.PodVolumeBackup) bool {
+					if pvb.Spec.Cancel {
+						return false
+					}
+
+					pvb.Spec.Cancel = true
+					pvb.Status.Message = fmt.Sprintf("Resume InProgress PVB failed with error %v, mark it as cancel", resumeErr)
+
+					return true
+				})
+			if err != nil {
+				logger.WithField("PVB", pvb.GetName()).WithError(errors.WithStack(err)).Error("Failed to trigger PVB cancel")
+			}
+		} else {
+			logger.WithField("PVB", pvb.GetName()).Infof("find a PVB with status %s", pvb.Status.Phase)
+		}
+	}
+
+	return nil
+}
+
+func (r *PodVolumeBackupReconciler) resumeCancellableDataPath(ctx context.Context, pvb *velerov1api.PodVolumeBackup, log logrus.FieldLogger) error {
+	log.Info("Resume cancelable PVB")
+
+	res, err := r.exposer.GetExposed(ctx, getPVBOwnerObject(pvb), r.client, r.nodeName, r.resourceTimeout)
+	if err != nil {
+		return errors.Wrapf(err, "error to get exposed PVB %s", pvb.Name)
+	}
+
+	if res == nil {
+		return errors.Errorf("expose info missed for PVB %s", pvb.Name)
+	}
+
+	callbacks := datapath.Callbacks{
+		OnCompleted: r.OnDataPathCompleted,
+		OnFailed:    r.OnDataPathFailed,
+		OnCancelled: r.OnDataPathCancelled,
+		OnProgress:  r.OnDataPathProgress,
+	}
+
+	asyncBR, err := r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, r.kubeClient, r.mgr, datapath.TaskTypeBackup, pvb.Name, pvb.Namespace, res.ByPod.HostingPod.Name, res.ByPod.HostingContainer, pvb.Name, callbacks, true, log)
+	if err != nil {
+		return errors.Wrapf(err, "error to create asyncBR watcher for PVB %s", pvb.Name)
+	}
+
+	resumeComplete := false
+	defer func() {
+		if !resumeComplete {
+			r.closeDataPath(ctx, pvb.Name)
+		}
+	}()
+
+	if err := asyncBR.Init(ctx, nil); err != nil {
+		return errors.Wrapf(err, "error to init asyncBR watcher for PVB %s", pvb.Name)
+	}
+
+	if err := asyncBR.StartBackup(datapath.AccessPoint{
+		ByPath: res.ByPod.VolumeName,
+	}, pvb.Spec.UploaderSettings, nil); err != nil {
+		return errors.Wrapf(err, "error to resume asyncBR watcher for PVB %s", pvb.Name)
+	}
+
+	resumeComplete = true
+
+	log.Infof("asyncBR is resumed for PVB %s", pvb.Name)
+
+	return nil
+}
diff --git a/pkg/controller/pod_volume_backup_controller_test.go b/pkg/controller/pod_volume_backup_controller_test.go
index 38249bfa0..5931ae2f1 100644
--- a/pkg/controller/pod_volume_backup_controller_test.go
+++ b/pkg/controller/pod_volume_backup_controller_test.go
@@ -25,6 +25,7 @@ import (
 	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 	appsv1api "k8s.io/api/apps/v1"
 	corev1api "k8s.io/api/core/v1"
@@ -37,11 +38,12 @@ import (
 	"k8s.io/utils/clock"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
+	kbclient "sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/client/fake"
 	"sigs.k8s.io/controller-runtime/pkg/manager"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 
 	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
 	"github.com/vmware-tanzu/velero/pkg/builder"
 	"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/exposer" @@ -49,6 +52,8 @@ import ( velerotest "github.com/vmware-tanzu/velero/pkg/test" "github.com/vmware-tanzu/velero/pkg/uploader" "github.com/vmware-tanzu/velero/pkg/util/kube" + + datapathmocks "github.com/vmware-tanzu/velero/pkg/datapath/mocks" ) const pvbName = "pvb-1" @@ -918,3 +923,246 @@ func TestUpdatePvbWithRetry(t *testing.T) { }) } } + +type pvbResumeTestHelper struct { + resumeErr error + getExposeErr error + exposeResult *exposer.ExposeResult + asyncBR datapath.AsyncBR +} + +func (dt *pvbResumeTestHelper) resumeCancellableDataPath(_ *DataUploadReconciler, _ context.Context, _ *velerov2alpha1api.DataUpload, _ logrus.FieldLogger) error { + return dt.resumeErr +} + +func (dt *pvbResumeTestHelper) Expose(context.Context, corev1api.ObjectReference, exposer.PodVolumeExposeParam) error { + return nil +} + +func (dt *pvbResumeTestHelper) GetExposed(context.Context, corev1api.ObjectReference, kbclient.Client, string, time.Duration) (*exposer.ExposeResult, error) { + return dt.exposeResult, dt.getExposeErr +} + +func (dt *pvbResumeTestHelper) PeekExposed(context.Context, corev1api.ObjectReference) error { + return nil +} + +func (dt *pvbResumeTestHelper) DiagnoseExpose(context.Context, corev1api.ObjectReference) string { + return "" +} + +func (dt *pvbResumeTestHelper) CleanUp(context.Context, corev1api.ObjectReference) {} + +func (dt *pvbResumeTestHelper) newMicroServiceBRWatcher(kbclient.Client, kubernetes.Interface, manager.Manager, string, string, string, string, string, string, + datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR { + return dt.asyncBR +} + +func TestAttemptPVBResume(t *testing.T) { + tests := []struct { + name string + pvbs []*velerov1api.PodVolumeBackup + pvb *velerov1api.PodVolumeBackup + needErrs []bool + acceptedPvbs []string + preparedPvbs []string + cancelledPvbs []string + inProgressPvbs []string + resumeErr error + expectedError string + }{ + { + name: "Other pvb", + pvb: pvbBuilder().Phase(velerov1api.PodVolumeBackupPhasePrepared).Result(), + }, + { + name: "InProgress pvb, not the current node", + pvb: pvbBuilder().Phase(velerov1api.PodVolumeBackupPhaseInProgress).Result(), + inProgressPvbs: []string{pvbName}, + }, + { + name: "InProgress pvb, resume error and update error", + pvb: pvbBuilder().Phase(velerov1api.PodVolumeBackupPhaseInProgress).Node("node-1").Result(), + needErrs: []bool{false, false, true, false, false, false}, + resumeErr: errors.New("fake-resume-error"), + inProgressPvbs: []string{pvbName}, + }, + { + name: "InProgress pvb, resume error and update succeed", + pvb: pvbBuilder().Phase(velerov1api.PodVolumeBackupPhaseInProgress).Node("node-1").Result(), + resumeErr: errors.New("fake-resume-error"), + cancelledPvbs: []string{pvbName}, + inProgressPvbs: []string{pvbName}, + }, + { + name: "InProgress pvb and resume succeed", + pvb: pvbBuilder().Phase(velerov1api.PodVolumeBackupPhaseInProgress).Node("node-1").Result(), + inProgressPvbs: []string{pvbName}, + }, + { + name: "Error", + needErrs: []bool{false, false, false, false, false, true}, + pvb: pvbBuilder().Phase(velerov1api.PodVolumeBackupPhasePrepared).Result(), + expectedError: "error to list PVBs: List error", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctx := context.TODO() + r, err := initPVBReconciler(test.needErrs...) 
+ r.nodeName = "node-1" + require.NoError(t, err) + + assert.NoError(t, r.client.Create(ctx, test.pvb)) + + dt := &pvbResumeTestHelper{ + resumeErr: test.resumeErr, + } + + funcResumeCancellableDataBackup = dt.resumeCancellableDataPath + + // Run the test + err = r.AttemptPVBResume(ctx, r.logger.WithField("name", test.name), test.pvb.Namespace) + + if test.expectedError != "" { + assert.EqualError(t, err, test.expectedError) + } else { + assert.NoError(t, err) + + for _, pvbName := range test.cancelledPvbs { + pvb := &velerov1api.PodVolumeBackup{} + err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: pvbName}, pvb) + require.NoError(t, err) + assert.True(t, pvb.Spec.Cancel) + } + + for _, pvbName := range test.acceptedPvbs { + pvb := &velerov1api.PodVolumeBackup{} + err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: pvbName}, pvb) + require.NoError(t, err) + assert.Equal(t, velerov1api.PodVolumeBackupPhaseAccepted, pvb.Status.Phase) + } + + for _, pvbName := range test.preparedPvbs { + pvb := &velerov1api.PodVolumeBackup{} + err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: pvbName}, pvb) + require.NoError(t, err) + assert.Equal(t, velerov1api.PodVolumeBackupPhasePrepared, pvb.Status.Phase) + } + + for _, pvbName := range test.inProgressPvbs { + pvb := &velerov1api.PodVolumeBackup{} + err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: pvbName}, pvb) + require.NoError(t, err) + assert.Equal(t, velerov1api.PodVolumeBackupPhaseInProgress, pvb.Status.Phase) + } + } + }) + } +} + +func TestResumeCancellablePodVolumeBackup(t *testing.T) { + tests := []struct { + name string + pvbs []velerov1api.PodVolumeBackup + pvb *velerov1api.PodVolumeBackup + getExposeErr error + exposeResult *exposer.ExposeResult + createWatcherErr error + initWatcherErr error + startWatcherErr error + mockInit bool + mockStart bool + mockClose bool + expectedError string + }{ + { + name: "get expose failed", + pvb: pvbBuilder().Phase(velerov1api.PodVolumeBackupPhaseInProgress).Result(), + getExposeErr: errors.New("fake-expose-error"), + expectedError: fmt.Sprintf("error to get exposed PVB %s: fake-expose-error", pvbName), + }, + { + name: "no expose", + pvb: pvbBuilder().Phase(velerov1api.PodVolumeBackupPhaseAccepted).Node("node-1").Result(), + expectedError: fmt.Sprintf("expose info missed for PVB %s", pvbName), + }, + { + name: "watcher init error", + pvb: pvbBuilder().Phase(velerov1api.PodVolumeBackupPhaseAccepted).Node("node-1").Result(), + exposeResult: &exposer.ExposeResult{ + ByPod: exposer.ExposeByPod{ + HostingPod: &corev1api.Pod{}, + }, + }, + mockInit: true, + mockClose: true, + initWatcherErr: errors.New("fake-init-watcher-error"), + expectedError: fmt.Sprintf("error to init asyncBR watcher for PVB %s: fake-init-watcher-error", pvbName), + }, + { + name: "start watcher error", + pvb: pvbBuilder().Phase(velerov1api.PodVolumeBackupPhaseAccepted).Node("node-1").Result(), + exposeResult: &exposer.ExposeResult{ + ByPod: exposer.ExposeByPod{ + HostingPod: &corev1api.Pod{}, + }, + }, + mockInit: true, + mockStart: true, + mockClose: true, + startWatcherErr: errors.New("fake-start-watcher-error"), + expectedError: fmt.Sprintf("error to resume asyncBR watcher for PVB %s: fake-start-watcher-error", pvbName), + }, + { + name: "succeed", + pvb: pvbBuilder().Phase(velerov1api.PodVolumeBackupPhaseAccepted).Node("node-1").Result(), + exposeResult: &exposer.ExposeResult{ + 
ByPod: exposer.ExposeByPod{ + HostingPod: &corev1api.Pod{}, + }, + }, + mockInit: true, + mockStart: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctx := context.TODO() + r, err := initPVBReconciler() + r.nodeName = "node-1" + require.NoError(t, err) + + mockAsyncBR := datapathmocks.NewAsyncBR(t) + + if test.mockInit { + mockAsyncBR.On("Init", mock.Anything, mock.Anything).Return(test.initWatcherErr) + } + + if test.mockStart { + mockAsyncBR.On("StartBackup", mock.Anything, mock.Anything, mock.Anything).Return(test.startWatcherErr) + } + + if test.mockClose { + mockAsyncBR.On("Close", mock.Anything).Return() + } + + dt := &pvbResumeTestHelper{ + getExposeErr: test.getExposeErr, + exposeResult: test.exposeResult, + asyncBR: mockAsyncBR, + } + + r.exposer = dt + + datapath.MicroServiceBRWatcherCreator = dt.newMicroServiceBRWatcher + + err = r.resumeCancellableDataPath(ctx, test.pvb, velerotest.NewLogger()) + if test.expectedError != "" { + assert.EqualError(t, err, test.expectedError) + } + }) + } +} diff --git a/pkg/controller/pod_volume_restore_controller.go b/pkg/controller/pod_volume_restore_controller.go index 41362871a..f72c66025 100644 --- a/pkg/controller/pod_volume_restore_controller.go +++ b/pkg/controller/pod_volume_restore_controller.go @@ -545,7 +545,7 @@ func (r *PodVolumeRestoreReconciler) closeDataPath(ctx context.Context, pvrName func (r *PodVolumeRestoreReconciler) SetupWithManager(mgr ctrl.Manager) error { gp := kube.NewGenericEventPredicate(func(object client.Object) bool { pvr := object.(*velerov1api.PodVolumeRestore) - if isLegacyPVR(pvr) { + if IsLegacyPVR(pvr) { return false } @@ -570,7 +570,7 @@ func (r *PodVolumeRestoreReconciler) SetupWithManager(mgr ctrl.Manager) error { pred := kube.NewAllEventPredicate(func(obj client.Object) bool { pvr := obj.(*velerov1api.PodVolumeRestore) - return !isLegacyPVR(pvr) + return !IsLegacyPVR(pvr) }) return ctrl.NewControllerManagedBy(mgr). 
@@ -620,7 +620,7 @@ func (r *PodVolumeRestoreReconciler) findPVRForTargetPod(ctx context.Context, po
 	requests := []reconcile.Request{}
 
 	for _, item := range list.Items {
-		if isLegacyPVR(&item) {
+		if IsLegacyPVR(&item) {
 			continue
 		}
 
@@ -908,3 +908,103 @@ func UpdatePVRWithRetry(ctx context.Context, client client.Client, namespacedNam
 		return true, nil
 	})
 }
+
+var funcResumeCancellablePVR = (*PodVolumeRestoreReconciler).resumeCancellableDataPath
+
+func (c *PodVolumeRestoreReconciler) AttemptPVRResume(ctx context.Context, logger *logrus.Entry, ns string) error {
+	pvrs := &velerov1api.PodVolumeRestoreList{}
+	if err := c.client.List(ctx, pvrs, &client.ListOptions{Namespace: ns}); err != nil {
+		c.logger.WithError(errors.WithStack(err)).Error("failed to list PVRs")
+		return errors.Wrapf(err, "error to list PVRs")
+	}
+
+	for i := range pvrs.Items {
+		pvr := &pvrs.Items[i]
+		if IsLegacyPVR(pvr) {
+			continue
+		}
+
+		if pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseInProgress {
+			if pvr.Status.Node != c.nodeName {
+				logger.WithField("PVR", pvr.Name).WithField("current node", c.nodeName).Infof("PVR should be resumed by another node %s", pvr.Status.Node)
+				continue
+			}
+
+			err := funcResumeCancellablePVR(c, ctx, pvr, logger)
+			if err == nil {
+				logger.WithField("PVR", pvr.Name).WithField("current node", c.nodeName).Info("Completed to resume in progress PVR")
+				continue
+			}
+
+			logger.WithField("PVR", pvr.GetName()).WithError(err).Warn("Failed to resume data path for PVR, have to cancel it")
+
+			resumeErr := err
+			err = UpdatePVRWithRetry(ctx, c.client, types.NamespacedName{Namespace: pvr.Namespace, Name: pvr.Name}, logger.WithField("PVR", pvr.Name),
+				func(pvr *velerov1api.PodVolumeRestore) bool {
+					if pvr.Spec.Cancel {
+						return false
+					}
+
+					pvr.Spec.Cancel = true
+					pvr.Status.Message = fmt.Sprintf("Resume InProgress PVR failed with error %v, mark it as cancel", resumeErr)
+
+					return true
+				})
+			if err != nil {
+				logger.WithField("PVR", pvr.GetName()).WithError(errors.WithStack(err)).Error("Failed to trigger PVR cancel")
+			}
+		} else {
+			logger.WithField("PVR", pvr.GetName()).Infof("find a PVR with status %s", pvr.Status.Phase)
+		}
+	}
+
+	return nil
+}
+
+func (c *PodVolumeRestoreReconciler) resumeCancellableDataPath(ctx context.Context, pvr *velerov1api.PodVolumeRestore, log logrus.FieldLogger) error {
+	log.Info("Resume cancelable PVR")
+
+	res, err := c.exposer.GetExposed(ctx, getPVROwnerObject(pvr), c.client, c.nodeName, c.resourceTimeout)
+	if err != nil {
+		return errors.Wrapf(err, "error to get exposed PVR %s", pvr.Name)
+	}
+
+	if res == nil {
+		return errors.Errorf("expose info missed for PVR %s", pvr.Name)
+	}
+
+	callbacks := datapath.Callbacks{
+		OnCompleted: c.OnDataPathCompleted,
+		OnFailed:    c.OnDataPathFailed,
+		OnCancelled: c.OnDataPathCancelled,
+		OnProgress:  c.OnDataPathProgress,
+	}
+
+	asyncBR, err := c.dataPathMgr.CreateMicroServiceBRWatcher(ctx, c.client, c.kubeClient, c.mgr, datapath.TaskTypeRestore, pvr.Name, pvr.Namespace, res.ByPod.HostingPod.Name, res.ByPod.HostingContainer, pvr.Name, callbacks, true, log)
+	if err != nil {
+		return errors.Wrapf(err, "error to create asyncBR watcher for PVR %s", pvr.Name)
+	}
+
+	resumeComplete := false
+	defer func() {
+		if !resumeComplete {
+			c.closeDataPath(ctx, pvr.Name)
+		}
+	}()
+
+	if err := asyncBR.Init(ctx, nil); err != nil {
+		return errors.Wrapf(err, "error to init asyncBR watcher for PVR %s", pvr.Name)
+	}
+
+	if err := asyncBR.StartRestore(pvr.Spec.SnapshotID, datapath.AccessPoint{
+		ByPath: res.ByPod.VolumeName,
+	}, pvr.Spec.UploaderSettings); err != nil {
+		return errors.Wrapf(err, "error to resume asyncBR watcher for PVR %s", pvr.Name)
+	}
+
+	resumeComplete = true
+
+	log.Infof("asyncBR is resumed for PVR %s", pvr.Name)
+
+	return nil
+}
diff --git a/pkg/controller/pod_volume_restore_controller_legacy.go b/pkg/controller/pod_volume_restore_controller_legacy.go
index b03e27b0e..731b70db9 100644
--- a/pkg/controller/pod_volume_restore_controller_legacy.go
+++ b/pkg/controller/pod_volume_restore_controller_legacy.go
@@ -205,7 +205,7 @@ func (c *PodVolumeRestoreReconcilerLegacy) SetupWithManager(mgr ctrl.Manager) er
 	// By watching the pods, we can trigger the PVR reconciliation again once the pod is finally scheduled on the node.
 	pred := kube.NewAllEventPredicate(func(obj client.Object) bool {
 		pvr := obj.(*velerov1api.PodVolumeRestore)
-		return isLegacyPVR(pvr)
+		return IsLegacyPVR(pvr)
 	})
 
 	return ctrl.NewControllerManagedBy(mgr).Named("podvolumerestorelegacy").
@@ -229,7 +229,7 @@ func (c *PodVolumeRestoreReconcilerLegacy) findVolumeRestoresForPod(ctx context.
 	requests := []reconcile.Request{}
 
 	for _, item := range list.Items {
-		if !isLegacyPVR(&item) {
+		if !IsLegacyPVR(&item) {
 			continue
 		}
 
@@ -359,6 +359,7 @@ func (c *PodVolumeRestoreReconcilerLegacy) closeDataPath(ctx context.Context, pv
 	c.dataPathMgr.RemoveAsyncBR(pvbName)
 }
 
-func isLegacyPVR(pvr *velerov1api.PodVolumeRestore) bool {
+// IsLegacyPVR checks whether the PVR uses the legacy Restic uploader.
+func IsLegacyPVR(pvr *velerov1api.PodVolumeRestore) bool {
 	return pvr.Spec.UploaderType == uploader.ResticType
 }
diff --git a/pkg/controller/pod_volume_restore_controller_test.go b/pkg/controller/pod_volume_restore_controller_test.go
index 81800b805..2ec17c43a 100644
--- a/pkg/controller/pod_volume_restore_controller_test.go
+++ b/pkg/controller/pod_volume_restore_controller_test.go
@@ -1437,3 +1437,212 @@ func TestUpdatePVRWithRetry(t *testing.T) {
 		})
 	}
 }
+
+func TestAttemptPVRResume(t *testing.T) {
+	tests := []struct {
+		name           string
+		pvrs           []velerov1api.PodVolumeRestore
+		pvr            *velerov1api.PodVolumeRestore
+		needErrs       []bool
+		resumeErr      error
+		acceptedPvrs   []string
+		preparedPvrs   []string
+		cancelledPvrs  []string
+		inProgressPvrs []string
+		expectedError  string
+	}{
+		{
+			name: "Other pvr",
+			pvr:  pvrBuilder().Phase(velerov1api.PodVolumeRestorePhasePrepared).Result(),
+		},
+		{
+			name: "Other pvr",
+			pvr:  pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseAccepted).Result(),
+		},
+		{
+			name:           "InProgress pvr, not the current node",
+			pvr:            pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseInProgress).Result(),
+			inProgressPvrs: []string{pvrName},
+		},
+		{
+			name:           "InProgress pvr, no resume error",
+			pvr:            pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseInProgress).Node("node-1").Result(),
+			inProgressPvrs: []string{pvrName},
+		},
+		{
+			name:           "InProgress pvr, resume error, cancel error",
+			pvr:            pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseInProgress).Node("node-1").Result(),
+			resumeErr:      errors.New("fake-resume-error"),
+			needErrs:       []bool{false, false, true, false, false, false},
+			inProgressPvrs: []string{pvrName},
+		},
+		{
+			name:           "InProgress pvr, resume error, cancel succeed",
+			pvr:            pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseInProgress).Node("node-1").Result(),
+			resumeErr:      errors.New("fake-resume-error"),
+			cancelledPvrs:  []string{pvrName},
+			inProgressPvrs: []string{pvrName},
+		},
+		{
+			name:          "Error",
+			needErrs:      []bool{false, false, false, false, false, true},
+			pvr:           pvrBuilder().Phase(velerov1api.PodVolumeRestorePhasePrepared).Result(),
+			expectedError: "error to list PVRs: List error",
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			ctx := context.TODO()
+			r, err := initPodVolumeRestoreReconciler(nil, []client.Object{}, test.needErrs...)
+			r.nodeName = "node-1"
+			require.NoError(t, err)
+			defer func() {
+				r.client.Delete(ctx, test.pvr, &kbclient.DeleteOptions{})
+			}()
+
+			assert.NoError(t, r.client.Create(ctx, test.pvr))
+
+			dt := &pvbResumeTestHelper{
+				resumeErr: test.resumeErr,
+			}
+
+			funcResumeCancellablePVR = dt.resumeCancellablePVRDataPath
+
+			// Run the test
+			err = r.AttemptPVRResume(ctx, r.logger.WithField("name", test.name), test.pvr.Namespace)
+
+			if test.expectedError != "" {
+				assert.EqualError(t, err, test.expectedError)
+			} else {
+				assert.NoError(t, err)
+
+				for _, pvrName := range test.cancelledPvrs {
+					pvr := &velerov1api.PodVolumeRestore{}
+					err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: pvrName}, pvr)
+					require.NoError(t, err)
+					assert.True(t, pvr.Spec.Cancel)
+				}
+
+				for _, pvrName := range test.acceptedPvrs {
+					pvr := &velerov1api.PodVolumeRestore{}
+					err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: pvrName}, pvr)
+					require.NoError(t, err)
+					assert.Equal(t, velerov1api.PodVolumeRestorePhaseAccepted, pvr.Status.Phase)
+				}
+
+				for _, pvrName := range test.preparedPvrs {
+					pvr := &velerov1api.PodVolumeRestore{}
+					err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: pvrName}, pvr)
+					require.NoError(t, err)
+					assert.Equal(t, velerov1api.PodVolumeRestorePhasePrepared, pvr.Status.Phase)
+				}
+			}
+		})
+	}
+}
+
+func TestResumeCancellablePodVolumeRestore(t *testing.T) {
+	tests := []struct {
+		name             string
+		pvrs             []velerov1api.PodVolumeRestore
+		pvr              *velerov1api.PodVolumeRestore
+		getExposeErr     error
+		exposeResult     *exposer.ExposeResult
+		createWatcherErr error
+		initWatcherErr   error
+		startWatcherErr  error
+		mockInit         bool
+		mockStart        bool
+		mockClose        bool
+		expectedError    string
+	}{
+		{
+			name:          "get expose failed",
+			pvr:           pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseInProgress).Result(),
+			getExposeErr:  errors.New("fake-expose-error"),
+			expectedError: fmt.Sprintf("error to get exposed PVR %s: fake-expose-error", pvrName),
+		},
+		{
+			name:          "no expose",
+			pvr:           pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseAccepted).Node("node-1").Result(),
+			expectedError: fmt.Sprintf("expose info missed for PVR %s", pvrName),
+		},
+		{
+			name: "watcher init error",
+			pvr:  pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseAccepted).Node("node-1").Result(),
+			exposeResult: &exposer.ExposeResult{
+				ByPod: exposer.ExposeByPod{
+					HostingPod: &corev1api.Pod{},
+				},
+			},
+			mockInit:       true,
+			mockClose:      true,
+			initWatcherErr: errors.New("fake-init-watcher-error"),
+			expectedError:  fmt.Sprintf("error to init asyncBR watcher for PVR %s: fake-init-watcher-error", pvrName),
+		},
+		{
+			name: "start watcher error",
+			pvr:  pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseAccepted).Node("node-1").Result(),
+			exposeResult: &exposer.ExposeResult{
+				ByPod: exposer.ExposeByPod{
+					HostingPod: &corev1api.Pod{},
+				},
+			},
+			mockInit:        true,
+			mockStart:       true,
+			mockClose:       true,
+			startWatcherErr: errors.New("fake-start-watcher-error"),
+			expectedError:   fmt.Sprintf("error to resume asyncBR watcher for PVR %s: fake-start-watcher-error", pvrName),
+		},
+		{
+			name: "succeed",
+			pvr:  pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseAccepted).Node("node-1").Result(),
+			exposeResult: &exposer.ExposeResult{
+				ByPod: exposer.ExposeByPod{
+					HostingPod: &corev1api.Pod{},
+				},
+			},
+			mockInit:  true,
+			mockStart: true,
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			ctx := context.TODO()
+			r, err := initPodVolumeRestoreReconciler(nil, []client.Object{})
+			r.nodeName = "node-1"
+			require.NoError(t, err)
+
+			mockAsyncBR := datapathmockes.NewAsyncBR(t)
+
+			if test.mockInit {
+				mockAsyncBR.On("Init", mock.Anything, mock.Anything).Return(test.initWatcherErr)
+			}
+
+			if test.mockStart {
+				mockAsyncBR.On("StartRestore", mock.Anything, mock.Anything, mock.Anything).Return(test.startWatcherErr)
+			}
+
+			if test.mockClose {
+				mockAsyncBR.On("Close", mock.Anything).Return()
+			}
+
+			dt := &pvbResumeTestHelper{
+				getExposeErr: test.getExposeErr,
+				exposeResult: test.exposeResult,
+				asyncBR:      mockAsyncBR,
+			}
+
+			r.exposer = dt
+
+			datapath.MicroServiceBRWatcherCreator = dt.newMicroServiceBRWatcher
+
+			err = r.resumeCancellableDataPath(ctx, test.pvr, velerotest.NewLogger())
+			if test.expectedError != "" {
+				assert.EqualError(t, err, test.expectedError)
+			}
+		})
+	}
+}