diff --git a/changelogs/unreleased/6410-blackpiglet b/changelogs/unreleased/6410-blackpiglet new file mode 100644 index 000000000..a72bab343 --- /dev/null +++ b/changelogs/unreleased/6410-blackpiglet @@ -0,0 +1 @@ +Retrieve DataUpload into backup result ConfigMap during volume snapshot restore. \ No newline at end of file diff --git a/changelogs/unreleased/6436-qiuming-best b/changelogs/unreleased/6436-qiuming-best new file mode 100644 index 000000000..adb564bb9 --- /dev/null +++ b/changelogs/unreleased/6436-qiuming-best @@ -0,0 +1 @@ +Add data download controller for data mover diff --git a/pkg/apis/velero/v1/labels_annotations.go b/pkg/apis/velero/v1/labels_annotations.go index 15bd57d78..defd56421 100644 --- a/pkg/apis/velero/v1/labels_annotations.go +++ b/pkg/apis/velero/v1/labels_annotations.go @@ -78,4 +78,27 @@ const ( // AsyncOperationIDLabel is the label key used to identify the async operation ID AsyncOperationIDLabel = "velero.io/async-operation-id" + + // PVCNamespaceNameLabel is the label key used to identify the PVC's namespace and name. + // The format is <namespace>/<name>. + PVCNamespaceNameLabel = "velero.io/pvc-namespace-name" + + // DynamicPVRestoreLabel is the label key for dynamic PV restore + DynamicPVRestoreLabel = "velero.io/dynamic-pv-restore" + + // ResourceUsageLabel is the label key used to describe the Velero resource usage. + ResourceUsageLabel = "velero.io/resource-usage" +) + +type AsyncOperationIDPrefix string + +const ( + AsyncOperationIDPrefixDataDownload AsyncOperationIDPrefix = "dd-" + AsyncOperationIDPrefixDataUpload AsyncOperationIDPrefix = "du-" +) + +type VeleroResourceUsage string + +const ( + VeleroResourceUsageDataUploadResult VeleroResourceUsage = "DataUpload" ) diff --git a/pkg/builder/data_download_builder.go b/pkg/builder/data_download_builder.go new file mode 100644 index 000000000..f46e5b4e6 --- /dev/null +++ b/pkg/builder/data_download_builder.go @@ -0,0 +1,112 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package builder + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" +) + +// DataDownloadBuilder builds DataDownload objects. +type DataDownloadBuilder struct { + object *velerov2alpha1api.DataDownload +} + +// ForDataDownload is the constructor of DataDownloadBuilder +func ForDataDownload(namespace, name string) *DataDownloadBuilder { + return &DataDownloadBuilder{ + object: &velerov2alpha1api.DataDownload{ + TypeMeta: metav1.TypeMeta{ + Kind: "DataDownload", + APIVersion: velerov2alpha1api.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + }, + } +} + +// Result returns the built DataDownload. +func (d *DataDownloadBuilder) Result() *velerov2alpha1api.DataDownload { + return d.object +} + +// BackupStorageLocation sets the DataDownload's backup storage location.
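+// The value should be the name of a BackupStorageLocation in the Velero namespace.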
+func (d *DataDownloadBuilder) BackupStorageLocation(name string) *DataDownloadBuilder { + d.object.Spec.BackupStorageLocation = name + return d +} + +// Phase sets the DataDownload's phase. +func (d *DataDownloadBuilder) Phase(phase velerov2alpha1api.DataDownloadPhase) *DataDownloadBuilder { + d.object.Status.Phase = phase + return d +} + +// SnapshotID sets the DataDownload's SnapshotID. +func (d *DataDownloadBuilder) SnapshotID(id string) *DataDownloadBuilder { + d.object.Spec.SnapshotID = id + return d +} + +// DataMover sets the DataDownload's DataMover. +func (d *DataDownloadBuilder) DataMover(dataMover string) *DataDownloadBuilder { + d.object.Spec.DataMover = dataMover + return d +} + +// SourceNamespace sets the DataDownload's SourceNamespace. +func (d *DataDownloadBuilder) SourceNamespace(sourceNamespace string) *DataDownloadBuilder { + d.object.Spec.SourceNamespace = sourceNamespace + return d +} + +// TargetVolume sets the DataDownload's TargetVolume. +func (d *DataDownloadBuilder) TargetVolume(targetVolume velerov2alpha1api.TargetVolumeSpec) *DataDownloadBuilder { + d.object.Spec.TargetVolume = targetVolume + return d +} + +// Cancel sets the DataDownload's Cancel. +func (d *DataDownloadBuilder) Cancel(cancel bool) *DataDownloadBuilder { + d.object.Spec.Cancel = cancel + return d +} + +// OperationTimeout sets the DataDownload's OperationTimeout. +func (d *DataDownloadBuilder) OperationTimeout(timeout metav1.Duration) *DataDownloadBuilder { + d.object.Spec.OperationTimeout = timeout + return d +} + +// DataMoverConfig sets the DataDownload's DataMoverConfig. +func (d *DataDownloadBuilder) DataMoverConfig(config *map[string]string) *DataDownloadBuilder { + d.object.Spec.DataMoverConfig = *config + return d +} + +// ObjectMeta applies functional options to the DataDownload's ObjectMeta. +func (b *DataDownloadBuilder) ObjectMeta(opts ...ObjectMetaOpt) *DataDownloadBuilder { + for _, opt := range opts { + opt(b.object) + } + + return b +} diff --git a/pkg/builder/object_meta.go b/pkg/builder/object_meta.go index 90730e4be..561187f9e 100644 --- a/pkg/builder/object_meta.go +++ b/pkg/builder/object_meta.go @@ -160,3 +160,10 @@ func WithCreationTimestamp(t time.Time) func(obj metav1.Object) { obj.SetCreationTimestamp(metav1.Time{Time: t}) } } + +// WithOwnerReference is a functional option that applies the specified OwnerReference to an object. +func WithOwnerReference(val []metav1.OwnerReference) func(obj metav1.Object) { + return func(obj metav1.Object) { + obj.SetOwnerReferences(val) + } +} diff --git a/pkg/builder/persistent_volume_claim_builder.go b/pkg/builder/persistent_volume_claim_builder.go index 376d71444..569277dd3 100644 --- a/pkg/builder/persistent_volume_claim_builder.go +++ b/pkg/builder/persistent_volume_claim_builder.go @@ -18,6 +18,7 @@ package builder import ( corev1api "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -73,3 +74,39 @@ func (b *PersistentVolumeClaimBuilder) Phase(phase corev1api.PersistentVolumeCla b.object.Status.Phase = phase return b } + +// RequestResource sets the PersistentVolumeClaim's spec.Resources.Requests. 
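+// Any previously set requests are replaced wholesale by the given ResourceList.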
+func (b *PersistentVolumeClaimBuilder) RequestResource(requests corev1api.ResourceList) *PersistentVolumeClaimBuilder { + if b.object.Spec.Resources.Requests == nil { + b.object.Spec.Resources.Requests = make(map[corev1api.ResourceName]resource.Quantity) + } + b.object.Spec.Resources.Requests = requests + return b +} + +// LimitResource sets the PersistentVolumeClaim's spec.Resources.Limits. +func (b *PersistentVolumeClaimBuilder) LimitResource(limits corev1api.ResourceList) *PersistentVolumeClaimBuilder { + if b.object.Spec.Resources.Limits == nil { + b.object.Spec.Resources.Limits = make(map[corev1api.ResourceName]resource.Quantity) + } + b.object.Spec.Resources.Limits = limits + return b +} + +// DataSource sets the PersistentVolumeClaim's spec.DataSource. +func (b *PersistentVolumeClaimBuilder) DataSource(dataSource *corev1api.TypedLocalObjectReference) *PersistentVolumeClaimBuilder { + b.object.Spec.DataSource = dataSource + return b +} + +// DataSourceRef sets the PersistentVolumeClaim's spec.DataSourceRef. +func (b *PersistentVolumeClaimBuilder) DataSourceRef(dataSourceRef *corev1api.TypedLocalObjectReference) *PersistentVolumeClaimBuilder { + b.object.Spec.DataSourceRef = dataSourceRef + return b +} + +// Selector sets the PersistentVolumeClaim's spec.Selector. +func (b *PersistentVolumeClaimBuilder) Selector(labelSelector *metav1.LabelSelector) *PersistentVolumeClaimBuilder { + b.object.Spec.Selector = labelSelector + return b +} diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go index 5e138b250..3f635602e 100644 --- a/pkg/cmd/cli/nodeagent/server.go +++ b/pkg/cmd/cli/nodeagent/server.go @@ -260,6 +260,10 @@ func (s *nodeAgentServer) run() { s.logger.WithError(err).Fatal("Unable to create the data upload controller") } + if err = controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, repoEnsurer, credentialGetter, s.nodeName, s.logger).SetupWithManager(s.mgr); err != nil { + s.logger.WithError(err).Fatal("Unable to create the data download controller") + } + s.logger.Info("Controllers starting...") if err := s.mgr.Start(ctrl.SetupSignalHandler()); err != nil { diff --git a/pkg/cmd/server/plugin/plugin.go b/pkg/cmd/server/plugin/plugin.go index a42f88730..45e45389a 100644 --- a/pkg/cmd/server/plugin/plugin.go +++ b/pkg/cmd/server/plugin/plugin.go @@ -22,11 +22,10 @@ import ( apiextensions "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" - "github.com/vmware-tanzu/velero/pkg/features" - "github.com/vmware-tanzu/velero/pkg/backup" "github.com/vmware-tanzu/velero/pkg/client" velerodiscovery "github.com/vmware-tanzu/velero/pkg/discovery" + "github.com/vmware-tanzu/velero/pkg/features" veleroplugin "github.com/vmware-tanzu/velero/pkg/plugin/framework" plugincommon "github.com/vmware-tanzu/velero/pkg/plugin/framework/common" "github.com/vmware-tanzu/velero/pkg/restore" @@ -59,7 +58,8 @@ func NewCommand(f client.Factory) *cobra.Command { RegisterRestoreItemAction("velero.io/change-pvc-node-selector", newChangePVCNodeSelectorItemAction(f)). RegisterRestoreItemAction("velero.io/apiservice", newAPIServiceRestoreItemAction). RegisterRestoreItemAction("velero.io/admission-webhook-configuration", newAdmissionWebhookConfigurationAction). - RegisterRestoreItemAction("velero.io/secret", newSecretRestoreItemAction(f)) + RegisterRestoreItemAction("velero.io/secret", newSecretRestoreItemAction(f)). 
+ RegisterRestoreItemAction("velero.io/dataupload", newDataUploadRetrieveAction(f)) if !features.IsEnabled(velerov1api.APIGroupVersionsFeatureFlag) { // Do not register crd-remap-version BIA if the API Group feature flag is enabled, so that the v1 CRD can be backed up pluginServer = pluginServer.RegisterBackupItemAction("velero.io/crd-remap-version", newRemapCRDVersionAction(f)) @@ -245,3 +245,13 @@ func newSecretRestoreItemAction(f client.Factory) plugincommon.HandlerInitialize return restore.NewSecretAction(logger, client), nil } } + +func newDataUploadRetrieveAction(f client.Factory) plugincommon.HandlerInitializer { + return func(logger logrus.FieldLogger) (interface{}, error) { + client, err := f.KubeClient() + if err != nil { + return nil, err + } + return restore.NewDataUploadRetrieveAction(logger, client.CoreV1().ConfigMaps(f.Namespace())), nil + } +} diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index d2c596f29..00a4456f0 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -521,6 +521,7 @@ High priorities: - VolumeSnapshotContents are needed as they contain the handle to the volume snapshot in the storage provider - VolumeSnapshots are needed to create PVCs using the VolumeSnapshot as their data source. + - DataUploads need to restore before PVC for Snapshot DataMover to work, because PVC needs the DataUploadResults to create DataDownloads. - PVs go before PVCs because PVCs depend on them. - PVCs go before pods or controllers so they can be mounted as volumes. - Service accounts go before secrets so service account token secrets can be filled automatically. @@ -551,6 +552,7 @@ var defaultRestorePriorities = restore.Priorities{ "volumesnapshotclass.snapshot.storage.k8s.io", "volumesnapshotcontents.snapshot.storage.k8s.io", "volumesnapshots.snapshot.storage.k8s.io", + "datauploads.velero.io", "persistentvolumes", "persistentvolumeclaims", "serviceaccounts", diff --git a/pkg/controller/data_download_controller.go b/pkg/controller/data_download_controller.go new file mode 100644 index 000000000..1f0e78c69 --- /dev/null +++ b/pkg/controller/data_download_controller.go @@ -0,0 +1,487 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package controller + +import ( + "context" + "fmt" + "time" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + "k8s.io/utils/clock" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" + + "github.com/vmware-tanzu/velero/internal/credentials" + "github.com/vmware-tanzu/velero/pkg/apis/velero/shared" + velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" + datamover "github.com/vmware-tanzu/velero/pkg/datamover" + "github.com/vmware-tanzu/velero/pkg/datapath" + "github.com/vmware-tanzu/velero/pkg/exposer" + repository "github.com/vmware-tanzu/velero/pkg/repository" + "github.com/vmware-tanzu/velero/pkg/uploader" + "github.com/vmware-tanzu/velero/pkg/util/filesystem" + "github.com/vmware-tanzu/velero/pkg/util/kube" +) + +// DataDownloadReconciler reconciles a DataDownload object +type DataDownloadReconciler struct { + client client.Client + kubeClient kubernetes.Interface + logger logrus.FieldLogger + credentialGetter *credentials.CredentialGetter + fileSystem filesystem.Interface + clock clock.WithTickerAndDelayedExecution + restoreExposer exposer.GenericRestoreExposer + nodeName string + repositoryEnsurer *repository.Ensurer + dataPathMgr *datapath.Manager +} + +func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Interface, + repoEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, nodeName string, logger logrus.FieldLogger) *DataDownloadReconciler { + return &DataDownloadReconciler{ + client: client, + kubeClient: kubeClient, + logger: logger.WithField("controller", "DataDownload"), + credentialGetter: credentialGetter, + fileSystem: filesystem.NewFileSystem(), + clock: &clock.RealClock{}, + nodeName: nodeName, + repositoryEnsurer: repoEnsurer, + restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger), + dataPathMgr: datapath.NewManager(1), + } +} + +// +kubebuilder:rbac:groups=velero.io,resources=datadownloads,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=velero.io,resources=datadownloads/status,verbs=get;update;patch +// +kubebuilder:rbac:groups="",resources=pods,verbs=get +// +kubebuilder:rbac:groups="",resources=persistentvolumes,verbs=get +// +kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get + +func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + log := r.logger.WithFields(logrus.Fields{ + "controller": "datadownload", + "datadownload": req.NamespacedName, + }) + + log.Infof("Reconcile %s", req.Name) + + dd := &velerov2alpha1api.DataDownload{} + if err := r.client.Get(ctx, types.NamespacedName{Namespace: req.Namespace, Name: req.Name}, dd); err != nil { + if apierrors.IsNotFound(err) { + log.Warn("DataDownload not found, skip") + return ctrl.Result{}, nil + } + log.WithError(err).Error("Unable to get the DataDownload") + return ctrl.Result{}, err + } +
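+ // Only the built-in data mover is handled here; a DataDownload that names any other data mover is ignored by this controller.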
+ if dd.Spec.DataMover != "" && dd.Spec.DataMover != dataMoverType { + log.WithField("data mover", dd.Spec.DataMover).Info("Data mover is not supported by Velero, skipping") + return ctrl.Result{}, nil + } + + if r.restoreExposer == nil { + return r.errorOut(ctx, dd, errors.New("uninitialized generic exposer"), "uninitialized exposer", log) + } + + if dd.Status.Phase == "" || dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseNew { + log.Info("Data download starting") + + if _, err := r.getTargetPVC(ctx, dd); err != nil { + return ctrl.Result{Requeue: true}, nil + } + + accepted, err := r.acceptDataDownload(ctx, dd) + if err != nil { + return r.errorOut(ctx, dd, err, "error to accept the data download", log) + } + + if !accepted { + log.Debug("Data download is not accepted") + return ctrl.Result{}, nil + } + + log.Info("Data download is accepted") + + hostingPodLabels := map[string]string{velerov1api.DataDownloadLabel: dd.Name} + + // restoreExposer.Expose() creates a pod whose volume is restored from the given volume snapshot, + // but that pod may not be scheduled to the same node as the current controller, so we return here + // and only the controller running on the pod's node carries out the rest of the work. + err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), dd.Spec.TargetVolume.PVC, dd.Spec.TargetVolume.Namespace, hostingPodLabels, dd.Spec.OperationTimeout.Duration) + if err != nil { + return r.errorOut(ctx, dd, err, "error to start restore expose", log) + } + log.Info("Restore is exposed") + + return ctrl.Result{}, nil + } else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared { + log.Info("Data download is prepared") + fsRestore := r.dataPathMgr.GetAsyncBR(dd.Name) + + if fsRestore != nil { + log.Info("Cancellable data path is already started") + return ctrl.Result{}, nil + } + + result, err := r.restoreExposer.GetExposed(ctx, getDataDownloadOwnerObject(dd), r.client, r.nodeName, dd.Spec.OperationTimeout.Duration) + if err != nil { + return r.errorOut(ctx, dd, err, "restore exposer is not ready", log) + } else if result == nil { + log.Debug("Get empty restore exposer") + return ctrl.Result{}, nil + } + + log.Info("Restore PVC is ready") + + // Update status to InProgress + original := dd.DeepCopy() + dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress + dd.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()} + if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil { + log.WithError(err).Error("Unable to update status to in progress") + return ctrl.Result{}, err + } + + log.Info("Data download is marked as in progress") + + return r.runCancelableDataPath(ctx, dd, result, log) + } else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress { + log.Info("Data download is in progress") + if dd.Spec.Cancel { + fsRestore := r.dataPathMgr.GetAsyncBR(dd.Name) + if fsRestore == nil { + return ctrl.Result{}, nil + } + + log.Info("Data download is being canceled") + // Update status to Canceling.
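+ // The cancel request is then propagated to the running data path below; the phase moves to Canceled in the OnDataDownloadCancelled callback once the data path reports back.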
+ original := dd.DeepCopy() + dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceling + if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil { + log.WithError(err).Error("error updating data download status") + return ctrl.Result{}, err + } + + fsRestore.Cancel() + return ctrl.Result{}, nil + } + + return ctrl.Result{}, nil + } else { + log.Debugf("Data download now is in %s phase and do nothing by current %s controller", dd.Status.Phase, r.nodeName) + return ctrl.Result{}, nil + } +} + +func (r *DataDownloadReconciler) runCancelableDataPath(ctx context.Context, dd *velerov2alpha1api.DataDownload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) { + log.Info("Creating data path routine") + callbacks := datapath.Callbacks{ + OnCompleted: r.OnDataDownloadCompleted, + OnFailed: r.OnDataDownloadFailed, + OnCancelled: r.OnDataDownloadCancelled, + OnProgress: r.OnDataDownloadProgress, + } + + fsRestore, err := r.dataPathMgr.CreateFileSystemBR(dd.Name, dataUploadDownloadRequestor, ctx, r.client, dd.Namespace, callbacks, log) + if err != nil { + if err == datapath.ConcurrentLimitExceed { + log.Info("runCancelableDataDownload is concurrent limited") + return ctrl.Result{Requeue: true, RequeueAfter: time.Minute}, nil + } else { + return r.errorOut(ctx, dd, err, "error to create data path", log) + } + } + + path, err := exposer.GetPodVolumeHostPath(ctx, res.ByPod.HostingPod, res.ByPod.PVC, r.client, r.fileSystem, log) + if err != nil { + return r.errorOut(ctx, dd, err, "error exposing host path for pod volume", log) + } + + log.WithField("path", path.ByPath).Debug("Found host path") + if err := fsRestore.Init(ctx, dd.Spec.BackupStorageLocation, dd.Spec.SourceNamespace, datamover.GetUploaderType(dd.Spec.DataMover), + velerov1api.BackupRepositoryTypeKopia, "", r.repositoryEnsurer, r.credentialGetter); err != nil { + return r.errorOut(ctx, dd, err, "error to initialize data path", log) + } + log.WithField("path", path.ByPath).Info("fs init") + + if err := fsRestore.StartRestore(dd.Spec.SnapshotID, path); err != nil { + return r.errorOut(ctx, dd, err, fmt.Sprintf("error starting data path %s restore", path.ByPath), log) + } + + log.WithField("path", path.ByPath).Info("Async fs restore data path started") + return ctrl.Result{}, nil +} + +func (r *DataDownloadReconciler) OnDataDownloadCompleted(ctx context.Context, namespace string, ddName string, result datapath.Result) { + defer r.closeDataPath(ctx, ddName) + + log := r.logger.WithField("datadownload", ddName) + log.Info("Async fs restore data path completed") + + var dd velerov2alpha1api.DataDownload + if err := r.client.Get(ctx, types.NamespacedName{Name: ddName, Namespace: namespace}, &dd); err != nil { + log.WithError(err).Warn("Failed to get datadownload on completion") + return + } + + objRef := getDataDownloadOwnerObject(&dd) + err := r.restoreExposer.RebindVolume(ctx, objRef, dd.Spec.TargetVolume.PVC, dd.Spec.TargetVolume.Namespace, dd.Spec.OperationTimeout.Duration) + if err != nil { + log.WithError(err).Error("Failed to rebind PV to target PVC on completion") + return + } + + log.Info("Cleaning up exposed environment") + r.restoreExposer.CleanUp(ctx, objRef) + + original := dd.DeepCopy() + dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCompleted + dd.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()} + if err := r.client.Patch(ctx, &dd, client.MergeFrom(original)); err != nil { + log.WithError(err).Error("error updating data download status") + } else { + log.Infof("Data 
download is marked as %s", dd.Status.Phase) + } +} + +func (r *DataDownloadReconciler) OnDataDownloadFailed(ctx context.Context, namespace string, ddName string, err error) { + defer r.closeDataPath(ctx, ddName) + + log := r.logger.WithField("datadownload", ddName) + + log.WithError(err).Error("Async fs restore data path failed") + + var dd velerov2alpha1api.DataDownload + if getErr := r.client.Get(ctx, types.NamespacedName{Name: ddName, Namespace: namespace}, &dd); getErr != nil { + log.WithError(getErr).Warn("Failed to get data download on failure") + } else { + if _, errOut := r.errorOut(ctx, &dd, err, "data path restore failed", log); err != nil { + log.WithError(err).Warnf("Failed to patch data download with err %v", errOut) + } + } +} + +func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, namespace string, ddName string) { + defer r.closeDataPath(ctx, ddName) + + log := r.logger.WithField("datadownload", ddName) + + log.Warn("Async fs backup data path canceled") + + var dd velerov2alpha1api.DataDownload + if getErr := r.client.Get(ctx, types.NamespacedName{Name: ddName, Namespace: namespace}, &dd); getErr != nil { + log.WithError(getErr).Warn("Failed to get datadownload on cancel") + } else { + // cleans up any objects generated during the snapshot expose + r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(&dd)) + + original := dd.DeepCopy() + dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceled + if dd.Status.StartTimestamp.IsZero() { + dd.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()} + } + dd.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()} + if err := r.client.Patch(ctx, &dd, client.MergeFrom(original)); err != nil { + log.WithError(err).Error("error updating data download status") + } + } +} + +func (r *DataDownloadReconciler) OnDataDownloadProgress(ctx context.Context, namespace string, ddName string, progress *uploader.Progress) { + log := r.logger.WithField("datadownload", ddName) + + var dd velerov2alpha1api.DataDownload + if err := r.client.Get(ctx, types.NamespacedName{Name: ddName, Namespace: namespace}, &dd); err != nil { + log.WithError(err).Warn("Failed to get data download on progress") + return + } + + original := dd.DeepCopy() + dd.Status.Progress = shared.DataMoveOperationProgress{TotalBytes: progress.TotalBytes, BytesDone: progress.BytesDone} + + if err := r.client.Patch(ctx, &dd, client.MergeFrom(original)); err != nil { + log.WithError(err).Error("Failed to update restore snapshot progress") + } +} + +// SetupWithManager registers the DataDownload controller. +// The fresh new DataDownload CR first created will trigger to create one pod (long time, maybe failure or unknown status) by one of the datadownload controllers +// then the request will get out of the Reconcile queue immediately by not blocking others' CR handling, in order to finish the rest data download process we need to +// re-enqueue the previous related request once the related pod is in running status to keep going on the rest logic. and below logic will avoid handling the unwanted +// pod status and also avoid block others CR handling +func (r *DataDownloadReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&velerov2alpha1api.DataDownload{}). 
+ Watches(&source.Kind{Type: &v1.Pod{}}, kube.EnqueueRequestsFromMapUpdateFunc(r.findSnapshotRestoreForPod), + builder.WithPredicates(predicate.Funcs{ + UpdateFunc: func(ue event.UpdateEvent) bool { + newObj := ue.ObjectNew.(*v1.Pod) + + if _, ok := newObj.Labels[velerov1api.DataDownloadLabel]; !ok { + return false + } + + if newObj.Status.Phase != v1.PodRunning { + return false + } + + if newObj.Spec.NodeName == "" { + return false + } + + return true + }, + CreateFunc: func(event.CreateEvent) bool { + return false + }, + DeleteFunc: func(de event.DeleteEvent) bool { + return false + }, + GenericFunc: func(ge event.GenericEvent) bool { + return false + }, + })). + Complete(r) +} + +func (r *DataDownloadReconciler) findSnapshotRestoreForPod(podObj client.Object) []reconcile.Request { + pod := podObj.(*v1.Pod) + + dd := &velerov2alpha1api.DataDownload{} + err := r.client.Get(context.Background(), types.NamespacedName{ + Namespace: pod.Namespace, + Name: pod.Labels[velerov1api.DataDownloadLabel], + }, dd) + + if err != nil { + r.logger.WithField("Restore pod", pod.Name).WithError(err).Error("unable to get DataDownload") + return []reconcile.Request{} + } + + if dd.Status.Phase != velerov2alpha1api.DataDownloadPhaseAccepted { + return []reconcile.Request{} + } + + requests := make([]reconcile.Request, 1) + + r.logger.WithField("Restore pod", pod.Name).Infof("Preparing data download %s", dd.Name) + err = r.patchDataDownload(context.Background(), dd, prepareDataDownload) + if err != nil { + r.logger.WithField("Restore pod", pod.Name).WithError(err).Error("unable to patch data download") + return []reconcile.Request{} + } + + requests[0] = reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: dd.Namespace, + Name: dd.Name, + }, + } + + return requests +} + +func (r *DataDownloadReconciler) patchDataDownload(ctx context.Context, req *velerov2alpha1api.DataDownload, mutate func(*velerov2alpha1api.DataDownload)) error { + original := req.DeepCopy() + mutate(req) + if err := r.client.Patch(ctx, req, client.MergeFrom(original)); err != nil { + return errors.Wrap(err, "error patching data download") + } + + return nil +} + +func prepareDataDownload(ssb *velerov2alpha1api.DataDownload) { + ssb.Status.Phase = velerov2alpha1api.DataDownloadPhasePrepared +} + +func (r *DataDownloadReconciler) errorOut(ctx context.Context, dd *velerov2alpha1api.DataDownload, err error, msg string, log logrus.FieldLogger) (ctrl.Result, error) { + r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd)) + return ctrl.Result{}, r.updateStatusToFailed(ctx, dd, err, msg, log) +} + +func (r *DataDownloadReconciler) updateStatusToFailed(ctx context.Context, dd *velerov2alpha1api.DataDownload, err error, msg string, log logrus.FieldLogger) error { + log.Infof("update data download status to %v", dd.Status.Phase) + original := dd.DeepCopy() + dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseFailed + dd.Status.Message = errors.WithMessage(err, msg).Error() + dd.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()} + + if err = r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil { + log.WithError(err).Error("error updating DataDownload status") + return err + } + + return nil +} + +func (r *DataDownloadReconciler) acceptDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload) (bool, error) { + updated := dd.DeepCopy() + updated.Status.Phase = velerov2alpha1api.DataDownloadPhaseAccepted + + r.logger.Infof("Accepting snapshot restore %s", dd.Name) + // For all data download 
CRs, the controllers in every node-agent try to update the CR, and only one will succeed; + // the controller that succeeds carries out the rest of the logic + err := r.client.Update(ctx, updated) + if err == nil { + return true, nil + } else if apierrors.IsConflict(err) { + r.logger.WithField("DataDownload", dd.Name).Error("This data download restore has been accepted by others") + return false, nil + } else { + return false, err + } +} + +func (r *DataDownloadReconciler) getTargetPVC(ctx context.Context, dd *velerov2alpha1api.DataDownload) (*v1.PersistentVolumeClaim, error) { + return r.kubeClient.CoreV1().PersistentVolumeClaims(dd.Spec.TargetVolume.Namespace).Get(ctx, dd.Spec.TargetVolume.PVC, metav1.GetOptions{}) +} + +func (r *DataDownloadReconciler) closeDataPath(ctx context.Context, ddName string) { + fsBackup := r.dataPathMgr.GetAsyncBR(ddName) + if fsBackup != nil { + fsBackup.Close(ctx) + } + + r.dataPathMgr.RemoveAsyncBR(ddName) +} + +func getDataDownloadOwnerObject(dd *velerov2alpha1api.DataDownload) v1.ObjectReference { + return v1.ObjectReference{ + Kind: dd.Kind, + Namespace: dd.Namespace, + Name: dd.Name, + UID: dd.UID, + APIVersion: dd.APIVersion, + } +} diff --git a/pkg/controller/data_download_controller_test.go b/pkg/controller/data_download_controller_test.go new file mode 100644 index 000000000..5db1170c7 --- /dev/null +++ b/pkg/controller/data_download_controller_test.go @@ -0,0 +1,211 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "fmt" + "testing" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + clientgofake "k8s.io/client-go/kubernetes/fake" + ctrl "sigs.k8s.io/controller-runtime" + kbclient "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/vmware-tanzu/velero/internal/credentials" + velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" + "github.com/vmware-tanzu/velero/pkg/builder" + "github.com/vmware-tanzu/velero/pkg/datapath" + "github.com/vmware-tanzu/velero/pkg/exposer" + velerotest "github.com/vmware-tanzu/velero/pkg/test" + + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + datapathmockes "github.com/vmware-tanzu/velero/pkg/datapath/mocks" + exposermockes "github.com/vmware-tanzu/velero/pkg/exposer/mocks" +) + +const dataDownloadName string = "datadownload-1" + +func dataDownloadBuilder() *builder.DataDownloadBuilder { + return builder.ForDataDownload(velerov1api.DefaultNamespace, dataDownloadName). + BackupStorageLocation("bsl-loc"). + DataMover("velero").
+ SnapshotID("test-snapshot-id").TargetVolume(velerov2alpha1api.TargetVolumeSpec{ + PV: "test-pv", + PVC: "test-pvc", + Namespace: "test-ns", + }) +} + +func initDataDownloadReconciler(objects []runtime.Object, needError ...bool) (*DataDownloadReconciler, error) { + scheme := runtime.NewScheme() + err := velerov1api.AddToScheme(scheme) + if err != nil { + return nil, err + } + err = velerov2alpha1api.AddToScheme(scheme) + if err != nil { + return nil, err + } + err = corev1.AddToScheme(scheme) + if err != nil { + return nil, err + } + + fakeClient := &FakeClient{ + Client: fake.NewClientBuilder().WithScheme(scheme).Build(), + } + + if len(needError) == 4 { + fakeClient.getError = needError[0] + fakeClient.createError = needError[1] + fakeClient.updateError = needError[2] + fakeClient.patchError = needError[3] + } + + fakeKubeClient := clientgofake.NewSimpleClientset(objects...) + fakeFS := velerotest.NewFakeFileSystem() + pathGlob := fmt.Sprintf("/host_pods/%s/volumes/*/%s", "", dataDownloadName) + _, err = fakeFS.Create(pathGlob) + if err != nil { + return nil, err + } + + credentialFileStore, err := credentials.NewNamespacedFileStore( + fakeClient, + velerov1api.DefaultNamespace, + "/tmp/credentials", + fakeFS, + ) + if err != nil { + return nil, err + } + return NewDataDownloadReconciler(fakeClient, fakeKubeClient, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", velerotest.NewLogger()), nil +} + +func TestDataDownloadReconcile(t *testing.T) { + tests := []struct { + name string + dd *velerov2alpha1api.DataDownload + targetPVC *corev1.PersistentVolumeClaim + dataMgr *datapath.Manager + needErrs []bool + isExposeErr bool + isGetExposeErr bool + expectedStatusMsg string + }{ + { + name: "Restore is exposed", + dd: dataDownloadBuilder().Result(), + targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), + }, + { + name: "Get empty restore exposer", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), + targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), + }, + { + name: "Failed to get restore exposer", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), + targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), + expectedStatusMsg: "Error to get restore exposer", + isGetExposeErr: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + r, err := initDataDownloadReconciler([]runtime.Object{test.targetPVC}, test.needErrs...) 
+ require.NoError(t, err) + defer func() { + r.client.Delete(ctx, test.dd, &kbclient.DeleteOptions{}) + if test.targetPVC != nil { + r.client.Delete(ctx, test.targetPVC, &kbclient.DeleteOptions{}) + } + }() + + ctx := context.Background() + if test.dd.Namespace == velerov1api.DefaultNamespace { + err = r.client.Create(ctx, test.dd) + require.NoError(t, err) + } + + if test.dataMgr != nil { + r.dataPathMgr = test.dataMgr + } else { + r.dataPathMgr = datapath.NewManager(1) + } + + datapath.FSBRCreator = func(string, string, kbclient.Client, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR { + return datapathmockes.NewAsyncBR(t) + } + + if test.isExposeErr || test.isGetExposeErr { + r.restoreExposer = func() exposer.GenericRestoreExposer { + ep := exposermockes.NewGenericRestoreExposer(t) + if test.isExposeErr { + ep.On("Expose", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("Error to expose restore exposer")) + } + + if test.isGetExposeErr { + ep.On("GetExposed", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("Error to get restore exposer")) + } + + ep.On("CleanUp", mock.Anything, mock.Anything).Return() + return ep + }() + } + + if test.dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress { + if fsBR := r.dataPathMgr.GetAsyncBR(test.dd.Name); fsBR == nil { + _, err := r.dataPathMgr.CreateFileSystemBR(test.dd.Name, pVBRRequestor, ctx, r.client, velerov1api.DefaultNamespace, datapath.Callbacks{OnCancelled: r.OnDataDownloadCancelled}, velerotest.NewLogger()) + require.NoError(t, err) + } + } + actualResult, err := r.Reconcile(ctx, ctrl.Request{ + NamespacedName: types.NamespacedName{ + Namespace: velerov1api.DefaultNamespace, + Name: test.dd.Name, + }, + }) + + require.Nil(t, err) + require.NotNil(t, actualResult) + + dd := velerov2alpha1api.DataDownload{} + err = r.client.Get(ctx, kbclient.ObjectKey{ + Name: test.dd.Name, + Namespace: test.dd.Namespace, + }, &dd) + + if test.isGetExposeErr { + assert.Contains(t, dd.Status.Message, test.expectedStatusMsg) + } + require.Nil(t, err) + t.Logf("%s: \n %v \n", test.name, dd) + }) + } +} diff --git a/pkg/datapath/mocks/types.go b/pkg/datapath/mocks/types.go new file mode 100644 index 000000000..ecf655df0 --- /dev/null +++ b/pkg/datapath/mocks/types.go @@ -0,0 +1,86 @@ +// Code generated by mockery v2.20.0. DO NOT EDIT. 
+ +package mocks + +import ( + context "context" + + credentials "github.com/vmware-tanzu/velero/internal/credentials" + datapath "github.com/vmware-tanzu/velero/pkg/datapath" + + mock "github.com/stretchr/testify/mock" + + repository "github.com/vmware-tanzu/velero/pkg/repository" +) + +// AsyncBR is an autogenerated mock type for the AsyncBR type +type AsyncBR struct { + mock.Mock +} + +// Cancel provides a mock function with given fields: +func (_m *AsyncBR) Cancel() { + _m.Called() +} + +// Close provides a mock function with given fields: ctx +func (_m *AsyncBR) Close(ctx context.Context) { + _m.Called(ctx) +} + +// Init provides a mock function with given fields: ctx, bslName, sourceNamespace, uploaderType, repositoryType, repoIdentifier, repositoryEnsurer, credentialGetter +func (_m *AsyncBR) Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string, repoIdentifier string, repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error { + ret := _m.Called(ctx, bslName, sourceNamespace, uploaderType, repositoryType, repoIdentifier, repositoryEnsurer, credentialGetter) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, string, string, string, string, *repository.Ensurer, *credentials.CredentialGetter) error); ok { + r0 = rf(ctx, bslName, sourceNamespace, uploaderType, repositoryType, repoIdentifier, repositoryEnsurer, credentialGetter) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// StartBackup provides a mock function with given fields: source, realSource, parentSnapshot, forceFull, tags +func (_m *AsyncBR) StartBackup(source datapath.AccessPoint, realSource string, parentSnapshot string, forceFull bool, tags map[string]string) error { + ret := _m.Called(source, realSource, parentSnapshot, forceFull, tags) + + var r0 error + if rf, ok := ret.Get(0).(func(datapath.AccessPoint, string, string, bool, map[string]string) error); ok { + r0 = rf(source, realSource, parentSnapshot, forceFull, tags) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// StartRestore provides a mock function with given fields: snapshotID, target +func (_m *AsyncBR) StartRestore(snapshotID string, target datapath.AccessPoint) error { + ret := _m.Called(snapshotID, target) + + var r0 error + if rf, ok := ret.Get(0).(func(string, datapath.AccessPoint) error); ok { + r0 = rf(snapshotID, target) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +type mockConstructorTestingTNewAsyncBR interface { + mock.TestingT + Cleanup(func()) +} + +// NewAsyncBR creates a new instance of AsyncBR. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewAsyncBR(t mockConstructorTestingTNewAsyncBR) *AsyncBR { + mock := &AsyncBR{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/exposer/mocks/generic_restore.go b/pkg/exposer/mocks/generic_restore.go new file mode 100644 index 000000000..a7d20f87c --- /dev/null +++ b/pkg/exposer/mocks/generic_restore.go @@ -0,0 +1,96 @@ +// Code generated by mockery v2.20.0. DO NOT EDIT. 
+ +package mocks + +import ( + context "context" + + client "sigs.k8s.io/controller-runtime/pkg/client" + + exposer "github.com/vmware-tanzu/velero/pkg/exposer" + + mock "github.com/stretchr/testify/mock" + + time "time" + + v1 "k8s.io/api/core/v1" +) + +// GenericRestoreExposer is an autogenerated mock type for the GenericRestoreExposer type +type GenericRestoreExposer struct { + mock.Mock +} + +// CleanUp provides a mock function with given fields: _a0, _a1 +func (_m *GenericRestoreExposer) CleanUp(_a0 context.Context, _a1 v1.ObjectReference) { + _m.Called(_a0, _a1) +} + +// Expose provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4, _a5 +func (_m *GenericRestoreExposer) Expose(_a0 context.Context, _a1 v1.ObjectReference, _a2 string, _a3 string, _a4 map[string]string, _a5 time.Duration) error { + ret := _m.Called(_a0, _a1, _a2, _a3, _a4, _a5) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, v1.ObjectReference, string, string, map[string]string, time.Duration) error); ok { + r0 = rf(_a0, _a1, _a2, _a3, _a4, _a5) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// GetExposed provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4 +func (_m *GenericRestoreExposer) GetExposed(_a0 context.Context, _a1 v1.ObjectReference, _a2 client.Client, _a3 string, _a4 time.Duration) (*exposer.ExposeResult, error) { + ret := _m.Called(_a0, _a1, _a2, _a3, _a4) + + var r0 *exposer.ExposeResult + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, v1.ObjectReference, client.Client, string, time.Duration) (*exposer.ExposeResult, error)); ok { + return rf(_a0, _a1, _a2, _a3, _a4) + } + if rf, ok := ret.Get(0).(func(context.Context, v1.ObjectReference, client.Client, string, time.Duration) *exposer.ExposeResult); ok { + r0 = rf(_a0, _a1, _a2, _a3, _a4) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*exposer.ExposeResult) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, v1.ObjectReference, client.Client, string, time.Duration) error); ok { + r1 = rf(_a0, _a1, _a2, _a3, _a4) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// RebindVolume provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4 +func (_m *GenericRestoreExposer) RebindVolume(_a0 context.Context, _a1 v1.ObjectReference, _a2 string, _a3 string, _a4 time.Duration) error { + ret := _m.Called(_a0, _a1, _a2, _a3, _a4) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, v1.ObjectReference, string, string, time.Duration) error); ok { + r0 = rf(_a0, _a1, _a2, _a3, _a4) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +type mockConstructorTestingTNewGenericRestoreExposer interface { + mock.TestingT + Cleanup(func()) +} + +// NewGenericRestoreExposer creates a new instance of GenericRestoreExposer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewGenericRestoreExposer(t mockConstructorTestingTNewGenericRestoreExposer) *GenericRestoreExposer { + mock := &GenericRestoreExposer{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/restore/dataupload_retrieve_action.go b/pkg/restore/dataupload_retrieve_action.go new file mode 100644 index 000000000..f42837310 --- /dev/null +++ b/pkg/restore/dataupload_retrieve_action.go @@ -0,0 +1,105 @@ +/* +Copyright 2020 the Velero contributors. + + Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package restore + +import ( + "context" + "encoding/json" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + corev1api "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + + velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + velerov2alpha1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" + "github.com/vmware-tanzu/velero/pkg/label" + "github.com/vmware-tanzu/velero/pkg/plugin/velero" +) + +type DataUploadRetrieveAction struct { + logger logrus.FieldLogger + configMapClient corev1client.ConfigMapInterface +} + +func NewDataUploadRetrieveAction(logger logrus.FieldLogger, configMapClient corev1client.ConfigMapInterface) *DataUploadRetrieveAction { + return &DataUploadRetrieveAction{ + logger: logger, + configMapClient: configMapClient, + } +} + +func (d *DataUploadRetrieveAction) AppliesTo() (velero.ResourceSelector, error) { + return velero.ResourceSelector{ + IncludedResources: []string{"datauploads.velero.io"}, + }, nil +} + +func (d *DataUploadRetrieveAction) Execute(input *velero.RestoreItemActionExecuteInput) (*velero.RestoreItemActionExecuteOutput, error) { + d.logger.Info("Executing DataUploadRetrieveAction") + + dataUpload := velerov2alpha1.DataUpload{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(input.ItemFromBackup.UnstructuredContent(), &dataUpload); err != nil { + d.logger.Errorf("unable to convert unstructured item to DataUpload: %s", err.Error()) + return nil, errors.Wrap(err, "unable to convert unstructured item to DataUpload.") + } + + dataUploadResult := velerov2alpha1.DataUploadResult{ + BackupStorageLocation: dataUpload.Spec.BackupStorageLocation, + DataMover: dataUpload.Spec.DataMover, + SnapshotID: dataUpload.Status.SnapshotID, + SourceNamespace: dataUpload.Spec.SourceNamespace, + DataMoverResult: dataUpload.Status.DataMoverResult, + } + + jsonBytes, err := json.Marshal(dataUploadResult) + if err != nil { + d.logger.Errorf("fail to convert DataUploadResult to JSON: %s", err.Error()) + return nil, errors.Wrap(err, "fail to convert DataUploadResult to JSON") + } + + cm := corev1api.ConfigMap{ + TypeMeta: metav1.TypeMeta{ + Kind: "ConfigMap", + APIVersion: corev1api.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + GenerateName: dataUpload.Name + "-", + Namespace: dataUpload.Namespace, + Labels: map[string]string{ + velerov1api.RestoreUIDLabel: label.GetValidName(string(input.Restore.UID)), + velerov1api.PVCNamespaceNameLabel: dataUpload.Spec.SourceNamespace + "." 
+ dataUpload.Spec.SourcePVC, + velerov1api.ResourceUsageLabel: string(velerov1api.VeleroResourceUsageDataUploadResult), + }, + }, + Data: map[string]string{ + string(input.Restore.UID): string(jsonBytes), + }, + } + + _, err = d.configMapClient.Create(context.Background(), &cm, metav1.CreateOptions{}) + if err != nil { + d.logger.Errorf("failed to create DataUploadResult ConfigMap %s/%s: %s", cm.Namespace, cm.Name, err.Error()) + return nil, errors.Wrap(err, "failed to create DataUploadResult ConfigMap") + } + + return &velero.RestoreItemActionExecuteOutput{ + SkipRestore: true, + }, nil +} diff --git a/pkg/restore/dataupload_retrieve_action_test.go b/pkg/restore/dataupload_retrieve_action_test.go new file mode 100644 index 000000000..e04050e6d --- /dev/null +++ b/pkg/restore/dataupload_retrieve_action_test.go @@ -0,0 +1,88 @@ +/* +Copyright 2020 the Velero contributors. + + Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package restore + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/fake" + + velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + velerov2alpha1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" + "github.com/vmware-tanzu/velero/pkg/builder" + "github.com/vmware-tanzu/velero/pkg/plugin/velero" + velerotest "github.com/vmware-tanzu/velero/pkg/test" +) + +func TestDataUploadRetrieveActionExecute(t *testing.T) { + tests := []struct { + name string + dataUpload *velerov2alpha1.DataUpload + restore *velerov1.Restore + expectedDataUploadResult *corev1.ConfigMap + expectedErr string + }{ + { + name: "DataUploadRetrieve Action test", + dataUpload: builder.ForDataUpload("velero", "testDU").SourceNamespace("testNamespace").SourcePVC("testPVC").Result(), + restore: builder.ForRestore("velero", "testRestore").ObjectMeta(builder.WithUID("testingUID")).Result(), + expectedDataUploadResult: builder.ForConfigMap("velero", "").ObjectMeta(builder.WithGenerateName("testDU-"), builder.WithLabels(velerov1.PVCNamespaceNameLabel, "testNamespace.testPVC", velerov1.RestoreUIDLabel, "testingUID", velerov1.ResourceUsageLabel, string(velerov1.VeleroResourceUsageDataUploadResult))).Data("testingUID", `{"backupStorageLocation":"","sourceNamespace":"testNamespace"}`).Result(), + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + logger := velerotest.NewLogger() + cmClient := fake.NewSimpleClientset() + + var unstructuredDataUpload map[string]interface{} + if tc.dataUpload != nil { + var err error + unstructuredDataUpload, err = runtime.DefaultUnstructuredConverter.ToUnstructured(tc.dataUpload) + require.NoError(t, err) + } + input := velero.RestoreItemActionExecuteInput{ + Restore: tc.restore, + ItemFromBackup: &unstructured.Unstructured{Object: unstructuredDataUpload}, + } + 
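+ // Execute is expected to create one DataUploadResult ConfigMap labeled with the restore UID and the source PVC, and to skip restoring the DataUpload item itself.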
+ action := NewDataUploadRetrieveAction(logger, cmClient.CoreV1().ConfigMaps("velero")) + _, err := action.Execute(&input) + if tc.expectedErr != "" { + require.Equal(t, tc.expectedErr, err.Error()) + return + } + require.NoError(t, err) + + if tc.expectedDataUploadResult != nil { + cmList, err := cmClient.CoreV1().ConfigMaps("velero").List(context.Background(), metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s,%s=%s", velerov1.RestoreUIDLabel, "testingUID", velerov1.PVCNamespaceNameLabel, tc.dataUpload.Spec.SourceNamespace+"."+tc.dataUpload.Spec.SourcePVC), + }) + require.NoError(t, err) + require.Equal(t, *tc.expectedDataUploadResult, cmList.Items[0]) + } + }) + } +} diff --git a/pkg/restore/restore.go b/pkg/restore/restore.go index 1288f69f8..98eaf90cf 100644 --- a/pkg/restore/restore.go +++ b/pkg/restore/restore.go @@ -535,6 +535,21 @@ func (ctx *restoreContext) execute() (results.Result, results.Result) { // Close the progress update channel. quit <- struct{}{} + // Clean up the DataUploadResult ConfigMaps + defer func() { + opts := []crclient.DeleteAllOfOption{ + crclient.InNamespace(ctx.restore.Namespace), + crclient.MatchingLabels{ + velerov1api.RestoreUIDLabel: string(ctx.restore.UID), + velerov1api.ResourceUsageLabel: string(velerov1api.VeleroResourceUsageDataUploadResult), + }, + } + err := ctx.kbClient.DeleteAllOf(go_context.Background(), &v1.ConfigMap{}, opts...) + if err != nil { + ctx.log.Errorf("Failed to batch delete DataUploadResult ConfigMaps for restore %s: %s", ctx.restore.Name, err.Error()) + } + }() + + // Do a final progress update as stopping the ticker might have left last few + // updates from taking place.  updated := ctx.restore.DeepCopy() diff --git a/pkg/restore/restore_test.go b/pkg/restore/restore_test.go index 20bdcc13f..2a399726f 100644 --- a/pkg/restore/restore_test.go +++ b/pkg/restore/restore_test.go @@ -1397,6 +1397,18 @@ func TestRestoreActionsRunForCorrectItems(t *testing.T) { new(recordResourcesAction).ForNamespace("ns-2").ForResource("pods"): nil, }, }, + { + name: "actions run for datauploads resource", + restore: defaultRestore().Result(), + backup: defaultBackup().Result(), + tarball: test.NewTarWriter(t). + AddItems("datauploads.velero.io", builder.ForDataUpload("velero", "du").Result()). + Done(), + apiResources: []*test.APIResource{test.DataUploads()}, + actions: map[*recordResourcesAction][]string{ + new(recordResourcesAction).ForNamespace("velero").ForResource("datauploads.velero.io"): {"velero/du"}, + }, + }, } for _, tc := range tests { diff --git a/pkg/test/resources.go b/pkg/test/resources.go index dfe22278d..7c2fa17f6 100644 --- a/pkg/test/resources.go +++ b/pkg/test/resources.go @@ -183,3 +183,13 @@ func Services(items ...metav1.Object) *APIResource { Items: items, } } + +func DataUploads(items ...metav1.Object) *APIResource { + return &APIResource{ + Group: "velero.io", + Version: "v2alpha1", + Name: "datauploads", + Namespaced: true, + Items: items, + } +}
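Note on how the pieces above fit together: DataUploadRetrieveAction stores one DataUploadResult ConfigMap per restored DataUpload, labeled with the restore UID, the source PVC's namespace and name, and velero.io/resource-usage=DataUpload, and restore.go deletes those ConfigMaps when the restore finishes. The sketch below is illustrative only and is not part of this diff; it shows how a consumer of the new labels could look up and decode such a ConfigMap before that cleanup runs. The helper name getDataUploadResult and the single-match assumption are assumptions, not Velero API.

package example // hypothetical consumer, not part of this change

import (
	"context"
	"encoding/json"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"

	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
	velerov2alpha1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
	"github.com/vmware-tanzu/velero/pkg/label"
)

// getDataUploadResult (hypothetical) finds the DataUploadResult ConfigMap written by
// DataUploadRetrieveAction for one restore and one source PVC, then decodes the JSON
// payload stored under the restore UID key.
func getDataUploadResult(ctx context.Context, cmClient corev1client.ConfigMapInterface,
	restoreUID, pvcNamespace, pvcName string) (*velerov2alpha1.DataUploadResult, error) {
	// The label values mirror what dataupload_retrieve_action.go sets on the ConfigMap.
	selector := fmt.Sprintf("%s=%s,%s=%s,%s=%s",
		velerov1api.RestoreUIDLabel, label.GetValidName(restoreUID),
		velerov1api.PVCNamespaceNameLabel, pvcNamespace+"."+pvcName,
		velerov1api.ResourceUsageLabel, string(velerov1api.VeleroResourceUsageDataUploadResult))

	cms, err := cmClient.List(ctx, metav1.ListOptions{LabelSelector: selector})
	if err != nil {
		return nil, err
	}
	if len(cms.Items) != 1 {
		return nil, fmt.Errorf("expected one DataUploadResult ConfigMap, got %d", len(cms.Items))
	}

	result := velerov2alpha1.DataUploadResult{}
	if err := json.Unmarshal([]byte(cms.Items[0].Data[restoreUID]), &result); err != nil {
		return nil, err
	}
	return &result, nil
}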