data mover backup expose

Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
This commit is contained in:
Lyndon-Li
2023-05-22 10:05:53 +08:00
parent 25fb08b3c2
commit 25624d3030
10 changed files with 1538 additions and 0 deletions

View File

@@ -0,0 +1 @@
Add the code for data mover backup expose

385
pkg/exposer/csi_snapshot.go Normal file
View File

@@ -0,0 +1,385 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package exposer
import (
"context"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
"github.com/vmware-tanzu/velero/pkg/util/csi"
"github.com/vmware-tanzu/velero/pkg/util/kube"
snapshotter "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned/typed/volumesnapshot/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
apierrors "k8s.io/apimachinery/pkg/api/errors"
)
// CSISnapshotExposeParam define the input param for Expose of CSI snapshots
type CSISnapshotExposeParam struct {
// SourceNamespace is the original namespace of the volume that the snapshot is taken for
SourceNamespace string
// AccessMode defines the mode to access the snapshot
AccessMode string
// StorageClass is the storage class of the volume that the snapshot is taken for
StorageClass string
// HostingPodLabels is the labels that are going to apply to the hosting pod
HostingPodLabels map[string]string
}
// CSISnapshotExposeWaitParam define the input param for WaitExposed of CSI snapshots
type CSISnapshotExposeWaitParam struct {
// NodeClient is the client that is used to find the hosting pod
NodeClient client.Client
NodeName string
}
// NewCSISnapshotExposer create a new instance of CSI snapshot exposer
func NewCSISnapshotExposer(kubeClient kubernetes.Interface, csiSnapshotClient snapshotter.SnapshotV1Interface, log logrus.FieldLogger) SnapshotExposer {
return &csiSnapshotExposer{
kubeClient: kubeClient,
csiSnapshotClient: csiSnapshotClient,
log: log,
}
}
type csiSnapshotExposer struct {
kubeClient kubernetes.Interface
csiSnapshotClient snapshotter.SnapshotV1Interface
log logrus.FieldLogger
}
func (e *csiSnapshotExposer) Expose(ctx context.Context, ownerObject corev1.ObjectReference, snapshotName string, timeout time.Duration, param interface{}) error {
csiExposeParam := param.(*CSISnapshotExposeParam)
curLog := e.log.WithFields(logrus.Fields{
"owner": ownerObject.Name,
})
curLog.Info("Exposing CSI snapshot")
volumeSnapshot, err := csi.WaitVolumeSnapshotReady(ctx, e.csiSnapshotClient, snapshotName, csiExposeParam.SourceNamespace, timeout)
if err != nil {
return errors.Wrapf(err, "error wait volume snapshot ready")
}
curLog.Info("Volumesnapshot is ready")
vsc, err := csi.GetVolumeSnapshotContentForVolumeSnapshot(volumeSnapshot, e.csiSnapshotClient)
if err != nil {
return errors.Wrap(err, "error to get volume snapshot content")
}
curLog.WithField("vsc name", vsc.Name).WithField("vs name", volumeSnapshot.Name).Infof("Got VSC from VS in namespace %s", volumeSnapshot.Namespace)
retained, err := csi.RetainVSC(ctx, e.csiSnapshotClient, vsc)
if err != nil {
return errors.Wrap(err, "error to retain volume snapshot content")
}
curLog.WithField("vsc name", vsc.Name).WithField("retained", (retained != nil)).Info("Finished to retain VSC")
defer func() {
if retained != nil {
csi.DeleteVolumeSnapshotContentIfAny(ctx, e.csiSnapshotClient, retained.Name, curLog)
}
}()
err = csi.EnsureDeleteVS(ctx, e.csiSnapshotClient, volumeSnapshot.Name, volumeSnapshot.Namespace, timeout)
if err != nil {
return errors.Wrap(err, "error to delete volume snapshot")
}
curLog.WithField("vs name", volumeSnapshot.Name).Infof("VS is deleted in namespace %s", volumeSnapshot.Namespace)
err = csi.EnsureDeleteVSC(ctx, e.csiSnapshotClient, vsc.Name, timeout)
if err != nil {
return errors.Wrap(err, "error to delete volume snapshot content")
}
curLog.WithField("vsc name", vsc.Name).Infof("VSC is deleted")
retained = nil
backupVS, err := e.createBackupVS(ctx, ownerObject, volumeSnapshot)
if err != nil {
return errors.Wrap(err, "error to create backup volume snapshot")
}
curLog.WithField("vs name", backupVS.Name).Infof("Backup VS is created from %s/%s", volumeSnapshot.Namespace, volumeSnapshot.Name)
defer func() {
if err != nil {
csi.DeleteVolumeSnapshotIfAny(ctx, e.csiSnapshotClient, backupVS.Name, backupVS.Namespace, curLog)
}
}()
backupVSC, err := e.createBackupVSC(ctx, ownerObject, vsc, backupVS)
if err != nil {
return errors.Wrap(err, "error to create backup volume snapshot content")
}
curLog.WithField("vsc name", backupVSC.Name).Infof("Backup VSC is created from %s", vsc.Name)
backupPVC, err := e.createBackupPVC(ctx, ownerObject, backupVS.Name, csiExposeParam.StorageClass, csiExposeParam.AccessMode, *volumeSnapshot.Status.RestoreSize)
if err != nil {
return errors.Wrap(err, "error to create backup pvc")
}
curLog.WithField("pvc name", backupPVC.Name).Info("Backup PVC is created")
defer func() {
if err != nil {
kube.DeletePVCIfAny(ctx, e.kubeClient.CoreV1(), backupPVC.Name, backupPVC.Namespace, curLog)
}
}()
backupPod, err := e.createBackupPod(ctx, ownerObject, backupPVC, csiExposeParam.HostingPodLabels)
if err != nil {
return errors.Wrap(err, "error to create backup pod")
}
curLog.WithField("pod name", backupPod.Name).Info("Backup pod is created")
defer func() {
if err != nil {
kube.DeletePodIfAny(ctx, e.kubeClient.CoreV1(), backupPod.Name, backupPod.Namespace, curLog)
}
}()
return nil
}
func (e *csiSnapshotExposer) GetExposed(ctx context.Context, ownerObject corev1.ObjectReference, timeout time.Duration, param interface{}) (*ExposeResult, error) {
exposeWaitParam := param.(*CSISnapshotExposeWaitParam)
backupPodName := ownerObject.Name
backupPVCName := ownerObject.Name
curLog := e.log.WithFields(logrus.Fields{
"owner": ownerObject.Name,
})
pod := &corev1.Pod{}
err := exposeWaitParam.NodeClient.Get(ctx, types.NamespacedName{
Namespace: ownerObject.Namespace,
Name: backupPodName,
}, pod)
if err != nil {
if apierrors.IsNotFound(err) {
curLog.WithField("backup pod", backupPodName).Errorf("Backup pod is not running in the current node %s", exposeWaitParam.NodeName)
return nil, nil
} else {
return nil, errors.Wrapf(err, "error to get backup pod %s", backupPodName)
}
}
curLog.WithField("pod", pod.Name).Infof("Backup pod is in running state in node %s", pod.Spec.NodeName)
_, err = kube.WaitPVCBound(ctx, e.kubeClient.CoreV1(), e.kubeClient.CoreV1(), backupPVCName, ownerObject.Namespace, timeout)
if err != nil {
return nil, errors.Wrapf(err, "error to wait backup PVC bound, %s", backupPVCName)
}
curLog.WithField("backup pvc", backupPVCName).Info("Backup PVC is bound")
return &ExposeResult{ByPod: ExposeByPod{HostingPod: pod, PVC: backupPVCName}}, nil
}
func (e *csiSnapshotExposer) CleanUp(ctx context.Context, ownerObject corev1.ObjectReference, vsName string, sourceNamespace string) {
backupPodName := ownerObject.Name
backupPVCName := ownerObject.Name
backupVSName := ownerObject.Name
kube.DeletePodIfAny(ctx, e.kubeClient.CoreV1(), backupPodName, ownerObject.Namespace, e.log)
kube.DeletePVCIfAny(ctx, e.kubeClient.CoreV1(), backupPVCName, ownerObject.Namespace, e.log)
csi.DeleteVolumeSnapshotIfAny(ctx, e.csiSnapshotClient, backupVSName, ownerObject.Namespace, e.log)
csi.DeleteVolumeSnapshotIfAny(ctx, e.csiSnapshotClient, vsName, sourceNamespace, e.log)
}
func getVolumeModeByAccessMode(accessMode string) (corev1.PersistentVolumeMode, error) {
if accessMode == AccessModeFileSystem {
return corev1.PersistentVolumeFilesystem, nil
} else {
return "", errors.Errorf("unsupported access mode %s", accessMode)
}
}
func (e *csiSnapshotExposer) createBackupVS(ctx context.Context, ownerObject corev1.ObjectReference, snapshotVS *snapshotv1api.VolumeSnapshot) (*snapshotv1api.VolumeSnapshot, error) {
backupVSName := ownerObject.Name
backupVSCName := ownerObject.Name
vs := &snapshotv1api.VolumeSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: backupVSName,
Namespace: ownerObject.Namespace,
// Don't add ownerReference to SnapshotBackup.
// The backupPVC should be deleted before backupVS, otherwise, the deletion of backupVS will fail since
// backupPVC has its dataSource referring to it
},
Spec: snapshotv1api.VolumeSnapshotSpec{
Source: snapshotv1api.VolumeSnapshotSource{
VolumeSnapshotContentName: &backupVSCName,
},
VolumeSnapshotClassName: snapshotVS.Spec.VolumeSnapshotClassName,
},
}
return e.csiSnapshotClient.VolumeSnapshots(vs.Namespace).Create(ctx, vs, metav1.CreateOptions{})
}
func (e *csiSnapshotExposer) createBackupVSC(ctx context.Context, ownerObject corev1.ObjectReference, snapshotVSC *snapshotv1api.VolumeSnapshotContent, vs *snapshotv1api.VolumeSnapshot) (*snapshotv1api.VolumeSnapshotContent, error) {
backupVSCName := ownerObject.Name
vsc := &snapshotv1api.VolumeSnapshotContent{
ObjectMeta: metav1.ObjectMeta{
Name: backupVSCName,
},
Spec: snapshotv1api.VolumeSnapshotContentSpec{
VolumeSnapshotRef: corev1.ObjectReference{
Name: vs.Name,
Namespace: vs.Namespace,
UID: vs.UID,
ResourceVersion: vs.ResourceVersion,
},
Source: snapshotv1api.VolumeSnapshotContentSource{
SnapshotHandle: snapshotVSC.Status.SnapshotHandle,
},
DeletionPolicy: snapshotVSC.Spec.DeletionPolicy,
Driver: snapshotVSC.Spec.Driver,
VolumeSnapshotClassName: snapshotVSC.Spec.VolumeSnapshotClassName,
},
}
return e.csiSnapshotClient.VolumeSnapshotContents().Create(ctx, vsc, metav1.CreateOptions{})
}
func (e *csiSnapshotExposer) createBackupPVC(ctx context.Context, ownerObject corev1.ObjectReference, backupVS string, storageClass string, accessMode string, resource resource.Quantity) (*corev1.PersistentVolumeClaim, error) {
backupVCName := ownerObject.Name
volumeMode, err := getVolumeModeByAccessMode(accessMode)
if err != nil {
return nil, err
}
dataSource := &corev1.TypedLocalObjectReference{
APIGroup: &snapshotv1api.SchemeGroupVersion.Group,
Kind: "VolumeSnapshot",
Name: backupVS,
}
pvc := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: ownerObject.Namespace,
Name: backupVCName,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: ownerObject.APIVersion,
Kind: ownerObject.Kind,
Name: ownerObject.Name,
UID: ownerObject.UID,
Controller: boolptr.True(),
},
},
},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{
corev1.ReadWriteOnce,
},
StorageClassName: &storageClass,
VolumeMode: &volumeMode,
DataSource: dataSource,
DataSourceRef: dataSource,
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: resource,
},
},
},
}
created, err := e.kubeClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
if err != nil {
return nil, errors.Wrap(err, "error to create pvc")
}
return created, err
}
func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject corev1.ObjectReference, backupPVC *corev1.PersistentVolumeClaim, label map[string]string) (*corev1.Pod, error) {
podName := ownerObject.Name
var gracePeriod int64 = 0
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: ownerObject.Namespace,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: ownerObject.APIVersion,
Kind: ownerObject.Kind,
Name: ownerObject.Name,
UID: ownerObject.UID,
Controller: boolptr.True(),
},
},
Labels: label,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: podName,
Image: "alpine:latest",
ImagePullPolicy: corev1.PullIfNotPresent,
Command: []string{"sleep", "infinity"},
VolumeMounts: []corev1.VolumeMount{{
Name: backupPVC.Name,
MountPath: "/" + backupPVC.Name,
}},
},
},
TerminationGracePeriodSeconds: &gracePeriod,
Volumes: []corev1.Volume{{
Name: backupPVC.Name,
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: backupPVC.Name,
},
},
}},
},
}
return e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Create(ctx, pod, metav1.CreateOptions{})
}

