wait for informer caches to sync before running controllers

Signed-off-by: Steve Kriss <krisss@vmware.com>
This commit is contained in:
Steve Kriss
2020-02-25 13:01:24 -07:00
parent 36e76518da
commit c7f283c7fa
16 changed files with 170 additions and 198 deletions

View File

@@ -550,9 +550,8 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.veleroClient.VeleroV1(),
s.veleroClient.VeleroV1(),
s.veleroClient.VeleroV1(),
s.sharedInformerFactory.Velero().V1().Backups(),
s.sharedInformerFactory.Velero().V1().BackupStorageLocations(),
s.sharedInformerFactory.Velero().V1().PodVolumeBackups(),
s.sharedInformerFactory.Velero().V1().Backups().Lister(),
s.sharedInformerFactory.Velero().V1().BackupStorageLocations().Lister(),
s.config.backupSyncPeriod,
s.namespace,
s.config.defaultBackupLocation,
@@ -586,10 +585,10 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.logLevel,
newPluginManager,
backupTracker,
s.sharedInformerFactory.Velero().V1().BackupStorageLocations(),
s.sharedInformerFactory.Velero().V1().BackupStorageLocations().Lister(),
s.config.defaultBackupLocation,
s.config.defaultBackupTTL,
s.sharedInformerFactory.Velero().V1().VolumeSnapshotLocations(),
s.sharedInformerFactory.Velero().V1().VolumeSnapshotLocations().Lister(),
defaultVolumeSnapshotLocations,
s.metrics,
s.config.formatFlag.Parse(),
@@ -621,9 +620,9 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
gcController := controller.NewGCController(
s.logger,
s.sharedInformerFactory.Velero().V1().Backups(),
s.sharedInformerFactory.Velero().V1().DeleteBackupRequests(),
s.sharedInformerFactory.Velero().V1().DeleteBackupRequests().Lister(),
s.veleroClient.VeleroV1(),
s.sharedInformerFactory.Velero().V1().BackupStorageLocations(),
s.sharedInformerFactory.Velero().V1().BackupStorageLocations().Lister(),
)
return controllerRunInfo{
@@ -638,13 +637,13 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.sharedInformerFactory.Velero().V1().DeleteBackupRequests(),
s.veleroClient.VeleroV1(), // deleteBackupRequestClient
s.veleroClient.VeleroV1(), // backupClient
s.sharedInformerFactory.Velero().V1().Restores(),
s.sharedInformerFactory.Velero().V1().Restores().Lister(),
s.veleroClient.VeleroV1(), // restoreClient
backupTracker,
s.resticManager,
s.sharedInformerFactory.Velero().V1().PodVolumeBackups(),
s.sharedInformerFactory.Velero().V1().BackupStorageLocations(),
s.sharedInformerFactory.Velero().V1().VolumeSnapshotLocations(),
s.sharedInformerFactory.Velero().V1().PodVolumeBackups().Lister(),
s.sharedInformerFactory.Velero().V1().BackupStorageLocations().Lister(),
s.sharedInformerFactory.Velero().V1().VolumeSnapshotLocations().Lister(),
newPluginManager,
s.metrics,
)
@@ -656,7 +655,6 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
}
restoreControllerRunInfo := func() controllerRunInfo {
restorer, err := restore.NewKubernetesRestorer(
s.discoveryHelper,
client.NewDynamicFactory(s.dynamicClient),
@@ -675,9 +673,9 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.veleroClient.VeleroV1(),
s.veleroClient.VeleroV1(),
restorer,
s.sharedInformerFactory.Velero().V1().Backups(),
s.sharedInformerFactory.Velero().V1().BackupStorageLocations(),
s.sharedInformerFactory.Velero().V1().VolumeSnapshotLocations(),
s.sharedInformerFactory.Velero().V1().Backups().Lister(),
s.sharedInformerFactory.Velero().V1().BackupStorageLocations().Lister(),
s.sharedInformerFactory.Velero().V1().VolumeSnapshotLocations().Lister(),
s.logger,
s.logLevel,
newPluginManager,
@@ -697,7 +695,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.logger,
s.sharedInformerFactory.Velero().V1().ResticRepositories(),
s.veleroClient.VeleroV1(),
s.sharedInformerFactory.Velero().V1().BackupStorageLocations(),
s.sharedInformerFactory.Velero().V1().BackupStorageLocations().Lister(),
s.resticManager,
s.config.defaultResticMaintenanceFrequency,
)
@@ -712,9 +710,9 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
downloadRequestController := controller.NewDownloadRequestController(
s.veleroClient.VeleroV1(),
s.sharedInformerFactory.Velero().V1().DownloadRequests(),
s.sharedInformerFactory.Velero().V1().Restores(),
s.sharedInformerFactory.Velero().V1().BackupStorageLocations(),
s.sharedInformerFactory.Velero().V1().Backups(),
s.sharedInformerFactory.Velero().V1().Restores().Lister(),
s.sharedInformerFactory.Velero().V1().BackupStorageLocations().Lister(),
s.sharedInformerFactory.Velero().V1().Backups().Lister(),
newPluginManager,
s.logger,
)
@@ -771,8 +769,32 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
}
}
// Instantiate the enabled controllers. This needs to be done *before*
// the shared informer factory is started, because the controller
// constructors add event handlers to various informers, which should
// be done before the informers are running.
controllers := make([]controllerRunInfo, 0, len(enabledControllers))
for _, newController := range enabledControllers {
controllerRunInfo := newController()
controllers = append(controllers, newController())
}
// start the informers & and wait for the caches to sync
s.sharedInformerFactory.Start(ctx.Done())
s.logger.Info("Waiting for informer caches to sync")
cacheSyncResults := s.sharedInformerFactory.WaitForCacheSync(ctx.Done())
s.logger.Info("Done waiting for informer caches to sync")
for informer, synced := range cacheSyncResults {
if !synced {
return errors.Errorf("cache was not synced for informer %v", informer)
}
s.logger.WithField("informer", informer).Info("Informer cache synced")
}
// now that the informer caches have all synced, we can start running the controllers
for i := range controllers {
controllerRunInfo := controllers[i]
wg.Add(1)
go func() {
controllerRunInfo.controller.Run(ctx, controllerRunInfo.numWorkers)
@@ -780,9 +802,6 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
}()
}
// SHARED INFORMERS HAVE TO BE STARTED AFTER ALL CONTROLLERS
go s.sharedInformerFactory.Start(ctx.Done())
s.logger.Info("Server started successfully")
<-ctx.Done()