Merge pull request #9362 from Lyndon-Li/cache-volume-for-exposer
Some checks failed
Run the E2E test on kind / get-go-version (push) Failing after 58s
Run the E2E test on kind / build (push) Has been skipped
Run the E2E test on kind / setup-test-matrix (push) Successful in 3s
Run the E2E test on kind / run-e2e-test (push) Has been skipped
Main CI / get-go-version (push) Failing after 13s
Main CI / Build (push) Has been skipped
Close stale issues and PRs / stale (push) Successful in 12s
Trivy Nightly Scan / Trivy nightly scan (velero, main) (push) Failing after 1m7s
Trivy Nightly Scan / Trivy nightly scan (velero-plugin-for-aws, main) (push) Failing after 50s
Trivy Nightly Scan / Trivy nightly scan (velero-plugin-for-gcp, main) (push) Failing after 1m4s
Trivy Nightly Scan / Trivy nightly scan (velero-plugin-for-microsoft-azure, main) (push) Failing after 44s

Exposer supports cache volume
This commit is contained in:
lyndon-li
2025-10-31 17:06:40 +08:00
committed by GitHub
8 changed files with 669 additions and 7 deletions

View File

@@ -0,0 +1 @@
Support cache volume for generic restore exposer and pod volume exposer

View File

@@ -0,0 +1,99 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package exposer
import (
"context"
corev1api "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
"github.com/vmware-tanzu/velero/pkg/util/kube"
)
type CacheConfigs struct {
Limit int64
StorageClass string
ResidentThreshold int64
}
const (
cacheVolumeName = "cachedir"
cacheVolumeDirSuffix = "-cache"
)
func createCachePVC(ctx context.Context, pvcClient corev1client.CoreV1Interface, ownerObject corev1api.ObjectReference, sc string, size int64, selectedNode string) (*corev1api.PersistentVolumeClaim, error) {
cachePVCName := getCachePVCName(ownerObject)
volumeMode := corev1api.PersistentVolumeFilesystem
pvcObj := &corev1api.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: ownerObject.Namespace,
Name: cachePVCName,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: ownerObject.APIVersion,
Kind: ownerObject.Kind,
Name: ownerObject.Name,
UID: ownerObject.UID,
Controller: boolptr.True(),
},
},
},
Spec: corev1api.PersistentVolumeClaimSpec{
AccessModes: []corev1api.PersistentVolumeAccessMode{corev1api.ReadWriteOnce},
StorageClassName: &sc,
VolumeMode: &volumeMode,
Resources: corev1api.VolumeResourceRequirements{
Requests: corev1api.ResourceList{
corev1api.ResourceStorage: *resource.NewQuantity(size, resource.BinarySI),
},
},
},
}
if selectedNode != "" {
pvcObj.Annotations = map[string]string{
kube.KubeAnnSelectedNode: selectedNode,
}
}
return pvcClient.PersistentVolumeClaims(pvcObj.Namespace).Create(ctx, pvcObj, metav1.CreateOptions{})
}
func getCachePVCName(ownerObject corev1api.ObjectReference) string {
return ownerObject.Name + cacheVolumeDirSuffix
}
func getCacheVolumeSize(dataSize int64, info *CacheConfigs) int64 {
if info == nil {
return 0
}
if dataSize != 0 && dataSize < info.ResidentThreshold {
return 0
}
// 20% inflate and round up to GB
volumeSize := (info.Limit*12/10 + (1 << 30) - 1) / (1 << 30) * (1 << 30)
return volumeSize
}

View File

@@ -0,0 +1,80 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package exposer
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestGetCacheVolumeSize(t *testing.T) {
tests := []struct {
name string
dataSize int64
info *CacheConfigs
expected int64
}{
{
name: "nil info",
dataSize: 1024,
expected: 0,
},
{
name: "0 data size",
info: &CacheConfigs{Limit: 1 << 30, ResidentThreshold: 5120},
expected: 2 << 30,
},
{
name: "0 threshold",
dataSize: 2048,
info: &CacheConfigs{Limit: 1 << 30},
expected: 2 << 30,
},
{
name: "data size is smaller",
dataSize: 2048,
info: &CacheConfigs{Limit: 1 << 30, ResidentThreshold: 5120},
expected: 0,
},
{
name: "data size is lager",
dataSize: 2048,
info: &CacheConfigs{Limit: 1 << 30, ResidentThreshold: 1024},
expected: 2 << 30,
},
{
name: "limit smaller than 1G",
dataSize: 2048,
info: &CacheConfigs{Limit: 5120, ResidentThreshold: 1024},
expected: 1 << 30,
},
{
name: "larger than 1G after inflate",
dataSize: 2048,
info: &CacheConfigs{Limit: (1 << 30) - 1024, ResidentThreshold: 1024},
expected: 2 << 30,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
size := getCacheVolumeSize(test.dataSize, test.info)
require.Equal(t, test.expected, size)
})
}
}

View File