View File

@@ -0,0 +1,298 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package exposer
import (
"context"
"testing"
"time"
snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
snapshotFake "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned/fake"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
clientTesting "k8s.io/client-go/testing"
corev1 "k8s.io/api/core/v1"
velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
)
type reactor struct {
verb string
resource string
reactorFunc clientTesting.ReactionFunc
}
func TestExpose(t *testing.T) {
vscName := "fake-vsc"
backup := &velerov1.Backup{
TypeMeta: metav1.TypeMeta{
APIVersion: velerov1.SchemeGroupVersion.String(),
Kind: "Backup",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: velerov1.DefaultNamespace,
Name: "fake-backup",
UID: "fake-uid",
},
}
vsObject := &snapshotv1api.VolumeSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: "fake-vs",
Namespace: "fake-ns",
},
Status: &snapshotv1api.VolumeSnapshotStatus{
BoundVolumeSnapshotContentName: &vscName,
ReadyToUse: boolptr.True(),
RestoreSize: &resource.Quantity{},
},
}
var restoreSize int64
vscObj := &snapshotv1api.VolumeSnapshotContent{
ObjectMeta: metav1.ObjectMeta{
Name: "fake-vsc",
},
Spec: snapshotv1api.VolumeSnapshotContentSpec{
DeletionPolicy: snapshotv1api.VolumeSnapshotContentDelete,
},
Status: &snapshotv1api.VolumeSnapshotContentStatus{
RestoreSize: &restoreSize,
},
}
tests := []struct {
name string
snapshotClientObj []runtime.Object
kubeClientObj []runtime.Object
ownerBackup *velerov1.Backup
snapshotName string
exposeParam CSISnapshotExposeParam
snapReactors []reactor
kubeReactors []reactor
err string
}{
{
name: "wait vs ready fail",
snapshotName: "fake-vs",
ownerBackup: backup,
err: "error wait volume snapshot ready: error to get volumesnapshot /fake-vs: volumesnapshots.snapshot.storage.k8s.io \"fake-vs\" not found",
},
{
name: "get vsc fail",
ownerBackup: backup,
snapshotName: "fake-vs",
exposeParam: CSISnapshotExposeParam{
SourceNamespace: "fake-ns",
},
snapshotClientObj: []runtime.Object{
vsObject,
},
err: "error to get volume snapshot content: error getting volume snapshot content from API: volumesnapshotcontents.snapshot.storage.k8s.io \"fake-vsc\" not found",
},
{
name: "delete vs fail",
ownerBackup: backup,
snapshotName: "fake-vs",
exposeParam: CSISnapshotExposeParam{
SourceNamespace: "fake-ns",
},
snapshotClientObj: []runtime.Object{
vsObject,
vscObj,
},
snapReactors: []reactor{
{
verb: "delete",
resource: "volumesnapshots",
reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.New("fake-delete-error")
},
},
},
err: "error to delete volume snapshot: error to delete volume snapshot: fake-delete-error",
},
{
name: "delete vsc fail",
ownerBackup: backup,
snapshotName: "fake-vs",
exposeParam: CSISnapshotExposeParam{
SourceNamespace: "fake-ns",
},
snapshotClientObj: []runtime.Object{
vsObject,
vscObj,
},
snapReactors: []reactor{
{
verb: "delete",
resource: "volumesnapshotcontents",
reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.New("fake-delete-error")
},
},
},
err: "error to delete volume snapshot content: error to delete volume snapshot content: fake-delete-error",
},
{
name: "create backup vs fail",
ownerBackup: backup,
snapshotName: "fake-vs",
exposeParam: CSISnapshotExposeParam{
SourceNamespace: "fake-ns",
},
snapshotClientObj: []runtime.Object{
vsObject,
vscObj,
},
snapReactors: []reactor{
{
verb: "create",
resource: "volumesnapshots",
reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.New("fake-create-error")
},
},
},
err: "error to create backup volume snapshot: fake-create-error",
},
{
name: "create backup vsc fail",
ownerBackup: backup,
snapshotName: "fake-vs",
exposeParam: CSISnapshotExposeParam{
SourceNamespace: "fake-ns",
},
snapshotClientObj: []runtime.Object{
vsObject,
vscObj,
},
snapReactors: []reactor{
{
verb: "create",
resource: "volumesnapshotcontents",
reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.New("fake-create-error")
},
},
},
err: "error to create backup volume snapshot content: fake-create-error",
},
{
name: "create backup pvc fail, invalid access mode",
ownerBackup: backup,
snapshotName: "fake-vs",
exposeParam: CSISnapshotExposeParam{
SourceNamespace: "fake-ns",
AccessMode: "fake-mode",
},
snapshotClientObj: []runtime.Object{
vsObject,
vscObj,
},
err: "error to create backup pvc: unsupported access mode fake-mode",
},
{
name: "create backup pvc fail",
ownerBackup: backup,
snapshotName: "fake-vs",
exposeParam: CSISnapshotExposeParam{
SourceNamespace: "fake-ns",
AccessMode: AccessModeFileSystem,
},
snapshotClientObj: []runtime.Object{
vsObject,
vscObj,
},
kubeReactors: []reactor{
{
verb: "create",
resource: "persistentvolumeclaims",
reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.New("fake-create-error")
},
},
},
err: "error to create backup pvc: error to create pvc: fake-create-error",
},
{
name: "create backup pod fail",
ownerBackup: backup,
snapshotName: "fake-vs",
exposeParam: CSISnapshotExposeParam{
SourceNamespace: "fake-ns",
AccessMode: AccessModeFileSystem,
},
snapshotClientObj: []runtime.Object{
vsObject,
vscObj,
},
kubeReactors: []reactor{
{
verb: "create",
resource: "pods",
reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.New("fake-create-error")
},
},
},
err: "error to create backup pod: fake-create-error",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeSnapshotClient := snapshotFake.NewSimpleClientset(test.snapshotClientObj...)
fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...)
for _, reactor := range test.snapReactors {
fakeSnapshotClient.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorFunc)
}
for _, reactor := range test.kubeReactors {
fakeKubeClient.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorFunc)
}
exposer := csiSnapshotExposer{
kubeClient: fakeKubeClient,
csiSnapshotClient: fakeSnapshotClient.SnapshotV1(),
log: velerotest.NewLogger(),
}
var ownerObject corev1.ObjectReference
if test.ownerBackup != nil {
ownerObject = corev1.ObjectReference{
Kind: test.ownerBackup.Kind,
Namespace: test.ownerBackup.Namespace,
Name: test.ownerBackup.Name,
UID: test.ownerBackup.UID,
APIVersion: test.ownerBackup.APIVersion,
}
}
err := exposer.Expose(context.Background(), ownerObject, test.snapshotName, time.Millisecond, &test.exposeParam)
assert.EqualError(t, err, test.err)
})
}
}

