diff --git a/pkg/cmd/cli/restic/server.go b/pkg/cmd/cli/restic/server.go
index 5a37ffee8..b056c2189 100644
--- a/pkg/cmd/cli/restic/server.go
+++ b/pkg/cmd/cli/restic/server.go
@@ -22,7 +22,9 @@ import (
 	"net/http"
 	"os"
 	"strings"
+	"time"
 
+	"github.com/apex/log"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 	"github.com/sirupsen/logrus"
@@ -32,11 +34,13 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/clock"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/client-go/kubernetes"
 
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/cache"
+	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/log/zap"
 	"sigs.k8s.io/controller-runtime/pkg/manager"
@@ -50,6 +54,7 @@ import (
 	"github.com/vmware-tanzu/velero/pkg/metrics"
 	"github.com/vmware-tanzu/velero/pkg/restic"
 	"github.com/vmware-tanzu/velero/pkg/util/filesystem"
+	"github.com/vmware-tanzu/velero/pkg/util/kube"
 	"github.com/vmware-tanzu/velero/pkg/util/logging"
 )
 
@@ -105,6 +110,7 @@ type resticServer struct {
 	metrics        *metrics.ServerMetrics
 	metricsAddress string
 	namespace      string
+	nodeName       string
 }
 
 func newResticServer(logger logrus.FieldLogger, factory client.Factory, metricAddress string) (*resticServer, error) {
@@ -121,11 +127,13 @@ func newResticServer(logger logrus.FieldLogger, factory client.Factory, metricAd
 	v1.AddToScheme(scheme)
 	storagev1api.AddToScheme(scheme)
 
+	nodeName := os.Getenv("NODE_NAME")
+
 	// use a field selector to filter to only pods scheduled on this node.
 	cacheOption := cache.Options{
 		SelectorsByObject: cache.SelectorsByObject{
 			&v1.Pod{}: {
-				Field: fields.Set{"spec.nodeName": os.Getenv("NODE_NAME")}.AsSelector(),
+				Field: fields.Set{"spec.nodeName": nodeName}.AsSelector(),
 			},
 		},
 	}
@@ -145,6 +153,7 @@ func newResticServer(logger logrus.FieldLogger, factory client.Factory, metricAd
 		mgr:            mgr,
 		metricsAddress: metricAddress,
 		namespace:      factory.Namespace(),
+		nodeName:       nodeName,
 	}
 
 	// the cache isn't initialized yet when "validatePodVolumesHostPath" is called, the client returned by the manager cannot
@@ -173,7 +182,9 @@ func (s *resticServer) run() {
 	}()
 	s.metrics = metrics.NewResticServerMetrics()
 	s.metrics.RegisterAllMetrics()
-	s.metrics.InitResticMetricsForNode(os.Getenv("NODE_NAME"))
+	s.metrics.InitResticMetricsForNode(s.nodeName)
+
+	s.markInProgressCRsFailed()
 
 	s.logger.Info("Starting controllers")
 
@@ -193,7 +204,7 @@ func (s *resticServer) run() {
 		Clock:          clock.RealClock{},
 		Metrics:        s.metrics,
 		CredsFileStore: credentialFileStore,
-		NodeName:       os.Getenv("NODE_NAME"),
+		NodeName:       s.nodeName,
 		FileSystem:     filesystem.NewFileSystem(),
 		ResticExec:     restic.BackupExec{},
 		Log:            s.logger,
@@ -229,7 +240,7 @@ func (s *resticServer) validatePodVolumesHostPath(client kubernetes.Interface) e
 		}
 	}
 
-	pods, err := client.CoreV1().Pods("").List(s.ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("spec.nodeName=%s,status.phase=Running", os.Getenv("NODE_NAME"))})
+	pods, err := client.CoreV1().Pods("").List(s.ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("spec.nodeName=%s,status.phase=Running", s.nodeName)})
 	if err != nil {
 		return errors.WithStack(err)
 	}
@@ -259,3 +270,83 @@ func (s *resticServer) validatePodVolumesHostPath(client kubernetes.Interface) e
 
 	return nil
 }
