Merge pull request #5864 from blackpiglet/restore_controller_use_controller_runtime

Shubham Pampattiwar
2023-02-25 13:25:15 -08:00
committed by GitHub
5 changed files with 268 additions and 294 deletions

View File

@@ -0,0 +1 @@
Make restore controller adopt the controller-runtime framework.
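For readers new to the framework, the conversion below follows the standard controller-runtime shape: a reconciler struct whose Reconcile method is driven by the manager's work queue, registered via SetupWithManager. A minimal sketch of that shape (illustrative only, not the exact code from this PR):

	package controller

	import (
		"context"

		"github.com/sirupsen/logrus"
		ctrl "sigs.k8s.io/controller-runtime"
		"sigs.k8s.io/controller-runtime/pkg/client"

		velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
	)

	type exampleReconciler struct {
		kbClient client.Client
		logger   logrus.FieldLogger
	}

	// Reconcile is called by the manager for each queued Restore key.
	func (r *exampleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
		restore := &velerov1api.Restore{}
		if err := r.kbClient.Get(ctx, req.NamespacedName, restore); err != nil {
			// Returning an error re-enqueues the request with backoff.
			return ctrl.Result{}, err
		}
		r.logger.WithField("restore", req.NamespacedName.String()).Debug("reconciling")
		return ctrl.Result{}, nil
	}

	// SetupWithManager registers the reconciler; the manager starts it
	// together with its shared informer cache.
	func (r *exampleReconciler) SetupWithManager(mgr ctrl.Manager) error {
		return ctrl.NewControllerManagedBy(mgr).
			For(&velerov1api.Restore{}).
			Complete(r)
	}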

View File

@@ -27,7 +27,7 @@ import (
"strings"
"time"
"github.com/bombsimon/logrusr/v3"
logrusr "github.com/bombsimon/logrusr/v3"
snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
snapshotv1client "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned"
snapshotv1informers "github.com/kubernetes-csi/external-snapshotter/client/v4/informers/externalversions"
@@ -47,7 +47,7 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
clocks "k8s.io/utils/clock"
"k8s.io/utils/clock"
ctrl "sigs.k8s.io/controller-runtime"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
@@ -665,50 +665,12 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
}
}
restoreControllerRunInfo := func() controllerRunInfo {
restorer, err := restore.NewKubernetesRestorer(
s.discoveryHelper,
client.NewDynamicFactory(s.dynamicClient),
s.config.restoreResourcePriorities,
s.kubeClient.CoreV1().Namespaces(),
podvolume.NewRestorerFactory(s.repoLocker, s.repoEnsurer, s.veleroClient, s.kubeClient.CoreV1(),
s.kubeClient.CoreV1(), s.kubeClient, s.sharedInformerFactory.Velero().V1().BackupRepositories().Informer().HasSynced, s.logger),
s.config.podVolumeOperationTimeout,
s.config.resourceTerminatingTimeout,
s.logger,
podexec.NewPodCommandExecutor(s.kubeClientConfig, s.kubeClient.CoreV1().RESTClient()),
s.kubeClient.CoreV1().RESTClient(),
s.credentialFileStore,
s.mgr.GetClient(),
)
cmd.CheckError(err)
restoreController := controller.NewRestoreController(
s.namespace,
s.sharedInformerFactory.Velero().V1().Restores(),
restorer,
s.mgr.GetClient(),
s.logger,
s.logLevel,
newPluginManager,
backupStoreGetter,
s.metrics,
s.config.formatFlag.Parse(),
)
return controllerRunInfo{
controller: restoreController,
numWorkers: defaultControllerWorkers,
}
}
// So far, the PodVolumeBackup, PodVolumeRestore, and BackupStorageLocation controllers
// are not included in the --disable-controllers list.
// This is because PVB and PVR are used by the node-agent DaemonSet,
// and the BSL controller is mandatory for Velero to work.
enabledControllers := map[string]func() controllerRunInfo{
controller.Backup: backupControllerRunInfo,
controller.Restore: restoreControllerRunInfo,
controller.Backup: backupControllerRunInfo,
}
// Note: all runtime type controllers that can be disabled are grouped separately, below:
enabledRuntimeControllers := map[string]struct{}{
@@ -719,6 +681,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
controller.BackupDeletion: {},
controller.GarbageCollection: {},
controller.BackupSync: {},
controller.Restore: {},
}
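// For example (a sketch, not from this diff), with this grouping a runtime
// controller can be switched off at startup, assuming the Restore constant
// maps to the name "restore":
//
//	velero server --disable-controllers=restore
//
// while the PVB/PVR and BSL controllers stay always-on.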
if s.config.restoreOnly {
@@ -815,7 +778,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.mgr.GetClient(),
s.ctx,
s.pluginRegistry,
clocks.RealClock{},
clock.RealClock{},
s.logger,
).SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.ServerStatusRequest)
@@ -825,7 +788,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
if _, ok := enabledRuntimeControllers[controller.DownloadRequest]; ok {
r := controller.NewDownloadRequestReconciler(
s.mgr.GetClient(),
clocks.RealClock{},
clock.RealClock{},
newPluginManager,
backupStoreGetter,
s.logger,
@@ -861,6 +824,43 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
}
}
if _, ok := enabledRuntimeControllers[controller.Restore]; ok {
restorer, err := restore.NewKubernetesRestorer(
s.discoveryHelper,
client.NewDynamicFactory(s.dynamicClient),
s.config.restoreResourcePriorities,
s.kubeClient.CoreV1().Namespaces(),
podvolume.NewRestorerFactory(s.repoLocker, s.repoEnsurer, s.veleroClient, s.kubeClient.CoreV1(),
s.kubeClient.CoreV1(), s.kubeClient, s.sharedInformerFactory.Velero().V1().BackupRepositories().Informer().HasSynced, s.logger),
s.config.podVolumeOperationTimeout,
s.config.resourceTerminatingTimeout,
s.logger,
podexec.NewPodCommandExecutor(s.kubeClientConfig, s.kubeClient.CoreV1().RESTClient()),
s.kubeClient.CoreV1().RESTClient(),
s.credentialFileStore,
s.mgr.GetClient(),
)
cmd.CheckError(err)
r := controller.NewRestoreReconciler(
s.ctx,
s.namespace,
restorer,
s.mgr.GetClient(),
s.logger,
s.logLevel,
newPluginManager,
backupStoreGetter,
s.metrics,
s.config.formatFlag.Parse(),
)
if err = r.SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "fail to create controller", "controller", controller.Restore)
}
}
// TODO(2.0): presuming all controllers and resources are converted to runtime-controller
// by v2.0, the block from this line and including the `s.mgr.Start()` will be
// deprecated, since the manager auto-starts all the caches. Until then, we need to start the

View File

@@ -34,14 +34,13 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
clocks "k8s.io/utils/clock"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/utils/clock"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/vmware-tanzu/velero/internal/hook"
api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov1informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions/velero/v1"
"github.com/vmware-tanzu/velero/pkg/label"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/persistence"
@@ -85,24 +84,30 @@ var nonRestorableResources = []string{
"backuprepositories.velero.io",
}
type restoreController struct {
*genericController
type restoreReconciler struct {
ctx context.Context
namespace string
restorer pkgrestore.Restorer
kbClient client.Client
restoreLogLevel logrus.Level
logger logrus.FieldLogger
metrics *metrics.ServerMetrics
logFormat logging.Format
clock clocks.WithTickerAndDelayedExecution
clock clock.WithTickerAndDelayedExecution
newPluginManager func(logger logrus.FieldLogger) clientmgmt.Manager
backupStoreGetter persistence.ObjectBackupStoreGetter
}
func NewRestoreController(
type backupInfo struct {
backup *api.Backup
location *api.BackupStorageLocation
backupStore persistence.BackupStore
}
func NewRestoreReconciler(
ctx context.Context,
namespace string,
restoreInformer velerov1informers.RestoreInformer,
restorer pkgrestore.Restorer,
kbClient client.Client,
logger logrus.FieldLogger,
@@ -111,16 +116,17 @@ func NewRestoreController(
backupStoreGetter persistence.ObjectBackupStoreGetter,
metrics *metrics.ServerMetrics,
logFormat logging.Format,
) Interface {
c := &restoreController{
genericController: newGenericController(Restore, logger),
namespace: namespace,
restorer: restorer,
kbClient: kbClient,
restoreLogLevel: restoreLogLevel,
metrics: metrics,
logFormat: logFormat,
clock: &clocks.RealClock{},
) *restoreReconciler {
r := &restoreReconciler{
ctx: ctx,
namespace: namespace,
restorer: restorer,
kbClient: kbClient,
logger: logger,
restoreLogLevel: restoreLogLevel,
metrics: metrics,
logFormat: logFormat,
clock: &clock.RealClock{},
// use variables to refer to these functions so they can be
// replaced with fakes for testing.
@@ -128,95 +134,28 @@ func NewRestoreController(
backupStoreGetter: backupStoreGetter,
}
c.syncHandler = c.processQueueItem
c.resyncFunc = c.resync
c.resyncPeriod = time.Minute
// Move the periodic backup and restore metrics computation from the controllers to here.
// After the controllers adopt controller-runtime they no longer have the resync timer
// the generic controller provided, the backup and restore controllers each have a queue
// of length one, and a backup or restore can run for a long time, so computing the
// metric here is the better choice.
r.updateTotalRestoreMetric()
// restore informer cannot be removed until the restore controller adopts the controller-runtime framework.
restoreInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
restore := obj.(*api.Restore)
switch restore.Status.Phase {
case "", api.RestorePhaseNew:
// only process new restores
default:
c.logger.WithFields(logrus.Fields{
"restore": kubeutil.NamespaceAndName(restore),
"phase": restore.Status.Phase,
}).Debug("Restore is not new, skipping")
return
}
key, err := cache.MetaNamespaceKeyFunc(restore)
if err != nil {
c.logger.WithError(errors.WithStack(err)).WithField("restore", restore).Error("Error creating queue key, item not added to queue")
return
}
c.queue.Add(key)
},
},
)
return c
return r
}
func (c *restoreController) resync() {
restoreList := &velerov1api.RestoreList{}
err := c.kbClient.List(context.Background(), restoreList, &client.ListOptions{})
if err != nil {
c.logger.Error(err, "Error computing restore_total metric")
} else {
c.metrics.SetRestoreTotal(int64(len(restoreList.Items)))
}
}
func (c *restoreController) processQueueItem(key string) error {
log := c.logger.WithField("key", key)
log.Debug("Running processQueueItem")
ns, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
log.WithError(err).Error("unable to process queue item: error splitting queue key")
// Return nil here so we don't try to process the key any more
return nil
}
log.Debug("Getting Restore")
restore := &velerov1api.Restore{}
err = c.kbClient.Get(context.Background(), client.ObjectKey{
Namespace: ns,
Name: name,
}, restore)
if err != nil {
return errors.Wrap(err, "error getting Restore")
}
// TODO I think this is now unnecessary. We only initially place
// item with Phase = ("" | New) into the queue. Items will only get
// re-queued if syncHandler returns an error, which will only
// happen if there's an error updating Phase from its initial
// state to something else. So any time it's re-queued it will
// still have its initial state, which we've already confirmed
// is ("" | New)
switch restore.Status.Phase {
case "", api.RestorePhaseNew:
// only process new restores
default:
return nil
}
// Deep-copy the restore so the copy from the lister is not modified.
// Any errors returned by processRestore will be bubbled up, meaning
// the key will be re-enqueued by the controller.
return c.processRestore(restore.DeepCopy())
}
func (c *restoreController) processRestore(restore *api.Restore) error {
func (r *restoreReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// Developer note: any error returned by this method will
// cause the restore to be re-enqueued and re-processed by
// the controller.
log := r.logger.WithField("Restore", req.NamespacedName.String())
restore := &api.Restore{}
err := r.kbClient.Get(ctx, client.ObjectKey{Namespace: req.Namespace, Name: req.Name}, restore)
if err != nil {
log.Infof("Fail to get restore %s: %s", req.NamespacedName.String(), err.Error())
return ctrl.Result{}, err
}
// store a copy of the original restore for creating patch
original := restore.DeepCopy()
@@ -225,67 +164,86 @@ func (c *restoreController) processRestore(restore *api.Restore) error {
// manager used here is not the same one used by r.runValidatedRestore,
// since within that function we want the plugin manager to log to
// our per-restore log (which is instantiated within r.runValidatedRestore).
pluginManager := c.newPluginManager(c.logger)
pluginManager := r.newPluginManager(r.logger)
defer pluginManager.CleanupClients()
info := c.validateAndComplete(restore, pluginManager)
info := r.validateAndComplete(restore, pluginManager)
// Register attempts after validation so we don't have to fetch the backup multiple times
backupScheduleName := restore.Spec.ScheduleName
c.metrics.RegisterRestoreAttempt(backupScheduleName)
r.metrics.RegisterRestoreAttempt(backupScheduleName)
if len(restore.Status.ValidationErrors) > 0 {
restore.Status.Phase = api.RestorePhaseFailedValidation
c.metrics.RegisterRestoreValidationFailed(backupScheduleName)
r.metrics.RegisterRestoreValidationFailed(backupScheduleName)
} else {
restore.Status.StartTimestamp = &metav1.Time{Time: c.clock.Now()}
restore.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
restore.Status.Phase = api.RestorePhaseInProgress
}
// patch to update status and persist to API
err := kubeutil.PatchResource(original, restore, c.kbClient)
err = kubeutil.PatchResource(original, restore, r.kbClient)
if err != nil {
// return the error so the restore can be re-processed; it's currently
// still in phase = New.
return errors.Wrapf(err, "error updating Restore phase to %s", restore.Status.Phase)
log.Errorf("fail to update restore %s status to %s: %s",
req.NamespacedName.String(), restore.Status.Phase, err.Error())
return ctrl.Result{}, errors.Wrapf(err, "error updating Restore phase to %s", restore.Status.Phase)
}
// store ref to just-updated item for creating patch
original = restore.DeepCopy()
if restore.Status.Phase == api.RestorePhaseFailedValidation {
return nil
return ctrl.Result{}, nil
}
if err := c.runValidatedRestore(restore, info); err != nil {
c.logger.WithError(err).Debug("Restore failed")
if err := r.runValidatedRestore(restore, info); err != nil {
log.WithError(err).Debug("Restore failed")
restore.Status.Phase = api.RestorePhaseFailed
restore.Status.FailureReason = err.Error()
c.metrics.RegisterRestoreFailed(backupScheduleName)
r.metrics.RegisterRestoreFailed(backupScheduleName)
} else if restore.Status.Errors > 0 {
c.logger.Debug("Restore partially failed")
log.Debug("Restore partially failed")
restore.Status.Phase = api.RestorePhasePartiallyFailed
c.metrics.RegisterRestorePartialFailure(backupScheduleName)
r.metrics.RegisterRestorePartialFailure(backupScheduleName)
} else {
c.logger.Debug("Restore completed")
log.Debug("Restore completed")
restore.Status.Phase = api.RestorePhaseCompleted
c.metrics.RegisterRestoreSuccess(backupScheduleName)
r.metrics.RegisterRestoreSuccess(backupScheduleName)
}
restore.Status.CompletionTimestamp = &metav1.Time{Time: c.clock.Now()}
c.logger.Debug("Updating restore's final status")
if err = kubeutil.PatchResource(original, restore, c.kbClient); err != nil {
c.logger.WithError(errors.WithStack(err)).Info("Error updating restore's final status")
restore.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
log.Debug("Updating restore's final status")
if err = kubeutil.PatchResource(original, restore, r.kbClient); err != nil {
log.WithError(errors.WithStack(err)).Info("Error updating restore's final status")
// No need to re-enqueue here, because the restore was already set to InProgress above,
// and the controller only handles New restores.
}
return nil
return ctrl.Result{}, nil
}
type backupInfo struct {
backup *api.Backup
location *velerov1api.BackupStorageLocation
backupStore persistence.BackupStore
func (r *restoreReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
WithEventFilter(kubeutil.NewCreateEventPredicate(func(obj client.Object) bool {
restore := obj.(*api.Restore)
switch restore.Status.Phase {
case "", api.RestorePhaseNew:
// only process new restores
return true
default:
r.logger.WithFields(logrus.Fields{
"restore": kubeutil.NamespaceAndName(restore),
"phase": restore.Status.Phase,
}).Debug("Restore is not new, skipping")
return false
}
})).
For(&api.Restore{}).
Complete(r)
}
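// Note: the Create-only predicate above mirrors the AddFunc filtering of the
// removed informer event handler; only restores in phase ""/New ever reach
// Reconcile.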
func (c *restoreController) validateAndComplete(restore *api.Restore, pluginManager clientmgmt.Manager) backupInfo {
func (r *restoreReconciler) validateAndComplete(restore *api.Restore, pluginManager clientmgmt.Manager) backupInfo {
// add non-restorable resources to restore's excluded resources
excludedResources := sets.NewString(restore.Spec.ExcludedResources...)
for _, nonrestorable := range nonRestorableResources {
@@ -345,11 +303,11 @@ func (c *restoreController) validateAndComplete(restore *api.Restore, pluginMana
// the schedule
if restore.Spec.ScheduleName != "" {
selector := labels.SelectorFromSet(labels.Set(map[string]string{
velerov1api.ScheduleNameLabel: restore.Spec.ScheduleName,
api.ScheduleNameLabel: restore.Spec.ScheduleName,
}))
backupList := &velerov1api.BackupList{}
c.kbClient.List(context.Background(), backupList, &client.ListOptions{
backupList := &api.BackupList{}
err = r.kbClient.List(context.Background(), backupList, &client.ListOptions{
LabelSelector: selector,
})
if err != nil {
@@ -368,7 +326,7 @@ func (c *restoreController) validateAndComplete(restore *api.Restore, pluginMana
}
}
info, err := c.fetchBackupInfo(restore.Spec.BackupName, pluginManager)
info, err := r.fetchBackupInfo(restore.Spec.BackupName, pluginManager)
if err != nil {
restore.Status.ValidationErrors = append(restore.Status.ValidationErrors, fmt.Sprintf("Error retrieving backup: %v", err))
return backupInfo{}
@@ -376,7 +334,7 @@ func (c *restoreController) validateAndComplete(restore *api.Restore, pluginMana
// Fill in the ScheduleName so it's easier to consume for metrics.
if restore.Spec.ScheduleName == "" {
restore.Spec.ScheduleName = info.backup.GetLabels()[velerov1api.ScheduleNameLabel]
restore.Spec.ScheduleName = info.backup.GetLabels()[api.ScheduleNameLabel]
}
return info
@@ -423,22 +381,22 @@ func mostRecentCompletedBackup(backups []api.Backup) api.Backup {
// fetchBackupInfo fetches the backup that matches the given name. If it doesn't
// find it, it returns an error.
func (c *restoreController) fetchBackupInfo(backupName string, pluginManager clientmgmt.Manager) (backupInfo, error) {
backup := &velerov1api.Backup{}
err := c.kbClient.Get(context.Background(), types.NamespacedName{Namespace: c.namespace, Name: backupName}, backup)
func (r *restoreReconciler) fetchBackupInfo(backupName string, pluginManager clientmgmt.Manager) (backupInfo, error) {
backup := &api.Backup{}
err := r.kbClient.Get(context.Background(), types.NamespacedName{Namespace: r.namespace, Name: backupName}, backup)
if err != nil {
return backupInfo{}, err
}
location := &velerov1api.BackupStorageLocation{}
if err := c.kbClient.Get(context.Background(), client.ObjectKey{
Namespace: c.namespace,
location := &api.BackupStorageLocation{}
if err := r.kbClient.Get(context.Background(), client.ObjectKey{
Namespace: r.namespace,
Name: backup.Spec.StorageLocation,
}, location); err != nil {
return backupInfo{}, errors.WithStack(err)
}
backupStore, err := c.backupStoreGetter.Get(location, pluginManager, c.logger)
backupStore, err := r.backupStoreGetter.Get(location, pluginManager, r.logger)
if err != nil {
return backupInfo{}, err
}
@@ -454,16 +412,16 @@ func (c *restoreController) fetchBackupInfo(backupName string, pluginManager cli
// The log and results files are uploaded to backup storage. Any error returned from this function
// means that the restore failed. This function updates the restore API object with warning and error
// counts, but *does not* update its phase or patch it via the API.
func (c *restoreController) runValidatedRestore(restore *api.Restore, info backupInfo) error {
func (r *restoreReconciler) runValidatedRestore(restore *api.Restore, info backupInfo) error {
// instantiate the per-restore logger that will output both to a temp file
// (for upload to object storage) and to stdout.
restoreLog, err := newRestoreLogger(restore, c.restoreLogLevel, c.logFormat)
restoreLog, err := newRestoreLogger(restore, r.restoreLogLevel, r.logFormat)
if err != nil {
return err
}
defer restoreLog.closeAndRemove(c.logger)
defer restoreLog.closeAndRemove(r.logger)
pluginManager := c.newPluginManager(restoreLog)
pluginManager := r.newPluginManager(restoreLog)
defer pluginManager.CleanupClients()
actions, err := pluginManager.GetRestoreItemActionsV2()
@@ -482,16 +440,16 @@ func (c *restoreController) runValidatedRestore(restore *api.Restore, info backu
if err != nil {
return errors.Wrap(err, "error downloading backup")
}
defer closeAndRemoveFile(backupFile, c.logger)
defer closeAndRemoveFile(backupFile, r.logger)
listOpts := &client.ListOptions{
LabelSelector: labels.Set(map[string]string{
velerov1api.BackupNameLabel: label.GetValidName(restore.Spec.BackupName),
api.BackupNameLabel: label.GetValidName(restore.Spec.BackupName),
}).AsSelector(),
}
podVolumeBackupList := &velerov1api.PodVolumeBackupList{}
err = c.kbClient.List(context.TODO(), podVolumeBackupList, listOpts)
podVolumeBackupList := &api.PodVolumeBackupList{}
err = r.kbClient.List(context.TODO(), podVolumeBackupList, listOpts)
if err != nil {
restoreLog.Errorf("Fail to list PodVolumeBackup :%s", err.Error())
return errors.WithStack(err)
@@ -504,7 +462,7 @@ func (c *restoreController) runValidatedRestore(restore *api.Restore, info backu
restoreLog.Info("starting restore")
var podVolumeBackups []*velerov1api.PodVolumeBackup
var podVolumeBackups []*api.PodVolumeBackup
for i := range podVolumeBackupList.Items {
podVolumeBackups = append(podVolumeBackups, &podVolumeBackupList.Items[i])
}
@@ -516,7 +474,7 @@ func (c *restoreController) runValidatedRestore(restore *api.Restore, info backu
VolumeSnapshots: volumeSnapshots,
BackupReader: backupFile,
}
restoreWarnings, restoreErrors := c.restorer.RestoreWithResolvers(restoreReq, actionsResolver, snapshotItemResolver,
restoreWarnings, restoreErrors := r.restorer.RestoreWithResolvers(restoreReq, actionsResolver, snapshotItemResolver,
pluginManager)
// log errors and warnings to the restore log
@@ -546,12 +504,12 @@ func (c *restoreController) runValidatedRestore(restore *api.Restore, info backu
// re-instantiate the backup store because credentials could have changed since the original
// instantiation, if this was a long-running restore
info.backupStore, err = c.backupStoreGetter.Get(info.location, pluginManager, c.logger)
info.backupStore, err = r.backupStoreGetter.Get(info.location, pluginManager, r.logger)
if err != nil {
return errors.Wrap(err, "error setting up backup store to persist log and results files")
}
if logReader, err := restoreLog.done(c.logger); err != nil {
if logReader, err := restoreLog.done(r.logger); err != nil {
restoreErrors.Velero = append(restoreErrors.Velero, fmt.Sprintf("error getting restore log reader: %v", err))
} else {
if err := info.backupStore.PutRestoreLog(restore.Spec.BackupName, restore.Name, logReader); err != nil {
@@ -578,12 +536,35 @@ func (c *restoreController) runValidatedRestore(restore *api.Restore, info backu
}
if err := putResults(restore, m, info.backupStore); err != nil {
c.logger.WithError(err).Error("Error uploading restore results to backup storage")
r.logger.WithError(err).Error("Error uploading restore results to backup storage")
}
return nil
}
// updateTotalRestoreMetric updates the velero_restore_total metric every minute.
func (r *restoreReconciler) updateTotalRestoreMetric() {
go func() {
// Wait 5 seconds to let controller-runtime set up the k8s clients.
time.Sleep(5 * time.Second)
wait.NonSlidingUntil(
func() {
// recompute restore_total metric
restoreList := &api.RestoreList{}
err := r.kbClient.List(context.Background(), restoreList, &client.ListOptions{})
if err != nil {
r.logger.Error(err, "Error computing restore_total metric")
} else {
r.metrics.SetRestoreTotal(int64(len(restoreList.Items)))
}
},
1*time.Minute,
r.ctx.Done(),
)
}()
}
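As an aside, wait.NonSlidingUntil measures the period from the moment the callback starts, so a slow metrics pass does not push back the next tick (wait.Until, by contrast, waits a full period after the callback returns). A minimal standalone sketch of the pattern, not part of this diff:

	package main

	import (
		"fmt"
		"time"

		"k8s.io/apimachinery/pkg/util/wait"
	)

	func main() {
		stop := make(chan struct{})
		// End the loop after roughly three periods.
		time.AfterFunc(3500*time.Millisecond, func() { close(stop) })

		wait.NonSlidingUntil(func() {
			fmt.Println("tick", time.Now().Format("15:04:05.000"))
			time.Sleep(300 * time.Millisecond) // simulated work; does not delay the next tick
		}, time.Second, stop)
	}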
func putResults(restore *api.Restore, results map[string]results.Result, backupStore persistence.BackupStore) error {
buf := new(bytes.Buffer)
gzw := gzip.NewWriter(buf)

View File

@@ -29,14 +29,13 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/apimachinery/pkg/types"
clocktesting "k8s.io/utils/clock/testing"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/fake"
informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions"
"github.com/vmware-tanzu/velero/pkg/metrics"
persistencemocks "github.com/vmware-tanzu/velero/pkg/persistence/mocks"
"github.com/vmware-tanzu/velero/pkg/plugin/clientmgmt"
@@ -91,21 +90,19 @@ func TestFetchBackupInfo(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var (
client = fake.NewSimpleClientset()
fakeClient = velerotest.NewFakeControllerRuntimeClient(t)
restorer = &fakeRestorer{kbClient: fakeClient}
sharedInformers = informers.NewSharedInformerFactory(client, 0)
logger = velerotest.NewLogger()
pluginManager = &pluginmocks.Manager{}
backupStore = &persistencemocks.BackupStore{}
fakeClient = velerotest.NewFakeControllerRuntimeClient(t)
restorer = &fakeRestorer{kbClient: fakeClient}
logger = velerotest.NewLogger()
pluginManager = &pluginmocks.Manager{}
backupStore = &persistencemocks.BackupStore{}
)
defer restorer.AssertExpectations(t)
defer backupStore.AssertExpectations(t)
c := NewRestoreController(
r := NewRestoreReconciler(
context.Background(),
velerov1api.DefaultNamespace,
sharedInformers.Velero().V1().Restores(),
restorer,
fakeClient,
logger,
@@ -114,15 +111,15 @@ func TestFetchBackupInfo(t *testing.T) {
NewFakeSingleObjectBackupStoreGetter(backupStore),
metrics.NewServerMetrics(),
formatFlag,
).(*restoreController)
)
if test.backupStoreError == nil {
for _, itm := range test.informerLocations {
require.NoError(t, c.kbClient.Create(context.Background(), itm))
require.NoError(t, r.kbClient.Create(context.Background(), itm))
}
for _, itm := range test.informerBackups {
assert.NoError(t, c.kbClient.Create(context.Background(), itm))
assert.NoError(t, r.kbClient.Create(context.Background(), itm))
}
}
@@ -139,7 +136,7 @@ func TestFetchBackupInfo(t *testing.T) {
backupStore.On("GetBackupMetadata", test.backupName).Return(test.backupStoreBackup, nil).Maybe()
}
info, err := c.fetchBackupInfo(test.backupName, pluginManager)
info, err := r.fetchBackupInfo(test.backupName, pluginManager)
require.Equal(t, test.expectedErr, err != nil)
if test.expectedRes != nil {
@@ -152,33 +149,22 @@ func TestFetchBackupInfo(t *testing.T) {
func TestProcessQueueItemSkips(t *testing.T) {
tests := []struct {
name string
restoreKey string
namespace string
restoreName string
restore *velerov1api.Restore
expectError bool
}{
{
name: "invalid key returns error",
restoreKey: "invalid/key/value",
},
{
name: "missing restore returns error",
restoreKey: "foo/bar",
name: "invalid key returns error",
namespace: "invalid",
restoreName: "key/value",
expectError: true,
},
{
name: "restore with phase InProgress does not get processed",
restoreKey: "foo/bar",
restore: builder.ForRestore("foo", "bar").Phase(velerov1api.RestorePhaseInProgress).Result(),
},
{
name: "restore with phase Completed does not get processed",
restoreKey: "foo/bar",
restore: builder.ForRestore("foo", "bar").Phase(velerov1api.RestorePhaseCompleted).Result(),
},
{
name: "restore with phase FailedValidation does not get processed",
restoreKey: "foo/bar",
restore: builder.ForRestore("foo", "bar").Phase(velerov1api.RestorePhaseFailedValidation).Result(),
name: "missing restore returns error",
namespace: "foo",
restoreName: "bar",
expectError: true,
},
}
@@ -187,16 +173,18 @@ func TestProcessQueueItemSkips(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var (
client = fake.NewSimpleClientset()
fakeClient = velerotest.NewFakeControllerRuntimeClient(t)
restorer = &fakeRestorer{kbClient: fakeClient}
sharedInformers = informers.NewSharedInformerFactory(client, 0)
logger = velerotest.NewLogger()
fakeClient = velerotest.NewFakeControllerRuntimeClient(t)
restorer = &fakeRestorer{kbClient: fakeClient}
logger = velerotest.NewLogger()
)
c := NewRestoreController(
if test.restore != nil {
assert.Nil(t, fakeClient.Create(context.Background(), test.restore))
}
r := NewRestoreReconciler(
context.Background(),
velerov1api.DefaultNamespace,
sharedInformers.Velero().V1().Restores(),
restorer,
fakeClient,
logger,
@@ -205,20 +193,19 @@ func TestProcessQueueItemSkips(t *testing.T) {
nil, // backupStoreGetter
metrics.NewServerMetrics(),
formatFlag,
).(*restoreController)
)
if test.restore != nil {
c.kbClient.Create(context.Background(), test.restore)
}
err := c.processQueueItem(test.restoreKey)
_, err := r.Reconcile(context.Background(), ctrl.Request{NamespacedName: types.NamespacedName{
Namespace: test.namespace,
Name: test.restoreName,
}})
assert.Equal(t, test.expectError, err != nil)
})
}
}
func TestProcessQueueItem(t *testing.T) {
func TestRestoreReconcile(t *testing.T) {
defaultStorageLocation := builder.ForBackupStorageLocation("velero", "default").Provider("myCloud").Bucket("bucket").Result()
@@ -410,13 +397,11 @@ func TestProcessQueueItem(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var (
client = fake.NewSimpleClientset()
fakeClient = velerotest.NewFakeControllerRuntimeClientBuilder(t).Build()
restorer = &fakeRestorer{kbClient: fakeClient}
sharedInformers = informers.NewSharedInformerFactory(client, 0)
logger = velerotest.NewLogger()
pluginManager = &pluginmocks.Manager{}
backupStore = &persistencemocks.BackupStore{}
fakeClient = velerotest.NewFakeControllerRuntimeClientBuilder(t).Build()
restorer = &fakeRestorer{kbClient: fakeClient}
logger = velerotest.NewLogger()
pluginManager = &pluginmocks.Manager{}
backupStore = &persistencemocks.BackupStore{}
)
defer restorer.AssertExpectations(t)
@@ -426,9 +411,9 @@ func TestProcessQueueItem(t *testing.T) {
defaultStorageLocation.ObjectMeta.ResourceVersion = ""
}()
c := NewRestoreController(
r := NewRestoreReconciler(
context.Background(),
velerov1api.DefaultNamespace,
sharedInformers.Velero().V1().Restores(),
restorer,
fakeClient,
logger,
@@ -437,18 +422,18 @@ func TestProcessQueueItem(t *testing.T) {
NewFakeSingleObjectBackupStoreGetter(backupStore),
metrics.NewServerMetrics(),
formatFlag,
).(*restoreController)
)
c.clock = clocktesting.NewFakeClock(now)
r.clock = clocktesting.NewFakeClock(now)
if test.location != nil {
require.NoError(t, fakeClient.Create(context.Background(), test.location))
require.NoError(t, r.kbClient.Create(context.Background(), test.location))
}
if test.backup != nil {
assert.NoError(t, c.kbClient.Create(context.Background(), test.backup))
assert.NoError(t, r.kbClient.Create(context.Background(), test.backup))
}
if test.restore != nil {
require.NoError(t, c.kbClient.Create(context.Background(), test.restore))
require.NoError(t, r.kbClient.Create(context.Background(), test.restore))
}
var warnings, errors results.Result
@@ -479,17 +464,6 @@ func TestProcessQueueItem(t *testing.T) {
backupStore.On("GetBackupVolumeSnapshots", test.backup.Name).Return(volumeSnapshots, nil)
}
var (
key = test.restoreKey
err error
)
if key == "" && test.restore != nil {
key, err = cache.MetaNamespaceKeyFunc(test.restore)
if err != nil {
panic(err)
}
}
if test.backupStoreGetBackupMetadataErr != nil {
// TODO why do I need .Maybe() here?
backupStore.On("GetBackupMetadata", test.restore.Spec.BackupName).Return(nil, test.backupStoreGetBackupMetadataErr).Maybe()
@@ -506,7 +480,11 @@ func TestProcessQueueItem(t *testing.T) {
pluginManager.On("CleanupClients")
}
err = c.processQueueItem(key)
_, err = r.Reconcile(context.Background(), ctrl.Request{NamespacedName: types.NamespacedName{
Namespace: test.restore.Namespace,
Name: test.restore.Name,
}})
assert.Equal(t, test.expectedErr, err != nil, "got error %v", err)
@@ -590,26 +568,24 @@ func TestValidateAndCompleteWhenScheduleNameSpecified(t *testing.T) {
formatFlag := logging.FormatText
var (
client = fake.NewSimpleClientset()
sharedInformers = informers.NewSharedInformerFactory(client, 0)
logger = velerotest.NewLogger()
pluginManager = &pluginmocks.Manager{}
fakeClient = velerotest.NewFakeControllerRuntimeClient(t)
backupStore = &persistencemocks.BackupStore{}
logger = velerotest.NewLogger()
pluginManager = &pluginmocks.Manager{}
fakeClient = velerotest.NewFakeControllerRuntimeClient(t)
backupStore = &persistencemocks.BackupStore{}
)
c := NewRestoreController(
r := NewRestoreReconciler(
context.Background(),
velerov1api.DefaultNamespace,
sharedInformers.Velero().V1().Restores(),
nil,
fakeClient,
logger,
logrus.DebugLevel,
func(logrus.FieldLogger) clientmgmt.Manager { return pluginManager },
NewFakeSingleObjectBackupStoreGetter(backupStore),
nil,
metrics.NewServerMetrics(),
formatFlag,
).(*restoreController)
)
restore := &velerov1api.Restore{
ObjectMeta: metav1.ObjectMeta{
@@ -622,19 +598,18 @@ func TestValidateAndCompleteWhenScheduleNameSpecified(t *testing.T) {
}
// no backups created from the schedule: fail validation
require.NoError(t, sharedInformers.Velero().V1().Backups().Informer().GetStore().Add(
defaultBackup().
ObjectMeta(builder.WithLabels(velerov1api.ScheduleNameLabel, "non-matching-schedule")).
Phase(velerov1api.BackupPhaseCompleted).
Result(),
))
require.NoError(t, r.kbClient.Create(context.Background(), defaultBackup().
ObjectMeta(builder.WithLabels(velerov1api.ScheduleNameLabel, "non-matching-schedule")).
Phase(velerov1api.BackupPhaseCompleted).
Result()))
c.validateAndComplete(restore, pluginManager)
r.validateAndComplete(restore, pluginManager)
assert.Contains(t, restore.Status.ValidationErrors, "No backups found for schedule")
assert.Empty(t, restore.Spec.BackupName)
// no completed backups created from the schedule: fail validation
require.NoError(t, sharedInformers.Velero().V1().Backups().Informer().GetStore().Add(
require.NoError(t, r.kbClient.Create(
context.Background(),
defaultBackup().
ObjectMeta(
builder.WithName("backup-2"),
@@ -644,14 +619,14 @@ func TestValidateAndCompleteWhenScheduleNameSpecified(t *testing.T) {
Result(),
))
c.validateAndComplete(restore, pluginManager)
r.validateAndComplete(restore, pluginManager)
assert.Contains(t, restore.Status.ValidationErrors, "No completed backups found for schedule")
assert.Empty(t, restore.Spec.BackupName)
// multiple completed backups created from the schedule: use most recent
now := time.Now()
require.NoError(t, c.kbClient.Create(context.Background(),
require.NoError(t, r.kbClient.Create(context.Background(),
defaultBackup().
ObjectMeta(
builder.WithName("foo"),
@@ -664,7 +639,7 @@ func TestValidateAndCompleteWhenScheduleNameSpecified(t *testing.T) {
))
location := builder.ForBackupStorageLocation("velero", "default").Provider("myCloud").Bucket("bucket").Result()
require.NoError(t, c.kbClient.Create(context.Background(), location))
require.NoError(t, r.kbClient.Create(context.Background(), location))
restore = &velerov1api.Restore{
ObjectMeta: metav1.ObjectMeta{
@@ -675,7 +650,7 @@ func TestValidateAndCompleteWhenScheduleNameSpecified(t *testing.T) {
ScheduleName: "schedule-1",
},
}
c.validateAndComplete(restore, pluginManager)
r.validateAndComplete(restore, pluginManager)
assert.Nil(t, restore.Status.ValidationErrors)
assert.Equal(t, "foo", restore.Spec.BackupName)
}

View File

@@ -115,3 +115,20 @@ func (f FalsePredicate) Update(event.UpdateEvent) bool {
func (f FalsePredicate) Generic(event.GenericEvent) bool {
return false
}
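// NewCreateEventPredicate returns a predicate that applies f to Create events
// and discards Delete, Generic, and Update events.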
func NewCreateEventPredicate(f func(client.Object) bool) predicate.Predicate {
return predicate.Funcs{
CreateFunc: func(event event.CreateEvent) bool {
return f(event.Object)
},
DeleteFunc: func(event event.DeleteEvent) bool {
return false
},
GenericFunc: func(event event.GenericEvent) bool {
return false
},
UpdateFunc: func(event event.UpdateEvent) bool {
return false
},
}
}
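For completeness, the predicate can be exercised in isolation; a minimal sketch, assuming the helper lives in velero's pkg/util/kube package as the surrounding file suggests:

	package main

	import (
		"fmt"

		"sigs.k8s.io/controller-runtime/pkg/client"
		"sigs.k8s.io/controller-runtime/pkg/event"

		velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
		"github.com/vmware-tanzu/velero/pkg/util/kube"
	)

	func main() {
		// Accept only restores that are new (phase empty or New).
		pred := kube.NewCreateEventPredicate(func(obj client.Object) bool {
			restore, ok := obj.(*velerov1api.Restore)
			if !ok {
				return false
			}
			return restore.Status.Phase == "" ||
				restore.Status.Phase == velerov1api.RestorePhaseNew
		})

		fmt.Println(pred.Create(event.CreateEvent{Object: &velerov1api.Restore{}})) // true
		fmt.Println(pred.Update(event.UpdateEvent{}))                               // always false
	}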