38
pkg/exposer/snapshot.go Normal file
View File

@@ -0,0 +1,38 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package exposer
import (
"context"
"time"
corev1 "k8s.io/api/core/v1"
)
// SnapshotExposer is the interfaces for a snapshot exposer
type SnapshotExposer interface {
// Expose starts the process to expose a snapshot, the expose process may take long time
Expose(context.Context, corev1.ObjectReference, string, time.Duration, interface{}) error
// GetExposed polls the status of the expose.
// If the expose is accessible by the current caller, it waits the expose ready and returns the expose result.
// Otherwise, it returns nil as the expose result without an error.
GetExposed(context.Context, corev1.ObjectReference, time.Duration, interface{}) (*ExposeResult, error)
// CleanUp cleans up any objects generated during the snapshot expose
CleanUp(context.Context, corev1.ObjectReference, string, string)
}

37
pkg/exposer/types.go Normal file
View File

@@ -0,0 +1,37 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package exposer
import (
corev1 "k8s.io/api/core/v1"
)
const (
AccessModeFileSystem = "by-file-system"
)
// ExposeResult defines the result of expose.
// Varying from the type of the expose, the result may be different.
type ExposeResult struct {
ByPod ExposeByPod
}
// ExposeByPod defines the result for the expose method that a hosting pod is created
type ExposeByPod struct {
HostingPod *corev1.Pod
PVC string
}

