diff --git a/changelogs/unreleased/6337-qiuming-best b/changelogs/unreleased/6337-qiuming-best new file mode 100644 index 000000000..d769d0553 --- /dev/null +++ b/changelogs/unreleased/6337-qiuming-best @@ -0,0 +1 @@ +Add dataupload controller diff --git a/pkg/apis/velero/v1/labels_annotations.go b/pkg/apis/velero/v1/labels_annotations.go index 64c83525a..394f99d3f 100644 --- a/pkg/apis/velero/v1/labels_annotations.go +++ b/pkg/apis/velero/v1/labels_annotations.go @@ -54,6 +54,12 @@ const ( // RepositoryTypeLabel is the label key used to identify the type of a repository RepositoryTypeLabel = "velero.io/repository-type" + // DataUploadLabel is the label key used to identify the dataupload for snapshot backup pod + DataUploadLabel = "velero.io/data-upload" + + // DataDownloadLabel is the label key used to identify the datadownload for snapshot restore pod + DataDownloadLabel = "velero.io/data-download" + // SourceClusterK8sVersionAnnotation is the label key used to identify the k8s // git version of the backup , i.e. v1.16.4 SourceClusterK8sGitVersionAnnotation = "velero.io/source-cluster-k8s-gitversion" diff --git a/pkg/builder/data_upload_builder.go b/pkg/builder/data_upload_builder.go new file mode 100644 index 000000000..a844ef6ef --- /dev/null +++ b/pkg/builder/data_upload_builder.go @@ -0,0 +1,115 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package builder + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" +) + +// DataUploadBuilder builds DataUpload objects +type DataUploadBuilder struct { + object *velerov2alpha1api.DataUpload +} + +// ForDataUpload is the constructor for a DataUploadBuilder. +func ForDataUpload(ns, name string) *DataUploadBuilder { + return &DataUploadBuilder{ + object: &velerov2alpha1api.DataUpload{ + TypeMeta: metav1.TypeMeta{ + APIVersion: velerov2alpha1api.SchemeGroupVersion.String(), + Kind: "DataUpload", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns, + Name: name, + }, + }, + } +} + +// Result returns the built DataUpload. +func (d *DataUploadBuilder) Result() *velerov2alpha1api.DataUpload { + return d.object +} + +// BackupStorageLocation sets the DataUpload's backup storage location. +func (d *DataUploadBuilder) BackupStorageLocation(name string) *DataUploadBuilder { + d.object.Spec.BackupStorageLocation = name + return d +} + +// Phase sets the DataUpload's phase. +func (d *DataUploadBuilder) Phase(phase velerov2alpha1api.DataUploadPhase) *DataUploadBuilder { + d.object.Status.Phase = phase + return d +} + +// SnapshotID sets the DataUpload's SnapshotID. +func (d *DataUploadBuilder) SnapshotID(id string) *DataUploadBuilder { + d.object.Status.SnapshotID = id + return d +} + +// DataMover sets the DataUpload's DataMover. +func (d *DataUploadBuilder) DataMover(dataMover string) *DataUploadBuilder { + d.object.Spec.DataMover = dataMover + return d +} + +// SourceNamespace sets the DataUpload's SourceNamespace. 
+func (d *DataUploadBuilder) SourceNamespace(sourceNamespace string) *DataUploadBuilder { + d.object.Spec.SourceNamespace = sourceNamespace + return d +} + +// SourcePVC sets the DataUpload's SourcePVC. +func (d *DataUploadBuilder) SourcePVC(sourcePVC string) *DataUploadBuilder { + d.object.Spec.SourcePVC = sourcePVC + return d +} + +// SnapshotType sets the DataUpload's SnapshotType. +func (d *DataUploadBuilder) SnapshotType(SnapshotType velerov2alpha1api.SnapshotType) *DataUploadBuilder { + d.object.Spec.SnapshotType = SnapshotType + return d +} + +// Cancel sets the DataUpload's Cancel. +func (d *DataUploadBuilder) Cancel(cancel bool) *DataUploadBuilder { + d.object.Spec.Cancel = cancel + return d +} + +// OperationTimeout sets the DataUpload's OperationTimeout. +func (d *DataUploadBuilder) OperationTimeout(timeout metav1.Duration) *DataUploadBuilder { + d.object.Spec.OperationTimeout = timeout + return d +} + +// DataMoverConfig sets the DataUpload's DataMoverConfig. +func (d *DataUploadBuilder) DataMoverConfig(config *map[string]string) *DataUploadBuilder { + d.object.Spec.DataMoverConfig = config + return d +} + +// CSISnapshot sets the DataUpload's CSISnapshot. +func (d *DataUploadBuilder) CSISnapshot(cSISnapshot *velerov2alpha1api.CSISnapshotSpec) *DataUploadBuilder { + d.object.Spec.CSISnapshot = cSISnapshot + return d +} diff --git a/pkg/builder/pod_builder.go b/pkg/builder/pod_builder.go index de8376f85..8931c14b9 100644 --- a/pkg/builder/pod_builder.go +++ b/pkg/builder/pod_builder.go @@ -76,6 +76,11 @@ func (b *PodBuilder) NodeName(val string) *PodBuilder { return b } +func (b *PodBuilder) Labels(labels map[string]string) *PodBuilder { + b.object.Labels = labels + return b +} + func (b *PodBuilder) InitContainers(containers ...*corev1api.Container) *PodBuilder { for _, c := range containers { b.object.Spec.InitContainers = append(b.object.Spec.InitContainers, *c) diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go index 167109c04..5e138b250 100644 --- a/pkg/cmd/cli/nodeagent/server.go +++ b/pkg/cmd/cli/nodeagent/server.go @@ -36,12 +36,15 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes" + "k8s.io/utils/clock" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/manager" + snapshotv1client "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned" + "github.com/vmware-tanzu/velero/internal/credentials" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" @@ -112,16 +115,18 @@ func NewServerCommand(f client.Factory) *cobra.Command { } type nodeAgentServer struct { - logger logrus.FieldLogger - ctx context.Context - cancelFunc context.CancelFunc - fileSystem filesystem.Interface - mgr manager.Manager - metrics *metrics.ServerMetrics - metricsAddress string - namespace string - nodeName string - config nodeAgentServerConfig + logger logrus.FieldLogger + ctx context.Context + cancelFunc context.CancelFunc + fileSystem filesystem.Interface + mgr manager.Manager + metrics *metrics.ServerMetrics + metricsAddress string + namespace string + nodeName string + config nodeAgentServerConfig + kubeClient kubernetes.Interface + csiSnapshotClient *snapshotv1client.Clientset } func newNodeAgentServer(logger logrus.FieldLogger, factory 
client.Factory, config nodeAgentServerConfig) (*nodeAgentServer, error) { @@ -184,14 +189,18 @@ func newNodeAgentServer(logger logrus.FieldLogger, factory client.Factory, confi // the cache isn't initialized yet when "validatePodVolumesHostPath" is called, the client returned by the manager cannot // be used, so we need the kube client here - client, err := factory.KubeClient() + s.kubeClient, err = factory.KubeClient() if err != nil { return nil, err } - if err := s.validatePodVolumesHostPath(client); err != nil { + if err := s.validatePodVolumesHostPath(s.kubeClient); err != nil { return nil, err } + s.csiSnapshotClient, err = snapshotv1client.NewForConfig(clientConfig) + if err != nil { + return nil, err + } return s, nil } @@ -247,6 +256,10 @@ func (s *nodeAgentServer) run() { s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller") } + if err = controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.logger).SetupWithManager(s.mgr); err != nil { + s.logger.WithError(err).Fatal("Unable to create the data upload controller") + } + s.logger.Info("Controllers starting...") if err := s.mgr.Start(ctrl.SetupSignalHandler()); err != nil { diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go new file mode 100644 index 000000000..5e687b673 --- /dev/null +++ b/pkg/controller/data_upload_controller.go @@ -0,0 +1,528 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package controller + +import ( + "context" + "fmt" + "time" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + clocks "k8s.io/utils/clock" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" + + snapshotter "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned/typed/volumesnapshot/v1" + + "github.com/vmware-tanzu/velero/internal/credentials" + "github.com/vmware-tanzu/velero/pkg/apis/velero/shared" + velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" + "github.com/vmware-tanzu/velero/pkg/datapath" + "github.com/vmware-tanzu/velero/pkg/exposer" + "github.com/vmware-tanzu/velero/pkg/repository" + "github.com/vmware-tanzu/velero/pkg/uploader" + "github.com/vmware-tanzu/velero/pkg/util/filesystem" + "github.com/vmware-tanzu/velero/pkg/util/kube" +) + +const dataMoverType string = "velero" +const dataUploadDownloadRequestor string = "snapshot-data-upload-download" + +// DataUploadReconciler reconciles a DataUpload object +type DataUploadReconciler struct { + client client.Client + kubeClient kubernetes.Interface + csiSnapshotClient snapshotter.SnapshotV1Interface + repoEnsurer *repository.Ensurer + clock clocks.WithTickerAndDelayedExecution + credentialGetter *credentials.CredentialGetter + nodeName string + fileSystem filesystem.Interface + logger logrus.FieldLogger + snapshotExposerList map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer + dataPathMgr *datapath.Manager +} + +func NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interface, + csiSnapshotClient snapshotter.SnapshotV1Interface, repoEnsurer *repository.Ensurer, clock clocks.WithTickerAndDelayedExecution, + cred *credentials.CredentialGetter, nodeName string, fs filesystem.Interface, log logrus.FieldLogger) *DataUploadReconciler { + return &DataUploadReconciler{ + client: client, + kubeClient: kubeClient, + csiSnapshotClient: csiSnapshotClient, + clock: clock, + credentialGetter: cred, + nodeName: nodeName, + fileSystem: fs, + logger: log, + repoEnsurer: repoEnsurer, + snapshotExposerList: map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{velerov2alpha1api.SnapshotTypeCSI: exposer.NewCSISnapshotExposer(kubeClient, csiSnapshotClient, log)}, + dataPathMgr: datapath.NewManager(1), + } +} + +// +kubebuilder:rbac:groups=velero.io,resources=datauploads,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=velero.io,resources=datauploads/status,verbs=get;update;patch +// +kubebuilder:rbac:groups="",resources=pods,verbs=get +// +kubebuilder:rbac:groups="",resources=persistentvolumes,verbs=get +// +kubebuilder:rbac:groups="",resources=persistentvolumerclaims,verbs=get + +func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + log := r.logger.WithFields(logrus.Fields{ + "controller": "dataupload", + "dataupload": req.NamespacedName, + }) + log.Infof("Reconcile %s", req.Name) + var du velerov2alpha1api.DataUpload + if err := r.client.Get(ctx, 
req.NamespacedName, &du); err != nil { + if apierrors.IsNotFound(err) { + log.Debug("Unable to find DataUpload") + return ctrl.Result{}, nil + } + return ctrl.Result{}, errors.Wrap(err, "getting DataUpload") + } + + if du.Spec.DataMover != "" && du.Spec.DataMover != dataMoverType { + log.WithField("Data mover", du.Spec.DataMover).Debug("it is not one built-in data mover which is not supported by Velero") + return ctrl.Result{}, nil + } + + ep, ok := r.snapshotExposerList[du.Spec.SnapshotType] + if !ok { + return r.errorOut(ctx, &du, errors.Errorf("%s type of snapshot exposer is not exist", du.Spec.SnapshotType), "not exist type of exposer", log) + } + + if du.Status.Phase == "" || du.Status.Phase == velerov2alpha1api.DataUploadPhaseNew { + log.Info("Data upload starting") + + accepted, err := r.acceptDataUpload(ctx, &du) + if err != nil { + return r.errorOut(ctx, &du, err, "error to accept the data upload", log) + } + + if !accepted { + log.Debug("Data upload is not accepted") + return ctrl.Result{}, nil + } + + log.Info("Data upload is accepted") + + exposeParam := r.setupExposeParam(&du) + + if err := ep.Expose(ctx, getOwnerObject(&du), exposeParam); err != nil { + return r.errorOut(ctx, &du, err, "error to expose snapshot", log) + } + log.Info("Snapshot is exposed") + // ep.Expose() will trigger to create one pod whose volume is restored by a given volume snapshot, + // but the pod maybe is not in the same node of the current controller, so we need to return it here. + // And then only the controller who is in the same node could do the rest work. + return ctrl.Result{}, nil + } else if du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared { + log.Info("Data upload is prepared") + fsBackup := r.dataPathMgr.GetAsyncBR(du.Name) + if fsBackup != nil { + log.Info("Cancellable data path is already started") + return ctrl.Result{}, nil + } + waitExposePara := r.setupWaitExposePara(&du) + res, err := ep.GetExposed(ctx, getOwnerObject(&du), du.Spec.OperationTimeout.Duration, waitExposePara) + if err != nil { + return r.errorOut(ctx, &du, err, "exposed snapshot is not ready", log) + } else if res == nil { + log.Debug("Get empty exposer") + return ctrl.Result{}, nil + } + + log.Info("Exposed snapshot is ready") + + // Update status to InProgress + original := du.DeepCopy() + du.Status.Phase = velerov2alpha1api.DataUploadPhaseInProgress + du.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()} + if err := r.client.Patch(ctx, &du, client.MergeFrom(original)); err != nil { + return r.errorOut(ctx, &du, err, "error updating dataupload status", log) + } + + log.Info("Data upload is marked as in progress") + return r.runCancelableDataUpload(ctx, &du, res, log) + } else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress { + log.Info("Data upload is in progress") + if du.Spec.Cancel { + fsBackup := r.dataPathMgr.GetAsyncBR(du.Name) + if fsBackup == nil { + return ctrl.Result{}, nil + } + log.Info("Data upload is being canceled") + + // Update status to Canceling. 
+ original := du.DeepCopy() + du.Status.Phase = velerov2alpha1api.DataUploadPhaseCanceling + if err := r.client.Patch(ctx, &du, client.MergeFrom(original)); err != nil { + log.WithError(err).Error("error updating data upload into canceling status") + return ctrl.Result{}, err + } + fsBackup.Cancel() + return ctrl.Result{}, nil + } + return ctrl.Result{}, nil + } else { + log.Debugf("Data upload now is in %s phase and do nothing by current %s controller", du.Status.Phase, r.nodeName) + return ctrl.Result{}, nil + } +} + +func (r *DataUploadReconciler) runCancelableDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) { + log.Info("Creating data path routine") + callbacks := datapath.Callbacks{ + OnCompleted: r.OnDataUploadCompleted, + OnFailed: r.OnDataUploadFailed, + OnCancelled: r.OnDataUploadCancelled, + OnProgress: r.OnDataUploadProgress, + } + + fsBackup, err := r.dataPathMgr.CreateFileSystemBR(du.Name, dataUploadDownloadRequestor, ctx, r.client, du.Namespace, callbacks, log) // VAE + if err != nil { + if err == datapath.ConcurrentLimitExceed { + log.Info("runCancelableDataUpload is concurrent limited") + return ctrl.Result{Requeue: true, RequeueAfter: time.Minute}, nil + } else { + return r.errorOut(ctx, du, err, "error to create data path", log) + } + } + + path, err := exposer.GetPodVolumeHostPath(ctx, res.ByPod.HostingPod, res.ByPod.PVC, r.client, r.fileSystem, log) + if err != nil { + return r.errorOut(ctx, du, err, "error exposing host path for pod volume", log) + } + + log.WithField("path", path.ByPath).Debug("Found host path") + if err := fsBackup.Init(ctx, du.Spec.BackupStorageLocation, du.Namespace, exposer.GetUploaderType(du.Spec.DataMover), + velerov1api.BackupRepositoryTypeKopia, "", r.repoEnsurer, r.credentialGetter); err != nil { + return r.errorOut(ctx, du, err, "error to initialize data path", log) + } + log.WithField("path", path.ByPath).Info("fs init") + if err := fsBackup.StartBackup(path, "", false, nil); err != nil { + return r.errorOut(ctx, du, err, "error starting data path backup", log) + } + + log.WithField("path", path.ByPath).Info("Async fs backup data path started") + return ctrl.Result{}, nil +} + +func (r *DataUploadReconciler) OnDataUploadCompleted(ctx context.Context, namespace string, duName string, result datapath.Result) { + defer r.closeDataPath(ctx, duName) + + log := r.logger.WithField("dataupload", duName) + + log.Info("Async fs backup data path completed") + + var du velerov2alpha1api.DataUpload + if err := r.client.Get(ctx, types.NamespacedName{Name: duName, Namespace: namespace}, &du); err != nil { + log.WithError(err).Warn("Failed to get dataupload on completion") + return + } + + // cleans up any objects generated during the snapshot expose + ep, ok := r.snapshotExposerList[du.Spec.SnapshotType] + if !ok { + log.WithError(fmt.Errorf("%v type of snapshot exposer is not exist", du.Spec.SnapshotType)). + Warn("Failed to clean up resources on completion") + } else { + var volumeSnapshotName string + if du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI { // Other exposer should have another condition + volumeSnapshotName = du.Spec.CSISnapshot.VolumeSnapshot + } + ep.CleanUp(ctx, getOwnerObject(&du), volumeSnapshotName, du.Spec.SourceNamespace) + } + + // Update status to Completed with path & snapshot ID. 
+ original := du.DeepCopy() + du.Status.Path = result.Backup.Source.ByPath + du.Status.Phase = velerov2alpha1api.DataUploadPhaseCompleted + du.Status.SnapshotID = result.Backup.SnapshotID + du.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()} + if result.Backup.EmptySnapshot { + du.Status.Message = "volume was empty so no data was upload" + } + + if err := r.client.Patch(ctx, &du, client.MergeFrom(original)); err != nil { + log.WithError(err).Error("error updating DataUpload status") + } + log.Info("Data upload completed") +} + +func (r *DataUploadReconciler) OnDataUploadFailed(ctx context.Context, namespace string, duName string, err error) { + defer r.closeDataPath(ctx, duName) + + log := r.logger.WithField("dataupload", duName) + + log.WithError(err).Error("Async fs backup data path failed") + + var du velerov2alpha1api.DataUpload + if getErr := r.client.Get(ctx, types.NamespacedName{Name: duName, Namespace: namespace}, &du); getErr != nil { + log.WithError(getErr).Warn("Failed to get dataupload on failure") + } else { + if _, errOut := r.errorOut(ctx, &du, err, "data path backup failed", log); err != nil { + log.WithError(err).Warnf("Failed to patch dataupload with err %v", errOut) + } + } +} + +func (r *DataUploadReconciler) OnDataUploadCancelled(ctx context.Context, namespace string, duName string) { + defer r.closeDataPath(ctx, duName) + + log := r.logger.WithField("dataupload", duName) + + log.Warn("Async fs backup data path canceled") + + var du velerov2alpha1api.DataUpload + if getErr := r.client.Get(ctx, types.NamespacedName{Name: duName, Namespace: namespace}, &du); getErr != nil { + log.WithError(getErr).Warn("Failed to get dataupload on cancel") + } else { + // cleans up any objects generated during the snapshot expose + ep, ok := r.snapshotExposerList[du.Spec.SnapshotType] + if !ok { + log.WithError(fmt.Errorf("%v type of snapshot exposer is not exist", du.Spec.SnapshotType)). + Warn("Failed to clean up resources on canceled") + } else { + var volumeSnapshotName string + if du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI { // Other exposer should have another condition + volumeSnapshotName = du.Spec.CSISnapshot.VolumeSnapshot + } + ep.CleanUp(ctx, getOwnerObject(&du), volumeSnapshotName, du.Spec.SourceNamespace) + } + original := du.DeepCopy() + du.Status.Phase = velerov2alpha1api.DataUploadPhaseCanceled + if du.Status.StartTimestamp.IsZero() { + du.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()} + } + du.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()} + if err := r.client.Patch(ctx, &du, client.MergeFrom(original)); err != nil { + log.WithError(err).Error("error updating DataUpload status") + } + } +} + +func (r *DataUploadReconciler) OnDataUploadProgress(ctx context.Context, namespace string, duName string, progress *uploader.Progress) { + log := r.logger.WithField("dataupload", duName) + + var du velerov2alpha1api.DataUpload + if err := r.client.Get(ctx, types.NamespacedName{Name: duName, Namespace: namespace}, &du); err != nil { + log.WithError(err).Warn("Failed to get dataupload on progress") + return + } + + original := du.DeepCopy() + du.Status.Progress = shared.DataMoveOperationProgress{TotalBytes: progress.TotalBytes, BytesDone: progress.BytesDone} + + if err := r.client.Patch(ctx, &du, client.MergeFrom(original)); err != nil { + log.WithError(err).Error("Failed to update progress") + } +} + +// SetupWithManager registers the DataUpload controller. 
+// The fresh new DataUpload CR first created will trigger to create one pod (long time, maybe failure or unknown status) by one of the dataupload controllers +// then the request will get out of the Reconcile queue immediately by not blocking others' CR handling, in order to finish the rest data upload process we need to +// re-enqueue the previous related request once the related pod is in running status to keep going on the rest logic. and below logic will avoid handling the unwanted +// pod status and also avoid block others CR handling +func (r *DataUploadReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&velerov2alpha1api.DataUpload{}). + Watches(&source.Kind{Type: &corev1.Pod{}}, kube.EnqueueRequestsFromMapUpdateFunc(r.findDataUploadForPod), + builder.WithPredicates(predicate.Funcs{ + UpdateFunc: func(ue event.UpdateEvent) bool { + newObj := ue.ObjectNew.(*corev1.Pod) + + if _, ok := newObj.Labels[velerov1api.DataUploadLabel]; !ok { + return false + } + + if newObj.Status.Phase != corev1.PodRunning { + return false + } + + if newObj.Spec.NodeName != r.nodeName { + return false + } + + return true + }, + CreateFunc: func(event.CreateEvent) bool { + return false + }, + DeleteFunc: func(de event.DeleteEvent) bool { + return false + }, + GenericFunc: func(ge event.GenericEvent) bool { + return false + }, + })). + Complete(r) +} + +func (r *DataUploadReconciler) findDataUploadForPod(podObj client.Object) []reconcile.Request { + pod := podObj.(*corev1.Pod) + + du := &velerov2alpha1api.DataUpload{} + err := r.client.Get(context.Background(), types.NamespacedName{ + Namespace: pod.Namespace, + Name: pod.Labels[velerov1api.DataUploadLabel], + }, du) + + if err != nil { + r.logger.WithField("Backup pod", pod.Name).WithError(err).Error("unable to get dataupload") + return []reconcile.Request{} + } + + if du.Status.Phase != velerov2alpha1api.DataUploadPhaseAccepted { + return []reconcile.Request{} + } + + r.logger.WithField("Backup pod", pod.Name).Infof("Preparing dataupload %s", du.Name) + if err := r.patchDataUpload(context.Background(), du, prepareDataUpload); err != nil { + r.logger.WithField("Backup pod", pod.Name).WithError(err).Error("failed to patch dataupload") + return []reconcile.Request{} + } + + requests := reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: du.Namespace, + Name: du.Name, + }, + } + return []reconcile.Request{requests} +} + +func (r *DataUploadReconciler) patchDataUpload(ctx context.Context, req *velerov2alpha1api.DataUpload, mutate func(*velerov2alpha1api.DataUpload)) error { + original := req.DeepCopy() + mutate(req) + if err := r.client.Patch(ctx, req, client.MergeFrom(original)); err != nil { + return errors.Wrap(err, "error patching DataUpload") + } + + return nil +} + +func prepareDataUpload(du *velerov2alpha1api.DataUpload) { + du.Status.Phase = velerov2alpha1api.DataUploadPhasePrepared +} + +func (r *DataUploadReconciler) errorOut(ctx context.Context, du *velerov2alpha1api.DataUpload, err error, msg string, log logrus.FieldLogger) (ctrl.Result, error) { + if se, ok := r.snapshotExposerList[du.Spec.SnapshotType]; ok { + var volumeSnapshotName string + if du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI { // Other exposer should have another condition + volumeSnapshotName = du.Spec.CSISnapshot.VolumeSnapshot + } + se.CleanUp(ctx, getOwnerObject(du), volumeSnapshotName, du.Spec.SourceNamespace) + } else { + err = errors.Wrapf(err, "failed to clean up exposed snapshot with could 
not find %s snapshot exposer", du.Spec.SnapshotType) + } + return ctrl.Result{}, r.updateStatusToFailed(ctx, du, err, msg, log) +} + +func (r *DataUploadReconciler) updateStatusToFailed(ctx context.Context, du *velerov2alpha1api.DataUpload, err error, msg string, log logrus.FieldLogger) error { + original := du.DeepCopy() + du.Status.Phase = velerov2alpha1api.DataUploadPhaseFailed + du.Status.Message = errors.WithMessage(err, msg).Error() + if du.Status.StartTimestamp.IsZero() { + du.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()} + } + + du.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()} + if err = r.client.Patch(ctx, du, client.MergeFrom(original)); err != nil { + log.WithError(err).Error("error updating DataUpload status") + return err + } + + return nil +} + +func (r *DataUploadReconciler) acceptDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload) (bool, error) { + updated := du.DeepCopy() + updated.Status.Phase = velerov2alpha1api.DataUploadPhaseAccepted + + r.logger.Infof("Accepting snapshot backup %s", du.Name) + + time.Sleep(2 * time.Second) + // For all data upload controller in each node-agent will try to update dataupload CR, and only one controller will success, + // and the success one could handle later logic + err := r.client.Update(ctx, updated) + if err == nil { + r.logger.WithField("Dataupload", du.Name).Infof("This datauplod backup has been accepted by %s", r.nodeName) + return true, nil + } else if apierrors.IsConflict(err) { + r.logger.WithField("Dataupload", du.Name).Info("This datauplod backup has been accepted by others") + return false, nil + } else { + return false, err + } +} + +func (r *DataUploadReconciler) closeDataPath(ctx context.Context, duName string) { + fsBackup := r.dataPathMgr.GetAsyncBR(duName) + if fsBackup != nil { + fsBackup.Close(ctx) + } + + r.dataPathMgr.RemoveAsyncBR(duName) +} + +func (r *DataUploadReconciler) setupExposeParam(du *velerov2alpha1api.DataUpload) interface{} { + if du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI { + return &exposer.CSISnapshotExposeParam{ + SnapshotName: du.Spec.CSISnapshot.VolumeSnapshot, + SourceNamespace: du.Spec.SourceNamespace, + StorageClass: du.Spec.CSISnapshot.StorageClass, + HostingPodLabels: map[string]string{velerov1api.DataUploadLabel: du.Name}, + AccessMode: exposer.AccessModeFileSystem, + Timeout: du.Spec.OperationTimeout.Duration, + } + } + return nil +} + +func (r *DataUploadReconciler) setupWaitExposePara(du *velerov2alpha1api.DataUpload) interface{} { + if du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI { + return &exposer.CSISnapshotExposeWaitParam{ + NodeClient: r.client, + NodeName: r.nodeName, + } + } + return nil +} + +func getOwnerObject(du *velerov2alpha1api.DataUpload) corev1.ObjectReference { + return corev1.ObjectReference{ + Kind: du.Kind, + Namespace: du.Namespace, + Name: du.Name, + UID: du.UID, + APIVersion: du.APIVersion, + } +} diff --git a/pkg/controller/data_upload_controller_test.go b/pkg/controller/data_upload_controller_test.go new file mode 100644 index 000000000..7c98c788d --- /dev/null +++ b/pkg/controller/data_upload_controller_test.go @@ -0,0 +1,601 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "fmt" + "testing" + "time" + + snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1" + snapshotFake "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned/fake" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + clientgofake "k8s.io/client-go/kubernetes/fake" + "k8s.io/utils/clock" + testclocks "k8s.io/utils/clock/testing" + ctrl "sigs.k8s.io/controller-runtime" + kbclient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/vmware-tanzu/velero/internal/credentials" + velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" + "github.com/vmware-tanzu/velero/pkg/builder" + "github.com/vmware-tanzu/velero/pkg/datapath" + "github.com/vmware-tanzu/velero/pkg/exposer" + "github.com/vmware-tanzu/velero/pkg/repository" + velerotest "github.com/vmware-tanzu/velero/pkg/test" + "github.com/vmware-tanzu/velero/pkg/uploader" + "github.com/vmware-tanzu/velero/pkg/util/boolptr" +) + +const dataUploadName = "dataupload-1" + +const fakeSnapshotType velerov2alpha1api.SnapshotType = "fake-snapshot" + +type FakeClient struct { + kbclient.Client + getError bool + createError bool + updateError bool + patchError bool +} + +func (c *FakeClient) Get(ctx context.Context, key kbclient.ObjectKey, obj kbclient.Object) error { + if c.getError { + return fmt.Errorf("Create error") + } + + return c.Client.Get(ctx, key, obj) +} + +func (c *FakeClient) Create(ctx context.Context, obj kbclient.Object, opts ...kbclient.CreateOption) error { + if c.createError { + return fmt.Errorf("Create error") + } + + return c.Client.Create(ctx, obj, opts...) +} + +func (c *FakeClient) Update(ctx context.Context, obj kbclient.Object, opts ...kbclient.UpdateOption) error { + if c.updateError { + return fmt.Errorf("Update error") + } + + return c.Client.Update(ctx, obj, opts...) +} + +func (c *FakeClient) Patch(ctx context.Context, obj kbclient.Object, patch kbclient.Patch, opts ...kbclient.PatchOption) error { + if c.patchError { + return fmt.Errorf("Patch error") + } + + return c.Client.Patch(ctx, obj, patch, opts...) 
+} + +func initDataUploaderReconciler(needError ...bool) (*DataUploadReconciler, error) { + vscName := "fake-vsc" + vsObject := &snapshotv1api.VolumeSnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fake-volume-snapshot", + Namespace: "fake-ns", + }, + Status: &snapshotv1api.VolumeSnapshotStatus{ + BoundVolumeSnapshotContentName: &vscName, + ReadyToUse: boolptr.True(), + RestoreSize: &resource.Quantity{}, + }, + } + var restoreSize int64 + vscObj := &snapshotv1api.VolumeSnapshotContent{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fake-vsc", + }, + Spec: snapshotv1api.VolumeSnapshotContentSpec{ + DeletionPolicy: snapshotv1api.VolumeSnapshotContentDelete, + }, + Status: &snapshotv1api.VolumeSnapshotContentStatus{ + RestoreSize: &restoreSize, + }, + } + now, err := time.Parse(time.RFC1123, time.RFC1123) + if err != nil { + return nil, err + } + now = now.Local() + scheme := runtime.NewScheme() + err = velerov1api.AddToScheme(scheme) + if err != nil { + return nil, err + } + err = velerov2alpha1api.AddToScheme(scheme) + if err != nil { + return nil, err + } + err = corev1.AddToScheme(scheme) + if err != nil { + return nil, err + } + + fakeClient := &FakeClient{ + Client: fake.NewClientBuilder().WithScheme(scheme).Build(), + } + + if len(needError) == 4 { + fakeClient.getError = needError[0] + fakeClient.createError = needError[1] + fakeClient.updateError = needError[2] + fakeClient.patchError = needError[3] + } + + fakeSnapshotClient := snapshotFake.NewSimpleClientset(vsObject, vscObj) + fakeKubeClient := clientgofake.NewSimpleClientset() + fakeFS := velerotest.NewFakeFileSystem() + pathGlob := fmt.Sprintf("/host_pods/%s/volumes/*/%s", "", dataUploadName) + _, err = fakeFS.Create(pathGlob) + if err != nil { + return nil, err + } + + credentialFileStore, err := credentials.NewNamespacedFileStore( + fakeClient, + velerov1api.DefaultNamespace, + "/tmp/credentials", + fakeFS, + ) + if err != nil { + return nil, err + } + return NewDataUploadReconciler(fakeClient, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), nil, + testclocks.NewFakeClock(now), &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", fakeFS, velerotest.NewLogger()), nil +} + +func dataUploadBuilder() *builder.DataUploadBuilder { + csi := &velerov2alpha1api.CSISnapshotSpec{ + SnapshotClass: "csi-azuredisk-vsc", + StorageClass: "default", + VolumeSnapshot: "fake-volume-snapshot", + } + return builder.ForDataUpload(velerov1api.DefaultNamespace, dataUploadName). + BackupStorageLocation("bsl-loc"). + DataMover("velero"). 
+ SnapshotType("CSI").SourceNamespace("fake-ns").SourcePVC("test-pvc").CSISnapshot(csi) +} + +type fakeSnapshotExposer struct { + kubeClient kbclient.Client + clock clock.WithTickerAndDelayedExecution +} + +func (f *fakeSnapshotExposer) Expose(ctx context.Context, ownerObject corev1.ObjectReference, param interface{}) error { + du := velerov2alpha1api.DataUpload{} + err := f.kubeClient.Get(ctx, kbclient.ObjectKey{ + Name: dataUploadName, + Namespace: velerov1api.DefaultNamespace, + }, &du) + if err != nil { + return err + } + + original := du + du.Status.Phase = velerov2alpha1api.DataUploadPhasePrepared + du.Status.StartTimestamp = &metav1.Time{Time: f.clock.Now()} + f.kubeClient.Patch(ctx, &du, kbclient.MergeFrom(&original)) + return nil +} + +func (f *fakeSnapshotExposer) GetExposed(ctx context.Context, du corev1.ObjectReference, tm time.Duration, para interface{}) (*exposer.ExposeResult, error) { + pod := &corev1.Pod{} + err := f.kubeClient.Get(ctx, kbclient.ObjectKey{ + Name: dataUploadName, + Namespace: velerov1api.DefaultNamespace, + }, pod) + if err != nil { + return nil, err + } + return &exposer.ExposeResult{ByPod: exposer.ExposeByPod{HostingPod: pod, PVC: dataUploadName}}, nil +} + +func (f *fakeSnapshotExposer) CleanUp(context.Context, corev1.ObjectReference, string, string) { +} + +type fakeDataUploadFSBR struct { + du *velerov2alpha1api.DataUpload + kubeClient kbclient.Client + clock clock.WithTickerAndDelayedExecution +} + +func (f *fakeDataUploadFSBR) Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string, repoIdentifier string, repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error { + return nil +} + +func (f *fakeDataUploadFSBR) StartBackup(source datapath.AccessPoint, parentSnapshot string, forceFull bool, tags map[string]string) error { + du := f.du + original := f.du.DeepCopy() + du.Status.Phase = velerov2alpha1api.DataUploadPhaseCompleted + du.Status.CompletionTimestamp = &metav1.Time{Time: f.clock.Now()} + f.kubeClient.Patch(context.Background(), du, kbclient.MergeFrom(original)) + + return nil +} + +func (f *fakeDataUploadFSBR) StartRestore(snapshotID string, target datapath.AccessPoint) error { + return nil +} + +func (b *fakeDataUploadFSBR) Cancel() { +} + +func (b *fakeDataUploadFSBR) Close(ctx context.Context) { +} + +func TestReconcile(t *testing.T) { + tests := []struct { + name string + du *velerov2alpha1api.DataUpload + pod *corev1.Pod + snapshotExposerList map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer + dataMgr *datapath.Manager + expectedProcessed bool + expected *velerov2alpha1api.DataUpload + expectedRequeue ctrl.Result + expectedErrMsg string + needErrs []bool + }{ + { + name: "Dataupload is not initialized", + du: builder.ForDataUpload("unknown-ns", "unknown-name").Result(), + expectedProcessed: false, + expected: nil, + expectedRequeue: ctrl.Result{}, + }, { + name: "Error get Dataupload", + du: builder.ForDataUpload(velerov1api.DefaultNamespace, "unknown-name").Result(), + expectedProcessed: false, + expected: nil, + expectedRequeue: ctrl.Result{}, + expectedErrMsg: "getting DataUpload: Create error", + needErrs: []bool{true, false, false, false}, + }, { + name: "Unsupported data mover type", + du: dataUploadBuilder().DataMover("unknown type").Result(), + expectedProcessed: false, + expected: dataUploadBuilder().Phase("").Result(), + expectedRequeue: ctrl.Result{}, + }, { + name: "Unknown type of snapshot exposer is not initialized", + du: 
dataUploadBuilder().SnapshotType("unknown type").Result(), + expectedProcessed: true, + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(), + expectedRequeue: ctrl.Result{}, + }, + { + name: "Dataupload should be accepted", + du: dataUploadBuilder().Result(), + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), + expectedProcessed: false, + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(), + expectedRequeue: ctrl.Result{}, + }, + { + name: "Dataupload should be prepared", + du: dataUploadBuilder().SnapshotType(fakeSnapshotType).Result(), + expectedProcessed: false, + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), + expectedRequeue: ctrl.Result{}, + }, { + name: "Dataupload prepared should be completed", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), + expectedProcessed: true, + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCompleted).Result(), + expectedRequeue: ctrl.Result{}, + }, + { + name: "Dataupload with not enabled cancel", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(false).Result(), + expectedProcessed: false, + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), + expectedRequeue: ctrl.Result{}, + }, + { + name: "Dataupload should be cancel", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(true).Result(), + expectedProcessed: false, + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCanceling).Result(), + expectedRequeue: ctrl.Result{}, + }, + { + name: "runCancelableDataUpload is concurrent limited", + dataMgr: datapath.NewManager(0), + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Cancel(true).Result(), + expectedProcessed: false, + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), + expectedRequeue: ctrl.Result{Requeue: true, RequeueAfter: time.Minute}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + r, err := initDataUploaderReconciler(test.needErrs...) 
+ require.NoError(t, err) + defer func() { + r.client.Delete(ctx, test.du, &kbclient.DeleteOptions{}) + if test.pod != nil { + r.client.Delete(ctx, test.pod, &kbclient.DeleteOptions{}) + } + }() + ctx := context.Background() + if test.du.Namespace == velerov1api.DefaultNamespace { + err = r.client.Create(ctx, test.du) + require.NoError(t, err) + } + + if test.pod != nil { + err = r.client.Create(ctx, test.pod) + require.NoError(t, err) + } + + if test.dataMgr != nil { + r.dataPathMgr = test.dataMgr + } else { + r.dataPathMgr = datapath.NewManager(1) + } + + if test.du.Spec.SnapshotType == fakeSnapshotType { + r.snapshotExposerList = map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{fakeSnapshotType: &fakeSnapshotExposer{r.client, r.clock}} + } else if test.du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI { + r.snapshotExposerList = map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{velerov2alpha1api.SnapshotTypeCSI: exposer.NewCSISnapshotExposer(r.kubeClient, r.csiSnapshotClient, velerotest.NewLogger())} + } + + datapath.FSBRCreator = func(string, string, kbclient.Client, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR { + return &fakeDataUploadFSBR{ + du: test.du, + kubeClient: r.client, + clock: r.clock, + } + } + + if test.du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress { + if fsBR := r.dataPathMgr.GetAsyncBR(test.du.Name); fsBR == nil { + _, err := r.dataPathMgr.CreateFileSystemBR(test.du.Name, pVBRRequestor, ctx, r.client, velerov1api.DefaultNamespace, datapath.Callbacks{OnCancelled: r.OnDataUploadCancelled}, velerotest.NewLogger()) + require.NoError(t, err) + } + } + + actualResult, err := r.Reconcile(ctx, ctrl.Request{ + NamespacedName: types.NamespacedName{ + Namespace: velerov1api.DefaultNamespace, + Name: test.du.Name, + }, + }) + + assert.Equal(t, actualResult, test.expectedRequeue) + if test.expectedErrMsg == "" { + require.NoError(t, err) + } else { + assert.Equal(t, err.Error(), test.expectedErrMsg) + } + + du := velerov2alpha1api.DataUpload{} + err = r.client.Get(ctx, kbclient.ObjectKey{ + Name: test.du.Name, + Namespace: test.du.Namespace, + }, &du) + t.Logf("%s: \n %v \n", test.name, du) + // Assertions + if test.expected == nil { + assert.Equal(t, err != nil, true) + } else { + require.NoError(t, err) + assert.Equal(t, du.Status.Phase, test.expected.Status.Phase) + } + + if test.expectedProcessed { + assert.Equal(t, du.Status.CompletionTimestamp.IsZero(), false) + } + + if !test.expectedProcessed { + assert.Equal(t, du.Status.CompletionTimestamp.IsZero(), true) + } + }) + } +} + +func TestOnDataUploadCancelled(t *testing.T) { + ctx := context.TODO() + r, err := initDataUploaderReconciler() + require.NoError(t, err) + // Create a DataUpload object + du := dataUploadBuilder().Result() + namespace := du.Namespace + duName := du.Name + // Add the DataUpload object to the fake client + assert.NoError(t, r.client.Create(ctx, du)) + + r.OnDataUploadCancelled(ctx, namespace, duName) + updatedDu := &velerov2alpha1api.DataUpload{} + assert.NoError(t, r.client.Get(ctx, types.NamespacedName{Name: duName, Namespace: namespace}, updatedDu)) + assert.Equal(t, velerov2alpha1api.DataUploadPhaseCanceled, updatedDu.Status.Phase) + assert.Equal(t, updatedDu.Status.CompletionTimestamp.IsZero(), false) + assert.Equal(t, updatedDu.Status.StartTimestamp.IsZero(), false) +} + +func TestOnDataUploadProgress(t *testing.T) { + totalBytes := int64(1024) + bytesDone := int64(512) + tests := []struct { + name string + du 
*velerov2alpha1api.DataUpload + progress uploader.Progress + needErrs []bool + }{ + { + name: "patch in progress phase success", + du: dataUploadBuilder().Result(), + progress: uploader.Progress{ + TotalBytes: totalBytes, + BytesDone: bytesDone, + }, + }, + { + name: "failed to get dataupload", + du: dataUploadBuilder().Result(), + needErrs: []bool{true, false, false, false}, + }, + { + name: "failed to patch dataupload", + du: dataUploadBuilder().Result(), + needErrs: []bool{false, false, false, true}, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctx := context.TODO() + + r, err := initDataUploaderReconciler(test.needErrs...) + require.NoError(t, err) + defer func() { + r.client.Delete(ctx, test.du, &kbclient.DeleteOptions{}) + }() + // Create a DataUpload object + du := dataUploadBuilder().Result() + namespace := du.Namespace + duName := du.Name + // Add the DataUpload object to the fake client + assert.NoError(t, r.client.Create(context.Background(), du)) + + // Create a Progress object + progress := &uploader.Progress{ + TotalBytes: totalBytes, + BytesDone: bytesDone, + } + + // Call the OnDataUploadProgress function + r.OnDataUploadProgress(ctx, namespace, duName, progress) + if len(test.needErrs) != 0 && !test.needErrs[0] { + // Get the updated DataUpload object from the fake client + updatedDu := &velerov2alpha1api.DataUpload{} + assert.NoError(t, r.client.Get(ctx, types.NamespacedName{Name: duName, Namespace: namespace}, updatedDu)) + // Assert that the DataUpload object has been updated with the progress + assert.Equal(t, test.progress.TotalBytes, updatedDu.Status.Progress.TotalBytes) + assert.Equal(t, test.progress.BytesDone, updatedDu.Status.Progress.BytesDone) + } + }) + } +} + +func TestOnDataUploadFailed(t *testing.T) { + ctx := context.TODO() + r, err := initDataUploaderReconciler() + require.NoError(t, err) + + // Create a DataUpload object + du := dataUploadBuilder().Result() + namespace := du.Namespace + duName := du.Name + // Add the DataUpload object to the fake client + assert.NoError(t, r.client.Create(ctx, du)) + r.snapshotExposerList = map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{velerov2alpha1api.SnapshotTypeCSI: exposer.NewCSISnapshotExposer(r.kubeClient, r.csiSnapshotClient, velerotest.NewLogger())} + r.OnDataUploadFailed(ctx, namespace, duName, fmt.Errorf("Failed to handle %v", duName)) + updatedDu := &velerov2alpha1api.DataUpload{} + assert.NoError(t, r.client.Get(ctx, types.NamespacedName{Name: duName, Namespace: namespace}, updatedDu)) + assert.Equal(t, velerov2alpha1api.DataUploadPhaseFailed, updatedDu.Status.Phase) + assert.Equal(t, updatedDu.Status.CompletionTimestamp.IsZero(), false) + assert.Equal(t, updatedDu.Status.StartTimestamp.IsZero(), false) +} + +func TestOnDataUploadCompleted(t *testing.T) { + ctx := context.TODO() + r, err := initDataUploaderReconciler() + require.NoError(t, err) + // Create a DataUpload object + du := dataUploadBuilder().Result() + namespace := du.Namespace + duName := du.Name + // Add the DataUpload object to the fake client + assert.NoError(t, r.client.Create(ctx, du)) + r.snapshotExposerList = map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{velerov2alpha1api.SnapshotTypeCSI: exposer.NewCSISnapshotExposer(r.kubeClient, r.csiSnapshotClient, velerotest.NewLogger())} + r.OnDataUploadCompleted(ctx, namespace, duName, datapath.Result{}) + updatedDu := &velerov2alpha1api.DataUpload{} + assert.NoError(t, r.client.Get(ctx, types.NamespacedName{Name: duName, Namespace: 
namespace}, updatedDu)) + assert.Equal(t, velerov2alpha1api.DataUploadPhaseCompleted, updatedDu.Status.Phase) + assert.Equal(t, updatedDu.Status.CompletionTimestamp.IsZero(), false) +} + +func TestFindDataUploadForPod(t *testing.T) { + r, err := initDataUploaderReconciler() + require.NoError(t, err) + tests := []struct { + name string + du *velerov2alpha1api.DataUpload + pod *corev1.Pod + checkFunc func(*velerov2alpha1api.DataUpload, []reconcile.Request) + }{ + { + name: "find dataUpload for pod", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(), + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Labels(map[string]string{velerov1api.DataUploadLabel: dataUploadName}).Result(), + checkFunc: func(du *velerov2alpha1api.DataUpload, requests []reconcile.Request) { + // Assert that the function returns a single request + assert.Len(t, requests, 1) + // Assert that the request contains the correct namespaced name + assert.Equal(t, du.Namespace, requests[0].Namespace) + assert.Equal(t, du.Name, requests[0].Name) + }, + }, { + name: "no matched pod", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(), + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Labels(map[string]string{velerov1api.DataUploadLabel: "non-existing-dataupload"}).Result(), + checkFunc: func(du *velerov2alpha1api.DataUpload, requests []reconcile.Request) { + assert.Empty(t, requests) + }, + }, + { + name: "dataUpload not accepte", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Labels(map[string]string{velerov1api.DataUploadLabel: dataUploadName}).Result(), + checkFunc: func(du *velerov2alpha1api.DataUpload, requests []reconcile.Request) { + assert.Empty(t, requests) + }, + }, + } + for _, test := range tests { + ctx := context.Background() + assert.NoError(t, r.client.Create(ctx, test.pod)) + assert.NoError(t, r.client.Create(ctx, test.du)) + // Call the findDataUploadForPod function + requests := r.findDataUploadForPod(test.pod) + test.checkFunc(test.du, requests) + r.client.Delete(ctx, test.du, &kbclient.DeleteOptions{}) + if test.pod != nil { + r.client.Delete(ctx, test.pod, &kbclient.DeleteOptions{}) + } + } +} diff --git a/pkg/exposer/csi_snapshot.go b/pkg/exposer/csi_snapshot.go index d6482f2a9..9c984de53 100644 --- a/pkg/exposer/csi_snapshot.go +++ b/pkg/exposer/csi_snapshot.go @@ -44,6 +44,9 @@ import ( // CSISnapshotExposeParam define the input param for Expose of CSI snapshots type CSISnapshotExposeParam struct { + // SnapshotName is the original volume snapshot name + SnapshotName string + // SourceNamespace is the original namespace of the volume that the snapshot is taken for SourceNamespace string @@ -55,6 +58,9 @@ type CSISnapshotExposeParam struct { // HostingPodLabels is the labels that are going to apply to the hosting pod HostingPodLabels map[string]string + + // Timeout specifies the time wait for resources operations in Expose + Timeout time.Duration } // CSISnapshotExposeWaitParam define the input param for WaitExposed of CSI snapshots @@ -79,7 +85,7 @@ type csiSnapshotExposer struct { log logrus.FieldLogger } -func (e *csiSnapshotExposer) Expose(ctx context.Context, ownerObject corev1.ObjectReference, snapshotName string, timeout time.Duration, param interface{}) error { +func (e *csiSnapshotExposer) Expose(ctx context.Context, ownerObject corev1.ObjectReference, param interface{}) error { 
csiExposeParam := param.(*CSISnapshotExposeParam) curLog := e.log.WithFields(logrus.Fields{ @@ -88,7 +94,7 @@ func (e *csiSnapshotExposer) Expose(ctx context.Context, ownerObject corev1.Obje curLog.Info("Exposing CSI snapshot") - volumeSnapshot, err := csi.WaitVolumeSnapshotReady(ctx, e.csiSnapshotClient, snapshotName, csiExposeParam.SourceNamespace, timeout) + volumeSnapshot, err := csi.WaitVolumeSnapshotReady(ctx, e.csiSnapshotClient, csiExposeParam.SnapshotName, csiExposeParam.SourceNamespace, csiExposeParam.Timeout) if err != nil { return errors.Wrapf(err, "error wait volume snapshot ready") } @@ -115,14 +121,14 @@ func (e *csiSnapshotExposer) Expose(ctx context.Context, ownerObject corev1.Obje } }() - err = csi.EnsureDeleteVS(ctx, e.csiSnapshotClient, volumeSnapshot.Name, volumeSnapshot.Namespace, timeout) + err = csi.EnsureDeleteVS(ctx, e.csiSnapshotClient, volumeSnapshot.Name, volumeSnapshot.Namespace, csiExposeParam.Timeout) if err != nil { return errors.Wrap(err, "error to delete volume snapshot") } curLog.WithField("vs name", volumeSnapshot.Name).Infof("VS is deleted in namespace %s", volumeSnapshot.Namespace) - err = csi.EnsureDeleteVSC(ctx, e.csiSnapshotClient, vsc.Name, timeout) + err = csi.EnsureDeleteVSC(ctx, e.csiSnapshotClient, vsc.Name, csiExposeParam.Timeout) if err != nil { return errors.Wrap(err, "error to delete volume snapshot content") } diff --git a/pkg/exposer/csi_snapshot_test.go b/pkg/exposer/csi_snapshot_test.go index 20a450441..e538e721e 100644 --- a/pkg/exposer/csi_snapshot_test.go +++ b/pkg/exposer/csi_snapshot_test.go @@ -87,24 +87,27 @@ func TestExpose(t *testing.T) { snapshotClientObj []runtime.Object kubeClientObj []runtime.Object ownerBackup *velerov1.Backup - snapshotName string exposeParam CSISnapshotExposeParam snapReactors []reactor kubeReactors []reactor err string }{ { - name: "wait vs ready fail", - snapshotName: "fake-vs", - ownerBackup: backup, - err: "error wait volume snapshot ready: error to get volumesnapshot /fake-vs: volumesnapshots.snapshot.storage.k8s.io \"fake-vs\" not found", + name: "wait vs ready fail", + ownerBackup: backup, + exposeParam: CSISnapshotExposeParam{ + SnapshotName: "fake-vs", + Timeout: time.Millisecond, + }, + err: "error wait volume snapshot ready: error to get volumesnapshot /fake-vs: volumesnapshots.snapshot.storage.k8s.io \"fake-vs\" not found", }, { - name: "get vsc fail", - ownerBackup: backup, - snapshotName: "fake-vs", + name: "get vsc fail", + ownerBackup: backup, exposeParam: CSISnapshotExposeParam{ + SnapshotName: "fake-vs", SourceNamespace: "fake-ns", + Timeout: time.Millisecond, }, snapshotClientObj: []runtime.Object{ vsObject, @@ -112,11 +115,12 @@ func TestExpose(t *testing.T) { err: "error to get volume snapshot content: error getting volume snapshot content from API: volumesnapshotcontents.snapshot.storage.k8s.io \"fake-vsc\" not found", }, { - name: "delete vs fail", - ownerBackup: backup, - snapshotName: "fake-vs", + name: "delete vs fail", + ownerBackup: backup, exposeParam: CSISnapshotExposeParam{ + SnapshotName: "fake-vs", SourceNamespace: "fake-ns", + Timeout: time.Millisecond, }, snapshotClientObj: []runtime.Object{ vsObject, @@ -134,11 +138,12 @@ func TestExpose(t *testing.T) { err: "error to delete volume snapshot: error to delete volume snapshot: fake-delete-error", }, { - name: "delete vsc fail", - ownerBackup: backup, - snapshotName: "fake-vs", + name: "delete vsc fail", + ownerBackup: backup, exposeParam: CSISnapshotExposeParam{ + SnapshotName: "fake-vs", SourceNamespace: "fake-ns", + 
Timeout: time.Millisecond, }, snapshotClientObj: []runtime.Object{ vsObject, @@ -156,11 +161,12 @@ func TestExpose(t *testing.T) { err: "error to delete volume snapshot content: error to delete volume snapshot content: fake-delete-error", }, { - name: "create backup vs fail", - ownerBackup: backup, - snapshotName: "fake-vs", + name: "create backup vs fail", + ownerBackup: backup, exposeParam: CSISnapshotExposeParam{ + SnapshotName: "fake-vs", SourceNamespace: "fake-ns", + Timeout: time.Millisecond, }, snapshotClientObj: []runtime.Object{ vsObject, @@ -178,11 +184,12 @@ func TestExpose(t *testing.T) { err: "error to create backup volume snapshot: fake-create-error", }, { - name: "create backup vsc fail", - ownerBackup: backup, - snapshotName: "fake-vs", + name: "create backup vsc fail", + ownerBackup: backup, exposeParam: CSISnapshotExposeParam{ + SnapshotName: "fake-vs", SourceNamespace: "fake-ns", + Timeout: time.Millisecond, }, snapshotClientObj: []runtime.Object{ vsObject, @@ -200,10 +207,10 @@ func TestExpose(t *testing.T) { err: "error to create backup volume snapshot content: fake-create-error", }, { - name: "create backup pvc fail, invalid access mode", - ownerBackup: backup, - snapshotName: "fake-vs", + name: "create backup pvc fail, invalid access mode", + ownerBackup: backup, exposeParam: CSISnapshotExposeParam{ + SnapshotName: "fake-vs", SourceNamespace: "fake-ns", AccessMode: "fake-mode", }, @@ -214,11 +221,12 @@ func TestExpose(t *testing.T) { err: "error to create backup pvc: unsupported access mode fake-mode", }, { - name: "create backup pvc fail", - ownerBackup: backup, - snapshotName: "fake-vs", + name: "create backup pvc fail", + ownerBackup: backup, exposeParam: CSISnapshotExposeParam{ + SnapshotName: "fake-vs", SourceNamespace: "fake-ns", + Timeout: time.Millisecond, AccessMode: AccessModeFileSystem, }, snapshotClientObj: []runtime.Object{ @@ -237,12 +245,13 @@ func TestExpose(t *testing.T) { err: "error to create backup pvc: error to create pvc: fake-create-error", }, { - name: "create backup pod fail", - ownerBackup: backup, - snapshotName: "fake-vs", + name: "create backup pod fail", + ownerBackup: backup, exposeParam: CSISnapshotExposeParam{ + SnapshotName: "fake-vs", SourceNamespace: "fake-ns", AccessMode: AccessModeFileSystem, + Timeout: time.Millisecond, }, snapshotClientObj: []runtime.Object{ vsObject, @@ -291,7 +300,7 @@ func TestExpose(t *testing.T) { } } - err := exposer.Expose(context.Background(), ownerObject, test.snapshotName, time.Millisecond, &test.exposeParam) + err := exposer.Expose(context.Background(), ownerObject, &test.exposeParam) assert.EqualError(t, err, test.err) }) } diff --git a/pkg/exposer/snapshot.go b/pkg/exposer/snapshot.go index 505cd0079..193684044 100644 --- a/pkg/exposer/snapshot.go +++ b/pkg/exposer/snapshot.go @@ -26,8 +26,7 @@ import ( // SnapshotExposer is the interfaces for a snapshot exposer type SnapshotExposer interface { // Expose starts the process to expose a snapshot, the expose process may take long time - Expose(context.Context, corev1.ObjectReference, string, time.Duration, interface{}) error - + Expose(context.Context, corev1.ObjectReference, interface{}) error // GetExposed polls the status of the expose. // If the expose is accessible by the current caller, it waits the expose ready and returns the expose result. // Otherwise, it returns nil as the expose result without an error. 
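// --- Illustrative sketch (editor's example, not part of the patch) ---
// With the SnapshotExposer interface change above, the snapshot name and the expose timeout are
// no longer positional arguments of Expose(); they travel inside the exposer-specific parameter
// struct. A minimal sketch of the new call shape, assuming the caller already holds the kube and
// CSI snapshot clients; the helper name exposeCSISnapshot is hypothetical.

package example

import (
	"context"

	snapshotter "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned/typed/volumesnapshot/v1"
	"github.com/sirupsen/logrus"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"

	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
	velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
	"github.com/vmware-tanzu/velero/pkg/exposer"
)

// exposeCSISnapshot builds a CSISnapshotExposeParam from a DataUpload and invokes Expose
// with the new three-argument signature.
func exposeCSISnapshot(ctx context.Context, kubeClient kubernetes.Interface,
	csiClient snapshotter.SnapshotV1Interface, log logrus.FieldLogger,
	owner corev1.ObjectReference, du *velerov2alpha1api.DataUpload) error {
	ep := exposer.NewCSISnapshotExposer(kubeClient, csiClient, log)

	// SnapshotName and Timeout are now fields of the param struct rather than
	// arguments to Expose.
	param := &exposer.CSISnapshotExposeParam{
		SnapshotName:     du.Spec.CSISnapshot.VolumeSnapshot,
		SourceNamespace:  du.Spec.SourceNamespace,
		StorageClass:     du.Spec.CSISnapshot.StorageClass,
		HostingPodLabels: map[string]string{velerov1api.DataUploadLabel: du.Name},
		AccessMode:       exposer.AccessModeFileSystem,
		Timeout:          du.Spec.OperationTimeout.Duration,
	}

	return ep.Expose(ctx, owner, param)
}

// --- end example ---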
diff --git a/pkg/exposer/types.go b/pkg/exposer/types.go
index 91fe0d066..670b03828 100644
--- a/pkg/exposer/types.go
+++ b/pkg/exposer/types.go
@@ -35,3 +35,12 @@ type ExposeByPod struct {
 	HostingPod *corev1.Pod
 	PVC        string
 }
+
+// GetUploaderType returns the uploader type for the given data mover; an empty value or "velero" maps to the built-in "kopia" uploader.
+func GetUploaderType(dataMover string) string {
+	if dataMover == "" || dataMover == "velero" {
+		return "kopia"
+	} else {
+		return dataMover
+	}
+}
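// --- Illustrative sketch (editor's example, not part of the patch) ---
// The new DataUploadBuilder gives tests a compact way to assemble DataUpload objects, and
// GetUploaderType maps the DataUpload's data mover to an uploader type. A minimal sketch,
// assuming the builder and exposer packages introduced above; the namespace, object names,
// and field values below are made up for illustration.

package example

import (
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
	"github.com/vmware-tanzu/velero/pkg/builder"
	"github.com/vmware-tanzu/velero/pkg/exposer"
)

// newTestDataUpload builds a CSI-type DataUpload in the Prepared phase, mirroring how the
// controller tests construct their fixtures, and returns the uploader type selected for it.
func newTestDataUpload() (*velerov2alpha1api.DataUpload, string) {
	csi := &velerov2alpha1api.CSISnapshotSpec{
		SnapshotClass:  "example-vsc",
		StorageClass:   "default",
		VolumeSnapshot: "example-volume-snapshot",
	}

	du := builder.ForDataUpload("velero", "dataupload-example").
		BackupStorageLocation("default").
		DataMover("velero").
		SnapshotType(velerov2alpha1api.SnapshotTypeCSI).
		SourceNamespace("example-ns").
		SourcePVC("example-pvc").
		OperationTimeout(metav1.Duration{Duration: 10 * time.Minute}).
		CSISnapshot(csi).
		Phase(velerov2alpha1api.DataUploadPhasePrepared).
		Result()

	// GetUploaderType maps an empty or "velero" data mover to the built-in "kopia" uploader
	// and passes any other value through unchanged.
	return du, exposer.GetUploaderType(du.Spec.DataMover)
}

// --- end example ---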