From d73cef3b9416239d3fbc904be07c031d9182ff9b Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Thu, 19 Jun 2025 16:22:44 +0800 Subject: [PATCH] handle velero server and node-agent restarts for vgdp ms for pvb Signed-off-by: Lyndon-Li --- changelogs/unreleased/9030-Lyndon-Li | 1 + pkg/cmd/server/server.go | 88 ------------------- .../pod_volume_restore_controller.go | 32 +++---- .../pod_volume_restore_controller_test.go | 2 +- 4 files changed, 18 insertions(+), 105 deletions(-) create mode 100644 changelogs/unreleased/9030-Lyndon-Li diff --git a/changelogs/unreleased/9030-Lyndon-Li b/changelogs/unreleased/9030-Lyndon-Li new file mode 100644 index 000000000..c423b4840 --- /dev/null +++ b/changelogs/unreleased/9030-Lyndon-Li @@ -0,0 +1 @@ +Fix issue #8962, resume PVB/PVR during node-agent restarts \ No newline at end of file diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index 99dbf6c19..50b91d71b 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -961,7 +961,6 @@ func markInProgressBackupsFailed(ctx context.Context, client ctrlclient.Client, } log.WithField("backup", backup.GetName()).Warn(updated.Status.FailureReason) markDataUploadsCancel(ctx, client, backup, log) - markPodVolumeBackupsCancel(ctx, client, backup, log) } } @@ -986,7 +985,6 @@ func markInProgressRestoresFailed(ctx context.Context, client ctrlclient.Client, } log.WithField("restore", restore.GetName()).Warn(updated.Status.FailureReason) markDataDownloadsCancel(ctx, client, restore, log) - markPodVolumeRestoresCancel(ctx, client, restore, log) } } @@ -1071,89 +1069,3 @@ func markDataDownloadsCancel(ctx context.Context, client ctrlclient.Client, rest } } } - -func markPodVolumeBackupsCancel(ctx context.Context, client ctrlclient.Client, backup velerov1api.Backup, log logrus.FieldLogger) { - pvbs := &velerov1api.PodVolumeBackupList{} - - if err := client.List(ctx, pvbs, &ctrlclient.ListOptions{ - Namespace: backup.GetNamespace(), - LabelSelector: labels.Set(map[string]string{ - velerov1api.BackupUIDLabel: string(backup.GetUID()), - }).AsSelector(), - }); err != nil { - log.WithError(errors.WithStack(err)).Error("failed to list pvbs") - return - } - - for i := range pvbs.Items { - pvb := pvbs.Items[i] - if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseAccepted || - pvb.Status.Phase == velerov1api.PodVolumeBackupPhasePrepared || - pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseInProgress || - pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseNew || - pvb.Status.Phase == "" { - err := controller.UpdatePVBWithRetry(ctx, client, types.NamespacedName{Namespace: pvb.Namespace, Name: pvb.Name}, log.WithField("pvb", pvb.Name), - func(pvb *velerov1api.PodVolumeBackup) bool { - if pvb.Spec.Cancel { - return false - } - - pvb.Spec.Cancel = true - pvb.Status.Message = fmt.Sprintf("pvb is in status %q during the velero server starting, mark it as cancel", pvb.Status.Phase) - - return true - }) - - if err != nil { - log.WithError(errors.WithStack(err)).Errorf("failed to mark pvb %q cancel", pvb.GetName()) - continue - } - log.WithField("pvb", pvb.GetName()).Warn(pvb.Status.Message) - } - } -} - -func markPodVolumeRestoresCancel(ctx context.Context, client ctrlclient.Client, restore velerov1api.Restore, log logrus.FieldLogger) { - pvrs := &velerov1api.PodVolumeRestoreList{} - - if err := client.List(ctx, pvrs, &ctrlclient.ListOptions{ - Namespace: restore.GetNamespace(), - LabelSelector: labels.Set(map[string]string{ - velerov1api.RestoreUIDLabel: string(restore.GetUID()), - }).AsSelector(), - }); err != nil { - log.WithError(errors.WithStack(err)).Error("failed to list pvrs") - return - } - - for i := range pvrs.Items { - pvr := pvrs.Items[i] - if pvr.Spec.UploaderType == uploader.ResticType { - log.WithField("pvr", pvr.GetName()).Warn("Found a legacy pvr during velero server restart, cannot stop it") - } else { - if pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseAccepted || - pvr.Status.Phase == velerov1api.PodVolumeRestorePhasePrepared || - pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseInProgress || - pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseNew || - pvr.Status.Phase == "" { - err := controller.UpdatePVRWithRetry(ctx, client, types.NamespacedName{Namespace: pvr.Namespace, Name: pvr.Name}, log.WithField("pvr", pvr.Name), - func(pvr *velerov1api.PodVolumeRestore) bool { - if pvr.Spec.Cancel { - return false - } - - pvr.Spec.Cancel = true - pvr.Status.Message = fmt.Sprintf("pvr is in status %q during the velero server starting, mark it as cancel", pvr.Status.Phase) - - return true - }) - - if err != nil { - log.WithError(errors.WithStack(err)).Errorf("failed to mark pvr %q cancel", pvr.GetName()) - continue - } - log.WithField("pvr", pvr.GetName()).Warn(pvr.Status.Message) - } - } - } -} diff --git a/pkg/controller/pod_volume_restore_controller.go b/pkg/controller/pod_volume_restore_controller.go index f72c66025..b9a76b73c 100644 --- a/pkg/controller/pod_volume_restore_controller.go +++ b/pkg/controller/pod_volume_restore_controller.go @@ -911,10 +911,10 @@ func UpdatePVRWithRetry(ctx context.Context, client client.Client, namespacedNam var funcResumeCancellablePVR = (*PodVolumeRestoreReconciler).resumeCancellableDataPath -func (c *PodVolumeRestoreReconciler) AttemptPVRResume(ctx context.Context, logger *logrus.Entry, ns string) error { +func (r *PodVolumeRestoreReconciler) AttemptPVRResume(ctx context.Context, logger *logrus.Entry, ns string) error { pvrs := &velerov1api.PodVolumeRestoreList{} - if err := c.client.List(ctx, pvrs, &client.ListOptions{Namespace: ns}); err != nil { - c.logger.WithError(errors.WithStack(err)).Error("failed to list PVRs") + if err := r.client.List(ctx, pvrs, &client.ListOptions{Namespace: ns}); err != nil { + r.logger.WithError(errors.WithStack(err)).Error("failed to list PVRs") return errors.Wrapf(err, "error to list PVRs") } @@ -925,21 +925,21 @@ func (c *PodVolumeRestoreReconciler) AttemptPVRResume(ctx context.Context, logge } if pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseInProgress { - if pvr.Status.Node != c.nodeName { - logger.WithField("PVR", pvr.Name).WithField("current node", c.nodeName).Infof("PVR should be resumed by another node %s", pvr.Status.Node) + if pvr.Status.Node != r.nodeName { + logger.WithField("PVR", pvr.Name).WithField("current node", r.nodeName).Infof("PVR should be resumed by another node %s", pvr.Status.Node) continue } - err := funcResumeCancellablePVR(c, ctx, pvr, logger) + err := funcResumeCancellablePVR(r, ctx, pvr, logger) if err == nil { - logger.WithField("PVR", pvr.Name).WithField("current node", c.nodeName).Info("Completed to resume in progress PVR") + logger.WithField("PVR", pvr.Name).WithField("current node", r.nodeName).Info("Completed to resume in progress PVR") continue } logger.WithField("PVR", pvr.GetName()).WithError(err).Warn("Failed to resume data path for PVR, have to cancel it") resumeErr := err - err = UpdatePVRWithRetry(ctx, c.client, types.NamespacedName{Namespace: pvr.Namespace, Name: pvr.Name}, logger.WithField("PVR", pvr.Name), + err = UpdatePVRWithRetry(ctx, r.client, types.NamespacedName{Namespace: pvr.Namespace, Name: pvr.Name}, logger.WithField("PVR", pvr.Name), func(pvr *velerov1api.PodVolumeRestore) bool { if pvr.Spec.Cancel { return false @@ -961,10 +961,10 @@ func (c *PodVolumeRestoreReconciler) AttemptPVRResume(ctx context.Context, logge return nil } -func (c *PodVolumeRestoreReconciler) resumeCancellableDataPath(ctx context.Context, pvr *velerov1api.PodVolumeRestore, log logrus.FieldLogger) error { +func (r *PodVolumeRestoreReconciler) resumeCancellableDataPath(ctx context.Context, pvr *velerov1api.PodVolumeRestore, log logrus.FieldLogger) error { log.Info("Resume cancelable PVR") - res, err := c.exposer.GetExposed(ctx, getPVROwnerObject(pvr), c.client, c.nodeName, c.resourceTimeout) + res, err := r.exposer.GetExposed(ctx, getPVROwnerObject(pvr), r.client, r.nodeName, r.resourceTimeout) if err != nil { return errors.Wrapf(err, "error to get exposed PVR %s", pvr.Name) } @@ -974,13 +974,13 @@ func (c *PodVolumeRestoreReconciler) resumeCancellableDataPath(ctx context.Conte } callbacks := datapath.Callbacks{ - OnCompleted: c.OnDataPathCompleted, - OnFailed: c.OnDataPathFailed, - OnCancelled: c.OnDataPathCancelled, - OnProgress: c.OnDataPathProgress, + OnCompleted: r.OnDataPathCompleted, + OnFailed: r.OnDataPathFailed, + OnCancelled: r.OnDataPathCancelled, + OnProgress: r.OnDataPathProgress, } - asyncBR, err := c.dataPathMgr.CreateMicroServiceBRWatcher(ctx, c.client, c.kubeClient, c.mgr, datapath.TaskTypeRestore, pvr.Name, pvr.Namespace, res.ByPod.HostingPod.Name, res.ByPod.HostingContainer, pvr.Name, callbacks, true, log) + asyncBR, err := r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, r.kubeClient, r.mgr, datapath.TaskTypeRestore, pvr.Name, pvr.Namespace, res.ByPod.HostingPod.Name, res.ByPod.HostingContainer, pvr.Name, callbacks, true, log) if err != nil { return errors.Wrapf(err, "error to create asyncBR watcher for PVR %s", pvr.Name) } @@ -988,7 +988,7 @@ func (c *PodVolumeRestoreReconciler) resumeCancellableDataPath(ctx context.Conte resumeComplete := false defer func() { if !resumeComplete { - c.closeDataPath(ctx, pvr.Name) + r.closeDataPath(ctx, pvr.Name) } }() diff --git a/pkg/controller/pod_volume_restore_controller_test.go b/pkg/controller/pod_volume_restore_controller_test.go index 2ec17c43a..47d72a558 100644 --- a/pkg/controller/pod_volume_restore_controller_test.go +++ b/pkg/controller/pod_volume_restore_controller_test.go @@ -1501,7 +1501,7 @@ func TestAttemptPVRResume(t *testing.T) { r.client.Delete(ctx, test.pvr, &kbclient.DeleteOptions{}) }() - assert.NoError(t, r.client.Create(ctx, test.pvr)) + require.NoError(t, r.client.Create(ctx, test.pvr)) dt := &pvbResumeTestHelper{ resumeErr: test.resumeErr,