exposer: support provisioning a cache volume (PVC) for data mover restore pods

Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
This commit is contained in:
Lyndon-Li
2025-10-22 19:11:56 +08:00
parent 53e99556ad
commit 8d29051bbe
5 changed files with 263 additions and 4 deletions

View File

@@ -0,0 +1,99 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package exposer
import (
"context"
corev1api "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
"github.com/vmware-tanzu/velero/pkg/util/kube"
)
// CacheConfigs holds the settings that control whether and how a cache
// volume is provisioned for a data mover pod.
type CacheConfigs struct {
	// Limit is the upper bound, in bytes, used to size the cache volume
	// (see getCacheVolumeSize).
	Limit int64
	// StorageClass is the storage class used to provision the cache PVC.
	StorageClass string
	// ResidentThreshold is the data size, in bytes, below which no cache
	// volume is created for a known (non-zero) data size.
	ResidentThreshold int64
}
const (
	// cacheVolumeName is the name of the cache volume mounted into the data mover pod.
	cacheVolumeName = "cachedir"
	// cacheVolumeDirSuffix is appended to the owner object's name to form the cache PVC name.
	cacheVolumeDirSuffix = "-cache"
)
// createCachePVC provisions the cache PVC for the given owner object and
// returns the created claim.
//
// The claim is filesystem-mode, ReadWriteOnce, requests size bytes from
// storage class sc, and carries a controller owner reference to ownerObject
// so it is garbage collected together with its owner. When selectedNode is
// non-empty, the selected-node annotation is set so the volume is
// provisioned on that node.
func createCachePVC(ctx context.Context, pvcClient corev1client.CoreV1Interface, ownerObject corev1api.ObjectReference, sc string, size int64, selectedNode string) (*corev1api.PersistentVolumeClaim, error) {
	fsMode := corev1api.PersistentVolumeFilesystem

	// Only pin the claim to a node when one was actually selected.
	var annotations map[string]string
	if selectedNode != "" {
		annotations = map[string]string{
			kube.KubeAnnSelectedNode: selectedNode,
		}
	}

	claim := &corev1api.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{
			Namespace:   ownerObject.Namespace,
			Name:        getCachePVCName(ownerObject),
			Annotations: annotations,
			OwnerReferences: []metav1.OwnerReference{
				{
					APIVersion: ownerObject.APIVersion,
					Kind:       ownerObject.Kind,
					Name:       ownerObject.Name,
					UID:        ownerObject.UID,
					Controller: boolptr.True(),
				},
			},
		},
		Spec: corev1api.PersistentVolumeClaimSpec{
			AccessModes:      []corev1api.PersistentVolumeAccessMode{corev1api.ReadWriteOnce},
			StorageClassName: &sc,
			VolumeMode:       &fsMode,
			Resources: corev1api.VolumeResourceRequirements{
				Requests: corev1api.ResourceList{
					corev1api.ResourceStorage: *resource.NewQuantity(size, resource.BinarySI),
				},
			},
		},
	}

	return pvcClient.PersistentVolumeClaims(claim.Namespace).Create(ctx, claim, metav1.CreateOptions{})
}
// getCachePVCName returns the name of the cache PVC for the given owner
// object: the owner's name with the cache suffix appended.
func getCachePVCName(ownerObject corev1api.ObjectReference) string {
	name := ownerObject.Name
	name += cacheVolumeDirSuffix
	return name
}
// getCacheVolumeSize computes the size, in bytes, of the cache volume to
// provision, or 0 when no cache volume is needed.
//
// No volume is needed when info is nil, or when the data size is known
// (non-zero) and below the resident threshold. Otherwise the configured
// cache limit is inflated by 20% and rounded up to a whole GiB.
func getCacheVolumeSize(dataSize int64, info *CacheConfigs) int64 {
	const gib = int64(1) << 30

	if info == nil {
		return 0
	}

	// A known data size under the threshold fits without a cache volume;
	// dataSize == 0 means "unknown", so a cache volume is still provisioned.
	if dataSize != 0 && dataSize < info.ResidentThreshold {
		return 0
	}

	inflated := info.Limit * 12 / 10
	wholeGiBs := (inflated + gib - 1) / gib
	return wholeGiBs * gib
}

View File

