Convert PodVolumeBackup to Kubebuilder framework

Signed-off-by: F. Gold <fgold@vmware.com>
This commit is contained in:
F. Gold
2022-01-14 16:24:59 -08:00
parent b40bbda2d6
commit 2dd85c9c42
21 changed files with 747 additions and 546 deletions

View File

@@ -1,5 +1,5 @@
/*
Copyright the Velero contributors.
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -18,422 +18,166 @@ package controller
import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"strings"
"time"
jsonpatch "github.com/evanphx/json-patch"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
corev1informers "k8s.io/client-go/informers/core/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov1client "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/typed/velero/v1"
informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions/velero/v1"
listers "github.com/vmware-tanzu/velero/pkg/generated/listers/velero/v1"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/restic"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/kube"
"sigs.k8s.io/controller-runtime/pkg/client"
)
type podVolumeBackupController struct {
*genericController
podVolumeBackupClient velerov1client.PodVolumeBackupsGetter
podVolumeBackupLister listers.PodVolumeBackupLister
podLister corev1listers.PodLister
pvcLister corev1listers.PersistentVolumeClaimLister
pvLister corev1listers.PersistentVolumeLister
kbClient client.Client
nodeName string
metrics *metrics.ServerMetrics
credentialsFileStore credentials.FileStore
processBackupFunc func(*velerov1api.PodVolumeBackup) error
fileSystem filesystem.Interface
clock clock.Clock
// BackupExecuter runs backups.
type BackupExecuter interface {
RunBackup(*restic.Command, logrus.FieldLogger, func(velerov1api.PodVolumeOperationProgress)) (string, string, error)
GetSnapshotID(*restic.Command) (string, error)
}
// NewPodVolumeBackupController creates a new pod volume backup controller.
func NewPodVolumeBackupController(
logger logrus.FieldLogger,
podVolumeBackupInformer informers.PodVolumeBackupInformer,
podVolumeBackupClient velerov1client.PodVolumeBackupsGetter,
podInformer cache.SharedIndexInformer,
pvcInformer corev1informers.PersistentVolumeClaimInformer,
pvInformer corev1informers.PersistentVolumeInformer,
metrics *metrics.ServerMetrics,
kbClient client.Client,
nodeName string,
credentialsFileStore credentials.FileStore,
) Interface {
c := &podVolumeBackupController{
genericController: newGenericController(PodVolumeBackup, logger),
podVolumeBackupClient: podVolumeBackupClient,
podVolumeBackupLister: podVolumeBackupInformer.Lister(),
podLister: corev1listers.NewPodLister(podInformer.GetIndexer()),
pvcLister: pvcInformer.Lister(),
pvLister: pvInformer.Lister(),
kbClient: kbClient,
nodeName: nodeName,
metrics: metrics,
credentialsFileStore: credentialsFileStore,
// PodVolumeBackupReconciler reconciles a PodVolumeBackup object
type PodVolumeBackupReconciler struct {
Scheme *runtime.Scheme
Client client.Client
Clock clock.Clock
Metrics *metrics.ServerMetrics
CredsFileStore credentials.FileStore
NodeName string
FileSystem filesystem.Interface
ResticExec BackupExecuter
Log logrus.FieldLogger
fileSystem: filesystem.NewFileSystem(),
clock: &clock.RealClock{},
}
c.syncHandler = c.processQueueItem
c.cacheSyncWaiters = append(
c.cacheSyncWaiters,
podVolumeBackupInformer.Informer().HasSynced,
podInformer.HasSynced,
pvcInformer.Informer().HasSynced,
)
c.processBackupFunc = c.processBackup
podVolumeBackupInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.pvbHandler,
UpdateFunc: func(_, obj interface{}) { c.pvbHandler(obj) },
},
)
return c
PvLister corev1listers.PersistentVolumeLister
PvcLister corev1listers.PersistentVolumeClaimLister
}
func (c *podVolumeBackupController) pvbHandler(obj interface{}) {
req := obj.(*velerov1api.PodVolumeBackup)
// only enqueue items for this node
if req.Spec.Node != c.nodeName {
return
}
log := loggerForPodVolumeBackup(c.logger, req)
if req.Status.Phase != "" && req.Status.Phase != velerov1api.PodVolumeBackupPhaseNew {
log.Debug("Backup is not new, not enqueuing")
return
}
c.metrics.RegisterPodVolumeBackupEnqueue(c.nodeName)
log.Debug("Enqueueing")
c.enqueue(obj)
}
func (c *podVolumeBackupController) processQueueItem(key string) error {
log := c.logger.WithField("key", key)
log.Debug("Running processQueueItem")
ns, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
log.WithError(err).Error("error splitting queue key")
return nil
}
req, err := c.podVolumeBackupLister.PodVolumeBackups(ns).Get(name)
if apierrors.IsNotFound(err) {
log.Debug("Unable to find PodVolumeBackup")
return nil
}
if err != nil {
return errors.Wrap(err, "error getting PodVolumeBackup")
}
// only process new items
switch req.Status.Phase {
case "", velerov1api.PodVolumeBackupPhaseNew:
default:
return nil
}
// Don't mutate the shared cache
reqCopy := req.DeepCopy()
return c.processBackupFunc(reqCopy)
}
func loggerForPodVolumeBackup(baseLogger logrus.FieldLogger, req *velerov1api.PodVolumeBackup) logrus.FieldLogger {
log := baseLogger.WithFields(logrus.Fields{
"namespace": req.Namespace,
"name": req.Name,
// +kubebuilder:rbac:groups=velero.io,resources=podvolumebackups,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=velero.io,resources=podvolumebackups/status,verbs=get;update;patch
func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := r.Log.WithFields(logrus.Fields{
"controller": "podvolumebackup",
"podvolumebackup": req.NamespacedName,
})
if len(req.OwnerReferences) == 1 {
log = log.WithField("backup", fmt.Sprintf("%s/%s", req.Namespace, req.OwnerReferences[0].Name))
}
return log
}
func getOwningBackup(req *velerov1api.PodVolumeBackup) string {
return fmt.Sprintf("%s/%s", req.Namespace, req.OwnerReferences[0].Name)
}
func (c *podVolumeBackupController) processBackup(req *velerov1api.PodVolumeBackup) error {
log := loggerForPodVolumeBackup(c.logger, req)
log.Info("Backup starting")
var err error
// update status to InProgress
req, err = c.patchPodVolumeBackup(req, func(r *velerov1api.PodVolumeBackup) {
r.Status.Phase = velerov1api.PodVolumeBackupPhaseInProgress
r.Status.StartTimestamp = &metav1.Time{Time: c.clock.Now()}
})
if err != nil {
log.WithError(err).Error("Error setting PodVolumeBackup StartTimestamp and phase to InProgress")
return errors.WithStack(err)
}
pod, err := c.podLister.Pods(req.Spec.Pod.Namespace).Get(req.Spec.Pod.Name)
if err != nil {
log.WithError(err).Errorf("Error getting pod %s/%s", req.Spec.Pod.Namespace, req.Spec.Pod.Name)
return c.fail(req, errors.Wrap(err, "error getting pod").Error(), log)
}
volumeDir, err := kube.GetVolumeDirectory(log, pod, req.Spec.Volume, c.pvcLister, c.pvLister, c.kbClient)
if err != nil {
log.WithError(err).Error("Error getting volume directory name")
return c.fail(req, errors.Wrap(err, "error getting volume directory name").Error(), log)
}
pathGlob := fmt.Sprintf("/host_pods/%s/volumes/*/%s", string(req.Spec.Pod.UID), volumeDir)
log.WithField("pathGlob", pathGlob).Debug("Looking for path matching glob")
path, err := singlePathMatch(pathGlob)
if err != nil {
log.WithError(err).Error("Error uniquely identifying volume path")
return c.fail(req, errors.Wrap(err, "error getting volume path on host").Error(), log)
}
log.WithField("path", path).Debugf("Found path matching glob")
// temp creds
credentialsFile, err := c.credentialsFileStore.Path(restic.RepoKeySelector())
if err != nil {
log.WithError(err).Error("Error creating temp restic credentials file")
return c.fail(req, errors.Wrap(err, "error creating temp restic credentials file").Error(), log)
}
// ignore error since there's nothing we can do and it's a temp file.
defer os.Remove(credentialsFile)
resticCmd := restic.BackupCommand(
req.Spec.RepoIdentifier,
credentialsFile,
path,
req.Spec.Tags,
)
backupLocation := &velerov1api.BackupStorageLocation{}
if err := c.kbClient.Get(context.Background(), client.ObjectKey{
Namespace: req.Namespace,
Name: req.Spec.BackupStorageLocation,
}, backupLocation); err != nil {
return c.fail(req, errors.Wrap(err, "error getting backup storage location").Error(), log)
}
// if there's a caCert on the ObjectStorage, write it to disk so that it can be passed to restic
var caCertFile string
if backupLocation.Spec.ObjectStorage != nil && backupLocation.Spec.ObjectStorage.CACert != nil {
caCertFile, err = restic.TempCACertFile(backupLocation.Spec.ObjectStorage.CACert, req.Spec.BackupStorageLocation, c.fileSystem)
if err != nil {
log.WithError(err).Error("Error creating temp cacert file")
var pvb velerov1api.PodVolumeBackup
if err := r.Client.Get(ctx, req.NamespacedName, &pvb); err != nil {
if apierrors.IsNotFound(err) {
log.Debug("Unable to find PodVolumeBackup")
return ctrl.Result{}, nil
}
// ignore error since there's nothing we can do and it's a temp file.
defer os.Remove(caCertFile)
return ctrl.Result{}, errors.Wrap(err, "getting PodVolumeBackup")
}
resticCmd.CACertFile = caCertFile
env, err := restic.CmdEnv(backupLocation, c.credentialsFileStore)
if len(pvb.OwnerReferences) == 1 {
log = log.WithField(
"backup",
fmt.Sprintf("%s/%s", req.Namespace, pvb.OwnerReferences[0].Name),
)
}
log.Info("PodVolumeBackup starting")
// Only process items for this node.
if pvb.Spec.Node != r.NodeName {
return ctrl.Result{}, nil
}
// Only process new items.
if pvb.Status.Phase != "" && pvb.Status.Phase != velerov1api.PodVolumeBackupPhaseNew {
log.Debug("PodVolumeBackup is not new, not processing")
return ctrl.Result{}, nil
}
r.Metrics.RegisterPodVolumeBackupEnqueue(r.NodeName)
// Update status to InProgress.
pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseInProgress
pvb.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
var pod corev1.Pod
podNamespacedName := client.ObjectKey{
Namespace: pvb.Spec.Pod.Namespace,
Name: pvb.Spec.Pod.Name,
}
if err := r.Client.Get(ctx, podNamespacedName, &pod); err != nil {
return r.updateStatusToFailed(ctx, &pvb, err, fmt.Sprintf("getting pod %s/%s", pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name))
}
var resticDetails resticDetails
resticCmd, err := r.buildResticCommand(ctx, log, &pvb, &pod, &resticDetails)
if err != nil {
return c.fail(req, errors.Wrap(err, "error setting restic cmd env").Error(), log)
return r.updateStatusToFailed(ctx, &pvb, err, "building Restic command")
}
resticCmd.Env = env
// If this is a PVC, look for the most recent completed pod volume backup for it and get
// its restic snapshot ID to use as the value of the `--parent` flag. Without this,
// if the pod using the PVC (and therefore the directory path under /host_pods/) has
// changed since the PVC's last backup, restic will not be able to identify a suitable
// parent snapshot to use, and will have to do a full rescan of the contents of the PVC.
if pvcUID, ok := req.Labels[velerov1api.PVCUIDLabel]; ok {
parentSnapshotID := getParentSnapshot(log, pvcUID, req.Spec.BackupStorageLocation, c.podVolumeBackupLister.PodVolumeBackups(req.Namespace))
if parentSnapshotID == "" {
log.Info("No parent snapshot found for PVC, not using --parent flag for this backup")
} else {
log.WithField("parentSnapshotID", parentSnapshotID).Info("Setting --parent flag for this backup")
resticCmd.ExtraFlags = append(resticCmd.ExtraFlags, fmt.Sprintf("--parent=%s", parentSnapshotID))
}
}
var stdout, stderr string
defer os.Remove(resticDetails.credsFile)
var emptySnapshot bool
if stdout, stderr, err = restic.RunBackup(resticCmd, log, c.updateBackupProgressFunc(req, log)); err != nil {
stdout, stderr, err := r.ResticExec.RunBackup(resticCmd, log, r.updateBackupProgressFunc(&pvb, log))
if err != nil {
if strings.Contains(stderr, "snapshot is empty") {
emptySnapshot = true
} else {
log.WithError(errors.WithStack(err)).Errorf("Error running command=%s, stdout=%s, stderr=%s", resticCmd.String(), stdout, stderr)
return c.fail(req, fmt.Sprintf("error running restic backup, stderr=%s: %s", stderr, err.Error()), log)
return r.updateStatusToFailed(ctx, &pvb, err, fmt.Sprintf("running Restic backup, stderr=%s", stderr))
}
}
log.Debugf("Ran command=%s, stdout=%s, stderr=%s", resticCmd.String(), stdout, stderr)
var snapshotID string
if !emptySnapshot {
cmd := restic.GetSnapshotCommand(req.Spec.RepoIdentifier, credentialsFile, req.Spec.Tags)
cmd.Env = env
cmd.CACertFile = caCertFile
cmd := restic.GetSnapshotCommand(pvb.Spec.RepoIdentifier, resticDetails.credsFile, pvb.Spec.Tags)
cmd.Env = resticDetails.envs
cmd.CACertFile = resticDetails.caCertFile
snapshotID, err = restic.GetSnapshotID(cmd)
snapshotID, err = r.ResticExec.GetSnapshotID(cmd)
if err != nil {
log.WithError(err).Error("Error getting SnapshotID")
return c.fail(req, errors.Wrap(err, "error getting snapshot id").Error(), log)
return r.updateStatusToFailed(ctx, &pvb, err, "getting snapshot id")
}
}
// update status to Completed with path & snapshot id
req, err = c.patchPodVolumeBackup(req, func(r *velerov1api.PodVolumeBackup) {
r.Status.Path = path
r.Status.Phase = velerov1api.PodVolumeBackupPhaseCompleted
r.Status.SnapshotID = snapshotID
r.Status.CompletionTimestamp = &metav1.Time{Time: c.clock.Now()}
if emptySnapshot {
r.Status.Message = "volume was empty so no snapshot was taken"
}
})
if err != nil {
log.WithError(err).Error("Error setting PodVolumeBackup phase to Completed")
return err
// Update status to Completed with path & snapshot ID.
pvb.Status.Path = resticDetails.path
pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseCompleted
pvb.Status.SnapshotID = snapshotID
pvb.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
if emptySnapshot {
pvb.Status.Message = "volume was empty so no snapshot was taken"
}
latencyDuration := req.Status.CompletionTimestamp.Time.Sub(req.Status.StartTimestamp.Time)
latencyDuration := pvb.Status.CompletionTimestamp.Time.Sub(pvb.Status.StartTimestamp.Time)
latencySeconds := float64(latencyDuration / time.Second)
backupName := getOwningBackup(req)
c.metrics.ObserveResticOpLatency(c.nodeName, req.Name, resticCmd.Command, backupName, latencySeconds)
c.metrics.RegisterResticOpLatencyGauge(c.nodeName, req.Name, resticCmd.Command, backupName, latencySeconds)
c.metrics.RegisterPodVolumeBackupDequeue(c.nodeName)
log.Info("Backup completed")
backupName := fmt.Sprintf("%s/%s", req.Namespace, pvb.OwnerReferences[0].Name)
r.Metrics.ObserveResticOpLatency(r.NodeName, req.Name, resticCmd.Command, backupName, latencySeconds)
r.Metrics.RegisterResticOpLatencyGauge(r.NodeName, req.Name, resticCmd.Command, backupName, latencySeconds)
r.Metrics.RegisterPodVolumeBackupDequeue(r.NodeName)
log.Info("PodVolumeBackup completed")
return nil
if err := r.Client.Update(ctx, &pvb); err != nil {
log.WithError(err).Error("updating PodVolumeBackup resource")
}
return ctrl.Result{}, nil
}
// getParentSnapshot finds the most recent completed pod volume backup for the specified PVC and returns its
// restic snapshot ID. Any errors encountered are logged but not returned since they do not prevent a backup
// from proceeding.
func getParentSnapshot(log logrus.FieldLogger, pvcUID, backupStorageLocation string, podVolumeBackupLister listers.PodVolumeBackupNamespaceLister) string {
log = log.WithField("pvcUID", pvcUID)
log.Infof("Looking for most recent completed pod volume backup for this PVC")
pvcBackups, err := podVolumeBackupLister.List(labels.SelectorFromSet(map[string]string{velerov1api.PVCUIDLabel: pvcUID}))
if err != nil {
log.WithError(errors.WithStack(err)).Error("Error listing pod volume backups for PVC")
return ""
}
// go through all the pod volume backups for the PVC and look for the most recent completed one
// to use as the parent.
var mostRecentBackup *velerov1api.PodVolumeBackup
for _, backup := range pvcBackups {
if backup.Status.Phase != velerov1api.PodVolumeBackupPhaseCompleted {
continue
}
if backupStorageLocation != backup.Spec.BackupStorageLocation {
// Check the backup storage location is the same as spec in order to support backup to multiple backup-locations.
// Otherwise, there exists a case that backup volume snapshot to the second location would failed, since the founded
// parent ID is only valid for the first backup location, not the second backup location.
// Also, the second backup should not use the first backup parent ID since its for the first backup location only.
continue
}
if mostRecentBackup == nil || backup.Status.StartTimestamp.After(mostRecentBackup.Status.StartTimestamp.Time) {
mostRecentBackup = backup
}
}
if mostRecentBackup == nil {
log.Info("No completed pod volume backup found for PVC")
return ""
}
log.WithFields(map[string]interface{}{
"parentPodVolumeBackup": mostRecentBackup.Name,
"parentSnapshotID": mostRecentBackup.Status.SnapshotID,
}).Info("Found most recent completed pod volume backup for PVC")
return mostRecentBackup.Status.SnapshotID
// SetupWithManager registers the PVB controller.
func (r *PodVolumeBackupReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&velerov1api.PodVolumeBackup{}).
Complete(r)
}
func (c *podVolumeBackupController) patchPodVolumeBackup(req *velerov1api.PodVolumeBackup, mutate func(*velerov1api.PodVolumeBackup)) (*velerov1api.PodVolumeBackup, error) {
// Record original json
oldData, err := json.Marshal(req)
if err != nil {
return nil, errors.Wrap(err, "error marshalling original PodVolumeBackup")
}
// Mutate
mutate(req)
// Record new json
newData, err := json.Marshal(req)
if err != nil {
return nil, errors.Wrap(err, "error marshalling updated PodVolumeBackup")
}
patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
if err != nil {
return nil, errors.Wrap(err, "error creating json merge patch for PodVolumeBackup")
}
req, err = c.podVolumeBackupClient.PodVolumeBackups(req.Namespace).Patch(context.TODO(), req.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
if err != nil {
return nil, errors.Wrap(err, "error patching PodVolumeBackup")
}
return req, nil
}
// updateBackupProgressFunc returns a func that takes progress info and patches
// the PVB with the new progress
func (c *podVolumeBackupController) updateBackupProgressFunc(req *velerov1api.PodVolumeBackup, log logrus.FieldLogger) func(velerov1api.PodVolumeOperationProgress) {
return func(progress velerov1api.PodVolumeOperationProgress) {
if _, err := c.patchPodVolumeBackup(req, func(r *velerov1api.PodVolumeBackup) {
r.Status.Progress = progress
}); err != nil {
log.WithError(err).Error("error updating PodVolumeBackup progress")
}
}
}
func (c *podVolumeBackupController) fail(req *velerov1api.PodVolumeBackup, msg string, log logrus.FieldLogger) error {
if _, err := c.patchPodVolumeBackup(req, func(r *velerov1api.PodVolumeBackup) {
r.Status.Phase = velerov1api.PodVolumeBackupPhaseFailed
r.Status.Message = msg
r.Status.CompletionTimestamp = &metav1.Time{Time: c.clock.Now()}
}); err != nil {
log.WithError(err).Error("Error setting PodVolumeBackup phase to Failed")
return err
}
return nil
}
func singlePathMatch(path string) (string, error) {
matches, err := filepath.Glob(path)
func (r *PodVolumeBackupReconciler) singlePathMatch(path string) (string, error) {
matches, err := r.FileSystem.Glob(path)
if err != nil {
return "", errors.WithStack(err)
}
@@ -444,3 +188,154 @@ func singlePathMatch(path string) (string, error) {
return matches[0], nil
}
// getParentSnapshot finds the most recent completed PodVolumeBackup for the
// specified PVC and returns its Restic snapshot ID. Any errors encountered are
// logged but not returned since they do not prevent a backup from proceeding.
func (r *PodVolumeBackupReconciler) getParentSnapshot(ctx context.Context, log logrus.FieldLogger, pvbNamespace, pvcUID, bsl string) string {
log = log.WithField("pvcUID", pvcUID)
log.Infof("Looking for most recent completed PodVolumeBackup for this PVC")
listOpts := &client.ListOptions{
Namespace: pvbNamespace,
}
matchingLabels := client.MatchingLabels(map[string]string{velerov1api.PVCUIDLabel: pvcUID})
matchingLabels.ApplyToList(listOpts)
var pvbList velerov1api.PodVolumeBackupList
if err := r.Client.List(ctx, &pvbList, listOpts); err != nil {
log.WithError(errors.WithStack(err)).Error("getting list of podvolumebackups for this PVC")
}
// Go through all the podvolumebackups for the PVC and look for the most
// recent completed one to use as the parent.
var mostRecentPVB *velerov1api.PodVolumeBackup
for _, pvb := range pvbList.Items {
if pvb.Status.Phase != velerov1api.PodVolumeBackupPhaseCompleted {
continue
}
if bsl != pvb.Spec.BackupStorageLocation {
// Check the backup storage location is the same as spec in order to
// support backup to multiple backup-locations. Otherwise, there exists
// a case that backup volume snapshot to the second location would
// failed, since the founded parent ID is only valid for the first
// backup location, not the second backup location. Also, the second
// backup should not use the first backup parent ID since its for the
// first backup location only.
continue
}
if mostRecentPVB == nil || pvb.Status.StartTimestamp.After(mostRecentPVB.Status.StartTimestamp.Time) {
mostRecentPVB = &pvb
}
}
if mostRecentPVB == nil {
log.Info("No completed PodVolumeBackup found for PVC")
return ""
}
log.WithFields(map[string]interface{}{
"parentPodVolumeBackup": mostRecentPVB.Name,
"parentSnapshotID": mostRecentPVB.Status.SnapshotID,
}).Info("Found most recent completed PodVolumeBackup for PVC")
return mostRecentPVB.Status.SnapshotID
}
// updateBackupProgressFunc returns a func that takes progress info and patches
// the PVB with the new progress.
func (r *PodVolumeBackupReconciler) updateBackupProgressFunc(pvb *velerov1api.PodVolumeBackup, log logrus.FieldLogger) func(velerov1api.PodVolumeOperationProgress) {
return func(progress velerov1api.PodVolumeOperationProgress) {
pvb.Status.Progress = progress
}
}
func (r *PodVolumeBackupReconciler) updateStatusToFailed(ctx context.Context, pvb *velerov1api.PodVolumeBackup, err error, msg string) (ctrl.Result, error) {
pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseFailed
pvb.Status.Message = msg
pvb.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
if err := r.Client.Update(ctx, pvb); err != nil {
return ctrl.Result{}, errors.Wrap(err, "updating PodVolumeBackup resource with failed status")
}
return ctrl.Result{}, errors.Wrap(err, msg)
}
type resticDetails struct {
credsFile, caCertFile string
envs []string
path string
}
func (r *PodVolumeBackupReconciler) buildResticCommand(ctx context.Context, log *logrus.Entry, pvb *velerov1api.PodVolumeBackup, pod *corev1.Pod, details *resticDetails) (*restic.Command, error) {
volDir, err := kube.GetVolumeDirectory(log, pod, pvb.Spec.Volume, r.PvcLister, r.PvLister, r.Client)
if err != nil {
return nil, errors.Wrap(err, "getting volume directory name")
}
pathGlob := fmt.Sprintf("/host_pods/%s/volumes/*/%s", string(pvb.Spec.Pod.UID), volDir)
log.WithField("pathGlob", pathGlob).Debug("Looking for path matching glob")
path, err := r.singlePathMatch(pathGlob)
if err != nil {
return nil, errors.Wrap(err, "identifying unique volume path on host")
}
log.WithField("path", path).Debugf("Found path matching glob")
// Temporary credentials.
details.credsFile, err = r.CredsFileStore.Path(restic.RepoKeySelector())
if err != nil {
return nil, errors.Wrap(err, "creating temporary Restic credentials file")
}
cmd := restic.BackupCommand(pvb.Spec.RepoIdentifier, details.credsFile, path, pvb.Spec.Tags)
backupLocation := &velerov1api.BackupStorageLocation{}
if err := r.Client.Get(context.Background(), client.ObjectKey{
Namespace: pvb.Namespace,
Name: pvb.Spec.BackupStorageLocation,
}, backupLocation); err != nil {
return nil, errors.Wrap(err, "getting backup storage location")
}
// If there's a caCert on the ObjectStorage, write it to disk so that it can
// be passed to Restic.
if backupLocation.Spec.ObjectStorage != nil &&
backupLocation.Spec.ObjectStorage.CACert != nil {
details.caCertFile, err = restic.TempCACertFile(backupLocation.Spec.ObjectStorage.CACert, pvb.Spec.BackupStorageLocation, r.FileSystem)
if err != nil {
log.WithError(err).Error("creating temporary caCert file")
}
defer os.Remove(details.caCertFile)
}
cmd.CACertFile = details.caCertFile
details.envs, err = restic.CmdEnv(backupLocation, r.CredsFileStore)
if err != nil {
return nil, errors.Wrap(err, "setting Restic command environment")
}
cmd.Env = details.envs
// If this is a PVC, look for the most recent completed PodVolumeBackup for
// it and get its Restic snapshot ID to use as the value of the `--parent`
// flag. Without this, if the pod using the PVC (and therefore the directory
// path under /host_pods/) has changed since the PVC's last backup, Restic
// will not be able to identify a suitable parent snapshot to use, and will
// have to do a full rescan of the contents of the PVC.
if pvcUID, ok := pvb.Labels[velerov1api.PVCUIDLabel]; ok {
parentSnapshotID := r.getParentSnapshot(ctx, log, pvb.Namespace, pvcUID, pvb.Spec.BackupStorageLocation)
if parentSnapshotID == "" {
log.Info("No parent snapshot found for PVC, not using --parent flag for this backup")
} else {
log.WithField("parentSnapshotID", parentSnapshotID).
Info("Setting --parent flag for this backup")
cmd.ExtraFlags = append(cmd.ExtraFlags, fmt.Sprintf("--parent=%s", parentSnapshotID))
}
}
return cmd, nil
}

View File

@@ -1,5 +1,5 @@
/*
Copyright 2018 the Velero contributors.
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -17,157 +17,277 @@ limitations under the License.
package controller
import (
"testing"
"context"
"fmt"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
. "github.com/onsi/ginkgo"
. "github.com/onsi/ginkgo/extensions/table"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/restic/mocks"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
)
func TestPVBHandler(t *testing.T) {
controllerNode := "foo"
const name = "pvb-1"
tests := []struct {
name string
obj *velerov1api.PodVolumeBackup
shouldEnqueue bool
}{
{
name: "Empty phase pvb on same node should be enqueued",
obj: &velerov1api.PodVolumeBackup{
Spec: velerov1api.PodVolumeBackupSpec{
Node: controllerNode,
},
func pvbBuilder() *builder.PodVolumeBackupBuilder {
return builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, name).
PodNamespace(velerov1api.DefaultNamespace).
PodName(name).
Volume("pvb-1-volume").
BackupStorageLocation("bsl-loc").
ObjectMeta(
func(obj metav1.Object) {
obj.SetOwnerReferences([]metav1.OwnerReference{{Name: name}})
},
shouldEnqueue: true,
},
{
name: "New phase pvb on same node should be enqueued",
obj: &velerov1api.PodVolumeBackup{
Spec: velerov1api.PodVolumeBackupSpec{
Node: controllerNode,
},
Status: velerov1api.PodVolumeBackupStatus{
Phase: velerov1api.PodVolumeBackupPhaseNew,
},
},
shouldEnqueue: true,
},
{
name: "InProgress phase pvb on same node should not be enqueued",
obj: &velerov1api.PodVolumeBackup{
Spec: velerov1api.PodVolumeBackupSpec{
Node: controllerNode,
},
Status: velerov1api.PodVolumeBackupStatus{
Phase: velerov1api.PodVolumeBackupPhaseInProgress,
},
},
shouldEnqueue: false,
},
{
name: "Completed phase pvb on same node should not be enqueued",
obj: &velerov1api.PodVolumeBackup{
Spec: velerov1api.PodVolumeBackupSpec{
Node: controllerNode,
},
Status: velerov1api.PodVolumeBackupStatus{
Phase: velerov1api.PodVolumeBackupPhaseCompleted,
},
},
shouldEnqueue: false,
},
{
name: "Failed phase pvb on same node should not be enqueued",
obj: &velerov1api.PodVolumeBackup{
Spec: velerov1api.PodVolumeBackupSpec{
Node: controllerNode,
},
Status: velerov1api.PodVolumeBackupStatus{
Phase: velerov1api.PodVolumeBackupPhaseFailed,
},
},
shouldEnqueue: false,
},
{
name: "Empty phase pvb on different node should not be enqueued",
obj: &velerov1api.PodVolumeBackup{
Spec: velerov1api.PodVolumeBackupSpec{
Node: "some-other-node",
},
},
shouldEnqueue: false,
},
{
name: "New phase pvb on different node should not be enqueued",
obj: &velerov1api.PodVolumeBackup{
Spec: velerov1api.PodVolumeBackupSpec{
Node: "some-other-node",
},
Status: velerov1api.PodVolumeBackupStatus{
Phase: velerov1api.PodVolumeBackupPhaseNew,
},
},
shouldEnqueue: false,
},
{
name: "InProgress phase pvb on different node should not be enqueued",
obj: &velerov1api.PodVolumeBackup{
Spec: velerov1api.PodVolumeBackupSpec{
Node: "some-other-node",
},
Status: velerov1api.PodVolumeBackupStatus{
Phase: velerov1api.PodVolumeBackupPhaseInProgress,
},
},
shouldEnqueue: false,
},
{
name: "Completed phase pvb on different node should not be enqueued",
obj: &velerov1api.PodVolumeBackup{
Spec: velerov1api.PodVolumeBackupSpec{
Node: "some-other-node",
},
Status: velerov1api.PodVolumeBackupStatus{
Phase: velerov1api.PodVolumeBackupPhaseCompleted,
},
},
shouldEnqueue: false,
},
{
name: "Failed phase pvb on different node should not be enqueued",
obj: &velerov1api.PodVolumeBackup{
Spec: velerov1api.PodVolumeBackupSpec{
Node: "some-other-node",
},
Status: velerov1api.PodVolumeBackupStatus{
Phase: velerov1api.PodVolumeBackupPhaseFailed,
},
},
shouldEnqueue: false,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
c := &podVolumeBackupController{
genericController: newGenericController(PodVolumeBackup, velerotest.NewLogger()),
nodeName: controllerNode,
metrics: metrics.NewResticServerMetrics(),
}
c.pvbHandler(test.obj)
if !test.shouldEnqueue {
assert.Equal(t, 0, c.queue.Len())
return
}
require.Equal(t, 1, c.queue.Len())
})
}
)
}
func podBuilder() *builder.PodBuilder {
return builder.
ForPod(velerov1api.DefaultNamespace, name).
Volumes(&corev1.Volume{Name: "pvb-1-volume"})
}
func bslBuilder() *builder.BackupStorageLocationBuilder {
return builder.
ForBackupStorageLocation(velerov1api.DefaultNamespace, "bsl-loc")
}
var _ = Describe("PodVolumeBackup Reconciler", func() {
type request struct {
pvb *velerov1api.PodVolumeBackup
pod *corev1.Pod
bsl *velerov1api.BackupStorageLocation
expectedProcessed bool
expected *velerov1api.PodVolumeBackup
expectedRequeue ctrl.Result
expectedErrMsg string
}
// `now` will be used to set the fake clock's time; capture
// it here so it can be referenced in the test case defs.
now, err := time.Parse(time.RFC1123, time.RFC1123)
Expect(err).To(BeNil())
now = now.Local()
DescribeTable("a pod volume backup",
func(test request) {
ctx := context.Background()
fakeClient := fake.NewFakeClientWithScheme(scheme.Scheme)
err = fakeClient.Create(ctx, test.pvb)
Expect(err).To(BeNil())
err = fakeClient.Create(ctx, test.pod)
Expect(err).To(BeNil())
err = fakeClient.Create(ctx, test.bsl)
Expect(err).To(BeNil())
fakeFS := velerotest.NewFakeFileSystem()
pathGlob := fmt.Sprintf("/host_pods/%s/volumes/*/%s", "", "pvb-1-volume")
_, err = fakeFS.Create(pathGlob)
Expect(err).To(BeNil())
// Setup reconciler
Expect(velerov1api.AddToScheme(scheme.Scheme)).To(Succeed())
r := PodVolumeBackupReconciler{
Client: fakeClient,
Clock: clock.NewFakeClock(now),
Metrics: metrics.NewResticServerMetrics(),
CredsFileStore: fakeCredsFileStore{},
NodeName: "test_node",
FileSystem: fakeFS,
ResticExec: mocks.FakeResticBackupExec{},
Log: velerotest.NewLogger(),
}
actualResult, err := r.Reconcile(ctx, ctrl.Request{
NamespacedName: types.NamespacedName{
Namespace: velerov1api.DefaultNamespace,
Name: test.pvb.Name,
},
})
Expect(actualResult).To(BeEquivalentTo(test.expectedRequeue))
if test.expectedErrMsg == "" {
Expect(err).To(BeNil())
} else {
Expect(err.Error()).To(BeEquivalentTo(test.expectedErrMsg))
}
pvb := velerov1api.PodVolumeBackup{}
err = r.Client.Get(ctx, kbclient.ObjectKey{
Name: test.pvb.Name,
Namespace: test.pvb.Namespace,
}, &pvb)
// Assertions
if test.expected == nil {
Expect(apierrors.IsNotFound(err)).To(BeTrue())
} else {
Expect(err).To(BeNil())
Eventually(pvb.Status.Phase).Should(Equal(test.expected.Status.Phase))
}
// Processed PVBs will have completion timestamps.
if test.expectedProcessed == true {
Expect(pvb.Status.CompletionTimestamp).ToNot(BeNil())
}
// Unprocessed PVBs will not have completion timestamps.
if test.expectedProcessed == false {
Expect(pvb.Status.CompletionTimestamp).To(BeNil())
}
},
Entry("empty phase pvb on same node should be processed", request{
pvb: pvbBuilder().Phase("").Node("test_node").Result(),
pod: podBuilder().Result(),
bsl: bslBuilder().Result(),
expectedProcessed: true,
expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
Phase(velerov1api.PodVolumeBackupPhaseCompleted).
Result(),
expectedRequeue: ctrl.Result{},
}),
Entry("new phase pvb on same node should be processed", request{
pvb: pvbBuilder().
Phase(velerov1api.PodVolumeBackupPhaseNew).
Node("test_node").
Result(),
pod: podBuilder().Result(),
bsl: bslBuilder().Result(),
expectedProcessed: true,
expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
Phase(velerov1api.PodVolumeBackupPhaseCompleted).
Result(),
expectedRequeue: ctrl.Result{},
}),
Entry("in progress phase pvb on same node should not be processed", request{
pvb: pvbBuilder().
Phase(velerov1api.PodVolumeBackupPhaseInProgress).
Node("test_node").
Result(),
pod: podBuilder().Result(),
bsl: bslBuilder().Result(),
expectedProcessed: false,
expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
Phase(velerov1api.PodVolumeBackupPhaseInProgress).
Result(),
expectedRequeue: ctrl.Result{},
}),
Entry("completed phase pvb on same node should not be processed", request{
pvb: pvbBuilder().
Phase(velerov1api.PodVolumeBackupPhaseCompleted).
Node("test_node").
Result(),
pod: podBuilder().Result(),
bsl: bslBuilder().Result(),
expectedProcessed: false,
expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
Phase(velerov1api.PodVolumeBackupPhaseCompleted).
Result(),
expectedRequeue: ctrl.Result{},
}),
Entry("failed phase pvb on same node should not be processed", request{
pvb: pvbBuilder().
Phase(velerov1api.PodVolumeBackupPhaseFailed).
Node("test_node").
Result(),
pod: podBuilder().Result(),
bsl: bslBuilder().Result(),
expectedProcessed: false,
expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
Phase(velerov1api.PodVolumeBackupPhaseFailed).
Result(),
expectedRequeue: ctrl.Result{},
}),
Entry("empty phase pvb on different node should not be processed", request{
pvb: pvbBuilder().
Phase(velerov1api.PodVolumeBackupPhaseFailed).
Node("test_node_2").
Result(),
pod: podBuilder().Result(),
bsl: bslBuilder().Result(),
expectedProcessed: false,
expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
Phase(velerov1api.PodVolumeBackupPhaseFailed).
Result(),
expectedRequeue: ctrl.Result{},
}),
Entry("new phase pvb on different node should not be processed", request{
pvb: pvbBuilder().
Phase(velerov1api.PodVolumeBackupPhaseNew).
Node("test_node_2").
Result(),
pod: podBuilder().Result(),
bsl: bslBuilder().Result(),
expectedProcessed: false,
expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
Phase(velerov1api.PodVolumeBackupPhaseNew).
Result(),
expectedRequeue: ctrl.Result{},
}),
Entry("in progress phase pvb on different node should not be processed", request{
pvb: pvbBuilder().
Phase(velerov1api.PodVolumeBackupPhaseInProgress).
Node("test_node_2").
Result(),
pod: podBuilder().Result(),
bsl: bslBuilder().Result(),
expectedProcessed: false,
expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
Phase(velerov1api.PodVolumeBackupPhaseInProgress).
Result(),
expectedRequeue: ctrl.Result{},
}),
Entry("completed phase pvb on different node should not be processed", request{
pvb: pvbBuilder().
Phase(velerov1api.PodVolumeBackupPhaseCompleted).
Node("test_node_2").
Result(),
pod: podBuilder().Result(),
bsl: bslBuilder().Result(),
expectedProcessed: false,
expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
Phase(velerov1api.PodVolumeBackupPhaseCompleted).
Result(),
expectedRequeue: ctrl.Result{},
}),
Entry("failed phase pvb on different node should not be processed", request{
pvb: pvbBuilder().
Phase(velerov1api.PodVolumeBackupPhaseFailed).
Node("test_node_2").
Result(),
pod: podBuilder().Result(),
bsl: bslBuilder().Result(),
expectedProcessed: false,
expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
Phase(velerov1api.PodVolumeBackupPhaseFailed).
Result(),
expectedRequeue: ctrl.Result{},
}),
)
})
type fakeCredsFileStore struct{}
func (f fakeCredsFileStore) Path(selector *corev1.SecretKeySelector) (string, error) {
return "/fake/path", nil
}

View File

@@ -1,5 +1,5 @@
/*
Copyright the Velero contributors.
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -323,6 +323,19 @@ func (c *podVolumeRestoreController) processRestore(req *velerov1api.PodVolumeRe
return nil
}
func singlePathMatch(path string) (string, error) {
matches, err := filepath.Glob(path)
if err != nil {
return "", errors.WithStack(err)
}
if len(matches) != 1 {
return "", errors.Errorf("expected one matching path, got %d", len(matches))
}
return matches[0], nil
}
func (c *podVolumeRestoreController) restorePodVolume(req *velerov1api.PodVolumeRestore, volumeDir string, log logrus.FieldLogger) error {
// Get the full path of the new volume's directory as mounted in the daemonset pod, which
// will look like: /host_pods/<new-pod-uid>/volumes/<volume-plugin-name>/<volume-dir>

View File

@@ -1,5 +1,5 @@
/*
Copyright 2020 the Velero contributors.
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.

View File

@@ -1,5 +1,5 @@
/*
Copyright 2020 the Velero contributors.
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.

View File

@@ -1,5 +1,5 @@
/*
Copyright 2017 the Velero contributors.
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.