diff --git a/changelogs/unreleased/9014-Lyndon-Li b/changelogs/unreleased/9014-Lyndon-Li new file mode 100644 index 000000000..b36740f9d --- /dev/null +++ b/changelogs/unreleased/9014-Lyndon-Li @@ -0,0 +1 @@ +Fix issue #8959, add VGDP MS PVR controller \ No newline at end of file diff --git a/config/crd/v1/bases/velero.io_podvolumerestores.yaml b/config/crd/v1/bases/velero.io_podvolumerestores.yaml index 4c4b1b91d..09eda5b28 100644 --- a/config/crd/v1/bases/velero.io_podvolumerestores.yaml +++ b/config/crd/v1/bases/velero.io_podvolumerestores.yaml @@ -15,39 +15,40 @@ spec: scope: Namespaced versions: - additionalPrinterColumns: - - description: Namespace of the pod containing the volume to be restored - jsonPath: .spec.pod.namespace - name: Namespace + - description: PodVolumeRestore status such as New/InProgress + jsonPath: .status.phase + name: Status type: string - - description: Name of the pod containing the volume to be restored - jsonPath: .spec.pod.name - name: Pod + - description: Time duration since this PodVolumeRestore was started + jsonPath: .status.startTimestamp + name: Started + type: date + - description: Completed bytes + format: int64 + jsonPath: .status.progress.bytesDone + name: Bytes Done + type: integer + - description: Total bytes + format: int64 + jsonPath: .status.progress.totalBytes + name: Total Bytes + type: integer + - description: Name of the Backup Storage Location where the backup data is stored + jsonPath: .spec.backupStorageLocation + name: Storage Location + type: string + - description: Time duration since this PodVolumeRestore was created + jsonPath: .metadata.creationTimestamp + name: Age + type: date + - description: Name of the node where the PodVolumeRestore is processed + jsonPath: .status.node + name: Node type: string - description: The type of the uploader to handle data transfer jsonPath: .spec.uploaderType name: Uploader Type type: string - - description: Name of the volume to be restored - jsonPath: .spec.volume - 
name: Volume - type: string - - description: Pod Volume Restore status such as New/InProgress - jsonPath: .status.phase - name: Status - type: string - - description: Pod Volume Restore status such as New/InProgress - format: int64 - jsonPath: .status.progress.totalBytes - name: TotalBytes - type: integer - - description: Pod Volume Restore status such as New/InProgress - format: int64 - jsonPath: .status.progress.bytesDone - name: BytesDone - type: integer - - jsonPath: .metadata.creationTimestamp - name: Age - type: date name: v1 schema: openAPIV3Schema: @@ -167,6 +168,13 @@ spec: status: description: PodVolumeRestoreStatus is the current status of a PodVolumeRestore. properties: + acceptedTimestamp: + description: |- + AcceptedTimestamp records the time the pod volume restore is to be prepared. + The server's time is used for AcceptedTimestamp + format: date-time + nullable: true + type: string completionTimestamp: description: |- CompletionTimestamp records the time a restore was completed. @@ -178,11 +186,19 @@ spec: message: description: Message is a message about the pod volume restore's status. type: string + node: + description: Node is name of the node where the pod volume restore + is processed. + type: string phase: description: Phase is the current state of the PodVolumeRestore. 
enum: - New + - Accepted + - Prepared - InProgress + - Canceling + - Canceled - Completed - Failed type: string diff --git a/pkg/apis/velero/v1/labels_annotations.go b/pkg/apis/velero/v1/labels_annotations.go index 9d075d2bd..ad24b97ba 100644 --- a/pkg/apis/velero/v1/labels_annotations.go +++ b/pkg/apis/velero/v1/labels_annotations.go @@ -107,6 +107,9 @@ const ( // PVBLabel is the label key used to identify the pvb for pvb pod PVBLabel = "velero.io/pod-volume-backup" + + // PVRLabel is the label key used to identify the pvr for pvr pod + PVRLabel = "velero.io/pod-volume-restore" ) type AsyncOperationIDPrefix string diff --git a/pkg/apis/velero/v1/pod_volume_restore_type.go b/pkg/apis/velero/v1/pod_volume_restore_type.go index d8871b708..2d059a14a 100644 --- a/pkg/apis/velero/v1/pod_volume_restore_type.go +++ b/pkg/apis/velero/v1/pod_volume_restore_type.go @@ -61,12 +61,16 @@ type PodVolumeRestoreSpec struct { } // PodVolumeRestorePhase represents the lifecycle phase of a PodVolumeRestore. -// +kubebuilder:validation:Enum=New;InProgress;Completed;Failed +// +kubebuilder:validation:Enum=New;Accepted;Prepared;InProgress;Canceling;Canceled;Completed;Failed type PodVolumeRestorePhase string const ( PodVolumeRestorePhaseNew PodVolumeRestorePhase = "New" + PodVolumeRestorePhaseAccepted PodVolumeRestorePhase = "Accepted" + PodVolumeRestorePhasePrepared PodVolumeRestorePhase = "Prepared" PodVolumeRestorePhaseInProgress PodVolumeRestorePhase = "InProgress" + PodVolumeRestorePhaseCanceling PodVolumeRestorePhase = "Canceling" + PodVolumeRestorePhaseCanceled PodVolumeRestorePhase = "Canceled" PodVolumeRestorePhaseCompleted PodVolumeRestorePhase = "Completed" PodVolumeRestorePhaseFailed PodVolumeRestorePhase = "Failed" ) @@ -99,6 +103,16 @@ type PodVolumeRestoreStatus struct { // about the restore operation. // +optional Progress shared.DataMoveOperationProgress `json:"progress,omitempty"` + + // AcceptedTimestamp records the time the pod volume restore is to be prepared. 
+ // The server's time is used for AcceptedTimestamp + // +optional + // +nullable + AcceptedTimestamp *metav1.Time `json:"acceptedTimestamp,omitempty"` + + // Node is name of the node where the pod volume restore is processed. + // +optional + Node string `json:"node,omitempty"` } // TODO(2.0) After converting all resources to use the runtime-controller client, the genclient and k8s:deepcopy markers will no longer be needed and should be removed. @@ -107,14 +121,14 @@ type PodVolumeRestoreStatus struct { // +kubebuilder:object:generate=true // +kubebuilder:object:root=true // +kubebuilder:storageversion -// +kubebuilder:printcolumn:name="Namespace",type="string",JSONPath=".spec.pod.namespace",description="Namespace of the pod containing the volume to be restored" -// +kubebuilder:printcolumn:name="Pod",type="string",JSONPath=".spec.pod.name",description="Name of the pod containing the volume to be restored" +// +kubebuilder:printcolumn:name="Status",type="string",JSONPath=".status.phase",description="PodVolumeRestore status such as New/InProgress" +// +kubebuilder:printcolumn:name="Started",type="date",JSONPath=".status.startTimestamp",description="Time duration since this PodVolumeRestore was started" +// +kubebuilder:printcolumn:name="Bytes Done",type="integer",format="int64",JSONPath=".status.progress.bytesDone",description="Completed bytes" +// +kubebuilder:printcolumn:name="Total Bytes",type="integer",format="int64",JSONPath=".status.progress.totalBytes",description="Total bytes" +// +kubebuilder:printcolumn:name="Storage Location",type="string",JSONPath=".spec.backupStorageLocation",description="Name of the Backup Storage Location where the backup data is stored" +// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp",description="Time duration since this PodVolumeRestore was created" +// +kubebuilder:printcolumn:name="Node",type="string",JSONPath=".status.node",description="Name of the node where the PodVolumeRestore is 
processed" // +kubebuilder:printcolumn:name="Uploader Type",type="string",JSONPath=".spec.uploaderType",description="The type of the uploader to handle data transfer" -// +kubebuilder:printcolumn:name="Volume",type="string",JSONPath=".spec.volume",description="Name of the volume to be restored" -// +kubebuilder:printcolumn:name="Status",type="string",JSONPath=".status.phase",description="Pod Volume Restore status such as New/InProgress" -// +kubebuilder:printcolumn:name="TotalBytes",type="integer",format="int64",JSONPath=".status.progress.totalBytes",description="Pod Volume Restore status such as New/InProgress" -// +kubebuilder:printcolumn:name="BytesDone",type="integer",format="int64",JSONPath=".status.progress.bytesDone",description="Pod Volume Restore status such as New/InProgress" -// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp" type PodVolumeRestore struct { metav1.TypeMeta `json:",inline"` diff --git a/pkg/apis/velero/v1/zz_generated.deepcopy.go b/pkg/apis/velero/v1/zz_generated.deepcopy.go index 1cf3aaff8..c998a1656 100644 --- a/pkg/apis/velero/v1/zz_generated.deepcopy.go +++ b/pkg/apis/velero/v1/zz_generated.deepcopy.go @@ -1153,6 +1153,10 @@ func (in *PodVolumeRestoreStatus) DeepCopyInto(out *PodVolumeRestoreStatus) { *out = (*in).DeepCopy() } out.Progress = in.Progress + if in.AcceptedTimestamp != nil { + in, out := &in.AcceptedTimestamp, &out.AcceptedTimestamp + *out = (*in).DeepCopy() + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PodVolumeRestoreStatus. 
diff --git a/pkg/builder/pod_builder.go b/pkg/builder/pod_builder.go index 50f8f5e51..c38acf66e 100644 --- a/pkg/builder/pod_builder.go +++ b/pkg/builder/pod_builder.go @@ -88,6 +88,11 @@ func (b *PodBuilder) InitContainers(containers ...*corev1api.Container) *PodBuil return b } +func (b *PodBuilder) InitContainerState(state corev1api.ContainerState) *PodBuilder { + b.object.Status.InitContainerStatuses = append(b.object.Status.InitContainerStatuses, corev1api.ContainerStatus{State: state}) + return b +} + func (b *PodBuilder) Containers(containers ...*corev1api.Container) *PodBuilder { for _, c := range containers { b.object.Spec.Containers = append(b.object.Spec.Containers, *c) diff --git a/pkg/builder/pod_volume_restore_builder.go b/pkg/builder/pod_volume_restore_builder.go index 47acd3cd0..5f54b3d33 100644 --- a/pkg/builder/pod_volume_restore_builder.go +++ b/pkg/builder/pod_volume_restore_builder.go @@ -103,3 +103,33 @@ func (b *PodVolumeRestoreBuilder) OwnerReference(ownerRef []metav1.OwnerReferenc b.object.OwnerReferences = ownerRef return b } + +// Cancel sets the PodVolumeRestore's Cancel. +func (b *PodVolumeRestoreBuilder) Cancel(cancel bool) *PodVolumeRestoreBuilder { + b.object.Spec.Cancel = cancel + return b +} + +// AcceptedTimestamp sets the PodVolumeRestore's AcceptedTimestamp. +func (b *PodVolumeRestoreBuilder) AcceptedTimestamp(acceptedTimestamp *metav1.Time) *PodVolumeRestoreBuilder { + b.object.Status.AcceptedTimestamp = acceptedTimestamp + return b +} + +// Finalizers sets the PodVolumeRestore's Finalizers. +func (b *PodVolumeRestoreBuilder) Finalizers(finalizers []string) *PodVolumeRestoreBuilder { + b.object.Finalizers = finalizers + return b +} + +// Message sets the PodVolumeRestore's Message. +func (b *PodVolumeRestoreBuilder) Message(msg string) *PodVolumeRestoreBuilder { + b.object.Status.Message = msg + return b +} + +// Node sets the PodVolumeRestore's Node. 
+func (b *PodVolumeRestoreBuilder) Node(node string) *PodVolumeRestoreBuilder { + b.object.Status.Node = node + return b +} diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go index 4044b8897..7bf632807 100644 --- a/pkg/cmd/cli/nodeagent/server.go +++ b/pkg/cmd/cli/nodeagent/server.go @@ -48,7 +48,6 @@ import ( snapshotv1client "github.com/kubernetes-csi/external-snapshotter/client/v7/clientset/versioned" - "github.com/vmware-tanzu/velero/internal/credentials" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" "github.com/vmware-tanzu/velero/pkg/buildinfo" @@ -60,7 +59,6 @@ import ( "github.com/vmware-tanzu/velero/pkg/datapath" "github.com/vmware-tanzu/velero/pkg/metrics" "github.com/vmware-tanzu/velero/pkg/nodeagent" - "github.com/vmware-tanzu/velero/pkg/repository" "github.com/vmware-tanzu/velero/pkg/util/filesystem" "github.com/vmware-tanzu/velero/pkg/util/kube" "github.com/vmware-tanzu/velero/pkg/util/logging" @@ -282,28 +280,6 @@ func (s *nodeAgentServer) run() { s.logger.Info("Starting controllers") - credentialFileStore, err := credentials.NewNamespacedFileStore( - s.mgr.GetClient(), - s.namespace, - credentials.DefaultStoreDirectory(), - filesystem.NewFileSystem(), - ) - if err != nil { - s.logger.Fatalf("Failed to create credentials file store: %v", err) - } - - credSecretStore, err := credentials.NewNamespacedSecretStore(s.mgr.GetClient(), s.namespace) - if err != nil { - s.logger.Fatalf("Failed to create secret file store: %v", err) - } - - credentialGetter := &credentials.CredentialGetter{FromFile: credentialFileStore, FromSecret: credSecretStore} - repoEnsurer := repository.NewEnsurer(s.mgr.GetClient(), s.logger, s.config.resourceTimeout) - - if err = controller.NewPodVolumeRestoreReconciler(s.mgr.GetClient(), s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.logger).SetupWithManager(s.mgr); err != nil { - 
s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller") - } - var loadAffinity *kube.LoadAffinity if s.dataPathConfigs != nil && len(s.dataPathConfigs.LoadAffinity) > 0 { loadAffinity = s.dataPathConfigs.LoadAffinity[0] @@ -332,6 +308,10 @@ func (s *nodeAgentServer) run() { s.logger.Fatal(err, "unable to create controller", "controller", constant.ControllerPodVolumeBackup) } + if err = controller.NewPodVolumeRestoreReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, s.nodeName, s.config.dataMoverPrepareTimeout, s.config.resourceTimeout, podResources, s.logger).SetupWithManager(s.mgr); err != nil { + s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller") + } + dataUploadReconciler := controller.NewDataUploadReconciler( s.mgr.GetClient(), s.mgr, @@ -525,7 +505,7 @@ func (s *nodeAgentServer) markInProgressPVRsFailed(client ctrlclient.Client) { continue } - if err := controller.UpdatePVRStatusToFailed(s.ctx, client, &pvrs.Items[i], + if err := controller.UpdatePVRStatusToFailed(s.ctx, client, &pvrs.Items[i], errors.New("cannot survive from node-agent restart"), fmt.Sprintf("get a podvolumerestore with status %q during the server starting, mark it as %q", velerov1api.PodVolumeRestorePhaseInProgress, velerov1api.PodVolumeRestorePhaseFailed), time.Now(), s.logger); err != nil { s.logger.WithError(errors.WithStack(err)).Errorf("failed to patch podvolumerestore %q", pvr.GetName()) diff --git a/pkg/controller/pod_volume_restore_controller.go b/pkg/controller/pod_volume_restore_controller.go index e65d1b606..41362871a 100644 --- a/pkg/controller/pod_volume_restore_controller.go +++ b/pkg/controller/pod_volume_restore_controller.go @@ -19,8 +19,7 @@ package controller import ( "context" "fmt" - "os" - "path/filepath" + "strings" "time" "github.com/pkg/errors" @@ -30,49 +29,63 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + 
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" clocks "k8s.io/utils/clock" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "github.com/vmware-tanzu/velero/internal/credentials" veleroapishared "github.com/vmware-tanzu/velero/pkg/apis/velero/shared" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + "github.com/vmware-tanzu/velero/pkg/constant" "github.com/vmware-tanzu/velero/pkg/datapath" "github.com/vmware-tanzu/velero/pkg/exposer" - "github.com/vmware-tanzu/velero/pkg/podvolume" - "github.com/vmware-tanzu/velero/pkg/repository" + "github.com/vmware-tanzu/velero/pkg/nodeagent" "github.com/vmware-tanzu/velero/pkg/restorehelper" "github.com/vmware-tanzu/velero/pkg/uploader" - "github.com/vmware-tanzu/velero/pkg/util/boolptr" - "github.com/vmware-tanzu/velero/pkg/util/filesystem" + "github.com/vmware-tanzu/velero/pkg/util" + "github.com/vmware-tanzu/velero/pkg/util/kube" ) -func NewPodVolumeRestoreReconciler(client client.Client, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager, ensurer *repository.Ensurer, - credentialGetter *credentials.CredentialGetter, logger logrus.FieldLogger) *PodVolumeRestoreReconciler { +func NewPodVolumeRestoreReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager, + nodeName string, preparingTimeout time.Duration, resourceTimeout time.Duration, podResources corev1api.ResourceRequirements, + logger logrus.FieldLogger) *PodVolumeRestoreReconciler { return &PodVolumeRestoreReconciler{ - Client: client, - kubeClient: kubeClient, - logger: 
logger.WithField("controller", "PodVolumeRestore"), - repositoryEnsurer: ensurer, - credentialGetter: credentialGetter, - fileSystem: filesystem.NewFileSystem(), - clock: &clocks.RealClock{}, - dataPathMgr: dataPathMgr, + client: client, + mgr: mgr, + kubeClient: kubeClient, + logger: logger.WithField("controller", "PodVolumeRestore"), + nodeName: nodeName, + clock: &clocks.RealClock{}, + podResources: podResources, + dataPathMgr: dataPathMgr, + preparingTimeout: preparingTimeout, + resourceTimeout: resourceTimeout, + exposer: exposer.NewPodVolumeExposer(kubeClient, logger), + cancelledPVR: make(map[string]time.Time), } } type PodVolumeRestoreReconciler struct { - client.Client - kubeClient kubernetes.Interface - logger logrus.FieldLogger - repositoryEnsurer *repository.Ensurer - credentialGetter *credentials.CredentialGetter - fileSystem filesystem.Interface - clock clocks.WithTickerAndDelayedExecution - dataPathMgr *datapath.Manager + client client.Client + mgr manager.Manager + kubeClient kubernetes.Interface + logger logrus.FieldLogger + nodeName string + clock clocks.WithTickerAndDelayedExecution + podResources corev1api.ResourceRequirements + exposer exposer.PodVolumeExposer + dataPathMgr *datapath.Manager + preparingTimeout time.Duration + resourceTimeout time.Duration + cancelledPVR map[string]time.Time } // +kubebuilder:rbac:groups=velero.io,resources=podvolumerestores,verbs=get;list;watch;create;update;patch;delete @@ -81,123 +94,429 @@ type PodVolumeRestoreReconciler struct { // +kubebuilder:rbac:groups="",resources=persistentvolumes,verbs=get // +kubebuilder:rbac:groups="",resources=persistentvolumerclaims,verbs=get -func (c *PodVolumeRestoreReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - log := c.logger.WithField("PodVolumeRestore", req.NamespacedName.String()) +func (r *PodVolumeRestoreReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + log := 
r.logger.WithField("PodVolumeRestore", req.NamespacedName.String()) + log.Info("Reconciling PVR by advanced controller") pvr := &velerov1api.PodVolumeRestore{} - if err := c.Get(ctx, types.NamespacedName{Namespace: req.Namespace, Name: req.Name}, pvr); err != nil { + if err := r.client.Get(ctx, types.NamespacedName{Namespace: req.Namespace, Name: req.Name}, pvr); err != nil { if apierrors.IsNotFound(err) { - log.Warn("PodVolumeRestore not found, skip") + log.Warn("PVR not found, skip") return ctrl.Result{}, nil } - log.WithError(err).Error("Unable to get the PodVolumeRestore") + log.WithError(err).Error("Unable to get the PVR") return ctrl.Result{}, err } + log = log.WithField("pod", fmt.Sprintf("%s/%s", pvr.Spec.Pod.Namespace, pvr.Spec.Pod.Name)) if len(pvr.OwnerReferences) == 1 { log = log.WithField("restore", fmt.Sprintf("%s/%s", pvr.Namespace, pvr.OwnerReferences[0].Name)) } - shouldProcess, pod, err := c.shouldProcess(ctx, log, pvr) - if err != nil { - return ctrl.Result{}, err - } - if !shouldProcess { - return ctrl.Result{}, nil - } + // Logic for clear resources when pvr been deleted + if !isPVRInFinalState(pvr) { + if !controllerutil.ContainsFinalizer(pvr, PodVolumeFinalizer) { + if err := UpdatePVRWithRetry(ctx, r.client, req.NamespacedName, log, func(pvr *velerov1api.PodVolumeRestore) bool { + if controllerutil.ContainsFinalizer(pvr, PodVolumeFinalizer) { + return false + } - initContainerIndex := getInitContainerIndex(pod) - if initContainerIndex > 0 { - log.Warnf(`Init containers before the %s container may cause issues - if they interfere with volumes being restored: %s index %d`, restorehelper.WaitInitContainer, restorehelper.WaitInitContainer, initContainerIndex) - } + controllerutil.AddFinalizer(pvr, PodVolumeFinalizer) - log.Info("Restore starting") + return true + }); err != nil { + log.WithError(err).Errorf("failed to add finalizer for PVR %s/%s", pvr.Namespace, pvr.Name) + return ctrl.Result{}, err + } - callbacks := datapath.Callbacks{ - 
OnCompleted: c.OnDataPathCompleted, - OnFailed: c.OnDataPathFailed, - OnCancelled: c.OnDataPathCancelled, - OnProgress: c.OnDataPathProgress, - } + return ctrl.Result{}, nil + } - fsRestore, err := c.dataPathMgr.CreateFileSystemBR(pvr.Name, pVBRRequestor, ctx, c.Client, pvr.Namespace, callbacks, log) - if err != nil { - if err == datapath.ConcurrentLimitExceed { - return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil - } else { - return c.errorOut(ctx, pvr, err, "error to create data path", log) + if !pvr.DeletionTimestamp.IsZero() { + if !pvr.Spec.Cancel { + log.Warnf("Cancel PVR under phase %s because it is being deleted", pvr.Status.Phase) + + if err := UpdatePVRWithRetry(ctx, r.client, req.NamespacedName, log, func(pvr *velerov1api.PodVolumeRestore) bool { + if pvr.Spec.Cancel { + return false + } + + pvr.Spec.Cancel = true + pvr.Status.Message = "Cancel PVR because it is being deleted" + + return true + }); err != nil { + log.WithError(err).Errorf("failed to set cancel flag for PVR %s/%s", pvr.Namespace, pvr.Name) + return ctrl.Result{}, err + } + + return ctrl.Result{}, nil + } + } + } else { + delete(r.cancelledPVR, pvr.Name) + + if controllerutil.ContainsFinalizer(pvr, PodVolumeFinalizer) { + if err := UpdatePVRWithRetry(ctx, r.client, req.NamespacedName, log, func(pvr *velerov1api.PodVolumeRestore) bool { + if !controllerutil.ContainsFinalizer(pvr, PodVolumeFinalizer) { + return false + } + + controllerutil.RemoveFinalizer(pvr, PodVolumeFinalizer) + + return true + }); err != nil { + log.WithError(err).Error("error to remove finalizer") + return ctrl.Result{}, err + } + + return ctrl.Result{}, nil } } - original := pvr.DeepCopy() - pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseInProgress - pvr.Status.StartTimestamp = &metav1.Time{Time: c.clock.Now()} - if err = c.Patch(ctx, pvr, client.MergeFrom(original)); err != nil { - c.closeDataPath(ctx, pvr.Name) - return c.errorOut(ctx, pvr, err, "error to update status to in progress", log) + 
if pvr.Spec.Cancel { + if spotted, found := r.cancelledPVR[pvr.Name]; !found { + r.cancelledPVR[pvr.Name] = r.clock.Now() + } else { + delay := cancelDelayOthers + if pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseInProgress { + delay = cancelDelayInProgress + } + + if time.Since(spotted) > delay { + log.Infof("PVR %s is canceled in Phase %s but not handled in reasonable time", pvr.GetName(), pvr.Status.Phase) + if r.tryCancelPodVolumeRestore(ctx, pvr, "") { + delete(r.cancelledPVR, pvr.Name) + } + + return ctrl.Result{}, nil + } + } } - volumePath, err := exposer.GetPodVolumeHostPath(ctx, pod, pvr.Spec.Volume, c.kubeClient, c.fileSystem, log) - if err != nil { - c.closeDataPath(ctx, pvr.Name) - return c.errorOut(ctx, pvr, err, "error exposing host path for pod volume", log) + if pvr.Status.Phase == "" || pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseNew { + if pvr.Spec.Cancel { + log.Infof("PVR %s is canceled in Phase %s", pvr.GetName(), pvr.Status.Phase) + _ = r.tryCancelPodVolumeRestore(ctx, pvr, "") + + return ctrl.Result{}, nil + } + + shouldProcess, pod, err := shouldProcess(ctx, r.client, log, pvr) + if err != nil { + return ctrl.Result{}, err + } + if !shouldProcess { + return ctrl.Result{}, nil + } + + log.Info("Accepting PVR") + + if err := r.acceptPodVolumeRestore(ctx, pvr); err != nil { + return ctrl.Result{}, errors.Wrapf(err, "error accepting PVR %s", pvr.Name) + } + + initContainerIndex := getInitContainerIndex(pod) + if initContainerIndex > 0 { + log.Warnf(`Init containers before the %s container may cause issues + if they interfere with volumes being restored: %s index %d`, restorehelper.WaitInitContainer, restorehelper.WaitInitContainer, initContainerIndex) + } + + log.Info("Exposing PVR") + + exposeParam := r.setupExposeParam(pvr) + if err := r.exposer.Expose(ctx, getPVROwnerObject(pvr), exposeParam); err != nil { + return r.errorOut(ctx, pvr, err, "error to expose PVR", log) + } + + log.Info("PVR is exposed") + + return 
ctrl.Result{}, nil + } else if pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseAccepted { + if peekErr := r.exposer.PeekExposed(ctx, getPVROwnerObject(pvr)); peekErr != nil { + log.Errorf("Cancel PVR %s/%s because of expose error %s", pvr.Namespace, pvr.Name, peekErr) + _ = r.tryCancelPodVolumeRestore(ctx, pvr, fmt.Sprintf("found a PVR %s/%s with expose error: %s. mark it as cancel", pvr.Namespace, pvr.Name, peekErr)) + } else if pvr.Status.AcceptedTimestamp != nil { + if time.Since(pvr.Status.AcceptedTimestamp.Time) >= r.preparingTimeout { + r.onPrepareTimeout(ctx, pvr) + } + } + + return ctrl.Result{}, nil + } else if pvr.Status.Phase == velerov1api.PodVolumeRestorePhasePrepared { + log.Infof("PVR is prepared and should be processed by %s (%s)", pvr.Status.Node, r.nodeName) + + if pvr.Status.Node != r.nodeName { + return ctrl.Result{}, nil + } + + if pvr.Spec.Cancel { + log.Info("Prepared PVR is being canceled") + r.OnDataPathCancelled(ctx, pvr.GetNamespace(), pvr.GetName()) + return ctrl.Result{}, nil + } + + asyncBR := r.dataPathMgr.GetAsyncBR(pvr.Name) + if asyncBR != nil { + log.Info("Cancellable data path is already started") + return ctrl.Result{}, nil + } + + res, err := r.exposer.GetExposed(ctx, getPVROwnerObject(pvr), r.client, r.nodeName, r.resourceTimeout) + if err != nil { + return r.errorOut(ctx, pvr, err, "exposed PVR is not ready", log) + } else if res == nil { + return r.errorOut(ctx, pvr, errors.New("no expose result is available for the current node"), "exposed PVR is not ready", log) + } + + log.Info("Exposed PVR is ready and creating data path routine") + + callbacks := datapath.Callbacks{ + OnCompleted: r.OnDataPathCompleted, + OnFailed: r.OnDataPathFailed, + OnCancelled: r.OnDataPathCancelled, + OnProgress: r.OnDataPathProgress, + } + + asyncBR, err = r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, r.kubeClient, r.mgr, datapath.TaskTypeRestore, + pvr.Name, pvr.Namespace, res.ByPod.HostingPod.Name, res.ByPod.HostingContainer, 
pvr.Name, callbacks, false, log) + if err != nil { + if err == datapath.ConcurrentLimitExceed { + log.Info("Data path instance is concurrent limited requeue later") + return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil + } else { + return r.errorOut(ctx, pvr, err, "error to create data path", log) + } + } + + if err := r.initCancelableDataPath(ctx, asyncBR, res, log); err != nil { + log.WithError(err).Errorf("Failed to init cancelable data path for %s", pvr.Name) + + r.closeDataPath(ctx, pvr.Name) + return r.errorOut(ctx, pvr, err, "error initializing data path", log) + } + + terminated := false + if err := UpdatePVRWithRetry(ctx, r.client, types.NamespacedName{Namespace: pvr.Namespace, Name: pvr.Name}, log, func(pvr *velerov1api.PodVolumeRestore) bool { + if isPVRInFinalState(pvr) { + terminated = true + return false + } + + pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseInProgress + pvr.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()} + + return true + }); err != nil { + log.WithError(err).Warnf("Failed to update PVR %s to InProgress, will data path close and retry", pvr.Name) + + r.closeDataPath(ctx, pvr.Name) + return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil + } + + if terminated { + log.Warnf("PVR %s is terminated during transition from prepared", pvr.Name) + r.closeDataPath(ctx, pvr.Name) + return ctrl.Result{}, nil + } + + log.Info("PVR is marked as in progress") + + if err := r.startCancelableDataPath(asyncBR, pvr, res, log); err != nil { + log.WithError(err).Errorf("Failed to start cancelable data path for %s", pvr.Name) + r.closeDataPath(ctx, pvr.Name) + + return r.errorOut(ctx, pvr, err, "error starting data path", log) + } + + return ctrl.Result{}, nil + } else if pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseInProgress { + if pvr.Spec.Cancel { + if pvr.Status.Node != r.nodeName { + return ctrl.Result{}, nil + } + + log.Info("PVR is being canceled") + + asyncBR := 
r.dataPathMgr.GetAsyncBR(pvr.Name) + if asyncBR == nil { + r.OnDataPathCancelled(ctx, pvr.GetNamespace(), pvr.GetName()) + return ctrl.Result{}, nil + } + + // Update status to Canceling + if err := UpdatePVRWithRetry(ctx, r.client, types.NamespacedName{Namespace: pvr.Namespace, Name: pvr.Name}, log, func(pvr *velerov1api.PodVolumeRestore) bool { + if isPVRInFinalState(pvr) { + log.Warnf("PVR %s is terminated, abort setting it to canceling", pvr.Name) + return false + } + + pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseCanceling + return true + }); err != nil { + log.WithError(err).Error("error updating PVR into canceling status") + return ctrl.Result{}, err + } + + asyncBR.Cancel() + return ctrl.Result{}, nil + } + return ctrl.Result{}, nil } - log.WithField("path", volumePath.ByPath).Debugf("Found host path") - - if err := fsRestore.Init(ctx, &datapath.FSBRInitParam{ - BSLName: pvr.Spec.BackupStorageLocation, - SourceNamespace: pvr.Spec.SourceNamespace, - UploaderType: pvr.Spec.UploaderType, - RepositoryType: podvolume.GetPvrRepositoryType(pvr), - RepoIdentifier: pvr.Spec.RepoIdentifier, - RepositoryEnsurer: c.repositoryEnsurer, - CredentialGetter: c.credentialGetter, - }); err != nil { - c.closeDataPath(ctx, pvr.Name) - return c.errorOut(ctx, pvr, err, "error to initialize data path", log) - } - - if err := fsRestore.StartRestore(pvr.Spec.SnapshotID, volumePath, pvr.Spec.UploaderSettings); err != nil { - c.closeDataPath(ctx, pvr.Name) - return c.errorOut(ctx, pvr, err, "error starting data path restore", log) - } - - log.WithField("path", volumePath.ByPath).Info("Async fs restore data path started") - return ctrl.Result{}, nil } -func (c *PodVolumeRestoreReconciler) errorOut(ctx context.Context, pvr *velerov1api.PodVolumeRestore, err error, msg string, log logrus.FieldLogger) (ctrl.Result, error) { - _ = UpdatePVRStatusToFailed(ctx, c.Client, pvr, errors.WithMessage(err, msg).Error(), c.clock.Now(), log) - return ctrl.Result{}, err +func (r 
*PodVolumeRestoreReconciler) acceptPodVolumeRestore(ctx context.Context, pvr *velerov1api.PodVolumeRestore) error { + return UpdatePVRWithRetry(ctx, r.client, types.NamespacedName{Namespace: pvr.Namespace, Name: pvr.Name}, r.logger, func(pvr *velerov1api.PodVolumeRestore) bool { + pvr.Status.AcceptedTimestamp = &metav1.Time{Time: r.clock.Now()} + pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseAccepted + pvr.Status.Node = r.nodeName + + return true + }) } -func UpdatePVRStatusToFailed(ctx context.Context, c client.Client, pvb *velerov1api.PodVolumeRestore, errString string, time time.Time, log logrus.FieldLogger) error { - original := pvb.DeepCopy() - pvb.Status.Phase = velerov1api.PodVolumeRestorePhaseFailed - pvb.Status.Message = errString - pvb.Status.CompletionTimestamp = &metav1.Time{Time: time} +func (r *PodVolumeRestoreReconciler) tryCancelPodVolumeRestore(ctx context.Context, pvr *velerov1api.PodVolumeRestore, message string) bool { + log := r.logger.WithField("PVR", pvr.Name) + succeeded, err := funcExclusiveUpdatePodVolumeRestore(ctx, r.client, pvr, func(pvr *velerov1api.PodVolumeRestore) { + pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseCanceled + if pvr.Status.StartTimestamp.IsZero() { + pvr.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()} + } + pvr.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()} + + if message != "" { + pvr.Status.Message = message + } + }) - err := c.Patch(ctx, pvb, client.MergeFrom(original)) if err != nil { - log.WithError(err).Error("error updating PodVolumeRestore status") + log.WithError(err).Error("error updating PVR status") + return false + } else if !succeeded { + log.Warn("conflict in updating PVR status and will try it again later") + return false + } + + r.exposer.CleanUp(ctx, getPVROwnerObject(pvr)) + + log.Warn("PVR is canceled") + + return true +} + +var funcExclusiveUpdatePodVolumeRestore = exclusiveUpdatePodVolumeRestore + +func exclusiveUpdatePodVolumeRestore(ctx context.Context, cli 
client.Client, pvr *velerov1api.PodVolumeRestore, + updateFunc func(*velerov1api.PodVolumeRestore)) (bool, error) { + updateFunc(pvr) + + err := cli.Update(ctx, pvr) + if err == nil { + return true, nil + } + + // warn we won't rollback pvr values in memory when error + if apierrors.IsConflict(err) { + return false, nil + } else { + return false, err + } +} + +func (r *PodVolumeRestoreReconciler) onPrepareTimeout(ctx context.Context, pvr *velerov1api.PodVolumeRestore) { + log := r.logger.WithField("PVR", pvr.Name) + + log.Info("Timeout happened for preparing PVR") + + succeeded, err := funcExclusiveUpdatePodVolumeRestore(ctx, r.client, pvr, func(pvr *velerov1api.PodVolumeRestore) { + pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseFailed + pvr.Status.Message = "timeout on preparing PVR" + }) + + if err != nil { + log.WithError(err).Warn("Failed to update PVR") + return + } + + if !succeeded { + log.Warn("PVR has been updated by others") + return + } + + diags := strings.Split(r.exposer.DiagnoseExpose(ctx, getPVROwnerObject(pvr)), "\n") + for _, diag := range diags { + log.Warnf("[Diagnose PVR expose]%s", diag) + } + + r.exposer.CleanUp(ctx, getPVROwnerObject(pvr)) + + log.Info("PVR has been cleaned up") +} + +func (r *PodVolumeRestoreReconciler) initCancelableDataPath(ctx context.Context, asyncBR datapath.AsyncBR, res *exposer.ExposeResult, log logrus.FieldLogger) error { + log.Info("Init cancelable PVR") + + if err := asyncBR.Init(ctx, nil); err != nil { + return errors.Wrap(err, "error initializing asyncBR") + } + + log.Infof("async data path init for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) + + return nil +} + +func (r *PodVolumeRestoreReconciler) startCancelableDataPath(asyncBR datapath.AsyncBR, pvr *velerov1api.PodVolumeRestore, res *exposer.ExposeResult, log logrus.FieldLogger) error { + log.Info("Start cancelable PVR") + + if err := asyncBR.StartRestore(pvr.Spec.SnapshotID, datapath.AccessPoint{ + ByPath: res.ByPod.VolumeName, 
+ }, pvr.Spec.UploaderSettings); err != nil { + return errors.Wrapf(err, "error starting async restore for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) + } + + log.Infof("Async restore started for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) + return nil +} + +func (r *PodVolumeRestoreReconciler) errorOut(ctx context.Context, pvr *velerov1api.PodVolumeRestore, err error, msg string, log logrus.FieldLogger) (ctrl.Result, error) { + r.exposer.CleanUp(ctx, getPVROwnerObject(pvr)) + + return ctrl.Result{}, UpdatePVRStatusToFailed(ctx, r.client, pvr, err, msg, r.clock.Now(), log) +} + +func UpdatePVRStatusToFailed(ctx context.Context, c client.Client, pvr *velerov1api.PodVolumeRestore, err error, msg string, time time.Time, log logrus.FieldLogger) error { + log.Info("update PVR status to Failed") + + if patchErr := UpdatePVRWithRetry(context.Background(), c, types.NamespacedName{Namespace: pvr.Namespace, Name: pvr.Name}, log, + func(pvr *velerov1api.PodVolumeRestore) bool { + if isPVRInFinalState(pvr) { + return false + } + + pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseFailed + pvr.Status.Message = errors.WithMessage(err, msg).Error() + pvr.Status.CompletionTimestamp = &metav1.Time{Time: time} + + return true + }); patchErr != nil { + log.WithError(patchErr).Warn("error updating PVR status") } return err } -func (c *PodVolumeRestoreReconciler) shouldProcess(ctx context.Context, log logrus.FieldLogger, pvr *velerov1api.PodVolumeRestore) (bool, *corev1api.Pod, error) { +func shouldProcess(ctx context.Context, client client.Client, log logrus.FieldLogger, pvr *velerov1api.PodVolumeRestore) (bool, *corev1api.Pod, error) { if !isPVRNew(pvr) { - log.Debug("PodVolumeRestore is not new, skip") + log.Debug("PVR is not new, skip") return false, nil, nil } // we filter the pods during the initialization of cache, if we can get a pod here, the pod must be in the same node with the controller // so we don't need to compare the 
node anymore pod := &corev1api.Pod{} - if err := c.Get(ctx, types.NamespacedName{Namespace: pvr.Spec.Pod.Namespace, Name: pvr.Spec.Pod.Name}, pod); err != nil { + if err := client.Get(ctx, types.NamespacedName{Namespace: pvr.Spec.Pod.Namespace, Name: pvr.Spec.Pod.Name}, pod); err != nil { if apierrors.IsNotFound(err) { log.WithError(err).Debug("Pod not found on this node, skip") return false, nil, nil @@ -214,39 +533,175 @@ func (c *PodVolumeRestoreReconciler) shouldProcess(ctx context.Context, log logr return true, pod, nil } -func (c *PodVolumeRestoreReconciler) SetupWithManager(mgr ctrl.Manager) error { - // The pod may not being scheduled at the point when its PVRs are initially reconciled. - // By watching the pods, we can trigger the PVR reconciliation again once the pod is finally scheduled on the node. - return ctrl.NewControllerManagedBy(mgr). - For(&velerov1api.PodVolumeRestore{}). - Watches(&corev1api.Pod{}, handler.EnqueueRequestsFromMapFunc(c.findVolumeRestoresForPod)). - Complete(c) +func (r *PodVolumeRestoreReconciler) closeDataPath(ctx context.Context, pvrName string) { + asyncBR := r.dataPathMgr.GetAsyncBR(pvrName) + if asyncBR != nil { + asyncBR.Close(ctx) + } + + r.dataPathMgr.RemoveAsyncBR(pvrName) } -func (c *PodVolumeRestoreReconciler) findVolumeRestoresForPod(ctx context.Context, pod client.Object) []reconcile.Request { +func (r *PodVolumeRestoreReconciler) SetupWithManager(mgr ctrl.Manager) error { + gp := kube.NewGenericEventPredicate(func(object client.Object) bool { + pvr := object.(*velerov1api.PodVolumeRestore) + if isLegacyPVR(pvr) { + return false + } + + if pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseAccepted { + return true + } + + if pvr.Spec.Cancel && !isPVRInFinalState(pvr) { + return true + } + + if isPVRInFinalState(pvr) && !pvr.DeletionTimestamp.IsZero() { + return true + } + + return false + }) + + s := kube.NewPeriodicalEnqueueSource(r.logger.WithField("controller", constant.ControllerPodVolumeRestore), r.client, 
&velerov1api.PodVolumeRestoreList{}, preparingMonitorFrequency, kube.PeriodicalEnqueueSourceOption{ + Predicates: []predicate.Predicate{gp}, + }) + + pred := kube.NewAllEventPredicate(func(obj client.Object) bool { + pvr := obj.(*velerov1api.PodVolumeRestore) + return !isLegacyPVR(pvr) + }) + + return ctrl.NewControllerManagedBy(mgr). + For(&velerov1api.PodVolumeRestore{}, builder.WithPredicates(pred)). + WatchesRawSource(s). + Watches(&corev1api.Pod{}, handler.EnqueueRequestsFromMapFunc(r.findPVRForTargetPod)). + Watches(&corev1api.Pod{}, kube.EnqueueRequestsFromMapUpdateFunc(r.findPVRForRestorePod), + builder.WithPredicates(predicate.Funcs{ + UpdateFunc: func(ue event.UpdateEvent) bool { + newObj := ue.ObjectNew.(*corev1api.Pod) + + if _, ok := newObj.Labels[velerov1api.PVRLabel]; !ok { + return false + } + + if newObj.Spec.NodeName == "" { + return false + } + + return true + }, + CreateFunc: func(event.CreateEvent) bool { + return false + }, + DeleteFunc: func(de event.DeleteEvent) bool { + return false + }, + GenericFunc: func(ge event.GenericEvent) bool { + return false + }, + })). + Complete(r) +} + +func (r *PodVolumeRestoreReconciler) findPVRForTargetPod(ctx context.Context, pod client.Object) []reconcile.Request { list := &velerov1api.PodVolumeRestoreList{} options := &client.ListOptions{ LabelSelector: labels.Set(map[string]string{ velerov1api.PodUIDLabel: string(pod.GetUID()), }).AsSelector(), } - if err := c.List(context.TODO(), list, options); err != nil { - c.logger.WithField("pod", fmt.Sprintf("%s/%s", pod.GetNamespace(), pod.GetName())).WithError(err). + if err := r.client.List(context.TODO(), list, options); err != nil { + r.logger.WithField("pod", fmt.Sprintf("%s/%s", pod.GetNamespace(), pod.GetName())).WithError(err). 
Error("unable to list PodVolumeRestores") return []reconcile.Request{} } - requests := make([]reconcile.Request, len(list.Items)) - for i, item := range list.Items { - requests[i] = reconcile.Request{ + + requests := []reconcile.Request{} + for _, item := range list.Items { + if isLegacyPVR(&item) { + continue + } + + requests = append(requests, reconcile.Request{ NamespacedName: types.NamespacedName{ Namespace: item.GetNamespace(), Name: item.GetName(), }, - } + }) } return requests } +func (r *PodVolumeRestoreReconciler) findPVRForRestorePod(ctx context.Context, podObj client.Object) []reconcile.Request { + pod := podObj.(*corev1api.Pod) + pvr, err := findPVRByRestorePod(r.client, *pod) + + log := r.logger.WithField("pod", pod.Name) + if err != nil { + log.WithError(err).Error("unable to get PVR") + return []reconcile.Request{} + } else if pvr == nil { + log.Error("get empty PVR") + return []reconcile.Request{} + } + log = log.WithFields(logrus.Fields{ + "PVR": pvr.Name, + }) + + if pvr.Status.Phase != velerov1api.PodVolumeRestorePhaseAccepted { + return []reconcile.Request{} + } + + if pod.Status.Phase == corev1api.PodRunning { + log.Info("Preparing PVR") + + if err = UpdatePVRWithRetry(context.Background(), r.client, types.NamespacedName{Namespace: pvr.Namespace, Name: pvr.Name}, log, + func(pvr *velerov1api.PodVolumeRestore) bool { + if isPVRInFinalState(pvr) { + log.Warnf("PVR %s is terminated, abort setting it to prepared", pvr.Name) + return false + } + + pvr.Status.Phase = velerov1api.PodVolumeRestorePhasePrepared + return true + }); err != nil { + log.WithError(err).Warn("failed to update PVR, prepare will halt for this PVR") + return []reconcile.Request{} + } + } else if unrecoverable, reason := kube.IsPodUnrecoverable(pod, log); unrecoverable { + err := UpdatePVRWithRetry(context.Background(), r.client, types.NamespacedName{Namespace: pvr.Namespace, Name: pvr.Name}, log, + func(pvr *velerov1api.PodVolumeRestore) bool { + if pvr.Spec.Cancel { + return 
false + } + + pvr.Spec.Cancel = true + pvr.Status.Message = fmt.Sprintf("Cancel PVR because the exposing pod %s/%s is in abnormal status for reason %s", pod.Namespace, pod.Name, reason) + + return true + }) + + if err != nil { + log.WithError(err).Warn("failed to cancel PVR, and it will wait for prepare timeout") + return []reconcile.Request{} + } + + log.Infof("Exposed pod is in abnormal status(reason %s) and PVR is marked as cancel", reason) + } else { + return []reconcile.Request{} + } + + request := reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: pvr.Namespace, + Name: pvr.Name, + }, + } + return []reconcile.Request{request} +} + func isPVRNew(pvr *velerov1api.PodVolumeRestore) bool { return pvr.Status.Phase == "" || pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseNew } @@ -270,118 +725,186 @@ func getInitContainerIndex(pod *corev1api.Pod) int { return -1 } -func (c *PodVolumeRestoreReconciler) OnDataPathCompleted(ctx context.Context, namespace string, pvrName string, result datapath.Result) { - defer c.dataPathMgr.RemoveAsyncBR(pvrName) +func (r *PodVolumeRestoreReconciler) OnDataPathCompleted(ctx context.Context, namespace string, pvrName string, result datapath.Result) { + defer r.dataPathMgr.RemoveAsyncBR(pvrName) - log := c.logger.WithField("pvr", pvrName) + log := r.logger.WithField("PVR", pvrName) - log.WithField("PVR", pvrName).Info("Async fs restore data path completed") + log.WithField("PVR", pvrName).WithField("result", result.Restore).Info("Async fs restore data path completed") var pvr velerov1api.PodVolumeRestore - if err := c.Client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); err != nil { + if err := r.client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); err != nil { log.WithError(err).Warn("Failed to get PVR on completion") return } - volumePath := result.Restore.Target.ByPath - if volumePath == "" { - _, _ = c.errorOut(ctx, &pvr, errors.New("path is 
empty"), "invalid restore target", log) - return - } + log.Info("Cleaning up exposed environment") + r.exposer.CleanUp(ctx, getPVROwnerObject(&pvr)) - // Remove the .velero directory from the restored volume (it may contain done files from previous restores - // of this volume, which we don't want to carry over). If this fails for any reason, log and continue, since - // this is non-essential cleanup (the done files are named based on restore UID and the init container looks - // for the one specific to the restore being executed). - if err := os.RemoveAll(filepath.Join(volumePath, ".velero")); err != nil { - log.WithError(err).Warnf("error removing .velero directory from directory %s", volumePath) - } - - var restoreUID types.UID - for _, owner := range pvr.OwnerReferences { - if boolptr.IsSetToTrue(owner.Controller) { - restoreUID = owner.UID - break + if err := UpdatePVRWithRetry(ctx, r.client, types.NamespacedName{Namespace: pvr.Namespace, Name: pvr.Name}, log, func(pvr *velerov1api.PodVolumeRestore) bool { + if isPVRInFinalState(pvr) { + return false } - } - // Create the .velero directory within the volume dir so we can write a done file - // for this restore. - if err := os.MkdirAll(filepath.Join(volumePath, ".velero"), 0755); err != nil { - _, _ = c.errorOut(ctx, &pvr, err, "error creating .velero directory for done file", log) - return - } + pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseCompleted + pvr.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()} - // Write a done file with name= into the just-created .velero dir - // within the volume. The velero init container on the pod is waiting - // for this file to exist in each restored volume before completing. - if err := os.WriteFile(filepath.Join(volumePath, ".velero", string(restoreUID)), nil, 0644); err != nil { //nolint:gosec // Internal usage. No need to check. 
- _, _ = c.errorOut(ctx, &pvr, err, "error writing done file", log) - return + return true + }); err != nil { + log.WithError(err).Error("error updating PVR status") + } else { + log.Info("Restore completed") } - - original := pvr.DeepCopy() - pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseCompleted - pvr.Status.CompletionTimestamp = &metav1.Time{Time: c.clock.Now()} - if err := c.Patch(ctx, &pvr, client.MergeFrom(original)); err != nil { - log.WithError(err).Error("error updating PodVolumeRestore status") - } - - log.Info("Restore completed") } -func (c *PodVolumeRestoreReconciler) OnDataPathFailed(ctx context.Context, namespace string, pvrName string, err error) { - defer c.dataPathMgr.RemoveAsyncBR(pvrName) +func (r *PodVolumeRestoreReconciler) OnDataPathFailed(ctx context.Context, namespace string, pvrName string, err error) { + defer r.dataPathMgr.RemoveAsyncBR(pvrName) - log := c.logger.WithField("pvr", pvrName) + log := r.logger.WithField("PVR", pvrName) log.WithError(err).Error("Async fs restore data path failed") var pvr velerov1api.PodVolumeRestore - if getErr := c.Client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); getErr != nil { + if getErr := r.client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); getErr != nil { log.WithError(getErr).Warn("Failed to get PVR on failure") } else { - _, _ = c.errorOut(ctx, &pvr, err, "data path restore failed", log) + _, _ = r.errorOut(ctx, &pvr, err, "data path restore failed", log) } } -func (c *PodVolumeRestoreReconciler) OnDataPathCancelled(ctx context.Context, namespace string, pvrName string) { - defer c.dataPathMgr.RemoveAsyncBR(pvrName) +func (r *PodVolumeRestoreReconciler) OnDataPathCancelled(ctx context.Context, namespace string, pvrName string) { + defer r.dataPathMgr.RemoveAsyncBR(pvrName) - log := c.logger.WithField("pvr", pvrName) + log := r.logger.WithField("PVR", pvrName) log.Warn("Async fs restore data path canceled") var pvr 
velerov1api.PodVolumeRestore - if getErr := c.Client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); getErr != nil { + if getErr := r.client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); getErr != nil { log.WithError(getErr).Warn("Failed to get PVR on cancel") + return + } + // cleans up any objects generated during the snapshot expose + r.exposer.CleanUp(ctx, getPVROwnerObject(&pvr)) + + if err := UpdatePVRWithRetry(ctx, r.client, types.NamespacedName{Namespace: pvr.Namespace, Name: pvr.Name}, log, func(pvr *velerov1api.PodVolumeRestore) bool { + if isPVRInFinalState(pvr) { + return false + } + + pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseCanceled + if pvr.Status.StartTimestamp.IsZero() { + pvr.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()} + } + pvr.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()} + + return true + }); err != nil { + log.WithError(err).Error("error updating PVR status on cancel") } else { - _, _ = c.errorOut(ctx, &pvr, errors.New("PVR is canceled"), "data path restore canceled", log) + delete(r.cancelledPVR, pvr.Name) } } -func (c *PodVolumeRestoreReconciler) OnDataPathProgress(ctx context.Context, namespace string, pvrName string, progress *uploader.Progress) { - log := c.logger.WithField("pvr", pvrName) +func (r *PodVolumeRestoreReconciler) OnDataPathProgress(ctx context.Context, namespace string, pvrName string, progress *uploader.Progress) { + log := r.logger.WithField("PVR", pvrName) - var pvr velerov1api.PodVolumeRestore - if err := c.Client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); err != nil { - log.WithError(err).Warn("Failed to get PVB on progress") - return - } - - original := pvr.DeepCopy() - pvr.Status.Progress = veleroapishared.DataMoveOperationProgress{TotalBytes: progress.TotalBytes, BytesDone: progress.BytesDone} - - if err := c.Client.Patch(ctx, &pvr, client.MergeFrom(original)); err != nil { + if err := 
UpdatePVRWithRetry(ctx, r.client, types.NamespacedName{Namespace: namespace, Name: pvrName}, log, func(pvr *velerov1api.PodVolumeRestore) bool { + pvr.Status.Progress = veleroapishared.DataMoveOperationProgress{TotalBytes: progress.TotalBytes, BytesDone: progress.BytesDone} + return true + }); err != nil { log.WithError(err).Error("Failed to update progress") } } -func (c *PodVolumeRestoreReconciler) closeDataPath(ctx context.Context, pvbName string) { - fsRestore := c.dataPathMgr.GetAsyncBR(pvbName) - if fsRestore != nil { - fsRestore.Close(ctx) +func (r *PodVolumeRestoreReconciler) setupExposeParam(pvr *velerov1api.PodVolumeRestore) exposer.PodVolumeExposeParam { + log := r.logger.WithField("PVR", pvr.Name) + + hostingPodLabels := map[string]string{velerov1api.PVRLabel: pvr.Name} + for _, k := range util.ThirdPartyLabels { + if v, err := nodeagent.GetLabelValue(context.Background(), r.kubeClient, pvr.Namespace, k, ""); err != nil { + if err != nodeagent.ErrNodeAgentLabelNotFound { + log.WithError(err).Warnf("Failed to check node-agent label, skip adding host pod label %s", k) + } + } else { + hostingPodLabels[k] = v + } } - c.dataPathMgr.RemoveAsyncBR(pvbName) + hostingPodAnnotation := map[string]string{} + for _, k := range util.ThirdPartyAnnotations { + if v, err := nodeagent.GetAnnotationValue(context.Background(), r.kubeClient, pvr.Namespace, k, ""); err != nil { + if err != nodeagent.ErrNodeAgentAnnotationNotFound { + log.WithError(err).Warnf("Failed to check node-agent annotation, skip adding host pod annotation %s", k) + } + } else { + hostingPodAnnotation[k] = v + } + } + + return exposer.PodVolumeExposeParam{ + Type: exposer.PodVolumeExposeTypeRestore, + ClientNamespace: pvr.Spec.Pod.Namespace, + ClientPodName: pvr.Spec.Pod.Name, + ClientPodVolume: pvr.Spec.Volume, + HostingPodLabels: hostingPodLabels, + HostingPodAnnotations: hostingPodAnnotation, + OperationTimeout: r.resourceTimeout, + Resources: r.podResources, + } +} + +func getPVROwnerObject(pvr 
*velerov1api.PodVolumeRestore) corev1api.ObjectReference { + return corev1api.ObjectReference{ + Kind: pvr.Kind, + Namespace: pvr.Namespace, + Name: pvr.Name, + UID: pvr.UID, + APIVersion: pvr.APIVersion, + } +} + +func findPVRByRestorePod(client client.Client, pod corev1api.Pod) (*velerov1api.PodVolumeRestore, error) { + if label, exist := pod.Labels[velerov1api.PVRLabel]; exist { + pvr := &velerov1api.PodVolumeRestore{} + err := client.Get(context.Background(), types.NamespacedName{ + Namespace: pod.Namespace, + Name: label, + }, pvr) + + if err != nil { + return nil, errors.Wrapf(err, "error to find PVR by pod %s/%s", pod.Namespace, pod.Name) + } + return pvr, nil + } + return nil, nil +} + +func isPVRInFinalState(pvr *velerov1api.PodVolumeRestore) bool { + return pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseFailed || + pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseCanceled || + pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseCompleted +} + +func UpdatePVRWithRetry(ctx context.Context, client client.Client, namespacedName types.NamespacedName, log logrus.FieldLogger, updateFunc func(*velerov1api.PodVolumeRestore) bool) error { + return wait.PollUntilContextCancel(ctx, time.Millisecond*100, true, func(ctx context.Context) (bool, error) { + pvr := &velerov1api.PodVolumeRestore{} + if err := client.Get(ctx, namespacedName, pvr); err != nil { + return false, errors.Wrap(err, "getting PVR") + } + + if updateFunc(pvr) { + err := client.Update(ctx, pvr) + if err != nil { + if apierrors.IsConflict(err) { + log.Warnf("failed to update PVR for %s/%s and will retry it", pvr.Namespace, pvr.Name) + return false, nil + } else { + return false, errors.Wrapf(err, "error updating PVR %s/%s", pvr.Namespace, pvr.Name) + } + } + } + + return true, nil + }) } diff --git a/pkg/controller/pod_volume_restore_controller_legacy.go b/pkg/controller/pod_volume_restore_controller_legacy.go new file mode 100644 index 000000000..03c8b4c56 --- /dev/null +++ 
b/pkg/controller/pod_volume_restore_controller_legacy.go @@ -0,0 +1,10 @@ +package controller + +import ( + velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + "github.com/vmware-tanzu/velero/pkg/uploader" +) + +func isLegacyPVR(pvr *velerov1api.PodVolumeRestore) bool { + return pvr.Spec.UploaderType == uploader.ResticType +} diff --git a/pkg/controller/pod_volume_restore_controller_test.go b/pkg/controller/pod_volume_restore_controller_test.go index f24aed653..81800b805 100644 --- a/pkg/controller/pod_volume_restore_controller_test.go +++ b/pkg/controller/pod_volume_restore_controller_test.go @@ -18,21 +18,43 @@ package controller import ( "context" + "fmt" "testing" "time" + "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + appsv1api "k8s.io/api/apps/v1" corev1api "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + clientgofake "k8s.io/client-go/kubernetes/fake" clocks "k8s.io/utils/clock" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + kbclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + "github.com/vmware-tanzu/velero/pkg/builder" + "github.com/vmware-tanzu/velero/pkg/datapath" + datapathmockes "github.com/vmware-tanzu/velero/pkg/datapath/mocks" + "github.com/vmware-tanzu/velero/pkg/exposer" + exposermockes "github.com/vmware-tanzu/velero/pkg/exposer/mocks" "github.com/vmware-tanzu/velero/pkg/restorehelper" "github.com/vmware-tanzu/velero/pkg/test" + velerotest "github.com/vmware-tanzu/velero/pkg/test" + 
"github.com/vmware-tanzu/velero/pkg/uploader" + "github.com/vmware-tanzu/velero/pkg/util/boolptr" + "github.com/vmware-tanzu/velero/pkg/util/kube" ) func TestShouldProcess(t *testing.T) { @@ -195,11 +217,11 @@ func TestShouldProcess(t *testing.T) { c := &PodVolumeRestoreReconciler{ logger: logrus.New(), - Client: cli, + client: cli, clock: &clocks.RealClock{}, } - shouldProcess, _, _ := c.shouldProcess(ctx, c.logger, ts.obj) + shouldProcess, _, _ := shouldProcess(ctx, c.client, c.logger, ts.obj) require.Equal(t, ts.shouldProcessed, shouldProcess) }) } @@ -478,7 +500,7 @@ func TestGetInitContainerIndex(t *testing.T) { } } -func TestFindVolumeRestoresForPod(t *testing.T) { +func TestFindPVRForTargetPod(t *testing.T) { pod := &corev1api.Pod{} pod.UID = "uid" @@ -488,14 +510,14 @@ func TestFindVolumeRestoresForPod(t *testing.T) { // no matching PVR reconciler := &PodVolumeRestoreReconciler{ - Client: clientBuilder.Build(), + client: clientBuilder.Build(), logger: logrus.New(), } - requests := reconciler.findVolumeRestoresForPod(context.Background(), pod) + requests := reconciler.findPVRForTargetPod(context.Background(), pod) assert.Empty(t, requests) // contain one matching PVR - reconciler.Client = clientBuilder.WithLists(&velerov1api.PodVolumeRestoreList{ + reconciler.client = clientBuilder.WithLists(&velerov1api.PodVolumeRestoreList{ Items: []velerov1api.PodVolumeRestore{ { ObjectMeta: metav1.ObjectMeta{ @@ -515,6 +537,903 @@ func TestFindVolumeRestoresForPod(t *testing.T) { }, }, }).Build() - requests = reconciler.findVolumeRestoresForPod(context.Background(), pod) + requests = reconciler.findPVRForTargetPod(context.Background(), pod) assert.Len(t, requests, 1) } + +const pvrName string = "pvr-1" + +func pvrBuilder() *builder.PodVolumeRestoreBuilder { + return builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName). + BackupStorageLocation("bsl-loc"). 
+ SnapshotID("test-snapshot-id") +} + +func initPodVolumeRestoreReconciler(objects []runtime.Object, cliObj []client.Object, needError ...bool) (*PodVolumeRestoreReconciler, error) { + var errs = make([]error, 6) + for k, isError := range needError { + if k == 0 && isError { + errs[0] = fmt.Errorf("Get error") + } else if k == 1 && isError { + errs[1] = fmt.Errorf("Create error") + } else if k == 2 && isError { + errs[2] = fmt.Errorf("Update error") + } else if k == 3 && isError { + errs[3] = fmt.Errorf("Patch error") + } else if k == 4 && isError { + errs[4] = apierrors.NewConflict(velerov1api.Resource("podvolumerestore"), pvrName, errors.New("conflict")) + } else if k == 5 && isError { + errs[5] = fmt.Errorf("List error") + } + } + return initPodVolumeRestoreReconcilerWithError(objects, cliObj, errs...) +} + +func initPodVolumeRestoreReconcilerWithError(objects []runtime.Object, cliObj []client.Object, needError ...error) (*PodVolumeRestoreReconciler, error) { + scheme := runtime.NewScheme() + err := velerov1api.AddToScheme(scheme) + if err != nil { + return nil, err + } + + err = corev1api.AddToScheme(scheme) + if err != nil { + return nil, err + } + + fakeClient := &FakeClient{ + Client: fake.NewClientBuilder().WithScheme(scheme).WithObjects(cliObj...).Build(), + } + + for k := range needError { + if k == 0 { + fakeClient.getError = needError[0] + } else if k == 1 { + fakeClient.createError = needError[1] + } else if k == 2 { + fakeClient.updateError = needError[2] + } else if k == 3 { + fakeClient.patchError = needError[3] + } else if k == 4 { + fakeClient.updateConflict = needError[4] + } else if k == 5 { + fakeClient.listError = needError[5] + } + } + + var fakeKubeClient *clientgofake.Clientset + if len(objects) != 0 { + fakeKubeClient = clientgofake.NewSimpleClientset(objects...) 
+ } else { + fakeKubeClient = clientgofake.NewSimpleClientset() + } + + fakeFS := velerotest.NewFakeFileSystem() + pathGlob := fmt.Sprintf("/host_pods/%s/volumes/*/%s", "test-uid", "test-pvc") + _, err = fakeFS.Create(pathGlob) + if err != nil { + return nil, err + } + + dataPathMgr := datapath.NewManager(1) + + return NewPodVolumeRestoreReconciler(fakeClient, nil, fakeKubeClient, dataPathMgr, "test-node", time.Minute*5, time.Minute, corev1api.ResourceRequirements{}, velerotest.NewLogger()), nil +} + +func TestPodVolumeRestoreReconcile(t *testing.T) { + daemonSet := &appsv1api.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "velero", + Name: "node-agent", + }, + TypeMeta: metav1.TypeMeta{ + Kind: "DaemonSet", + APIVersion: appsv1api.SchemeGroupVersion.String(), + }, + Spec: appsv1api.DaemonSetSpec{ + Template: corev1api.PodTemplateSpec{ + Spec: corev1api.PodSpec{ + Containers: []corev1api.Container{ + { + Image: "fake-image", + }, + }, + }, + }, + }, + } + + node := builder.ForNode("fake-node").Labels(map[string]string{kube.NodeOSLabel: kube.NodeOSLinux}).Result() + + tests := []struct { + name string + pvr *velerov1api.PodVolumeRestore + notCreatePVR bool + targetPod *corev1api.Pod + dataMgr *datapath.Manager + needErrs []bool + needCreateFSBR bool + needDelete bool + sportTime *metav1.Time + mockExposeErr *bool + isGetExposeErr bool + isGetExposeNil bool + isPeekExposeErr bool + isNilExposer bool + notNilExpose bool + notMockCleanUp bool + mockInit bool + mockInitErr error + mockStart bool + mockStartErr error + mockCancel bool + mockClose bool + needExclusiveUpdateError error + expected *velerov1api.PodVolumeRestore + expectDeleted bool + expectCancelRecord bool + expectedResult *ctrl.Result + expectedErr string + expectDataPath bool + }{ + { + name: "pvr not found", + pvr: pvrBuilder().Result(), + notCreatePVR: true, + }, + { + name: "pvr not created in velero default namespace", + pvr: builder.ForPodVolumeRestore("test-ns", pvrName).Result(), + }, + { 
+ name: "get dd fail", + pvr: builder.ForPodVolumeRestore("test-ns", pvrName).Result(), + needErrs: []bool{true, false, false, false}, + expectedErr: "Get error", + }, + { + name: "add finalizer to pvr", + pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Result(), + expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Result(), + }, + { + name: "add finalizer to pvr failed", + pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Result(), + needErrs: []bool{false, false, true, false}, + expectedErr: "error updating PVR velero/pvr-1: Update error", + }, + { + name: "pvr is under deletion", + pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Result(), + needDelete: true, + expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Result(), + }, + { + name: "pvr is under deletion but cancel failed", + pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Result(), + needErrs: []bool{false, false, true, false}, + needDelete: true, + expectedErr: "error updating PVR velero/pvr-1: Update error", + }, + { + name: "pvr is under deletion and in terminal state", + pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Phase(velerov1api.PodVolumeRestorePhaseFailed).Result(), + sportTime: &metav1.Time{Time: time.Now()}, + needDelete: true, + expectDeleted: true, + }, + { + name: "pvr is under deletion and in terminal state, but remove finalizer failed", + pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Phase(velerov1api.PodVolumeRestorePhaseFailed).Result(), + needErrs: []bool{false, false, true, false}, + needDelete: true, + expectedErr: "error updating PVR 
velero/pvr-1: Update error", + }, + { + name: "delay cancel negative for others", + pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Phase(velerov1api.PodVolumeRestorePhasePrepared).Result(), + sportTime: &metav1.Time{Time: time.Now()}, + expectCancelRecord: true, + }, + { + name: "delay cancel negative for inProgress", + pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Result(), + sportTime: &metav1.Time{Time: time.Now().Add(-time.Minute * 58)}, + expectCancelRecord: true, + }, + { + name: "delay cancel affirmative for others", + pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Phase(velerov1api.PodVolumeRestorePhasePrepared).Result(), + sportTime: &metav1.Time{Time: time.Now().Add(-time.Minute * 5)}, + expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Phase(velerov1api.PodVolumeRestorePhaseCanceled).Result(), + }, + { + name: "delay cancel affirmative for inProgress", + pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Result(), + sportTime: &metav1.Time{Time: time.Now().Add(-time.Hour)}, + expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Phase(velerov1api.PodVolumeRestorePhaseCanceled).Result(), + }, + { + name: "delay cancel failed", + pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Result(), + needErrs: []bool{false, false, true, false}, + sportTime: &metav1.Time{Time: 
time.Now().Add(-time.Hour)}, + expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Result(), + expectCancelRecord: true, + }, + { + name: "Unknown pvr status", + pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase("Unknown").Finalizers([]string{PodVolumeFinalizer}).Result(), + }, + { + name: "new pvr but accept failed", + pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).PodNamespace("test-ns").PodName("test-pod").Result(), + targetPod: builder.ForPod("test-ns", "test-pod").InitContainers(&corev1api.Container{Name: restorehelper.WaitInitContainer}).InitContainerState(corev1api.ContainerState{Running: &corev1api.ContainerStateRunning{}}).Result(), + needErrs: []bool{false, false, true, false}, + expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Result(), + expectedErr: "error accepting PVR pvr-1: error updating PVR velero/pvr-1: Update error", + }, + { + name: "pvr is cancel on accepted", + pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Result(), + expectCancelRecord: true, + expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Phase(velerov1api.PodVolumeRestorePhaseCanceled).Result(), + }, + { + name: "pvr expose failed", + pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).PodNamespace("test-ns").PodName("test-pod").Finalizers([]string{PodVolumeFinalizer}).Result(), + targetPod: builder.ForPod("test-ns", "test-pod").InitContainers(&corev1api.Container{Name: restorehelper.WaitInitContainer}).InitContainerState(corev1api.ContainerState{Running: &corev1api.ContainerStateRunning{}}).Result(), + mockExposeErr: 
boolptr.True(), + expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Phase(velerov1api.PodVolumeRestorePhaseFailed).Message("error to expose PVR").Result(), + expectedErr: "Error to expose restore exposer", + }, + { + name: "pvr succeeds for accepted", + pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).PodNamespace("test-ns").PodName("test-pod").Finalizers([]string{PodVolumeFinalizer}).Result(), + mockExposeErr: boolptr.False(), + notMockCleanUp: true, + targetPod: builder.ForPod("test-ns", "test-pod").InitContainers(&corev1api.Container{Name: restorehelper.WaitInitContainer}).InitContainerState(corev1api.ContainerState{Running: &corev1api.ContainerStateRunning{}}).Result(), + expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Phase(velerov1api.PodVolumeRestorePhaseAccepted).Result(), + }, + { + name: "prepare timeout on accepted", + pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseAccepted).Finalizers([]string{PodVolumeFinalizer}).AcceptedTimestamp(&metav1.Time{Time: time.Now().Add(-time.Minute * 30)}).Result(), + expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseFailed).Finalizers([]string{PodVolumeFinalizer}).Phase(velerov1api.PodVolumeRestorePhaseFailed).Message("timeout on preparing PVR").Result(), + }, + { + name: "peek error on accepted", + pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseAccepted).Finalizers([]string{PodVolumeFinalizer}).Result(), + isPeekExposeErr: true, + expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseCanceled).Finalizers([]string{PodVolumeFinalizer}).Phase(velerov1api.PodVolumeRestorePhaseCanceled).Message("found a PVR 
velero/pvr-1 with expose error: fake-peek-error. mark it as cancel").Result(), + }, + { + name: "cancel on pvr", + pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhasePrepared).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Node("test-node").Result(), + expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseCanceled).Finalizers([]string{PodVolumeFinalizer}).Cancel(true).Phase(velerov1api.PodVolumeRestorePhaseCanceled).Result(), + }, + { + name: "Failed to get restore expose on prepared", + pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhasePrepared).Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(), + isGetExposeErr: true, + expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseFailed).Finalizers([]string{PodVolumeFinalizer}).Message("exposed PVR is not ready").Result(), + expectedErr: "Error to get PVR exposer", + }, + { + name: "Get nil restore expose on prepared", + pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhasePrepared).Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(), + isGetExposeNil: true, + expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseFailed).Finalizers([]string{PodVolumeFinalizer}).Message("exposed PVR is not ready").Result(), + expectedErr: "no expose result is available for the current node", + }, + { + name: "Error in data path is concurrent limited", + pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhasePrepared).Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(), + dataMgr: datapath.NewManager(0), + notNilExpose: true, + notMockCleanUp: true, + expectedResult: 
&ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, + }, + { + name: "data path init error", + pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhasePrepared).Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(), + mockInit: true, + mockInitErr: errors.New("fake-data-path-init-error"), + mockClose: true, + notNilExpose: true, + expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseFailed).Finalizers([]string{PodVolumeFinalizer}).Message("error initializing data path").Result(), + expectedErr: "error initializing asyncBR: fake-data-path-init-error", + }, + { + name: "Unable to update status to in progress for pvr", + pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhasePrepared).Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(), + needErrs: []bool{false, false, true, false}, + mockInit: true, + mockClose: true, + notNilExpose: true, + notMockCleanUp: true, + expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhasePrepared).Finalizers([]string{PodVolumeFinalizer}).Result(), + }, + { + name: "data path start error", + pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhasePrepared).Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(), + mockInit: true, + mockStart: true, + mockStartErr: errors.New("fake-data-path-start-error"), + mockClose: true, + notNilExpose: true, + expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseFailed).Finalizers([]string{PodVolumeFinalizer}).Message("error starting data path").Result(), + expectedErr: "error starting async restore for pod test-name, volume test-pvc: fake-data-path-start-error", + }, + { + name: "Prepare succeeds", + 
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhasePrepared).Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(), + mockInit: true, + mockStart: true, + notNilExpose: true, + notMockCleanUp: true, + expectDataPath: true, + expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Finalizers([]string{PodVolumeFinalizer}).Result(), + }, + { + name: "In progress pvr is not handled by the current node", + pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Finalizers([]string{PodVolumeFinalizer}).Result(), + expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Finalizers([]string{PodVolumeFinalizer}).Result(), + }, + { + name: "In progress pvr is not set as cancel", + pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(), + expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Finalizers([]string{PodVolumeFinalizer}).Result(), + }, + { + name: "Cancel pvr in progress with empty FSBR", + pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Cancel(true).Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(), + expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseCanceled).Cancel(true).Finalizers([]string{PodVolumeFinalizer}).Result(), + }, + { + name: "Cancel pvr in progress and patch pvr error", + pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, 
pvrName).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Cancel(true).Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(), + needErrs: []bool{false, false, true, false}, + needCreateFSBR: true, + expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Cancel(true).Finalizers([]string{PodVolumeFinalizer}).Result(), + expectedErr: "error updating PVR velero/pvr-1: Update error", + expectCancelRecord: true, + expectDataPath: true, + }, + { + name: "Cancel pvr in progress succeeds", + pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Cancel(true).Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(), + needCreateFSBR: true, + mockCancel: true, + expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseCanceling).Cancel(true).Finalizers([]string{PodVolumeFinalizer}).Result(), + expectDataPath: true, + expectCancelRecord: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + objs := []runtime.Object{daemonSet, node} + + ctlObj := []client.Object{} + if test.targetPod != nil { + ctlObj = append(ctlObj, test.targetPod) + } + + r, err := initPodVolumeRestoreReconciler(objs, ctlObj, test.needErrs...) 
+ require.NoError(t, err) + + if !test.notCreatePVR { + err = r.client.Create(context.Background(), test.pvr) + require.NoError(t, err) + } + + if test.needDelete { + err = r.client.Delete(context.Background(), test.pvr) + require.NoError(t, err) + } + + if test.dataMgr != nil { + r.dataPathMgr = test.dataMgr + } else { + r.dataPathMgr = datapath.NewManager(1) + } + + if test.sportTime != nil { + r.cancelledPVR[test.pvr.Name] = test.sportTime.Time + } + + funcExclusiveUpdatePodVolumeRestore = exclusiveUpdatePodVolumeRestore + if test.needExclusiveUpdateError != nil { + funcExclusiveUpdatePodVolumeRestore = func(context.Context, kbclient.Client, *velerov1api.PodVolumeRestore, func(*velerov1api.PodVolumeRestore)) (bool, error) { + return false, test.needExclusiveUpdateError + } + } + + datapath.MicroServiceBRWatcherCreator = func(kbclient.Client, kubernetes.Interface, manager.Manager, string, string, + string, string, string, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR { + asyncBR := datapathmockes.NewAsyncBR(t) + if test.mockInit { + asyncBR.On("Init", mock.Anything, mock.Anything).Return(test.mockInitErr) + } + + if test.mockStart { + asyncBR.On("StartRestore", mock.Anything, mock.Anything, mock.Anything).Return(test.mockStartErr) + } + + if test.mockCancel { + asyncBR.On("Cancel").Return() + } + + if test.mockClose { + asyncBR.On("Close", mock.Anything).Return() + } + + return asyncBR + } + + if test.mockExposeErr != nil || test.isGetExposeErr || test.isGetExposeNil || test.isPeekExposeErr || test.isNilExposer || test.notNilExpose { + if test.isNilExposer { + r.exposer = nil + } else { + r.exposer = func() exposer.PodVolumeExposer { + ep := exposermockes.NewPodVolumeExposer(t) + if test.mockExposeErr != nil { + if boolptr.IsSetToTrue(test.mockExposeErr) { + ep.On("Expose", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("Error to expose restore exposer")) + } else { + 
ep.On("Expose", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + } + } else if test.notNilExpose { + hostingPod := builder.ForPod("test-ns", "test-name").Volumes(&corev1api.Volume{Name: "test-pvc"}).Result() + hostingPod.ObjectMeta.SetUID("test-uid") + ep.On("GetExposed", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&exposer.ExposeResult{ByPod: exposer.ExposeByPod{HostingPod: hostingPod, VolumeName: "test-pvc"}}, nil) + } else if test.isGetExposeErr { + ep.On("GetExposed", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("Error to get PVR exposer")) + } else if test.isGetExposeNil { + ep.On("GetExposed", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) + } else if test.isPeekExposeErr { + ep.On("PeekExposed", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("fake-peek-error")) + } + + if !test.notMockCleanUp { + ep.On("CleanUp", mock.Anything, mock.Anything).Return() + } + return ep + }() + } + } + + if test.needCreateFSBR { + if fsBR := r.dataPathMgr.GetAsyncBR(test.pvr.Name); fsBR == nil { + _, err := r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, nil, nil, datapath.TaskTypeRestore, test.pvr.Name, pVBRRequestor, + velerov1api.DefaultNamespace, "", "", datapath.Callbacks{OnCancelled: r.OnDataPathCancelled}, false, velerotest.NewLogger()) + require.NoError(t, err) + } + } + + actualResult, err := r.Reconcile(ctx, ctrl.Request{ + NamespacedName: types.NamespacedName{ + Namespace: velerov1api.DefaultNamespace, + Name: test.pvr.Name, + }, + }) + + if test.expectedErr != "" { + assert.EqualError(t, err, test.expectedErr) + } else { + assert.NoError(t, err) + } + + if test.expectedResult != nil { + assert.Equal(t, test.expectedResult.Requeue, actualResult.Requeue) + assert.Equal(t, test.expectedResult.RequeueAfter, 
actualResult.RequeueAfter) + } + + if test.expected != nil || test.expectDeleted { + pvr := velerov1api.PodVolumeRestore{} + err = r.client.Get(ctx, kbclient.ObjectKey{ + Name: test.pvr.Name, + Namespace: test.pvr.Namespace, + }, &pvr) + + if test.expectDeleted { + assert.True(t, apierrors.IsNotFound(err)) + } else { + require.NoError(t, err) + + assert.Equal(t, test.expected.Status.Phase, pvr.Status.Phase) + assert.Contains(t, pvr.Status.Message, test.expected.Status.Message) + assert.Equal(t, test.expected.Finalizers, pvr.Finalizers) + assert.Equal(t, test.expected.Spec.Cancel, pvr.Spec.Cancel) + } + } + + if !test.expectDataPath { + assert.Nil(t, r.dataPathMgr.GetAsyncBR(test.pvr.Name)) + } else { + assert.NotNil(t, r.dataPathMgr.GetAsyncBR(test.pvr.Name)) + } + + if test.expectCancelRecord { + assert.Contains(t, r.cancelledPVR, test.pvr.Name) + } else { + assert.Empty(t, r.cancelledPVR) + } + }) + } +} + +func TestOnPodVolumeRestoreFailed(t *testing.T) { + for _, getErr := range []bool{true, false} { + ctx := context.TODO() + needErrs := []bool{getErr, false, false, false} + r, err := initPodVolumeRestoreReconciler(nil, []client.Object{}, needErrs...) 
+ require.NoError(t, err) + + pvr := pvrBuilder().Result() + namespace := pvr.Namespace + pvrName := pvr.Name + + assert.NoError(t, r.client.Create(ctx, pvr)) + r.OnDataPathFailed(ctx, namespace, pvrName, fmt.Errorf("Failed to handle %v", pvrName)) + updatedPVR := &velerov1api.PodVolumeRestore{} + if getErr { + assert.Error(t, r.client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, updatedPVR)) + assert.NotEqual(t, velerov1api.PodVolumeRestorePhaseFailed, updatedPVR.Status.Phase) + assert.True(t, updatedPVR.Status.StartTimestamp.IsZero()) + } else { + assert.NoError(t, r.client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, updatedPVR)) + assert.Equal(t, velerov1api.PodVolumeRestorePhaseFailed, updatedPVR.Status.Phase) + assert.True(t, updatedPVR.Status.StartTimestamp.IsZero()) + } + } +} + +func TestOnPodVolumeRestoreCancelled(t *testing.T) { + for _, getErr := range []bool{true, false} { + ctx := context.TODO() + needErrs := []bool{getErr, false, false, false} + r, err := initPodVolumeRestoreReconciler(nil, nil, needErrs...) 
+ require.NoError(t, err) + + pvr := pvrBuilder().Result() + namespace := pvr.Namespace + pvrName := pvr.Name + + assert.NoError(t, r.client.Create(ctx, pvr)) + r.OnDataPathCancelled(ctx, namespace, pvrName) + updatedPVR := &velerov1api.PodVolumeRestore{} + if getErr { + assert.Error(t, r.client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, updatedPVR)) + assert.NotEqual(t, velerov1api.PodVolumeRestorePhaseFailed, updatedPVR.Status.Phase) + assert.True(t, updatedPVR.Status.StartTimestamp.IsZero()) + } else { + assert.NoError(t, r.client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, updatedPVR)) + assert.Equal(t, velerov1api.PodVolumeRestorePhaseCanceled, updatedPVR.Status.Phase) + assert.False(t, updatedPVR.Status.StartTimestamp.IsZero()) + assert.False(t, updatedPVR.Status.CompletionTimestamp.IsZero()) + } + } +} + +func TestOnPodVolumeRestoreCompleted(t *testing.T) { + tests := []struct { + name string + emptyFSBR bool + isGetErr bool + rebindVolumeErr bool + }{ + { + name: "PVR complete", + emptyFSBR: false, + isGetErr: false, + rebindVolumeErr: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctx := context.TODO() + needErrs := []bool{test.isGetErr, false, false, false} + r, err := initPodVolumeRestoreReconciler(nil, []client.Object{}, needErrs...) 
+ r.exposer = func() exposer.PodVolumeExposer { + ep := exposermockes.NewPodVolumeExposer(t) + ep.On("CleanUp", mock.Anything, mock.Anything).Return() + return ep + }() + + require.NoError(t, err) + pvr := builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Result() + namespace := pvr.Namespace + ddName := pvr.Name + + assert.NoError(t, r.client.Create(ctx, pvr)) + r.OnDataPathCompleted(ctx, namespace, ddName, datapath.Result{}) + updatedDD := &velerov1api.PodVolumeRestore{} + if test.isGetErr { + assert.Error(t, r.client.Get(ctx, types.NamespacedName{Name: ddName, Namespace: namespace}, updatedDD)) + assert.Equal(t, velerov1api.PodVolumeRestorePhase(""), updatedDD.Status.Phase) + assert.True(t, updatedDD.Status.CompletionTimestamp.IsZero()) + } else { + assert.NoError(t, r.client.Get(ctx, types.NamespacedName{Name: ddName, Namespace: namespace}, updatedDD)) + assert.Equal(t, velerov1api.PodVolumeRestorePhaseCompleted, updatedDD.Status.Phase) + assert.False(t, updatedDD.Status.CompletionTimestamp.IsZero()) + } + }) + } +} + +func TestOnPodVolumeRestoreProgress(t *testing.T) { + totalBytes := int64(1024) + bytesDone := int64(512) + tests := []struct { + name string + pvr *velerov1api.PodVolumeRestore + progress uploader.Progress + needErrs []bool + }{ + { + name: "patch in progress phase success", + pvr: pvrBuilder().Result(), + progress: uploader.Progress{ + TotalBytes: totalBytes, + BytesDone: bytesDone, + }, + }, + { + name: "failed to get pvr", + pvr: pvrBuilder().Result(), + needErrs: []bool{true, false, false, false}, + }, + { + name: "failed to patch pvr", + pvr: pvrBuilder().Result(), + needErrs: []bool{false, false, true, false}, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctx := context.TODO() + + r, err := initPodVolumeRestoreReconciler(nil, []client.Object{}, test.needErrs...) 
+ require.NoError(t, err) + defer func() { + r.client.Delete(ctx, test.pvr, &kbclient.DeleteOptions{}) + }() + + pvr := pvrBuilder().Result() + namespace := pvr.Namespace + pvrName := pvr.Name + + assert.NoError(t, r.client.Create(context.Background(), pvr)) + + // Create a Progress object + progress := &uploader.Progress{ + TotalBytes: totalBytes, + BytesDone: bytesDone, + } + + r.OnDataPathProgress(ctx, namespace, pvrName, progress) + if len(test.needErrs) != 0 && !test.needErrs[0] { + updatedPVR := &velerov1api.PodVolumeRestore{} + assert.NoError(t, r.client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, updatedPVR)) + assert.Equal(t, test.progress.TotalBytes, updatedPVR.Status.Progress.TotalBytes) + assert.Equal(t, test.progress.BytesDone, updatedPVR.Status.Progress.BytesDone) + } + }) + } +} + +func TestFindPVBForRestorePod(t *testing.T) { + needErrs := []bool{false, false, false, false} + r, err := initPodVolumeRestoreReconciler(nil, []client.Object{}, needErrs...) 
+ require.NoError(t, err) + tests := []struct { + name string + pvr *velerov1api.PodVolumeRestore + pod *corev1api.Pod + checkFunc func(*velerov1api.PodVolumeRestore, []reconcile.Request) + }{ + { + name: "find pvr for pod", + pvr: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseAccepted).Result(), + pod: builder.ForPod(velerov1api.DefaultNamespace, pvrName).Labels(map[string]string{velerov1api.PVRLabel: pvrName}).Status(corev1api.PodStatus{Phase: corev1api.PodRunning}).Result(), + checkFunc: func(pvr *velerov1api.PodVolumeRestore, requests []reconcile.Request) { + // Assert that the function returns a single request + assert.Len(t, requests, 1) + // Assert that the request contains the correct namespaced name + assert.Equal(t, pvr.Namespace, requests[0].Namespace) + assert.Equal(t, pvr.Name, requests[0].Name) + }, + }, { + name: "no selected label found for pod", + pvr: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseAccepted).Result(), + pod: builder.ForPod(velerov1api.DefaultNamespace, pvrName).Result(), + checkFunc: func(pvr *velerov1api.PodVolumeRestore, requests []reconcile.Request) { + // Assert that the function returns a single request + assert.Empty(t, requests) + }, + }, { + name: "no matched pod", + pvr: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseAccepted).Result(), + pod: builder.ForPod(velerov1api.DefaultNamespace, pvrName).Labels(map[string]string{velerov1api.PVRLabel: "non-existing-pvr"}).Result(), + checkFunc: func(pvr *velerov1api.PodVolumeRestore, requests []reconcile.Request) { + assert.Empty(t, requests) + }, + }, + { + name: "pvr not accept", + pvr: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseInProgress).Result(), + pod: builder.ForPod(velerov1api.DefaultNamespace, pvrName).Labels(map[string]string{velerov1api.PVRLabel: pvrName}).Result(), + checkFunc: func(pvr *velerov1api.PodVolumeRestore, requests []reconcile.Request) { + assert.Empty(t, requests) + }, + }, + } + for _, test := range tests { + ctx := 
context.Background() + assert.NoError(t, r.client.Create(ctx, test.pod)) + assert.NoError(t, r.client.Create(ctx, test.pvr)) + // Call the findSnapshotRestoreForPod function + requests := r.findPVRForRestorePod(context.Background(), test.pod) + test.checkFunc(test.pvr, requests) + r.client.Delete(ctx, test.pvr, &kbclient.DeleteOptions{}) + if test.pod != nil { + r.client.Delete(ctx, test.pod, &kbclient.DeleteOptions{}) + } + } +} + +func TestOnPVRPrepareTimeout(t *testing.T) { + tests := []struct { + name string + pvr *velerov1api.PodVolumeRestore + needErrs []error + expected *velerov1api.PodVolumeRestore + }{ + { + name: "update fail", + pvr: pvrBuilder().Result(), + needErrs: []error{nil, nil, fmt.Errorf("fake-update-error"), nil}, + expected: pvrBuilder().Result(), + }, + { + name: "update interrupted", + pvr: pvrBuilder().Result(), + needErrs: []error{nil, nil, &fakeAPIStatus{metav1.StatusReasonConflict}, nil}, + expected: pvrBuilder().Result(), + }, + { + name: "succeed", + pvr: pvrBuilder().Result(), + needErrs: []error{nil, nil, nil, nil}, + expected: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseFailed).Result(), + }, + } + for _, test := range tests { + ctx := context.Background() + r, err := initPodVolumeRestoreReconcilerWithError(nil, []client.Object{}, test.needErrs...) 
+ require.NoError(t, err) + + err = r.client.Create(ctx, test.pvr) + require.NoError(t, err) + + r.onPrepareTimeout(ctx, test.pvr) + + pvr := velerov1api.PodVolumeRestore{} + _ = r.client.Get(ctx, kbclient.ObjectKey{ + Name: test.pvr.Name, + Namespace: test.pvr.Namespace, + }, &pvr) + + assert.Equal(t, test.expected.Status.Phase, pvr.Status.Phase) + } +} + +func TestTryCancelPVR(t *testing.T) { + tests := []struct { + name string + pvr *velerov1api.PodVolumeRestore + needErrs []error + succeeded bool + expectedErr string + }{ + { + name: "update fail", + pvr: pvrBuilder().Result(), + needErrs: []error{nil, nil, fmt.Errorf("fake-update-error"), nil}, + }, + { + name: "cancel by others", + pvr: pvrBuilder().Result(), + needErrs: []error{nil, nil, &fakeAPIStatus{metav1.StatusReasonConflict}, nil}, + }, + { + name: "succeed", + pvr: pvrBuilder().Result(), + needErrs: []error{nil, nil, nil, nil}, + succeeded: true, + }, + } + for _, test := range tests { + ctx := context.Background() + r, err := initPodVolumeRestoreReconcilerWithError(nil, []client.Object{}, test.needErrs...) 
+ require.NoError(t, err) + + err = r.client.Create(ctx, test.pvr) + require.NoError(t, err) + + r.tryCancelPodVolumeRestore(ctx, test.pvr, "") + + if test.expectedErr == "" { + assert.NoError(t, err) + } else { + assert.EqualError(t, err, test.expectedErr) + } + } +} + +func TestUpdatePVRWithRetry(t *testing.T) { + namespacedName := types.NamespacedName{ + Name: pvrName, + Namespace: "velero", + } + + // Define test cases + testCases := []struct { + Name string + needErrs []bool + noChange bool + ExpectErr bool + }{ + { + Name: "SuccessOnFirstAttempt", + }, + { + Name: "Error get", + needErrs: []bool{true, false, false, false, false}, + ExpectErr: true, + }, + { + Name: "Error update", + needErrs: []bool{false, false, true, false, false}, + ExpectErr: true, + }, + { + Name: "no change", + noChange: true, + needErrs: []bool{false, false, true, false, false}, + }, + { + Name: "Conflict with error timeout", + needErrs: []bool{false, false, false, false, true}, + ExpectErr: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.Name, func(t *testing.T) { + ctx, cancelFunc := context.WithTimeout(context.TODO(), time.Second*5) + defer cancelFunc() + r, err := initPodVolumeRestoreReconciler(nil, []client.Object{}, tc.needErrs...) + require.NoError(t, err) + err = r.client.Create(ctx, pvrBuilder().Result()) + require.NoError(t, err) + updateFunc := func(pvr *velerov1api.PodVolumeRestore) bool { + if tc.noChange { + return false + } + + pvr.Spec.Cancel = true + + return true + } + err = UpdatePVRWithRetry(ctx, r.client, namespacedName, velerotest.NewLogger().WithField("name", tc.Name), updateFunc) + if tc.ExpectErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} diff --git a/pkg/exposer/mocks/PodVolumeExposer.go b/pkg/exposer/mocks/PodVolumeExposer.go new file mode 100644 index 000000000..fbd5749ba --- /dev/null +++ b/pkg/exposer/mocks/PodVolumeExposer.go @@ -0,0 +1,125 @@ +// Code generated by mockery v2.39.1. DO NOT EDIT. 
+ +package mocks + +import ( + context "context" + + client "sigs.k8s.io/controller-runtime/pkg/client" + + exposer "github.com/vmware-tanzu/velero/pkg/exposer" + + mock "github.com/stretchr/testify/mock" + + time "time" + + corev1api "k8s.io/api/core/v1" +) + +// PodVolumeExposer is an autogenerated mock type for the PodVolumeExposer type +type PodVolumeExposer struct { + mock.Mock +} + +// CleanUp provides a mock function with given fields: _a0, _a1 +func (_m *PodVolumeExposer) CleanUp(_a0 context.Context, _a1 corev1api.ObjectReference) { + _m.Called(_a0, _a1) +} + +// DiagnoseExpose provides a mock function with given fields: _a0, _a1 +func (_m *PodVolumeExposer) DiagnoseExpose(_a0 context.Context, _a1 corev1api.ObjectReference) string { + ret := _m.Called(_a0, _a1) + + if len(ret) == 0 { + panic("no return value specified for DiagnoseExpose") + } + + var r0 string + if rf, ok := ret.Get(0).(func(context.Context, corev1api.ObjectReference) string); ok { + r0 = rf(_a0, _a1) + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// Expose provides a mock function with given fields: _a0, _a1, _a2 +func (_m *PodVolumeExposer) Expose(_a0 context.Context, _a1 corev1api.ObjectReference, _a2 exposer.PodVolumeExposeParam) error { + ret := _m.Called(_a0, _a1, _a2) + + if len(ret) == 0 { + panic("no return value specified for Expose") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, corev1api.ObjectReference, exposer.PodVolumeExposeParam) error); ok { + r0 = rf(_a0, _a1, _a2) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// GetExposed provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4 +func (_m *PodVolumeExposer) GetExposed(_a0 context.Context, _a1 corev1api.ObjectReference, _a2 client.Client, _a3 string, _a4 time.Duration) (*exposer.ExposeResult, error) { + ret := _m.Called(_a0, _a1, _a2, _a3, _a4) + + if len(ret) == 0 { + panic("no return value specified for GetExposed") + } + + var r0 *exposer.ExposeResult + var r1 
error + if rf, ok := ret.Get(0).(func(context.Context, corev1api.ObjectReference, client.Client, string, time.Duration) (*exposer.ExposeResult, error)); ok { + return rf(_a0, _a1, _a2, _a3, _a4) + } + if rf, ok := ret.Get(0).(func(context.Context, corev1api.ObjectReference, client.Client, string, time.Duration) *exposer.ExposeResult); ok { + r0 = rf(_a0, _a1, _a2, _a3, _a4) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*exposer.ExposeResult) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, corev1api.ObjectReference, client.Client, string, time.Duration) error); ok { + r1 = rf(_a0, _a1, _a2, _a3, _a4) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// PeekExposed provides a mock function with given fields: _a0, _a1 +func (_m *PodVolumeExposer) PeekExposed(_a0 context.Context, _a1 corev1api.ObjectReference) error { + ret := _m.Called(_a0, _a1) + + if len(ret) == 0 { + panic("no return value specified for PeekExposed") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, corev1api.ObjectReference) error); ok { + r0 = rf(_a0, _a1) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewPodVolumeExposer creates a new instance of PodVolumeExposer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. 
+func NewPodVolumeExposer(t interface { + mock.TestingT + Cleanup(func()) +}) *PodVolumeExposer { + mock := &PodVolumeExposer{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/podvolume/restorer.go b/pkg/podvolume/restorer.go index 1c50b79d9..bebbeee3e 100644 --- a/pkg/podvolume/restorer.go +++ b/pkg/podvolume/restorer.go @@ -98,7 +98,7 @@ func newRestorer( return } - if pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseCompleted || pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseFailed { + if pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseCompleted || pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseFailed || pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseCanceled { r.resultsLock.Lock() defer r.resultsLock.Unlock() @@ -234,7 +234,7 @@ ForEachVolume: errs = append(errs, errors.New("timed out waiting for all PodVolumeRestores to complete")) break ForEachVolume case res := <-resultsChan: - if res.Status.Phase == velerov1api.PodVolumeRestorePhaseFailed { + if res.Status.Phase == velerov1api.PodVolumeRestorePhaseFailed || res.Status.Phase == velerov1api.PodVolumeRestorePhaseCanceled { errs = append(errs, errors.Errorf("pod volume restore failed: %s", res.Status.Message)) } tracker.TrackPodVolume(res)