diff --git a/pkg/exposer/cache_volume.go b/pkg/exposer/cache_volume.go
new file mode 100644
index 000000000..85571c243
--- /dev/null
+++ b/pkg/exposer/cache_volume.go
@@ -0,0 +1,99 @@
+/*
+Copyright The Velero Contributors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package exposer
+
+import (
+	"context"
+
+	corev1api "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
+
+	"github.com/vmware-tanzu/velero/pkg/util/boolptr"
+	"github.com/vmware-tanzu/velero/pkg/util/kube"
+)
+
+type CacheConfigs struct {
+	Limit int64
+	StorageClass string
+	ResidentThreshold int64
+}
+
+const (
+	cacheVolumeName = "cachedir"
+	cacheVolumeDirSuffix = "-cache"
+)
+
+func createCachePVC(ctx context.Context, pvcClient corev1client.CoreV1Interface, ownerObject corev1api.ObjectReference, sc string, size int64, selectedNode string) (*corev1api.PersistentVolumeClaim, error) {
+	cachePVCName := getCachePVCName(ownerObject)
+
+	volumeMode := corev1api.PersistentVolumeFilesystem
+
+	pvcObj := &corev1api.PersistentVolumeClaim{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace: ownerObject.Namespace,
+			Name: cachePVCName,
+			OwnerReferences: []metav1.OwnerReference{
+				{
+					APIVersion: ownerObject.APIVersion,
+					Kind: ownerObject.Kind,
+					Name: ownerObject.Name,
+					UID: ownerObject.UID,
+					Controller: boolptr.True(),
+				},
+			},
+		},
+		Spec: corev1api.PersistentVolumeClaimSpec{
+			AccessModes: []corev1api.PersistentVolumeAccessMode{corev1api.ReadWriteOnce},
+			StorageClassName: &sc,
+			VolumeMode: &volumeMode,
+			Resources: corev1api.VolumeResourceRequirements{
+				Requests: corev1api.ResourceList{
+					corev1api.ResourceStorage: *resource.NewQuantity(size, resource.BinarySI),
+				},
+			},
+		},
+	}
+
+	if selectedNode != "" {
+		pvcObj.Annotations = map[string]string{
+			kube.KubeAnnSelectedNode: selectedNode,
+		}
+	}
+
+	return pvcClient.PersistentVolumeClaims(pvcObj.Namespace).Create(ctx, pvcObj, metav1.CreateOptions{})
+}
+
+func getCachePVCName(ownerObject corev1api.ObjectReference) string {
+	return ownerObject.Name + cacheVolumeDirSuffix
+}
+
+func getCacheVolumeSize(dataSize int64, info *CacheConfigs) int64 {
+	if info == nil {
+		return 0
+	}
+
+	if dataSize != 0 && dataSize < info.ResidentThreshold {
+		return 0
+	}
+
+	// Inflate the configured cache limit by 20% and round up to the nearest GiB
+	volumeSize := (info.Limit*12/10 + (1 << 30) - 1) / (1 << 30) * (1 << 30)
+
+	return volumeSize
+}
diff --git a/pkg/exposer/generic_restore.go b/pkg/exposer/generic_restore.go
index 8691eedfc..a2d4ab020 100644
--- a/pkg/exposer/generic_restore.go
+++ b/pkg/exposer/generic_restore.go
@@ -73,6 +73,12 @@ type GenericRestoreExposeParam struct {
 
 	// PriorityClassName is the priority class name for the data mover pod
 	PriorityClassName string
+
+	// RestoreSize specifies the data size for the volume to be restored
+	RestoreSize int64
+
+	// CacheVolume specifies the info for cache volumes
+	CacheVolume *CacheConfigs
 }
 
 // GenericRestoreExposer is the interfaces for a generic restore exposer
@@ -148,6 +154,28 @@ func (e *genericRestoreExposer) Expose(ctx context.Context, ownerObject corev1ap
 
 	affinity := kube.GetLoadAffinityByStorageClass(param.LoadAffinity, storageClassName, curLog)
 
+	var cachePVC *corev1api.PersistentVolumeClaim
+	if param.CacheVolume != nil {
+		cacheVolumeSize := getCacheVolumeSize(param.RestoreSize, param.CacheVolume)
+		if cacheVolumeSize > 0 {
+			curLog.Infof("Creating cache PVC with size %v", cacheVolumeSize)
+
+			if pvc, err := createCachePVC(ctx, e.kubeClient.CoreV1(), ownerObject, param.CacheVolume.StorageClass, cacheVolumeSize, selectedNode); err != nil {
+				return errors.Wrap(err, "error to create cache pvc")
+			} else {
+				cachePVC = pvc
+			}
+
+			defer func() {
+				if err != nil {
+					kube.DeletePVAndPVCIfAny(ctx, e.kubeClient.CoreV1(), cachePVC.Name, cachePVC.Namespace, 0, curLog)
+				}
+			}()
+		} else {
+			curLog.Infof("Don't need to create cache volume, restore size %v, cache info %v", param.RestoreSize, param.CacheVolume)
+		}
+	}
+
 	restorePod, err := e.createRestorePod(
 		ctx,
 		ownerObject,
@@ -161,6 +189,7 @@ func (e *genericRestoreExposer) Expose(ctx context.Context, ownerObject corev1ap
 		param.NodeOS,
 		affinity,
 		param.PriorityClassName,
+		cachePVC,
 	)
 	if err != nil {
 		return errors.Wrapf(err, "error to create restore pod")
@@ -287,6 +316,22 @@ func (e *genericRestoreExposer) DiagnoseExpose(ctx context.Context, ownerObject
 		diag += fmt.Sprintf("error getting restore pvc %s, err: %v\n", restorePVCName, err)
 	}
 
+	var cachePVC *corev1api.PersistentVolumeClaim
+	if pod.Spec.Volumes != nil {
+		for _, v := range pod.Spec.Volumes {
+			if v.Name == cacheVolumeName {
+				cachePVC, err = e.kubeClient.CoreV1().PersistentVolumeClaims(ownerObject.Namespace).Get(ctx, getCachePVCName(ownerObject), metav1.GetOptions{})
+				if err != nil {
+					cachePVC = nil
+					diag += fmt.Sprintf("error getting cache pvc %s, err: %v\n", getCachePVCName(ownerObject), err)
+				}
+
+				break
+			}
+
+		}
+	}
+
 	events, err := e.kubeClient.CoreV1().Events(ownerObject.Namespace).List(ctx, metav1.ListOptions{})
 	if err != nil {
 		diag += fmt.Sprintf("error listing events, err: %v\n", err)
@@ -314,6 +359,18 @@ func (e *genericRestoreExposer) DiagnoseExpose(ctx context.Context, ownerObject
 		}
 	}
 
+	if cachePVC != nil {
+		diag += kube.DiagnosePVC(cachePVC, events)
+
+		if cachePVC.Spec.VolumeName != "" {
+			if pv, err := e.kubeClient.CoreV1().PersistentVolumes().Get(ctx, cachePVC.Spec.VolumeName, metav1.GetOptions{}); err != nil {
+				diag += fmt.Sprintf("error getting cache pv %s, err: %v\n", cachePVC.Spec.VolumeName, err)
+			} else {
+				diag += kube.DiagnosePV(pv)
+			}
+		}
+	}
+
 	diag += "end diagnose restore exposer"
 
 	return diag
@@ -322,9 +379,11 @@ func (e *genericRestoreExposer) DiagnoseExpose(ctx context.Context, ownerObject
 func (e *genericRestoreExposer) CleanUp(ctx context.Context, ownerObject corev1api.ObjectReference) {
 	restorePodName := ownerObject.Name
 	restorePVCName := ownerObject.Name
+	cachePVCName := getCachePVCName(ownerObject)
 
 	kube.DeletePodIfAny(ctx, e.kubeClient.CoreV1(), restorePodName, ownerObject.Namespace, e.log)
 	kube.DeletePVAndPVCIfAny(ctx, e.kubeClient.CoreV1(), restorePVCName, ownerObject.Namespace, 0, e.log)
+	kube.DeletePVAndPVCIfAny(ctx, e.kubeClient.CoreV1(), cachePVCName, ownerObject.Namespace, 0, e.log)
 }
 
 func (e *genericRestoreExposer) RebindVolume(ctx context.Context, ownerObject corev1api.ObjectReference, targetPVCName string, targetNamespace string, timeout time.Duration) error {
@@ -433,6 +492,7 @@ func (e *genericRestoreExposer) createRestorePod(
 	nodeOS string,
 	affinity *kube.LoadAffinity,
 	priorityClassName string,
+	cachePVC *corev1api.PersistentVolumeClaim,
 ) (*corev1api.Pod, error) {
 	restorePodName := ownerObject.Name
 	restorePVCName := ownerObject.Name
@@ -461,7 +521,6 @@ func (e *genericRestoreExposer) createRestorePod(
 
 	var gracePeriod int64
 	volumeMounts, volumeDevices, volumePath := kube.MakePodPVCAttachment(volumeName, targetPVC.Spec.VolumeMode, false)
-	volumeMounts = append(volumeMounts, podInfo.volumeMounts...)
 
 	volumes := []corev1api.Volume{{
 		Name: volumeName,
@@ -471,6 +530,25 @@ func (e *genericRestoreExposer) createRestorePod(
 			},
 		},
 	}}
+
+	cacheVolumePath := ""
+	if cachePVC != nil {
+		mnt, _, path := kube.MakePodPVCAttachment(cacheVolumeName, nil, false)
+		volumeMounts = append(volumeMounts, mnt...)
+
+		volumes = append(volumes, corev1api.Volume{
+			Name: cacheVolumeName,
+			VolumeSource: corev1api.VolumeSource{
+				PersistentVolumeClaim: &corev1api.PersistentVolumeClaimVolumeSource{
+					ClaimName: cachePVC.Name,
+				},
+			},
+		})
+
+		cacheVolumePath = path
+	}
+
+	volumeMounts = append(volumeMounts, podInfo.volumeMounts...)
 	volumes = append(volumes, podInfo.volumes...)
 
 	if label == nil {
@@ -488,6 +566,7 @@ func (e *genericRestoreExposer) createRestorePod(
 		fmt.Sprintf("--volume-mode=%s", volumeMode),
 		fmt.Sprintf("--data-download=%s", ownerObject.Name),
 		fmt.Sprintf("--resource-timeout=%s", operationTimeout.String()),
+		fmt.Sprintf("--cache-volume-path=%s", cacheVolumePath),
 	}
 
 	args = append(args, podInfo.logFormatArgs...)
diff --git a/pkg/exposer/generic_restore_priority_test.go b/pkg/exposer/generic_restore_priority_test.go
index b67652b93..642e0cc43 100644
--- a/pkg/exposer/generic_restore_priority_test.go
+++ b/pkg/exposer/generic_restore_priority_test.go
@@ -148,6 +148,7 @@ func TestCreateRestorePodWithPriorityClass(t *testing.T) {
 				kube.NodeOSLinux,
 				nil, // affinity
 				tc.expectedPriorityClass,
+				nil,
 			)
 
 			require.NoError(t, err, tc.description)
@@ -227,6 +228,7 @@ func TestCreateRestorePodWithMissingConfigMap(t *testing.T) {
 		kube.NodeOSLinux,
 		nil, // affinity
 		"", // empty priority class since config map is missing
+		nil,
 	)
 
 	// Should succeed even when config map is missing
diff --git a/pkg/exposer/generic_restore_test.go b/pkg/exposer/generic_restore_test.go
index 2e528d6a2..271e4e7d6 100644
--- a/pkg/exposer/generic_restore_test.go
+++ b/pkg/exposer/generic_restore_test.go
@@ -973,6 +973,7 @@ func TestCreateRestorePod(t *testing.T) {
 				test.nodeOS,
 				test.affinity,
 				"", // priority class name
+				nil,
 			)
 
 			require.NoError(t, err)
diff --git a/pkg/exposer/pod_volume.go b/pkg/exposer/pod_volume.go
index 591600eb3..1702fb0f5 100644
--- a/pkg/exposer/pod_volume.go
+++ b/pkg/exposer/pod_volume.go
@@ -76,6 +76,12 @@ type PodVolumeExposeParam struct {
 
 	// Privileged indicates whether to create the pod with a privileged container
 	Privileged bool
+
+	// RestoreSize specifies the data size for the volume to be restored, for restore only
+	RestoreSize int64
+
+	// CacheVolume specifies the info for cache volumes, for restore only
+	CacheVolume *CacheConfigs
 }
 
 // PodVolumeExposer is the interfaces for a pod volume exposer
@@ -156,7 +162,29 @@ func (e *podVolumeExposer) Expose(ctx context.Context, ownerObject corev1api.Obj
 
 	curLog.WithField("path", path).Infof("Host path is retrieved for pod %s, volume %s", param.ClientPodName, param.ClientPodVolume)
 
-	hostingPod, err := e.createHostingPod(ctx, ownerObject, param.Type, path.ByPath, param.OperationTimeout, param.HostingPodLabels, param.HostingPodAnnotations, param.HostingPodTolerations, pod.Spec.NodeName, param.Resources, nodeOS, param.PriorityClassName, param.Privileged)
+	var cachePVC *corev1api.PersistentVolumeClaim
+	if param.CacheVolume != nil {
+		cacheVolumeSize := getCacheVolumeSize(param.RestoreSize, param.CacheVolume)
+		if cacheVolumeSize > 0 {
+			curLog.Infof("Creating cache PVC with size %v", cacheVolumeSize)
+
+			if pvc, err := createCachePVC(ctx, e.kubeClient.CoreV1(), ownerObject, param.CacheVolume.StorageClass, cacheVolumeSize, pod.Spec.NodeName); err != nil {
+				return errors.Wrap(err, "error to create cache pvc")
+			} else {
+				cachePVC = pvc
+			}
+
+			defer func() {
+				if err != nil {
+					kube.DeletePVAndPVCIfAny(ctx, e.kubeClient.CoreV1(), cachePVC.Name, cachePVC.Namespace, 0, curLog)
+				}
+			}()
+		} else {
+			curLog.Infof("Don't need to create cache volume, restore size %v, cache info %v", param.RestoreSize, param.CacheVolume)
+		}
+	}
+
+	hostingPod, err := e.createHostingPod(ctx, ownerObject, param.Type, path.ByPath, param.OperationTimeout, param.HostingPodLabels, param.HostingPodAnnotations, param.HostingPodTolerations, pod.Spec.NodeName, param.Resources, nodeOS, param.PriorityClassName, param.Privileged, cachePVC)
 	if err != nil {
 		return errors.Wrapf(err, "error to create hosting pod")
 	}
@@ -251,6 +279,22 @@ func (e *podVolumeExposer) DiagnoseExpose(ctx context.Context, ownerObject corev
 		diag += fmt.Sprintf("error getting hosting pod %s, err: %v\n", hostingPodName, err)
 	}
 
+	var cachePVC *corev1api.PersistentVolumeClaim
+	if pod.Spec.Volumes != nil {
+		for _, v := range pod.Spec.Volumes {
+			if v.Name == cacheVolumeName {
+				cachePVC, err = e.kubeClient.CoreV1().PersistentVolumeClaims(ownerObject.Namespace).Get(ctx, getCachePVCName(ownerObject), metav1.GetOptions{})
+				if err != nil {
+					cachePVC = nil
+					diag += fmt.Sprintf("error getting cache pvc %s, err: %v\n", getCachePVCName(ownerObject), err)
+				}
+
+				break
+			}
+
+		}
+	}
+
 	events, err := e.kubeClient.CoreV1().Events(ownerObject.Namespace).List(ctx, metav1.ListOptions{})
 	if err != nil {
 		diag += fmt.Sprintf("error listing events, err: %v\n", err)
@@ -266,6 +310,18 @@ func (e *podVolumeExposer) DiagnoseExpose(ctx context.Context, ownerObject corev
 		}
 	}
 
+	if cachePVC != nil {
+		diag += kube.DiagnosePVC(cachePVC, events)
+
+		if cachePVC.Spec.VolumeName != "" {
+			if pv, err := e.kubeClient.CoreV1().PersistentVolumes().Get(ctx, cachePVC.Spec.VolumeName, metav1.GetOptions{}); err != nil {
+				diag += fmt.Sprintf("error getting cache pv %s, err: %v\n", cachePVC.Spec.VolumeName, err)
+			} else {
+				diag += kube.DiagnosePV(pv)
+			}
+		}
+	}
+
 	diag += "end diagnose pod volume exposer"
 
 	return diag
@@ -273,11 +329,14 @@ func (e *podVolumeExposer) DiagnoseExpose(ctx context.Context, ownerObject corev
 
 func (e *podVolumeExposer) CleanUp(ctx context.Context, ownerObject corev1api.ObjectReference) {
 	restorePodName := ownerObject.Name
+	cachePVCName := getCachePVCName(ownerObject)
+
 	kube.DeletePodIfAny(ctx, e.kubeClient.CoreV1(), restorePodName, ownerObject.Namespace, e.log)
+	kube.DeletePVAndPVCIfAny(ctx, e.kubeClient.CoreV1(), cachePVCName, ownerObject.Namespace, 0, e.log)
 }
 
 func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject corev1api.ObjectReference, exposeType string, hostPath string,
-	operationTimeout time.Duration, label map[string]string, annotation map[string]string, toleration []corev1api.Toleration, selectedNode string, resources corev1api.ResourceRequirements, nodeOS string, priorityClassName string, privileged bool) (*corev1api.Pod, error) {
+	operationTimeout time.Duration, label map[string]string, annotation map[string]string, toleration []corev1api.Toleration, selectedNode string, resources corev1api.ResourceRequirements, nodeOS string, priorityClassName string, privileged bool, cachePVC *corev1api.PersistentVolumeClaim) (*corev1api.Pod, error) {
 	hostingPodName := ownerObject.Name
 	containerName := string(ownerObject.UID)
 
@@ -301,7 +360,6 @@ func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject cor
 		MountPath: clientVolumePath,
 		MountPropagation: &mountPropagation,
 	}}
-	volumeMounts = append(volumeMounts, podInfo.volumeMounts...)
 
 	volumes := []corev1api.Volume{{
 		Name: clientVolumeName,
@@ -311,6 +369,25 @@ func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject cor
 			},
 		},
 	}}
+
+	cacheVolumePath := ""
+	if cachePVC != nil {
+		mnt, _, path := kube.MakePodPVCAttachment(cacheVolumeName, nil, false)
+		volumeMounts = append(volumeMounts, mnt...)
+
+		volumes = append(volumes, corev1api.Volume{
+			Name: cacheVolumeName,
+			VolumeSource: corev1api.VolumeSource{
+				PersistentVolumeClaim: &corev1api.PersistentVolumeClaimVolumeSource{
+					ClaimName: cachePVC.Name,
+				},
+			},
+		})
+
+		cacheVolumePath = path
+	}
+
+	volumeMounts = append(volumeMounts, podInfo.volumeMounts...)
 	volumes = append(volumes, podInfo.volumes...)
 
 	args := []string{
@@ -328,6 +405,7 @@ func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject cor
 		command = append(command, "backup")
 	} else {
 		args = append(args, fmt.Sprintf("--pod-volume-restore=%s", ownerObject.Name))
+		args = append(args, fmt.Sprintf("--cache-volume-path=%s", cacheVolumePath))
 		command = append(command, "restore")
 	}
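Reviewer note, not part of the patch: the rounding in getCacheVolumeSize is the least obvious piece of the change, so here is a minimal test-style sketch of the expected behavior. The file name, test name, and values are illustrative assumptions only; CacheConfigs values are assumed to be in bytes.

// pkg/exposer/cache_volume_test.go (hypothetical sketch, not included in this PR)
package exposer

import "testing"

func TestGetCacheVolumeSizeRounding(t *testing.T) {
	gib := int64(1 << 30)
	cfg := &CacheConfigs{Limit: 5 * gib, StorageClass: "standard", ResidentThreshold: gib}

	// A 5GiB limit inflated by 20% is exactly 6GiB, already GiB-aligned.
	if got := getCacheVolumeSize(2*gib, cfg); got != 6*gib {
		t.Errorf("expected %d, got %d", 6*gib, got)
	}

	// Data smaller than ResidentThreshold gets no cache PVC at all.
	if got := getCacheVolumeSize(512*1024*1024, cfg); got != 0 {
		t.Errorf("expected 0, got %d", got)
	}

	// An unknown data size (0) still gets a cache PVC sized from the limit.
	if got := getCacheVolumeSize(0, cfg); got != 6*gib {
		t.Errorf("expected %d, got %d", 6*gib, got)
	}
}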