View File

@@ -0,0 +1,194 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csi
import (
"context"
"encoding/json"
"fmt"
"time"
jsonpatch "github.com/evanphx/json-patch"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
snapshotter "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned/typed/volumesnapshot/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
)
const (
waitInternal = 2 * time.Second
)
// WaitVolumeSnapshotReady waits a VS to become ready to use until the timeout reaches
func WaitVolumeSnapshotReady(ctx context.Context, snapshotClient snapshotter.SnapshotV1Interface,
volumeSnapshot string, volumeSnapshotNS string, timeout time.Duration) (*snapshotv1api.VolumeSnapshot, error) {
var updated *snapshotv1api.VolumeSnapshot
err := wait.PollImmediate(waitInternal, timeout, func() (bool, error) {
tmpVS, err := snapshotClient.VolumeSnapshots(volumeSnapshotNS).Get(ctx, volumeSnapshot, metav1.GetOptions{})
if err != nil {
return false, errors.Wrapf(err, fmt.Sprintf("error to get volumesnapshot %s/%s", volumeSnapshotNS, volumeSnapshot))
}
if tmpVS.Status == nil || tmpVS.Status.BoundVolumeSnapshotContentName == nil || !boolptr.IsSetToTrue(tmpVS.Status.ReadyToUse) || tmpVS.Status.RestoreSize == nil {
return false, nil
}
updated = tmpVS
return true, nil
})
return updated, err
}
// GetVolumeSnapshotContentForVolumeSnapshot returns the volumesnapshotcontent object associated with the volumesnapshot
func GetVolumeSnapshotContentForVolumeSnapshot(volSnap *snapshotv1api.VolumeSnapshot, snapshotClient snapshotter.SnapshotV1Interface) (*snapshotv1api.VolumeSnapshotContent, error) {
if volSnap.Status == nil || volSnap.Status.BoundVolumeSnapshotContentName == nil {
return nil, errors.Errorf("invalid snapshot info in volume snapshot %s", volSnap.Name)
}
vsc, err := snapshotClient.VolumeSnapshotContents().Get(context.TODO(), *volSnap.Status.BoundVolumeSnapshotContentName, metav1.GetOptions{})
if err != nil {
return nil, errors.Wrap(err, "error getting volume snapshot content from API")
}
return vsc, nil
}
// RetainVSC updates the VSC's deletion policy to Retain and return the update VSC
func RetainVSC(ctx context.Context, snapshotClient snapshotter.SnapshotV1Interface,
vsc *snapshotv1api.VolumeSnapshotContent) (*snapshotv1api.VolumeSnapshotContent, error) {
if vsc.Spec.DeletionPolicy == snapshotv1api.VolumeSnapshotContentRetain {
return nil, nil
}
origBytes, err := json.Marshal(vsc)
if err != nil {
return nil, errors.Wrap(err, "error marshaling original VSC")
}
updated := vsc.DeepCopy()
updated.Spec.DeletionPolicy = snapshotv1api.VolumeSnapshotContentRetain
updatedBytes, err := json.Marshal(updated)
if err != nil {
return nil, errors.Wrap(err, "error marshaling updated VSC")
}
patchBytes, err := jsonpatch.CreateMergePatch(origBytes, updatedBytes)
if err != nil {
return nil, errors.Wrap(err, "error creating json merge patch for VSC")
}
retained, err := snapshotClient.VolumeSnapshotContents().Patch(ctx, vsc.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
if err != nil {
return nil, errors.Wrap(err, "error patching VSC")
}
return retained, nil
}
// DeleteVolumeSnapshotContentIfAny deletes a VSC by name if it exists, and log an error when the deletion fails
func DeleteVolumeSnapshotContentIfAny(ctx context.Context, snapshotClient snapshotter.SnapshotV1Interface, vscName string, log logrus.FieldLogger) {
vsc, err := snapshotClient.VolumeSnapshotContents().Get(ctx, vscName, metav1.GetOptions{})
if err != nil {
if !apierrors.IsNotFound(err) {
log.WithError(err).Warnf("Abort deleting VSC, it doesn't exist %s", vscName)
}
} else {
err = snapshotClient.VolumeSnapshotContents().Delete(ctx, vsc.Name, metav1.DeleteOptions{})
if err != nil {
log.WithError(err).Warnf("Failed to delete volume snapshot content %s", vsc.Name)
}
}
}
// EnsureDeleteVS asserts the existence of a VS by name, deletes it and waits for its disappearance and returns errors on any failure
func EnsureDeleteVS(ctx context.Context, snapshotClient snapshotter.SnapshotV1Interface,
vsName string, vsNamespace string, timeout time.Duration) error {
err := snapshotClient.VolumeSnapshots(vsNamespace).Delete(ctx, vsName, metav1.DeleteOptions{})
if err != nil {
return errors.Wrap(err, "error to delete volume snapshot")
}
err = wait.PollImmediate(waitInternal, timeout, func() (bool, error) {
_, err := snapshotClient.VolumeSnapshots(vsNamespace).Get(ctx, vsName, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return true, nil
}
return false, errors.Wrapf(err, fmt.Sprintf("error to get VolumeSnapshot %s", vsName))
}
return false, nil
})
if err != nil {
return errors.Wrapf(err, "error to assure VolumeSnapshot is deleted, %s", vsName)
}
return nil
}
// EnsureDeleteVSC asserts the existence of a VSC by name, deletes it and waits for its disappearance and returns errors on any failure
func EnsureDeleteVSC(ctx context.Context, snapshotClient snapshotter.SnapshotV1Interface,
vscName string, timeout time.Duration) error {
err := snapshotClient.VolumeSnapshotContents().Delete(ctx, vscName, metav1.DeleteOptions{})
if err != nil {
return errors.Wrap(err, "error to delete volume snapshot content")
}
err = wait.PollImmediate(waitInternal, timeout, func() (bool, error) {
_, err := snapshotClient.VolumeSnapshotContents().Get(ctx, vscName, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return true, nil
}
return false, errors.Wrapf(err, fmt.Sprintf("error to get VolumeSnapshotContent %s", vscName))
}
return false, nil
})
if err != nil {
return errors.Wrapf(err, "error to assure VolumeSnapshotContent is deleted, %s", vscName)
}
return nil
}
// DeleteVolumeSnapshotIfAny deletes a VS by name if it exists, and log an error when the deletion fails
func DeleteVolumeSnapshotIfAny(ctx context.Context, snapshotClient snapshotter.SnapshotV1Interface, vsName string, vsNamespace string, log logrus.FieldLogger) {
err := snapshotClient.VolumeSnapshots(vsNamespace).Delete(ctx, vsName, metav1.DeleteOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
log.WithError(err).Debugf("Abort deleting volume snapshot, it doesn't exist %s/%s", vsNamespace, vsName)
} else {
log.WithError(err).Errorf("Failed to delete volume snapshot %s/%s", vsNamespace, vsName)
}
}
}