+
+// if the server restarts while it is reconciling PVBs/PVRs/etc., those CRs may be stuck in the InProgress phase.
+// markInProgressCRsFailed tries to mark such in-progress CRs as failed when the server starts, to avoid that issue.
+func (s *resticServer) markInProgressCRsFailed() {
+	// this function is called before the controller manager starts, so the manager's embedded client isn't ready to use yet; create a new one here
+	client, err := ctrlclient.New(s.mgr.GetConfig(), ctrlclient.Options{Scheme: s.mgr.GetScheme()})
+	if err != nil {
+		log.WithError(errors.WithStack(err)).Error("failed to create client")
+		return
+	}
+
+	s.markInProgressPVBsFailed(client)
+
+	s.markInProgressPVRsFailed(client)
+}
+
+func (s *resticServer) markInProgressPVBsFailed(client ctrlclient.Client) {
+	pvbs := &velerov1api.PodVolumeBackupList{}
+	if err := client.List(s.ctx, pvbs, &ctrlclient.MatchingFields{"metadata.namespace": s.namespace}); err != nil {
+		log.WithError(errors.WithStack(err)).Error("failed to list podvolumebackups")
+		return
+	}
+	for _, pvb := range pvbs.Items {
+		if pvb.Status.Phase != velerov1api.PodVolumeBackupPhaseInProgress {
+			log.Debugf("the status of podvolumebackup %q is %q, skip", pvb.GetName(), pvb.Status.Phase)
+			continue
+		}
+		if pvb.Spec.Node != s.nodeName {
+			log.Debugf("the node of podvolumebackup %q is %q, not %q, skip", pvb.GetName(), pvb.Spec.Node, s.nodeName)
+			continue
+		}
+		updated := pvb.DeepCopy()
+		updated.Status.Phase = velerov1api.PodVolumeBackupPhaseFailed
+		updated.Status.Message = fmt.Sprintf("found a podvolumebackup with status %q during server startup, marking it as %q", velerov1api.PodVolumeBackupPhaseInProgress, updated.Status.Phase)
+		updated.Status.CompletionTimestamp = &metav1.Time{Time: time.Now()}
+		if err := kube.Patch(s.ctx, &pvb, updated, client); err != nil {
+			log.WithError(errors.WithStack(err)).Errorf("failed to patch podvolumebackup %q", pvb.GetName())
+			continue
+		}
+		log.WithField("podvolumebackup", pvb.GetName()).Warn(updated.Status.Message)
+	}
+}
+
+func (s *resticServer) markInProgressPVRsFailed(client ctrlclient.Client) {
+	pvrs := &velerov1api.PodVolumeRestoreList{}
+	if err := client.List(s.ctx, pvrs, &ctrlclient.MatchingFields{"metadata.namespace": s.namespace}); err != nil {
+		log.WithError(errors.WithStack(err)).Error("failed to list podvolumerestores")
+		return
+	}
+	for _, pvr := range pvrs.Items {
+		if pvr.Status.Phase != velerov1api.PodVolumeRestorePhaseInProgress {
+			log.Debugf("the status of podvolumerestore %q is %q, skip", pvr.GetName(), pvr.Status.Phase)
+			continue
+		}
+
+		pod := &v1.Pod{}
+		if err := client.Get(s.ctx, types.NamespacedName{
+			Namespace: pvr.Spec.Pod.Namespace,
+			Name:      pvr.Spec.Pod.Name,
+		}, pod); err != nil {
+			log.WithError(errors.WithStack(err)).Errorf("failed to get pod \"%s/%s\" of podvolumerestore %q",
+				pvr.Spec.Pod.Namespace, pvr.Spec.Pod.Name, pvr.GetName())
+			continue
+		}
+		if pod.Spec.NodeName != s.nodeName {
+			log.Debugf("the node of the pod referenced by podvolumerestore %q is %q, not %q, skip", pvr.GetName(), pod.Spec.NodeName, s.nodeName)
+			continue
+		}
+
+		updated := pvr.DeepCopy()
+		updated.Status.Phase = velerov1api.PodVolumeRestorePhaseFailed
+		updated.Status.Message = fmt.Sprintf("found a podvolumerestore with status %q during server startup, marking it as %q", velerov1api.PodVolumeRestorePhaseInProgress, updated.Status.Phase)
+		updated.Status.CompletionTimestamp = &metav1.Time{Time: time.Now()}
+		if err := kube.Patch(s.ctx, &pvr, updated, client); err != nil {
+			log.WithError(errors.WithStack(err)).Errorf("failed to patch podvolumerestore %q", pvr.GetName())
+			continue
+		}
+		log.WithField("podvolumerestore", pvr.GetName()).Warn(updated.Status.Message)
+	}
+}
diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go
index be59459c1..3c83ba75a 100644
--- a/pkg/cmd/server/server.go
+++ b/pkg/cmd/server/server.go
@@ -74,6 +74,7 @@ import (
 
 	ctrl "sigs.k8s.io/controller-runtime"
+	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/manager"
 
 	"github.com/vmware-tanzu/velero/internal/storage"
@@ -378,6 +379,8 @@ func (s *server) run() error {
 		return err
 	}
 
+	markInProgressCRsFailed(s.ctx, s.mgr.GetConfig(), s.mgr.GetScheme(), s.namespace, s.logger)
+
 	if err := s.runControllers(s.config.defaultVolumeSnapshotLocations); err != nil {
 		return err
 	}
@@ -925,3 +928,65 @@ func (w *CSIInformerFactoryWrapper) WaitForCacheSync(stopCh <-chan struct{}) map
 	}
 	return nil
 }
