Plumb CSI listers through to backup controller

Account for having CSI enabled or not, as well as having the snapshot
CRDs installed in the kubernetes cluster.

Signed-off-by: Nolan Brubaker <brubakern@vmware.com>
This commit is contained in:
Nolan Brubaker
2020-03-12 13:32:58 -04:00
parent 232e1d8927
commit 6a7beaf5ce
2 changed files with 122 additions and 85 deletions

View File

@@ -43,8 +43,10 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
snapshotter "github.com/kubernetes-csi/external-snapshotter/v2/pkg/client/clientset/versioned"
snapshotv1beta1 "github.com/kubernetes-csi/external-snapshotter/v2/pkg/client/clientset/versioned/typed/volumesnapshot/v1beta1"
snapshotv1beta1api "github.com/kubernetes-csi/external-snapshotter/v2/pkg/apis/volumesnapshot/v1beta1"
snapshotvebeta1client "github.com/kubernetes-csi/external-snapshotter/v2/pkg/client/clientset/versioned"
snapshotvebeta1informers "github.com/kubernetes-csi/external-snapshotter/v2/pkg/client/informers/externalversions"
snapshotv1beta1listers "github.com/kubernetes-csi/external-snapshotter/v2/pkg/client/listers/volumesnapshot/v1beta1"
api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/backup"
@@ -215,24 +217,25 @@ func NewCommand(f client.Factory) *cobra.Command {
}
type server struct {
namespace string
metricsAddress string
kubeClientConfig *rest.Config
kubeClient kubernetes.Interface
veleroClient clientset.Interface
csiSnapClient snapshotter.Interface
discoveryClient discovery.DiscoveryInterface
discoveryHelper velerodiscovery.Helper
dynamicClient dynamic.Interface
sharedInformerFactory informers.SharedInformerFactory
ctx context.Context
cancelFunc context.CancelFunc
logger logrus.FieldLogger
logLevel logrus.Level
pluginRegistry clientmgmt.Registry
resticManager restic.RepositoryManager
metrics *metrics.ServerMetrics
config serverConfig
namespace string
metricsAddress string
kubeClientConfig *rest.Config
kubeClient kubernetes.Interface
veleroClient clientset.Interface
csiSnapClient snapshotvebeta1client.Interface
discoveryClient discovery.DiscoveryInterface
discoveryHelper velerodiscovery.Helper
dynamicClient dynamic.Interface
sharedInformerFactory informers.SharedInformerFactory
snapshotterSharedInformerFactory snapshotvebeta1informers.SharedInformerFactory
ctx context.Context
cancelFunc context.CancelFunc
logger logrus.FieldLogger
logLevel logrus.Level
pluginRegistry clientmgmt.Registry
resticManager restic.RepositoryManager
metrics *metrics.ServerMetrics
config serverConfig
}
func newServer(f client.Factory, config serverConfig, logger *logrus.Logger) (*server, error) {
@@ -270,9 +273,19 @@ func newServer(f client.Factory, config serverConfig, logger *logrus.Logger) (*s
clientConfig, err := f.ClientConfig()
if err != nil {
cancelFunc()
return nil, err
}
var csiSnapClient *snapshotvebeta1client.Clientset
if features.IsEnabled("EnableCSI") {
csiSnapClient, err = snapshotvebeta1client.NewForConfig(clientConfig)
if err != nil {
cancelFunc()
return nil, err
}
}
s := &server{
namespace: f.Namespace(),
metricsAddress: config.metricsAddress,
@@ -282,12 +295,15 @@ func newServer(f client.Factory, config serverConfig, logger *logrus.Logger) (*s
discoveryClient: veleroClient.Discovery(),
dynamicClient: dynamicClient,
sharedInformerFactory: informers.NewSharedInformerFactoryWithOptions(veleroClient, 0, informers.WithNamespace(f.Namespace())),
ctx: ctx,
cancelFunc: cancelFunc,
logger: logger,
logLevel: logger.Level,
pluginRegistry: pluginRegistry,
config: config,
// If no namespace is specified, all namespaces are watched.
// This is desirable for VolumeSnapshots, as we want to query for all VolumeSnapshots across all namespaces using this informer
snapshotterSharedInformerFactory: snapshotvebeta1informers.NewSharedInformerFactoryWithOptions(csiSnapClient, 0),
ctx: ctx,
cancelFunc: cancelFunc,
logger: logger,
logLevel: logger.Level,
pluginRegistry: pluginRegistry,
config: config,
}
return s, nil
@@ -588,6 +604,27 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
)
cmd.CheckError(err)
// Make empty listers that will only be populated if CSI is properly enabled.
var vsLister snapshotv1beta1listers.VolumeSnapshotLister
var vscLister snapshotv1beta1listers.VolumeSnapshotContentLister
// We don't care about the returned types, just whether or not there's a 404 error.
_, err = s.discoveryClient.ServerResourcesForGroupVersion(snapshotv1beta1api.SchemeGroupVersion.String())
switch {
case apierrors.IsNotFound(err) && !features.IsEnabled("EnableCSI"):
// Normal operating mode - CSI isn't enabled, so don't actually error
case apierrors.IsNotFound(err) && features.IsEnabled("EnableCSI"):
// CSI is enabled, but the required CRDs aren't installed, so halt.
s.logger.Fatalf("The 'EnableCSI' feature flag was specified, but CSI API group [%s] was not found.", snapshotv1beta1api.SchemeGroupVersion.String())
case err == nil && features.IsEnabled("EnableCSI"):
// CSI is enabled, and the resources were found.
// Instantiate the listers fully
vsLister = s.snapshotterSharedInformerFactory.Snapshot().V1beta1().VolumeSnapshots().Lister()
vscLister = s.snapshotterSharedInformerFactory.Snapshot().V1beta1().VolumeSnapshotContents().Lister()
case err != nil:
cmd.CheckError(err)
}
backupController := controller.NewBackupController(
s.sharedInformerFactory.Velero().V1().Backups(),
s.veleroClient.VeleroV1(),
@@ -605,6 +642,8 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
defaultVolumeSnapshotLocations,
s.metrics,
s.config.formatFlag.Parse(),
vsLister,
vscLister,
)
return controllerRunInfo{
@@ -793,10 +832,17 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
// start the informers & and wait for the caches to sync
s.sharedInformerFactory.Start(ctx.Done())
s.snapshotterSharedInformerFactory.Start(ctx.Done())
s.logger.Info("Waiting for informer caches to sync")
cacheSyncResults := s.sharedInformerFactory.WaitForCacheSync(ctx.Done())
csiCacheSyncResults := s.snapshotterSharedInformerFactory.WaitForCacheSync(ctx.Done())
s.logger.Info("Done waiting for informer caches to sync")
// Append our CSI informer types into the larger list of caches, so we can check them all at once
for informer, synced := range csiCacheSyncResults {
cacheSyncResults[informer] = synced
}
for informer, synced := range cacheSyncResults {
if !synced {
return errors.Errorf("cache was not synced for informer %v", informer)
@@ -837,18 +883,3 @@ func (s *server) runProfiler() {
s.logger.WithError(errors.WithStack(err)).Error("error running profiler http server")
}
}
// Helper methods that only return clients if the user has turned on CSI.
func volumeSnapshotClientIfCSIEnabled(i snapshotter.Interface) snapshotv1beta1.VolumeSnapshotInterface {
if features.IsEnabled("EnableCSI") {
return i.SnapshotV1beta1().VolumeSnapshots("") // is this correct? we need the namespace? Could we get this lazily later, via the backup?
}
return nil
}
func voluemSnapshotContentClientIfCSIEnabled(i snapshotter.Interface) snapshotv1beta1.VolumeSnapshotContentInterface {
if features.IsEnabled("EnableCSI") {
return i.SnapshotV1beta1().VolumeSnapshotContents()
}
return nil
}

View File

@@ -38,7 +38,9 @@ import (
"k8s.io/client-go/tools/cache"
snapshotv1beta1 "github.com/kubernetes-csi/external-snapshotter/v2/pkg/client/clientset/versioned/typed/volumesnapshot/v1beta1"
snapshotv1beta1listers "github.com/kubernetes-csi/external-snapshotter/v2/pkg/client/listers/volumesnapshot/v1beta1"
v1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
pkgbackup "github.com/vmware-tanzu/velero/pkg/backup"
"github.com/vmware-tanzu/velero/pkg/discovery"
@@ -59,23 +61,25 @@ import (
type backupController struct {
*genericController
discoveryHelper discovery.Helper
backupper pkgbackup.Backupper
lister velerov1listers.BackupLister
client velerov1client.BackupsGetter
csiClient snapshotv1beta1.SnapshotV1beta1Interface
clock clock.Clock
backupLogLevel logrus.Level
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager
backupTracker BackupTracker
backupLocationLister velerov1listers.BackupStorageLocationLister
defaultBackupLocation string
defaultBackupTTL time.Duration
snapshotLocationLister velerov1listers.VolumeSnapshotLocationLister
defaultSnapshotLocations map[string]string
metrics *metrics.ServerMetrics
newBackupStore func(*velerov1api.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error)
formatFlag logging.Format
discoveryHelper discovery.Helper
backupper pkgbackup.Backupper
lister velerov1listers.BackupLister
client velerov1client.BackupsGetter
csiClient snapshotv1beta1.SnapshotV1beta1Interface
clock clock.Clock
backupLogLevel logrus.Level
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager
backupTracker BackupTracker
backupLocationLister velerov1listers.BackupStorageLocationLister
defaultBackupLocation string
defaultBackupTTL time.Duration
snapshotLocationLister velerov1listers.VolumeSnapshotLocationLister
defaultSnapshotLocations map[string]string
metrics *metrics.ServerMetrics
newBackupStore func(*velerov1api.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error)
formatFlag logging.Format
volumeSnapshotLister snapshotv1beta1listers.VolumeSnapshotLister
volumeSnapshotContentLister snapshotv1beta1listers.VolumeSnapshotContentLister
}
// TODO(nrb-csi): Add clients for the VS/VSContent here.
@@ -97,27 +101,30 @@ func NewBackupController(
defaultSnapshotLocations map[string]string,
metrics *metrics.ServerMetrics,
formatFlag logging.Format,
volumeSnapshotLister snapshotv1beta1listers.VolumeSnapshotLister,
volumeSnapshotContentLister snapshotv1beta1listers.VolumeSnapshotContentLister,
) Interface {
c := &backupController{
genericController: newGenericController("backup", logger),
discoveryHelper: discoveryHelper,
backupper: backupper,
lister: backupInformer.Lister(),
client: client,
csiClient: csiClient,
clock: &clock.RealClock{},
backupLogLevel: backupLogLevel,
newPluginManager: newPluginManager,
backupTracker: backupTracker,
backupLocationLister: backupLocationLister,
defaultBackupLocation: defaultBackupLocation,
defaultBackupTTL: defaultBackupTTL,
snapshotLocationLister: volumeSnapshotLocationLister,
defaultSnapshotLocations: defaultSnapshotLocations,
metrics: metrics,
formatFlag: formatFlag,
newBackupStore: persistence.NewObjectBackupStore,
genericController: newGenericController("backup", logger),
discoveryHelper: discoveryHelper,
backupper: backupper,
lister: backupInformer.Lister(),
client: client,
csiClient: csiClient,
clock: &clock.RealClock{},
backupLogLevel: backupLogLevel,
newPluginManager: newPluginManager,
backupTracker: backupTracker,
backupLocationLister: backupLocationLister,
defaultBackupLocation: defaultBackupLocation,
defaultBackupTTL: defaultBackupTTL,
snapshotLocationLister: volumeSnapshotLocationLister,
defaultSnapshotLocations: defaultSnapshotLocations,
metrics: metrics,
formatFlag: formatFlag,
volumeSnapshotLister: volumeSnapshotLister,
volumeSnapshotContentLister: volumeSnapshotContentLister,
newBackupStore: persistence.NewObjectBackupStore,
}
c.syncHandler = c.processBackup
@@ -576,17 +583,16 @@ func (c *backupController) runBackup(backup *pkgbackup.Request) error {
backup.Status.Phase = velerov1api.BackupPhaseCompleted
}
// TODO(nrb-csi): Fetch VS(C)s here, based on includes/exludes?
// TODO(nrb-csi): Only run listers methods if the listers aren't nil, just in case
if features.IsEnabled("EnableCSI") {
selector := metav1.LabelSelector{
MatchLabels: map[string]string{
"velero.io/backup-name": backup.Name,
},
}
// Need to iterate through included namespaces to get all VSs made by the backup
c.csiClient.VolumeSnapshots("").List(metav1.ListOptions{LabelSelector: metav1.FormatLabelSelector(&selector)})
selector := labels.SelectorFromSet(map[string]string{v1.BackupNameLabel: backup.Name})
// TODO: (nrb-csi): assign return values, handle errors
c.volumeSnapshotLister.List(selector)
c.volumeSnapshotContentLister.List(selector)
}
// TODO(nrb-csi): pass VS(C) in to persistBackup here
if errs := persistBackup(backup, backupFile, logFile, backupStore, c.logger); len(errs) > 0 {
fatalErrs = append(fatalErrs, errs...)
}