From da3e7cfaaf61b52a8a497fba101966f7d03f1222 Mon Sep 17 00:00:00 2001 From: Xun Jiang Date: Tue, 21 Apr 2026 16:15:09 +0800 Subject: [PATCH] Remove Restic code path from PodVolumeRestore. Signed-off-by: Xun Jiang --- changelogs/unreleased/9732-blackpiglet | 1 + .../velero-restore-helper.go | 2 +- .../resourcepolicies/resource_policies.go | 2 +- pkg/cmd/cli/nodeagent/server.go | 48 --- pkg/cmd/server/server.go | 4 +- .../pod_volume_restore_controller.go | 14 +- .../pod_volume_restore_controller_legacy.go | 364 ------------------ ...d_volume_restore_controller_legacy_test.go | 93 ----- .../pod_volume_restore_controller_test.go | 39 +- 9 files changed, 43 insertions(+), 524 deletions(-) create mode 100644 changelogs/unreleased/9732-blackpiglet delete mode 100644 pkg/controller/pod_volume_restore_controller_legacy.go delete mode 100644 pkg/controller/pod_volume_restore_controller_legacy_test.go diff --git a/changelogs/unreleased/9732-blackpiglet b/changelogs/unreleased/9732-blackpiglet new file mode 100644 index 000000000..d110fa2d6 --- /dev/null +++ b/changelogs/unreleased/9732-blackpiglet @@ -0,0 +1 @@ +Remove Restic code path from PodVolumeRestore. \ No newline at end of file diff --git a/cmd/velero-restore-helper/velero-restore-helper.go b/cmd/velero-restore-helper/velero-restore-helper.go index d310f807e..40a942137 100644 --- a/cmd/velero-restore-helper/velero-restore-helper.go +++ b/cmd/velero-restore-helper/velero-restore-helper.go @@ -35,7 +35,7 @@ func main() { for { <-ticker.C if done() { - fmt.Println("All restic restores are done") + fmt.Println("All PodVolumeRestores are done") err := removeFolder() if err != nil { fmt.Println(err) diff --git a/internal/resourcepolicies/resource_policies.go b/internal/resourcepolicies/resource_policies.go index 6b5046e57..3a173fce1 100644 --- a/internal/resourcepolicies/resource_policies.go +++ b/internal/resourcepolicies/resource_policies.go @@ -38,7 +38,7 @@ const ( ConfigmapRefType string = "configmap" // skip action implies the volume would be skipped from the backup operation Skip VolumeActionType = "skip" - // fs-backup action implies that the volume would be backed up via file system copy method using the uploader(kopia/restic) configured by the user + // fs-backup action implies that the volume would be backed up via file system copy method using the uploader(kopia) configured by the user FSBackup VolumeActionType = "fs-backup" // snapshot action can have 3 different meaning based on velero configuration and backup spec - cloud provider based snapshots, local csi snapshots and datamover snapshots Snapshot VolumeActionType = "snapshot" diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go index 7e7c86e6c..bb1764cb3 100644 --- a/pkg/cmd/cli/nodeagent/server.go +++ b/pkg/cmd/cli/nodeagent/server.go @@ -37,7 +37,6 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes" cacheutil "k8s.io/client-go/tools/cache" @@ -430,10 +429,6 @@ func (s *nodeAgentServer) run() { s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller") } - if err := controller.InitLegacyPodVolumeRestoreReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, s.namespace, s.config.resourceTimeout, s.logger); err != nil { - s.logger.WithError(err).Fatal("Unable to create the legacy pod volume restore controller") - } - dataUploadReconciler := controller.NewDataUploadReconciler( s.mgr.GetClient(), s.mgr, @@ -509,8 +504,6 @@ func (s *nodeAgentServer) run() { if err := pvrReconciler.AttemptPVRResume(s.ctx, s.logger.WithField("node", s.nodeName), s.namespace); err != nil { s.logger.WithError(errors.WithStack(err)).Error("Failed to attempt PVR resume") } - - s.markLegacyPVRsFailed(s.mgr.GetClient()) }() s.logger.Info("Controllers starting...") @@ -604,47 +597,6 @@ func (s *nodeAgentServer) validatePodVolumesHostPath(client kubernetes.Interface return nil } -func (s *nodeAgentServer) markLegacyPVRsFailed(client ctrlclient.Client) { - pvrs := &velerov1api.PodVolumeRestoreList{} - if err := client.List(s.ctx, pvrs, &ctrlclient.ListOptions{Namespace: s.namespace}); err != nil { - s.logger.WithError(errors.WithStack(err)).Error("failed to list podvolumerestores") - return - } - - for i, pvr := range pvrs.Items { - if !controller.IsLegacyPVR(&pvr) { - continue - } - - if pvr.Status.Phase != velerov1api.PodVolumeRestorePhaseInProgress { - s.logger.Debugf("the status of podvolumerestore %q is %q, skip", pvr.GetName(), pvr.Status.Phase) - continue - } - - pod := &corev1api.Pod{} - if err := client.Get(s.ctx, types.NamespacedName{ - Namespace: pvr.Spec.Pod.Namespace, - Name: pvr.Spec.Pod.Name, - }, pod); err != nil { - s.logger.WithError(errors.WithStack(err)).Errorf("failed to get pod \"%s/%s\" of podvolumerestore %q", - pvr.Spec.Pod.Namespace, pvr.Spec.Pod.Name, pvr.GetName()) - continue - } - if pod.Spec.NodeName != s.nodeName { - s.logger.Debugf("the node of pod referenced by podvolumerestore %q is %q, not %q, skip", pvr.GetName(), pod.Spec.NodeName, s.nodeName) - continue - } - - if err := controller.UpdatePVRStatusToFailed(s.ctx, client, &pvrs.Items[i], errors.New("cannot survive from node-agent restart"), - fmt.Sprintf("get a legacy podvolumerestore with status %q during the server starting, mark it as %q", velerov1api.PodVolumeRestorePhaseInProgress, velerov1api.PodVolumeRestorePhaseFailed), - time.Now(), s.logger); err != nil { - s.logger.WithError(errors.WithStack(err)).Errorf("failed to patch podvolumerestore %q", pvr.GetName()) - continue - } - s.logger.WithField("podvolumerestore", pvr.GetName()).Warn(pvr.Status.Message) - } -} - var getConfigsFunc = nodeagent.GetConfigs func (s *nodeAgentServer) getDataPathConfigs() error { diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index cb38a40d0..44c88b980 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -1164,8 +1164,8 @@ func markPodVolumeRestoresCancel(ctx context.Context, client ctrlclient.Client, for i := range pvrs.Items { pvr := pvrs.Items[i] - if controller.IsLegacyPVR(&pvr) { - log.WithField("PVR", pvr.GetName()).Warn("Found a legacy PVR during velero server restart, cannot stop it") + if _, err := uploader.ValidateUploaderType(pvr.Spec.UploaderType); err != nil { + log.WithField("PVR", pvr.Name).Warnf("invalid uploader type %s, skip marking cancel for this PVR", pvr.Spec.UploaderType) continue } diff --git a/pkg/controller/pod_volume_restore_controller.go b/pkg/controller/pod_volume_restore_controller.go index 0af0d8c86..f40de528b 100644 --- a/pkg/controller/pod_volume_restore_controller.go +++ b/pkg/controller/pod_volume_restore_controller.go @@ -603,7 +603,7 @@ func (r *PodVolumeRestoreReconciler) closeDataPath(ctx context.Context, pvrName func (r *PodVolumeRestoreReconciler) SetupWithManager(mgr ctrl.Manager) error { gp := kube.NewGenericEventPredicate(func(object client.Object) bool { pvr := object.(*velerov1api.PodVolumeRestore) - if IsLegacyPVR(pvr) { + if _, err := uploader.ValidateUploaderType(pvr.Spec.UploaderType); err != nil { return false } @@ -628,7 +628,8 @@ func (r *PodVolumeRestoreReconciler) SetupWithManager(mgr ctrl.Manager) error { pred := kube.NewAllEventPredicate(func(obj client.Object) bool { pvr := obj.(*velerov1api.PodVolumeRestore) - return !IsLegacyPVR(pvr) + _, err := uploader.ValidateUploaderType(pvr.Spec.UploaderType) + return err == nil }) return ctrl.NewControllerManagedBy(mgr). @@ -678,7 +679,7 @@ func (r *PodVolumeRestoreReconciler) findPVRForTargetPod(ctx context.Context, po requests := []reconcile.Request{} for _, item := range list.Items { - if IsLegacyPVR(&item) { + if _, err := uploader.ValidateUploaderType(item.Spec.UploaderType); err != nil { continue } @@ -708,6 +709,11 @@ func (r *PodVolumeRestoreReconciler) findPVRForRestorePod(ctx context.Context, p "PVR": pvr.Name, }) + if _, err := uploader.ValidateUploaderType(pvr.Spec.UploaderType); err != nil { + log.WithField("uploaderType", pvr.Spec.UploaderType).Debug("skip PVR with invalid uploader type") + return []reconcile.Request{} + } + if pvr.Status.Phase != velerov1api.PodVolumeRestorePhaseAccepted { return []reconcile.Request{} } @@ -1029,7 +1035,7 @@ func (r *PodVolumeRestoreReconciler) AttemptPVRResume(ctx context.Context, logge for i := range pvrs.Items { pvr := &pvrs.Items[i] - if IsLegacyPVR(pvr) { + if _, err := uploader.ValidateUploaderType(pvr.Spec.UploaderType); err != nil { continue } diff --git a/pkg/controller/pod_volume_restore_controller_legacy.go b/pkg/controller/pod_volume_restore_controller_legacy.go deleted file mode 100644 index 9ddececf5..000000000 --- a/pkg/controller/pod_volume_restore_controller_legacy.go +++ /dev/null @@ -1,364 +0,0 @@ -/* -Copyright The Velero Contributors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package controller - -import ( - "context" - "fmt" - "os" - "path/filepath" - "time" - - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - corev1api "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/kubernetes" - clocks "k8s.io/utils/clock" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/builder" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - - "github.com/vmware-tanzu/velero/internal/credentials" - veleroapishared "github.com/vmware-tanzu/velero/pkg/apis/velero/shared" - velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" - "github.com/vmware-tanzu/velero/pkg/datapath" - "github.com/vmware-tanzu/velero/pkg/exposer" - "github.com/vmware-tanzu/velero/pkg/podvolume" - "github.com/vmware-tanzu/velero/pkg/repository" - "github.com/vmware-tanzu/velero/pkg/restorehelper" - "github.com/vmware-tanzu/velero/pkg/uploader" - "github.com/vmware-tanzu/velero/pkg/util/boolptr" - "github.com/vmware-tanzu/velero/pkg/util/filesystem" - "github.com/vmware-tanzu/velero/pkg/util/kube" -) - -func InitLegacyPodVolumeRestoreReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager, namespace string, - resourceTimeout time.Duration, logger logrus.FieldLogger) error { - log := logger.WithField("controller", "PodVolumeRestoreLegacy") - - credentialFileStore, err := credentials.NewNamespacedFileStore(client, namespace, credentials.DefaultStoreDirectory(), filesystem.NewFileSystem()) - if err != nil { - return errors.Wrapf(err, "error creating credentials file store") - } - - credSecretStore, err := credentials.NewNamespacedSecretStore(client, namespace) - if err != nil { - return errors.Wrapf(err, "error creating secret file store") - } - - credentialGetter := &credentials.CredentialGetter{FromFile: credentialFileStore, FromSecret: credSecretStore} - ensurer := repository.NewEnsurer(client, log, resourceTimeout) - - reconciler := &PodVolumeRestoreReconcilerLegacy{ - Client: client, - kubeClient: kubeClient, - logger: log, - repositoryEnsurer: ensurer, - credentialGetter: credentialGetter, - fileSystem: filesystem.NewFileSystem(), - clock: &clocks.RealClock{}, - dataPathMgr: dataPathMgr, - } - - if err = reconciler.SetupWithManager(mgr); err != nil { - return errors.Wrapf(err, "error setup controller manager") - } - - return nil -} - -type PodVolumeRestoreReconcilerLegacy struct { - client.Client - kubeClient kubernetes.Interface - logger logrus.FieldLogger - repositoryEnsurer *repository.Ensurer - credentialGetter *credentials.CredentialGetter - fileSystem filesystem.Interface - clock clocks.WithTickerAndDelayedExecution - dataPathMgr *datapath.Manager -} - -// +kubebuilder:rbac:groups=velero.io,resources=podvolumerestores,verbs=get;list;watch;create;update;patch;delete -// +kubebuilder:rbac:groups=velero.io,resources=podvolumerestores/status,verbs=get;update;patch -// +kubebuilder:rbac:groups="",resources=pods,verbs=get -// +kubebuilder:rbac:groups="",resources=persistentvolumes,verbs=get -// +kubebuilder:rbac:groups="",resources=persistentvolumerclaims,verbs=get - -func (c *PodVolumeRestoreReconcilerLegacy) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - log := c.logger.WithField("PodVolumeRestore", req.NamespacedName.String()) - log.Info("Reconciling PVR by legacy controller") - - pvr := &velerov1api.PodVolumeRestore{} - if err := c.Get(ctx, types.NamespacedName{Namespace: req.Namespace, Name: req.Name}, pvr); err != nil { - if apierrors.IsNotFound(err) { - log.Warn("PodVolumeRestore not found, skip") - return ctrl.Result{}, nil - } - log.WithError(err).Error("Unable to get the PodVolumeRestore") - return ctrl.Result{}, err - } - - log = log.WithField("pod", fmt.Sprintf("%s/%s", pvr.Spec.Pod.Namespace, pvr.Spec.Pod.Name)) - if len(pvr.OwnerReferences) == 1 { - log = log.WithField("restore", fmt.Sprintf("%s/%s", pvr.Namespace, pvr.OwnerReferences[0].Name)) - } - - shouldProcess, pod, err := shouldProcess(ctx, c.Client, log, pvr) - if err != nil { - return ctrl.Result{}, err - } - if !shouldProcess { - return ctrl.Result{}, nil - } - - initContainerIndex := getInitContainerIndex(pod) - if initContainerIndex > 0 { - log.Warnf(`Init containers before the %s container may cause issues - if they interfere with volumes being restored: %s index %d`, restorehelper.WaitInitContainer, restorehelper.WaitInitContainer, initContainerIndex) - } - - log.Info("Restore starting") - - callbacks := datapath.Callbacks{ - OnCompleted: c.OnDataPathCompleted, - OnFailed: c.OnDataPathFailed, - OnCancelled: c.OnDataPathCancelled, - OnProgress: c.OnDataPathProgress, - } - - fsRestore, err := c.dataPathMgr.CreateFileSystemBR(pvr.Name, pVBRRequestor, ctx, c.Client, pvr.Namespace, callbacks, log) - if err != nil { - if err == datapath.ConcurrentLimitExceed { - return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil - } else { - return c.errorOut(ctx, pvr, err, "error to create data path", log) - } - } - - original := pvr.DeepCopy() - pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseInProgress - pvr.Status.StartTimestamp = &metav1.Time{Time: c.clock.Now()} - if err = c.Patch(ctx, pvr, client.MergeFrom(original)); err != nil { - c.closeDataPath(ctx, pvr.Name) - return c.errorOut(ctx, pvr, err, "error to update status to in progress", log) - } - - volumePath, err := exposer.GetPodVolumeHostPath(ctx, pod, pvr.Spec.Volume, c.kubeClient, c.fileSystem, log) - if err != nil { - c.closeDataPath(ctx, pvr.Name) - return c.errorOut(ctx, pvr, err, "error exposing host path for pod volume", log) - } - - log.WithField("path", volumePath.ByPath).Debugf("Found host path") - - if err := fsRestore.Init(ctx, &datapath.FSBRInitParam{ - BSLName: pvr.Spec.BackupStorageLocation, - SourceNamespace: pvr.Spec.SourceNamespace, - UploaderType: pvr.Spec.UploaderType, - RepositoryType: podvolume.GetPvrRepositoryType(pvr), - RepoIdentifier: pvr.Spec.RepoIdentifier, - RepositoryEnsurer: c.repositoryEnsurer, - CredentialGetter: c.credentialGetter, - }); err != nil { - c.closeDataPath(ctx, pvr.Name) - return c.errorOut(ctx, pvr, err, "error to initialize data path", log) - } - - if err := fsRestore.StartRestore(pvr.Spec.SnapshotID, volumePath, pvr.Spec.UploaderSettings); err != nil { - c.closeDataPath(ctx, pvr.Name) - return c.errorOut(ctx, pvr, err, "error starting data path restore", log) - } - - log.WithField("path", volumePath.ByPath).Info("Async fs restore data path started") - - return ctrl.Result{}, nil -} - -func (c *PodVolumeRestoreReconcilerLegacy) errorOut(ctx context.Context, pvr *velerov1api.PodVolumeRestore, err error, msg string, log logrus.FieldLogger) (ctrl.Result, error) { - _ = UpdatePVRStatusToFailed(ctx, c.Client, pvr, err, msg, c.clock.Now(), log) - return ctrl.Result{}, err -} - -func (c *PodVolumeRestoreReconcilerLegacy) SetupWithManager(mgr ctrl.Manager) error { - // The pod may not being scheduled at the point when its PVRs are initially reconciled. - // By watching the pods, we can trigger the PVR reconciliation again once the pod is finally scheduled on the node. - pred := kube.NewAllEventPredicate(func(obj client.Object) bool { - pvr := obj.(*velerov1api.PodVolumeRestore) - return IsLegacyPVR(pvr) - }) - - return ctrl.NewControllerManagedBy(mgr).Named("podvolumerestorelegacy"). - For(&velerov1api.PodVolumeRestore{}, builder.WithPredicates(pred)). - Watches(&corev1api.Pod{}, handler.EnqueueRequestsFromMapFunc(c.findVolumeRestoresForPod)). - Complete(c) -} - -func (c *PodVolumeRestoreReconcilerLegacy) findVolumeRestoresForPod(ctx context.Context, pod client.Object) []reconcile.Request { - list := &velerov1api.PodVolumeRestoreList{} - options := &client.ListOptions{ - LabelSelector: labels.Set(map[string]string{ - velerov1api.PodUIDLabel: string(pod.GetUID()), - }).AsSelector(), - } - if err := c.Client.List(context.TODO(), list, options); err != nil { - c.logger.WithField("pod", fmt.Sprintf("%s/%s", pod.GetNamespace(), pod.GetName())).WithError(err). - Error("unable to list PodVolumeRestores") - return []reconcile.Request{} - } - - requests := []reconcile.Request{} - for _, item := range list.Items { - if !IsLegacyPVR(&item) { - continue - } - - requests = append(requests, reconcile.Request{ - NamespacedName: types.NamespacedName{ - Namespace: item.GetNamespace(), - Name: item.GetName(), - }, - }) - } - return requests -} - -func (c *PodVolumeRestoreReconcilerLegacy) OnDataPathCompleted(ctx context.Context, namespace string, pvrName string, result datapath.Result) { - defer c.dataPathMgr.RemoveAsyncBR(pvrName) - - log := c.logger.WithField("pvr", pvrName) - - log.WithField("PVR", pvrName).Info("Async fs restore data path completed") - - var pvr velerov1api.PodVolumeRestore - if err := c.Client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); err != nil { - log.WithError(err).Warn("Failed to get PVR on completion") - return - } - - volumePath := result.Restore.Target.ByPath - if volumePath == "" { - _, _ = c.errorOut(ctx, &pvr, errors.New("path is empty"), "invalid restore target", log) - return - } - - // Remove the .velero directory from the restored volume (it may contain done files from previous restores - // of this volume, which we don't want to carry over). If this fails for any reason, log and continue, since - // this is non-essential cleanup (the done files are named based on restore UID and the init container looks - // for the one specific to the restore being executed). - if err := os.RemoveAll(filepath.Join(volumePath, ".velero")); err != nil { - log.WithError(err).Warnf("error removing .velero directory from directory %s", volumePath) - } - - var restoreUID types.UID - for _, owner := range pvr.OwnerReferences { - if boolptr.IsSetToTrue(owner.Controller) { - restoreUID = owner.UID - break - } - } - - // Create the .velero directory within the volume dir so we can write a done file - // for this restore. - if err := os.MkdirAll(filepath.Join(volumePath, ".velero"), 0755); err != nil { - _, _ = c.errorOut(ctx, &pvr, err, "error creating .velero directory for done file", log) - return - } - - // Write a done file with name= into the just-created .velero dir - // within the volume. The velero init container on the pod is waiting - // for this file to exist in each restored volume before completing. - if err := os.WriteFile(filepath.Join(volumePath, ".velero", string(restoreUID)), nil, 0644); err != nil { //nolint:gosec // Internal usage. No need to check. - _, _ = c.errorOut(ctx, &pvr, err, "error writing done file", log) - return - } - - original := pvr.DeepCopy() - pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseCompleted - pvr.Status.CompletionTimestamp = &metav1.Time{Time: c.clock.Now()} - if err := c.Patch(ctx, &pvr, client.MergeFrom(original)); err != nil { - log.WithError(err).Error("error updating PodVolumeRestore status") - } - - log.Info("Restore completed") -} - -func (c *PodVolumeRestoreReconcilerLegacy) OnDataPathFailed(ctx context.Context, namespace string, pvrName string, err error) { - defer c.dataPathMgr.RemoveAsyncBR(pvrName) - - log := c.logger.WithField("pvr", pvrName) - - log.WithError(err).Error("Async fs restore data path failed") - - var pvr velerov1api.PodVolumeRestore - if getErr := c.Client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); getErr != nil { - log.WithError(getErr).Warn("Failed to get PVR on failure") - } else { - _, _ = c.errorOut(ctx, &pvr, err, "data path restore failed", log) - } -} - -func (c *PodVolumeRestoreReconcilerLegacy) OnDataPathCancelled(ctx context.Context, namespace string, pvrName string) { - defer c.dataPathMgr.RemoveAsyncBR(pvrName) - - log := c.logger.WithField("pvr", pvrName) - - log.Warn("Async fs restore data path canceled") - - var pvr velerov1api.PodVolumeRestore - if getErr := c.Client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); getErr != nil { - log.WithError(getErr).Warn("Failed to get PVR on cancel") - } else { - _, _ = c.errorOut(ctx, &pvr, errors.New("PVR is canceled"), "data path restore canceled", log) - } -} - -func (c *PodVolumeRestoreReconcilerLegacy) OnDataPathProgress(ctx context.Context, namespace string, pvrName string, progress *uploader.Progress) { - log := c.logger.WithField("pvr", pvrName) - - var pvr velerov1api.PodVolumeRestore - if err := c.Client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); err != nil { - log.WithError(err).Warn("Failed to get PVB on progress") - return - } - - original := pvr.DeepCopy() - pvr.Status.Progress = veleroapishared.DataMoveOperationProgress{TotalBytes: progress.TotalBytes, BytesDone: progress.BytesDone} - - if err := c.Client.Patch(ctx, &pvr, client.MergeFrom(original)); err != nil { - log.WithError(err).Error("Failed to update progress") - } -} - -func (c *PodVolumeRestoreReconcilerLegacy) closeDataPath(ctx context.Context, pvbName string) { - fsRestore := c.dataPathMgr.GetAsyncBR(pvbName) - if fsRestore != nil { - fsRestore.Close(ctx) - } - - c.dataPathMgr.RemoveAsyncBR(pvbName) -} - -func IsLegacyPVR(pvr *velerov1api.PodVolumeRestore) bool { - return pvr.Spec.UploaderType == "restic" -} diff --git a/pkg/controller/pod_volume_restore_controller_legacy_test.go b/pkg/controller/pod_volume_restore_controller_legacy_test.go deleted file mode 100644 index a107603e0..000000000 --- a/pkg/controller/pod_volume_restore_controller_legacy_test.go +++ /dev/null @@ -1,93 +0,0 @@ -/* -Copyright The Velero Contributors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package controller - -import ( - "testing" - - "github.com/sirupsen/logrus" - "github.com/stretchr/testify/assert" - corev1api "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "sigs.k8s.io/controller-runtime/pkg/client/fake" - - velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" -) - -func TestFindVolumeRestoresForPodLegacy(t *testing.T) { - pod := &corev1api.Pod{} - pod.UID = "uid" - - scheme := runtime.NewScheme() - scheme.AddKnownTypes(velerov1api.SchemeGroupVersion, &velerov1api.PodVolumeRestore{}, &velerov1api.PodVolumeRestoreList{}) - clientBuilder := fake.NewClientBuilder().WithScheme(scheme) - - // no matching PVR - reconciler := &PodVolumeRestoreReconcilerLegacy{ - Client: clientBuilder.Build(), - logger: logrus.New(), - } - requests := reconciler.findVolumeRestoresForPod(t.Context(), pod) - assert.Empty(t, requests) - - // contain one matching PVR - reconciler.Client = clientBuilder.WithLists(&velerov1api.PodVolumeRestoreList{ - Items: []velerov1api.PodVolumeRestore{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "pvr1", - Labels: map[string]string{ - velerov1api.PodUIDLabel: string(pod.GetUID()), - }, - }, - }, - { - ObjectMeta: metav1.ObjectMeta{ - Name: "pvr2", - Labels: map[string]string{ - velerov1api.PodUIDLabel: "non-matching-uid", - }, - }, - }, - { - ObjectMeta: metav1.ObjectMeta{ - Name: "pvr3", - Labels: map[string]string{ - velerov1api.PodUIDLabel: string(pod.GetUID()), - }, - }, - Spec: velerov1api.PodVolumeRestoreSpec{ - UploaderType: "kopia", - }, - }, - { - ObjectMeta: metav1.ObjectMeta{ - Name: "pvr4", - Labels: map[string]string{ - velerov1api.PodUIDLabel: string(pod.GetUID()), - }, - }, - Spec: velerov1api.PodVolumeRestoreSpec{ - UploaderType: "restic", - }, - }, - }, - }).Build() - requests = reconciler.findVolumeRestoresForPod(t.Context(), pod) - assert.Len(t, requests, 1) -} diff --git a/pkg/controller/pod_volume_restore_controller_test.go b/pkg/controller/pod_volume_restore_controller_test.go index 9f2fe7a7f..8999543d7 100644 --- a/pkg/controller/pod_volume_restore_controller_test.go +++ b/pkg/controller/pod_volume_restore_controller_test.go @@ -526,6 +526,7 @@ func TestFindPVRForTargetPod(t *testing.T) { velerov1api.PodUIDLabel: string(pod.GetUID()), }, }, + Spec: velerov1api.PodVolumeRestoreSpec{UploaderType: uploader.KopiaType}, }, { ObjectMeta: metav1.ObjectMeta{ @@ -688,6 +689,7 @@ func TestPodVolumeRestoreReconcile(t *testing.T) { mockClose bool needExclusiveUpdateError error constrained bool + preserveEmptyUploader bool expected *velerov1api.PodVolumeRestore expectDeleted bool expectCancelRecord bool @@ -939,6 +941,13 @@ func TestPodVolumeRestoreReconcile(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { + if !test.preserveEmptyUploader && test.pvr != nil && test.pvr.Spec.UploaderType == "" { + test.pvr.Spec.UploaderType = uploader.KopiaType + } + if !test.preserveEmptyUploader && test.expected != nil && test.expected.Spec.UploaderType == "" { + test.expected.Spec.UploaderType = uploader.KopiaType + } + objs := []runtime.Object{daemonSet, node} ctlObj := []client.Object{} @@ -1396,7 +1405,7 @@ func TestFindPVBForRestorePod(t *testing.T) { }{ { name: "find pvr for pod", - pvr: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseAccepted).Result(), + pvr: pvrBuilder().UploaderType(uploader.KopiaType).Phase(velerov1api.PodVolumeRestorePhaseAccepted).Result(), pod: builder.ForPod(velerov1api.DefaultNamespace, pvrName).Labels(map[string]string{velerov1api.PVRLabel: pvrName}).Status(corev1api.PodStatus{Phase: corev1api.PodRunning}).Result(), checkFunc: func(pvr *velerov1api.PodVolumeRestore, requests []reconcile.Request) { // Assert that the function returns a single request @@ -1407,7 +1416,7 @@ func TestFindPVBForRestorePod(t *testing.T) { }, }, { name: "no selected label found for pod", - pvr: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseAccepted).Result(), + pvr: pvrBuilder().UploaderType(uploader.KopiaType).Phase(velerov1api.PodVolumeRestorePhaseAccepted).Result(), pod: builder.ForPod(velerov1api.DefaultNamespace, pvrName).Result(), checkFunc: func(pvr *velerov1api.PodVolumeRestore, requests []reconcile.Request) { // Assert that the function returns a single request @@ -1415,7 +1424,7 @@ func TestFindPVBForRestorePod(t *testing.T) { }, }, { name: "no matched pod", - pvr: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseAccepted).Result(), + pvr: pvrBuilder().UploaderType(uploader.KopiaType).Phase(velerov1api.PodVolumeRestorePhaseAccepted).Result(), pod: builder.ForPod(velerov1api.DefaultNamespace, pvrName).Labels(map[string]string{velerov1api.PVRLabel: "non-existing-pvr"}).Result(), checkFunc: func(pvr *velerov1api.PodVolumeRestore, requests []reconcile.Request) { assert.Empty(t, requests) @@ -1423,12 +1432,20 @@ func TestFindPVBForRestorePod(t *testing.T) { }, { name: "pvr not accept", - pvr: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseInProgress).Result(), + pvr: pvrBuilder().UploaderType(uploader.KopiaType).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Result(), pod: builder.ForPod(velerov1api.DefaultNamespace, pvrName).Labels(map[string]string{velerov1api.PVRLabel: pvrName}).Result(), checkFunc: func(pvr *velerov1api.PodVolumeRestore, requests []reconcile.Request) { assert.Empty(t, requests) }, }, + { + name: "invalid uploader type", + pvr: pvrBuilder().UploaderType("restic").Phase(velerov1api.PodVolumeRestorePhaseAccepted).Result(), + pod: builder.ForPod(velerov1api.DefaultNamespace, pvrName).Labels(map[string]string{velerov1api.PVRLabel: pvrName}).Status(corev1api.PodStatus{Phase: corev1api.PodRunning}).Result(), + checkFunc: func(pvr *velerov1api.PodVolumeRestore, requests []reconcile.Request) { + assert.Empty(t, requests) + }, + }, } for _, test := range tests { ctx := t.Context() @@ -1613,32 +1630,32 @@ func TestAttemptPVRResume(t *testing.T) { }{ { name: "Other pvr", - pvr: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhasePrepared).Result(), + pvr: pvrBuilder().UploaderType(uploader.KopiaType).Phase(velerov1api.PodVolumeRestorePhasePrepared).Result(), }, { name: "Other pvr", - pvr: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseAccepted).Result(), + pvr: pvrBuilder().UploaderType(uploader.KopiaType).Phase(velerov1api.PodVolumeRestorePhaseAccepted).Result(), }, { name: "InProgress pvr, not the current node", - pvr: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseInProgress).Result(), + pvr: pvrBuilder().UploaderType(uploader.KopiaType).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Result(), inProgressPvrs: []string{pvrName}, }, { name: "InProgress pvr, no resume error", - pvr: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseInProgress).Node("node-1").Result(), + pvr: pvrBuilder().UploaderType(uploader.KopiaType).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Node("node-1").Result(), inProgressPvrs: []string{pvrName}, }, { name: "InProgress pvr, resume error, cancel error", - pvr: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseInProgress).Node("node-1").Result(), + pvr: pvrBuilder().UploaderType(uploader.KopiaType).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Node("node-1").Result(), resumeErr: errors.New("fake-resume-error"), needErrs: []bool{false, false, true, false, false, false}, inProgressPvrs: []string{pvrName}, }, { name: "InProgress pvr, resume error, cancel succeed", - pvr: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseInProgress).Node("node-1").Result(), + pvr: pvrBuilder().UploaderType(uploader.KopiaType).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Node("node-1").Result(), resumeErr: errors.New("fake-resume-error"), cancelledPvrs: []string{pvrName}, inProgressPvrs: []string{pvrName}, @@ -1646,7 +1663,7 @@ func TestAttemptPVRResume(t *testing.T) { { name: "Error", needErrs: []bool{false, false, false, false, false, true}, - pvr: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhasePrepared).Result(), + pvr: pvrBuilder().UploaderType(uploader.KopiaType).Phase(velerov1api.PodVolumeRestorePhasePrepared).Result(), expectedError: "error to list PVRs: List error", }, }