+
+// if the server restarts while it is reconciling backups/restores/etc., those CRs may be stuck in the InProgress phase.
+// markInProgressCRsFailed tries to mark such in-progress CRs as failed when the server starts, to avoid that issue.
+func markInProgressCRsFailed(ctx context.Context, cfg *rest.Config, scheme *runtime.Scheme, namespace string, log logrus.FieldLogger) {
+	// this function is called before the controller manager starts, so the manager's embedded client isn't ready to use yet; create a new one here
+	client, err := ctrlclient.New(cfg, ctrlclient.Options{Scheme: scheme})
+	if err != nil {
+		log.WithError(errors.WithStack(err)).Error("failed to create client")
+		return
+	}
+
+	markInProgressBackupsFailed(ctx, client, namespace, log)
+
+	markInProgressRestoresFailed(ctx, client, namespace, log)
+}
+
+func markInProgressBackupsFailed(ctx context.Context, client ctrlclient.Client, namespace string, log logrus.FieldLogger) {
+	backups := &velerov1api.BackupList{}
+	if err := client.List(ctx, backups, &ctrlclient.MatchingFields{"metadata.namespace": namespace}); err != nil {
+		log.WithError(errors.WithStack(err)).Error("failed to list backups")
+		return
+	}
+
+	for _, backup := range backups.Items {
+		if backup.Status.Phase != velerov1api.BackupPhaseInProgress {
+			log.Debugf("the status of backup %q is %q, skip", backup.GetName(), backup.Status.Phase)
+			continue
+		}
+		updated := backup.DeepCopy()
+		updated.Status.Phase = velerov1api.BackupPhaseFailed
+		updated.Status.FailureReason = fmt.Sprintf("found a backup with status %q during server startup, marking it as %q", velerov1api.BackupPhaseInProgress, updated.Status.Phase)
+		updated.Status.CompletionTimestamp = &metav1.Time{Time: time.Now()}
+		if err := client.Patch(ctx, updated, ctrlclient.MergeFrom(&backup)); err != nil {
+			log.WithError(errors.WithStack(err)).Errorf("failed to patch backup %q", backup.GetName())
+			continue
+		}
+		log.WithField("backup", backup.GetName()).Warn(updated.Status.FailureReason)
+	}
+}
+
+func markInProgressRestoresFailed(ctx context.Context, client ctrlclient.Client, namespace string, log logrus.FieldLogger) {
+	restores := &velerov1api.RestoreList{}
+	if err := client.List(ctx, restores, &ctrlclient.MatchingFields{"metadata.namespace": namespace}); err != nil {
+		log.WithError(errors.WithStack(err)).Error("failed to list restores")
+		return
+	}
+	for _, restore := range restores.Items {
+		if restore.Status.Phase != velerov1api.RestorePhaseInProgress {
+			log.Debugf("the status of restore %q is %q, skip", restore.GetName(), restore.Status.Phase)
+			continue
+		}
+		updated := restore.DeepCopy()
+		updated.Status.Phase = velerov1api.RestorePhaseFailed
+		updated.Status.FailureReason = fmt.Sprintf("found a restore with status %q during server startup, marking it as %q", velerov1api.RestorePhaseInProgress, updated.Status.Phase)
+		updated.Status.CompletionTimestamp = &metav1.Time{Time: time.Now()}
+		if err := client.Patch(ctx, updated, ctrlclient.MergeFrom(&restore)); err != nil {
+			log.WithError(errors.WithStack(err)).Errorf("failed to patch restore %q", restore.GetName())
+			continue
+		}
+		log.WithField("restore", restore.GetName()).Warn(updated.Status.FailureReason)
+	}
+}
diff --git a/pkg/controller/backup_controller.go b/pkg/controller/backup_controller.go
index 4aea25156..aeadc8500 100644
--- a/pkg/controller/backup_controller.go
+++ b/pkg/controller/backup_controller.go
@@ -157,12 +157,13 @@ func NewBackupController(
 				backup := obj.(*velerov1api.Backup)
 
 				switch backup.Status.Phase {
-				case "", velerov1api.BackupPhaseNew, velerov1api.BackupPhaseInProgress:
+				case "", velerov1api.BackupPhaseNew:
+					// only process new backups
 				default:
 					c.logger.WithFields(logrus.Fields{
 						"backup": kubeutil.NamespaceAndName(backup),
 						"phase":  backup.Status.Phase,
-					}).Debug("Backup is not new or in-progress, skipping")
+					}).Debug("Backup is not new, skipping")
 					return
 				}
 