@@ -73,6 +73,12 @@ type GenericRestoreExposeParam struct {
// PriorityClassName is the priority class name for the data mover pod // PriorityClassName is the priority class name for the data mover pod
PriorityClassName string PriorityClassName string
// RestoreSize specifies the data size for the volume to be restored
RestoreSize int64
// CacheVolume specifies the info for cache volumes
CacheVolume *CacheConfigs
} }
// GenericRestoreExposer is the interfaces for a generic restore exposer // GenericRestoreExposer is the interfaces for a generic restore exposer
@@ -148,6 +154,28 @@ func (e *genericRestoreExposer) Expose(ctx context.Context, ownerObject corev1ap
affinity := kube.GetLoadAffinityByStorageClass(param.LoadAffinity, storageClassName, curLog) affinity := kube.GetLoadAffinityByStorageClass(param.LoadAffinity, storageClassName, curLog)
var cachePVC *corev1api.PersistentVolumeClaim
if param.CacheVolume != nil {
cacheVolumeSize := getCacheVolumeSize(param.RestoreSize, param.CacheVolume)
if cacheVolumeSize > 0 {
curLog.Infof("Creating cache PVC with size %v", cacheVolumeSize)
if pvc, err := createCachePVC(ctx, e.kubeClient.CoreV1(), ownerObject, param.CacheVolume.StorageClass, cacheVolumeSize, selectedNode); err != nil {
return errors.Wrap(err, "error to create cache pvc")
} else {
cachePVC = pvc
}
defer func() {
if err != nil {
kube.DeletePVAndPVCIfAny(ctx, e.kubeClient.CoreV1(), cachePVC.Name, cachePVC.Namespace, 0, curLog)
}
}()
} else {
curLog.Infof("Don't need to create cache volume, restore size %v, cache info %v", param.RestoreSize, param.CacheVolume)
}
}
restorePod, err := e.createRestorePod( restorePod, err := e.createRestorePod(
ctx, ctx,
ownerObject, ownerObject,
@@ -161,6 +189,7 @@ func (e *genericRestoreExposer) Expose(ctx context.Context, ownerObject corev1ap
param.NodeOS, param.NodeOS,
affinity, affinity,
param.PriorityClassName, param.PriorityClassName,
cachePVC,
) )
if err != nil { if err != nil {
return errors.Wrapf(err, "error to create restore pod") return errors.Wrapf(err, "error to create restore pod")
@@ -287,6 +316,15 @@ func (e *genericRestoreExposer) DiagnoseExpose(ctx context.Context, ownerObject
diag += fmt.Sprintf("error getting restore pvc %s, err: %v\n", restorePVCName, err) diag += fmt.Sprintf("error getting restore pvc %s, err: %v\n", restorePVCName, err)
} }
cachePVC, err := e.kubeClient.CoreV1().PersistentVolumeClaims(ownerObject.Namespace).Get(ctx, getCachePVCName(ownerObject), metav1.GetOptions{})
if err != nil {
cachePVC = nil
if !apierrors.IsNotFound(err) {
diag += fmt.Sprintf("error getting cache pvc %s, err: %v\n", getCachePVCName(ownerObject), err)
}
}
events, err := e.kubeClient.CoreV1().Events(ownerObject.Namespace).List(ctx, metav1.ListOptions{}) events, err := e.kubeClient.CoreV1().Events(ownerObject.Namespace).List(ctx, metav1.ListOptions{})
if err != nil { if err != nil {
diag += fmt.Sprintf("error listing events, err: %v\n", err) diag += fmt.Sprintf("error listing events, err: %v\n", err)
@@ -314,6 +352,18 @@ func (e *genericRestoreExposer) DiagnoseExpose(ctx context.Context, ownerObject
} }
} }
if cachePVC != nil {
diag += kube.DiagnosePVC(cachePVC, events)
if cachePVC.Spec.VolumeName != "" {
if pv, err := e.kubeClient.CoreV1().PersistentVolumes().Get(ctx, cachePVC.Spec.VolumeName, metav1.GetOptions{}); err != nil {
diag += fmt.Sprintf("error getting cache pv %s, err: %v\n", cachePVC.Spec.VolumeName, err)
} else {
diag += kube.DiagnosePV(pv)
}
}
}
diag += "end diagnose restore exposer" diag += "end diagnose restore exposer"
return diag return diag
@@ -322,9 +372,11 @@ func (e *genericRestoreExposer) DiagnoseExpose(ctx context.Context, ownerObject
func (e *genericRestoreExposer) CleanUp(ctx context.Context, ownerObject corev1api.ObjectReference) { func (e *genericRestoreExposer) CleanUp(ctx context.Context, ownerObject corev1api.ObjectReference) {
restorePodName := ownerObject.Name restorePodName := ownerObject.Name
restorePVCName := ownerObject.Name restorePVCName := ownerObject.Name
cachePVCName := getCachePVCName(ownerObject)
kube.DeletePodIfAny(ctx, e.kubeClient.CoreV1(), restorePodName, ownerObject.Namespace, e.log) kube.DeletePodIfAny(ctx, e.kubeClient.CoreV1(), restorePodName, ownerObject.Namespace, e.log)
kube.DeletePVAndPVCIfAny(ctx, e.kubeClient.CoreV1(), restorePVCName, ownerObject.Namespace, 0, e.log) kube.DeletePVAndPVCIfAny(ctx, e.kubeClient.CoreV1(), restorePVCName, ownerObject.Namespace, 0, e.log)
kube.DeletePVAndPVCIfAny(ctx, e.kubeClient.CoreV1(), cachePVCName, ownerObject.Namespace, 0, e.log)
} }
func (e *genericRestoreExposer) RebindVolume(ctx context.Context, ownerObject corev1api.ObjectReference, targetPVCName string, targetNamespace string, timeout time.Duration) error { func (e *genericRestoreExposer) RebindVolume(ctx context.Context, ownerObject corev1api.ObjectReference, targetPVCName string, targetNamespace string, timeout time.Duration) error {
@@ -433,6 +485,7 @@ func (e *genericRestoreExposer) createRestorePod(
nodeOS string, nodeOS string,
affinity *kube.LoadAffinity, affinity *kube.LoadAffinity,
priorityClassName string, priorityClassName string,
cachePVC *corev1api.PersistentVolumeClaim,
) (*corev1api.Pod, error) { ) (*corev1api.Pod, error) {
restorePodName := ownerObject.Name restorePodName := ownerObject.Name
restorePVCName := ownerObject.Name restorePVCName := ownerObject.Name
@@ -461,7 +514,6 @@ func (e *genericRestoreExposer) createRestorePod(
var gracePeriod int64 var gracePeriod int64
volumeMounts, volumeDevices, volumePath := kube.MakePodPVCAttachment(volumeName, targetPVC.Spec.VolumeMode, false) volumeMounts, volumeDevices, volumePath := kube.MakePodPVCAttachment(volumeName, targetPVC.Spec.VolumeMode, false)
volumeMounts = append(volumeMounts, podInfo.volumeMounts...)
volumes := []corev1api.Volume{{ volumes := []corev1api.Volume{{
Name: volumeName, Name: volumeName,
@@ -471,6 +523,25 @@ func (e *genericRestoreExposer) createRestorePod(
}, },
}, },
}} }}
cacheVolumePath := ""
if cachePVC != nil {
mnt, _, path := kube.MakePodPVCAttachment(cacheVolumeName, nil, false)
volumeMounts = append(volumeMounts, mnt...)
volumes = append(volumes, corev1api.Volume{
Name: cacheVolumeName,
VolumeSource: corev1api.VolumeSource{
PersistentVolumeClaim: &corev1api.PersistentVolumeClaimVolumeSource{
ClaimName: cachePVC.Name,
},
},
})
cacheVolumePath = path
}
volumeMounts = append(volumeMounts, podInfo.volumeMounts...)
volumes = append(volumes, podInfo.volumes...) volumes = append(volumes, podInfo.volumes...)
if label == nil { if label == nil {
@@ -488,6 +559,7 @@ func (e *genericRestoreExposer) createRestorePod(
fmt.Sprintf("--volume-mode=%s", volumeMode), fmt.Sprintf("--volume-mode=%s", volumeMode),
fmt.Sprintf("--data-download=%s", ownerObject.Name), fmt.Sprintf("--data-download=%s", ownerObject.Name),
fmt.Sprintf("--resource-timeout=%s", operationTimeout.String()), fmt.Sprintf("--resource-timeout=%s", operationTimeout.String()),
fmt.Sprintf("--cache-volume-path=%s", cacheVolumePath),
} }
args = append(args, podInfo.logFormatArgs...) args = append(args, podInfo.logFormatArgs...)

View File

@@ -148,6 +148,7 @@ func TestCreateRestorePodWithPriorityClass(t *testing.T) {
kube.NodeOSLinux, kube.NodeOSLinux,
nil, // affinity nil, // affinity
tc.expectedPriorityClass, tc.expectedPriorityClass,
nil,
) )
require.NoError(t, err, tc.description) require.NoError(t, err, tc.description)
@@ -227,6 +228,7 @@ func TestCreateRestorePodWithMissingConfigMap(t *testing.T) {
kube.NodeOSLinux, kube.NodeOSLinux,
nil, // affinity nil, // affinity
"", // empty priority class since config map is missing "", // empty priority class since config map is missing
nil,
) )
// Should succeed even when config map is missing // Should succeed even when config map is missing

View File

@@ -26,6 +26,7 @@ import (
appsv1api "k8s.io/api/apps/v1" appsv1api "k8s.io/api/apps/v1"
corev1api "k8s.io/api/core/v1" corev1api "k8s.io/api/core/v1"
storagev1api "k8s.io/api/storage/v1" storagev1api "k8s.io/api/storage/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
@@ -105,6 +106,10 @@ func TestRestoreExpose(t *testing.T) {
targetPVCName string targetPVCName string
targetNamespace string targetNamespace string
kubeReactors []reactor kubeReactors []reactor
cacheVolume *CacheConfigs
expectBackupPod bool
expectBackupPVC bool
expectCachePVC bool
err string err string
}{ }{
{ {
@@ -167,6 +172,70 @@ func TestRestoreExpose(t *testing.T) {
}, },
err: "error to create restore pvc: fake-create-error", err: "error to create restore pvc: fake-create-error",
}, },
{
name: "succeed",
targetPVCName: "fake-target-pvc",
targetNamespace: "fake-ns",
ownerRestore: restore,
kubeClientObj: []runtime.Object{
targetPVCObj,
daemonSet,
storageClass,
},
expectBackupPod: true,
expectBackupPVC: true,
},
{
name: "succeed, cache config, no cache volume",
targetPVCName: "fake-target-pvc",
targetNamespace: "fake-ns",
ownerRestore: restore,
kubeClientObj: []runtime.Object{
targetPVCObj,
daemonSet,
storageClass,
},
cacheVolume: &CacheConfigs{},
expectBackupPod: true,
expectBackupPVC: true,
},
{
name: "create cache volume fail",
targetPVCName: "fake-target-pvc",
targetNamespace: "fake-ns",
ownerRestore: restore,
kubeClientObj: []runtime.Object{
targetPVCObj,
daemonSet,
storageClass,
},
cacheVolume: &CacheConfigs{Limit: 1024},
kubeReactors: []reactor{
{
verb: "create",
resource: "persistentvolumeclaims",
reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.New("fake-create-error")
},
},
},
err: "error to create cache pvc: fake-create-error",
},
{
name: "succeed with cache volume",
targetPVCName: "fake-target-pvc",
targetNamespace: "fake-ns",
ownerRestore: restore,
kubeClientObj: []runtime.Object{
targetPVCObj,
daemonSet,
storageClass,
},
cacheVolume: &CacheConfigs{Limit: 1024},
expectBackupPod: true,
expectBackupPVC: true,
expectCachePVC: true,
},
} }
for _, test := range tests { for _, test := range tests {
@@ -203,9 +272,36 @@ func TestRestoreExpose(t *testing.T) {
Resources: corev1api.ResourceRequirements{}, Resources: corev1api.ResourceRequirements{},
ExposeTimeout: time.Millisecond, ExposeTimeout: time.Millisecond,
LoadAffinity: nil, LoadAffinity: nil,
CacheVolume: test.cacheVolume,
}, },
) )
require.EqualError(t, err, test.err)
if test.err != "" {
require.EqualError(t, err, test.err)
} else {
require.NoError(t, err)
}
_, err = exposer.kubeClient.CoreV1().Pods(ownerObject.Namespace).Get(t.Context(), ownerObject.Name, metav1.GetOptions{})
if test.expectBackupPod {
require.NoError(t, err)
} else {
require.True(t, apierrors.IsNotFound(err))
}
_, err = exposer.kubeClient.CoreV1().PersistentVolumeClaims(ownerObject.Namespace).Get(t.Context(), ownerObject.Name, metav1.GetOptions{})
if test.expectBackupPVC {
require.NoError(t, err)
} else {
require.True(t, apierrors.IsNotFound(err))
}
_, err = exposer.kubeClient.CoreV1().PersistentVolumeClaims(ownerObject.Namespace).Get(t.Context(), getCachePVCName(ownerObject), metav1.GetOptions{})
if test.expectCachePVC {
require.NoError(t, err)
} else {
require.True(t, apierrors.IsNotFound(err))
}
}) })
} }
} }
@@ -651,6 +747,38 @@ func Test_ReastoreDiagnoseExpose(t *testing.T) {
}, },
} }
cachePVCWithVolumeName := corev1api.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: velerov1.DefaultNamespace,
Name: "fake-restore-cache",
UID: "fake-cache-pvc-uid",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: restore.APIVersion,
Kind: restore.Kind,
Name: restore.Name,
UID: restore.UID,
},
},
},
Spec: corev1api.PersistentVolumeClaimSpec{
VolumeName: "fake-pv-cache",
},
Status: corev1api.PersistentVolumeClaimStatus{
Phase: corev1api.ClaimPending,
},
}
cachePV := corev1api.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: "fake-pv-cache",
},
Status: corev1api.PersistentVolumeStatus{
Phase: corev1api.VolumePending,
Message: "fake-pv-message",
},
}
nodeAgentPod := corev1api.Pod{ nodeAgentPod := corev1api.Pod{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Namespace: velerov1.DefaultNamespace, Namespace: velerov1.DefaultNamespace,
@@ -762,6 +890,44 @@ Pod velero/fake-restore, phase Pending, node name fake-node
Pod condition Initialized, status True, reason , message fake-pod-message Pod condition Initialized, status True, reason , message fake-pod-message
PVC velero/fake-restore, phase Pending, binding to fake-pv PVC velero/fake-restore, phase Pending, binding to fake-pv
PV fake-pv, phase Pending, reason , message fake-pv-message PV fake-pv, phase Pending, reason , message fake-pv-message
end diagnose restore exposer`,
},
{
name: "cache pvc with volume name, no pv",
ownerRestore: restore,
kubeClientObj: []runtime.Object{
&restorePodWithNodeName,
&restorePVCWithVolumeName,
&cachePVCWithVolumeName,
&nodeAgentPod,
},
expected: `begin diagnose restore exposer
Pod velero/fake-restore, phase Pending, node name fake-node
Pod condition Initialized, status True, reason , message fake-pod-message
PVC velero/fake-restore, phase Pending, binding to fake-pv
error getting restore pv fake-pv, err: persistentvolumes "fake-pv" not found
PVC velero/fake-restore-cache, phase Pending, binding to fake-pv-cache
error getting cache pv fake-pv-cache, err: persistentvolumes "fake-pv-cache" not found
end diagnose restore exposer`,
},
{
name: "cache pvc with volume name, pv exists",
ownerRestore: restore,
kubeClientObj: []runtime.Object{
&restorePodWithNodeName,
&restorePVCWithVolumeName,
&cachePVCWithVolumeName,
&restorePV,
&cachePV,
&nodeAgentPod,
},
expected: `begin diagnose restore exposer
Pod velero/fake-restore, phase Pending, node name fake-node
Pod condition Initialized, status True, reason , message fake-pod-message
PVC velero/fake-restore, phase Pending, binding to fake-pv
PV fake-pv, phase Pending, reason , message fake-pv-message
PVC velero/fake-restore-cache, phase Pending, binding to fake-pv-cache
PV fake-pv-cache, phase Pending, reason , message fake-pv-message
end diagnose restore exposer`, end diagnose restore exposer`,
}, },
{ {
@@ -973,6 +1139,7 @@ func TestCreateRestorePod(t *testing.T) {
test.nodeOS, test.nodeOS,
test.affinity, test.affinity,
"", // priority class name "", // priority class name
nil,
) )
require.NoError(t, err) require.NoError(t, err)

View File

@@ -76,6 +76,12 @@ type PodVolumeExposeParam struct {
// Privileged indicates whether to create the pod with a privileged container // Privileged indicates whether to create the pod with a privileged container
Privileged bool Privileged bool
// RestoreSize specifies the data size for the volume to be restored, for restore only
RestoreSize int64
// CacheVolume specifies the info for cache volumes, for restore only
CacheVolume *CacheConfigs
} }
// PodVolumeExposer is the interfaces for a pod volume exposer // PodVolumeExposer is the interfaces for a pod volume exposer
@@ -156,7 +162,29 @@ func (e *podVolumeExposer) Expose(ctx context.Context, ownerObject corev1api.Obj
curLog.WithField("path", path).Infof("Host path is retrieved for pod %s, volume %s", param.ClientPodName, param.ClientPodVolume) curLog.WithField("path", path).Infof("Host path is retrieved for pod %s, volume %s", param.ClientPodName, param.ClientPodVolume)
hostingPod, err := e.createHostingPod(ctx, ownerObject, param.Type, path.ByPath, param.OperationTimeout, param.HostingPodLabels, param.HostingPodAnnotations, param.HostingPodTolerations, pod.Spec.NodeName, param.Resources, nodeOS, param.PriorityClassName, param.Privileged) var cachePVC *corev1api.PersistentVolumeClaim
if param.CacheVolume != nil {
cacheVolumeSize := getCacheVolumeSize(param.RestoreSize, param.CacheVolume)
if cacheVolumeSize > 0 {
curLog.Infof("Creating cache PVC with size %v", cacheVolumeSize)
if pvc, err := createCachePVC(ctx, e.kubeClient.CoreV1(), ownerObject, param.CacheVolume.StorageClass, cacheVolumeSize, pod.Spec.NodeName); err != nil {
return errors.Wrap(err, "error to create cache pvc")
} else {
cachePVC = pvc
}
defer func() {
if err != nil {
kube.DeletePVAndPVCIfAny(ctx, e.kubeClient.CoreV1(), cachePVC.Name, cachePVC.Namespace, 0, curLog)
}
}()
} else {
curLog.Infof("Don't need to create cache volume, restore size %v, cache info %v", param.RestoreSize, param.CacheVolume)
}
}
hostingPod, err := e.createHostingPod(ctx, ownerObject, param.Type, path.ByPath, param.OperationTimeout, param.HostingPodLabels, param.HostingPodAnnotations, param.HostingPodTolerations, pod.Spec.NodeName, param.Resources, nodeOS, param.PriorityClassName, param.Privileged, cachePVC)
if err != nil { if err != nil {
return errors.Wrapf(err, "error to create hosting pod") return errors.Wrapf(err, "error to create hosting pod")
} }
@@ -251,6 +279,15 @@ func (e *podVolumeExposer) DiagnoseExpose(ctx context.Context, ownerObject corev
diag += fmt.Sprintf("error getting hosting pod %s, err: %v\n", hostingPodName, err) diag += fmt.Sprintf("error getting hosting pod %s, err: %v\n", hostingPodName, err)
} }
cachePVC, err := e.kubeClient.CoreV1().PersistentVolumeClaims(ownerObject.Namespace).Get(ctx, getCachePVCName(ownerObject), metav1.GetOptions{})
if err != nil {
cachePVC = nil
if !apierrors.IsNotFound(err) {
diag += fmt.Sprintf("error getting cache pvc %s, err: %v\n", getCachePVCName(ownerObject), err)
}
}
events, err := e.kubeClient.CoreV1().Events(ownerObject.Namespace).List(ctx, metav1.ListOptions{}) events, err := e.kubeClient.CoreV1().Events(ownerObject.Namespace).List(ctx, metav1.ListOptions{})
if err != nil { if err != nil {
diag += fmt.Sprintf("error listing events, err: %v\n", err) diag += fmt.Sprintf("error listing events, err: %v\n", err)
@@ -266,6 +303,18 @@ func (e *podVolumeExposer) DiagnoseExpose(ctx context.Context, ownerObject corev
} }
} }
if cachePVC != nil {
diag += kube.DiagnosePVC(cachePVC, events)
if cachePVC.Spec.VolumeName != "" {
if pv, err := e.kubeClient.CoreV1().PersistentVolumes().Get(ctx, cachePVC.Spec.VolumeName, metav1.GetOptions{}); err != nil {
diag += fmt.Sprintf("error getting cache pv %s, err: %v\n", cachePVC.Spec.VolumeName, err)
} else {
diag += kube.DiagnosePV(pv)
}
}
}
diag += "end diagnose pod volume exposer" diag += "end diagnose pod volume exposer"
return diag return diag
@@ -273,11 +322,14 @@ func (e *podVolumeExposer) DiagnoseExpose(ctx context.Context, ownerObject corev
func (e *podVolumeExposer) CleanUp(ctx context.Context, ownerObject corev1api.ObjectReference) { func (e *podVolumeExposer) CleanUp(ctx context.Context, ownerObject corev1api.ObjectReference) {
restorePodName := ownerObject.Name restorePodName := ownerObject.Name
cachePVCName := getCachePVCName(ownerObject)
kube.DeletePodIfAny(ctx, e.kubeClient.CoreV1(), restorePodName, ownerObject.Namespace, e.log) kube.DeletePodIfAny(ctx, e.kubeClient.CoreV1(), restorePodName, ownerObject.Namespace, e.log)
kube.DeletePVAndPVCIfAny(ctx, e.kubeClient.CoreV1(), cachePVCName, ownerObject.Namespace, 0, e.log)
} }
func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject corev1api.ObjectReference, exposeType string, hostPath string, func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject corev1api.ObjectReference, exposeType string, hostPath string,
operationTimeout time.Duration, label map[string]string, annotation map[string]string, toleration []corev1api.Toleration, selectedNode string, resources corev1api.ResourceRequirements, nodeOS string, priorityClassName string, privileged bool) (*corev1api.Pod, error) { operationTimeout time.Duration, label map[string]string, annotation map[string]string, toleration []corev1api.Toleration, selectedNode string, resources corev1api.ResourceRequirements, nodeOS string, priorityClassName string, privileged bool, cachePVC *corev1api.PersistentVolumeClaim) (*corev1api.Pod, error) {
hostingPodName := ownerObject.Name hostingPodName := ownerObject.Name
containerName := string(ownerObject.UID) containerName := string(ownerObject.UID)
@@ -301,7 +353,6 @@ func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject cor
MountPath: clientVolumePath, MountPath: clientVolumePath,
MountPropagation: &mountPropagation, MountPropagation: &mountPropagation,
}} }}
volumeMounts = append(volumeMounts, podInfo.volumeMounts...)
volumes := []corev1api.Volume{{ volumes := []corev1api.Volume{{
Name: clientVolumeName, Name: clientVolumeName,
@@ -311,6 +362,25 @@ func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject cor
}, },
}, },
}} }}
cacheVolumePath := ""
if cachePVC != nil {
mnt, _, path := kube.MakePodPVCAttachment(cacheVolumeName, nil, false)
volumeMounts = append(volumeMounts, mnt...)
volumes = append(volumes, corev1api.Volume{
Name: cacheVolumeName,
VolumeSource: corev1api.VolumeSource{
PersistentVolumeClaim: &corev1api.PersistentVolumeClaimVolumeSource{
ClaimName: cachePVC.Name,
},
},
})
cacheVolumePath = path
}
volumeMounts = append(volumeMounts, podInfo.volumeMounts...)
volumes = append(volumes, podInfo.volumes...) volumes = append(volumes, podInfo.volumes...)
args := []string{ args := []string{
@@ -328,6 +398,7 @@ func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject cor
command = append(command, "backup") command = append(command, "backup")
} else { } else {
args = append(args, fmt.Sprintf("--pod-volume-restore=%s", ownerObject.Name)) args = append(args, fmt.Sprintf("--pod-volume-restore=%s", ownerObject.Name))
args = append(args, fmt.Sprintf("--cache-volume-path=%s", cacheVolumePath))
command = append(command, "restore") command = append(command, "restore")
} }

