Convert Pod Volume Restore resource/controller to the Kubebuilder framework

Fixes #4134

Signed-off-by: Wenkai Yin(尹文开) <yinw@vmware.com>

Author: Wenkai Yin(尹文开)
Date: 2021-12-23 14:42:22 +08:00
Parent: 3ec96e2eac
Commit: c2c211fefb

13 changed files with 381 additions and 694 deletions
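
For readers new to the framework, the converted controller follows the standard controller-runtime shape: a reconciler struct that embeds a cache-backed client, a single idempotent Reconcile keyed by namespace/name, and a SetupWithManager that registers the watches. A generic sketch of that shape (illustrative, not part of this diff):

package controller

import (
	"context"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
)

// Reconciler embeds a client whose reads are served from the manager's shared cache.
type Reconciler struct {
	client.Client
}

func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	pvr := &velerov1api.PodVolumeRestore{}
	if err := r.Get(ctx, req.NamespacedName, pvr); err != nil {
		// NotFound is terminal; any other error requeues with backoff.
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	// Act on the object and patch its status here.
	return ctrl.Result{}, nil
}

func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&velerov1api.PodVolumeRestore{}).
		Complete(r)
}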


@@ -103,7 +103,7 @@ local_resource(
local_resource(
"restic_binary",
cmd = 'cd ' + '.' + ';mkdir -p _tiltbuild/restic; BIN=velero GOOS=linux GOARCH=amd64 RESTIC_VERSION=0.12.0 OUTPUT_DIR=_tiltbuild/restic ./hack/download-restic.sh',
cmd = 'cd ' + '.' + ';mkdir -p _tiltbuild/restic; BIN=velero GOOS=linux GOARCH=amd64 RESTIC_VERSION=0.13.1 OUTPUT_DIR=_tiltbuild/restic ./hack/download-restic.sh',
)
# Note: we need a distro with a bash shell to exec into the Velero container


@@ -0,0 +1 @@
Convert Pod Volume Restore resource/controller to the Kubebuilder framework


@@ -16,7 +16,37 @@ spec:
singular: podvolumerestore
scope: Namespaced
versions:
- name: v1
- additionalPrinterColumns:
- description: Namespace of the pod containing the volume to be restored
jsonPath: .spec.pod.namespace
name: Namespace
type: string
- description: Name of the pod containing the volume to be restored
jsonPath: .spec.pod.name
name: Pod
type: string
- description: Name of the volume to be restored
jsonPath: .spec.volume
name: Volume
type: string
- description: Pod Volume Restore status such as New/InProgress
jsonPath: .status.phase
name: Status
type: string
- description: Total bytes to be restored
format: int64
jsonPath: .status.progress.totalBytes
name: TotalBytes
type: integer
- description: Bytes restored so far
format: int64
jsonPath: .status.progress.bytesDone
name: BytesDone
type: integer
- jsonPath: .metadata.creationTimestamp
name: Age
type: date
name: v1
schema:
openAPIV3Schema:
properties:
@@ -136,6 +166,8 @@ spec:
type: object
served: true
storage: true
subresources:
status: {}
status:
acceptedNames:
kind: ""
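
One consequence of the status subresource enabled above: the main resource endpoint now ignores writes to .status, so status changes must go through the status client (or a status-aware patch helper, which is what the reconciler in this commit uses). A minimal sketch, assuming a controller-runtime client and the imports used elsewhere in this diff:

// markInProgress illustrates writing status once the subresource is enabled;
// a plain c.Update(ctx, pvr) would silently drop the .status changes.
func markInProgress(ctx context.Context, c client.Client, pvr *velerov1api.PodVolumeRestore) error {
	pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseInProgress
	return c.Status().Update(ctx, pvr)
}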

File diff suppressed because one or more lines are too long


@@ -6,6 +6,24 @@ metadata:
creationTimestamp: null
name: velero-perms
rules:
- apiGroups:
- ""
resources:
- persistentvolumeclaims
verbs:
- get
- apiGroups:
- ""
resources:
- persistentvolumes
verbs:
- get
- apiGroups:
- ""
resources:
- pods
verbs:
- get
- apiGroups:
- velero.io
resources:
@@ -72,6 +90,26 @@ rules:
- get
- patch
- update
- apiGroups:
- velero.io
resources:
- podvolumerestores
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- velero.io
resources:
- podvolumerestores/status
verbs:
- get
- patch
- update
- apiGroups:
- velero.io
resources:


@@ -81,8 +81,20 @@ type PodVolumeRestoreStatus struct {
Progress PodVolumeOperationProgress `json:"progress,omitempty"`
}
// TODO(2.0) After converting all resources to use the runtime-controller client, the genclient and k8s:deepcopy markers will no longer be needed and should be removed.
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:object:generate=true
// +kubebuilder:object:root=true
// +kubebuilder:storageversion
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:name="Namespace",type="string",JSONPath=".spec.pod.namespace",description="Namespace of the pod containing the volume to be restored"
// +kubebuilder:printcolumn:name="Pod",type="string",JSONPath=".spec.pod.name",description="Name of the pod containing the volume to be restored"
// +kubebuilder:printcolumn:name="Volume",type="string",JSONPath=".spec.volume",description="Name of the volume to be restored"
// +kubebuilder:printcolumn:name="Status",type="string",JSONPath=".status.phase",description="Pod Volume Restore status such as New/InProgress"
// +kubebuilder:printcolumn:name="TotalBytes",type="integer",format="int64",JSONPath=".status.progress.totalBytes",description="Pod Volume Restore status such as New/InProgress"
// +kubebuilder:printcolumn:name="BytesDone",type="integer",format="int64",JSONPath=".status.progress.bytesDone",description="Pod Volume Restore status such as New/InProgress"
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
type PodVolumeRestore struct {
metav1.TypeMeta `json:",inline"`
@@ -98,6 +110,8 @@ type PodVolumeRestore struct {
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:object:generate=true
// +kubebuilder:object:root=true
// PodVolumeRestoreList is a list of PodVolumeRestores.
type PodVolumeRestoreList struct {


@@ -30,27 +30,23 @@ import (
v1 "k8s.io/api/core/v1"
storagev1api "k8s.io/api/storage/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
kubeinformers "k8s.io/client-go/informers"
corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager"
"github.com/vmware-tanzu/velero/internal/credentials"
"github.com/vmware-tanzu/velero/internal/util/managercontroller"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/buildinfo"
"github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/cmd"
"github.com/vmware-tanzu/velero/pkg/cmd/util/signals"
"github.com/vmware-tanzu/velero/pkg/controller"
clientset "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned"
informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/restic"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
@@ -101,45 +97,17 @@ func NewServerCommand(f client.Factory) *cobra.Command {
}
type resticServer struct {
kubeClient kubernetes.Interface
veleroClient clientset.Interface
veleroInformerFactory informers.SharedInformerFactory
kubeInformerFactory kubeinformers.SharedInformerFactory
podInformer cache.SharedIndexInformer
logger logrus.FieldLogger
ctx context.Context
cancelFunc context.CancelFunc
fileSystem filesystem.Interface
mgr manager.Manager
metrics *metrics.ServerMetrics
metricsAddress string
namespace string
logger logrus.FieldLogger
ctx context.Context
cancelFunc context.CancelFunc
fileSystem filesystem.Interface
mgr manager.Manager
metrics *metrics.ServerMetrics
metricsAddress string
namespace string
}
func newResticServer(logger logrus.FieldLogger, factory client.Factory, metricAddress string) (*resticServer, error) {
kubeClient, err := factory.KubeClient()
if err != nil {
return nil, err
}
veleroClient, err := factory.Client()
if err != nil {
return nil, err
}
// use a stand-alone pod informer because we want to use a field selector to
// filter to only pods scheduled on this node.
podInformer := corev1informers.NewFilteredPodInformer(
kubeClient,
metav1.NamespaceAll,
0,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
func(opts *metav1.ListOptions) {
opts.FieldSelector = fmt.Sprintf("spec.nodeName=%s", os.Getenv("NODE_NAME"))
},
)
ctx, cancelFunc := context.WithCancel(context.Background())
clientConfig, err := factory.ClientConfig()
@@ -152,29 +120,40 @@ func newResticServer(logger logrus.FieldLogger, factory client.Factory, metricAd
velerov1api.AddToScheme(scheme)
v1.AddToScheme(scheme)
storagev1api.AddToScheme(scheme)
// use a field selector to filter to only pods scheduled on this node.
cacheOption := cache.Options{
SelectorsByObject: cache.SelectorsByObject{
&v1.Pod{}: {
Field: fields.Set{"spec.nodeName": os.Getenv("NODE_NAME")}.AsSelector(),
},
},
}
mgr, err := ctrl.NewManager(clientConfig, ctrl.Options{
Scheme: scheme,
Scheme: scheme,
NewCache: cache.BuilderWithOptions(cacheOption),
})
if err != nil {
return nil, err
}
s := &resticServer{
kubeClient: kubeClient,
veleroClient: veleroClient,
veleroInformerFactory: informers.NewFilteredSharedInformerFactory(veleroClient, 0, factory.Namespace(), nil),
kubeInformerFactory: kubeinformers.NewSharedInformerFactory(kubeClient, 0),
podInformer: podInformer,
logger: logger,
ctx: ctx,
cancelFunc: cancelFunc,
fileSystem: filesystem.NewFileSystem(),
mgr: mgr,
metricsAddress: metricAddress,
namespace: factory.Namespace(),
logger: logger,
ctx: ctx,
cancelFunc: cancelFunc,
fileSystem: filesystem.NewFileSystem(),
mgr: mgr,
metricsAddress: metricAddress,
namespace: factory.Namespace(),
}
if err := s.validatePodVolumesHostPath(); err != nil {
// the cache isn't initialized yet when "validatePodVolumesHostPath" is called, so the client returned
// by the manager cannot be used; we need the kube client here
client, err := factory.KubeClient()
if err != nil {
return nil, err
}
if err := s.validatePodVolumesHostPath(client); err != nil {
return nil, err
}
@@ -218,38 +197,14 @@ func (s *resticServer) run() {
FileSystem: filesystem.NewFileSystem(),
ResticExec: restic.BackupExec{},
Log: s.logger,
PvLister: s.kubeInformerFactory.Core().V1().PersistentVolumes().Lister(),
PvcLister: s.kubeInformerFactory.Core().V1().PersistentVolumeClaims().Lister(),
}
if err := pvbReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.PodVolumeBackup)
}
restoreController := controller.NewPodVolumeRestoreController(
s.logger,
s.veleroInformerFactory.Velero().V1().PodVolumeRestores(),
s.veleroClient.VeleroV1(),
s.podInformer,
s.kubeInformerFactory.Core().V1().PersistentVolumeClaims(),
s.kubeInformerFactory.Core().V1().PersistentVolumes(),
s.mgr.GetClient(),
os.Getenv("NODE_NAME"),
credentialFileStore,
)
go s.veleroInformerFactory.Start(s.ctx.Done())
go s.kubeInformerFactory.Start(s.ctx.Done())
go s.podInformer.Run(s.ctx.Done())
// TODO(2.0): presuming all controllers and resources are converted to runtime-controller
// by v2.0, the block from this line and including the `s.mgr.Start() will be
// deprecated, since the manager auto-starts all the caches. Until then, we need to start the
// cache for them manually.
// Adding the controllers to the manager will register them as a (runtime-controller) runnable,
// so the manager will ensure the cache is started and ready before all controller are started
s.mgr.Add(managercontroller.Runnable(restoreController, 1))
if err = controller.NewPodVolumeRestoreReconciler(s.logger, s.mgr.GetClient(), credentialFileStore).SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller")
}
s.logger.Info("Controllers starting...")
@@ -260,7 +215,7 @@ func (s *resticServer) run() {
// validatePodVolumesHostPath validates that the pod volumes path contains a
// directory for each Pod running on this node
func (s *resticServer) validatePodVolumesHostPath() error {
func (s *resticServer) validatePodVolumesHostPath(client kubernetes.Interface) error {
files, err := s.fileSystem.ReadDir("/host_pods/")
if err != nil {
return errors.Wrap(err, "could not read pod volumes host path")
@@ -274,7 +229,7 @@ func (s *resticServer) validatePodVolumesHostPath() error {
}
}
pods, err := s.kubeClient.CoreV1().Pods("").List(s.ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("spec.nodeName=%s,status.phase=Running", os.Getenv("NODE_NAME"))})
pods, err := client.CoreV1().Pods("").List(s.ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("spec.nodeName=%s,status.phase=Running", os.Getenv("NODE_NAME"))})
if err != nil {
return errors.WithStack(err)
}
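
The detour through factory.KubeClient() above exists because the manager's client reads from its informer cache, and the cache only starts serving once mgr.Start() runs; any earlier read needs a client that talks to the API server directly. A non-cached controller-runtime client would work just as well (a sketch, assuming only controller-runtime's public API):

// client.New builds a direct, uncached client that is usable before mgr.Start().
directClient, err := client.New(mgr.GetConfig(), client.Options{Scheme: mgr.GetScheme()})
if err != nil {
	return nil, err
}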


@@ -95,12 +95,11 @@ func Test_validatePodVolumesHostPath(t *testing.T) {
}
s := &resticServer{
kubeClient: kubeClient,
logger: testutil.NewLogger(),
fileSystem: fs,
}
err := s.validatePodVolumesHostPath()
err := s.validatePodVolumesHostPath(kubeClient)
if tt.wantErr {
assert.Error(t, err)
} else {


@@ -30,7 +30,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
corev1listers "k8s.io/client-go/listers/core/v1"
"sigs.k8s.io/cluster-api/util/patch"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -60,9 +59,6 @@ type PodVolumeBackupReconciler struct {
FileSystem filesystem.Interface
ResticExec BackupExecuter
Log logrus.FieldLogger
PvLister corev1listers.PersistentVolumeLister
PvcLister corev1listers.PersistentVolumeClaimLister
}
// +kubebuilder:rbac:groups=velero.io,resources=podvolumebackups,verbs=get;list;watch;create;update;patch;delete
@@ -302,7 +298,7 @@ type resticDetails struct {
}
func (r *PodVolumeBackupReconciler) buildResticCommand(ctx context.Context, log *logrus.Entry, pvb *velerov1api.PodVolumeBackup, pod *corev1.Pod, details *resticDetails) (*restic.Command, error) {
volDir, err := kube.GetVolumeDirectory(log, pod, pvb.Spec.Volume, r.PvcLister, r.PvLister, r.Client)
volDir, err := kube.GetVolumeDirectory(ctx, log, pod, pvb.Spec.Volume, r.Client)
if err != nil {
return nil, errors.Wrap(err, "getting volume directory name")
}


@@ -18,13 +18,11 @@ package controller
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
jsonpatch "github.com/evanphx/json-patch"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1api "k8s.io/api/core/v1"
@@ -33,191 +31,176 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
corev1informers "k8s.io/client-go/informers/core/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
k8scache "sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/cluster-api/util/patch"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov1client "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/typed/velero/v1"
informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions/velero/v1"
listers "github.com/vmware-tanzu/velero/pkg/generated/listers/velero/v1"
"github.com/vmware-tanzu/velero/pkg/restic"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/kube"
)
type podVolumeRestoreController struct {
*genericController
podVolumeRestoreClient velerov1client.PodVolumeRestoresGetter
podVolumeRestoreLister listers.PodVolumeRestoreLister
podLister corev1listers.PodLister
pvcLister corev1listers.PersistentVolumeClaimLister
pvLister corev1listers.PersistentVolumeLister
backupLocationInformer k8scache.Informer
kbClient client.Client
nodeName string
credentialsFileStore credentials.FileStore
processRestoreFunc func(*velerov1api.PodVolumeRestore) error
fileSystem filesystem.Interface
clock clock.Clock
func NewPodVolumeRestoreReconciler(logger logrus.FieldLogger, client client.Client, credentialsFileStore credentials.FileStore) *PodVolumeRestoreReconciler {
return &PodVolumeRestoreReconciler{
Client: client,
logger: logger.WithField("controller", "PodVolumeRestore"),
credentialsFileStore: credentialsFileStore,
fileSystem: filesystem.NewFileSystem(),
clock: &clock.RealClock{},
}
}
// NewPodVolumeRestoreController creates a new pod volume restore controller.
func NewPodVolumeRestoreController(
logger logrus.FieldLogger,
podVolumeRestoreInformer informers.PodVolumeRestoreInformer,
podVolumeRestoreClient velerov1client.PodVolumeRestoresGetter,
podInformer cache.SharedIndexInformer,
pvcInformer corev1informers.PersistentVolumeClaimInformer,
pvInformer corev1informers.PersistentVolumeInformer,
kbClient client.Client,
nodeName string,
credentialsFileStore credentials.FileStore,
) Interface {
c := &podVolumeRestoreController{
genericController: newGenericController(PodVolumeRestore, logger),
podVolumeRestoreClient: podVolumeRestoreClient,
podVolumeRestoreLister: podVolumeRestoreInformer.Lister(),
podLister: corev1listers.NewPodLister(podInformer.GetIndexer()),
pvcLister: pvcInformer.Lister(),
pvLister: pvInformer.Lister(),
kbClient: kbClient,
nodeName: nodeName,
credentialsFileStore: credentialsFileStore,
fileSystem: filesystem.NewFileSystem(),
clock: &clock.RealClock{},
}
c.syncHandler = c.processQueueItem
c.cacheSyncWaiters = append(
c.cacheSyncWaiters,
podVolumeRestoreInformer.Informer().HasSynced,
podInformer.HasSynced,
pvcInformer.Informer().HasSynced,
)
c.processRestoreFunc = c.processRestore
podVolumeRestoreInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.pvrHandler,
UpdateFunc: func(_, obj interface{}) {
c.pvrHandler(obj)
},
},
)
podInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.podHandler,
UpdateFunc: func(_, obj interface{}) {
c.podHandler(obj)
},
},
)
return c
type PodVolumeRestoreReconciler struct {
client.Client
logger logrus.FieldLogger
credentialsFileStore credentials.FileStore
fileSystem filesystem.Interface
clock clock.Clock
}
func (c *podVolumeRestoreController) pvrHandler(obj interface{}) {
pvr := obj.(*velerov1api.PodVolumeRestore)
log := loggerForPodVolumeRestore(c.logger, pvr)
// +kubebuilder:rbac:groups=velero.io,resources=podvolumerestores,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=velero.io,resources=podvolumerestores/status,verbs=get;update;patch
// +kubebuilder:rbac:groups="",resources=pods,verbs=get
// +kubebuilder:rbac:groups="",resources=persistentvolumes,verbs=get
// +kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get
if !isPVRNew(pvr) {
log.Debugf("Restore is not new, not enqueuing")
return
}
func (c *PodVolumeRestoreReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := c.logger.WithField("PodVolumeRestore", req.NamespacedName.String())
pod, err := c.podLister.Pods(pvr.Spec.Pod.Namespace).Get(pvr.Spec.Pod.Name)
if apierrors.IsNotFound(err) {
log.WithError(err).Debugf("Restore's pod %s/%s not found, not enqueueing.", pvr.Spec.Pod.Namespace, pvr.Spec.Pod.Name)
return
}
if err != nil {
log.WithError(err).Errorf("Unable to get restore's pod %s/%s, not enqueueing.", pvr.Spec.Pod.Namespace, pvr.Spec.Pod.Name)
return
}
if !isPodOnNode(pod, c.nodeName) {
log.Debugf("Restore's pod is not on this node, not enqueuing")
return
}
if !isResticInitContainerRunning(pod) {
log.Debug("Restore's pod is not running restic-wait init container, not enqueuing")
return
}
resticInitContainerIndex := getResticInitContainerIndex(pod)
if resticInitContainerIndex > 0 {
log.Warnf(`Init containers before the %s container may cause issues
if they interfere with volumes being restored: %s index %d`, restic.InitContainer, restic.InitContainer, resticInitContainerIndex)
}
log.Debug("Enqueueing")
c.enqueue(obj)
}
func (c *podVolumeRestoreController) podHandler(obj interface{}) {
pod := obj.(*corev1api.Pod)
log := c.logger.WithField("key", kube.NamespaceAndName(pod))
// the pod should always be for this node since the podInformer is filtered
// based on node, so this is just a failsafe.
if !isPodOnNode(pod, c.nodeName) {
return
}
if !isResticInitContainerRunning(pod) {
log.Debug("Pod is not running restic-wait init container, not enqueuing restores for pod")
return
}
resticInitContainerIndex := getResticInitContainerIndex(pod)
if resticInitContainerIndex > 0 {
log.Warnf(`Init containers before the %s container may cause issues
if they interfere with volumes being restored: %s index %d`, restic.InitContainer, restic.InitContainer, resticInitContainerIndex)
}
selector := labels.Set(map[string]string{
velerov1api.PodUIDLabel: string(pod.UID),
}).AsSelector()
pvrs, err := c.podVolumeRestoreLister.List(selector)
if err != nil {
log.WithError(err).Error("Unable to list pod volume restores")
return
}
if len(pvrs) == 0 {
return
}
for _, pvr := range pvrs {
log := loggerForPodVolumeRestore(log, pvr)
if !isPVRNew(pvr) {
log.Debug("Restore is not new, not enqueuing")
continue
pvr := &velerov1api.PodVolumeRestore{}
if err := c.Get(ctx, types.NamespacedName{Namespace: req.Namespace, Name: req.Name}, pvr); err != nil {
if apierrors.IsNotFound(err) {
log.Warn("PodVolumeRestore not found, skip")
return ctrl.Result{}, nil
}
log.Debug("Enqueuing")
c.enqueue(pvr)
log.WithError(err).Error("Unable to get the PodVolumeRestore")
return ctrl.Result{}, err
}
log = log.WithField("pod", fmt.Sprintf("%s/%s", pvr.Spec.Pod.Namespace, pvr.Spec.Pod.Name))
if len(pvr.OwnerReferences) == 1 {
log = log.WithField("restore", fmt.Sprintf("%s/%s", pvr.Namespace, pvr.OwnerReferences[0].Name))
}
shouldProcess, pod, err := c.shouldProcess(ctx, log, pvr)
if err != nil {
return ctrl.Result{}, err
}
if !shouldProcess {
return ctrl.Result{}, nil
}
resticInitContainerIndex := getResticInitContainerIndex(pod)
if resticInitContainerIndex > 0 {
log.Warnf(`Init containers before the %s container may cause issues
if they interfere with volumes being restored: %s index %d`, restic.InitContainer, restic.InitContainer, resticInitContainerIndex)
}
patchHelper, err := patch.NewHelper(pvr, c.Client)
if err != nil {
log.WithError(err).Error("Unable to create patch helper")
return ctrl.Result{}, err
}
log.Info("Restore starting")
pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseInProgress
pvr.Status.StartTimestamp = &metav1.Time{Time: c.clock.Now()}
if err = patchHelper.Patch(ctx, pvr); err != nil {
log.WithError(err).Error("Unable to update status to in progress")
return ctrl.Result{}, err
}
if err = c.processRestore(ctx, pvr, pod, log); err != nil {
pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseFailed
pvr.Status.Message = err.Error()
pvr.Status.CompletionTimestamp = &metav1.Time{Time: c.clock.Now()}
if e := patchHelper.Patch(ctx, pvr); e != nil {
log.WithError(e).Error("Unable to update status to failed")
}
log.WithError(err).Error("Unable to process the PodVolumeRestore")
return ctrl.Result{}, err
}
pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseCompleted
pvr.Status.CompletionTimestamp = &metav1.Time{Time: c.clock.Now()}
if err = patchHelper.Patch(ctx, pvr); err != nil {
log.WithError(err).Error("Unable to update status to completed")
return ctrl.Result{}, err
}
log.Info("Restore completed")
return ctrl.Result{}, nil
}
func (c *PodVolumeRestoreReconciler) shouldProcess(ctx context.Context, log logrus.FieldLogger, pvr *velerov1api.PodVolumeRestore) (bool, *corev1api.Pod, error) {
if !isPVRNew(pvr) {
log.Debug("PodVolumeRestore is not new, skip")
return false, nil, nil
}
// the pods are filtered by node when the cache is initialized, so if we can get a pod here,
// it must be on the same node as the controller and we don't need to compare the node name anymore
pod := &corev1api.Pod{}
if err := c.Get(ctx, types.NamespacedName{Namespace: pvr.Spec.Pod.Namespace, Name: pvr.Spec.Pod.Name}, pod); err != nil {
if apierrors.IsNotFound(err) {
log.WithError(err).Debug("Pod not found on this node, skip")
return false, nil, nil
}
log.WithError(err).Error("Unable to get pod")
return false, nil, err
}
if !isResticInitContainerRunning(pod) {
log.Debug("Pod is not running restic-wait init container, skip")
return false, nil, nil
}
return true, pod, nil
}
func (c *PodVolumeRestoreReconciler) SetupWithManager(mgr ctrl.Manager) error {
// The pod may not be scheduled yet at the point when its PVRs are initially reconciled.
// By watching the pods, we can trigger the PVR reconciliation again once the pod is finally scheduled on the node.
return ctrl.NewControllerManagedBy(mgr).
For(&velerov1api.PodVolumeRestore{}).
Watches(&source.Kind{Type: &corev1api.Pod{}}, handler.EnqueueRequestsFromMapFunc(c.findVolumeRestoresForPod)).
Complete(c)
}
func (c *PodVolumeRestoreReconciler) findVolumeRestoresForPod(pod client.Object) []reconcile.Request {
list := &velerov1api.PodVolumeRestoreList{}
options := &client.ListOptions{
LabelSelector: labels.Set(map[string]string{
velerov1api.PodUIDLabel: string(pod.GetUID()),
}).AsSelector(),
}
if err := c.List(context.TODO(), list, options); err != nil {
c.logger.WithField("pod", fmt.Sprintf("%s/%s", pod.GetNamespace(), pod.GetName())).WithError(err).
Error("unable to list PodVolumeRestores")
return []reconcile.Request{}
}
requests := make([]reconcile.Request, len(list.Items))
for i, item := range list.Items {
requests[i] = reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: item.GetNamespace(),
Name: item.GetName(),
},
}
}
return requests
}
func isPVRNew(pvr *velerov1api.PodVolumeRestore) bool {
return pvr.Status.Phase == "" || pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseNew
}
func isPodOnNode(pod *corev1api.Pod, node string) bool {
return pod.Spec.NodeName == node
}
func isResticInitContainerRunning(pod *corev1api.Pod) bool {
// Restic wait container can be anywhere in the list of init containers, but must be running.
i := getResticInitContainerIndex(pod)
@@ -237,92 +220,6 @@ func getResticInitContainerIndex(pod *corev1api.Pod) int {
return -1
}
func (c *podVolumeRestoreController) processQueueItem(key string) error {
log := c.logger.WithField("key", key)
log.Debug("Running processQueueItem")
ns, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
log.WithError(errors.WithStack(err)).Error("error splitting queue key")
return nil
}
req, err := c.podVolumeRestoreLister.PodVolumeRestores(ns).Get(name)
if apierrors.IsNotFound(err) {
log.Debug("Unable to find PodVolumeRestore")
return nil
}
if err != nil {
return errors.Wrap(err, "error getting PodVolumeRestore")
}
// Don't mutate the shared cache
reqCopy := req.DeepCopy()
return c.processRestoreFunc(reqCopy)
}
func loggerForPodVolumeRestore(baseLogger logrus.FieldLogger, req *velerov1api.PodVolumeRestore) logrus.FieldLogger {
log := baseLogger.WithFields(logrus.Fields{
"namespace": req.Namespace,
"name": req.Name,
})
if len(req.OwnerReferences) == 1 {
log = log.WithField("restore", fmt.Sprintf("%s/%s", req.Namespace, req.OwnerReferences[0].Name))
}
return log
}
func (c *podVolumeRestoreController) processRestore(req *velerov1api.PodVolumeRestore) error {
log := loggerForPodVolumeRestore(c.logger, req)
log.Info("Restore starting")
var err error
// update status to InProgress
req, err = c.patchPodVolumeRestore(req, func(r *velerov1api.PodVolumeRestore) {
r.Status.Phase = velerov1api.PodVolumeRestorePhaseInProgress
r.Status.StartTimestamp = &metav1.Time{Time: c.clock.Now()}
})
if err != nil {
log.WithError(err).Error("Error setting PodVolumeRestore startTimestamp and phase to InProgress")
return errors.WithStack(err)
}
pod, err := c.podLister.Pods(req.Spec.Pod.Namespace).Get(req.Spec.Pod.Name)
if err != nil {
log.WithError(err).Errorf("Error getting pod %s/%s", req.Spec.Pod.Namespace, req.Spec.Pod.Name)
return c.failRestore(req, errors.Wrap(err, "error getting pod").Error(), log)
}
volumeDir, err := kube.GetVolumeDirectory(log, pod, req.Spec.Volume, c.pvcLister, c.pvLister, c.kbClient)
if err != nil {
log.WithError(err).Error("Error getting volume directory name")
return c.failRestore(req, errors.Wrap(err, "error getting volume directory name").Error(), log)
}
// execute the restore process
if err := c.restorePodVolume(req, volumeDir, log); err != nil {
log.WithError(err).Error("Error restoring volume")
return c.failRestore(req, errors.Wrap(err, "error restoring volume").Error(), log)
}
// update status to Completed
if _, err = c.patchPodVolumeRestore(req, func(r *velerov1api.PodVolumeRestore) {
r.Status.Phase = velerov1api.PodVolumeRestorePhaseCompleted
r.Status.CompletionTimestamp = &metav1.Time{Time: c.clock.Now()}
}); err != nil {
log.WithError(err).Error("Error setting PodVolumeRestore completionTimestamp and phase to Completed")
return err
}
log.Info("Restore completed")
return nil
}
func singlePathMatch(path string) (string, error) {
matches, err := filepath.Glob(path)
if err != nil {
@@ -336,7 +233,12 @@ func singlePathMatch(path string) (string, error) {
return matches[0], nil
}
func (c *podVolumeRestoreController) restorePodVolume(req *velerov1api.PodVolumeRestore, volumeDir string, log logrus.FieldLogger) error {
func (c *PodVolumeRestoreReconciler) processRestore(ctx context.Context, req *velerov1api.PodVolumeRestore, pod *corev1api.Pod, log logrus.FieldLogger) error {
volumeDir, err := kube.GetVolumeDirectory(ctx, log, pod, req.Spec.Volume, c.Client)
if err != nil {
return errors.Wrap(err, "error getting volume directory name")
}
// Get the full path of the new volume's directory as mounted in the daemonset pod, which
// will look like: /host_pods/<new-pod-uid>/volumes/<volume-plugin-name>/<volume-dir>
volumePath, err := singlePathMatch(fmt.Sprintf("/host_pods/%s/volumes/*/%s", string(req.Spec.Pod.UID), volumeDir))
@@ -346,8 +248,7 @@ func (c *podVolumeRestoreController) restorePodVolume(req *velerov1api.PodVolume
credsFile, err := c.credentialsFileStore.Path(restic.RepoKeySelector())
if err != nil {
log.WithError(err).Error("Error creating temp restic credentials file")
return c.failRestore(req, errors.Wrap(err, "error creating temp restic credentials file").Error(), log)
return errors.Wrap(err, "error creating temp restic credentials file")
}
// ignore error since there's nothing we can do and it's a temp file.
defer os.Remove(credsFile)
@@ -360,11 +261,11 @@ func (c *podVolumeRestoreController) restorePodVolume(req *velerov1api.PodVolume
)
backupLocation := &velerov1api.BackupStorageLocation{}
if err := c.kbClient.Get(context.Background(), client.ObjectKey{
if err := c.Get(ctx, client.ObjectKey{
Namespace: req.Namespace,
Name: req.Spec.BackupStorageLocation,
}, backupLocation); err != nil {
return c.failRestore(req, errors.Wrap(err, "error getting backup storage location").Error(), log)
return errors.Wrap(err, "error getting backup storage location")
}
// if there's a caCert on the ObjectStorage, write it to disk so that it can be passed to restic
@@ -381,7 +282,7 @@ func (c *podVolumeRestoreController) restorePodVolume(req *velerov1api.PodVolume
env, err := restic.CmdEnv(backupLocation, c.credentialsFileStore)
if err != nil {
return c.failRestore(req, errors.Wrap(err, "error setting restic cmd env").Error(), log)
return errors.Wrap(err, "error setting restic cmd env")
}
resticCmd.Env = env
@@ -432,55 +333,18 @@ func (c *podVolumeRestoreController) restorePodVolume(req *velerov1api.PodVolume
return nil
}
func (c *podVolumeRestoreController) patchPodVolumeRestore(req *velerov1api.PodVolumeRestore, mutate func(*velerov1api.PodVolumeRestore)) (*velerov1api.PodVolumeRestore, error) {
// Record original json
oldData, err := json.Marshal(req)
if err != nil {
return nil, errors.Wrap(err, "error marshalling original PodVolumeRestore")
}
// Mutate
mutate(req)
// Record new json
newData, err := json.Marshal(req)
if err != nil {
return nil, errors.Wrap(err, "error marshalling updated PodVolumeRestore")
}
patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
if err != nil {
return nil, errors.Wrap(err, "error creating json merge patch for PodVolumeRestore")
}
req, err = c.podVolumeRestoreClient.PodVolumeRestores(req.Namespace).Patch(context.TODO(), req.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
if err != nil {
return nil, errors.Wrap(err, "error patching PodVolumeRestore")
}
return req, nil
}
func (c *podVolumeRestoreController) failRestore(req *velerov1api.PodVolumeRestore, msg string, log logrus.FieldLogger) error {
if _, err := c.patchPodVolumeRestore(req, func(pvr *velerov1api.PodVolumeRestore) {
pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseFailed
pvr.Status.Message = msg
pvr.Status.CompletionTimestamp = &metav1.Time{Time: c.clock.Now()}
}); err != nil {
log.WithError(err).Error("Error setting PodVolumeRestore phase to Failed")
return err
}
return nil
}
// updateRestoreProgressFunc returns a func that takes progress info and patches
// the PVR with the new progress
func (c *podVolumeRestoreController) updateRestoreProgressFunc(req *velerov1api.PodVolumeRestore, log logrus.FieldLogger) func(velerov1api.PodVolumeOperationProgress) {
func (c *PodVolumeRestoreReconciler) updateRestoreProgressFunc(req *velerov1api.PodVolumeRestore, log logrus.FieldLogger) func(velerov1api.PodVolumeOperationProgress) {
return func(progress velerov1api.PodVolumeOperationProgress) {
if _, err := c.patchPodVolumeRestore(req, func(r *velerov1api.PodVolumeRestore) {
r.Status.Progress = progress
}); err != nil {
log.WithError(err).Error("error updating PodVolumeRestore progress")
helper, err := patch.NewHelper(req, c.Client)
if err != nil {
log.WithError(err).Error("Unable to create patch helper")
return
}
req.Status.Progress = progress
if err = helper.Patch(context.Background(), req); err != nil {
log.WithError(err).Error("Unable to update PodVolumeRestore progress")
}
}
}
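
A note on the cluster-api patch helper used throughout the reconciler: NewHelper snapshots the object at construction, and Patch later diffs that snapshot against the mutated object and sends only the delta, patching status separately when the status subresource is present. A condensed sketch of the pattern (illustrative):

helper, err := patch.NewHelper(pvr, c.Client) // records the current state
if err != nil {
	return err
}
pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseCompleted
pvr.Status.CompletionTimestamp = &metav1.Time{Time: c.clock.Now()}
// Only the fields changed since NewHelper are included in the patch,
// so unrelated concurrent updates are not overwritten.
return helper.Patch(ctx, pvr)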


@@ -17,64 +17,64 @@ limitations under the License.
package controller
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/runtime"
"github.com/sirupsen/logrus"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"github.com/stretchr/testify/assert"
corev1api "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerofake "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/fake"
veleroinformers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions"
velerov1listers "github.com/vmware-tanzu/velero/pkg/generated/listers/velero/v1"
"github.com/vmware-tanzu/velero/pkg/restic"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
)
func TestPVRHandler(t *testing.T) {
func TestShouldProcess(t *testing.T) {
controllerNode := "foo"
tests := []struct {
name string
obj *velerov1api.PodVolumeRestore
pod *corev1api.Pod
shouldEnqueue bool
name string
obj *velerov1api.PodVolumeRestore
pod *corev1api.Pod
shouldProcess bool
}{
{
name: "InProgress phase pvr should not be enqueued",
name: "InProgress phase pvr should not be processed",
obj: &velerov1api.PodVolumeRestore{
Status: velerov1api.PodVolumeRestoreStatus{
Phase: velerov1api.PodVolumeRestorePhaseInProgress,
},
},
shouldEnqueue: false,
shouldProcess: false,
},
{
name: "Completed phase pvr should not be enqueued",
name: "Completed phase pvr should not be processed",
obj: &velerov1api.PodVolumeRestore{
Status: velerov1api.PodVolumeRestoreStatus{
Phase: velerov1api.PodVolumeRestorePhaseCompleted,
},
},
shouldEnqueue: false,
shouldProcess: false,
},
{
name: "Failed phase pvr should not be enqueued",
name: "Failed phase pvr should not be processed",
obj: &velerov1api.PodVolumeRestore{
Status: velerov1api.PodVolumeRestoreStatus{
Phase: velerov1api.PodVolumeRestorePhaseFailed,
},
},
shouldEnqueue: false,
shouldProcess: false,
},
{
name: "Unable to get pvr's pod should not be enqueued",
name: "Unable to get pvr's pod should not be processed",
obj: &velerov1api.PodVolumeRestore{
Spec: velerov1api.PodVolumeRestoreSpec{
Pod: corev1api.ObjectReference{
@@ -86,50 +86,10 @@ func TestPVRHandler(t *testing.T) {
Phase: "",
},
},
shouldEnqueue: false,
shouldProcess: false,
},
{
name: "Empty phase pvr with pod not on node running init container should not be enqueued",
obj: &velerov1api.PodVolumeRestore{
Spec: velerov1api.PodVolumeRestoreSpec{
Pod: corev1api.ObjectReference{
Namespace: "ns-1",
Name: "pod-1",
},
},
Status: velerov1api.PodVolumeRestoreStatus{
Phase: "",
},
},
pod: &corev1api.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns-1",
Name: "pod-1",
},
Spec: corev1api.PodSpec{
NodeName: "some-other-node",
InitContainers: []corev1api.Container{
{
Name: restic.InitContainer,
},
},
},
Status: corev1api.PodStatus{
InitContainerStatuses: []corev1api.ContainerStatus{
{
State: corev1api.ContainerState{
Running: &corev1api.ContainerStateRunning{
StartedAt: metav1.Time{Time: time.Now()},
},
},
},
},
},
},
shouldEnqueue: false,
},
{
name: "Empty phase pvr with pod on node not running init container should not be enqueued",
name: "Empty phase pvr with pod on node not running init container should not be processed",
obj: &velerov1api.PodVolumeRestore{
Spec: velerov1api.PodVolumeRestoreSpec{
Pod: corev1api.ObjectReference{
@@ -162,7 +122,7 @@ func TestPVRHandler(t *testing.T) {
},
},
},
shouldEnqueue: false,
shouldProcess: false,
},
{
name: "Empty phase pvr with pod on node running init container should be enqueued",
@@ -202,220 +162,23 @@ func TestPVRHandler(t *testing.T) {
},
},
},
shouldEnqueue: true,
shouldProcess: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var (
podInformer = cache.NewSharedIndexInformer(nil, new(corev1api.Pod), 0, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
c = &podVolumeRestoreController{
genericController: newGenericController(PodVolumeRestore, velerotest.NewLogger()),
podLister: corev1listers.NewPodLister(podInformer.GetIndexer()),
nodeName: controllerNode,
}
)
builder := fake.NewClientBuilder()
if test.pod != nil {
require.NoError(t, podInformer.GetStore().Add(test.pod))
builder.WithObjects(test.pod)
}
c := &PodVolumeRestoreReconciler{
logger: logrus.New(),
Client: builder.Build(),
}
c.pvrHandler(test.obj)
if !test.shouldEnqueue {
assert.Equal(t, 0, c.queue.Len())
return
}
require.Equal(t, 1, c.queue.Len())
})
}
}
func TestPodHandler(t *testing.T) {
controllerNode := "foo"
tests := []struct {
name string
pod *corev1api.Pod
podVolumeRestores []*velerov1api.PodVolumeRestore
expectedEnqueues sets.String
}{
{
name: "pod on controller node running restic init container with multiple PVRs has new ones enqueued",
pod: &corev1api.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns-1",
Name: "pod-1",
UID: types.UID("uid"),
},
Spec: corev1api.PodSpec{
NodeName: controllerNode,
InitContainers: []corev1api.Container{
{
Name: restic.InitContainer,
},
},
},
Status: corev1api.PodStatus{
InitContainerStatuses: []corev1api.ContainerStatus{
{
State: corev1api.ContainerState{
Running: &corev1api.ContainerStateRunning{StartedAt: metav1.Time{Time: time.Now()}},
},
},
},
},
},
podVolumeRestores: []*velerov1api.PodVolumeRestore{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns-1",
Name: "pvr-1",
Labels: map[string]string{
velerov1api.PodUIDLabel: "uid",
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns-1",
Name: "pvr-2",
Labels: map[string]string{
velerov1api.PodUIDLabel: "uid",
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns-1",
Name: "pvr-3",
Labels: map[string]string{
velerov1api.PodUIDLabel: "uid",
},
},
Status: velerov1api.PodVolumeRestoreStatus{
Phase: velerov1api.PodVolumeRestorePhaseInProgress,
},
},
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns-1",
Name: "pvr-4",
Labels: map[string]string{
velerov1api.PodUIDLabel: "some-other-pod",
},
},
},
},
expectedEnqueues: sets.NewString("ns-1/pvr-1", "ns-1/pvr-2"),
},
{
name: "pod on controller node not running restic init container doesn't have PVRs enqueued",
pod: &corev1api.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns-1",
Name: "pod-1",
UID: types.UID("uid"),
},
Spec: corev1api.PodSpec{
NodeName: controllerNode,
InitContainers: []corev1api.Container{
{
Name: restic.InitContainer,
},
},
},
Status: corev1api.PodStatus{
InitContainerStatuses: []corev1api.ContainerStatus{
{
State: corev1api.ContainerState{},
},
},
},
},
podVolumeRestores: []*velerov1api.PodVolumeRestore{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns-1",
Name: "pvr-1",
Labels: map[string]string{
velerov1api.PodUIDLabel: "uid",
},
},
},
},
},
{
name: "pod not running on controller node doesn't have PVRs enqueued",
pod: &corev1api.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns-1",
Name: "pod-1",
UID: types.UID("uid"),
},
Spec: corev1api.PodSpec{
NodeName: "some-other-node",
InitContainers: []corev1api.Container{
{
Name: restic.InitContainer,
},
},
},
Status: corev1api.PodStatus{
InitContainerStatuses: []corev1api.ContainerStatus{
{
State: corev1api.ContainerState{
Running: &corev1api.ContainerStateRunning{StartedAt: metav1.Time{Time: time.Now()}},
},
},
},
},
},
podVolumeRestores: []*velerov1api.PodVolumeRestore{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns-1",
Name: "pvr-1",
Labels: map[string]string{
velerov1api.PodUIDLabel: "uid",
},
},
},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var (
client = velerofake.NewSimpleClientset()
informers = veleroinformers.NewSharedInformerFactory(client, 0)
pvrInformer = informers.Velero().V1().PodVolumeRestores()
c = &podVolumeRestoreController{
genericController: newGenericController(PodVolumeRestore, velerotest.NewLogger()),
podVolumeRestoreLister: velerov1listers.NewPodVolumeRestoreLister(pvrInformer.Informer().GetIndexer()),
nodeName: controllerNode,
}
)
if len(test.podVolumeRestores) > 0 {
for _, pvr := range test.podVolumeRestores {
require.NoError(t, pvrInformer.Informer().GetStore().Add(pvr))
}
}
c.podHandler(test.pod)
require.Equal(t, len(test.expectedEnqueues), c.queue.Len())
itemCount := c.queue.Len()
for i := 0; i < itemCount; i++ {
item, _ := c.queue.Get()
assert.True(t, test.expectedEnqueues.Has(item.(string)))
}
shouldProcess, _, _ := c.shouldProcess(context.Background(), c.logger, test.obj)
require.Equal(t, test.shouldProcess, shouldProcess)
})
}
}
@@ -437,17 +200,6 @@ func TestIsPVRNew(t *testing.T) {
}
}
func TestIsPodOnNode(t *testing.T) {
pod := &corev1api.Pod{}
assert.False(t, isPodOnNode(pod, "bar"))
pod.Spec.NodeName = "foo"
assert.False(t, isPodOnNode(pod, "bar"))
pod.Spec.NodeName = "bar"
assert.True(t, isPodOnNode(pod, "bar"))
}
func TestIsResticContainerRunning(t *testing.T) {
tests := []struct {
name string
@@ -720,3 +472,44 @@ func TestGetResticInitContainerIndex(t *testing.T) {
})
}
}
func TestFindVolumeRestoresForPod(t *testing.T) {
pod := &corev1api.Pod{}
pod.UID = "uid"
scheme := runtime.NewScheme()
scheme.AddKnownTypes(velerov1api.SchemeGroupVersion, &velerov1api.PodVolumeRestore{}, &velerov1api.PodVolumeRestoreList{})
clientBuilder := fake.NewClientBuilder().WithScheme(scheme)
// no matching PVR
reconciler := &PodVolumeRestoreReconciler{
Client: clientBuilder.Build(),
logger: logrus.New(),
}
requests := reconciler.findVolumeRestoresForPod(pod)
assert.Len(t, requests, 0)
// contain one matching PVR
reconciler.Client = clientBuilder.WithLists(&velerov1api.PodVolumeRestoreList{
Items: []velerov1api.PodVolumeRestore{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pvr1",
Labels: map[string]string{
velerov1api.PodUIDLabel: string(pod.GetUID()),
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "pvr2",
Labels: map[string]string{
velerov1api.PodUIDLabel: "non-matching-uid",
},
},
},
},
}).Build()
requests = reconciler.findVolumeRestoresForPod(pod)
assert.Len(t, requests, 1)
}


@@ -21,8 +21,6 @@ import (
"fmt"
"time"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1api "k8s.io/api/core/v1"
@@ -35,7 +33,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// These annotations are taken from the Kubernetes persistent volume/persistent volume claim controller.
@@ -117,8 +115,7 @@ func EnsureNamespaceExistsAndIsReady(namespace *corev1api.Namespace, client core
// GetVolumeDirectory gets the name of the directory on the host, under /var/lib/kubelet/pods/<podUID>/volumes/,
// where the specified volume lives.
// For volumes with a CSIVolumeSource, append "/mount" to the directory name.
func GetVolumeDirectory(log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, pvcLister corev1listers.PersistentVolumeClaimLister,
pvLister corev1listers.PersistentVolumeLister, client client.Client) (string, error) {
func GetVolumeDirectory(ctx context.Context, log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, cli client.Client) (string, error) {
var volume *corev1api.Volume
for _, item := range pod.Spec.Volumes {
@@ -142,18 +139,20 @@ func GetVolumeDirectory(log logrus.FieldLogger, pod *corev1api.Pod, volumeName s
}
// Most common case is that we have a PVC VolumeSource, and we need to check the PV it points to for a CSI source.
pvc, err := pvcLister.PersistentVolumeClaims(pod.Namespace).Get(volume.VolumeSource.PersistentVolumeClaim.ClaimName)
pvc := &corev1api.PersistentVolumeClaim{}
err := cli.Get(ctx, client.ObjectKey{Namespace: pod.Namespace, Name: volume.VolumeSource.PersistentVolumeClaim.ClaimName}, pvc)
if err != nil {
return "", errors.WithStack(err)
}
pv, err := pvLister.Get(pvc.Spec.VolumeName)
pv := &corev1api.PersistentVolume{}
err = cli.Get(ctx, client.ObjectKey{Name: pvc.Spec.VolumeName}, pv)
if err != nil {
return "", errors.WithStack(err)
}
// PV's been created with a CSI source.
isProvisionedByCSI, err := isProvisionedByCSI(log, pv, client)
isProvisionedByCSI, err := isProvisionedByCSI(log, pv, cli)
if err != nil {
return "", errors.WithStack(err)
}


@@ -17,6 +17,7 @@ limitations under the License.
package kube
import (
"context"
"encoding/json"
"testing"
"time"
@@ -33,7 +34,6 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
kubeinformers "k8s.io/client-go/informers"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"github.com/vmware-tanzu/velero/pkg/builder"
@@ -202,22 +202,18 @@ func TestGetVolumeDirectorySuccess(t *testing.T) {
csiDriver := storagev1api.CSIDriver{
ObjectMeta: metav1.ObjectMeta{Name: "csi.test.com"},
}
kbClient := fake.NewClientBuilder().WithLists(&storagev1api.CSIDriverList{Items: []storagev1api.CSIDriver{csiDriver}}).Build()
for _, tc := range tests {
h := newHarness(t)
pvcInformer := kubeinformers.NewSharedInformerFactoryWithOptions(h.KubeClient, 0, kubeinformers.WithNamespace("ns-1")).Core().V1().PersistentVolumeClaims()
pvInformer := kubeinformers.NewSharedInformerFactory(h.KubeClient, 0).Core().V1().PersistentVolumes()
clientBuilder := fake.NewClientBuilder().WithLists(&storagev1api.CSIDriverList{Items: []storagev1api.CSIDriver{csiDriver}})
if tc.pvc != nil {
require.NoError(t, pvcInformer.Informer().GetStore().Add(tc.pvc))
clientBuilder = clientBuilder.WithObjects(tc.pvc)
}
if tc.pv != nil {
require.NoError(t, pvInformer.Informer().GetStore().Add(tc.pv))
clientBuilder = clientBuilder.WithObjects(tc.pv)
}
// Function under test
dir, err := GetVolumeDirectory(logrus.StandardLogger(), tc.pod, tc.pod.Spec.Volumes[0].Name, pvcInformer.Lister(), pvInformer.Lister(), kbClient)
dir, err := GetVolumeDirectory(context.Background(), logrus.StandardLogger(), tc.pod, tc.pod.Spec.Volumes[0].Name, clientBuilder.Build())
require.NoError(t, err)
assert.Equal(t, tc.want, dir)