From 3116185e5b9f90673e03ea889b51fae90ff34b9f Mon Sep 17 00:00:00 2001 From: Steve Kriss Date: Tue, 9 Apr 2019 15:17:28 -0600 Subject: [PATCH] instantiate plugin manager with per-restore logger so plugin logs are captured Signed-off-by: Steve Kriss --- changelogs/unreleased/1358-skriss | 1 + pkg/controller/restore_controller.go | 283 +++++++++++----------- pkg/controller/restore_controller_test.go | 9 +- 3 files changed, 150 insertions(+), 143 deletions(-) create mode 100644 changelogs/unreleased/1358-skriss diff --git a/changelogs/unreleased/1358-skriss b/changelogs/unreleased/1358-skriss new file mode 100644 index 000000000..25e2ad113 --- /dev/null +++ b/changelogs/unreleased/1358-skriss @@ -0,0 +1 @@ +instantiate the plugin manager with the per-restore logger so plugin logs are captured in the per-restore log diff --git a/pkg/controller/restore_controller.go b/pkg/controller/restore_controller.go index b2dd0dedd..a455864d3 100644 --- a/pkg/controller/restore_controller.go +++ b/pkg/controller/restore_controller.go @@ -17,6 +17,7 @@ limitations under the License. package controller import ( + "bytes" "compress/gzip" "encoding/json" "fmt" @@ -41,7 +42,6 @@ import ( "github.com/heptio/velero/pkg/metrics" "github.com/heptio/velero/pkg/persistence" "github.com/heptio/velero/pkg/plugin/clientmgmt" - "github.com/heptio/velero/pkg/plugin/velero" "github.com/heptio/velero/pkg/restore" "github.com/heptio/velero/pkg/util/collections" kubeutil "github.com/heptio/velero/pkg/util/kube" @@ -91,10 +91,6 @@ type restoreController struct { newBackupStore func(*api.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error) } -type restoreResult struct { - warnings, errors api.RestoreResult -} - func NewRestoreController( namespace string, restoreInformer informers.RestoreInformer, @@ -130,7 +126,7 @@ func NewRestoreController( newBackupStore: persistence.NewObjectBackupStore, } - c.syncHandler = c.processRestore + c.syncHandler = c.processQueueItem c.cacheSyncWaiters = append(c.cacheSyncWaiters, backupInformer.Informer().HasSynced, restoreInformer.Informer().HasSynced, @@ -167,13 +163,13 @@ func NewRestoreController( return c } -func (c *restoreController) processRestore(key string) error { +func (c *restoreController) processQueueItem(key string) error { log := c.logger.WithField("key", key) - log.Debug("Running processRestore") + log.Debug("Running processQueueItem") ns, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { - log.WithError(err).Error("unable to process restore: error splitting queue key") + log.WithError(err).Error("unable to process queue item: error splitting queue key") // Return nil here so we don't try to process the key any more return nil } @@ -198,24 +194,30 @@ func (c *restoreController) processRestore(key string) error { return nil } - log.Debug("Cloning Restore") - // store ref to original for creating patch - original := restore - // don't modify items in the cache - restore = restore.DeepCopy() + // Deep-copy the restore so the copy from the lister is not modified. + // Any errors returned by processRestore will be bubbled up, meaning + // the key will be re-enqueued by the controller. 
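// For context: this is the standard client-go contract. Objects returned by a
// lister are shared cache state (hence the DeepCopy below), and a non-nil
// error from the sync handler makes the workqueue re-enqueue the key. A
// minimal sketch of that loop, assuming k8s.io/client-go/util/workqueue
// (runWorker and syncHandler are illustrative names, not part of this patch):
//
//	func runWorker(queue workqueue.RateLimitingInterface, syncHandler func(key string) error) {
//		for {
//			key, quit := queue.Get()
//			if quit {
//				return
//			}
//			if err := syncHandler(key.(string)); err != nil {
//				queue.AddRateLimited(key) // retry later with backoff
//			} else {
//				queue.Forget(key) // success: clear backoff tracking
//			}
//			queue.Done(key)
//		}
//	}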
+ return c.processRestore(restore.DeepCopy())
+}

- pluginManager := c.newPluginManager(log)
- defer pluginManager.CleanupClients()
+func (c *restoreController) processRestore(restore *api.Restore) error {
+ // Developer note: any error returned by this method will
+ // cause the restore to be re-enqueued and re-processed by
+ // the controller.

- actions, err := pluginManager.GetRestoreItemActions()
- if err != nil {
- return errors.Wrap(err, "error initializing restore item actions")
- }
+ // store a copy of the original restore for creating patch
+ original := restore.DeepCopy()

- // validate the restore and fetch the backup
+ // Validate the restore and fetch the backup. Note that the plugin
+ // manager used here is not the same one used by c.runValidatedRestore,
+ // since within that function we want the plugin manager to log to
+ // our per-restore log (which is instantiated within c.runValidatedRestore).
+ pluginManager := c.newPluginManager(c.logger)
 info := c.validateAndComplete(restore, pluginManager)
- backupScheduleName := restore.Spec.ScheduleName
+ pluginManager.CleanupClients()

+ // Register attempts after validation so we don't have to fetch the backup multiple times
+ backupScheduleName := restore.Spec.ScheduleName
 c.metrics.RegisterRestoreAttempt(backupScheduleName)

 if len(restore.Status.ValidationErrors) > 0 {
@@ -228,6 +230,8 @@ func (c *restoreController) processRestore(key string) error {
 // patch to update status and persist to API
 updatedRestore, err := patchRestore(original, restore, c.restoreClient)
 if err != nil {
+ // return the error so the restore can be re-processed; it's currently
+ // still in phase = New.
 return errors.Wrapf(err, "error updating Restore phase to %s", restore.Status.Phase)
 }
 // store ref to just-updated item for creating patch
@@ -238,43 +242,20 @@ func (c *restoreController) processRestore(key string) error {
 return nil
 }

- log.Debug("Running restore")
-
- // execution & upload of restore
- restoreRes, restoreFailure := c.runRestore(
- restore,
- actions,
- info,
- pluginManager,
- )
-
- //TODO(1.0): Remove warnings.Ark
- restore.Status.Warnings = len(restoreRes.warnings.Velero) + len(restoreRes.warnings.Cluster) + len(restoreRes.warnings.Ark)
- for _, w := range restoreRes.warnings.Namespaces {
- restore.Status.Warnings += len(w)
- }
-
- //TODO (1.0): Remove errors.Ark
- restore.Status.Errors = len(restoreRes.errors.Velero) + len(restoreRes.errors.Cluster) + len(restoreRes.errors.Ark)
- for _, e := range restoreRes.errors.Namespaces {
- restore.Status.Errors += len(e)
- }
-
- if restoreFailure != nil {
- log.Debug("restore failed")
+ if err := c.runValidatedRestore(restore, info); err != nil {
+ c.logger.WithError(err).Debug("Restore failed")
 restore.Status.Phase = api.RestorePhaseFailed
- restore.Status.FailureReason = restoreFailure.Error()
+ restore.Status.FailureReason = err.Error()
 c.metrics.RegisterRestoreFailed(backupScheduleName)
 } else {
- log.Debug("restore completed")
- // We got through the restore process without failing validation or restore execution
+ c.logger.Debug("Restore completed")
 restore.Status.Phase = api.RestorePhaseCompleted
 c.metrics.RegisterRestoreSuccess(backupScheduleName)
 }

- log.Debug("Updating Restore final status")
+ c.logger.Debug("Updating restore's final status")
 if _, err = patchRestore(original, restore, c.restoreClient); err != nil {
- log.WithError(errors.WithStack(err)).Info("Error updating Restore final status")
+ c.logger.WithError(errors.WithStack(err)).Info("Error updating restore's final status")
 }

 return nil
@@ -431,88 +412,63 @@ func (c *restoreController) fetchBackupInfo(backupName string, pluginManager cli
 }, nil
 }

-func (c *restoreController) runRestore(
- restore *api.Restore,
- actions []velero.RestoreItemAction,
- info backupInfo,
- pluginManager clientmgmt.Manager,
-) (restoreResult, error) {
- var restoreWarnings, restoreErrors api.RestoreResult
- var restoreFailure error
- logFile, err := ioutil.TempFile("", "")
+// runValidatedRestore takes a validated restore API object and executes the restore process.
+// The log and results files are uploaded to backup storage. Any error returned from this function
+// means that the restore failed. This function updates the restore API object with warning and error
+// counts, but *does not* update its phase or patch it via the API.
+func (c *restoreController) runValidatedRestore(restore *api.Restore, info backupInfo) error {
+ // instantiate the per-restore logger that will output both to a temp file
+ // (for upload to object storage) and to stdout.
+ restoreLog, err := newRestoreLogger(restore, c.logger, c.restoreLogLevel)
 if err != nil {
- c.logger.
- WithFields(
- logrus.Fields{
- "restore": kubeutil.NamespaceAndName(restore),
- "backup": restore.Spec.BackupName,
- },
- ).
- WithError(errors.WithStack(err)).
- Error("Error creating log temp file")
- restoreErrors.Velero = append(restoreErrors.Velero, err.Error())
- return restoreResult{warnings: restoreWarnings, errors: restoreErrors}, restoreFailure
+ return err
 }
- gzippedLogFile := gzip.NewWriter(logFile)
- // Assuming we successfully uploaded the log file, this will have already been closed below. It is safe to call
- // close multiple times. If we get an error closing this, there's not really anything we can do about it.
- defer gzippedLogFile.Close()
- defer closeAndRemoveFile(logFile, c.logger)
+ defer restoreLog.closeAndRemove(c.logger)

- // Log the backup to both a backup log file and to stdout. This will help see what happened if the upload of the
- // backup log failed for whatever reason.
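// The temp-file and gzip plumbing removed here is superseded by the
// restoreLogger type added at the bottom of this file; the wiring itself is
// unchanged: a logrus logger whose output is an io.MultiWriter feeding both
// stdout and a gzip.Writer over a temp file. A self-contained sketch of that
// idea (newTeeLogger is a hypothetical name, not part of this patch):
//
//	func newTeeLogger(level logrus.Level) (*logrus.Logger, *os.File, *gzip.Writer, error) {
//		file, err := ioutil.TempFile("", "")
//		if err != nil {
//			return nil, nil, nil, err
//		}
//		gzw := gzip.NewWriter(file)
//
//		logger := logrus.New()
//		logger.Level = level
//		logger.Out = io.MultiWriter(os.Stdout, gzw) // every entry goes to both sinks
//
//		return logger, file, gzw, nil
//	}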
- logger := logging.DefaultLogger(c.restoreLogLevel) - logger.Out = io.MultiWriter(os.Stdout, gzippedLogFile) - log := logger.WithFields( - logrus.Fields{ - "restore": kubeutil.NamespaceAndName(restore), - "backup": restore.Spec.BackupName, - }) + pluginManager := c.newPluginManager(restoreLog) + defer pluginManager.CleanupClients() - backupFile, err := downloadToTempFile(restore.Spec.BackupName, info.backupStore, c.logger) + actions, err := pluginManager.GetRestoreItemActions() if err != nil { - log.WithError(err).Error("Error downloading backup") - restoreErrors.Velero = append(restoreErrors.Velero, err.Error()) - restoreFailure = err - return restoreResult{warnings: restoreWarnings, errors: restoreErrors}, restoreFailure + return errors.Wrap(err, "error getting restore item actions") + } + + backupFile, err := downloadToTempFile(restore.Spec.BackupName, info.backupStore, restoreLog) + if err != nil { + return errors.Wrap(err, "error downloading backup") } defer closeAndRemoveFile(backupFile, c.logger) - resultsFile, err := ioutil.TempFile("", "") - if err != nil { - log.WithError(errors.WithStack(err)).Error("Error creating results temp file") - restoreErrors.Velero = append(restoreErrors.Velero, err.Error()) - restoreFailure = err - return restoreResult{warnings: restoreWarnings, errors: restoreErrors}, restoreFailure - } - defer closeAndRemoveFile(resultsFile, c.logger) - volumeSnapshots, err := info.backupStore.GetBackupVolumeSnapshots(restore.Spec.BackupName) if err != nil { - log.WithError(errors.WithStack(err)).Error("Error fetching volume snapshots") - restoreErrors.Velero = append(restoreErrors.Velero, err.Error()) - restoreFailure = err - return restoreResult{warnings: restoreWarnings, errors: restoreErrors}, restoreFailure + return errors.Wrap(err, "error fetching volume snapshots metadata") } - // Any return statement above this line means a total restore failure - // Some failures after this line *may* be a total restore failure - log.Info("starting restore") - restoreWarnings, restoreErrors = c.restorer.Restore(log, restore, info.backup, volumeSnapshots, backupFile, actions, c.snapshotLocationLister, pluginManager) - log.Info("restore completed") + restoreLog.Info("starting restore") + restoreWarnings, restoreErrors := c.restorer.Restore(restoreLog, restore, info.backup, volumeSnapshots, backupFile, actions, c.snapshotLocationLister, pluginManager) + restoreLog.Info("restore completed") - // Try to upload the log file. This is best-effort. If we fail, we'll add to the velero errors. 
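// Note that restoreLog, not c.logger, is now handed to newPluginManager and
// downloadToTempFile above; that is the point of this patch, since plugin
// output then lands in the uploaded per-restore log. For reference, the
// spool-to-temp-file shape downloadToTempFile relies on, sketched over a
// generic io.ReadCloser (spoolToTempFile is a hypothetical name):
//
//	func spoolToTempFile(src io.ReadCloser) (*os.File, error) {
//		defer src.Close()
//
//		tmp, err := ioutil.TempFile("", "")
//		if err != nil {
//			return nil, err
//		}
//		if _, err := io.Copy(tmp, src); err != nil {
//			tmp.Close()
//			os.Remove(tmp.Name())
//			return nil, err
//		}
//		// rewind so the caller reads from the start of the spooled data
//		if _, err := tmp.Seek(0, 0); err != nil {
//			tmp.Close()
//			os.Remove(tmp.Name())
//			return nil, err
//		}
//		return tmp, nil
//	}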
- if err := gzippedLogFile.Close(); err != nil {
- c.logger.WithError(err).Error("error closing gzippedLogFile")
- }
- // Reset the offset to 0 for reading
- if _, err = logFile.Seek(0, 0); err != nil {
- restoreErrors.Velero = append(restoreErrors.Velero, fmt.Sprintf("error resetting log file offset to 0: %v", err))
- return restoreResult{warnings: restoreWarnings, errors: restoreErrors}, restoreFailure
+ if logReader, err := restoreLog.done(c.logger); err != nil {
+ restoreErrors.Velero = append(restoreErrors.Velero, fmt.Sprintf("error getting restore log reader: %v", err))
+ } else {
+ if err := info.backupStore.PutRestoreLog(restore.Spec.BackupName, restore.Name, logReader); err != nil {
+ restoreErrors.Velero = append(restoreErrors.Velero, fmt.Sprintf("error uploading log file to backup storage: %v", err))
+ }
 }
- if err := info.backupStore.PutRestoreLog(restore.Spec.BackupName, restore.Name, logFile); err != nil {
- restoreErrors.Ark = append(restoreErrors.Ark, fmt.Sprintf("error uploading log file to backup storage: %v", err))
+
+ // At this point, no further logs should be written to restoreLog since it's been uploaded
+ // to object storage.
+
+ //TODO(1.0): Remove warnings.Ark
+ restore.Status.Warnings = len(restoreWarnings.Velero) + len(restoreWarnings.Cluster) + len(restoreWarnings.Ark)
+ for _, w := range restoreWarnings.Namespaces {
+ restore.Status.Warnings += len(w)
+ }
+
+ //TODO (1.0): Remove errors.Ark
+ restore.Status.Errors = len(restoreErrors.Velero) + len(restoreErrors.Cluster) + len(restoreErrors.Ark)
+ for _, e := range restoreErrors.Namespaces {
+ restore.Status.Errors += len(e)
 }

 m := map[string]api.RestoreResult{
@@ -520,30 +476,34 @@ func (c *restoreController) runRestore(
 "errors": restoreErrors,
 }

- gzippedResultsFile := gzip.NewWriter(resultsFile)
-
- if err := json.NewEncoder(gzippedResultsFile).Encode(m); err != nil {
- log.WithError(errors.WithStack(err)).Error("Error encoding restore results")
- return restoreResult{warnings: restoreWarnings, errors: restoreErrors}, restoreFailure
- }
- gzippedResultsFile.Close()
-
- if _, err = resultsFile.Seek(0, 0); err != nil {
- log.WithError(errors.WithStack(err)).Error("Error resetting results file offset to 0")
- return restoreResult{warnings: restoreWarnings, errors: restoreErrors}, restoreFailure
- }
- if err := info.backupStore.PutRestoreResults(restore.Spec.BackupName, restore.Name, resultsFile); err != nil {
- log.WithError(errors.WithStack(err)).Error("Error uploading results file to backup storage")
+ if err := putResults(restore, m, info.backupStore, c.logger); err != nil {
+ c.logger.WithError(err).Error("Error uploading restore results to backup storage")
 }

- return restoreResult{warnings: restoreWarnings, errors: restoreErrors}, restoreFailure
+ return nil
 }

-func downloadToTempFile(
- backupName string,
- backupStore persistence.BackupStore,
- logger logrus.FieldLogger,
-) (*os.File, error) {
+func putResults(restore *api.Restore, results map[string]api.RestoreResult, backupStore persistence.BackupStore, log logrus.FieldLogger) error {
+ buf := new(bytes.Buffer)
+ gzw := gzip.NewWriter(buf)
+ defer gzw.Close()
+
+ if err := json.NewEncoder(gzw).Encode(results); err != nil {
+ return errors.Wrap(err, "error encoding restore results to JSON")
+ }
+
+ if err := gzw.Close(); err != nil {
+ return errors.Wrap(err, "error closing gzip writer")
+ }
+
+ if err := backupStore.PutRestoreResults(restore.Spec.BackupName, restore.Name, buf); err != nil {
+ return err
+ }
+
+ return nil
+}
+
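// putResults buffers the gzipped JSON in memory rather than in a temp file,
// which removes the Seek/Close bookkeeping the old code needed; restore
// results are small, so the buffer is cheap. The encode step in isolation
// (encodeResults is a hypothetical name, not part of this patch):
//
//	func encodeResults(results map[string]api.RestoreResult) (*bytes.Buffer, error) {
//		buf := new(bytes.Buffer)
//		gzw := gzip.NewWriter(buf)
//
//		if err := json.NewEncoder(gzw).Encode(results); err != nil {
//			return nil, err
//		}
//		// Close, not just Flush, so the gzip footer (size and checksum) is written.
//		if err := gzw.Close(); err != nil {
//			return nil, err
//		}
//		return buf, nil
//	}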
+func downloadToTempFile(backupName string, backupStore persistence.BackupStore, logger logrus.FieldLogger) (*os.File, error) {
 readCloser, err := backupStore.GetBackupContents(backupName)
 if err != nil {
 return nil, err
@@ -597,3 +557,50 @@ func patchRestore(original, updated *api.Restore, client velerov1client.Restores
 return res, nil
 }
+
+type restoreLogger struct {
+ logrus.FieldLogger
+ file *os.File
+ w *gzip.Writer
+}
+
+func newRestoreLogger(restore *api.Restore, baseLogger logrus.FieldLogger, logLevel logrus.Level) (*restoreLogger, error) {
+ file, err := ioutil.TempFile("", "")
+ if err != nil {
+ return nil, errors.Wrap(err, "error creating temp file")
+ }
+ w := gzip.NewWriter(file)
+
+ logger := logging.DefaultLogger(logLevel)
+ logger.Out = io.MultiWriter(os.Stdout, w)
+
+ return &restoreLogger{
+ FieldLogger: logger.WithField("restore", kubeutil.NamespaceAndName(restore)),
+ file: file,
+ w: w,
+ }, nil
+}
+
+// done stops the restoreLogger from being able to be written to, and returns
+// an io.Reader for getting the content of the logger. Any attempts to use
+// restoreLogger to log after calling done will panic.
+func (l *restoreLogger) done(log logrus.FieldLogger) (io.Reader, error) {
+ l.FieldLogger = nil
+
+ if err := l.w.Close(); err != nil {
+ log.WithError(errors.WithStack(err)).Error("error closing gzip writer")
+ }
+
+ if _, err := l.file.Seek(0, 0); err != nil {
+ return nil, errors.Wrap(err, "error resetting log file offset to 0")
+ }
+
+ return l.file, nil
+}
+
+// closeAndRemove removes the logger's underlying temporary storage. This
+// method should be called when all logging and reading from the logger is
+// complete.
+func (l *restoreLogger) closeAndRemove(log logrus.FieldLogger) {
+ closeAndRemoveFile(l.file, log)
+}
diff --git a/pkg/controller/restore_controller_test.go b/pkg/controller/restore_controller_test.go
index 00a8e1a30..82a33f0ef 100644
--- a/pkg/controller/restore_controller_test.go
+++ b/pkg/controller/restore_controller_test.go
@@ -150,7 +150,7 @@ func TestFetchBackupInfo(t *testing.T) {
 }
 }

-func TestProcessRestoreSkips(t *testing.T) {
+func TestProcessQueueItemSkips(t *testing.T) {
 tests := []struct {
 name string
 restoreKey string
@@ -212,14 +212,14 @@ func TestProcessRestoreSkips(t *testing.T) {
 sharedInformers.Velero().V1().Restores().Informer().GetStore().Add(test.restore)
 }

- err := c.processRestore(test.restoreKey)
+ err := c.processQueueItem(test.restoreKey)

 assert.Equal(t, test.expectError, err != nil)
 })
 }
}

-func TestProcessRestore(t *testing.T) {
+func TestProcessQueueItem(t *testing.T) {
 tests := []struct {
 name string
 restoreKey string
@@ -524,7 +524,7 @@ func TestProcessRestore(t *testing.T) {
 pluginManager.On("CleanupClients")
 }

- err = c.processRestore(key)
+ err = c.processQueueItem(key)

 assert.Equal(t, test.expectedErr, err != nil, "got error %v", err)

 actions := client.Actions()
@@ -716,7 +716,6 @@ func TestValidateAndComplete(t *testing.T) {
 assert.Equal(t, tc.expectedErrs, tc.restore.Status.ValidationErrors)
 })
 }
-
}

func TestvalidateAndCompleteWhenScheduleNameSpecified(t *testing.T) {
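Taken together, the restoreLogger introduced above drives the per-restore
logging flow end to end. A lifecycle sketch, with baseLogger, backupStore,
backupName, and newPluginManager assumed to be in scope and error handling
abbreviated:

	restoreLog, err := newRestoreLogger(restore, baseLogger, logrus.InfoLevel)
	if err != nil {
		return err
	}
	defer restoreLog.closeAndRemove(baseLogger) // always delete the temp file

	pluginManager := newPluginManager(restoreLog) // plugin output is captured too
	defer pluginManager.CleanupClients()

	restoreLog.Info("starting restore") // tees to stdout and the gzipped temp file

	// done() nils the embedded FieldLogger (any later logging panics), closes
	// the gzip writer, and rewinds the file so it can be streamed to storage.
	logReader, err := restoreLog.done(baseLogger)
	if err == nil {
		_ = backupStore.PutRestoreLog(backupName, restore.Name, logReader)
	}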