@@ -73,6 +73,12 @@ type GenericRestoreExposeParam struct {
// PriorityClassName is the priority class name for the data mover pod
PriorityClassName string
// RestoreSize specifies the data size for the volume to be restored
RestoreSize int64
// CacheVolume specifies the info for cache volumes
CacheVolume *CacheConfigs
}
// GenericRestoreExposer is the interfaces for a generic restore exposer
@@ -148,6 +154,28 @@ func (e *genericRestoreExposer) Expose(ctx context.Context, ownerObject corev1ap
affinity := kube.GetLoadAffinityByStorageClass(param.LoadAffinity, storageClassName, curLog)
var cachePVC *corev1api.PersistentVolumeClaim
if param.CacheVolume != nil {
cacheVolumeSize := getCacheVolumeSize(param.RestoreSize, param.CacheVolume)
if cacheVolumeSize > 0 {
curLog.Infof("Creating cache PVC with size %v", cacheVolumeSize)
if pvc, err := createCachePVC(ctx, e.kubeClient.CoreV1(), ownerObject, param.CacheVolume.StorageClass, cacheVolumeSize, selectedNode); err != nil {
return errors.Wrap(err, "error to create cache pvc")
} else {
cachePVC = pvc
}
defer func() {
if err != nil {
kube.DeletePVAndPVCIfAny(ctx, e.kubeClient.CoreV1(), cachePVC.Name, cachePVC.Namespace, 0, curLog)
}
}()
} else {
curLog.Infof("Don't need to create cache volume, restore size %v, cache info %v", param.RestoreSize, param.CacheVolume)
}
}
restorePod, err := e.createRestorePod(
ctx,
ownerObject,
@@ -161,6 +189,7 @@ func (e *genericRestoreExposer) Expose(ctx context.Context, ownerObject corev1ap
param.NodeOS,
affinity,
param.PriorityClassName,
cachePVC,
)
if err != nil {
return errors.Wrapf(err, "error to create restore pod")
@@ -287,6 +316,22 @@ func (e *genericRestoreExposer) DiagnoseExpose(ctx context.Context, ownerObject
diag += fmt.Sprintf("error getting restore pvc %s, err: %v\n", restorePVCName, err)
}
var cachePVC *corev1api.PersistentVolumeClaim
if pod.Spec.Volumes != nil {
for _, v := range pod.Spec.Volumes {
if v.Name == cacheVolumeName {
cachePVC, err = e.kubeClient.CoreV1().PersistentVolumeClaims(ownerObject.Namespace).Get(ctx, getCachePVCName(ownerObject), metav1.GetOptions{})
if err != nil {
cachePVC = nil
diag += fmt.Sprintf("error getting cache pvc %s, err: %v\n", getCachePVCName(ownerObject), err)
}
break
}
}
}
events, err := e.kubeClient.CoreV1().Events(ownerObject.Namespace).List(ctx, metav1.ListOptions{})
if err != nil {
diag += fmt.Sprintf("error listing events, err: %v\n", err)
@@ -314,6 +359,18 @@ func (e *genericRestoreExposer) DiagnoseExpose(ctx context.Context, ownerObject
}
}
if cachePVC != nil {
diag += kube.DiagnosePVC(cachePVC, events)
if cachePVC.Spec.VolumeName != "" {
if pv, err := e.kubeClient.CoreV1().PersistentVolumes().Get(ctx, cachePVC.Spec.VolumeName, metav1.GetOptions{}); err != nil {
diag += fmt.Sprintf("error getting cache pv %s, err: %v\n", cachePVC.Spec.VolumeName, err)
} else {
diag += kube.DiagnosePV(pv)
}
}
}
diag += "end diagnose restore exposer"
return diag
@@ -322,9 +379,11 @@ func (e *genericRestoreExposer) DiagnoseExpose(ctx context.Context, ownerObject
// CleanUp removes everything the exposer created for ownerObject: the
// restore pod, the restore PV/PVC, and the cache PV/PVC if one exists.
// The delete helpers take a logger and return nothing, so cleanup is
// best-effort.
func (e *genericRestoreExposer) CleanUp(ctx context.Context, ownerObject corev1api.ObjectReference) {
	client := e.kubeClient.CoreV1()

	// Delete the pod first so the claims are no longer mounted when removed.
	kube.DeletePodIfAny(ctx, client, ownerObject.Name, ownerObject.Namespace, e.log)
	kube.DeletePVAndPVCIfAny(ctx, client, ownerObject.Name, ownerObject.Namespace, 0, e.log)
	kube.DeletePVAndPVCIfAny(ctx, client, getCachePVCName(ownerObject), ownerObject.Namespace, 0, e.log)
}
func (e *genericRestoreExposer) RebindVolume(ctx context.Context, ownerObject corev1api.ObjectReference, targetPVCName string, targetNamespace string, timeout time.Duration) error {
@@ -433,6 +492,7 @@ func (e *genericRestoreExposer) createRestorePod(
nodeOS string,
affinity *kube.LoadAffinity,
priorityClassName string,
cachePVC *corev1api.PersistentVolumeClaim,
) (*corev1api.Pod, error) {
restorePodName := ownerObject.Name
restorePVCName := ownerObject.Name
@@ -461,7 +521,6 @@ func (e *genericRestoreExposer) createRestorePod(
var gracePeriod int64
volumeMounts, volumeDevices, volumePath := kube.MakePodPVCAttachment(volumeName, targetPVC.Spec.VolumeMode, false)
volumeMounts = append(volumeMounts, podInfo.volumeMounts...)
volumes := []corev1api.Volume{{
Name: volumeName,
@@ -471,6 +530,25 @@ func (e *genericRestoreExposer) createRestorePod(
},
},
}}
cacheVolumePath := ""
if cachePVC != nil {
mnt, _, path := kube.MakePodPVCAttachment(cacheVolumeName, nil, false)
volumeMounts = append(volumeMounts, mnt...)
volumes = append(volumes, corev1api.Volume{
Name: cacheVolumeName,
VolumeSource: corev1api.VolumeSource{
PersistentVolumeClaim: &corev1api.PersistentVolumeClaimVolumeSource{
ClaimName: cachePVC.Name,
},
},
})
cacheVolumePath = path
}
volumeMounts = append(volumeMounts, podInfo.volumeMounts...)
volumes = append(volumes, podInfo.volumes...)
if label == nil {
@@ -488,6 +566,7 @@ func (e *genericRestoreExposer) createRestorePod(
fmt.Sprintf("--volume-mode=%s", volumeMode),
fmt.Sprintf("--data-download=%s", ownerObject.Name),
fmt.Sprintf("--resource-timeout=%s", operationTimeout.String()),
fmt.Sprintf("--cache-volume-path=%s", cacheVolumePath),
}
args = append(args, podInfo.logFormatArgs...)

View File

@@ -148,6 +148,7 @@ func TestCreateRestorePodWithPriorityClass(t *testing.T) {
kube.NodeOSLinux,
nil, // affinity
tc.expectedPriorityClass,
nil,
)
require.NoError(t, err, tc.description)
@@ -227,6 +228,7 @@ func TestCreateRestorePodWithMissingConfigMap(t *testing.T) {
kube.NodeOSLinux,
nil, // affinity
"", // empty priority class since config map is missing
nil,
)
// Should succeed even when config map is missing

View File

@@ -973,6 +973,7 @@ func TestCreateRestorePod(t *testing.T) {
test.nodeOS,
test.affinity,
"", // priority class name
nil,
)
require.NoError(t, err)

View File

@@ -76,6 +76,12 @@ type PodVolumeExposeParam struct {
// Privileged indicates whether to create the pod with a privileged container
Privileged bool
// RestoreSize specifies the data size for the volume to be restored, for restore only
RestoreSize int64
// CacheVolume specifies the info for cache volumes, for restore only
CacheVolume *CacheConfigs
}
// PodVolumeExposer is the interfaces for a pod volume exposer
@@ -156,7 +162,29 @@ func (e *podVolumeExposer) Expose(ctx context.Context, ownerObject corev1api.Obj
curLog.WithField("path", path).Infof("Host path is retrieved for pod %s, volume %s", param.ClientPodName, param.ClientPodVolume)
hostingPod, err := e.createHostingPod(ctx, ownerObject, param.Type, path.ByPath, param.OperationTimeout, param.HostingPodLabels, param.HostingPodAnnotations, param.HostingPodTolerations, pod.Spec.NodeName, param.Resources, nodeOS, param.PriorityClassName, param.Privileged)
var cachePVC *corev1api.PersistentVolumeClaim
if param.CacheVolume != nil {
cacheVolumeSize := getCacheVolumeSize(param.RestoreSize, param.CacheVolume)
if cacheVolumeSize > 0 {
curLog.Infof("Creating cache PVC with size %v", cacheVolumeSize)
if pvc, err := createCachePVC(ctx, e.kubeClient.CoreV1(), ownerObject, param.CacheVolume.StorageClass, cacheVolumeSize, pod.Spec.NodeName); err != nil {
return errors.Wrap(err, "error to create cache pvc")
} else {
cachePVC = pvc
}
defer func() {
if err != nil {
kube.DeletePVAndPVCIfAny(ctx, e.kubeClient.CoreV1(), cachePVC.Name, cachePVC.Namespace, 0, curLog)
}
}()
} else {
curLog.Infof("Don't need to create cache volume, restore size %v, cache info %v", param.RestoreSize, param.CacheVolume)
}
}
hostingPod, err := e.createHostingPod(ctx, ownerObject, param.Type, path.ByPath, param.OperationTimeout, param.HostingPodLabels, param.HostingPodAnnotations, param.HostingPodTolerations, pod.Spec.NodeName, param.Resources, nodeOS, param.PriorityClassName, param.Privileged, cachePVC)
if err != nil {
return errors.Wrapf(err, "error to create hosting pod")
}
@@ -251,6 +279,22 @@ func (e *podVolumeExposer) DiagnoseExpose(ctx context.Context, ownerObject corev
diag += fmt.Sprintf("error getting hosting pod %s, err: %v\n", hostingPodName, err)
}
var cachePVC *corev1api.PersistentVolumeClaim
if pod.Spec.Volumes != nil {
for _, v := range pod.Spec.Volumes {
if v.Name == cacheVolumeName {
cachePVC, err = e.kubeClient.CoreV1().PersistentVolumeClaims(ownerObject.Namespace).Get(ctx, getCachePVCName(ownerObject), metav1.GetOptions{})
if err != nil {
cachePVC = nil
diag += fmt.Sprintf("error getting cache pvc %s, err: %v\n", getCachePVCName(ownerObject), err)
}
break
}
}
}
events, err := e.kubeClient.CoreV1().Events(ownerObject.Namespace).List(ctx, metav1.ListOptions{})
if err != nil {
diag += fmt.Sprintf("error listing events, err: %v\n", err)
@@ -266,6 +310,18 @@ func (e *podVolumeExposer) DiagnoseExpose(ctx context.Context, ownerObject corev
}
}
if cachePVC != nil {
diag += kube.DiagnosePVC(cachePVC, events)
if cachePVC.Spec.VolumeName != "" {
if pv, err := e.kubeClient.CoreV1().PersistentVolumes().Get(ctx, cachePVC.Spec.VolumeName, metav1.GetOptions{}); err != nil {
diag += fmt.Sprintf("error getting cache pv %s, err: %v\n", cachePVC.Spec.VolumeName, err)
} else {
diag += kube.DiagnosePV(pv)
}
}
}
diag += "end diagnose pod volume exposer"
return diag
@@ -273,11 +329,14 @@ func (e *podVolumeExposer) DiagnoseExpose(ctx context.Context, ownerObject corev
// CleanUp removes everything the exposer created for ownerObject: the
// hosting pod and the cache PV/PVC if one exists. The delete helpers take
// a logger and return nothing, so cleanup is best-effort.
func (e *podVolumeExposer) CleanUp(ctx context.Context, ownerObject corev1api.ObjectReference) {
	client := e.kubeClient.CoreV1()

	// Delete the pod first so the cache claim is no longer mounted when removed.
	kube.DeletePodIfAny(ctx, client, ownerObject.Name, ownerObject.Namespace, e.log)
	kube.DeletePVAndPVCIfAny(ctx, client, getCachePVCName(ownerObject), ownerObject.Namespace, 0, e.log)
}
func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject corev1api.ObjectReference, exposeType string, hostPath string,
operationTimeout time.Duration, label map[string]string, annotation map[string]string, toleration []corev1api.Toleration, selectedNode string, resources corev1api.ResourceRequirements, nodeOS string, priorityClassName string, privileged bool) (*corev1api.Pod, error) {
operationTimeout time.Duration, label map[string]string, annotation map[string]string, toleration []corev1api.Toleration, selectedNode string, resources corev1api.ResourceRequirements, nodeOS string, priorityClassName string, privileged bool, cachePVC *corev1api.PersistentVolumeClaim) (*corev1api.Pod, error) {
hostingPodName := ownerObject.Name
containerName := string(ownerObject.UID)
@@ -301,7 +360,6 @@ func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject cor
MountPath: clientVolumePath,
MountPropagation: &mountPropagation,
}}
volumeMounts = append(volumeMounts, podInfo.volumeMounts...)
volumes := []corev1api.Volume{{
Name: clientVolumeName,
@@ -311,6 +369,25 @@ func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject cor
},
},
}}
cacheVolumePath := ""
if cachePVC != nil {
mnt, _, path := kube.MakePodPVCAttachment(cacheVolumeName, nil, false)
volumeMounts = append(volumeMounts, mnt...)
volumes = append(volumes, corev1api.Volume{
Name: cacheVolumeName,
VolumeSource: corev1api.VolumeSource{
PersistentVolumeClaim: &corev1api.PersistentVolumeClaimVolumeSource{
ClaimName: cachePVC.Name,
},
},
})
cacheVolumePath = path
}
volumeMounts = append(volumeMounts, podInfo.volumeMounts...)
volumes = append(volumes, podInfo.volumes...)
args := []string{
@@ -328,6 +405,7 @@ func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject cor
command = append(command, "backup")
} else {
args = append(args, fmt.Sprintf("--pod-volume-restore=%s", ownerObject.Name))
args = append(args, fmt.Sprintf("--cache-volume-path=%s", cacheVolumePath))
command = append(command, "restore")
}