From 6a7beaf5ce330d37c711a60761f63ef1b10a053a Mon Sep 17 00:00:00 2001 From: Nolan Brubaker Date: Thu, 12 Mar 2020 13:32:58 -0400 Subject: [PATCH] Plumb CSI listers through to backup controller Account for having CSI enabled or not, as well as having the snapshot CRDs installed in the kubernetes cluster. Signed-off-by: Nolan Brubaker --- pkg/cmd/server/server.go | 113 ++++++++++++++++++---------- pkg/controller/backup_controller.go | 94 ++++++++++++----------- 2 files changed, 122 insertions(+), 85 deletions(-) diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index 0c518d8a7..43699a955 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -43,8 +43,10 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" - snapshotter "github.com/kubernetes-csi/external-snapshotter/v2/pkg/client/clientset/versioned" - snapshotv1beta1 "github.com/kubernetes-csi/external-snapshotter/v2/pkg/client/clientset/versioned/typed/volumesnapshot/v1beta1" + snapshotv1beta1api "github.com/kubernetes-csi/external-snapshotter/v2/pkg/apis/volumesnapshot/v1beta1" + snapshotvebeta1client "github.com/kubernetes-csi/external-snapshotter/v2/pkg/client/clientset/versioned" + snapshotvebeta1informers "github.com/kubernetes-csi/external-snapshotter/v2/pkg/client/informers/externalversions" + snapshotv1beta1listers "github.com/kubernetes-csi/external-snapshotter/v2/pkg/client/listers/volumesnapshot/v1beta1" api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" "github.com/vmware-tanzu/velero/pkg/backup" @@ -215,24 +217,25 @@ func NewCommand(f client.Factory) *cobra.Command { } type server struct { - namespace string - metricsAddress string - kubeClientConfig *rest.Config - kubeClient kubernetes.Interface - veleroClient clientset.Interface - csiSnapClient snapshotter.Interface - discoveryClient discovery.DiscoveryInterface - discoveryHelper velerodiscovery.Helper - dynamicClient dynamic.Interface - sharedInformerFactory informers.SharedInformerFactory - ctx 
context.Context - cancelFunc context.CancelFunc - logger logrus.FieldLogger - logLevel logrus.Level - pluginRegistry clientmgmt.Registry - resticManager restic.RepositoryManager - metrics *metrics.ServerMetrics - config serverConfig + namespace string + metricsAddress string + kubeClientConfig *rest.Config + kubeClient kubernetes.Interface + veleroClient clientset.Interface + csiSnapClient snapshotvebeta1client.Interface + discoveryClient discovery.DiscoveryInterface + discoveryHelper velerodiscovery.Helper + dynamicClient dynamic.Interface + sharedInformerFactory informers.SharedInformerFactory + snapshotterSharedInformerFactory snapshotvebeta1informers.SharedInformerFactory + ctx context.Context + cancelFunc context.CancelFunc + logger logrus.FieldLogger + logLevel logrus.Level + pluginRegistry clientmgmt.Registry + resticManager restic.RepositoryManager + metrics *metrics.ServerMetrics + config serverConfig } func newServer(f client.Factory, config serverConfig, logger *logrus.Logger) (*server, error) { @@ -270,9 +273,19 @@ func newServer(f client.Factory, config serverConfig, logger *logrus.Logger) (*s clientConfig, err := f.ClientConfig() if err != nil { + cancelFunc() return nil, err } + var csiSnapClient *snapshotvebeta1client.Clientset + if features.IsEnabled("EnableCSI") { + csiSnapClient, err = snapshotvebeta1client.NewForConfig(clientConfig) + if err != nil { + cancelFunc() + return nil, err + } + } + s := &server{ namespace: f.Namespace(), metricsAddress: config.metricsAddress, @@ -282,12 +295,15 @@ func newServer(f client.Factory, config serverConfig, logger *logrus.Logger) (*s discoveryClient: veleroClient.Discovery(), dynamicClient: dynamicClient, sharedInformerFactory: informers.NewSharedInformerFactoryWithOptions(veleroClient, 0, informers.WithNamespace(f.Namespace())), - ctx: ctx, - cancelFunc: cancelFunc, - logger: logger, - logLevel: logger.Level, - pluginRegistry: pluginRegistry, - config: config, + // If no namespace is specified, all 
namespaces are watched. + // This is desirable for VolumeSnapshots, as we want to query for all VolumeSnapshots across all namespaces using this informer + snapshotterSharedInformerFactory: snapshotvebeta1informers.NewSharedInformerFactoryWithOptions(csiSnapClient, 0), + ctx: ctx, + cancelFunc: cancelFunc, + logger: logger, + logLevel: logger.Level, + pluginRegistry: pluginRegistry, + config: config, } return s, nil @@ -588,6 +604,27 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string ) cmd.CheckError(err) + // Make empty listers that will only be populated if CSI is properly enabled. + var vsLister snapshotv1beta1listers.VolumeSnapshotLister + var vscLister snapshotv1beta1listers.VolumeSnapshotContentLister + + // We don't care about the returned types, just whether or not there's a 404 error. + _, err = s.discoveryClient.ServerResourcesForGroupVersion(snapshotv1beta1api.SchemeGroupVersion.String()) + switch { + case apierrors.IsNotFound(err) && !features.IsEnabled("EnableCSI"): + // Normal operating mode - CSI isn't enabled, so don't actually error + case apierrors.IsNotFound(err) && features.IsEnabled("EnableCSI"): + // CSI is enabled, but the required CRDs aren't installed, so halt. + s.logger.Fatalf("The 'EnableCSI' feature flag was specified, but CSI API group [%s] was not found.", snapshotv1beta1api.SchemeGroupVersion.String()) + case err == nil && features.IsEnabled("EnableCSI"): + // CSI is enabled, and the resources were found. 
+ // Instantiate the listers fully + vsLister = s.snapshotterSharedInformerFactory.Snapshot().V1beta1().VolumeSnapshots().Lister() + vscLister = s.snapshotterSharedInformerFactory.Snapshot().V1beta1().VolumeSnapshotContents().Lister() + case err != nil: + cmd.CheckError(err) + } + backupController := controller.NewBackupController( s.sharedInformerFactory.Velero().V1().Backups(), s.veleroClient.VeleroV1(), @@ -605,6 +642,8 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string defaultVolumeSnapshotLocations, s.metrics, s.config.formatFlag.Parse(), + vsLister, + vscLister, ) return controllerRunInfo{ @@ -793,10 +832,17 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string // start the informers & and wait for the caches to sync s.sharedInformerFactory.Start(ctx.Done()) + s.snapshotterSharedInformerFactory.Start(ctx.Done()) s.logger.Info("Waiting for informer caches to sync") cacheSyncResults := s.sharedInformerFactory.WaitForCacheSync(ctx.Done()) + csiCacheSyncResults := s.snapshotterSharedInformerFactory.WaitForCacheSync(ctx.Done()) s.logger.Info("Done waiting for informer caches to sync") + // Append our CSI informer types into the larger list of caches, so we can check them all at once + for informer, synced := range csiCacheSyncResults { + cacheSyncResults[informer] = synced + } + for informer, synced := range cacheSyncResults { if !synced { return errors.Errorf("cache was not synced for informer %v", informer) @@ -837,18 +883,3 @@ func (s *server) runProfiler() { s.logger.WithError(errors.WithStack(err)).Error("error running profiler http server") } } - -// Helper methods that only return clients if the user has turned on CSI. -func volumeSnapshotClientIfCSIEnabled(i snapshotter.Interface) snapshotv1beta1.VolumeSnapshotInterface { - if features.IsEnabled("EnableCSI") { - return i.SnapshotV1beta1().VolumeSnapshots("") // is this correct? we need the namespace? 
Could we get this lazily later, via the backup? - } - return nil -} - -func voluemSnapshotContentClientIfCSIEnabled(i snapshotter.Interface) snapshotv1beta1.VolumeSnapshotContentInterface { - if features.IsEnabled("EnableCSI") { - return i.SnapshotV1beta1().VolumeSnapshotContents() - } - return nil -} diff --git a/pkg/controller/backup_controller.go b/pkg/controller/backup_controller.go index 7bb9be83e..705edd5d2 100644 --- a/pkg/controller/backup_controller.go +++ b/pkg/controller/backup_controller.go @@ -38,7 +38,9 @@ import ( "k8s.io/client-go/tools/cache" snapshotv1beta1 "github.com/kubernetes-csi/external-snapshotter/v2/pkg/client/clientset/versioned/typed/volumesnapshot/v1beta1" + snapshotv1beta1listers "github.com/kubernetes-csi/external-snapshotter/v2/pkg/client/listers/volumesnapshot/v1beta1" + v1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" pkgbackup "github.com/vmware-tanzu/velero/pkg/backup" "github.com/vmware-tanzu/velero/pkg/discovery" @@ -59,23 +61,25 @@ import ( type backupController struct { *genericController - discoveryHelper discovery.Helper - backupper pkgbackup.Backupper - lister velerov1listers.BackupLister - client velerov1client.BackupsGetter - csiClient snapshotv1beta1.SnapshotV1beta1Interface - clock clock.Clock - backupLogLevel logrus.Level - newPluginManager func(logrus.FieldLogger) clientmgmt.Manager - backupTracker BackupTracker - backupLocationLister velerov1listers.BackupStorageLocationLister - defaultBackupLocation string - defaultBackupTTL time.Duration - snapshotLocationLister velerov1listers.VolumeSnapshotLocationLister - defaultSnapshotLocations map[string]string - metrics *metrics.ServerMetrics - newBackupStore func(*velerov1api.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error) - formatFlag logging.Format + discoveryHelper discovery.Helper + backupper pkgbackup.Backupper + lister 
velerov1listers.BackupLister + client velerov1client.BackupsGetter + csiClient snapshotv1beta1.SnapshotV1beta1Interface + clock clock.Clock + backupLogLevel logrus.Level + newPluginManager func(logrus.FieldLogger) clientmgmt.Manager + backupTracker BackupTracker + backupLocationLister velerov1listers.BackupStorageLocationLister + defaultBackupLocation string + defaultBackupTTL time.Duration + snapshotLocationLister velerov1listers.VolumeSnapshotLocationLister + defaultSnapshotLocations map[string]string + metrics *metrics.ServerMetrics + newBackupStore func(*velerov1api.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error) + formatFlag logging.Format + volumeSnapshotLister snapshotv1beta1listers.VolumeSnapshotLister + volumeSnapshotContentLister snapshotv1beta1listers.VolumeSnapshotContentLister } // TODO(nrb-csi): Add clients for the VS/VSContent here. @@ -97,27 +101,30 @@ func NewBackupController( defaultSnapshotLocations map[string]string, metrics *metrics.ServerMetrics, formatFlag logging.Format, + volumeSnapshotLister snapshotv1beta1listers.VolumeSnapshotLister, + volumeSnapshotContentLister snapshotv1beta1listers.VolumeSnapshotContentLister, ) Interface { c := &backupController{ - genericController: newGenericController("backup", logger), - discoveryHelper: discoveryHelper, - backupper: backupper, - lister: backupInformer.Lister(), - client: client, - csiClient: csiClient, - clock: &clock.RealClock{}, - backupLogLevel: backupLogLevel, - newPluginManager: newPluginManager, - backupTracker: backupTracker, - backupLocationLister: backupLocationLister, - defaultBackupLocation: defaultBackupLocation, - defaultBackupTTL: defaultBackupTTL, - snapshotLocationLister: volumeSnapshotLocationLister, - defaultSnapshotLocations: defaultSnapshotLocations, - metrics: metrics, - formatFlag: formatFlag, - - newBackupStore: persistence.NewObjectBackupStore, + genericController: newGenericController("backup", logger), + 
discoveryHelper: discoveryHelper, + backupper: backupper, + lister: backupInformer.Lister(), + client: client, + csiClient: csiClient, + clock: &clock.RealClock{}, + backupLogLevel: backupLogLevel, + newPluginManager: newPluginManager, + backupTracker: backupTracker, + backupLocationLister: backupLocationLister, + defaultBackupLocation: defaultBackupLocation, + defaultBackupTTL: defaultBackupTTL, + snapshotLocationLister: volumeSnapshotLocationLister, + defaultSnapshotLocations: defaultSnapshotLocations, + metrics: metrics, + formatFlag: formatFlag, + volumeSnapshotLister: volumeSnapshotLister, + volumeSnapshotContentLister: volumeSnapshotContentLister, + newBackupStore: persistence.NewObjectBackupStore, } c.syncHandler = c.processBackup @@ -576,17 +583,16 @@ func (c *backupController) runBackup(backup *pkgbackup.Request) error { backup.Status.Phase = velerov1api.BackupPhaseCompleted } - // TODO(nrb-csi): Fetch VS(C)s here, based on includes/exludes? + // TODO(nrb-csi): Only call the lister methods if the listers aren't nil, just in case if features.IsEnabled("EnableCSI") { - selector := metav1.LabelSelector{ - MatchLabels: map[string]string{ - "velero.io/backup-name": backup.Name, - }, - } - // Need to iterate through included namespaces to get all VSs made by the backup - c.csiClient.VolumeSnapshots("").List(metav1.ListOptions{LabelSelector: metav1.FormatLabelSelector(&selector)}) + selector := labels.SelectorFromSet(map[string]string{v1.BackupNameLabel: backup.Name}) + + // TODO(nrb-csi): assign return values, handle errors + c.volumeSnapshotLister.List(selector) + c.volumeSnapshotContentLister.List(selector) } + // TODO(nrb-csi): pass VS(C) in to persistBackup here if errs := persistBackup(backup, backupFile, logFile, backupStore, c.logger); len(errs) > 0 { fatalErrs = append(fatalErrs, errs...) }