diff --git a/pkg/controller/restore_controller.go b/pkg/controller/restore_controller.go index 966dd6911..0b3519457 100644 --- a/pkg/controller/restore_controller.go +++ b/pkg/controller/restore_controller.go @@ -150,35 +150,35 @@ func NewRestoreController( // Run is a blocking function that runs the specified number of worker goroutines // to process items in the work queue. It will return when it receives on the // ctx.Done() channel. -func (controller *restoreController) Run(ctx context.Context, numWorkers int) error { +func (c *restoreController) Run(ctx context.Context, numWorkers int) error { var wg sync.WaitGroup defer func() { - controller.logger.Info("Waiting for workers to finish their work") + c.logger.Info("Waiting for workers to finish their work") - controller.queue.ShutDown() + c.queue.ShutDown() // We have to wait here in the deferred function instead of at the bottom of the function body // because we have to shut down the queue in order for the workers to shut down gracefully, and // we want to shut down the queue via defer and not at the end of the body. wg.Wait() - controller.logger.Info("All workers have finished") + c.logger.Info("All workers have finished") }() - controller.logger.Info("Starting RestoreController") - defer controller.logger.Info("Shutting down RestoreController") + c.logger.Info("Starting RestoreController") + defer c.logger.Info("Shutting down RestoreController") - controller.logger.Info("Waiting for caches to sync") - if !cache.WaitForCacheSync(ctx.Done(), controller.backupListerSynced, controller.restoreListerSynced) { + c.logger.Info("Waiting for caches to sync") + if !cache.WaitForCacheSync(ctx.Done(), c.backupListerSynced, c.restoreListerSynced) { return errors.New("timed out waiting for caches to sync") } - controller.logger.Info("Caches are synced") + c.logger.Info("Caches are synced") wg.Add(numWorkers) for i := 0; i < numWorkers; i++ { go func() { - wait.Until(controller.runWorker, time.Second, ctx.Done()) + wait.Until(c.runWorker, time.Second, ctx.Done()) wg.Done() }() } @@ -188,40 +188,40 @@ func (controller *restoreController) Run(ctx context.Context, numWorkers int) er return nil } -func (controller *restoreController) runWorker() { +func (c *restoreController) runWorker() { // continually take items off the queue (waits if it's // empty) until we get a shutdown signal from the queue - for controller.processNextWorkItem() { + for c.processNextWorkItem() { } } -func (controller *restoreController) processNextWorkItem() bool { - key, quit := controller.queue.Get() +func (c *restoreController) processNextWorkItem() bool { + key, quit := c.queue.Get() if quit { return false } // always call done on this item, since if it fails we'll add // it back with rate-limiting below - defer controller.queue.Done(key) + defer c.queue.Done(key) - err := controller.syncHandler(key.(string)) + err := c.syncHandler(key.(string)) if err == nil { // If you had no error, tell the queue to stop tracking history for your key. This will reset // things like failure counts for per-item rate limiting. - controller.queue.Forget(key) + c.queue.Forget(key) return true } - controller.logger.WithError(err).WithField("key", key).Error("Error in syncHandler, re-adding item to queue") + c.logger.WithError(err).WithField("key", key).Error("Error in syncHandler, re-adding item to queue") // we had an error processing the item so add it back // into the queue for re-processing with rate-limiting - controller.queue.AddRateLimited(key) + c.queue.AddRateLimited(key) return true } -func (controller *restoreController) processRestore(key string) error { - logContext := controller.logger.WithField("key", key) +func (c *restoreController) processRestore(key string) error { + logContext := c.logger.WithField("key", key) logContext.Debug("Running processRestore") ns, name, err := cache.SplitMetaNamespaceKey(key) @@ -230,7 +230,7 @@ func (controller *restoreController) processRestore(key string) error { } logContext.Debug("Getting Restore") - restore, err := controller.restoreLister.Restores(ns).Get(name) + restore, err := c.restoreLister.Restores(ns).Get(name) if err != nil { return errors.Wrap(err, "error getting Restore") } @@ -256,14 +256,14 @@ func (controller *restoreController) processRestore(key string) error { restore = restore.DeepCopy() // complete & validate restore - if restore.Status.ValidationErrors = controller.completeAndValidate(restore); len(restore.Status.ValidationErrors) > 0 { + if restore.Status.ValidationErrors = c.completeAndValidate(restore); len(restore.Status.ValidationErrors) > 0 { restore.Status.Phase = api.RestorePhaseFailedValidation } else { restore.Status.Phase = api.RestorePhaseInProgress } // update status - updatedRestore, err := patchRestore(original, restore, controller.restoreClient) + updatedRestore, err := patchRestore(original, restore, c.restoreClient) if err != nil { return errors.Wrapf(err, "error updating Restore phase to %s", restore.Status.Phase) } @@ -277,7 +277,7 @@ func (controller *restoreController) processRestore(key string) error { logContext.Debug("Running restore") // execution & upload of restore - restoreWarnings, restoreErrors := controller.runRestore(restore, controller.bucket) + restoreWarnings, restoreErrors := c.runRestore(restore, c.bucket) restore.Status.Warnings = len(restoreWarnings.Ark) + len(restoreWarnings.Cluster) for _, w := range restoreWarnings.Namespaces { @@ -293,14 +293,14 @@ func (controller *restoreController) processRestore(key string) error { restore.Status.Phase = api.RestorePhaseCompleted logContext.Debug("Updating Restore final status") - if _, err = patchRestore(original, restore, controller.restoreClient); err != nil { + if _, err = patchRestore(original, restore, c.restoreClient); err != nil { logContext.WithError(errors.WithStack(err)).Info("Error updating Restore final status") } return nil } -func (controller *restoreController) completeAndValidate(restore *api.Restore) []string { +func (c *restoreController) completeAndValidate(restore *api.Restore) []string { // add non-restorable resources to restore's excluded resources excludedResources := sets.NewString(restore.Spec.ExcludedResources...) for _, nonrestorable := range nonRestorableResources { @@ -330,7 +330,7 @@ func (controller *restoreController) completeAndValidate(restore *api.Restore) [ } // validate that PV provider exists if we're restoring PVs - if boolptr.IsSetToTrue(restore.Spec.RestorePVs) && !controller.pvProviderExists { + if boolptr.IsSetToTrue(restore.Spec.RestorePVs) && !c.pvProviderExists { validationErrors = append(validationErrors, "Server is not configured for PV snapshot restores") } @@ -346,7 +346,7 @@ func (controller *restoreController) completeAndValidate(restore *api.Restore) [ "ark-schedule": restore.Spec.ScheduleName, })) - backups, err := controller.backupLister.Backups(controller.namespace).List(selector) + backups, err := c.backupLister.Backups(c.namespace).List(selector) if err != nil { return append(validationErrors, "Unable to list backups for schedule") } @@ -362,7 +362,7 @@ func (controller *restoreController) completeAndValidate(restore *api.Restore) [ } // validate that we can fetch the source backup - if _, err := controller.fetchBackup(controller.bucket, restore.Spec.BackupName); err != nil { + if _, err := c.fetchBackup(c.bucket, restore.Spec.BackupName); err != nil { return append(validationErrors, fmt.Sprintf("Error retrieving backup: %v", err)) } @@ -408,8 +408,8 @@ func mostRecentCompletedBackup(backups []*api.Backup) *api.Backup { return nil } -func (controller *restoreController) fetchBackup(bucket, name string) (*api.Backup, error) { - backup, err := controller.backupLister.Backups(controller.namespace).Get(name) +func (c *restoreController) fetchBackup(bucket, name string) (*api.Backup, error) { + backup, err := c.backupLister.Backups(c.namespace).Get(name) if err == nil { return backup, nil } @@ -418,10 +418,10 @@ func (controller *restoreController) fetchBackup(bucket, name string) (*api.Back return nil, errors.WithStack(err) } - logContext := controller.logger.WithField("backupName", name) + logContext := c.logger.WithField("backupName", name) logContext.Debug("Backup not found in backupLister, checking object storage directly") - backup, err = controller.backupService.GetBackup(bucket, name) + backup, err = c.backupService.GetBackup(bucket, name) if err != nil { return nil, err } @@ -431,7 +431,7 @@ func (controller *restoreController) fetchBackup(bucket, name string) (*api.Back // Clear out the namespace too, just in case backup.Namespace = "" - created, createErr := controller.backupClient.Backups(controller.namespace).Create(backup) + created, createErr := c.backupClient.Backups(c.namespace).Create(backup) if createErr != nil { logContext.WithError(errors.WithStack(createErr)).Error("Unable to create API object for Backup") } else { @@ -441,14 +441,14 @@ func (controller *restoreController) fetchBackup(bucket, name string) (*api.Back return backup, nil } -func (controller *restoreController) runRestore(restore *api.Restore, bucket string) (restoreWarnings, restoreErrors api.RestoreResult) { - logContext := controller.logger.WithFields( +func (c *restoreController) runRestore(restore *api.Restore, bucket string) (restoreWarnings, restoreErrors api.RestoreResult) { + logContext := c.logger.WithFields( logrus.Fields{ "restore": kubeutil.NamespaceAndName(restore), "backup": restore.Spec.BackupName, }) - backup, err := controller.fetchBackup(bucket, restore.Spec.BackupName) + backup, err := c.fetchBackup(bucket, restore.Spec.BackupName) if err != nil { logContext.WithError(err).Error("Error getting backup") restoreErrors.Ark = append(restoreErrors.Ark, err.Error()) @@ -457,7 +457,7 @@ func (controller *restoreController) runRestore(restore *api.Restore, bucket str var tempFiles []*os.File - backupFile, err := downloadToTempFile(restore.Spec.BackupName, controller.backupService, bucket, controller.logger) + backupFile, err := downloadToTempFile(restore.Spec.BackupName, c.backupService, bucket, c.logger) if err != nil { logContext.WithError(err).Error("Error downloading backup") restoreErrors.Ark = append(restoreErrors.Ark, err.Error()) @@ -493,15 +493,15 @@ func (controller *restoreController) runRestore(restore *api.Restore, bucket str } }() - actions, err := controller.pluginManager.GetRestoreItemActions(restore.Name) + actions, err := c.pluginManager.GetRestoreItemActions(restore.Name) if err != nil { restoreErrors.Ark = append(restoreErrors.Ark, err.Error()) return } - defer controller.pluginManager.CloseRestoreItemActions(restore.Name) + defer c.pluginManager.CloseRestoreItemActions(restore.Name) logContext.Info("starting restore") - restoreWarnings, restoreErrors = controller.restorer.Restore(restore, backup, backupFile, logFile, actions) + restoreWarnings, restoreErrors = c.restorer.Restore(restore, backup, backupFile, logFile, actions) logContext.Info("restore completed") // Try to upload the log file. This is best-effort. If we fail, we'll add to the ark errors. @@ -512,7 +512,7 @@ func (controller *restoreController) runRestore(restore *api.Restore, bucket str return } - if err := controller.backupService.UploadRestoreLog(bucket, restore.Spec.BackupName, restore.Name, logFile); err != nil { + if err := c.backupService.UploadRestoreLog(bucket, restore.Spec.BackupName, restore.Name, logFile); err != nil { restoreErrors.Ark = append(restoreErrors.Ark, fmt.Sprintf("error uploading log file to object storage: %v", err)) } @@ -533,7 +533,7 @@ func (controller *restoreController) runRestore(restore *api.Restore, bucket str logContext.WithError(errors.WithStack(err)).Error("Error resetting results file offset to 0") return } - if err := controller.backupService.UploadRestoreResults(bucket, restore.Spec.BackupName, restore.Name, resultsFile); err != nil { + if err := c.backupService.UploadRestoreResults(bucket, restore.Spec.BackupName, restore.Name, resultsFile); err != nil { logContext.WithError(errors.WithStack(err)).Error("Error uploading results files to object storage") }