diff --git a/changelogs/unreleased/9362-Lyndon-Li b/changelogs/unreleased/9362-Lyndon-Li new file mode 100644 index 000000000..791b3836d --- /dev/null +++ b/changelogs/unreleased/9362-Lyndon-Li @@ -0,0 +1 @@ +Support cache volume for generic restore exposer and pod volume exposer \ No newline at end of file diff --git a/pkg/exposer/cache_volume.go b/pkg/exposer/cache_volume.go new file mode 100644 index 000000000..85571c243 --- /dev/null +++ b/pkg/exposer/cache_volume.go @@ -0,0 +1,99 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package exposer + +import ( + "context" + + corev1api "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + + "github.com/vmware-tanzu/velero/pkg/util/boolptr" + "github.com/vmware-tanzu/velero/pkg/util/kube" +) + +type CacheConfigs struct { + Limit int64 + StorageClass string + ResidentThreshold int64 +} + +const ( + cacheVolumeName = "cachedir" + cacheVolumeDirSuffix = "-cache" +) + +func createCachePVC(ctx context.Context, pvcClient corev1client.CoreV1Interface, ownerObject corev1api.ObjectReference, sc string, size int64, selectedNode string) (*corev1api.PersistentVolumeClaim, error) { + cachePVCName := getCachePVCName(ownerObject) + + volumeMode := corev1api.PersistentVolumeFilesystem + + pvcObj := &corev1api.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ownerObject.Namespace, + Name: cachePVCName, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: ownerObject.APIVersion, + Kind: ownerObject.Kind, + Name: ownerObject.Name, + UID: ownerObject.UID, + Controller: boolptr.True(), + }, + }, + }, + Spec: corev1api.PersistentVolumeClaimSpec{ + AccessModes: []corev1api.PersistentVolumeAccessMode{corev1api.ReadWriteOnce}, + StorageClassName: &sc, + VolumeMode: &volumeMode, + Resources: corev1api.VolumeResourceRequirements{ + Requests: corev1api.ResourceList{ + corev1api.ResourceStorage: *resource.NewQuantity(size, resource.BinarySI), + }, + }, + }, + } + + if selectedNode != "" { + pvcObj.Annotations = map[string]string{ + kube.KubeAnnSelectedNode: selectedNode, + } + } + + return pvcClient.PersistentVolumeClaims(pvcObj.Namespace).Create(ctx, pvcObj, metav1.CreateOptions{}) +} + +func getCachePVCName(ownerObject corev1api.ObjectReference) string { + return ownerObject.Name + cacheVolumeDirSuffix +} + +func getCacheVolumeSize(dataSize int64, info *CacheConfigs) int64 { + if info == nil { + return 0 + } + + if dataSize != 0 && dataSize < info.ResidentThreshold { + return 0 + } + + // 20% inflate and round up to GB + volumeSize := (info.Limit*12/10 + (1 << 30) - 1) / (1 << 30) * (1 << 30) + + return volumeSize +} diff --git a/pkg/exposer/cache_volume_test.go b/pkg/exposer/cache_volume_test.go new file mode 100644 index 000000000..03e3a797c --- /dev/null +++ b/pkg/exposer/cache_volume_test.go @@ -0,0 +1,80 @@ +/* +Copyright The Velero Contributors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package exposer + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestGetCacheVolumeSize(t *testing.T) { + tests := []struct { + name string + dataSize int64 + info *CacheConfigs + expected int64 + }{ + { + name: "nil info", + dataSize: 1024, + expected: 0, + }, + { + name: "0 data size", + info: &CacheConfigs{Limit: 1 << 30, ResidentThreshold: 5120}, + expected: 2 << 30, + }, + { + name: "0 threshold", + dataSize: 2048, + info: &CacheConfigs{Limit: 1 << 30}, + expected: 2 << 30, + }, + { + name: "data size is smaller", + dataSize: 2048, + info: &CacheConfigs{Limit: 1 << 30, ResidentThreshold: 5120}, + expected: 0, + }, + { + name: "data size is lager", + dataSize: 2048, + info: &CacheConfigs{Limit: 1 << 30, ResidentThreshold: 1024}, + expected: 2 << 30, + }, + { + name: "limit smaller than 1G", + dataSize: 2048, + info: &CacheConfigs{Limit: 5120, ResidentThreshold: 1024}, + expected: 1 << 30, + }, + { + name: "larger than 1G after inflate", + dataSize: 2048, + info: &CacheConfigs{Limit: (1 << 30) - 1024, ResidentThreshold: 1024}, + expected: 2 << 30, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + size := getCacheVolumeSize(test.dataSize, test.info) + require.Equal(t, test.expected, size) + }) + } +} diff --git a/pkg/exposer/generic_restore.go b/pkg/exposer/generic_restore.go index 8691eedfc..c10370072 100644 --- a/pkg/exposer/generic_restore.go +++ b/pkg/exposer/generic_restore.go @@ -73,6 +73,12 @@ type GenericRestoreExposeParam struct { // PriorityClassName is the priority class name for the data mover pod PriorityClassName string + + // RestoreSize specifies the data size for the volume to be restored + RestoreSize int64 + + // CacheVolume specifies the info for cache volumes + CacheVolume *CacheConfigs } // GenericRestoreExposer is the interfaces for a generic restore exposer @@ -148,6 +154,28 @@ func (e *genericRestoreExposer) Expose(ctx context.Context, ownerObject corev1ap affinity := kube.GetLoadAffinityByStorageClass(param.LoadAffinity, storageClassName, curLog) + var cachePVC *corev1api.PersistentVolumeClaim + if param.CacheVolume != nil { + cacheVolumeSize := getCacheVolumeSize(param.RestoreSize, param.CacheVolume) + if cacheVolumeSize > 0 { + curLog.Infof("Creating cache PVC with size %v", cacheVolumeSize) + + if pvc, err := createCachePVC(ctx, e.kubeClient.CoreV1(), ownerObject, param.CacheVolume.StorageClass, cacheVolumeSize, selectedNode); err != nil { + return errors.Wrap(err, "error to create cache pvc") + } else { + cachePVC = pvc + } + + defer func() { + if err != nil { + kube.DeletePVAndPVCIfAny(ctx, e.kubeClient.CoreV1(), cachePVC.Name, cachePVC.Namespace, 0, curLog) + } + }() + } else { + curLog.Infof("Don't need to create cache volume, restore size %v, cache info %v", param.RestoreSize, param.CacheVolume) + } + } + restorePod, err := e.createRestorePod( ctx, ownerObject, @@ -161,6 +189,7 @@ func (e *genericRestoreExposer) Expose(ctx context.Context, ownerObject corev1ap 
param.NodeOS, affinity, param.PriorityClassName, + cachePVC, ) if err != nil { return errors.Wrapf(err, "error to create restore pod") @@ -287,6 +316,15 @@ func (e *genericRestoreExposer) DiagnoseExpose(ctx context.Context, ownerObject diag += fmt.Sprintf("error getting restore pvc %s, err: %v\n", restorePVCName, err) } + cachePVC, err := e.kubeClient.CoreV1().PersistentVolumeClaims(ownerObject.Namespace).Get(ctx, getCachePVCName(ownerObject), metav1.GetOptions{}) + if err != nil { + cachePVC = nil + + if !apierrors.IsNotFound(err) { + diag += fmt.Sprintf("error getting cache pvc %s, err: %v\n", getCachePVCName(ownerObject), err) + } + } + events, err := e.kubeClient.CoreV1().Events(ownerObject.Namespace).List(ctx, metav1.ListOptions{}) if err != nil { diag += fmt.Sprintf("error listing events, err: %v\n", err) @@ -314,6 +352,18 @@ func (e *genericRestoreExposer) DiagnoseExpose(ctx context.Context, ownerObject } } + if cachePVC != nil { + diag += kube.DiagnosePVC(cachePVC, events) + + if cachePVC.Spec.VolumeName != "" { + if pv, err := e.kubeClient.CoreV1().PersistentVolumes().Get(ctx, cachePVC.Spec.VolumeName, metav1.GetOptions{}); err != nil { + diag += fmt.Sprintf("error getting cache pv %s, err: %v\n", cachePVC.Spec.VolumeName, err) + } else { + diag += kube.DiagnosePV(pv) + } + } + } + diag += "end diagnose restore exposer" return diag @@ -322,9 +372,11 @@ func (e *genericRestoreExposer) DiagnoseExpose(ctx context.Context, ownerObject func (e *genericRestoreExposer) CleanUp(ctx context.Context, ownerObject corev1api.ObjectReference) { restorePodName := ownerObject.Name restorePVCName := ownerObject.Name + cachePVCName := getCachePVCName(ownerObject) kube.DeletePodIfAny(ctx, e.kubeClient.CoreV1(), restorePodName, ownerObject.Namespace, e.log) kube.DeletePVAndPVCIfAny(ctx, e.kubeClient.CoreV1(), restorePVCName, ownerObject.Namespace, 0, e.log) + kube.DeletePVAndPVCIfAny(ctx, e.kubeClient.CoreV1(), cachePVCName, ownerObject.Namespace, 0, e.log) } func (e *genericRestoreExposer) RebindVolume(ctx context.Context, ownerObject corev1api.ObjectReference, targetPVCName string, targetNamespace string, timeout time.Duration) error { @@ -433,6 +485,7 @@ func (e *genericRestoreExposer) createRestorePod( nodeOS string, affinity *kube.LoadAffinity, priorityClassName string, + cachePVC *corev1api.PersistentVolumeClaim, ) (*corev1api.Pod, error) { restorePodName := ownerObject.Name restorePVCName := ownerObject.Name @@ -461,7 +514,6 @@ func (e *genericRestoreExposer) createRestorePod( var gracePeriod int64 volumeMounts, volumeDevices, volumePath := kube.MakePodPVCAttachment(volumeName, targetPVC.Spec.VolumeMode, false) - volumeMounts = append(volumeMounts, podInfo.volumeMounts...) volumes := []corev1api.Volume{{ Name: volumeName, @@ -471,6 +523,25 @@ func (e *genericRestoreExposer) createRestorePod( }, }, }} + + cacheVolumePath := "" + if cachePVC != nil { + mnt, _, path := kube.MakePodPVCAttachment(cacheVolumeName, nil, false) + volumeMounts = append(volumeMounts, mnt...) + + volumes = append(volumes, corev1api.Volume{ + Name: cacheVolumeName, + VolumeSource: corev1api.VolumeSource{ + PersistentVolumeClaim: &corev1api.PersistentVolumeClaimVolumeSource{ + ClaimName: cachePVC.Name, + }, + }, + }) + + cacheVolumePath = path + } + + volumeMounts = append(volumeMounts, podInfo.volumeMounts...) volumes = append(volumes, podInfo.volumes...) 
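+	// Note on the wiring above: cacheVolumePath stays "" when no cache PVC was created;
+	// it is handed to the data mover unconditionally through the --cache-volume-path
+	// argument assembled below, so the data mover sees an empty path when caching is off.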
if label == nil { @@ -488,6 +559,7 @@ func (e *genericRestoreExposer) createRestorePod( fmt.Sprintf("--volume-mode=%s", volumeMode), fmt.Sprintf("--data-download=%s", ownerObject.Name), fmt.Sprintf("--resource-timeout=%s", operationTimeout.String()), + fmt.Sprintf("--cache-volume-path=%s", cacheVolumePath), } args = append(args, podInfo.logFormatArgs...) diff --git a/pkg/exposer/generic_restore_priority_test.go b/pkg/exposer/generic_restore_priority_test.go index b67652b93..642e0cc43 100644 --- a/pkg/exposer/generic_restore_priority_test.go +++ b/pkg/exposer/generic_restore_priority_test.go @@ -148,6 +148,7 @@ func TestCreateRestorePodWithPriorityClass(t *testing.T) { kube.NodeOSLinux, nil, // affinity tc.expectedPriorityClass, + nil, ) require.NoError(t, err, tc.description) @@ -227,6 +228,7 @@ func TestCreateRestorePodWithMissingConfigMap(t *testing.T) { kube.NodeOSLinux, nil, // affinity "", // empty priority class since config map is missing + nil, ) // Should succeed even when config map is missing diff --git a/pkg/exposer/generic_restore_test.go b/pkg/exposer/generic_restore_test.go index 2e528d6a2..9f32ce1d1 100644 --- a/pkg/exposer/generic_restore_test.go +++ b/pkg/exposer/generic_restore_test.go @@ -26,6 +26,7 @@ import ( appsv1api "k8s.io/api/apps/v1" corev1api "k8s.io/api/core/v1" storagev1api "k8s.io/api/storage/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/fake" @@ -105,6 +106,10 @@ func TestRestoreExpose(t *testing.T) { targetPVCName string targetNamespace string kubeReactors []reactor + cacheVolume *CacheConfigs + expectBackupPod bool + expectBackupPVC bool + expectCachePVC bool err string }{ { @@ -167,6 +172,70 @@ func TestRestoreExpose(t *testing.T) { }, err: "error to create restore pvc: fake-create-error", }, + { + name: "succeed", + targetPVCName: "fake-target-pvc", + targetNamespace: "fake-ns", + ownerRestore: restore, + kubeClientObj: []runtime.Object{ + targetPVCObj, + daemonSet, + storageClass, + }, + expectBackupPod: true, + expectBackupPVC: true, + }, + { + name: "succeed, cache config, no cache volume", + targetPVCName: "fake-target-pvc", + targetNamespace: "fake-ns", + ownerRestore: restore, + kubeClientObj: []runtime.Object{ + targetPVCObj, + daemonSet, + storageClass, + }, + cacheVolume: &CacheConfigs{}, + expectBackupPod: true, + expectBackupPVC: true, + }, + { + name: "create cache volume fail", + targetPVCName: "fake-target-pvc", + targetNamespace: "fake-ns", + ownerRestore: restore, + kubeClientObj: []runtime.Object{ + targetPVCObj, + daemonSet, + storageClass, + }, + cacheVolume: &CacheConfigs{Limit: 1024}, + kubeReactors: []reactor{ + { + verb: "create", + resource: "persistentvolumeclaims", + reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, errors.New("fake-create-error") + }, + }, + }, + err: "error to create cache pvc: fake-create-error", + }, + { + name: "succeed with cache volume", + targetPVCName: "fake-target-pvc", + targetNamespace: "fake-ns", + ownerRestore: restore, + kubeClientObj: []runtime.Object{ + targetPVCObj, + daemonSet, + storageClass, + }, + cacheVolume: &CacheConfigs{Limit: 1024}, + expectBackupPod: true, + expectBackupPVC: true, + expectCachePVC: true, + }, } for _, test := range tests { @@ -203,9 +272,36 @@ func TestRestoreExpose(t *testing.T) { Resources: corev1api.ResourceRequirements{}, ExposeTimeout: time.Millisecond, LoadAffinity: nil, + 
CacheVolume: test.cacheVolume, }, ) - require.EqualError(t, err, test.err) + + if test.err != "" { + require.EqualError(t, err, test.err) + } else { + require.NoError(t, err) + } + + _, err = exposer.kubeClient.CoreV1().Pods(ownerObject.Namespace).Get(t.Context(), ownerObject.Name, metav1.GetOptions{}) + if test.expectBackupPod { + require.NoError(t, err) + } else { + require.True(t, apierrors.IsNotFound(err)) + } + + _, err = exposer.kubeClient.CoreV1().PersistentVolumeClaims(ownerObject.Namespace).Get(t.Context(), ownerObject.Name, metav1.GetOptions{}) + if test.expectBackupPVC { + require.NoError(t, err) + } else { + require.True(t, apierrors.IsNotFound(err)) + } + + _, err = exposer.kubeClient.CoreV1().PersistentVolumeClaims(ownerObject.Namespace).Get(t.Context(), getCachePVCName(ownerObject), metav1.GetOptions{}) + if test.expectCachePVC { + require.NoError(t, err) + } else { + require.True(t, apierrors.IsNotFound(err)) + } }) } } @@ -651,6 +747,38 @@ func Test_ReastoreDiagnoseExpose(t *testing.T) { }, } + cachePVCWithVolumeName := corev1api.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: velerov1.DefaultNamespace, + Name: "fake-restore-cache", + UID: "fake-cache-pvc-uid", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: restore.APIVersion, + Kind: restore.Kind, + Name: restore.Name, + UID: restore.UID, + }, + }, + }, + Spec: corev1api.PersistentVolumeClaimSpec{ + VolumeName: "fake-pv-cache", + }, + Status: corev1api.PersistentVolumeClaimStatus{ + Phase: corev1api.ClaimPending, + }, + } + + cachePV := corev1api.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fake-pv-cache", + }, + Status: corev1api.PersistentVolumeStatus{ + Phase: corev1api.VolumePending, + Message: "fake-pv-message", + }, + } + nodeAgentPod := corev1api.Pod{ ObjectMeta: metav1.ObjectMeta{ Namespace: velerov1.DefaultNamespace, @@ -762,6 +890,44 @@ Pod velero/fake-restore, phase Pending, node name fake-node Pod condition Initialized, status True, reason , message fake-pod-message PVC velero/fake-restore, phase Pending, binding to fake-pv PV fake-pv, phase Pending, reason , message fake-pv-message +end diagnose restore exposer`, + }, + { + name: "cache pvc with volume name, no pv", + ownerRestore: restore, + kubeClientObj: []runtime.Object{ + &restorePodWithNodeName, + &restorePVCWithVolumeName, + &cachePVCWithVolumeName, + &nodeAgentPod, + }, + expected: `begin diagnose restore exposer +Pod velero/fake-restore, phase Pending, node name fake-node +Pod condition Initialized, status True, reason , message fake-pod-message +PVC velero/fake-restore, phase Pending, binding to fake-pv +error getting restore pv fake-pv, err: persistentvolumes "fake-pv" not found +PVC velero/fake-restore-cache, phase Pending, binding to fake-pv-cache +error getting cache pv fake-pv-cache, err: persistentvolumes "fake-pv-cache" not found +end diagnose restore exposer`, + }, + { + name: "cache pvc with volume name, pv exists", + ownerRestore: restore, + kubeClientObj: []runtime.Object{ + &restorePodWithNodeName, + &restorePVCWithVolumeName, + &cachePVCWithVolumeName, + &restorePV, + &cachePV, + &nodeAgentPod, + }, + expected: `begin diagnose restore exposer +Pod velero/fake-restore, phase Pending, node name fake-node +Pod condition Initialized, status True, reason , message fake-pod-message +PVC velero/fake-restore, phase Pending, binding to fake-pv +PV fake-pv, phase Pending, reason , message fake-pv-message +PVC velero/fake-restore-cache, phase Pending, binding to fake-pv-cache +PV fake-pv-cache, 
phase Pending, reason , message fake-pv-message end diagnose restore exposer`, }, { @@ -973,6 +1139,7 @@ func TestCreateRestorePod(t *testing.T) { test.nodeOS, test.affinity, "", // priority class name + nil, ) require.NoError(t, err) diff --git a/pkg/exposer/pod_volume.go b/pkg/exposer/pod_volume.go index 591600eb3..1f18056d0 100644 --- a/pkg/exposer/pod_volume.go +++ b/pkg/exposer/pod_volume.go @@ -76,6 +76,12 @@ type PodVolumeExposeParam struct { // Privileged indicates whether to create the pod with a privileged container Privileged bool + + // RestoreSize specifies the data size for the volume to be restored, for restore only + RestoreSize int64 + + // CacheVolume specifies the info for cache volumes, for restore only + CacheVolume *CacheConfigs } // PodVolumeExposer is the interfaces for a pod volume exposer @@ -156,7 +162,29 @@ func (e *podVolumeExposer) Expose(ctx context.Context, ownerObject corev1api.Obj curLog.WithField("path", path).Infof("Host path is retrieved for pod %s, volume %s", param.ClientPodName, param.ClientPodVolume) - hostingPod, err := e.createHostingPod(ctx, ownerObject, param.Type, path.ByPath, param.OperationTimeout, param.HostingPodLabels, param.HostingPodAnnotations, param.HostingPodTolerations, pod.Spec.NodeName, param.Resources, nodeOS, param.PriorityClassName, param.Privileged) + var cachePVC *corev1api.PersistentVolumeClaim + if param.CacheVolume != nil { + cacheVolumeSize := getCacheVolumeSize(param.RestoreSize, param.CacheVolume) + if cacheVolumeSize > 0 { + curLog.Infof("Creating cache PVC with size %v", cacheVolumeSize) + + if pvc, err := createCachePVC(ctx, e.kubeClient.CoreV1(), ownerObject, param.CacheVolume.StorageClass, cacheVolumeSize, pod.Spec.NodeName); err != nil { + return errors.Wrap(err, "error to create cache pvc") + } else { + cachePVC = pvc + } + + defer func() { + if err != nil { + kube.DeletePVAndPVCIfAny(ctx, e.kubeClient.CoreV1(), cachePVC.Name, cachePVC.Namespace, 0, curLog) + } + }() + } else { + curLog.Infof("Don't need to create cache volume, restore size %v, cache info %v", param.RestoreSize, param.CacheVolume) + } + } + + hostingPod, err := e.createHostingPod(ctx, ownerObject, param.Type, path.ByPath, param.OperationTimeout, param.HostingPodLabels, param.HostingPodAnnotations, param.HostingPodTolerations, pod.Spec.NodeName, param.Resources, nodeOS, param.PriorityClassName, param.Privileged, cachePVC) if err != nil { return errors.Wrapf(err, "error to create hosting pod") } @@ -251,6 +279,15 @@ func (e *podVolumeExposer) DiagnoseExpose(ctx context.Context, ownerObject corev diag += fmt.Sprintf("error getting hosting pod %s, err: %v\n", hostingPodName, err) } + cachePVC, err := e.kubeClient.CoreV1().PersistentVolumeClaims(ownerObject.Namespace).Get(ctx, getCachePVCName(ownerObject), metav1.GetOptions{}) + if err != nil { + cachePVC = nil + + if !apierrors.IsNotFound(err) { + diag += fmt.Sprintf("error getting cache pvc %s, err: %v\n", getCachePVCName(ownerObject), err) + } + } + events, err := e.kubeClient.CoreV1().Events(ownerObject.Namespace).List(ctx, metav1.ListOptions{}) if err != nil { diag += fmt.Sprintf("error listing events, err: %v\n", err) @@ -266,6 +303,18 @@ func (e *podVolumeExposer) DiagnoseExpose(ctx context.Context, ownerObject corev } } + if cachePVC != nil { + diag += kube.DiagnosePVC(cachePVC, events) + + if cachePVC.Spec.VolumeName != "" { + if pv, err := e.kubeClient.CoreV1().PersistentVolumes().Get(ctx, cachePVC.Spec.VolumeName, metav1.GetOptions{}); err != nil { + diag += fmt.Sprintf("error getting cache 
pv %s, err: %v\n", cachePVC.Spec.VolumeName, err) + } else { + diag += kube.DiagnosePV(pv) + } + } + } + diag += "end diagnose pod volume exposer" return diag @@ -273,11 +322,14 @@ func (e *podVolumeExposer) DiagnoseExpose(ctx context.Context, ownerObject corev func (e *podVolumeExposer) CleanUp(ctx context.Context, ownerObject corev1api.ObjectReference) { restorePodName := ownerObject.Name + cachePVCName := getCachePVCName(ownerObject) + kube.DeletePodIfAny(ctx, e.kubeClient.CoreV1(), restorePodName, ownerObject.Namespace, e.log) + kube.DeletePVAndPVCIfAny(ctx, e.kubeClient.CoreV1(), cachePVCName, ownerObject.Namespace, 0, e.log) } func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject corev1api.ObjectReference, exposeType string, hostPath string, - operationTimeout time.Duration, label map[string]string, annotation map[string]string, toleration []corev1api.Toleration, selectedNode string, resources corev1api.ResourceRequirements, nodeOS string, priorityClassName string, privileged bool) (*corev1api.Pod, error) { + operationTimeout time.Duration, label map[string]string, annotation map[string]string, toleration []corev1api.Toleration, selectedNode string, resources corev1api.ResourceRequirements, nodeOS string, priorityClassName string, privileged bool, cachePVC *corev1api.PersistentVolumeClaim) (*corev1api.Pod, error) { hostingPodName := ownerObject.Name containerName := string(ownerObject.UID) @@ -301,7 +353,6 @@ func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject cor MountPath: clientVolumePath, MountPropagation: &mountPropagation, }} - volumeMounts = append(volumeMounts, podInfo.volumeMounts...) volumes := []corev1api.Volume{{ Name: clientVolumeName, @@ -311,6 +362,25 @@ func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject cor }, }, }} + + cacheVolumePath := "" + if cachePVC != nil { + mnt, _, path := kube.MakePodPVCAttachment(cacheVolumeName, nil, false) + volumeMounts = append(volumeMounts, mnt...) + + volumes = append(volumes, corev1api.Volume{ + Name: cacheVolumeName, + VolumeSource: corev1api.VolumeSource{ + PersistentVolumeClaim: &corev1api.PersistentVolumeClaimVolumeSource{ + ClaimName: cachePVC.Name, + }, + }, + }) + + cacheVolumePath = path + } + + volumeMounts = append(volumeMounts, podInfo.volumeMounts...) volumes = append(volumes, podInfo.volumes...) 
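+	// Note on the wiring above: cacheVolumePath stays "" when no cache PVC was created;
+	// for the pod volume exposer it is only appended to the pod args (as
+	// --cache-volume-path) on the restore branch below, since caching applies to restores only.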
args := []string{ @@ -328,6 +398,7 @@ func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject cor command = append(command, "backup") } else { args = append(args, fmt.Sprintf("--pod-volume-restore=%s", ownerObject.Name)) + args = append(args, fmt.Sprintf("--cache-volume-path=%s", cacheVolumePath)) command = append(command, "restore") } diff --git a/pkg/exposer/pod_volume_test.go b/pkg/exposer/pod_volume_test.go index f48e9376b..ebf76efe9 100644 --- a/pkg/exposer/pod_volume_test.go +++ b/pkg/exposer/pod_volume_test.go @@ -11,10 +11,12 @@ import ( "github.com/stretchr/testify/require" appsv1api "k8s.io/api/apps/v1" corev1api "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" + clientTesting "k8s.io/client-go/testing" clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake" velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" @@ -72,6 +74,9 @@ func TestPodVolumeExpose(t *testing.T) { exposeParam PodVolumeExposeParam funcGetPodVolumeHostPath func(context.Context, *corev1api.Pod, string, kubernetes.Interface, filesystem.Interface, logrus.FieldLogger) (datapath.AccessPoint, error) funcExtractPodVolumeHostPath func(context.Context, string, kubernetes.Interface, string, string) (string, error) + kubeReactors []reactor + expectBackupPod bool + expectCachePVC bool err string }{ { @@ -189,6 +194,7 @@ func TestPodVolumeExpose(t *testing.T) { funcExtractPodVolumeHostPath: func(context.Context, string, kubernetes.Interface, string, string) (string, error) { return "/var/lib/kubelet/pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", nil }, + expectBackupPod: true, }, { name: "succeed with privileged pod", @@ -212,6 +218,89 @@ func TestPodVolumeExpose(t *testing.T) { funcExtractPodVolumeHostPath: func(context.Context, string, kubernetes.Interface, string, string) (string, error) { return "/var/lib/kubelet/pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", nil }, + expectBackupPod: true, + }, + { + name: "succeed, cache config, no cache volume", + ownerBackup: backup, + exposeParam: PodVolumeExposeParam{ + ClientNamespace: "fake-ns", + ClientPodName: "fake-client-pod", + ClientPodVolume: "fake-client-volume", + CacheVolume: &CacheConfigs{}, + }, + kubeClientObj: []runtime.Object{ + podWithNode, + node, + daemonSet, + }, + funcGetPodVolumeHostPath: func(context.Context, *corev1api.Pod, string, kubernetes.Interface, filesystem.Interface, logrus.FieldLogger) (datapath.AccessPoint, error) { + return datapath.AccessPoint{ + ByPath: "/host_pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", + }, nil + }, + funcExtractPodVolumeHostPath: func(context.Context, string, kubernetes.Interface, string, string) (string, error) { + return "/var/lib/kubelet/pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", nil + }, + expectBackupPod: true, + }, + { + name: "create cache volume fail", + ownerBackup: backup, + exposeParam: PodVolumeExposeParam{ + ClientNamespace: "fake-ns", + ClientPodName: "fake-client-pod", + ClientPodVolume: "fake-client-volume", + CacheVolume: &CacheConfigs{Limit: 1024}, + }, + kubeClientObj: []runtime.Object{ + podWithNode, + node, + daemonSet, + }, + funcGetPodVolumeHostPath: func(context.Context, *corev1api.Pod, string, kubernetes.Interface, filesystem.Interface, logrus.FieldLogger) (datapath.AccessPoint, error) { + return datapath.AccessPoint{ + ByPath: 
"/host_pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", + }, nil + }, + funcExtractPodVolumeHostPath: func(context.Context, string, kubernetes.Interface, string, string) (string, error) { + return "/var/lib/kubelet/pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", nil + }, + kubeReactors: []reactor{ + { + verb: "create", + resource: "persistentvolumeclaims", + reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, errors.New("fake-create-error") + }, + }, + }, + err: "error to create cache pvc: fake-create-error", + }, + { + name: "succeed with cache volume", + ownerBackup: backup, + exposeParam: PodVolumeExposeParam{ + ClientNamespace: "fake-ns", + ClientPodName: "fake-client-pod", + ClientPodVolume: "fake-client-volume", + CacheVolume: &CacheConfigs{Limit: 1024}, + }, + kubeClientObj: []runtime.Object{ + podWithNode, + node, + daemonSet, + }, + funcGetPodVolumeHostPath: func(context.Context, *corev1api.Pod, string, kubernetes.Interface, filesystem.Interface, logrus.FieldLogger) (datapath.AccessPoint, error) { + return datapath.AccessPoint{ + ByPath: "/host_pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", + }, nil + }, + funcExtractPodVolumeHostPath: func(context.Context, string, kubernetes.Interface, string, string) (string, error) { + return "/var/lib/kubelet/pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", nil + }, + expectBackupPod: true, + expectCachePVC: true, }, } @@ -219,6 +308,10 @@ func TestPodVolumeExpose(t *testing.T) { t.Run(test.name, func(t *testing.T) { fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...) + for _, reactor := range test.kubeReactors { + fakeKubeClient.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorFunc) + } + exposer := podVolumeExposer{ kubeClient: fakeKubeClient, log: velerotest.NewLogger(), @@ -248,9 +341,23 @@ func TestPodVolumeExpose(t *testing.T) { require.NoError(t, err) _, err = exposer.kubeClient.CoreV1().Pods(ownerObject.Namespace).Get(t.Context(), ownerObject.Name, metav1.GetOptions{}) - assert.NoError(t, err) + require.NoError(t, err) } else { - assert.EqualError(t, err, test.err) + require.EqualError(t, err, test.err) + } + + _, err = exposer.kubeClient.CoreV1().Pods(ownerObject.Namespace).Get(t.Context(), ownerObject.Name, metav1.GetOptions{}) + if test.expectBackupPod { + require.NoError(t, err) + } else { + require.True(t, apierrors.IsNotFound(err)) + } + + _, err = exposer.kubeClient.CoreV1().PersistentVolumeClaims(ownerObject.Namespace).Get(t.Context(), getCachePVCName(ownerObject), metav1.GetOptions{}) + if test.expectCachePVC { + require.NoError(t, err) + } else { + require.True(t, apierrors.IsNotFound(err)) } }) } @@ -517,6 +624,38 @@ func TestPodVolumeDiagnoseExpose(t *testing.T) { }, } + cachePVCWithVolumeName := corev1api.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: velerov1.DefaultNamespace, + Name: "fake-backup-cache", + UID: "fake-cache-pvc-uid", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: backup.APIVersion, + Kind: backup.Kind, + Name: backup.Name, + UID: backup.UID, + }, + }, + }, + Spec: corev1api.PersistentVolumeClaimSpec{ + VolumeName: "fake-pv-cache", + }, + Status: corev1api.PersistentVolumeClaimStatus{ + Phase: corev1api.ClaimPending, + }, + } + + cachePV := corev1api.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fake-pv-cache", + }, + Status: corev1api.PersistentVolumeStatus{ + Phase: corev1api.VolumePending, + Message: 
"fake-pv-message", + }, + } + nodeAgentPod := corev1api.Pod{ ObjectMeta: metav1.ObjectMeta{ Namespace: velerov1.DefaultNamespace, @@ -589,6 +728,37 @@ end diagnose pod volume exposer`, expected: `begin diagnose pod volume exposer Pod velero/fake-backup, phase Pending, node name fake-node Pod condition Initialized, status True, reason , message fake-pod-message +end diagnose pod volume exposer`, + }, + { + name: "cache pvc with volume name, no pv", + ownerBackup: backup, + kubeClientObj: []runtime.Object{ + &backupPodWithNodeName, + &cachePVCWithVolumeName, + &nodeAgentPod, + }, + expected: `begin diagnose pod volume exposer +Pod velero/fake-backup, phase Pending, node name fake-node +Pod condition Initialized, status True, reason , message fake-pod-message +PVC velero/fake-backup-cache, phase Pending, binding to fake-pv-cache +error getting cache pv fake-pv-cache, err: persistentvolumes "fake-pv-cache" not found +end diagnose pod volume exposer`, + }, + { + name: "cache pvc with volume name, pv exists", + ownerBackup: backup, + kubeClientObj: []runtime.Object{ + &backupPodWithNodeName, + &cachePVCWithVolumeName, + &cachePV, + &nodeAgentPod, + }, + expected: `begin diagnose pod volume exposer +Pod velero/fake-backup, phase Pending, node name fake-node +Pod condition Initialized, status True, reason , message fake-pod-message +PVC velero/fake-backup-cache, phase Pending, binding to fake-pv-cache +PV fake-pv-cache, phase Pending, reason , message fake-pv-message end diagnose pod volume exposer`, }, {