@@ -250,22 +251,7 @@ func (c *backupController) processBackup(key string) error {
 	// this key (even though it was a no-op).
 	switch original.Status.Phase {
 	case "", velerov1api.BackupPhaseNew:
-	case velerov1api.BackupPhaseInProgress:
-		// A backup may stay in-progress forever because of
-		// 1) the controller restarts during the processing of a backup
-		// 2) the backup with in-progress status isn't updated to completed or failed status successfully
-		// So we try to mark such Backups as failed to avoid it
-		updated := original.DeepCopy()
-		updated.Status.Phase = velerov1api.BackupPhaseFailed
-		updated.Status.FailureReason = fmt.Sprintf("got a Backup with unexpected status %q, this may be due to a restart of the controller during the backing up, mark it as %q",
-			velerov1api.BackupPhaseInProgress, updated.Status.Phase)
-		updated.Status.CompletionTimestamp = &metav1.Time{Time: c.clock.Now()}
-		_, err = patchBackup(original, updated, c.client)
-		if err != nil {
-			return errors.Wrapf(err, "error updating Backup status to %s", updated.Status.Phase)
-		}
-		log.Warn(updated.Status.FailureReason)
-		return nil
+		// only process new backups
 	default:
 		return nil
 	}
diff --git a/pkg/controller/backup_controller_test.go b/pkg/controller/backup_controller_test.go
index fa1a95a38..29e7ecc9c 100644
--- a/pkg/controller/backup_controller_test.go
+++ b/pkg/controller/backup_controller_test.go
@@ -94,6 +94,11 @@ func TestProcessBackupNonProcessedItems(t *testing.T) {
 			key:    "velero/backup-1",
 			backup: defaultBackup().Phase(velerov1api.BackupPhaseFailedValidation).Result(),
 		},
+		{
+			name:   "InProgress backup is not processed",
+			key:    "velero/backup-1",
+			backup: defaultBackup().Phase(velerov1api.BackupPhaseInProgress).Result(),
+		},
 		{
 			name:   "Completed backup is not processed",
 			key:    "velero/backup-1",
@@ -135,28 +140,6 @@ func TestProcessBackupNonProcessedItems(t *testing.T) {
 	}
 }
 
-func TestMarkInProgressBackupAsFailed(t *testing.T) {
-	backup := defaultBackup().Phase(velerov1api.BackupPhaseInProgress).Result()
-	clientset := fake.NewSimpleClientset(backup)
-	sharedInformers := informers.NewSharedInformerFactory(clientset, 0)
-	logger := logging.DefaultLogger(logrus.DebugLevel, logging.FormatText)
-
-	c := &backupController{
-		genericController: newGenericController("backup-test", logger),
-		client:            clientset.VeleroV1(),
-		lister:            sharedInformers.Velero().V1().Backups().Lister(),
-		clock:             &clock.RealClock{},
-	}
-	require.NoError(t, sharedInformers.Velero().V1().Backups().Informer().GetStore().Add(backup))
-
-	err := c.processBackup(fmt.Sprintf("%s/%s", backup.Namespace, backup.Name))
-	require.Nil(t, err)
-
-	res, err := clientset.VeleroV1().Backups(backup.Namespace).Get(context.TODO(), backup.Name, metav1.GetOptions{})
-	require.NoError(t, err)
-	assert.Equal(t, velerov1api.BackupPhaseFailed, res.Status.Phase)
-}
-
 func TestProcessBackupValidationFailures(t *testing.T) {
 	defaultBackupLocation := builder.ForBackupStorageLocation("velero", "loc-1").Result()
diff --git a/pkg/controller/pod_volume_backup_controller.go b/pkg/controller/pod_volume_backup_controller.go
index 3656bd081..b8089a835 100644
--- a/pkg/controller/pod_volume_backup_controller.go
+++ b/pkg/controller/pod_volume_backup_controller.go
@@ -93,20 +93,9 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
 
 	switch pvb.Status.Phase {
 	case "", velerov1api.PodVolumeBackupPhaseNew:
-	case velerov1api.PodVolumeBackupPhaseInProgress:
-		original := pvb.DeepCopy()
-		pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseFailed
-		pvb.Status.Message = fmt.Sprintf("got a PodVolumeBackup with unexpected status %q, this may be due to a restart of the controller during the backing up, mark it as %q",
-			velerov1api.PodVolumeBackupPhaseInProgress, pvb.Status.Phase)
-		pvb.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
-		if err := kube.Patch(ctx, original, &pvb, r.Client); err != nil {
-			log.WithError(err).Error("error updating PodVolumeBackup status")
-			return ctrl.Result{}, err
-		}
-		log.Warn(pvb.Status.Message)
-		return ctrl.Result{}, nil
+		// Only process new items.
 	default:
-		log.Debug("PodVolumeBackup is not new or in-progress, not processing")
+		log.Debug("PodVolumeBackup is not new, not processing")
 		return ctrl.Result{}, nil
 	}
 
@@ -298,7 +287,7 @@ func (r *PodVolumeBackupReconciler) updateBackupProgressFunc(pvb *velerov1api.Po
 func (r *PodVolumeBackupReconciler) updateStatusToFailed(ctx context.Context, pvb *velerov1api.PodVolumeBackup, err error, msg string, log logrus.FieldLogger) (ctrl.Result, error) {
 	original := pvb.DeepCopy()
 	pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseFailed
-	pvb.Status.Message = msg
+	pvb.Status.Message = errors.WithMessage(err, msg).Error()
 	pvb.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
 
 	if err = kube.Patch(ctx, original, pvb, r.Client); err != nil {
diff --git a/pkg/controller/pod_volume_backup_controller_test.go b/pkg/controller/pod_volume_backup_controller_test.go
index 1a67b6995..ffc5f662c 100644
--- a/pkg/controller/pod_volume_backup_controller_test.go
+++ b/pkg/controller/pod_volume_backup_controller_test.go
@@ -179,16 +179,16 @@ var _ = Describe("PodVolumeBackup Reconciler", func() {
 				Result(),
 			expectedRequeue: ctrl.Result{},
 		}),
-		Entry("in progress phase pvb on same node should be marked as failed", request{
+		Entry("in progress phase pvb on same node should not be processed", request{
 			pvb: pvbBuilder().
 				Phase(velerov1api.PodVolumeBackupPhaseInProgress).
 				Node("test_node").
 				Result(),
 			pod: podBuilder().Result(),
 			bsl: bslBuilder().Result(),
-			expectedProcessed: true,
+			expectedProcessed: false,
 			expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
-				Phase(velerov1api.PodVolumeBackupPhaseFailed).
+				Phase(velerov1api.PodVolumeBackupPhaseInProgress).
 				Result(),
 			expectedRequeue: ctrl.Result{},
 		}),
diff --git a/pkg/controller/pod_volume_restore_controller.go b/pkg/controller/pod_volume_restore_controller.go
index a35ca7467..6666e32ba 100644
--- a/pkg/controller/pod_volume_restore_controller.go
+++ b/pkg/controller/pod_volume_restore_controller.go
@@ -134,6 +134,11 @@ func (c *PodVolumeRestoreReconciler) Reconcile(ctx context.Context, req ctrl.Req
 }
 
 func (c *PodVolumeRestoreReconciler) shouldProcess(ctx context.Context, log logrus.FieldLogger, pvr *velerov1api.PodVolumeRestore) (bool, *corev1api.Pod, error) {
+	if !isPVRNew(pvr) {
+		log.Debug("PodVolumeRestore is not new, skip")
+		return false, nil, nil
+	}
+
 	// we filter the pods during the initialization of cache, if we can get a pod here, the pod must be in the same node with the controller
 	// so we don't need to compare the node anymore
 	pod := &corev1api.Pod{}
@@ -146,28 +151,6 @@ func (c *PodVolumeRestoreReconciler) shouldProcess(ctx context.Context, log logr
 		return false, nil, err
 	}
 
-	// the status checking logic must be put after getting the PVR's pod because that the getting pod logic
-	// makes sure the PVR's pod is on the same node with the controller. The controller should only process
-	// the PVRs on the same node
-	switch pvr.Status.Phase {
-	case "", velerov1api.PodVolumeRestorePhaseNew:
-	case velerov1api.PodVolumeRestorePhaseInProgress:
-		original := pvr.DeepCopy()
-		pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseFailed
-		pvr.Status.Message = fmt.Sprintf("got a PodVolumeRestore with unexpected status %q, this may be due to a restart of the controller during the restoring, mark it as %q",
-			velerov1api.PodVolumeRestorePhaseInProgress, pvr.Status.Phase)
-		pvr.Status.CompletionTimestamp = &metav1.Time{Time: c.clock.Now()}
-		if err := kube.Patch(ctx, original, pvr, c.Client); err != nil {
-			log.WithError(err).Error("Unable to update status to failed")
-			return false, nil, err
-		}
-		log.Warn(pvr.Status.Message)
-		return false, nil, nil
-	default:
-		log.Debug("PodVolumeRestore is not new or in-progress, skip")
-		return false, nil, nil
-	}
-
 	if !isResticInitContainerRunning(pod) {
 		log.Debug("Pod is not running restic-wait init container, skip")
 		return false, nil, nil
@@ -209,6 +192,10 @@ func (c *PodVolumeRestoreReconciler) findVolumeRestoresForPod(pod client.Object)
 	return requests
 }
 
+func isPVRNew(pvr *velerov1api.PodVolumeRestore) bool {
+	return pvr.Status.Phase == "" || pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseNew
+}
+
 func isResticInitContainerRunning(pod *corev1api.Pod) bool {
 	// Restic wait container can be anywhere in the list of init containers, but must be running.
 	i := getResticInitContainerIndex(pod)
diff --git a/pkg/controller/pod_volume_restore_controller_test.go b/pkg/controller/pod_volume_restore_controller_test.go
index 220e00426..69bc313a3 100644
--- a/pkg/controller/pod_volume_restore_controller_test.go
+++ b/pkg/controller/pod_volume_restore_controller_test.go
@@ -27,7 +27,6 @@ import (
 	corev1api "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
-	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/clock"
 	"sigs.k8s.io/controller-runtime/pkg/client/fake"
 
@@ -44,48 +43,15 @@ func TestShouldProcess(t *testing.T) {
 		obj             *velerov1api.PodVolumeRestore
 		pod             *corev1api.Pod
 		shouldProcessed bool
-		expectedPhase   velerov1api.PodVolumeRestorePhase
 	}{
 		{
-			name: "Unable to get pvr's pod should not be processed",
+			name: "InProgress phase pvr should not be processed",
 			obj: &velerov1api.PodVolumeRestore{
-				Spec: velerov1api.PodVolumeRestoreSpec{
-					Pod: corev1api.ObjectReference{
-						Namespace: "ns-1",
-						Name:      "pod-1",
-					},
-				},
-				Status: velerov1api.PodVolumeRestoreStatus{
-					Phase: "",
-				},
-			},
-			shouldProcessed: false,
-		},
-		{
-			name: "InProgress phase pvr should be marked as failed",
-			obj: &velerov1api.PodVolumeRestore{
-				ObjectMeta: metav1.ObjectMeta{
-					Namespace: "velero",
-					Name:      "pvr-1",
-				},
-				Spec: velerov1api.PodVolumeRestoreSpec{
-					Pod: corev1api.ObjectReference{
-						Namespace: "ns-1",
-						Name:      "pod-1",
-					},
-				},
 				Status: velerov1api.PodVolumeRestoreStatus{
 					Phase: velerov1api.PodVolumeRestorePhaseInProgress,
 				},
 			},
-			pod: &corev1api.Pod{
-				ObjectMeta: metav1.ObjectMeta{
-					Namespace: "ns-1",
-					Name:      "pod-1",
-				},
-			},
 			shouldProcessed: false,
-			expectedPhase:   velerov1api.PodVolumeRestorePhaseFailed,
 		},
 		{
 			name: "Completed phase pvr should not be processed",
@@ -94,22 +60,10 @@ func TestShouldProcess(t *testing.T) {
 					Namespace: "velero",
 					Name:      "pvr-1",
 				},
-				Spec: velerov1api.PodVolumeRestoreSpec{
-					Pod: corev1api.ObjectReference{
-						Namespace: "ns-1",
-						Name:      "pod-1",
-					},
-				},
 				Status: velerov1api.PodVolumeRestoreStatus{
 					Phase: velerov1api.PodVolumeRestorePhaseCompleted,
 				},
 			},
-			pod: &corev1api.Pod{
-				ObjectMeta: metav1.ObjectMeta{
-					Namespace: "ns-1",
-					Name:      "pod-1",
-				},
-			},
 			shouldProcessed: false,
 		},
 		{
@@ -119,6 +73,15 @@ func TestShouldProcess(t *testing.T) {
 					Namespace: "velero",
 					Name:      "pvr-1",
 				},
+				Status: velerov1api.PodVolumeRestoreStatus{
+					Phase: velerov1api.PodVolumeRestorePhaseFailed,
+				},
+			},
+			shouldProcessed: false,
+		},
+		{
+			name: "Unable to get pvr's pod should not be processed",
+			obj: &velerov1api.PodVolumeRestore{
 				Spec: velerov1api.PodVolumeRestoreSpec{
 					Pod: corev1api.ObjectReference{
 						Namespace: "ns-1",
@@ -126,13 +89,7 @@ func TestShouldProcess(t *testing.T) {
 					},
 				},
 				Status: velerov1api.PodVolumeRestoreStatus{
-					Phase: velerov1api.PodVolumeRestorePhaseFailed,
-				},
-			},
-			pod: &corev1api.Pod{
-				ObjectMeta: metav1.ObjectMeta{
-					Namespace: "ns-1",
-					Name:      "pod-1",
+					Phase: "",
 				},
 			},
 			shouldProcessed: false,
@@ -244,12 +201,6 @@ func TestShouldProcess(t *testing.T) {
 
 			shouldProcess, _, _ := c.shouldProcess(ctx, c.logger, ts.obj)
 			require.Equal(t, ts.shouldProcessed, shouldProcess)
-
-			if len(ts.expectedPhase) > 0 {
-				pvr := &velerov1api.PodVolumeRestore{}
-				err := c.Client.Get(ctx, types.NamespacedName{Namespace: ts.obj.Namespace, Name: ts.obj.Name}, pvr)
-				require.Nil(t, err)
-				assert.Equal(t, ts.expectedPhase, pvr.Status.Phase)
-			}
 		})
 	}
 }
diff --git a/pkg/controller/restore_controller.go b/pkg/controller/restore_controller.go
index 910bd56cd..092b90002 100644
--- a/pkg/controller/restore_controller.go
+++ b/pkg/controller/restore_controller.go
@@ -144,12 +144,13 @@ func NewRestoreController(
 				restore := obj.(*api.Restore)
 
 				switch restore.Status.Phase {
-				case "", api.RestorePhaseNew, api.RestorePhaseInProgress:
+				case "", api.RestorePhaseNew:
+					// only process new restores
 				default:
 					c.logger.WithFields(logrus.Fields{
 						"restore": kubeutil.NamespaceAndName(restore),
 						"phase":   restore.Status.Phase,
-					}).Debug("Restore is not new or in-progress, skipping")
+					}).Debug("Restore is not new, skipping")
 					return
 				}
 
@@ -201,21 +202,7 @@ func (c *restoreController) processQueueItem(key string) error {
 	// is ("" | New)
 	switch restore.Status.Phase {
 	case "", api.RestorePhaseNew:
-	case api.RestorePhaseInProgress:
-		// A restore may stay in-progress forever because of
-		// 1) the controller restarts during the processing of a restore
-		// 2) the restore with in-progress status isn't updated to completed or failed status successfully
-		// So we try to mark such restores as failed to avoid it
-		updated := restore.DeepCopy()
-		updated.Status.Phase = api.RestorePhaseFailed
-		updated.Status.FailureReason = fmt.Sprintf("got a Restore with unexpected status %q, this may be due to a restart of the controller during the restore, mark it as %q",
-			api.RestorePhaseInProgress, updated.Status.Phase)
-		_, err = patchRestore(restore, updated, c.restoreClient)
-		if err != nil {
-			return errors.Wrapf(err, "error updating Restore status to %s", updated.Status.Phase)
-		}
-		log.Warn(updated.Status.FailureReason)
-		return nil
+		// only process new restores
 	default:
 		return nil
 	}
diff --git a/pkg/controller/restore_controller_test.go b/pkg/controller/restore_controller_test.go
index d7ee568a5..16dd444e9 100644
--- a/pkg/controller/restore_controller_test.go
+++ b/pkg/controller/restore_controller_test.go
@@ -20,7 +20,6 @@ import (
 	"bytes"
 	"context"
 	"encoding/json"
-	"fmt"
 	"io/ioutil"
 	"testing"
 	"time"
@@ -171,6 +170,11 @@ func TestProcessQueueItemSkips(t *testing.T) {
 			restoreKey:  "foo/bar",
 			expectError: true,
 		},
+		{
+			name:       "restore with phase InProgress does not get processed",
+			restoreKey: "foo/bar",
+			restore:    builder.ForRestore("foo", "bar").Phase(velerov1api.RestorePhaseInProgress).Result(),
+		},
 		{
 			name:       "restore with phase Completed does not get processed",
 			restoreKey: "foo/bar",
@@ -222,31 +226,6 @@ func TestProcessQueueItemSkips(t *testing.T) {
 	}
 }
 
-func TestMarkInProgressRestoreAsFailed(t *testing.T) {
-	var (
-		restore         = builder.ForRestore("velero", "bar").Phase(velerov1api.RestorePhaseInProgress).Result()
-		client          = fake.NewSimpleClientset(restore)
-		sharedInformers = informers.NewSharedInformerFactory(client, 0)
-		logger          = velerotest.NewLogger()
-	)
-
-	c := restoreController{
-		genericController: newGenericController("restore-test", logger),
-		restoreClient:     client.VeleroV1(),
-		restoreLister:     sharedInformers.Velero().V1().Restores().Lister(),
-	}
-
-	err := sharedInformers.Velero().V1().Restores().Informer().GetStore().Add(restore)
-	require.Nil(t, err)
-
-	err = c.processQueueItem(fmt.Sprintf("%s/%s", restore.Namespace, restore.Name))
-	require.Nil(t, err)
-
-	res, err := c.restoreClient.Restores(restore.Namespace).Get(context.Background(), restore.Name, metav1.GetOptions{})
-	require.Nil(t, err)
-	assert.Equal(t, velerov1api.RestorePhaseFailed, res.Status.Phase)
-}
-
 func TestProcessQueueItem(t *testing.T) {
 	defaultStorageLocation := builder.ForBackupStorageLocation("velero", "default").Provider("myCloud").Bucket("bucket").Result()