View File

@@ -0,0 +1,357 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csi
import (
"context"
"errors"
"testing"
"time"
snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
snapshotFake "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned/fake"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
clientTesting "k8s.io/client-go/testing"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
)
type reactor struct {
verb string
resource string
reactorFunc clientTesting.ReactionFunc
}
func TestWaitVolumeSnapshotReady(t *testing.T) {
vscName := "fake-vsc"
vsObj := &snapshotv1api.VolumeSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: "fake-vs",
Namespace: "fake-ns",
},
Status: &snapshotv1api.VolumeSnapshotStatus{
BoundVolumeSnapshotContentName: &vscName,
ReadyToUse: boolptr.True(),
RestoreSize: &resource.Quantity{},
},
}
tests := []struct {
name string
clientObj []runtime.Object
vsName string
namespace string
err string
expect *snapshotv1api.VolumeSnapshot
}{
{
name: "get vs error",
vsName: "fake-vs-1",
namespace: "fake-ns-1",
err: "error to get volumesnapshot fake-ns-1/fake-vs-1: volumesnapshots.snapshot.storage.k8s.io \"fake-vs-1\" not found",
},
{
name: "vs status is nil",
vsName: "fake-vs",
namespace: "fake-ns",
clientObj: []runtime.Object{
&snapshotv1api.VolumeSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: "fake-vs",
Namespace: "fake-ns",
},
},
},
err: "timed out waiting for the condition",
},
{
name: "vsc is nil in status",
vsName: "fake-vs",
namespace: "fake-ns",
clientObj: []runtime.Object{
&snapshotv1api.VolumeSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: "fake-vs",
Namespace: "fake-ns",
},
Status: &snapshotv1api.VolumeSnapshotStatus{},
},
},
err: "timed out waiting for the condition",
},
{
name: "ready to use is nil in status",
vsName: "fake-vs",
namespace: "fake-ns",
clientObj: []runtime.Object{
&snapshotv1api.VolumeSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: "fake-vs",
Namespace: "fake-ns",
},
Status: &snapshotv1api.VolumeSnapshotStatus{
BoundVolumeSnapshotContentName: &vscName,
},
},
},
err: "timed out waiting for the condition",
},
{
name: "restore size is nil in status",
vsName: "fake-vs",
namespace: "fake-ns",
clientObj: []runtime.Object{
&snapshotv1api.VolumeSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: "fake-vs",
Namespace: "fake-ns",
},
Status: &snapshotv1api.VolumeSnapshotStatus{
BoundVolumeSnapshotContentName: &vscName,
ReadyToUse: boolptr.True(),
},
},
},
err: "timed out waiting for the condition",
},
{
name: "success",
vsName: "fake-vs",
namespace: "fake-ns",
clientObj: []runtime.Object{
vsObj,
},
expect: vsObj,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeSnapshotClient := snapshotFake.NewSimpleClientset(test.clientObj...)
vs, err := WaitVolumeSnapshotReady(context.Background(), fakeSnapshotClient.SnapshotV1(), test.vsName, test.namespace, time.Millisecond)
if err != nil {
assert.EqualError(t, err, test.err)
} else {
assert.NoError(t, err)
}
assert.Equal(t, test.expect, vs)
})
}
}
func TestGetVolumeSnapshotContentForVolumeSnapshot(t *testing.T) {
vscName := "fake-vsc"
vsObj := &snapshotv1api.VolumeSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: "fake-vs",
Namespace: "fake-ns",
},
Status: &snapshotv1api.VolumeSnapshotStatus{
BoundVolumeSnapshotContentName: &vscName,
ReadyToUse: boolptr.True(),
RestoreSize: &resource.Quantity{},
},
}
vscObj := &snapshotv1api.VolumeSnapshotContent{
ObjectMeta: metav1.ObjectMeta{
Name: "fake-vsc",
},
}
tests := []struct {
name string
snapshotObj *snapshotv1api.VolumeSnapshot
clientObj []runtime.Object
vsName string
namespace string
err string
expect *snapshotv1api.VolumeSnapshotContent
}{
{
name: "vs status is nil",
vsName: "fake-vs",
namespace: "fake-ns",
snapshotObj: &snapshotv1api.VolumeSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: "fake-vs",
Namespace: "fake-ns",
},
},
err: "invalid snapshot info in volume snapshot fake-vs",
},
{
name: "vsc is nil in status",
vsName: "fake-vs",
namespace: "fake-ns",
snapshotObj: &snapshotv1api.VolumeSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: "fake-vs",
Namespace: "fake-ns",
},
},
err: "invalid snapshot info in volume snapshot fake-vs",
},
{
name: "get vsc fail",
vsName: "fake-vs",
namespace: "fake-ns",
snapshotObj: vsObj,
err: "error getting volume snapshot content from API: volumesnapshotcontents.snapshot.storage.k8s.io \"fake-vsc\" not found",
},
{
name: "success",
vsName: "fake-vs",
namespace: "fake-ns",
snapshotObj: vsObj,
clientObj: []runtime.Object{vscObj},
expect: vscObj,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeSnapshotClient := snapshotFake.NewSimpleClientset(test.clientObj...)
vs, err := GetVolumeSnapshotContentForVolumeSnapshot(test.snapshotObj, fakeSnapshotClient.SnapshotV1())
if err != nil {
assert.EqualError(t, err, test.err)
} else {
assert.NoError(t, err)
}
assert.Equal(t, test.expect, vs)
})
}
}
func TestEnsureDeleteVS(t *testing.T) {
vsObj := &snapshotv1api.VolumeSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: "fake-vs",
Namespace: "fake-ns",
},
}
tests := []struct {
name string
clientObj []runtime.Object
vsName string
namespace string
reactors []reactor
err string
}{
{
name: "delete fail",
vsName: "fake-vs",
namespace: "fake-ns",
err: "error to delete volume snapshot: volumesnapshots.snapshot.storage.k8s.io \"fake-vs\" not found",
},
{
name: "wait fail",
vsName: "fake-vs",
namespace: "fake-ns",
clientObj: []runtime.Object{vsObj},
reactors: []reactor{
{
verb: "get",
resource: "volumesnapshots",
reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.New("fake-get-error")
},
},
},
err: "error to assure VolumeSnapshot is deleted, fake-vs: error to get VolumeSnapshot fake-vs: fake-get-error",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeSnapshotClient := snapshotFake.NewSimpleClientset(test.clientObj...)
for _, reactor := range test.reactors {
fakeSnapshotClient.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorFunc)
}
err := EnsureDeleteVS(context.Background(), fakeSnapshotClient.SnapshotV1(), test.vsName, test.namespace, time.Millisecond)
if err != nil {
assert.EqualError(t, err, test.err)
} else {
assert.NoError(t, err)
}
})
}
}
func TestEnsureDeleteVSC(t *testing.T) {
vscObj := &snapshotv1api.VolumeSnapshotContent{
ObjectMeta: metav1.ObjectMeta{
Name: "fake-vsc",
},
}
tests := []struct {
name string
clientObj []runtime.Object
reactors []reactor
vscName string
err string
}{
{
name: "delete fail",
vscName: "fake-vsc",
err: "error to delete volume snapshot content: volumesnapshotcontents.snapshot.storage.k8s.io \"fake-vsc\" not found",
},
{
name: "wait fail",
vscName: "fake-vsc",
clientObj: []runtime.Object{vscObj},
reactors: []reactor{
{
verb: "get",
resource: "volumesnapshotcontents",
reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.New("fake-get-error")
},
},
},
err: "error to assure VolumeSnapshotContent is deleted, fake-vsc: error to get VolumeSnapshotContent fake-vsc: fake-get-error",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeSnapshotClient := snapshotFake.NewSimpleClientset(test.clientObj...)
for _, reactor := range test.reactors {
fakeSnapshotClient.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorFunc)
}
err := EnsureDeleteVSC(context.Background(), fakeSnapshotClient.SnapshotV1(), test.vscName, time.Millisecond)
if err != nil {
assert.EqualError(t, err, test.err)
} else {
assert.NoError(t, err)
}
})
}
}

