From 7d7e3fff0defd9ee9476d4ddbaf50c435e6a4ef6 Mon Sep 17 00:00:00 2001
From: Xun Jiang/Bruce Jiang <59276555+blackpiglet@users.noreply.github.com>
Date: Fri, 10 Mar 2023 08:59:40 +0800
Subject: [PATCH] Refactor backup controller with controller-runtime. (#5969)

Signed-off-by: Ming
Signed-off-by: Xun Jiang
Co-authored-by: Ming
---
 changelogs/unreleased/5969-qiuming-best       |   1 +
 config/rbac/role.yaml                         |  33 ++
 .../velero/v1/{backup.go => backup_types.go}  |   5 +
 .../v1/{restore.go => restore_types.go}       |   5 +
 pkg/backup/backup.go                          |  35 +-
 pkg/backup/backup_test.go                     |   2 +-
 pkg/cmd/server/server.go                      | 498 +++++++++---------
 pkg/cmd/server/server_test.go                 |  23 +-
 .../async_backup_operations_controller.go     |   1 +
 pkg/controller/backup_controller.go           | 350 ++++++------
 pkg/controller/backup_controller_test.go      | 253 ++++-----
 pkg/controller/backup_finalizer_controller.go |   1 +
 .../backup_storage_location_controller.go     |   1 +
 pkg/controller/constants.go                   |   2 +-
 pkg/controller/download_request_controller.go |   1 +
 pkg/controller/gc_controller.go               |   1 +
 pkg/controller/generic_controller.go          | 146 -----
 .../pod_volume_backup_controller.go           |   1 +
 pkg/controller/restore_controller.go          |   2 +-
 .../server_status_request_controller.go       |   1 +
 pkg/podvolume/backupper_factory.go            |  39 +-
 pkg/podvolume/restorer_factory.go             |  33 +-
 22 files changed, 616 insertions(+), 818 deletions(-)
 create mode 100644 changelogs/unreleased/5969-qiuming-best
 rename pkg/apis/velero/v1/{backup.go => backup_types.go} (98%)
 rename pkg/apis/velero/v1/{restore.go => restore_types.go} (97%)
 delete mode 100644 pkg/controller/generic_controller.go

diff --git a/changelogs/unreleased/5969-qiuming-best b/changelogs/unreleased/5969-qiuming-best
new file mode 100644
index 000000000..053573b78
--- /dev/null
+++ b/changelogs/unreleased/5969-qiuming-best
@@ -0,0 +1 @@
+Refactor backup controller based on the controller-runtime framework.
diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index 508a08421..6ec17660b 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -51,6 +51,19 @@ rules:
   verbs:
   - create
   - delete
+  - get
+  - list
+  - patch
+  - update
+  - watch
+- apiGroups:
+  - velero.io
+  resources:
+  - backups/status
+  verbs:
+  - get
+  - patch
+  - update
 - apiGroups:
   - velero.io
   resources:
@@ -151,6 +164,26 @@ rules:
   - get
   - patch
   - update
+- apiGroups:
+  - velero.io
+  resources:
+  - restores
+  verbs:
+  - create
+  - delete
+  - get
+  - list
+  - patch
+  - update
+  - watch
+- apiGroups:
+  - velero.io
+  resources:
+  - restores/status
+  verbs:
+  - get
+  - patch
+  - update
 - apiGroups:
   - velero.io
   resources:
diff --git a/pkg/apis/velero/v1/backup.go b/pkg/apis/velero/v1/backup_types.go
similarity index 98%
rename from pkg/apis/velero/v1/backup.go
rename to pkg/apis/velero/v1/backup_types.go
index 394fe969c..9824075b9 100644
--- a/pkg/apis/velero/v1/backup.go
+++ b/pkg/apis/velero/v1/backup_types.go
@@ -407,6 +407,11 @@ type BackupProgress struct {

 // +genclient
 // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
+// +kubebuilder:object:root=true
+// +kubebuilder:object:generate=true
+// +kubebuilder:storageversion
+// +kubebuilder:rbac:groups=velero.io,resources=backups,verbs=create;delete;get;list;patch;update;watch
+// +kubebuilder:rbac:groups=velero.io,resources=backups/status,verbs=get;update;patch

 // Backup is a Velero resource that represents the capture of Kubernetes
 // cluster state at a point in time (API objects and associated volume state).
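The kubebuilder markers added to the Backup type above are what drive the new rules in config/rbac/role.yaml: controller-gen's rbac generator reads the +kubebuilder:rbac comments and regenerates the role. A minimal sketch of the pattern on an illustrative type (the example.io group and Widget type below are placeholders, not Velero's actual layout):

package v1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// +kubebuilder:object:root=true
// +kubebuilder:rbac:groups=example.io,resources=widgets,verbs=create;delete;get;list;patch;update;watch
// +kubebuilder:rbac:groups=example.io,resources=widgets/status,verbs=get;patch;update

// Widget is an illustrative API type: running controller-gen's rbac
// generator over a package like this emits ClusterRole rules for widgets
// and widgets/status, just as the role.yaml hunk above grants for backups
// and backups/status.
type Widget struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`
}

With the markers in place, adding a verb to a marker and rerunning controller-gen updates the role manifest without hand-editing the YAML.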
diff --git a/pkg/apis/velero/v1/restore.go b/pkg/apis/velero/v1/restore_types.go similarity index 97% rename from pkg/apis/velero/v1/restore.go rename to pkg/apis/velero/v1/restore_types.go index 984c1587c..5cdeb615e 100644 --- a/pkg/apis/velero/v1/restore.go +++ b/pkg/apis/velero/v1/restore_types.go @@ -330,6 +330,11 @@ type RestoreProgress struct { // +genclient // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +// +kubebuilder:object:root=true +// +kubebuilder:object:generate=true +// +kubebuilder:storageversion +// +kubebuilder:rbac:groups=velero.io,resources=restores,verbs=create;delete;get;list;patch;update;watch +// +kubebuilder:rbac:groups=velero.io,resources=restores/status,verbs=get;update;patch // Restore is a Velero resource that represents the application of // resources from a Velero backup to a target Kubernetes cluster. diff --git a/pkg/backup/backup.go b/pkg/backup/backup.go index ab3d8ecc3..67243361b 100644 --- a/pkg/backup/backup.go +++ b/pkg/backup/backup.go @@ -34,14 +34,15 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" kubeerrs "k8s.io/apimachinery/pkg/util/errors" "github.com/vmware-tanzu/velero/internal/hook" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" "github.com/vmware-tanzu/velero/pkg/client" + + kbclient "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/vmware-tanzu/velero/pkg/discovery" - velerov1client "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/typed/velero/v1" "github.com/vmware-tanzu/velero/pkg/itemoperation" "github.com/vmware-tanzu/velero/pkg/kuberesource" "github.com/vmware-tanzu/velero/pkg/plugin/framework" @@ -52,6 +53,7 @@ import ( "github.com/vmware-tanzu/velero/pkg/podvolume" "github.com/vmware-tanzu/velero/pkg/util/boolptr" "github.com/vmware-tanzu/velero/pkg/util/collections" + "github.com/vmware-tanzu/velero/pkg/util/kube" ) // BackupVersion is the current backup major version for Velero. @@ -76,7 +78,7 @@ type Backupper interface { // kubernetesBackupper implements Backupper. type kubernetesBackupper struct { - backupClient velerov1client.BackupsGetter + kbClient kbclient.Client dynamicFactory client.DynamicFactory discoveryHelper discovery.Helper podCommandExecutor podexec.PodCommandExecutor @@ -103,7 +105,7 @@ func cohabitatingResources() map[string]*cohabitatingResource { // NewKubernetesBackupper creates a new kubernetesBackupper. 
func NewKubernetesBackupper( - backupClient velerov1client.BackupsGetter, + kbClient kbclient.Client, discoveryHelper discovery.Helper, dynamicFactory client.DynamicFactory, podCommandExecutor podexec.PodCommandExecutor, @@ -114,7 +116,7 @@ func NewKubernetesBackupper( uploaderType string, ) (Backupper, error) { return &kubernetesBackupper{ - backupClient: backupClient, + kbClient: kbClient, discoveryHelper: discoveryHelper, dynamicFactory: dynamicFactory, podCommandExecutor: podCommandExecutor, @@ -277,8 +279,9 @@ func (kb *kubernetesBackupper) BackupWithResolvers(log logrus.FieldLogger, log.WithField("progress", "").Infof("Collected %d items matching the backup spec from the Kubernetes API (actual number of items backed up may be more or less depending on velero.io/exclude-from-backup annotation, plugins returning additional related items to back up, etc.)", len(items)) backupRequest.Status.Progress = &velerov1api.BackupProgress{TotalItems: len(items)} - patch := fmt.Sprintf(`{"status":{"progress":{"totalItems":%d}}}`, len(items)) - if _, err := kb.backupClient.Backups(backupRequest.Namespace).Patch(context.TODO(), backupRequest.Name, types.MergePatchType, []byte(patch), metav1.PatchOptions{}); err != nil { + original := backupRequest.Backup.DeepCopy() + backupRequest.Backup.Status.Progress.TotalItems = len(items) + if err := kube.PatchResource(original, backupRequest.Backup, kb.kbClient); err != nil { log.WithError(errors.WithStack((err))).Warn("Got error trying to update backup's status.progress.totalItems") } @@ -328,11 +331,10 @@ func (kb *kubernetesBackupper) BackupWithResolvers(log logrus.FieldLogger, lastUpdate = &val case <-ticker.C: if lastUpdate != nil { - backupRequest.Status.Progress.TotalItems = lastUpdate.totalItems - backupRequest.Status.Progress.ItemsBackedUp = lastUpdate.itemsBackedUp - - patch := fmt.Sprintf(`{"status":{"progress":{"totalItems":%d,"itemsBackedUp":%d}}}`, lastUpdate.totalItems, lastUpdate.itemsBackedUp) - if _, err := kb.backupClient.Backups(backupRequest.Namespace).Patch(context.TODO(), backupRequest.Name, types.MergePatchType, []byte(patch), metav1.PatchOptions{}); err != nil { + backupRequest.Status.Progress = &velerov1api.BackupProgress{TotalItems: lastUpdate.totalItems, ItemsBackedUp: lastUpdate.itemsBackedUp} + original := backupRequest.Backup.DeepCopy() + backupRequest.Backup.Status.Progress = &velerov1api.BackupProgress{TotalItems: lastUpdate.totalItems, ItemsBackedUp: lastUpdate.itemsBackedUp} + if err := kube.PatchResource(original, backupRequest.Backup, kb.kbClient); err != nil { log.WithError(errors.WithStack((err))).Warn("Got error trying to update backup's status.progress") } lastUpdate = nil @@ -407,11 +409,10 @@ func (kb *kubernetesBackupper) BackupWithResolvers(log logrus.FieldLogger, // do a final update on progress since we may have just added some CRDs and may not have updated // for the last few processed items. 
- backupRequest.Status.Progress.TotalItems = len(backupRequest.BackedUpItems) - backupRequest.Status.Progress.ItemsBackedUp = len(backupRequest.BackedUpItems) - - patch = fmt.Sprintf(`{"status":{"progress":{"totalItems":%d,"itemsBackedUp":%d}}}`, len(backupRequest.BackedUpItems), len(backupRequest.BackedUpItems)) - if _, err := kb.backupClient.Backups(backupRequest.Namespace).Patch(context.TODO(), backupRequest.Name, types.MergePatchType, []byte(patch), metav1.PatchOptions{}); err != nil { + backupRequest.Status.Progress = &velerov1api.BackupProgress{TotalItems: len(backupRequest.BackedUpItems), ItemsBackedUp: len(backupRequest.BackedUpItems)} + original = backupRequest.Backup.DeepCopy() + backupRequest.Backup.Status.Progress = &velerov1api.BackupProgress{TotalItems: len(backupRequest.BackedUpItems), ItemsBackedUp: len(backupRequest.BackedUpItems)} + if err := kube.PatchResource(original, backupRequest.Backup, kb.kbClient); err != nil { log.WithError(errors.WithStack((err))).Warn("Got error trying to update backup's status.progress") } diff --git a/pkg/backup/backup_test.go b/pkg/backup/backup_test.go index bd41642c1..e67c4e3fc 100644 --- a/pkg/backup/backup_test.go +++ b/pkg/backup/backup_test.go @@ -3013,7 +3013,7 @@ func newHarness(t *testing.T) *harness { return &harness{ APIServer: apiServer, backupper: &kubernetesBackupper{ - backupClient: apiServer.VeleroClient.VeleroV1(), + kbClient: test.NewFakeControllerRuntimeClient(t), dynamicFactory: client.NewDynamicFactory(apiServer.DynamicClient), discoveryHelper: discoveryHelper, diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index 880e29f1e..bc87608fa 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -54,7 +54,6 @@ import ( "github.com/vmware-tanzu/velero/internal/credentials" "github.com/vmware-tanzu/velero/internal/storage" - "github.com/vmware-tanzu/velero/internal/util/managercontroller" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" "github.com/vmware-tanzu/velero/pkg/backup" "github.com/vmware-tanzu/velero/pkg/buildinfo" @@ -66,7 +65,6 @@ import ( velerodiscovery "github.com/vmware-tanzu/velero/pkg/discovery" "github.com/vmware-tanzu/velero/pkg/features" clientset "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned" - informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions" "github.com/vmware-tanzu/velero/pkg/metrics" "github.com/vmware-tanzu/velero/pkg/nodeagent" "github.com/vmware-tanzu/velero/pkg/persistence" @@ -234,30 +232,29 @@ func NewCommand(f client.Factory) *cobra.Command { } type server struct { - namespace string - metricsAddress string - kubeClientConfig *rest.Config - kubeClient kubernetes.Interface - veleroClient clientset.Interface - discoveryClient discovery.DiscoveryInterface - discoveryHelper velerodiscovery.Helper - dynamicClient dynamic.Interface - sharedInformerFactory informers.SharedInformerFactory - csiSnapshotterSharedInformerFactory *CSIInformerFactoryWrapper - csiSnapshotClient *snapshotv1client.Clientset - ctx context.Context - cancelFunc context.CancelFunc - logger logrus.FieldLogger - logLevel logrus.Level - pluginRegistry process.Registry - repoManager repository.Manager - repoLocker *repository.RepoLocker - repoEnsurer *repository.RepositoryEnsurer - metrics *metrics.ServerMetrics - config serverConfig - mgr manager.Manager - credentialFileStore credentials.FileStore - credentialSecretStore credentials.SecretStore + namespace string + metricsAddress string + kubeClientConfig *rest.Config + kubeClient 
kubernetes.Interface + veleroClient clientset.Interface + discoveryClient discovery.DiscoveryInterface + discoveryHelper velerodiscovery.Helper + dynamicClient dynamic.Interface + csiSnapshotClient *snapshotv1client.Clientset + csiSnapshotLister snapshotv1listers.VolumeSnapshotLister + ctx context.Context + cancelFunc context.CancelFunc + logger logrus.FieldLogger + logLevel logrus.Level + pluginRegistry process.Registry + repoManager repository.Manager + repoLocker *repository.RepoLocker + repoEnsurer *repository.RepositoryEnsurer + metrics *metrics.ServerMetrics + config serverConfig + mgr manager.Manager + credentialFileStore credentials.FileStore + credentialSecretStore credentials.SecretStore } func newServer(f client.Factory, config serverConfig, logger *logrus.Logger) (*server, error) { @@ -311,15 +308,6 @@ func newServer(f client.Factory, config serverConfig, logger *logrus.Logger) (*s return nil, err } - var csiSnapClient *snapshotv1client.Clientset - if features.IsEnabled(velerov1api.CSIFeatureFlag) { - csiSnapClient, err = snapshotv1client.NewForConfig(clientConfig) - if err != nil { - cancelFunc() - return nil, err - } - } - scheme := runtime.NewScheme() velerov1api.AddToScheme(scheme) corev1api.AddToScheme(scheme) @@ -350,25 +338,39 @@ func newServer(f client.Factory, config serverConfig, logger *logrus.Logger) (*s credentialSecretStore, err := credentials.NewNamespacedSecretStore(mgr.GetClient(), f.Namespace()) s := &server{ - namespace: f.Namespace(), - metricsAddress: config.metricsAddress, - kubeClientConfig: clientConfig, - kubeClient: kubeClient, - veleroClient: veleroClient, - discoveryClient: veleroClient.Discovery(), - dynamicClient: dynamicClient, - sharedInformerFactory: informers.NewSharedInformerFactoryWithOptions(veleroClient, 0, informers.WithNamespace(f.Namespace())), - csiSnapshotterSharedInformerFactory: NewCSIInformerFactoryWrapper(csiSnapClient), - csiSnapshotClient: csiSnapClient, - ctx: ctx, - cancelFunc: cancelFunc, - logger: logger, - logLevel: logger.Level, - pluginRegistry: pluginRegistry, - config: config, - mgr: mgr, - credentialFileStore: credentialFileStore, - credentialSecretStore: credentialSecretStore, + namespace: f.Namespace(), + metricsAddress: config.metricsAddress, + kubeClientConfig: clientConfig, + kubeClient: kubeClient, + veleroClient: veleroClient, + discoveryClient: veleroClient.Discovery(), + dynamicClient: dynamicClient, + ctx: ctx, + cancelFunc: cancelFunc, + logger: logger, + logLevel: logger.Level, + pluginRegistry: pluginRegistry, + config: config, + mgr: mgr, + credentialFileStore: credentialFileStore, + credentialSecretStore: credentialSecretStore, + } + + // Setup CSI snapshot client and lister + var csiSnapClient *snapshotv1client.Clientset + if features.IsEnabled(velerov1api.CSIFeatureFlag) { + csiSnapClient, err = snapshotv1client.NewForConfig(clientConfig) + if err != nil { + cancelFunc() + return nil, err + } + s.csiSnapshotClient = csiSnapClient + + s.csiSnapshotLister, err = s.getCSIVolumeSnapshotListers() + if err != nil { + cancelFunc() + return nil, err + } } return s, nil @@ -488,32 +490,32 @@ func (s *server) veleroResourcesExist() error { } // High priorities: -// - Custom Resource Definitions come before Custom Resource so that they can be -// restored with their corresponding CRD. -// - Namespaces go second because all namespaced resources depend on them. -// - Storage Classes are needed to create PVs and PVCs correctly. 
-// - VolumeSnapshotClasses are needed to provision volumes using volumesnapshots -// - VolumeSnapshotContents are needed as they contain the handle to the volume snapshot in the -// storage provider -// - VolumeSnapshots are needed to create PVCs using the VolumeSnapshot as their data source. -// - PVs go before PVCs because PVCs depend on them. -// - PVCs go before pods or controllers so they can be mounted as volumes. -// - Service accounts go before secrets so service account token secrets can be filled automatically. -// - Secrets and config maps go before pods or controllers so they can be mounted -// as volumes. -// - Limit ranges go before pods or controllers so pods can use them. -// - Pods go before controllers so they can be explicitly restored and potentially -// have pod volume restores run before controllers adopt the pods. -// - Replica sets go before deployments/other controllers so they can be explicitly -// restored and be adopted by controllers. -// - CAPI ClusterClasses go before Clusters. +// - Custom Resource Definitions come before Custom Resource so that they can be +// restored with their corresponding CRD. +// - Namespaces go second because all namespaced resources depend on them. +// - Storage Classes are needed to create PVs and PVCs correctly. +// - VolumeSnapshotClasses are needed to provision volumes using volumesnapshots +// - VolumeSnapshotContents are needed as they contain the handle to the volume snapshot in the +// storage provider +// - VolumeSnapshots are needed to create PVCs using the VolumeSnapshot as their data source. +// - PVs go before PVCs because PVCs depend on them. +// - PVCs go before pods or controllers so they can be mounted as volumes. +// - Service accounts go before secrets so service account token secrets can be filled automatically. +// - Secrets and config maps go before pods or controllers so they can be mounted +// as volumes. +// - Limit ranges go before pods or controllers so pods can use them. +// - Pods go before controllers so they can be explicitly restored and potentially +// have pod volume restores run before controllers adopt the pods. +// - Replica sets go before deployments/other controllers so they can be explicitly +// restored and be adopted by controllers. +// - CAPI ClusterClasses go before Clusters. // // Low priorities: -// - Tanzu ClusterBootstraps go last as it can reference any other kind of resources. -// ClusterBootstraps go before CAPI Clusters otherwise a new default ClusterBootstrap object is created for the cluster -// - CAPI Clusters come before ClusterResourceSets because failing to do so means the CAPI controller-manager will panic. -// Both Clusters and ClusterResourceSets need to come before ClusterResourceSetBinding in order to properly restore workload clusters. -// See https://github.com/kubernetes-sigs/cluster-api/issues/4105 +// - Tanzu ClusterBootstraps go last as it can reference any other kind of resources. +// ClusterBootstraps go before CAPI Clusters otherwise a new default ClusterBootstrap object is created for the cluster +// - CAPI Clusters come before ClusterResourceSets because failing to do so means the CAPI controller-manager will panic. +// Both Clusters and ClusterResourceSets need to come before ClusterResourceSetBinding in order to properly restore workload clusters. 
+// See https://github.com/kubernetes-sigs/cluster-api/issues/4105 var defaultRestorePriorities = restore.Priorities{ HighPriorities: []string{ "customresourcedefinitions", @@ -573,37 +575,42 @@ func (s *server) initRepoManager() error { return nil } -func (s *server) getCSIVolumeSnapshotListers() snapshotv1listers.VolumeSnapshotLister { - // Make empty listers that will only be populated if CSI is properly enabled. - var vsLister snapshotv1listers.VolumeSnapshotLister - var err error +func (s *server) getCSIVolumeSnapshotListers() (vsLister snapshotv1listers.VolumeSnapshotLister, err error) { + _, err = s.discoveryClient.ServerResourcesForGroupVersion(snapshotv1api.SchemeGroupVersion.String()) + switch { + case apierrors.IsNotFound(err): + // CSI is enabled, but the required CRDs aren't installed, so halt. + s.logger.Warnf("The '%s' feature flag was specified, but CSI API group [%s] was not found.", velerov1api.CSIFeatureFlag, snapshotv1api.SchemeGroupVersion.String()) + case err == nil: + wrapper := NewCSIInformerFactoryWrapper(s.csiSnapshotClient) - // If CSI is enabled, check for the CSI groups and generate the listers - // If CSI isn't enabled, return empty listers. - if features.IsEnabled(velerov1api.CSIFeatureFlag) { - _, err = s.discoveryClient.ServerResourcesForGroupVersion(snapshotv1api.SchemeGroupVersion.String()) - switch { - case apierrors.IsNotFound(err): - // CSI is enabled, but the required CRDs aren't installed, so halt. - s.logger.Fatalf("The '%s' feature flag was specified, but CSI API group [%s] was not found.", velerov1api.CSIFeatureFlag, snapshotv1api.SchemeGroupVersion.String()) - case err == nil: - // CSI is enabled, and the resources were found. - // Instantiate the listers fully - s.logger.Debug("Creating CSI listers") - // Access the wrapped factory directly here since we've already done the feature flag check above to know it's safe. - vsLister = s.csiSnapshotterSharedInformerFactory.factory.Snapshot().V1().VolumeSnapshots().Lister() - case err != nil: - cmd.CheckError(err) + s.logger.Debug("Creating CSI listers") + // Access the wrapped factory directly here since we've already done the feature flag check above to know it's safe. 
+		vsLister = wrapper.factory.Snapshot().V1().VolumeSnapshots().Lister()
+
+		// start the informers and wait for the caches to sync
+		wrapper.Start(s.ctx.Done())
+		s.logger.Info("Waiting for informer caches to sync")
+		csiCacheSyncResults := wrapper.WaitForCacheSync(s.ctx.Done())
+		s.logger.Info("Done waiting for informer caches to sync")
+
+		for informer, synced := range csiCacheSyncResults {
+			if !synced {
+				err = errors.Errorf("cache was not synced for informer %v", informer)
+				return
+			}
+			s.logger.WithField("informer", informer).Info("Informer cache synced")
+		}
+	case err != nil:
+		s.logger.Errorf("failed to find snapshot v1 schema: %s", err)
 	}
-	return vsLister
+
+	return
 }

 func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string) error {
 	s.logger.Info("Starting controllers")

-	ctx := s.ctx
-
 	go func() {
 		metricsMux := http.NewServeMux()
 		metricsMux.Handle("/metrics", promhttp.Handler())
@@ -625,121 +632,44 @@
 	backupTracker := controller.NewBackupTracker()

-	backupControllerRunInfo := func() controllerRunInfo {
-		backupper, err := backup.NewKubernetesBackupper(
-			s.veleroClient.VeleroV1(),
-			s.discoveryHelper,
-			client.NewDynamicFactory(s.dynamicClient),
-			podexec.NewPodCommandExecutor(s.kubeClientConfig, s.kubeClient.CoreV1().RESTClient()),
-			podvolume.NewBackupperFactory(s.repoLocker, s.repoEnsurer, s.veleroClient, s.kubeClient.CoreV1(),
-				s.kubeClient.CoreV1(), s.kubeClient.CoreV1(),
-				s.sharedInformerFactory.Velero().V1().BackupRepositories().Informer().HasSynced, s.logger),
-			s.config.podVolumeOperationTimeout,
-			s.config.defaultVolumesToFsBackup,
-			s.config.clientPageSize,
-			s.config.uploaderType,
-		)
-		cmd.CheckError(err)
-
-		backupController := controller.NewBackupController(
-			s.sharedInformerFactory.Velero().V1().Backups(),
-			s.veleroClient.VeleroV1(),
-			s.discoveryHelper,
-			backupper,
-			s.logger,
-			s.logLevel,
-			newPluginManager,
-			backupTracker,
-			s.mgr.GetClient(),
-			s.config.defaultBackupLocation,
-			s.config.defaultVolumesToFsBackup,
-			s.config.defaultBackupTTL,
-			s.config.defaultCSISnapshotTimeout,
-			s.config.defaultItemOperationTimeout,
-			s.sharedInformerFactory.Velero().V1().VolumeSnapshotLocations().Lister(),
-			defaultVolumeSnapshotLocations,
-			s.metrics,
-			backupStoreGetter,
-			s.config.formatFlag.Parse(),
-			s.getCSIVolumeSnapshotListers(),
-			s.csiSnapshotClient,
-			s.credentialFileStore,
-		)
-
-		return controllerRunInfo{
-			controller: backupController,
-			numWorkers: defaultControllerWorkers,
-		}
-	}
-
 	// By far, PodVolumeBackup, PodVolumeRestore, BackupStorageLocation controllers
 	// are not included in --disable-controllers list.
 	// This is because of PVB and PVR are used by node agent DaemonSet,
 	// and BSL controller is mandatory for Velero to work.
- enabledControllers := map[string]func() controllerRunInfo{ - controller.Backup: backupControllerRunInfo, - } // Note: all runtime type controllers that can be disabled are grouped separately, below: enabledRuntimeControllers := map[string]struct{}{ - controller.ServerStatusRequest: {}, - controller.DownloadRequest: {}, - controller.Schedule: {}, - controller.BackupRepo: {}, + controller.AsyncBackupOperations: {}, + controller.Backup: {}, controller.BackupDeletion: {}, controller.BackupFinalizer: {}, - controller.GarbageCollection: {}, + controller.BackupRepo: {}, controller.BackupSync: {}, - controller.AsyncBackupOperations: {}, + controller.DownloadRequest: {}, + controller.GarbageCollection: {}, controller.Restore: {}, + controller.Schedule: {}, + controller.ServerStatusRequest: {}, } if s.config.restoreOnly { s.logger.Info("Restore only mode - not starting the backup, schedule, delete-backup, or GC controllers") s.config.disabledControllers = append(s.config.disabledControllers, + controller.AsyncBackupOperations, controller.Backup, - controller.Schedule, - controller.GarbageCollection, controller.BackupDeletion, controller.BackupFinalizer, - controller.AsyncBackupOperations, + controller.GarbageCollection, + controller.Schedule, ) } // Remove disabled controllers so they are not initialized. If a match is not found we want // to halt the system so the user knows this operation was not possible. - if err := removeControllers(s.config.disabledControllers, enabledControllers, enabledRuntimeControllers, s.logger); err != nil { + if err := removeControllers(s.config.disabledControllers, enabledRuntimeControllers, s.logger); err != nil { log.Fatal(err, "unable to disable a controller") } - // Instantiate the enabled controllers. This needs to be done *before* - // the shared informer factory is started, because the controller - // constructors add event handlers to various informers, which should - // be done before the informers are running. - controllers := make([]controllerRunInfo, 0, len(enabledControllers)) - for _, newController := range enabledControllers { - controllers = append(controllers, newController()) - } - - // start the informers & and wait for the caches to sync - s.sharedInformerFactory.Start(ctx.Done()) - s.csiSnapshotterSharedInformerFactory.Start(ctx.Done()) - s.logger.Info("Waiting for informer caches to sync") - cacheSyncResults := s.sharedInformerFactory.WaitForCacheSync(ctx.Done()) - csiCacheSyncResults := s.csiSnapshotterSharedInformerFactory.WaitForCacheSync(ctx.Done()) - s.logger.Info("Done waiting for informer caches to sync") - - // Append our CSI informer types into the larger list of caches, so we can check them all at once - for informer, synced := range csiCacheSyncResults { - cacheSyncResults[informer] = synced - } - - for informer, synced := range cacheSyncResults { - if !synced { - return errors.Errorf("cache was not synced for informer %v", informer) - } - s.logger.WithField("informer", informer).Info("Informer cache synced") - } - + // Enable BSL controller. No need to check whether it's enabled or not. 
bslr := controller.NewBackupStorageLocationReconciler( s.ctx, s.mgr.GetClient(), @@ -756,15 +686,66 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string s.logger.Fatal(err, "unable to create controller", "controller", controller.BackupStorageLocation) } - if _, ok := enabledRuntimeControllers[controller.Schedule]; ok { - if err := controller.NewScheduleReconciler(s.namespace, s.logger, s.mgr.GetClient(), s.metrics).SetupWithManager(s.mgr); err != nil { - s.logger.Fatal(err, "unable to create controller", "controller", controller.Schedule) + var backupOpsMap *controller.BackupItemOperationsMap + if _, ok := enabledRuntimeControllers[controller.AsyncBackupOperations]; ok { + r, m := controller.NewAsyncBackupOperationsReconciler( + s.logger, + s.mgr.GetClient(), + s.config.itemOperationSyncFrequency, + newPluginManager, + backupStoreGetter, + s.metrics, + ) + if err := r.SetupWithManager(s.mgr); err != nil { + s.logger.Fatal(err, "unable to create controller", "controller", controller.AsyncBackupOperations) } + backupOpsMap = m } - if _, ok := enabledRuntimeControllers[controller.BackupRepo]; ok { - if err := controller.NewBackupRepoReconciler(s.namespace, s.logger, s.mgr.GetClient(), s.config.repoMaintenanceFrequency, s.repoManager).SetupWithManager(s.mgr); err != nil { - s.logger.Fatal(err, "unable to create controller", "controller", controller.BackupRepo) + if _, ok := enabledRuntimeControllers[controller.Backup]; ok { + backupper, err := backup.NewKubernetesBackupper( + s.mgr.GetClient(), + s.discoveryHelper, + client.NewDynamicFactory(s.dynamicClient), + podexec.NewPodCommandExecutor(s.kubeClientConfig, s.kubeClient.CoreV1().RESTClient()), + podvolume.NewBackupperFactory( + s.repoLocker, + s.repoEnsurer, + s.veleroClient, + s.kubeClient.CoreV1(), + s.kubeClient.CoreV1(), + s.kubeClient.CoreV1(), + s.logger, + ), + s.config.podVolumeOperationTimeout, + s.config.defaultVolumesToFsBackup, + s.config.clientPageSize, + s.config.uploaderType, + ) + cmd.CheckError(err) + if err := controller.NewBackupReconciler( + s.ctx, + s.discoveryHelper, + backupper, + s.logger, + s.logLevel, + newPluginManager, + backupTracker, + s.mgr.GetClient(), + s.config.defaultBackupLocation, + s.config.defaultVolumesToFsBackup, + s.config.defaultBackupTTL, + s.config.defaultCSISnapshotTimeout, + s.config.defaultItemOperationTimeout, + defaultVolumeSnapshotLocations, + s.metrics, + backupStoreGetter, + s.config.formatFlag.Parse(), + s.csiSnapshotLister, + s.csiSnapshotClient, + s.credentialFileStore, + ).SetupWithManager(s.mgr); err != nil { + s.logger.Fatal(err, "unable to create controller", "controller", controller.Backup) } } @@ -784,43 +765,21 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string } } - if _, ok := enabledRuntimeControllers[controller.ServerStatusRequest]; ok { - if err := controller.NewServerStatusRequestReconciler( - s.mgr.GetClient(), - s.ctx, - s.pluginRegistry, - clock.RealClock{}, - s.logger, - ).SetupWithManager(s.mgr); err != nil { - s.logger.Fatal(err, "unable to create controller", "controller", controller.ServerStatusRequest) - } - } - - var backupOpsMap *controller.BackupItemOperationsMap - if _, ok := enabledRuntimeControllers[controller.AsyncBackupOperations]; ok { - r, m := controller.NewAsyncBackupOperationsReconciler( - s.logger, - s.mgr.GetClient(), - s.config.itemOperationSyncFrequency, - newPluginManager, - backupStoreGetter, - s.metrics, - ) - if err := r.SetupWithManager(s.mgr); err != nil { - s.logger.Fatal(err, 
"unable to create controller", "controller", controller.AsyncBackupOperations) - } - backupOpsMap = m - } - if _, ok := enabledRuntimeControllers[controller.BackupFinalizer]; ok { backupper, err := backup.NewKubernetesBackupper( - s.veleroClient.VeleroV1(), + s.mgr.GetClient(), s.discoveryHelper, client.NewDynamicFactory(s.dynamicClient), podexec.NewPodCommandExecutor(s.kubeClientConfig, s.kubeClient.CoreV1().RESTClient()), - podvolume.NewBackupperFactory(s.repoLocker, s.repoEnsurer, s.veleroClient, s.kubeClient.CoreV1(), - s.kubeClient.CoreV1(), s.kubeClient.CoreV1(), - s.sharedInformerFactory.Velero().V1().BackupRepositories().Informer().HasSynced, s.logger), + podvolume.NewBackupperFactory( + s.repoLocker, + s.repoEnsurer, + s.veleroClient, + s.kubeClient.CoreV1(), + s.kubeClient.CoreV1(), + s.kubeClient.CoreV1(), + s.logger, + ), s.config.podVolumeOperationTimeout, s.config.defaultVolumesToFsBackup, s.config.clientPageSize, @@ -841,17 +800,9 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string } } - if _, ok := enabledRuntimeControllers[controller.DownloadRequest]; ok { - r := controller.NewDownloadRequestReconciler( - s.mgr.GetClient(), - clock.RealClock{}, - newPluginManager, - backupStoreGetter, - s.logger, - backupOpsMap, - ) - if err := r.SetupWithManager(s.mgr); err != nil { - s.logger.Fatal(err, "unable to create controller", "controller", controller.DownloadRequest) + if _, ok := enabledRuntimeControllers[controller.BackupRepo]; ok { + if err := controller.NewBackupRepoReconciler(s.namespace, s.logger, s.mgr.GetClient(), s.config.repoMaintenanceFrequency, s.repoManager).SetupWithManager(s.mgr); err != nil { + s.logger.Fatal(err, "unable to create controller", "controller", controller.BackupRepo) } } @@ -874,6 +825,20 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string } } + if _, ok := enabledRuntimeControllers[controller.DownloadRequest]; ok { + r := controller.NewDownloadRequestReconciler( + s.mgr.GetClient(), + clock.RealClock{}, + newPluginManager, + backupStoreGetter, + s.logger, + backupOpsMap, + ) + if err := r.SetupWithManager(s.mgr); err != nil { + s.logger.Fatal(err, "unable to create controller", "controller", controller.DownloadRequest) + } + } + if _, ok := enabledRuntimeControllers[controller.GarbageCollection]; ok { r := controller.NewGCReconciler(s.logger, s.mgr.GetClient(), s.config.garbageCollectionFrequency) if err := r.SetupWithManager(s.mgr); err != nil { @@ -887,8 +852,15 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string client.NewDynamicFactory(s.dynamicClient), s.config.restoreResourcePriorities, s.kubeClient.CoreV1().Namespaces(), - podvolume.NewRestorerFactory(s.repoLocker, s.repoEnsurer, s.veleroClient, s.kubeClient.CoreV1(), - s.kubeClient.CoreV1(), s.kubeClient, s.sharedInformerFactory.Velero().V1().BackupRepositories().Informer().HasSynced, s.logger), + podvolume.NewRestorerFactory( + s.repoLocker, + s.repoEnsurer, + s.veleroClient, + s.kubeClient.CoreV1(), + s.kubeClient.CoreV1(), + s.kubeClient, + s.logger, + ), s.config.podVolumeOperationTimeout, s.config.resourceTerminatingTimeout, s.logger, @@ -918,15 +890,22 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string } } - // TODO(2.0): presuming all controllers and resources are converted to runtime-controller - // by v2.0, the block from this line and including the `s.mgr.Start() will be - // deprecated, since the manager auto-starts all the caches. 
Until then, we need to start the - // cache for them manually. - for i := range controllers { - controllerRunInfo := controllers[i] - // Adding the controllers to the manager will register them as a (runtime-controller) runnable, - // so the manager will ensure the cache is started and ready before all controller are started - s.mgr.Add(managercontroller.Runnable(controllerRunInfo.controller, controllerRunInfo.numWorkers)) + if _, ok := enabledRuntimeControllers[controller.Schedule]; ok { + if err := controller.NewScheduleReconciler(s.namespace, s.logger, s.mgr.GetClient(), s.metrics).SetupWithManager(s.mgr); err != nil { + s.logger.Fatal(err, "unable to create controller", "controller", controller.Schedule) + } + } + + if _, ok := enabledRuntimeControllers[controller.ServerStatusRequest]; ok { + if err := controller.NewServerStatusRequestReconciler( + s.mgr.GetClient(), + s.ctx, + s.pluginRegistry, + clock.RealClock{}, + s.logger, + ).SetupWithManager(s.mgr); err != nil { + s.logger.Fatal(err, "unable to create controller", "controller", controller.ServerStatusRequest) + } } s.logger.Info("Server starting...") @@ -938,23 +917,16 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string } // removeControllers will remove any controller listed to be disabled from the list -// of controllers to be initialized. First it will check the legacy list of controllers, -// then it will check the new runtime controllers. If both checks fail a match +// of controllers to be initialized. It will check the runtime controllers. If a match // wasn't found and it returns an error. -func removeControllers(disabledControllers []string, enabledControllers map[string]func() controllerRunInfo, enabledRuntimeControllers map[string]struct{}, logger logrus.FieldLogger) error { +func removeControllers(disabledControllers []string, enabledRuntimeControllers map[string]struct{}, logger logrus.FieldLogger) error { for _, controllerName := range disabledControllers { - if _, ok := enabledControllers[controllerName]; ok { + if _, ok := enabledRuntimeControllers[controllerName]; ok { logger.Infof("Disabling controller: %s", controllerName) - delete(enabledControllers, controllerName) + delete(enabledRuntimeControllers, controllerName) } else { - // maybe it is a runtime type controllers, so attempt to remove that - if _, ok := enabledRuntimeControllers[controllerName]; ok { - logger.Infof("Disabling controller: %s", controllerName) - delete(enabledRuntimeControllers, controllerName) - } else { - msg := fmt.Sprintf("Invalid value for --disable-controllers flag provided: %s. Valid values are: %s", controllerName, strings.Join(controller.DisableableControllers, ",")) - return errors.New(msg) - } + msg := fmt.Sprintf("Invalid value for --disable-controllers flag provided: %s. 
Valid values are: %s", controllerName, strings.Join(controller.DisableableControllers, ",")) + return errors.New(msg) } } return nil diff --git a/pkg/cmd/server/server_test.go b/pkg/cmd/server/server_test.go index d25066d65..52d7de106 100644 --- a/pkg/cmd/server/server_test.go +++ b/pkg/cmd/server/server_test.go @@ -120,14 +120,11 @@ func TestRemoveControllers(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - enabledControllers := map[string]func() controllerRunInfo{ - controller.BackupSync: func() controllerRunInfo { return controllerRunInfo{} }, - controller.Backup: func() controllerRunInfo { return controllerRunInfo{} }, - controller.GarbageCollection: func() controllerRunInfo { return controllerRunInfo{} }, - controller.Restore: func() controllerRunInfo { return controllerRunInfo{} }, - } - enabledRuntimeControllers := map[string]struct{}{ + controller.BackupSync: {}, + controller.Backup: {}, + controller.GarbageCollection: {}, + controller.Restore: {}, controller.ServerStatusRequest: {}, controller.Schedule: {}, controller.BackupDeletion: {}, @@ -136,20 +133,18 @@ func TestRemoveControllers(t *testing.T) { controller.AsyncBackupOperations: {}, } - totalNumOriginalControllers := len(enabledControllers) + len(enabledRuntimeControllers) + totalNumOriginalControllers := len(enabledRuntimeControllers) if tt.errorExpected { - assert.Error(t, removeControllers(tt.disabledControllers, enabledControllers, enabledRuntimeControllers, logger)) + assert.Error(t, removeControllers(tt.disabledControllers, enabledRuntimeControllers, logger)) } else { - assert.NoError(t, removeControllers(tt.disabledControllers, enabledControllers, enabledRuntimeControllers, logger)) + assert.NoError(t, removeControllers(tt.disabledControllers, enabledRuntimeControllers, logger)) - totalNumEnabledControllers := len(enabledControllers) + len(enabledRuntimeControllers) + totalNumEnabledControllers := len(enabledRuntimeControllers) assert.Equal(t, totalNumEnabledControllers, totalNumOriginalControllers-len(tt.disabledControllers)) for _, disabled := range tt.disabledControllers { - _, ok := enabledControllers[disabled] - assert.False(t, ok) - _, ok = enabledRuntimeControllers[disabled] + _, ok := enabledRuntimeControllers[disabled] assert.False(t, ok) } } diff --git a/pkg/controller/async_backup_operations_controller.go b/pkg/controller/async_backup_operations_controller.go index e945a2ad4..1d46ecb6f 100644 --- a/pkg/controller/async_backup_operations_controller.go +++ b/pkg/controller/async_backup_operations_controller.go @@ -174,6 +174,7 @@ func (c *asyncBackupOperationsReconciler) SetupWithManager(mgr ctrl.Manager) err // +kubebuilder:rbac:groups=velero.io,resources=backups,verbs=get;list;watch;update // +kubebuilder:rbac:groups=velero.io,resources=backups/status,verbs=get // +kubebuilder:rbac:groups=velero.io,resources=backupstoragelocations,verbs=get + func (c *asyncBackupOperationsReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log := c.logger.WithField("async backup operations for backup", req.String()) // FIXME: make this log.Debug diff --git a/pkg/controller/backup_controller.go b/pkg/controller/backup_controller.go index ec1bb443f..cb587d484 100644 --- a/pkg/controller/backup_controller.go +++ b/pkg/controller/backup_controller.go @@ -27,7 +27,6 @@ import ( "sync" "time" - jsonpatch "github.com/evanphx/json-patch" snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1" snapshotterClientSet 
"github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned" snapshotv1listers "github.com/kubernetes-csi/external-snapshotter/client/v4/listers/volumesnapshot/v1" @@ -39,12 +38,11 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/types" kerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/tools/cache" - clocks "k8s.io/utils/clock" + "k8s.io/utils/clock" + ctrl "sigs.k8s.io/controller-runtime" kbclient "sigs.k8s.io/controller-runtime/pkg/client" "github.com/vmware-tanzu/velero/internal/credentials" @@ -53,9 +51,6 @@ import ( pkgbackup "github.com/vmware-tanzu/velero/pkg/backup" "github.com/vmware-tanzu/velero/pkg/discovery" "github.com/vmware-tanzu/velero/pkg/features" - velerov1client "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/typed/velero/v1" - velerov1informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions/velero/v1" - velerov1listers "github.com/vmware-tanzu/velero/pkg/generated/listers/velero/v1" "github.com/vmware-tanzu/velero/pkg/label" "github.com/vmware-tanzu/velero/pkg/metrics" "github.com/vmware-tanzu/velero/pkg/persistence" @@ -71,14 +66,17 @@ import ( "github.com/vmware-tanzu/velero/pkg/volume" ) -type backupController struct { - *genericController +const ( + backupResyncPeriod = time.Minute +) + +type backupReconciler struct { + ctx context.Context + logger logrus.FieldLogger discoveryHelper discovery.Helper backupper pkgbackup.Backupper - lister velerov1listers.BackupLister - client velerov1client.BackupsGetter kbClient kbclient.Client - clock clocks.WithTickerAndDelayedExecution + clock clock.WithTickerAndDelayedExecution backupLogLevel logrus.Level newPluginManager func(logrus.FieldLogger) clientmgmt.Manager backupTracker BackupTracker @@ -87,7 +85,6 @@ type backupController struct { defaultBackupTTL time.Duration defaultCSISnapshotTimeout time.Duration defaultItemOperationTimeout time.Duration - snapshotLocationLister velerov1listers.VolumeSnapshotLocationLister defaultSnapshotLocations map[string]string metrics *metrics.ServerMetrics backupStoreGetter persistence.ObjectBackupStoreGetter @@ -97,9 +94,8 @@ type backupController struct { credentialFileStore credentials.FileStore } -func NewBackupController( - backupInformer velerov1informers.BackupInformer, - client velerov1client.BackupsGetter, +func NewBackupReconciler( + ctx context.Context, discoveryHelper discovery.Helper, backupper pkgbackup.Backupper, logger logrus.FieldLogger, @@ -112,7 +108,6 @@ func NewBackupController( defaultBackupTTL time.Duration, defaultCSISnapshotTimeout time.Duration, defaultItemOperationTimeout time.Duration, - volumeSnapshotLocationLister velerov1listers.VolumeSnapshotLocationLister, defaultSnapshotLocations map[string]string, metrics *metrics.ServerMetrics, backupStoreGetter persistence.ObjectBackupStoreGetter, @@ -120,14 +115,14 @@ func NewBackupController( volumeSnapshotLister snapshotv1listers.VolumeSnapshotLister, volumeSnapshotClient snapshotterClientSet.Interface, credentialStore credentials.FileStore, -) Interface { - c := &backupController{ - genericController: newGenericController(Backup, logger), +) *backupReconciler { + + b := &backupReconciler{ + ctx: ctx, discoveryHelper: discoveryHelper, backupper: backupper, - lister: backupInformer.Lister(), - client: client, - clock: &clocks.RealClock{}, + clock: 
&clock.RealClock{}, + logger: logger, backupLogLevel: backupLogLevel, newPluginManager: newPluginManager, backupTracker: backupTracker, @@ -137,7 +132,6 @@ func NewBackupController( defaultBackupTTL: defaultBackupTTL, defaultCSISnapshotTimeout: defaultCSISnapshotTimeout, defaultItemOperationTimeout: defaultItemOperationTimeout, - snapshotLocationLister: volumeSnapshotLocationLister, defaultSnapshotLocations: defaultSnapshotLocations, metrics: metrics, backupStoreGetter: backupStoreGetter, @@ -146,61 +140,49 @@ func NewBackupController( volumeSnapshotClient: volumeSnapshotClient, credentialFileStore: credentialStore, } - - c.syncHandler = c.processBackup - c.resyncFunc = c.resync - c.resyncPeriod = time.Minute - - backupInformer.Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - backup := obj.(*velerov1api.Backup) - - switch backup.Status.Phase { - case "", velerov1api.BackupPhaseNew: - // only process new backups - default: - c.logger.WithFields(logrus.Fields{ - "backup": kubeutil.NamespaceAndName(backup), - "phase": backup.Status.Phase, - }).Debug("Backup is not new, skipping") - return - } - - key, err := cache.MetaNamespaceKeyFunc(backup) - if err != nil { - c.logger.WithError(err).WithField(Backup, backup).Error("Error creating queue key, item not added to queue") - return - } - c.queue.Add(key) - }, - }, - ) - - return c + b.updateTotalBackupMetric() + return b } -func (c *backupController) resync() { - // recompute backup_total metric - backups, err := c.lister.List(labels.Everything()) - if err != nil { - c.logger.Error(err, "Error computing backup_total metric") - } else { - c.metrics.SetBackupTotal(int64(len(backups))) - } +func (b *backupReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&velerov1api.Backup{}). + Complete(b) +} - // recompute backup_last_successful_timestamp metric for each - // schedule (including the empty schedule, i.e. ad-hoc backups) - for schedule, timestamp := range getLastSuccessBySchedule(backups) { - c.metrics.SetBackupLastSuccessfulTimestamp(schedule, timestamp) - } +func (b *backupReconciler) updateTotalBackupMetric() { + go func() { + // Wait for 5 seconds to let controller-runtime to setup k8s clients. + time.Sleep(5 * time.Second) + + wait.Until( + func() { + // recompute backup_total metric + backups := &velerov1api.BackupList{} + err := b.kbClient.List(context.Background(), backups, &kbclient.ListOptions{LabelSelector: labels.Everything()}) + if err != nil { + b.logger.Error(err, "Error computing backup_total metric") + } else { + b.metrics.SetBackupTotal(int64(len(backups.Items))) + } + + // recompute backup_last_successful_timestamp metric for each + // schedule (including the empty schedule, i.e. ad-hoc backups) + for schedule, timestamp := range getLastSuccessBySchedule(backups.Items) { + b.metrics.SetBackupLastSuccessfulTimestamp(schedule, timestamp) + } + }, + backupResyncPeriod, + b.ctx.Done(), + ) + }() } // getLastSuccessBySchedule finds the most recent completed backup for each schedule // and returns a map of schedule name -> completion time of the most recent completed // backup. This map includes an entry for ad-hoc/non-scheduled backups, where the key // is the empty string. 
-func getLastSuccessBySchedule(backups []*velerov1api.Backup) map[string]time.Time { +func getLastSuccessBySchedule(backups []velerov1api.Backup) map[string]time.Time { lastSuccessBySchedule := map[string]time.Time{} for _, backup := range backups { if backup.Status.Phase != velerov1api.BackupPhaseCompleted { @@ -221,24 +203,23 @@ func getLastSuccessBySchedule(backups []*velerov1api.Backup) map[string]time.Tim return lastSuccessBySchedule } -func (c *backupController) processBackup(key string) error { - log := c.logger.WithField("key", key) - - log.Debug("Running processBackup") - ns, name, err := cache.SplitMetaNamespaceKey(key) - if err != nil { - log.WithError(err).Errorf("error splitting key") - return nil - } +func (b *backupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + log := b.logger.WithFields(logrus.Fields{ + "controller": Backup, + "backuprequest": req.String(), + }) log.Debug("Getting backup") - original, err := c.lister.Backups(ns).Get(name) - if apierrors.IsNotFound(err) { - log.Debugf("backup %s not found", name) - return nil - } + + original := &velerov1api.Backup{} + err := b.kbClient.Get(ctx, req.NamespacedName, original) if err != nil { - return errors.Wrap(err, "error getting backup") + if apierrors.IsNotFound(err) { + log.Debug("backup not found") + return ctrl.Result{}, nil + } + log.WithError(err).Error("error getting backup") + return ctrl.Result{}, err } // Double-check we have the correct phase. In the unlikely event that multiple controller @@ -253,42 +234,44 @@ func (c *backupController) processBackup(key string) error { case "", velerov1api.BackupPhaseNew: // only process new backups default: - return nil + b.logger.WithFields(logrus.Fields{ + "backup": kubeutil.NamespaceAndName(original), + "phase": original.Status.Phase, + }).Debug("Backup is not handled") + return ctrl.Result{}, nil } log.Debug("Preparing backup request") - request := c.prepareBackupRequest(original, log) + request := b.prepareBackupRequest(original, log) if len(request.Status.ValidationErrors) > 0 { request.Status.Phase = velerov1api.BackupPhaseFailedValidation } else { request.Status.Phase = velerov1api.BackupPhaseInProgress - request.Status.StartTimestamp = &metav1.Time{Time: c.clock.Now()} + request.Status.StartTimestamp = &metav1.Time{Time: b.clock.Now()} } // update status - updatedBackup, err := patchBackup(original, request.Backup, c.client) - if err != nil { - return errors.Wrapf(err, "error updating Backup status to %s", request.Status.Phase) + if err := kubeutil.PatchResource(original, request.Backup, b.kbClient); err != nil { + return ctrl.Result{}, errors.Wrapf(err, "error updating Backup status to %s", request.Status.Phase) } - // store ref to just-updated item for creating patch - original = updatedBackup - request.Backup = updatedBackup.DeepCopy() + original = request.Backup.DeepCopy() if request.Status.Phase == velerov1api.BackupPhaseFailedValidation { - return nil + log.Debug("failed to validate backup status") + return ctrl.Result{}, nil } - c.backupTracker.Add(request.Namespace, request.Name) - defer c.backupTracker.Delete(request.Namespace, request.Name) + b.backupTracker.Add(request.Namespace, request.Name) + defer b.backupTracker.Delete(request.Namespace, request.Name) log.Debug("Running backup") backupScheduleName := request.GetLabels()[velerov1api.ScheduleNameLabel] - c.metrics.RegisterBackupAttempt(backupScheduleName) + b.metrics.RegisterBackupAttempt(backupScheduleName) // execution & upload of backup - if err := 
c.runBackup(request); err != nil { + if err := b.runBackup(request); err != nil { // even though runBackup sets the backup's phase prior // to uploading artifacts to object storage, we have to // check for an error again here and update the phase if @@ -302,52 +285,26 @@ func (c *backupController) processBackup(key string) error { switch request.Status.Phase { case velerov1api.BackupPhaseCompleted: - c.metrics.RegisterBackupSuccess(backupScheduleName) - c.metrics.RegisterBackupLastStatus(backupScheduleName, metrics.BackupLastStatusSucc) + b.metrics.RegisterBackupSuccess(backupScheduleName) + b.metrics.RegisterBackupLastStatus(backupScheduleName, metrics.BackupLastStatusSucc) case velerov1api.BackupPhasePartiallyFailed: - c.metrics.RegisterBackupPartialFailure(backupScheduleName) - c.metrics.RegisterBackupLastStatus(backupScheduleName, metrics.BackupLastStatusFailure) + b.metrics.RegisterBackupPartialFailure(backupScheduleName) + b.metrics.RegisterBackupLastStatus(backupScheduleName, metrics.BackupLastStatusFailure) case velerov1api.BackupPhaseFailed: - c.metrics.RegisterBackupFailed(backupScheduleName) - c.metrics.RegisterBackupLastStatus(backupScheduleName, metrics.BackupLastStatusFailure) + b.metrics.RegisterBackupFailed(backupScheduleName) + b.metrics.RegisterBackupLastStatus(backupScheduleName, metrics.BackupLastStatusFailure) case velerov1api.BackupPhaseFailedValidation: - c.metrics.RegisterBackupValidationFailure(backupScheduleName) - c.metrics.RegisterBackupLastStatus(backupScheduleName, metrics.BackupLastStatusFailure) + b.metrics.RegisterBackupValidationFailure(backupScheduleName) + b.metrics.RegisterBackupLastStatus(backupScheduleName, metrics.BackupLastStatusFailure) } - - log.Debug("Updating backup's final status") - if _, err := patchBackup(original, request.Backup, c.client); err != nil { + log.Info("Updating backup's final status") + if err := kubeutil.PatchResource(original, request.Backup, b.kbClient); err != nil { log.WithError(err).Error("error updating backup's final status") } - - return nil + return ctrl.Result{}, nil } -func patchBackup(original, updated *velerov1api.Backup, client velerov1client.BackupsGetter) (*velerov1api.Backup, error) { - origBytes, err := json.Marshal(original) - if err != nil { - return nil, errors.Wrap(err, "error marshalling original backup") - } - - updatedBytes, err := json.Marshal(updated) - if err != nil { - return nil, errors.Wrap(err, "error marshalling updated backup") - } - - patchBytes, err := jsonpatch.CreateMergePatch(origBytes, updatedBytes) - if err != nil { - return nil, errors.Wrap(err, "error creating json merge patch for backup") - } - - res, err := client.Backups(original.Namespace).Patch(context.TODO(), original.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}) - if err != nil { - return nil, errors.Wrap(err, "error patching backup") - } - - return res, nil -} - -func (c *backupController) prepareBackupRequest(backup *velerov1api.Backup, logger logrus.FieldLogger) *pkgbackup.Request { +func (b *backupReconciler) prepareBackupRequest(backup *velerov1api.Backup, logger logrus.FieldLogger) *pkgbackup.Request { request := &pkgbackup.Request{ Backup: backup.DeepCopy(), // don't modify items in the cache } @@ -360,21 +317,21 @@ func (c *backupController) prepareBackupRequest(backup *velerov1api.Backup, logg if request.Spec.TTL.Duration == 0 { // set default backup TTL - request.Spec.TTL.Duration = c.defaultBackupTTL + request.Spec.TTL.Duration = b.defaultBackupTTL } if request.Spec.CSISnapshotTimeout.Duration == 0 { 
// set default CSI VolumeSnapshot timeout - request.Spec.CSISnapshotTimeout.Duration = c.defaultCSISnapshotTimeout + request.Spec.CSISnapshotTimeout.Duration = b.defaultCSISnapshotTimeout } if request.Spec.ItemOperationTimeout.Duration == 0 { // set default item operation timeout - request.Spec.ItemOperationTimeout.Duration = c.defaultItemOperationTimeout + request.Spec.ItemOperationTimeout.Duration = b.defaultItemOperationTimeout } // calculate expiration - request.Status.Expiration = &metav1.Time{Time: c.clock.Now().Add(request.Spec.TTL.Duration)} + request.Status.Expiration = &metav1.Time{Time: b.clock.Now().Add(request.Spec.TTL.Duration)} // TODO: post v1.10. Remove this code block after DefaultVolumesToRestic is removed from CRD // For now, for CRs created by old versions, we need to respect the DefaultVolumesToRestic value if it is set true @@ -384,17 +341,17 @@ func (c *backupController) prepareBackupRequest(backup *velerov1api.Backup, logg } if request.Spec.DefaultVolumesToFsBackup == nil { - request.Spec.DefaultVolumesToFsBackup = &c.defaultVolumesToFsBackup + request.Spec.DefaultVolumesToFsBackup = &b.defaultVolumesToFsBackup } // find which storage location to use var serverSpecified bool if request.Spec.StorageLocation == "" { // when the user doesn't specify a location, use the server default unless there is an existing BSL marked as default - // TODO(2.0) c.defaultBackupLocation will be deprecated - request.Spec.StorageLocation = c.defaultBackupLocation + // TODO(2.0) b.defaultBackupLocation will be deprecated + request.Spec.StorageLocation = b.defaultBackupLocation - locationList, err := storage.ListBackupStorageLocations(context.Background(), c.kbClient, request.Namespace) + locationList, err := storage.ListBackupStorageLocations(context.Background(), b.kbClient, request.Namespace) if err == nil { for _, location := range locationList.Items { if location.Spec.Default { @@ -408,7 +365,7 @@ func (c *backupController) prepareBackupRequest(backup *velerov1api.Backup, logg // get the storage location, and store the BackupStorageLocation API obj on the request storageLocation := &velerov1api.BackupStorageLocation{} - if err := c.kbClient.Get(context.Background(), kbclient.ObjectKey{ + if err := b.kbClient.Get(context.Background(), kbclient.ObjectKey{ Namespace: request.Namespace, Name: request.Spec.StorageLocation, }, storageLocation); err != nil { @@ -439,7 +396,7 @@ func (c *backupController) prepareBackupRequest(backup *velerov1api.Backup, logg // validate and get the backup's VolumeSnapshotLocations, and store the // VolumeSnapshotLocation API objs on the request - if locs, errs := c.validateAndGetSnapshotLocations(request.Backup); len(errs) > 0 { + if locs, errs := b.validateAndGetSnapshotLocations(request.Backup); len(errs) > 0 { request.Status.ValidationErrors = append(request.Status.ValidationErrors, errs...) 
} else { request.Spec.VolumeSnapshotLocations = nil @@ -453,14 +410,14 @@ func (c *backupController) prepareBackupRequest(backup *velerov1api.Backup, logg if request.Annotations == nil { request.Annotations = make(map[string]string) } - request.Annotations[velerov1api.SourceClusterK8sGitVersionAnnotation] = c.discoveryHelper.ServerVersion().String() - request.Annotations[velerov1api.SourceClusterK8sMajorVersionAnnotation] = c.discoveryHelper.ServerVersion().Major - request.Annotations[velerov1api.SourceClusterK8sMinorVersionAnnotation] = c.discoveryHelper.ServerVersion().Minor + request.Annotations[velerov1api.SourceClusterK8sGitVersionAnnotation] = b.discoveryHelper.ServerVersion().String() + request.Annotations[velerov1api.SourceClusterK8sMajorVersionAnnotation] = b.discoveryHelper.ServerVersion().Major + request.Annotations[velerov1api.SourceClusterK8sMinorVersionAnnotation] = b.discoveryHelper.ServerVersion().Minor // Add namespaces with label velero.io/exclude-from-backup=true into request.Spec.ExcludedNamespaces // Essentially, adding the label velero.io/exclude-from-backup=true to a namespace would be equivalent to setting spec.ExcludedNamespaces namespaces := corev1api.NamespaceList{} - if err := c.kbClient.List(context.Background(), &namespaces, kbclient.MatchingLabels{"velero.io/exclude-from-backup": "true"}); err == nil { + if err := b.kbClient.List(context.Background(), &namespaces, kbclient.MatchingLabels{"velero.io/exclude-from-backup": "true"}); err == nil { for _, ns := range namespaces.Items { request.Spec.ExcludedNamespaces = append(request.Spec.ExcludedNamespaces, ns.Name) } @@ -495,7 +452,7 @@ func (c *backupController) prepareBackupRequest(backup *velerov1api.Backup, logg // it will automatically be used) // // if backup has snapshotVolume disabled then it returns empty VSL -func (c *backupController) validateAndGetSnapshotLocations(backup *velerov1api.Backup) (map[string]*velerov1api.VolumeSnapshotLocation, []string) { +func (b *backupReconciler) validateAndGetSnapshotLocations(backup *velerov1api.Backup) (map[string]*velerov1api.VolumeSnapshotLocation, []string) { errors := []string{} providerLocations := make(map[string]*velerov1api.VolumeSnapshotLocation) @@ -506,8 +463,8 @@ func (c *backupController) validateAndGetSnapshotLocations(backup *velerov1api.B for _, locationName := range backup.Spec.VolumeSnapshotLocations { // validate each locationName exists as a VolumeSnapshotLocation - location, err := c.snapshotLocationLister.VolumeSnapshotLocations(backup.Namespace).Get(locationName) - if err != nil { + location := &velerov1api.VolumeSnapshotLocation{} + if err := b.kbClient.Get(context.Background(), kbclient.ObjectKey{Namespace: backup.Namespace, Name: locationName}, location); err != nil { if apierrors.IsNotFound(err) { errors = append(errors, fmt.Sprintf("a VolumeSnapshotLocation CRD for the location %s with the name specified in the backup spec needs to be created before this snapshot can be executed. 
Error: %v", locationName, err)) } else { @@ -532,8 +489,8 @@ func (c *backupController) validateAndGetSnapshotLocations(backup *velerov1api.B if len(errors) > 0 { return nil, errors } - - allLocations, err := c.snapshotLocationLister.VolumeSnapshotLocations(backup.Namespace).List(labels.Everything()) + allLocations := &velerov1api.VolumeSnapshotLocationList{} + err := b.kbClient.List(context.Background(), allLocations, &kbclient.ListOptions{Namespace: backup.Namespace, LabelSelector: labels.Everything()}) if err != nil { errors = append(errors, fmt.Sprintf("error listing volume snapshot locations: %v", err)) return nil, errors @@ -541,9 +498,9 @@ func (c *backupController) validateAndGetSnapshotLocations(backup *velerov1api.B // build a map of provider->list of all locations for the provider allProviderLocations := make(map[string][]*velerov1api.VolumeSnapshotLocation) - for i := range allLocations { - loc := allLocations[i] - allProviderLocations[loc.Spec.Provider] = append(allProviderLocations[loc.Spec.Provider], loc) + for i := range allLocations.Items { + loc := allLocations.Items[i] + allProviderLocations[loc.Spec.Provider] = append(allProviderLocations[loc.Spec.Provider], &loc) } // go through each provider and make sure we have/can get a VSL @@ -557,12 +514,13 @@ func (c *backupController) validateAndGetSnapshotLocations(backup *velerov1api.B if len(locations) > 1 { // more than one possible location for the provider: check // the defaults - defaultLocation := c.defaultSnapshotLocations[provider] + defaultLocation := b.defaultSnapshotLocations[provider] if defaultLocation == "" { errors = append(errors, fmt.Sprintf("provider %s has more than one possible volume snapshot location, and none were specified explicitly or as a default", provider)) continue } - location, err := c.snapshotLocationLister.VolumeSnapshotLocations(backup.Namespace).Get(defaultLocation) + location := &velerov1api.VolumeSnapshotLocation{} + b.kbClient.Get(context.Background(), kbclient.ObjectKey{Namespace: backup.Namespace, Name: defaultLocation}, location) if err != nil { errors = append(errors, fmt.Sprintf("error getting volume snapshot location named %s: %v", defaultLocation, err)) continue @@ -582,7 +540,7 @@ func (c *backupController) validateAndGetSnapshotLocations(backup *velerov1api.B // add credential to config for each location for _, location := range providerLocations { - err = volume.UpdateVolumeSnapshotLocationWithCredentialConfig(location, c.credentialFileStore, c.logger) + err = volume.UpdateVolumeSnapshotLocationWithCredentialConfig(location, b.credentialFileStore, b.logger) if err != nil { errors = append(errors, fmt.Sprintf("error adding credentials to volume snapshot location named %s: %v", location.Name, err)) continue @@ -595,17 +553,17 @@ func (c *backupController) validateAndGetSnapshotLocations(backup *velerov1api.B // runBackup runs and uploads a validated backup. Any error returned from this function // causes the backup to be Failed; if no error is returned, the backup's status's Errors // field is checked to see if the backup was a partial failure. -func (c *backupController) runBackup(backup *pkgbackup.Request) error { - c.logger.WithField(Backup, kubeutil.NamespaceAndName(backup)).Info("Setting up backup log") +func (b *backupReconciler) runBackup(backup *pkgbackup.Request) error { + b.logger.WithField(Backup, kubeutil.NamespaceAndName(backup)).Info("Setting up backup log") // Log the backup to both a backup log file and to stdout. 
This will help see what happened if the upload of the // backup log failed for whatever reason. logCounter := logging.NewLogHook() - backupLog, err := logging.NewTempFileLogger(c.backupLogLevel, c.formatFlag, logCounter, logrus.Fields{Backup: kubeutil.NamespaceAndName(backup)}) + backupLog, err := logging.NewTempFileLogger(b.backupLogLevel, b.formatFlag, logCounter, logrus.Fields{Backup: kubeutil.NamespaceAndName(backup)}) if err != nil { return errors.Wrap(err, "error creating dual mode logger for backup") } - defer backupLog.Dispose(c.logger.WithField(Backup, kubeutil.NamespaceAndName(backup))) + defer backupLog.Dispose(b.logger.WithField(Backup, kubeutil.NamespaceAndName(backup))) backupLog.Info("Setting up backup temp file") backupFile, err := ioutil.TempFile("", "") @@ -615,7 +573,7 @@ func (c *backupController) runBackup(backup *pkgbackup.Request) error { defer closeAndRemoveFile(backupFile, backupLog) backupLog.Info("Setting up plugin manager") - pluginManager := c.newPluginManager(backupLog) + pluginManager := b.newPluginManager(backupLog) defer pluginManager.CleanupClients() backupLog.Info("Getting backup item actions") @@ -629,7 +587,7 @@ func (c *backupController) runBackup(backup *pkgbackup.Request) error { } backupLog.Info("Setting up backup store to check for backup existence") - backupStore, err := c.backupStoreGetter.Get(backup.StorageLocation, pluginManager, backupLog) + backupStore, err := b.backupStoreGetter.Get(backup.StorageLocation, pluginManager, backupLog) if err != nil { return err } @@ -637,7 +595,7 @@ func (c *backupController) runBackup(backup *pkgbackup.Request) error { exists, err := backupStore.BackupExists(backup.StorageLocation.Spec.StorageType.ObjectStorage.Bucket, backup.Name) if exists || err != nil { backup.Status.Phase = velerov1api.BackupPhaseFailed - backup.Status.CompletionTimestamp = &metav1.Time{Time: c.clock.Now()} + backup.Status.CompletionTimestamp = &metav1.Time{Time: b.clock.Now()} if err != nil { return errors.Wrapf(err, "error checking if backup already exists in object storage") } @@ -648,7 +606,7 @@ func (c *backupController) runBackup(backup *pkgbackup.Request) error { itemSnapshottersResolver := framework.NewItemSnapshotterResolver(itemSnapshotters) var fatalErrs []error - if err := c.backupper.BackupWithResolvers(backupLog, backup, backupFile, backupItemActionsResolver, + if err := b.backupper.BackupWithResolvers(backupLog, backup, backupFile, backupItemActionsResolver, itemSnapshottersResolver, pluginManager); err != nil { fatalErrs = append(fatalErrs, err) } @@ -662,14 +620,14 @@ func (c *backupController) runBackup(backup *pkgbackup.Request) error { selector := label.NewSelectorForBackup(backup.Name) vscList := &snapshotv1api.VolumeSnapshotContentList{} - volumeSnapshots, err = c.waitVolumeSnapshotReadyToUse(context.Background(), backup.Spec.CSISnapshotTimeout.Duration, backup.Name) + volumeSnapshots, err = b.waitVolumeSnapshotReadyToUse(context.Background(), backup.Spec.CSISnapshotTimeout.Duration, backup.Name) if err != nil { backupLog.Errorf("fail to wait VolumeSnapshot change to Ready: %s", err.Error()) } backup.CSISnapshots = volumeSnapshots - err = c.kbClient.List(context.Background(), vscList, &kbclient.ListOptions{LabelSelector: selector}) + err = b.kbClient.List(context.Background(), vscList, &kbclient.ListOptions{LabelSelector: selector}) if err != nil { backupLog.Error(err) } @@ -682,7 +640,7 @@ func (c *backupController) runBackup(backup *pkgbackup.Request) error { // persist the volumesnapshotclasses referenced by vsc 
if volumeSnapshotContents[index].Spec.VolumeSnapshotClassName != nil && !vsClassSet.Has(*volumeSnapshotContents[index].Spec.VolumeSnapshotClassName) { vsClass := &snapshotv1api.VolumeSnapshotClass{} - if err := c.kbClient.Get(context.TODO(), kbclient.ObjectKey{Name: *volumeSnapshotContents[index].Spec.VolumeSnapshotClassName}, vsClass); err != nil { + if err := b.kbClient.Get(context.TODO(), kbclient.ObjectKey{Name: *volumeSnapshotContents[index].Spec.VolumeSnapshotClassName}, vsClass); err != nil { backupLog.Error(err) } else { vsClassSet.Insert(*volumeSnapshotContents[index].Spec.VolumeSnapshotClassName) @@ -697,7 +655,7 @@ func (c *backupController) runBackup(backup *pkgbackup.Request) error { // Delete the VolumeSnapshots created in the backup, when CSI feature is enabled. if len(volumeSnapshots) > 0 && len(volumeSnapshotContents) > 0 { - c.deleteVolumeSnapshot(volumeSnapshots, volumeSnapshotContents, backupLog) + b.deleteVolumeSnapshot(volumeSnapshots, volumeSnapshotContents, backupLog) } } @@ -740,7 +698,7 @@ func (c *backupController) runBackup(backup *pkgbackup.Request) error { "errors": backupErrors, } - backupLog.DoneForPersist(c.logger.WithField(Backup, kubeutil.NamespaceAndName(backup))) + backupLog.DoneForPersist(b.logger.WithField(Backup, kubeutil.NamespaceAndName(backup))) // Assign finalize phase as close to end as possible so that any errors // logged to backupLog are captured. This is done before uploading the @@ -767,14 +725,14 @@ func (c *backupController) runBackup(backup *pkgbackup.Request) error { if backup.Status.Phase == velerov1api.BackupPhaseFailed || backup.Status.Phase == velerov1api.BackupPhasePartiallyFailed || backup.Status.Phase == velerov1api.BackupPhaseCompleted { - backup.Status.CompletionTimestamp = &metav1.Time{Time: c.clock.Now()} + backup.Status.CompletionTimestamp = &metav1.Time{Time: b.clock.Now()} } - recordBackupMetrics(backupLog, backup.Backup, backupFile, c.metrics, false) + recordBackupMetrics(backupLog, backup.Backup, backupFile, b.metrics, false) // re-instantiate the backup store because credentials could have changed since the original // instantiation, if this was a long-running backup backupLog.Info("Setting up backup store to persist the backup") - backupStore, err = c.backupStoreGetter.Get(backup.StorageLocation, pluginManager, backupLog) + backupStore, err = b.backupStoreGetter.Get(backup.StorageLocation, pluginManager, backupLog) if err != nil { return err } @@ -787,7 +745,7 @@ func (c *backupController) runBackup(backup *pkgbackup.Request) error { } } - c.logger.WithField(Backup, kubeutil.NamespaceAndName(backup)).Info("Backup completed") + b.logger.WithField(Backup, kubeutil.NamespaceAndName(backup)).Info("Backup completed") // if we return a non-nil error, the calling function will update // the backup's phase to Failed. @@ -965,20 +923,19 @@ func encodeToJSONGzip(data interface{}, desc string) (*bytes.Buffer, []error) { // using goroutine here instead of waiting in CSI plugin, because it's not easy to make BackupItemAction // parallel by now. 
After BackupItemAction parallel is implemented, this logic should be moved to CSI plugin // as https://github.com/vmware-tanzu/velero-plugin-for-csi/pull/100 -func (c *backupController) waitVolumeSnapshotReadyToUse(ctx context.Context, +func (b *backupReconciler) waitVolumeSnapshotReadyToUse(ctx context.Context, csiSnapshotTimeout time.Duration, backupName string) ([]snapshotv1api.VolumeSnapshot, error) { eg, _ := errgroup.WithContext(ctx) timeout := csiSnapshotTimeout interval := 5 * time.Second volumeSnapshots := make([]snapshotv1api.VolumeSnapshot, 0) - if c.volumeSnapshotLister != nil { - tmpVSs, err := c.volumeSnapshotLister.List(label.NewSelectorForBackup(backupName)) + if b.volumeSnapshotLister != nil { + tmpVSs, err := b.volumeSnapshotLister.List(label.NewSelectorForBackup(backupName)) if err != nil { - c.logger.Error(err) + b.logger.Error(err) return volumeSnapshots, err } - for _, vs := range tmpVSs { volumeSnapshots = append(volumeSnapshots, *vs) } @@ -991,22 +948,22 @@ func (c *backupController) waitVolumeSnapshotReadyToUse(ctx context.Context, volumeSnapshot := volumeSnapshots[index] eg.Go(func() error { err := wait.PollImmediate(interval, timeout, func() (bool, error) { - tmpVS, err := c.volumeSnapshotClient.SnapshotV1().VolumeSnapshots(volumeSnapshot.Namespace).Get(ctx, volumeSnapshot.Name, metav1.GetOptions{}) + tmpVS, err := b.volumeSnapshotClient.SnapshotV1().VolumeSnapshots(volumeSnapshot.Namespace).Get(b.ctx, volumeSnapshot.Name, metav1.GetOptions{}) if err != nil { return false, errors.Wrapf(err, fmt.Sprintf("failed to get volumesnapshot %s/%s", volumeSnapshot.Namespace, volumeSnapshot.Name)) } if tmpVS.Status == nil || tmpVS.Status.BoundVolumeSnapshotContentName == nil || !boolptr.IsSetToTrue(tmpVS.Status.ReadyToUse) { - c.logger.Infof("Waiting for CSI driver to reconcile volumesnapshot %s/%s. Retrying in %ds", volumeSnapshot.Namespace, volumeSnapshot.Name, interval/time.Second) + b.logger.Infof("Waiting for CSI driver to reconcile volumesnapshot %s/%s. Retrying in %ds", volumeSnapshot.Namespace, volumeSnapshot.Name, interval/time.Second) return false, nil } - c.logger.Debugf("VolumeSnapshot %s/%s turned into ReadyToUse.", volumeSnapshot.Namespace, volumeSnapshot.Name) + b.logger.Debugf("VolumeSnapshot %s/%s turned into ReadyToUse.", volumeSnapshot.Namespace, volumeSnapshot.Name) // Put the ReadyToUse VolumeSnapshot element in the result channel. vsChannel <- *tmpVS return true, nil }) if err == wait.ErrWaitTimeout { - c.logger.Errorf("Timed out awaiting reconciliation of volumesnapshot %s/%s", volumeSnapshot.Namespace, volumeSnapshot.Name) + b.logger.Errorf("Timed out awaiting reconciliation of volumesnapshot %s/%s", volumeSnapshot.Namespace, volumeSnapshot.Name) } return err }) @@ -1028,7 +985,7 @@ func (c *backupController) waitVolumeSnapshotReadyToUse(ctx context.Context, // which will cause snapshot deletion on cloud provider, then backup cannot restore the PV. // If DeletionPolicy is Retain, just delete it. If DeletionPolicy is Delete, need to // change DeletionPolicy to Retain before deleting VS, then change DeletionPolicy back to Delete. -func (c *backupController) deleteVolumeSnapshot(volumeSnapshots []snapshotv1api.VolumeSnapshot, +func (b *backupReconciler) deleteVolumeSnapshot(volumeSnapshots []snapshotv1api.VolumeSnapshot, volumeSnapshotContents []snapshotv1api.VolumeSnapshotContent, logger logrus.FieldLogger) { var wg sync.WaitGroup @@ -1067,14 +1024,15 @@ func (c *backupController) deleteVolumeSnapshot(volumeSnapshots []snapshotv1api. 
logger.Debugf("Patching VolumeSnapshotContent %s", vsc.Name) original := vsc.DeepCopy() vsc.Spec.DeletionPolicy = snapshotv1api.VolumeSnapshotContentRetain - if err := c.kbClient.Patch(context.Background(), &vsc, kbclient.MergeFrom(original)); err != nil { + err := kubeutil.PatchResource(original, &vsc, b.kbClient) + if err != nil { logger.Errorf("fail to modify VolumeSnapshotContent %s DeletionPolicy to Retain: %s", vsc.Name, err.Error()) return } defer func() { logger.Debugf("Start to recreate VolumeSnapshotContent %s", vsc.Name) - err := c.recreateVolumeSnapshotContent(vsc) + err := b.recreateVolumeSnapshotContent(vsc) if err != nil { logger.Errorf("fail to recreate VolumeSnapshotContent %s: %s", vsc.Name, err.Error()) } @@ -1083,7 +1041,7 @@ func (c *backupController) deleteVolumeSnapshot(volumeSnapshots []snapshotv1api. // Delete VolumeSnapshot from cluster logger.Debugf("Deleting VolumeSnapshot %s/%s", vs.Namespace, vs.Name) - err := c.volumeSnapshotClient.SnapshotV1().VolumeSnapshots(vs.Namespace).Delete(context.TODO(), vs.Name, metav1.DeleteOptions{}) + err := b.volumeSnapshotClient.SnapshotV1().VolumeSnapshots(vs.Namespace).Delete(b.ctx, vs.Name, metav1.DeleteOptions{}) if err != nil { logger.Errorf("fail to delete VolumeSnapshot %s/%s: %s", vs.Namespace, vs.Name, err.Error()) } @@ -1098,11 +1056,11 @@ func (c *backupController) deleteVolumeSnapshot(volumeSnapshots []snapshotv1api. // and Source. Source is updated to let csi-controller thinks the VSC is statically provsisioned with VS. // Set VolumeSnapshotRef's UID to nil will let the csi-controller finds out the related VS is gone, then // VSC can be deleted. -func (c *backupController) recreateVolumeSnapshotContent(vsc snapshotv1api.VolumeSnapshotContent) error { +func (b *backupReconciler) recreateVolumeSnapshotContent(vsc snapshotv1api.VolumeSnapshotContent) error { timeout := 1 * time.Minute interval := 1 * time.Second - err := c.kbClient.Delete(context.TODO(), &vsc) + err := b.kbClient.Delete(context.TODO(), &vsc) if err != nil { return errors.Wrapf(err, "fail to delete VolumeSnapshotContent: %s", vsc.Name) } @@ -1110,7 +1068,7 @@ func (c *backupController) recreateVolumeSnapshotContent(vsc snapshotv1api.Volum // Check VolumeSnapshotContents is already deleted, before re-creating it. err = wait.PollImmediate(interval, timeout, func() (bool, error) { tmpVSC := &snapshotv1api.VolumeSnapshotContent{} - err := c.kbClient.Get(context.TODO(), kbclient.ObjectKey{Name: vsc.Name}, tmpVSC) + err := b.kbClient.Get(context.TODO(), kbclient.ObjectKey{Name: vsc.Name}, tmpVSC) if err != nil { if apierrors.IsNotFound(err) { return true, nil @@ -1139,7 +1097,7 @@ func (c *backupController) recreateVolumeSnapshotContent(vsc snapshotv1api.Volum } // ResourceVersion shouldn't exist for new creation. 
vsc.ResourceVersion = "" - err = c.kbClient.Create(context.TODO(), &vsc) + err = b.kbClient.Create(context.TODO(), &vsc) if err != nil { return errors.Wrapf(err, "fail to create VolumeSnapshotContent %s", vsc.Name) } diff --git a/pkg/controller/backup_controller_test.go b/pkg/controller/backup_controller_test.go index 9c90da37e..9b31d2517 100644 --- a/pkg/controller/backup_controller_test.go +++ b/pkg/controller/backup_controller_test.go @@ -36,17 +36,17 @@ import ( "github.com/stretchr/testify/require" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/version" - clocks "k8s.io/utils/clock" + "k8s.io/utils/clock" testclocks "k8s.io/utils/clock/testing" + ctrl "sigs.k8s.io/controller-runtime" kbclient "sigs.k8s.io/controller-runtime/pkg/client" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" pkgbackup "github.com/vmware-tanzu/velero/pkg/backup" "github.com/vmware-tanzu/velero/pkg/builder" "github.com/vmware-tanzu/velero/pkg/discovery" - "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/fake" - informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions" "github.com/vmware-tanzu/velero/pkg/itemoperation" "github.com/vmware-tanzu/velero/pkg/metrics" "github.com/vmware-tanzu/velero/pkg/persistence" @@ -93,14 +93,6 @@ func TestProcessBackupNonProcessedItems(t *testing.T) { key string backup *velerov1api.Backup }{ - { - name: "bad key does not return error", - key: "bad/key/here", - }, - { - name: "backup not found in lister does not return error", - key: "nonexistent/backup", - }, { name: "FailedValidation backup is not processed", key: "velero/backup-1", @@ -127,21 +119,19 @@ func TestProcessBackupNonProcessedItems(t *testing.T) { t.Run(test.name, func(t *testing.T) { formatFlag := logging.FormatText var ( - sharedInformers = informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0) - logger = logging.DefaultLogger(logrus.DebugLevel, formatFlag) + logger = logging.DefaultLogger(logrus.DebugLevel, formatFlag) ) - c := &backupController{ - genericController: newGenericController("backup-test", logger), - lister: sharedInformers.Velero().V1().Backups().Lister(), - formatFlag: formatFlag, + c := &backupReconciler{ + kbClient: velerotest.NewFakeControllerRuntimeClient(t), + formatFlag: formatFlag, + logger: logger, } - if test.backup != nil { - require.NoError(t, sharedInformers.Velero().V1().Backups().Informer().GetStore().Add(test.backup)) + require.NoError(t, c.kbClient.Create(context.Background(), test.backup)) } - - err := c.processBackup(test.key) + actualResult, err := c.Reconcile(ctx, ctrl.Request{NamespacedName: types.NamespacedName{Namespace: test.backup.Namespace, Name: test.backup.Name}}) + assert.Equal(t, actualResult, ctrl.Result{}) assert.Nil(t, err) // Any backup that would actually proceed to validation will cause a segfault because this @@ -197,9 +187,7 @@ func TestProcessBackupValidationFailures(t *testing.T) { t.Run(test.name, func(t *testing.T) { formatFlag := logging.FormatText var ( - clientset = fake.NewSimpleClientset(test.backup) - sharedInformers = informers.NewSharedInformerFactory(clientset, 0) - logger = logging.DefaultLogger(logrus.DebugLevel, formatFlag) + logger = logging.DefaultLogger(logrus.DebugLevel, formatFlag) ) apiServer := velerotest.NewAPIServer(t) @@ -213,24 +201,23 @@ func TestProcessBackupValidationFailures(t *testing.T) { fakeClient = velerotest.NewFakeControllerRuntimeClient(t) } - c := 
&backupController{ - genericController: newGenericController("backup-test", logger), - discoveryHelper: discoveryHelper, - client: clientset.VeleroV1(), - lister: sharedInformers.Velero().V1().Backups().Lister(), - kbClient: fakeClient, - snapshotLocationLister: sharedInformers.Velero().V1().VolumeSnapshotLocations().Lister(), - defaultBackupLocation: defaultBackupLocation.Name, - clock: &clocks.RealClock{}, - formatFlag: formatFlag, + c := &backupReconciler{ + logger: logger, + discoveryHelper: discoveryHelper, + kbClient: fakeClient, + defaultBackupLocation: defaultBackupLocation.Name, + clock: &clock.RealClock{}, + formatFlag: formatFlag, } require.NotNil(t, test.backup) - require.NoError(t, sharedInformers.Velero().V1().Backups().Informer().GetStore().Add(test.backup)) + require.NoError(t, c.kbClient.Create(context.Background(), test.backup)) - require.NoError(t, c.processBackup(fmt.Sprintf("%s/%s", test.backup.Namespace, test.backup.Name))) - - res, err := clientset.VeleroV1().Backups(test.backup.Namespace).Get(context.TODO(), test.backup.Name, metav1.GetOptions{}) + actualResult, err := c.Reconcile(ctx, ctrl.Request{NamespacedName: types.NamespacedName{Namespace: test.backup.Namespace, Name: test.backup.Name}}) + assert.Equal(t, actualResult, ctrl.Result{}) + assert.Nil(t, err) + res := &velerov1api.Backup{} + err = c.kbClient.Get(context.Background(), kbclient.ObjectKey{Namespace: test.backup.Namespace, Name: test.backup.Name}, res) require.NoError(t, err) assert.Equal(t, velerov1api.BackupPhaseFailedValidation, res.Status.Phase) @@ -270,26 +257,20 @@ func TestBackupLocationLabel(t *testing.T) { formatFlag := logging.FormatText var ( - clientset = fake.NewSimpleClientset(test.backup) - sharedInformers = informers.NewSharedInformerFactory(clientset, 0) - logger = logging.DefaultLogger(logrus.DebugLevel, formatFlag) - fakeClient = velerotest.NewFakeControllerRuntimeClient(t) + logger = logging.DefaultLogger(logrus.DebugLevel, formatFlag) + fakeClient = velerotest.NewFakeControllerRuntimeClient(t) ) apiServer := velerotest.NewAPIServer(t) discoveryHelper, err := discovery.NewHelper(apiServer.DiscoveryClient, logger) require.NoError(t, err) - c := &backupController{ - genericController: newGenericController("backup-test", logger), - discoveryHelper: discoveryHelper, - client: clientset.VeleroV1(), - lister: sharedInformers.Velero().V1().Backups().Lister(), - kbClient: fakeClient, - snapshotLocationLister: sharedInformers.Velero().V1().VolumeSnapshotLocations().Lister(), - defaultBackupLocation: test.backupLocation.Name, - clock: &clocks.RealClock{}, - formatFlag: formatFlag, + c := &backupReconciler{ + discoveryHelper: discoveryHelper, + kbClient: fakeClient, + defaultBackupLocation: test.backupLocation.Name, + clock: &clock.RealClock{}, + formatFlag: formatFlag, } res := c.prepareBackupRequest(test.backup, logger) @@ -360,10 +341,9 @@ func Test_prepareBackupRequest_BackupStorageLocation(t *testing.T) { t.Run(test.name, func(t *testing.T) { // Arrange var ( - formatFlag = logging.FormatText - logger = logging.DefaultLogger(logrus.DebugLevel, formatFlag) - apiServer = velerotest.NewAPIServer(t) - sharedInformers = informers.NewSharedInformerFactory(apiServer.VeleroClient, 0) + formatFlag = logging.FormatText + logger = logging.DefaultLogger(logrus.DebugLevel, formatFlag) + apiServer = velerotest.NewAPIServer(t) ) // objects that should init with client @@ -379,15 +359,13 @@ func Test_prepareBackupRequest_BackupStorageLocation(t *testing.T) { discoveryHelper, err := 
discovery.NewHelper(apiServer.DiscoveryClient, logger) require.NoError(t, err) - c := &backupController{ - genericController: newGenericController("backup-test", logger), - discoveryHelper: discoveryHelper, - defaultBackupLocation: defaultBackupLocation, - kbClient: fakeClient, - snapshotLocationLister: sharedInformers.Velero().V1().VolumeSnapshotLocations().Lister(), - defaultBackupTTL: defaultBackupTTL.Duration, - clock: testclocks.NewFakeClock(now), - formatFlag: formatFlag, + c := &backupReconciler{ + discoveryHelper: discoveryHelper, + defaultBackupLocation: defaultBackupLocation, + kbClient: fakeClient, + defaultBackupTTL: defaultBackupTTL.Duration, + clock: testclocks.NewFakeClock(now), + formatFlag: formatFlag, } test.backup.Spec.StorageLocation = test.backupLocationNameInBackup @@ -443,10 +421,8 @@ func TestDefaultBackupTTL(t *testing.T) { for _, test := range tests { formatFlag := logging.FormatText var ( - clientset = fake.NewSimpleClientset(test.backup) - fakeClient = velerotest.NewFakeControllerRuntimeClient(t) - logger = logging.DefaultLogger(logrus.DebugLevel, formatFlag) - sharedInformers = informers.NewSharedInformerFactory(clientset, 0) + fakeClient kbclient.Client + logger = logging.DefaultLogger(logrus.DebugLevel, formatFlag) ) t.Run(test.name, func(t *testing.T) { @@ -454,15 +430,19 @@ func TestDefaultBackupTTL(t *testing.T) { apiServer := velerotest.NewAPIServer(t) discoveryHelper, err := discovery.NewHelper(apiServer.DiscoveryClient, logger) require.NoError(t, err) - - c := &backupController{ - genericController: newGenericController("backup-test", logger), - discoveryHelper: discoveryHelper, - kbClient: fakeClient, - snapshotLocationLister: sharedInformers.Velero().V1().VolumeSnapshotLocations().Lister(), - defaultBackupTTL: defaultBackupTTL.Duration, - clock: testclocks.NewFakeClock(now), - formatFlag: formatFlag, + // add the test's backup storage location if it's different than the default + if test.backupLocation != nil { + fakeClient = velerotest.NewFakeControllerRuntimeClient(t, test.backupLocation) + } else { + fakeClient = velerotest.NewFakeControllerRuntimeClient(t) + } + c := &backupReconciler{ + logger: logger, + discoveryHelper: discoveryHelper, + kbClient: fakeClient, + defaultBackupTTL: defaultBackupTTL.Duration, + clock: testclocks.NewFakeClock(now), + formatFlag: formatFlag, } res := c.prepareBackupRequest(test.backup, logger) @@ -548,24 +528,19 @@ func TestDefaultVolumesToResticDeprecation(t *testing.T) { formatFlag := logging.FormatText var ( - clientset = fake.NewSimpleClientset(test.backup) - sharedInformers = informers.NewSharedInformerFactory(clientset, 0) - logger = logging.DefaultLogger(logrus.DebugLevel, formatFlag) - fakeClient = velerotest.NewFakeControllerRuntimeClient(t) + logger = logging.DefaultLogger(logrus.DebugLevel, formatFlag) + fakeClient = velerotest.NewFakeControllerRuntimeClient(t) ) apiServer := velerotest.NewAPIServer(t) discoveryHelper, err := discovery.NewHelper(apiServer.DiscoveryClient, logger) require.NoError(t, err) - c := &backupController{ - genericController: newGenericController("backup-test", logger), + c := &backupReconciler{ + logger: logger, discoveryHelper: discoveryHelper, - client: clientset.VeleroV1(), - lister: sharedInformers.Velero().V1().Backups().Lister(), kbClient: fakeClient, - snapshotLocationLister: sharedInformers.Velero().V1().VolumeSnapshotLocations().Lister(), - clock: &clocks.RealClock{}, + clock: &clock.RealClock{}, formatFlag: formatFlag, defaultVolumesToFsBackup: test.globalVal, } @@ 
-1018,12 +993,10 @@ func TestProcessBackupCompletions(t *testing.T) { t.Run(test.name, func(t *testing.T) { formatFlag := logging.FormatText var ( - clientset = fake.NewSimpleClientset(test.backup) - sharedInformers = informers.NewSharedInformerFactory(clientset, 0) - logger = logging.DefaultLogger(logrus.DebugLevel, formatFlag) - pluginManager = new(pluginmocks.Manager) - backupStore = new(persistencemocks.BackupStore) - backupper = new(fakeBackupper) + logger = logging.DefaultLogger(logrus.DebugLevel, formatFlag) + pluginManager = new(pluginmocks.Manager) + backupStore = new(persistencemocks.BackupStore) + backupper = new(fakeBackupper) ) var fakeClient kbclient.Client @@ -1051,13 +1024,10 @@ func TestProcessBackupCompletions(t *testing.T) { discoveryHelper, err := discovery.NewHelper(apiServer.DiscoveryClient, logger) require.NoError(t, err) - c := &backupController{ - genericController: newGenericController("backup-test", logger), + c := &backupReconciler{ + logger: logger, discoveryHelper: discoveryHelper, - client: clientset.VeleroV1(), - lister: sharedInformers.Velero().V1().Backups().Lister(), kbClient: fakeClient, - snapshotLocationLister: sharedInformers.Velero().V1().VolumeSnapshotLocations().Lister(), defaultBackupLocation: defaultBackupLocation.Name, defaultVolumesToFsBackup: test.defaultVolumesToFsBackup, backupTracker: NewBackupTracker(), @@ -1091,18 +1061,21 @@ func TestProcessBackupCompletions(t *testing.T) { // add the test's backup to the informer/lister store require.NotNil(t, test.backup) - require.NoError(t, sharedInformers.Velero().V1().Backups().Informer().GetStore().Add(test.backup)) + + require.NoError(t, c.kbClient.Create(context.Background(), test.backup)) // add the default backup storage location to the clientset and the informer/lister store require.NoError(t, fakeClient.Create(context.Background(), defaultBackupLocation)) - require.NoError(t, c.processBackup(fmt.Sprintf("%s/%s", test.backup.Namespace, test.backup.Name))) + actualResult, err := c.Reconcile(ctx, ctrl.Request{NamespacedName: types.NamespacedName{Namespace: test.backup.Namespace, Name: test.backup.Name}}) + assert.Equal(t, actualResult, ctrl.Result{}) + assert.Nil(t, err) - res, err := clientset.VeleroV1().Backups(test.backup.Namespace).Get(context.TODO(), test.backup.Name, metav1.GetOptions{}) + res := &velerov1api.Backup{} + err = c.kbClient.Get(context.Background(), kbclient.ObjectKey{Namespace: test.backup.Namespace, Name: test.backup.Name}, res) require.NoError(t, err) - + res.ResourceVersion = "" assert.Equal(t, test.expectedResult, res) - // reset defaultBackupLocation resourceVersion defaultBackupLocation.ObjectMeta.ResourceVersion = "" }) @@ -1127,7 +1100,7 @@ func TestValidateAndGetSnapshotLocations(t *testing.T) { builder.ForVolumeSnapshotLocation(velerov1api.DefaultNamespace, "aws-us-west-1").Provider("aws").Result(), builder.ForVolumeSnapshotLocation(velerov1api.DefaultNamespace, "some-name").Provider("fake-provider").Result(), }, - expectedErrors: "a VolumeSnapshotLocation CRD for the location random-name with the name specified in the backup spec needs to be created before this snapshot can be executed. Error: volumesnapshotlocation.velero.io \"random-name\" not found", expectedSuccess: false, + expectedErrors: "a VolumeSnapshotLocation CRD for the location random-name with the name specified in the backup spec needs to be created before this snapshot can be executed. 
Error: volumesnapshotlocations.velero.io \"random-name\" not found", expectedSuccess: false, }, { name: "duplicate locationName per provider: should filter out dups", @@ -1241,22 +1214,20 @@ func TestValidateAndGetSnapshotLocations(t *testing.T) { t.Run(test.name, func(t *testing.T) { formatFlag := logging.FormatText var ( - client = fake.NewSimpleClientset() - sharedInformers = informers.NewSharedInformerFactory(client, 0) - logger = logging.DefaultLogger(logrus.DebugLevel, formatFlag) + logger = logging.DefaultLogger(logrus.DebugLevel, formatFlag) ) - c := &backupController{ - genericController: newGenericController("backup-test", logger), - snapshotLocationLister: sharedInformers.Velero().V1().VolumeSnapshotLocations().Lister(), + c := &backupReconciler{ + logger: logger, defaultSnapshotLocations: test.defaultLocations, + kbClient: velerotest.NewFakeControllerRuntimeClient(t), } // set up a Backup object to represent what we expect to be passed to backupper.Backup() backup := test.backup.DeepCopy() backup.Spec.VolumeSnapshotLocations = test.backup.Spec.VolumeSnapshotLocations for _, location := range test.locations { - require.NoError(t, sharedInformers.Velero().V1().VolumeSnapshotLocations().Informer().GetStore().Add(location)) + require.NoError(t, c.kbClient.Create(context.Background(), location)) } providerLocations, errs := c.validateAndGetSnapshotLocations(backup) @@ -1287,7 +1258,7 @@ func TestValidateAndGetSnapshotLocations(t *testing.T) { // the completion timestamp of the most recent completed backup for each schedule, including an entry for ad-hoc // or non-scheduled backups. func Test_getLastSuccessBySchedule(t *testing.T) { - buildBackup := func(phase velerov1api.BackupPhase, completion time.Time, schedule string) *velerov1api.Backup { + buildBackup := func(phase velerov1api.BackupPhase, completion time.Time, schedule string) velerov1api.Backup { b := builder.ForBackup("", ""). ObjectMeta(builder.WithLabels(velerov1api.ScheduleNameLabel, schedule)). 
Phase(phase) @@ -1296,7 +1267,7 @@ func Test_getLastSuccessBySchedule(t *testing.T) { b.CompletionTimestamp(completion) } - return b.Result() + return *b.Result() } // create a static "base time" that can be used to easily construct completion timestamps @@ -1306,7 +1277,7 @@ func Test_getLastSuccessBySchedule(t *testing.T) { tests := []struct { name string - backups []*velerov1api.Backup + backups []velerov1api.Backup want map[string]time.Time }{ { @@ -1316,12 +1287,12 @@ func Test_getLastSuccessBySchedule(t *testing.T) { }, { name: "when backups is empty, an empty map is returned", - backups: []*velerov1api.Backup{}, + backups: []velerov1api.Backup{}, want: map[string]time.Time{}, }, { name: "when multiple completed backups for a schedule exist, the latest one is returned", - backups: []*velerov1api.Backup{ + backups: []velerov1api.Backup{ buildBackup(velerov1api.BackupPhaseCompleted, baseTime, "schedule-1"), buildBackup(velerov1api.BackupPhaseCompleted, baseTime.Add(time.Second), "schedule-1"), buildBackup(velerov1api.BackupPhaseCompleted, baseTime.Add(-time.Second), "schedule-1"), @@ -1332,7 +1303,7 @@ func Test_getLastSuccessBySchedule(t *testing.T) { }, { name: "when the most recent backup for a schedule is Failed, the timestamp of the most recent Completed one is returned", - backups: []*velerov1api.Backup{ + backups: []velerov1api.Backup{ buildBackup(velerov1api.BackupPhaseCompleted, baseTime, "schedule-1"), buildBackup(velerov1api.BackupPhaseFailed, baseTime.Add(time.Second), "schedule-1"), buildBackup(velerov1api.BackupPhaseCompleted, baseTime.Add(-time.Second), "schedule-1"), @@ -1343,7 +1314,7 @@ func Test_getLastSuccessBySchedule(t *testing.T) { }, { name: "when there are no Completed backups for a schedule, it's not returned", - backups: []*velerov1api.Backup{ + backups: []velerov1api.Backup{ buildBackup(velerov1api.BackupPhaseInProgress, baseTime, "schedule-1"), buildBackup(velerov1api.BackupPhaseFailed, baseTime.Add(time.Second), "schedule-1"), buildBackup(velerov1api.BackupPhasePartiallyFailed, baseTime.Add(-time.Second), "schedule-1"), @@ -1352,7 +1323,7 @@ func Test_getLastSuccessBySchedule(t *testing.T) { }, { name: "when backups exist without a schedule, the most recent Completed one is returned", - backups: []*velerov1api.Backup{ + backups: []velerov1api.Backup{ buildBackup(velerov1api.BackupPhaseCompleted, baseTime, ""), buildBackup(velerov1api.BackupPhaseFailed, baseTime.Add(time.Second), ""), buildBackup(velerov1api.BackupPhaseCompleted, baseTime.Add(-time.Second), ""), @@ -1363,7 +1334,7 @@ func Test_getLastSuccessBySchedule(t *testing.T) { }, { name: "when backups exist for multiple schedules, the most recent Completed timestamp for each schedule is returned", - backups: []*velerov1api.Backup{ + backups: []velerov1api.Backup{ // ad-hoc backups (no schedule) buildBackup(velerov1api.BackupPhaseCompleted, baseTime.Add(30*time.Minute), ""), buildBackup(velerov1api.BackupPhaseFailed, baseTime.Add(time.Hour), ""), @@ -1404,7 +1375,7 @@ func TestDeleteVolumeSnapshot(t *testing.T) { name string vsArray []snapshotv1api.VolumeSnapshot vscArray []snapshotv1api.VolumeSnapshotContent - expectedVSArray []snapshotv1api.VolumeSnapshot + expectedVSArray []*snapshotv1api.VolumeSnapshot expectedVSCArray []snapshotv1api.VolumeSnapshotContent }{ { @@ -1415,7 +1386,7 @@ func TestDeleteVolumeSnapshot(t *testing.T) { vscArray: []snapshotv1api.VolumeSnapshotContent{ *builder.ForVolumeSnapshotContent("vsc1").DeletionPolicy(snapshotv1api.VolumeSnapshotContentDelete).Status().Result(), }, - 
expectedVSArray: []snapshotv1api.VolumeSnapshot{}, + expectedVSArray: []*snapshotv1api.VolumeSnapshot{}, expectedVSCArray: []snapshotv1api.VolumeSnapshotContent{ *builder.ForVolumeSnapshotContent("vsc1").DeletionPolicy(snapshotv1api.VolumeSnapshotContentRetain).VolumeSnapshotRef("ns-", "name-").Status().Result(), }, @@ -1426,8 +1397,8 @@ *builder.ForVolumeSnapshot("velero", "vs1").ObjectMeta(builder.WithLabels("testing-vs", "vs1")).Status().BoundVolumeSnapshotContentName("vsc1").Result(), }, vscArray: []snapshotv1api.VolumeSnapshotContent{}, - expectedVSArray: []snapshotv1api.VolumeSnapshot{ - *builder.ForVolumeSnapshot("velero", "vs1").Status().BoundVolumeSnapshotContentName("vsc1").Result(), + expectedVSArray: []*snapshotv1api.VolumeSnapshot{ + builder.ForVolumeSnapshot("velero", "vs1").Status().BoundVolumeSnapshotContentName("vsc1").Result(), }, expectedVSCArray: []snapshotv1api.VolumeSnapshotContent{}, }, @@ -1439,7 +1410,7 @@ vscArray: []snapshotv1api.VolumeSnapshotContent{ *builder.ForVolumeSnapshotContent("vsc1").DeletionPolicy(snapshotv1api.VolumeSnapshotContentDelete).Status().Result(), }, - expectedVSArray: []snapshotv1api.VolumeSnapshot{}, + expectedVSArray: []*snapshotv1api.VolumeSnapshot{}, expectedVSCArray: []snapshotv1api.VolumeSnapshotContent{ *builder.ForVolumeSnapshotContent("vsc1").DeletionPolicy(snapshotv1api.VolumeSnapshotContentDelete).Status().Result(), }, @@ -1448,27 +1419,27 @@ for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - fakeClient := velerotest.NewFakeControllerRuntimeClientBuilder(t).WithLists( - &snapshotv1api.VolumeSnapshotContentList{Items: tc.vscArray}, - ).Build() - - vsClient := snapshotfake.NewSimpleClientset(&tc.vsArray[0]) - sharedInformers := snapshotinformers.NewSharedInformerFactory(vsClient, 0) + var ( + fakeClient = velerotest.NewFakeControllerRuntimeClientBuilder(t).WithLists( + &snapshotv1api.VolumeSnapshotContentList{Items: tc.vscArray}, + ).Build() + vsClient = snapshotfake.NewSimpleClientset() + sharedInformers = snapshotinformers.NewSharedInformerFactory(vsClient, 0) + logger = logging.DefaultLogger(logrus.DebugLevel, logging.FormatText) + ) + c := &backupReconciler{ + kbClient: fakeClient, + volumeSnapshotLister: sharedInformers.Snapshot().V1().VolumeSnapshots().Lister(), + volumeSnapshotClient: vsClient, + } for _, vs := range tc.vsArray { - sharedInformers.Snapshot().V1().VolumeSnapshots().Informer().GetStore().Add(vs) + _, err := c.volumeSnapshotClient.SnapshotV1().VolumeSnapshots(vs.Namespace).Create(context.Background(), &vs, metav1.CreateOptions{}) + require.NoError(t, err) } - logger := logging.DefaultLogger(logrus.DebugLevel, logging.FormatText) - c := &backupController{ - kbClient: fakeClient, - volumeSnapshotClient: vsClient, - volumeSnapshotLister: sharedInformers.Snapshot().V1().VolumeSnapshots().Lister(), - } c.deleteVolumeSnapshot(tc.vsArray, tc.vscArray, logger) - - vsList, err := c.volumeSnapshotClient.SnapshotV1().VolumeSnapshots("velero").List(context.TODO(), metav1.ListOptions{}) + vsList, err := c.volumeSnapshotClient.SnapshotV1().VolumeSnapshots("velero").List(context.Background(), metav1.ListOptions{}) require.NoError(t, err) assert.Equal(t, len(tc.expectedVSArray), len(vsList.Items)) for index := range tc.expectedVSArray { diff --git a/pkg/controller/backup_finalizer_controller.go b/pkg/controller/backup_finalizer_controller.go index c8db402ba..562efc965 100644 --- a/pkg/controller/backup_finalizer_controller.go +++
b/pkg/controller/backup_finalizer_controller.go @@ -73,6 +73,7 @@ func NewBackupFinalizerReconciler( // +kubebuilder:rbac:groups=velero.io,resources=backups,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=velero.io,resources=backups/status,verbs=get;update;patch + func (r *backupFinalizerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log := r.log.WithFields(logrus.Fields{ "controller": "backup-finalizer", diff --git a/pkg/controller/backup_storage_location_controller.go b/pkg/controller/backup_storage_location_controller.go index 793347dec..26cc1e734 100644 --- a/pkg/controller/backup_storage_location_controller.go +++ b/pkg/controller/backup_storage_location_controller.go @@ -79,6 +79,7 @@ func NewBackupStorageLocationReconciler( // +kubebuilder:rbac:groups=velero.io,resources=backupstoragelocations,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=velero.io,resources=backupstoragelocations/status,verbs=get;update;patch + func (r *backupStorageLocationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { var unavailableErrors []string var location velerov1api.BackupStorageLocation diff --git a/pkg/controller/constants.go b/pkg/controller/constants.go index 2d051e0b5..f13edc033 100644 --- a/pkg/controller/constants.go +++ b/pkg/controller/constants.go @@ -21,13 +21,13 @@ const ( Backup = "backup" BackupDeletion = "backup-deletion" BackupFinalizer = "backup-finalizer" + BackupRepo = "backup-repo" BackupStorageLocation = "backup-storage-location" BackupSync = "backup-sync" DownloadRequest = "download-request" GarbageCollection = "gc" PodVolumeBackup = "pod-volume-backup" PodVolumeRestore = "pod-volume-restore" - BackupRepo = "backup-repo" Restore = "restore" Schedule = "schedule" ServerStatusRequest = "server-status-request" diff --git a/pkg/controller/download_request_controller.go b/pkg/controller/download_request_controller.go index 45491aba6..2eaa13f43 100644 --- a/pkg/controller/download_request_controller.go +++ b/pkg/controller/download_request_controller.go @@ -68,6 +68,7 @@ func NewDownloadRequestReconciler( // +kubebuilder:rbac:groups=velero.io,resources=downloadrequests,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=velero.io,resources=downloadrequests/status,verbs=get;update;patch + func (r *downloadRequestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log := r.log.WithFields(logrus.Fields{ "controller": "download-request", diff --git a/pkg/controller/gc_controller.go b/pkg/controller/gc_controller.go index 9aab68580..3d1fe48c7 100644 --- a/pkg/controller/gc_controller.go +++ b/pkg/controller/gc_controller.go @@ -96,6 +96,7 @@ func (c *gcReconciler) SetupWithManager(mgr ctrl.Manager) error { // +kubebuilder:rbac:groups=velero.io,resources=deletebackuprequests,verbs=get;list;watch;create; // +kubebuilder:rbac:groups=velero.io,resources=deletebackuprequests/status,verbs=get // +kubebuilder:rbac:groups=velero.io,resources=backupstoragelocations,verbs=get + func (c *gcReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log := c.logger.WithField("gc backup", req.String()) log.Debug("gcController getting backup") diff --git a/pkg/controller/generic_controller.go b/pkg/controller/generic_controller.go deleted file mode 100644 index b62e03655..000000000 --- a/pkg/controller/generic_controller.go +++ /dev/null @@ -1,146 +0,0 @@ -/* -Copyright 2018 the Velero contributors. 
- -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package controller - -import ( - "context" - "sync" - "time" - - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/workqueue" -) - -type genericController struct { - name string - queue workqueue.RateLimitingInterface - logger logrus.FieldLogger - syncHandler func(key string) error - resyncFunc func() - resyncPeriod time.Duration - cacheSyncWaiters []cache.InformerSynced -} - -func newGenericController(name string, logger logrus.FieldLogger) *genericController { - c := &genericController{ - name: name, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name), - logger: logger.WithField("controller", name), - } - - return c -} - -// Run is a blocking function that runs the specified number of worker goroutines -// to process items in the work queue. It will return when it receives on the -// ctx.Done() channel. -func (c *genericController) Run(ctx context.Context, numWorkers int) error { - if c.syncHandler == nil && c.resyncFunc == nil { - // programmer error - panic("at least one of syncHandler or resyncFunc is required") - } - - var wg sync.WaitGroup - - defer func() { - c.logger.Info("Waiting for workers to finish their work") - - c.queue.ShutDown() - - // We have to wait here in the deferred function instead of at the bottom of the function body - // because we have to shut down the queue in order for the workers to shut down gracefully, and - // we want to shut down the queue via defer and not at the end of the body. - wg.Wait() - - c.logger.Info("All workers have finished") - - }() - - c.logger.Info("Starting controller") - defer c.logger.Info("Shutting down controller") - - // only want to log about cache sync waiters if there are any - if len(c.cacheSyncWaiters) > 0 { - c.logger.Info("Waiting for caches to sync") - if !cache.WaitForCacheSync(ctx.Done(), c.cacheSyncWaiters...) 
{ - return errors.New("timed out waiting for caches to sync") - } - c.logger.Info("Caches are synced") - } - - if c.syncHandler != nil { - wg.Add(numWorkers) - for i := 0; i < numWorkers; i++ { - go func() { - wait.Until(c.runWorker, time.Second, ctx.Done()) - wg.Done() - }() - } - } - - if c.resyncFunc != nil { - if c.resyncPeriod == 0 { - // Programmer error - panic("non-zero resyncPeriod is required") - } - - wg.Add(1) - go func() { - wait.Until(c.resyncFunc, c.resyncPeriod, ctx.Done()) - wg.Done() - }() - } - - <-ctx.Done() - - return nil -} - -func (c *genericController) runWorker() { - // continually take items off the queue (waits if it's - // empty) until we get a shutdown signal from the queue - for c.processNextWorkItem() { - } -} - -func (c *genericController) processNextWorkItem() bool { - key, quit := c.queue.Get() - if quit { - return false - } - // always call done on this item, since if it fails we'll add - // it back with rate-limiting below - defer c.queue.Done(key) - - err := c.syncHandler(key.(string)) - if err == nil { - // If you had no error, tell the queue to stop tracking history for your key. This will reset - // things like failure counts for per-item rate limiting. - c.queue.Forget(key) - return true - } - - c.logger.WithError(err).WithField("key", key).Error("Error in syncHandler, re-adding item to queue") - // we had an error processing the item so add it back - // into the queue for re-processing with rate-limiting - c.queue.AddRateLimited(key) - - return true -} diff --git a/pkg/controller/pod_volume_backup_controller.go b/pkg/controller/pod_volume_backup_controller.go index 6b149b15f..f0c480716 100644 --- a/pkg/controller/pod_volume_backup_controller.go +++ b/pkg/controller/pod_volume_backup_controller.go @@ -67,6 +67,7 @@ type BackupProgressUpdater struct { // +kubebuilder:rbac:groups=velero.io,resources=podvolumebackups,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=velero.io,resources=podvolumebackups/status,verbs=get;update;patch + func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log := r.Log.WithFields(logrus.Fields{ "controller": "podvolumebackup", diff --git a/pkg/controller/restore_controller.go b/pkg/controller/restore_controller.go index 61efab377..4e9ad7293 100644 --- a/pkg/controller/restore_controller.go +++ b/pkg/controller/restore_controller.go @@ -547,7 +547,7 @@ func (r *restoreReconciler) updateTotalRestoreMetric() { // Wait for 5 seconds to let controller-runtime to setup k8s clients. 
time.Sleep(5 * time.Second) - wait.NonSlidingUntil( + wait.Until( func() { // recompute restore_total metric restoreList := &api.RestoreList{} diff --git a/pkg/controller/server_status_request_controller.go b/pkg/controller/server_status_request_controller.go index 1aea5db58..e9f0bf607 100644 --- a/pkg/controller/server_status_request_controller.go +++ b/pkg/controller/server_status_request_controller.go @@ -74,6 +74,7 @@ func NewServerStatusRequestReconciler( // +kubebuilder:rbac:groups=velero.io,resources=serverstatusrequests,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=velero.io,resources=serverstatusrequests/status,verbs=get;update;patch + func (r *serverStatusRequestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log := r.log.WithFields(logrus.Fields{ "controller": ServerStatusRequest, diff --git a/pkg/podvolume/backupper_factory.go b/pkg/podvolume/backupper_factory.go index 7b87865e2..4092e7a06 100644 --- a/pkg/podvolume/backupper_factory.go +++ b/pkg/podvolume/backupper_factory.go @@ -38,35 +38,34 @@ type BackupperFactory interface { NewBackupper(context.Context, *velerov1api.Backup, string) (Backupper, error) } -func NewBackupperFactory(repoLocker *repository.RepoLocker, +func NewBackupperFactory( + repoLocker *repository.RepoLocker, repoEnsurer *repository.RepositoryEnsurer, veleroClient clientset.Interface, pvcClient corev1client.PersistentVolumeClaimsGetter, pvClient corev1client.PersistentVolumesGetter, podClient corev1client.PodsGetter, - repoInformerSynced cache.InformerSynced, - log logrus.FieldLogger) BackupperFactory { + log logrus.FieldLogger, +) BackupperFactory { return &backupperFactory{ - repoLocker: repoLocker, - repoEnsurer: repoEnsurer, - veleroClient: veleroClient, - pvcClient: pvcClient, - pvClient: pvClient, - podClient: podClient, - repoInformerSynced: repoInformerSynced, - log: log, + repoLocker: repoLocker, + repoEnsurer: repoEnsurer, + veleroClient: veleroClient, + pvcClient: pvcClient, + pvClient: pvClient, + podClient: podClient, + log: log, } } type backupperFactory struct { - repoLocker *repository.RepoLocker - repoEnsurer *repository.RepositoryEnsurer - veleroClient clientset.Interface - pvcClient corev1client.PersistentVolumeClaimsGetter - pvClient corev1client.PersistentVolumesGetter - podClient corev1client.PodsGetter - repoInformerSynced cache.InformerSynced - log logrus.FieldLogger + repoLocker *repository.RepoLocker + repoEnsurer *repository.RepositoryEnsurer + veleroClient clientset.Interface + pvcClient corev1client.PersistentVolumeClaimsGetter + pvClient corev1client.PersistentVolumesGetter + podClient corev1client.PodsGetter + log logrus.FieldLogger } func (bf *backupperFactory) NewBackupper(ctx context.Context, backup *velerov1api.Backup, uploaderType string) (Backupper, error) { @@ -83,7 +82,7 @@ func (bf *backupperFactory) NewBackupper(ctx context.Context, backup *velerov1ap b := newBackupper(ctx, bf.repoLocker, bf.repoEnsurer, informer, bf.veleroClient, bf.pvcClient, bf.pvClient, bf.podClient, uploaderType, bf.log) go informer.Run(ctx.Done()) - if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced, bf.repoInformerSynced) { + if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) { return nil, errors.New("timed out waiting for caches to sync") } diff --git a/pkg/podvolume/restorer_factory.go b/pkg/podvolume/restorer_factory.go index e4f2ce9c1..7813b0ff5 100644 --- a/pkg/podvolume/restorer_factory.go +++ b/pkg/podvolume/restorer_factory.go @@ -45,29 +45,26 @@ func 
NewRestorerFactory(repoLocker *repository.RepoLocker, pvcClient corev1client.PersistentVolumeClaimsGetter, podClient corev1client.PodsGetter, kubeClient kubernetes.Interface, - repoInformerSynced cache.InformerSynced, log logrus.FieldLogger) RestorerFactory { return &restorerFactory{ - repoLocker: repoLocker, - repoEnsurer: repoEnsurer, - veleroClient: veleroClient, - pvcClient: pvcClient, - podClient: podClient, - kubeClient: kubeClient, - repoInformerSynced: repoInformerSynced, - log: log, + repoLocker: repoLocker, + repoEnsurer: repoEnsurer, + veleroClient: veleroClient, + pvcClient: pvcClient, + podClient: podClient, + kubeClient: kubeClient, + log: log, } } type restorerFactory struct { - repoLocker *repository.RepoLocker - repoEnsurer *repository.RepositoryEnsurer - veleroClient clientset.Interface - pvcClient corev1client.PersistentVolumeClaimsGetter - podClient corev1client.PodsGetter - kubeClient kubernetes.Interface - repoInformerSynced cache.InformerSynced - log logrus.FieldLogger + repoLocker *repository.RepoLocker + repoEnsurer *repository.RepositoryEnsurer + veleroClient clientset.Interface + pvcClient corev1client.PersistentVolumeClaimsGetter + podClient corev1client.PodsGetter + kubeClient kubernetes.Interface + log logrus.FieldLogger } func (rf *restorerFactory) NewRestorer(ctx context.Context, restore *velerov1api.Restore) (Restorer, error) { @@ -84,7 +81,7 @@ func (rf *restorerFactory) NewRestorer(ctx context.Context, restore *velerov1api r := newRestorer(ctx, rf.repoLocker, rf.repoEnsurer, informer, rf.veleroClient, rf.pvcClient, rf.podClient, rf.kubeClient, rf.log) go informer.Run(ctx.Done()) - if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced, rf.repoInformerSynced) { + if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) { return nil, errors.New("timed out waiting for cache to sync") }
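For orientation, this is the registration the refactor implies once generic_controller.go is gone: each reconciler is handed to the controller-runtime manager, which supplies the informer cache, rate-limited workqueue, and worker goroutines that the generic controller managed by hand. A minimal sketch, following the SetupWithManager pattern visible on gcReconciler above; the body below is illustrative, not a hunk of this patch:

package controller

import (
	ctrl "sigs.k8s.io/controller-runtime"

	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
)

// SetupWithManager watches Backup objects; controller-runtime enqueues a
// ctrl.Request per create/update event and invokes Reconcile with it, so the
// hand-rolled workqueue, syncHandler, and cache-sync waiters are no longer needed.
func (b *backupReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&velerov1api.Backup{}).
		Complete(b)
}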