View File

@@ -11,10 +11,12 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
appsv1api "k8s.io/api/apps/v1" appsv1api "k8s.io/api/apps/v1"
corev1api "k8s.io/api/core/v1" corev1api "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
clientTesting "k8s.io/client-go/testing"
clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake" clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake"
velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
@@ -72,6 +74,9 @@ func TestPodVolumeExpose(t *testing.T) {
exposeParam PodVolumeExposeParam exposeParam PodVolumeExposeParam
funcGetPodVolumeHostPath func(context.Context, *corev1api.Pod, string, kubernetes.Interface, filesystem.Interface, logrus.FieldLogger) (datapath.AccessPoint, error) funcGetPodVolumeHostPath func(context.Context, *corev1api.Pod, string, kubernetes.Interface, filesystem.Interface, logrus.FieldLogger) (datapath.AccessPoint, error)
funcExtractPodVolumeHostPath func(context.Context, string, kubernetes.Interface, string, string) (string, error) funcExtractPodVolumeHostPath func(context.Context, string, kubernetes.Interface, string, string) (string, error)
kubeReactors []reactor
expectBackupPod bool
expectCachePVC bool
err string err string
}{ }{
{ {
@@ -189,6 +194,7 @@ func TestPodVolumeExpose(t *testing.T) {
funcExtractPodVolumeHostPath: func(context.Context, string, kubernetes.Interface, string, string) (string, error) { funcExtractPodVolumeHostPath: func(context.Context, string, kubernetes.Interface, string, string) (string, error) {
return "/var/lib/kubelet/pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", nil return "/var/lib/kubelet/pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", nil
}, },
expectBackupPod: true,
}, },
{ {
name: "succeed with privileged pod", name: "succeed with privileged pod",
@@ -212,6 +218,89 @@ func TestPodVolumeExpose(t *testing.T) {
funcExtractPodVolumeHostPath: func(context.Context, string, kubernetes.Interface, string, string) (string, error) { funcExtractPodVolumeHostPath: func(context.Context, string, kubernetes.Interface, string, string) (string, error) {
return "/var/lib/kubelet/pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", nil return "/var/lib/kubelet/pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", nil
}, },
expectBackupPod: true,
},
{
name: "succeed, cache config, no cache volume",
ownerBackup: backup,
exposeParam: PodVolumeExposeParam{
ClientNamespace: "fake-ns",
ClientPodName: "fake-client-pod",
ClientPodVolume: "fake-client-volume",
CacheVolume: &CacheConfigs{},
},
kubeClientObj: []runtime.Object{
podWithNode,
node,
daemonSet,
},
funcGetPodVolumeHostPath: func(context.Context, *corev1api.Pod, string, kubernetes.Interface, filesystem.Interface, logrus.FieldLogger) (datapath.AccessPoint, error) {
return datapath.AccessPoint{
ByPath: "/host_pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount",
}, nil
},
funcExtractPodVolumeHostPath: func(context.Context, string, kubernetes.Interface, string, string) (string, error) {
return "/var/lib/kubelet/pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", nil
},
expectBackupPod: true,
},
{
name: "create cache volume fail",
ownerBackup: backup,
exposeParam: PodVolumeExposeParam{
ClientNamespace: "fake-ns",
ClientPodName: "fake-client-pod",
ClientPodVolume: "fake-client-volume",
CacheVolume: &CacheConfigs{Limit: 1024},
},
kubeClientObj: []runtime.Object{
podWithNode,
node,
daemonSet,
},
funcGetPodVolumeHostPath: func(context.Context, *corev1api.Pod, string, kubernetes.Interface, filesystem.Interface, logrus.FieldLogger) (datapath.AccessPoint, error) {
return datapath.AccessPoint{
ByPath: "/host_pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount",
}, nil
},
funcExtractPodVolumeHostPath: func(context.Context, string, kubernetes.Interface, string, string) (string, error) {
return "/var/lib/kubelet/pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", nil
},
kubeReactors: []reactor{
{
verb: "create",
resource: "persistentvolumeclaims",
reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.New("fake-create-error")
},
},
},
err: "error to create cache pvc: fake-create-error",
},
{
name: "succeed with cache volume",
ownerBackup: backup,
exposeParam: PodVolumeExposeParam{
ClientNamespace: "fake-ns",
ClientPodName: "fake-client-pod",
ClientPodVolume: "fake-client-volume",
CacheVolume: &CacheConfigs{Limit: 1024},
},
kubeClientObj: []runtime.Object{
podWithNode,
node,
daemonSet,
},
funcGetPodVolumeHostPath: func(context.Context, *corev1api.Pod, string, kubernetes.Interface, filesystem.Interface, logrus.FieldLogger) (datapath.AccessPoint, error) {
return datapath.AccessPoint{
ByPath: "/host_pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount",
}, nil
},
funcExtractPodVolumeHostPath: func(context.Context, string, kubernetes.Interface, string, string) (string, error) {
return "/var/lib/kubelet/pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", nil
},
expectBackupPod: true,
expectCachePVC: true,
}, },
} }
@@ -219,6 +308,10 @@ func TestPodVolumeExpose(t *testing.T) {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...) fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...)
for _, reactor := range test.kubeReactors {
fakeKubeClient.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorFunc)
}
exposer := podVolumeExposer{ exposer := podVolumeExposer{
kubeClient: fakeKubeClient, kubeClient: fakeKubeClient,
log: velerotest.NewLogger(), log: velerotest.NewLogger(),
@@ -248,9 +341,23 @@ func TestPodVolumeExpose(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
_, err = exposer.kubeClient.CoreV1().Pods(ownerObject.Namespace).Get(t.Context(), ownerObject.Name, metav1.GetOptions{}) _, err = exposer.kubeClient.CoreV1().Pods(ownerObject.Namespace).Get(t.Context(), ownerObject.Name, metav1.GetOptions{})
assert.NoError(t, err) require.NoError(t, err)
} else { } else {
assert.EqualError(t, err, test.err) require.EqualError(t, err, test.err)
}
_, err = exposer.kubeClient.CoreV1().Pods(ownerObject.Namespace).Get(t.Context(), ownerObject.Name, metav1.GetOptions{})
if test.expectBackupPod {
require.NoError(t, err)
} else {
require.True(t, apierrors.IsNotFound(err))
}
_, err = exposer.kubeClient.CoreV1().PersistentVolumeClaims(ownerObject.Namespace).Get(t.Context(), getCachePVCName(ownerObject), metav1.GetOptions{})
if test.expectCachePVC {
require.NoError(t, err)
} else {
require.True(t, apierrors.IsNotFound(err))
} }
}) })
} }
@@ -517,6 +624,38 @@ func TestPodVolumeDiagnoseExpose(t *testing.T) {
}, },
} }
cachePVCWithVolumeName := corev1api.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: velerov1.DefaultNamespace,
Name: "fake-backup-cache",
UID: "fake-cache-pvc-uid",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: backup.APIVersion,
Kind: backup.Kind,
Name: backup.Name,
UID: backup.UID,
},
},
},
Spec: corev1api.PersistentVolumeClaimSpec{
VolumeName: "fake-pv-cache",
},
Status: corev1api.PersistentVolumeClaimStatus{
Phase: corev1api.ClaimPending,
},
}
cachePV := corev1api.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: "fake-pv-cache",
},
Status: corev1api.PersistentVolumeStatus{
Phase: corev1api.VolumePending,
Message: "fake-pv-message",
},
}
nodeAgentPod := corev1api.Pod{ nodeAgentPod := corev1api.Pod{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Namespace: velerov1.DefaultNamespace, Namespace: velerov1.DefaultNamespace,
@@ -589,6 +728,37 @@ end diagnose pod volume exposer`,
expected: `begin diagnose pod volume exposer expected: `begin diagnose pod volume exposer
Pod velero/fake-backup, phase Pending, node name fake-node Pod velero/fake-backup, phase Pending, node name fake-node
Pod condition Initialized, status True, reason , message fake-pod-message Pod condition Initialized, status True, reason , message fake-pod-message
end diagnose pod volume exposer`,
},
{
name: "cache pvc with volume name, no pv",
ownerBackup: backup,
kubeClientObj: []runtime.Object{
&backupPodWithNodeName,
&cachePVCWithVolumeName,
&nodeAgentPod,
},
expected: `begin diagnose pod volume exposer
Pod velero/fake-backup, phase Pending, node name fake-node
Pod condition Initialized, status True, reason , message fake-pod-message
PVC velero/fake-backup-cache, phase Pending, binding to fake-pv-cache
error getting cache pv fake-pv-cache, err: persistentvolumes "fake-pv-cache" not found
end diagnose pod volume exposer`,
},
{
name: "cache pvc with volume name, pv exists",
ownerBackup: backup,
kubeClientObj: []runtime.Object{
&backupPodWithNodeName,
&cachePVCWithVolumeName,
&cachePV,
&nodeAgentPod,
},
expected: `begin diagnose pod volume exposer
Pod velero/fake-backup, phase Pending, node name fake-node
Pod condition Initialized, status True, reason , message fake-pod-message
PVC velero/fake-backup-cache, phase Pending, binding to fake-pv-cache
PV fake-pv-cache, phase Pending, reason , message fake-pv-message
end diagnose pod volume exposer`, end diagnose pod volume exposer`,
}, },
{ {