View File

@@ -16,8 +16,14 @@ limitations under the License.
package kube
import (
"context"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1api "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
)
// IsPodRunning does a well-rounded check to make sure the specified pod is running stably.
@@ -63,3 +69,15 @@ func isPodScheduledInStatus(pod *corev1api.Pod, statusCheckFunc func(*corev1api.
return nil
}
// DeletePodIfAny deletes a pod by name if it exists, and log an error when the deletion fails
func DeletePodIfAny(ctx context.Context, podGetter corev1client.CoreV1Interface, podName string, podNamespace string, log logrus.FieldLogger) {
err := podGetter.Pods(podNamespace).Delete(ctx, podName, metav1.DeleteOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
log.WithError(err).Debugf("Abort deleting pod, it doesn't exist %s/%s", podNamespace, podName)
} else {
log.WithError(err).Errorf("Failed to delete pod %s/%s", podNamespace, podName)
}
}
}

79
pkg/util/kube/pvc_pv.go Normal file
View File

@@ -0,0 +1,79 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kube
import (
"context"
"fmt"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1api "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
)
const (
waitInternal = 2 * time.Second
)
// DeletePVCIfAny deletes a PVC by name if it exists, and log an error when the deletion fails
func DeletePVCIfAny(ctx context.Context, pvcGetter corev1client.CoreV1Interface, pvcName string, pvcNamespace string, log logrus.FieldLogger) {
err := pvcGetter.PersistentVolumeClaims(pvcNamespace).Delete(ctx, pvcName, metav1.DeleteOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
log.WithError(err).Debugf("Abort deleting PVC, it doesn't exist, %s/%s", pvcNamespace, pvcName)
} else {
log.WithError(err).Errorf("Failed to delete pvc %s/%s", pvcNamespace, pvcName)
}
}
}
// WaitPVCBound wait for binding of a PVC specified by name and returns the bound PV object
func WaitPVCBound(ctx context.Context, pvcGetter corev1client.CoreV1Interface,
pvGetter corev1client.CoreV1Interface, pvc string, namespace string, timeout time.Duration) (*corev1api.PersistentVolume, error) {
var updated *corev1api.PersistentVolumeClaim
err := wait.PollImmediate(waitInternal, timeout, func() (bool, error) {
tmpPVC, err := pvcGetter.PersistentVolumeClaims(namespace).Get(ctx, pvc, metav1.GetOptions{})
if err != nil {
return false, errors.Wrapf(err, fmt.Sprintf("error to get pvc %s/%s", namespace, pvc))
}
if tmpPVC.Spec.VolumeName == "" {
return false, nil
}
updated = tmpPVC
return true, nil
})
if err != nil {
return nil, errors.Wrap(err, "error to wait for rediness of PVC")
}
pv, err := pvGetter.PersistentVolumes().Get(ctx, updated.Spec.VolumeName, metav1.GetOptions{})
if err != nil {
return nil, errors.Wrap(err, "error to get PV")
}
return pv, err
}

View File

@@ -0,0 +1,131 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kube
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
corev1api "k8s.io/api/core/v1"
clientTesting "k8s.io/client-go/testing"
)
type reactor struct {
verb string
resource string
reactorFunc clientTesting.ReactionFunc
}
func TestWaitPVCBound(t *testing.T) {
pvcObject := &corev1api.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: "fake-namespace",
Name: "fake-pvc",
},
}
pvcObjectBound := &corev1api.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: "fake-namespace",
Name: "fake-pvc",
},
Spec: corev1api.PersistentVolumeClaimSpec{
VolumeName: "fake-pv",
},
}
pvObj := &corev1api.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: "fake-pv",
},
}
tests := []struct {
name string
pvcName string
pvcNamespace string
kubeClientObj []runtime.Object
kubeReactors []reactor
expected *corev1api.PersistentVolume
err string
}{
{
name: "wait pvc error",
pvcName: "fake-pvc",
pvcNamespace: "fake-namespace",
err: "error to wait for rediness of PVC: error to get pvc fake-namespace/fake-pvc: persistentvolumeclaims \"fake-pvc\" not found",
},
{
name: "wait pvc timeout",
pvcName: "fake-pvc",
pvcNamespace: "fake-namespace",
kubeClientObj: []runtime.Object{
pvcObject,
},
err: "error to wait for rediness of PVC: timed out waiting for the condition",
},
{
name: "get pv fail",
pvcName: "fake-pvc",
pvcNamespace: "fake-namespace",
kubeClientObj: []runtime.Object{
pvcObjectBound,
},
err: "error to get PV: persistentvolumes \"fake-pv\" not found",
},
{
name: "success",
pvcName: "fake-pvc",
pvcNamespace: "fake-namespace",
kubeClientObj: []runtime.Object{
pvcObjectBound,
pvObj,
},
expected: pvObj,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...)
for _, reactor := range test.kubeReactors {
fakeKubeClient.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorFunc)
}
var kubeClient kubernetes.Interface = fakeKubeClient
pv, err := WaitPVCBound(context.Background(), kubeClient.CoreV1(), kubeClient.CoreV1(), test.pvcName, test.pvcNamespace, time.Millisecond)
if err != nil {
assert.EqualError(t, err, test.err)
} else {
assert.NoError(t, err)
}
assert.Equal(t, test.expected, pv)
})
}
}