instantiate plugin manager with per-restore logger so plugin logs are captured
Signed-off-by: Steve Kriss <krisss@vmware.com>
changelogs/unreleased/1358-skriss (new file)
@@ -0,0 +1 @@
+instantiate the plugin manager with the per-restore logger so plugin logs are captured in the per-restore log
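
For readers skimming the diff below: the core idea is a logger whose output is teed to stdout and to a gzipped temp file that later gets uploaded to object storage, so anything logged through it (including plugin output, once the plugin manager is built with it) lands in the per-restore log. Here is a minimal standalone sketch of that pattern — not Velero's actual code; the helper name newTeeLogger is made up for illustration:

package main

import (
	"compress/gzip"
	"io"
	"io/ioutil"
	"os"

	"github.com/sirupsen/logrus"
)

// newTeeLogger builds a logrus logger that writes to stdout and, at the same
// time, to a gzipped temp file suitable for later upload to object storage.
func newTeeLogger(level logrus.Level) (logrus.FieldLogger, *os.File, *gzip.Writer, error) {
	file, err := ioutil.TempFile("", "restore-log")
	if err != nil {
		return nil, nil, nil, err
	}
	gzw := gzip.NewWriter(file)

	logger := logrus.New()
	logger.Level = level
	logger.Out = io.MultiWriter(os.Stdout, gzw) // tee: console + compressed file
	return logger, file, gzw, nil
}

func main() {
	log, file, gzw, err := newTeeLogger(logrus.InfoLevel)
	if err != nil {
		panic(err)
	}
	defer os.Remove(file.Name())

	log.Info("this line appears on stdout and in the gzipped log file")

	// Close the gzip writer (which flushes the footer) before reading or
	// uploading the temp file.
	if err := gzw.Close(); err != nil {
		panic(err)
	}
}
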
pkg/controller/restore_controller.go
@@ -17,6 +17,7 @@ limitations under the License.
 package controller
 
 import (
+	"bytes"
 	"compress/gzip"
 	"encoding/json"
 	"fmt"
@@ -41,7 +42,6 @@ import (
 	"github.com/heptio/velero/pkg/metrics"
 	"github.com/heptio/velero/pkg/persistence"
 	"github.com/heptio/velero/pkg/plugin/clientmgmt"
-	"github.com/heptio/velero/pkg/plugin/velero"
 	"github.com/heptio/velero/pkg/restore"
 	"github.com/heptio/velero/pkg/util/collections"
 	kubeutil "github.com/heptio/velero/pkg/util/kube"
@@ -91,10 +91,6 @@ type restoreController struct {
 	newBackupStore func(*api.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error)
 }
 
-type restoreResult struct {
-	warnings, errors api.RestoreResult
-}
-
 func NewRestoreController(
 	namespace string,
 	restoreInformer informers.RestoreInformer,
@@ -130,7 +126,7 @@ func NewRestoreController(
 		newBackupStore: persistence.NewObjectBackupStore,
 	}
 
-	c.syncHandler = c.processRestore
+	c.syncHandler = c.processQueueItem
 	c.cacheSyncWaiters = append(c.cacheSyncWaiters,
 		backupInformer.Informer().HasSynced,
 		restoreInformer.Informer().HasSynced,
@@ -167,13 +163,13 @@ func NewRestoreController(
 	return c
 }
 
-func (c *restoreController) processRestore(key string) error {
+func (c *restoreController) processQueueItem(key string) error {
 	log := c.logger.WithField("key", key)
 
-	log.Debug("Running processRestore")
+	log.Debug("Running processQueueItem")
 	ns, name, err := cache.SplitMetaNamespaceKey(key)
 	if err != nil {
-		log.WithError(err).Error("unable to process restore: error splitting queue key")
+		log.WithError(err).Error("unable to process queue item: error splitting queue key")
 		// Return nil here so we don't try to process the key any more
 		return nil
 	}
@@ -198,24 +194,30 @@ func (c *restoreController) processRestore(key string) error {
 		return nil
 	}
 
-	log.Debug("Cloning Restore")
-	// store ref to original for creating patch
-	original := restore
-	// don't modify items in the cache
-	restore = restore.DeepCopy()
+	// Deep-copy the restore so the copy from the lister is not modified.
+	// Any errors returned by processRestore will be bubbled up, meaning
+	// the key will be re-enqueued by the controller.
+	return c.processRestore(restore.DeepCopy())
+}
 
-	pluginManager := c.newPluginManager(log)
-	defer pluginManager.CleanupClients()
+func (c *restoreController) processRestore(restore *api.Restore) error {
+	// Developer note: any error returned by this method will
+	// cause the restore to be re-enqueued and re-processed by
+	// the controller.
 
-	actions, err := pluginManager.GetRestoreItemActions()
-	if err != nil {
-		return errors.Wrap(err, "error initializing restore item actions")
-	}
+	// store a copy of the original restore for creating patch
+	original := restore.DeepCopy()
 
-	// validate the restore and fetch the backup
+	// Validate the restore and fetch the backup. Note that the plugin
+	// manager used here is not the same one used by c.runValidatedRestore,
+	// since within that function we want the plugin manager to log to
+	// our per-restore log (which is instantiated within c.runValidatedRestore).
+	pluginManager := c.newPluginManager(c.logger)
 	info := c.validateAndComplete(restore, pluginManager)
+	backupScheduleName := restore.Spec.ScheduleName
+	pluginManager.CleanupClients()
 
 	// Register attempts after validation so we don't have to fetch the backup multiple times
-	backupScheduleName := restore.Spec.ScheduleName
 	c.metrics.RegisterRestoreAttempt(backupScheduleName)
 
 	if len(restore.Status.ValidationErrors) > 0 {
@@ -228,6 +230,8 @@ func (c *restoreController) processRestore(key string) error {
 	// patch to update status and persist to API
 	updatedRestore, err := patchRestore(original, restore, c.restoreClient)
 	if err != nil {
+		// return the error so the restore can be re-processed; it's currently
+		// still in phase = New.
 		return errors.Wrapf(err, "error updating Restore phase to %s", restore.Status.Phase)
 	}
 	// store ref to just-updated item for creating patch
@@ -238,43 +242,20 @@ func (c *restoreController) processRestore(key string) error {
 		return nil
 	}
 
-	log.Debug("Running restore")
-
-	// execution & upload of restore
-	restoreRes, restoreFailure := c.runRestore(
-		restore,
-		actions,
-		info,
-		pluginManager,
-	)
-
-	//TODO(1.0): Remove warnings.Ark
-	restore.Status.Warnings = len(restoreRes.warnings.Velero) + len(restoreRes.warnings.Cluster) + len(restoreRes.warnings.Ark)
-	for _, w := range restoreRes.warnings.Namespaces {
-		restore.Status.Warnings += len(w)
-	}
-
-	//TODO (1.0): Remove errors.Ark
-	restore.Status.Errors = len(restoreRes.errors.Velero) + len(restoreRes.errors.Cluster) + len(restoreRes.errors.Ark)
-	for _, e := range restoreRes.errors.Namespaces {
-		restore.Status.Errors += len(e)
-	}
-
-	if restoreFailure != nil {
-		log.Debug("restore failed")
+	if err := c.runValidatedRestore(restore, info); err != nil {
+		c.logger.WithError(err).Debug("Restore failed")
 		restore.Status.Phase = api.RestorePhaseFailed
-		restore.Status.FailureReason = restoreFailure.Error()
+		restore.Status.FailureReason = err.Error()
 		c.metrics.RegisterRestoreFailed(backupScheduleName)
 	} else {
-		log.Debug("restore completed")
+		// We got through the restore process without failing validation or restore execution
+		c.logger.Debug("Restore completed")
 		restore.Status.Phase = api.RestorePhaseCompleted
 		c.metrics.RegisterRestoreSuccess(backupScheduleName)
 	}
 
-	log.Debug("Updating Restore final status")
+	c.logger.Debug("Updating restore's final status")
 	if _, err = patchRestore(original, restore, c.restoreClient); err != nil {
-		log.WithError(errors.WithStack(err)).Info("Error updating Restore final status")
+		c.logger.WithError(errors.WithStack(err)).Info("Error updating restore's final status")
 	}
 
 	return nil
@@ -431,88 +412,63 @@ func (c *restoreController) fetchBackupInfo(backupName string, pluginManager cli
 	}, nil
 }
 
-func (c *restoreController) runRestore(
-	restore *api.Restore,
-	actions []velero.RestoreItemAction,
-	info backupInfo,
-	pluginManager clientmgmt.Manager,
-) (restoreResult, error) {
-	var restoreWarnings, restoreErrors api.RestoreResult
-	var restoreFailure error
-	logFile, err := ioutil.TempFile("", "")
+// runValidatedRestore takes a validated restore API object and executes the restore process.
+// The log and results files are uploaded to backup storage. Any error returned from this function
+// means that the restore failed. This function updates the restore API object with warning and error
+// counts, but *does not* update its phase or patch it via the API.
+func (c *restoreController) runValidatedRestore(restore *api.Restore, info backupInfo) error {
+	// instantiate the per-restore logger that will output both to a temp file
+	// (for upload to object storage) and to stdout.
+	restoreLog, err := newRestoreLogger(restore, c.logger, c.restoreLogLevel)
 	if err != nil {
-		c.logger.
-			WithFields(
-				logrus.Fields{
-					"restore": kubeutil.NamespaceAndName(restore),
-					"backup":  restore.Spec.BackupName,
-				},
-			).
-			WithError(errors.WithStack(err)).
-			Error("Error creating log temp file")
-		restoreErrors.Velero = append(restoreErrors.Velero, err.Error())
-		return restoreResult{warnings: restoreWarnings, errors: restoreErrors}, restoreFailure
+		return err
 	}
-	gzippedLogFile := gzip.NewWriter(logFile)
-	// Assuming we successfully uploaded the log file, this will have already been closed below. It is safe to call
-	// close multiple times. If we get an error closing this, there's not really anything we can do about it.
-	defer gzippedLogFile.Close()
-	defer closeAndRemoveFile(logFile, c.logger)
+	defer restoreLog.closeAndRemove(c.logger)
 
-	// Log the backup to both a backup log file and to stdout. This will help see what happened if the upload of the
-	// backup log failed for whatever reason.
-	logger := logging.DefaultLogger(c.restoreLogLevel)
-	logger.Out = io.MultiWriter(os.Stdout, gzippedLogFile)
-	log := logger.WithFields(
-		logrus.Fields{
-			"restore": kubeutil.NamespaceAndName(restore),
-			"backup":  restore.Spec.BackupName,
-		})
+	pluginManager := c.newPluginManager(restoreLog)
+	defer pluginManager.CleanupClients()
 
-	backupFile, err := downloadToTempFile(restore.Spec.BackupName, info.backupStore, c.logger)
+	actions, err := pluginManager.GetRestoreItemActions()
 	if err != nil {
-		log.WithError(err).Error("Error downloading backup")
-		restoreErrors.Velero = append(restoreErrors.Velero, err.Error())
-		restoreFailure = err
-		return restoreResult{warnings: restoreWarnings, errors: restoreErrors}, restoreFailure
+		return errors.Wrap(err, "error getting restore item actions")
 	}
+
+	backupFile, err := downloadToTempFile(restore.Spec.BackupName, info.backupStore, restoreLog)
+	if err != nil {
+		return errors.Wrap(err, "error downloading backup")
+	}
 	defer closeAndRemoveFile(backupFile, c.logger)
 
-	resultsFile, err := ioutil.TempFile("", "")
-	if err != nil {
-		log.WithError(errors.WithStack(err)).Error("Error creating results temp file")
-		restoreErrors.Velero = append(restoreErrors.Velero, err.Error())
-		restoreFailure = err
-		return restoreResult{warnings: restoreWarnings, errors: restoreErrors}, restoreFailure
-	}
-	defer closeAndRemoveFile(resultsFile, c.logger)
-
 	volumeSnapshots, err := info.backupStore.GetBackupVolumeSnapshots(restore.Spec.BackupName)
 	if err != nil {
-		log.WithError(errors.WithStack(err)).Error("Error fetching volume snapshots")
-		restoreErrors.Velero = append(restoreErrors.Velero, err.Error())
-		restoreFailure = err
-		return restoreResult{warnings: restoreWarnings, errors: restoreErrors}, restoreFailure
+		return errors.Wrap(err, "error fetching volume snapshots metadata")
 	}
 
-	// Any return statement above this line means a total restore failure
-	// Some failures after this line *may* be a total restore failure
-	log.Info("starting restore")
-	restoreWarnings, restoreErrors = c.restorer.Restore(log, restore, info.backup, volumeSnapshots, backupFile, actions, c.snapshotLocationLister, pluginManager)
-	log.Info("restore completed")
+	restoreLog.Info("starting restore")
+	restoreWarnings, restoreErrors := c.restorer.Restore(restoreLog, restore, info.backup, volumeSnapshots, backupFile, actions, c.snapshotLocationLister, pluginManager)
+	restoreLog.Info("restore completed")
 
 	// Try to upload the log file. This is best-effort. If we fail, we'll add to the velero errors.
-	if err := gzippedLogFile.Close(); err != nil {
-		c.logger.WithError(err).Error("error closing gzippedLogFile")
-	}
-	// Reset the offset to 0 for reading
-	if _, err = logFile.Seek(0, 0); err != nil {
-		restoreErrors.Velero = append(restoreErrors.Velero, fmt.Sprintf("error resetting log file offset to 0: %v", err))
-		return restoreResult{warnings: restoreWarnings, errors: restoreErrors}, restoreFailure
+	if logReader, err := restoreLog.done(c.logger); err != nil {
+		restoreErrors.Velero = append(restoreErrors.Velero, fmt.Sprintf("error getting restore log reader: %v", err))
+	} else {
+		if err := info.backupStore.PutRestoreLog(restore.Spec.BackupName, restore.Name, logReader); err != nil {
+			restoreErrors.Velero = append(restoreErrors.Velero, fmt.Sprintf("error uploading log file to backup storage: %v", err))
+		}
 	}
 
-	if err := info.backupStore.PutRestoreLog(restore.Spec.BackupName, restore.Name, logFile); err != nil {
-		restoreErrors.Ark = append(restoreErrors.Ark, fmt.Sprintf("error uploading log file to backup storage: %v", err))
-	}
+	// At this point, no further logs should be written to restoreLog since it's been uploaded
+	// to object storage.
 
+	//TODO(1.0): Remove warnings.Ark
+	restore.Status.Warnings = len(restoreWarnings.Velero) + len(restoreWarnings.Cluster) + len(restoreWarnings.Ark)
+	for _, w := range restoreWarnings.Namespaces {
+		restore.Status.Warnings += len(w)
+	}
+
+	//TODO (1.0): Remove errors.Ark
+	restore.Status.Errors = len(restoreErrors.Velero) + len(restoreErrors.Cluster) + len(restoreErrors.Ark)
+	for _, e := range restoreErrors.Namespaces {
+		restore.Status.Errors += len(e)
+	}
 
 	m := map[string]api.RestoreResult{
@@ -520,30 +476,34 @@ func (c *restoreController) runRestore(
 		"warnings": restoreWarnings,
 		"errors":   restoreErrors,
 	}
 
-	gzippedResultsFile := gzip.NewWriter(resultsFile)
-
-	if err := json.NewEncoder(gzippedResultsFile).Encode(m); err != nil {
-		log.WithError(errors.WithStack(err)).Error("Error encoding restore results")
-		return restoreResult{warnings: restoreWarnings, errors: restoreErrors}, restoreFailure
-	}
-	gzippedResultsFile.Close()
-
-	if _, err = resultsFile.Seek(0, 0); err != nil {
-		log.WithError(errors.WithStack(err)).Error("Error resetting results file offset to 0")
-		return restoreResult{warnings: restoreWarnings, errors: restoreErrors}, restoreFailure
-	}
-	if err := info.backupStore.PutRestoreResults(restore.Spec.BackupName, restore.Name, resultsFile); err != nil {
-		log.WithError(errors.WithStack(err)).Error("Error uploading results file to backup storage")
+	if err := putResults(restore, m, info.backupStore, c.logger); err != nil {
+		c.logger.WithError(err).Error("Error uploading restore results to backup storage")
 	}
 
-	return restoreResult{warnings: restoreWarnings, errors: restoreErrors}, restoreFailure
+	return nil
 }
 
-func downloadToTempFile(
-	backupName string,
-	backupStore persistence.BackupStore,
-	logger logrus.FieldLogger,
-) (*os.File, error) {
+func putResults(restore *api.Restore, results map[string]api.RestoreResult, backupStore persistence.BackupStore, log logrus.FieldLogger) error {
+	buf := new(bytes.Buffer)
+	gzw := gzip.NewWriter(buf)
+	defer gzw.Close()
+
+	if err := json.NewEncoder(gzw).Encode(results); err != nil {
+		return errors.Wrap(err, "error encoding restore results to JSON")
+	}
+
+	if err := gzw.Close(); err != nil {
+		return errors.Wrap(err, "error closing gzip writer")
+	}
+
+	if err := backupStore.PutRestoreResults(restore.Spec.BackupName, restore.Name, buf); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func downloadToTempFile(backupName string, backupStore persistence.BackupStore, logger logrus.FieldLogger) (*os.File, error) {
 	readCloser, err := backupStore.GetBackupContents(backupName)
 	if err != nil {
 		return nil, err
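
As an aside, the new putResults composes JSON encoding and gzip over an in-memory bytes.Buffer and only then hands the buffer to the backup store; closing (not merely flushing) the gzip writer is what emits the footer that makes the archive readable. A self-contained sketch of that round trip, independent of Velero's types:

package main

import (
	"bytes"
	"compress/gzip"
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
)

// gzipJSON JSON-encodes v into a gzipped in-memory buffer and returns a
// reader over the compressed bytes -- the same shape putResults hands to
// the backup store.
func gzipJSON(v interface{}) (io.Reader, error) {
	buf := new(bytes.Buffer)
	gzw := gzip.NewWriter(buf)
	if err := json.NewEncoder(gzw).Encode(v); err != nil {
		return nil, err
	}
	// Close, not just Flush: Close writes the gzip footer (CRC + length).
	if err := gzw.Close(); err != nil {
		return nil, err
	}
	return buf, nil
}

func main() {
	r, err := gzipJSON(map[string][]string{"warnings": nil, "errors": {"example error"}})
	if err != nil {
		panic(err)
	}

	// Verify the archive round-trips.
	gzr, err := gzip.NewReader(r)
	if err != nil {
		panic(err)
	}
	out, err := ioutil.ReadAll(gzr)
	if err != nil {
		panic(err)
	}
	fmt.Print(string(out)) // {"errors":["example error"],"warnings":null}
}
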
@@ -597,3 +557,50 @@ func patchRestore(original, updated *api.Restore, client velerov1client.Restores
 
 	return res, nil
 }
+
+type restoreLogger struct {
+	logrus.FieldLogger
+	file *os.File
+	w    *gzip.Writer
+}
+
+func newRestoreLogger(restore *api.Restore, baseLogger logrus.FieldLogger, logLevel logrus.Level) (*restoreLogger, error) {
+	file, err := ioutil.TempFile("", "")
+	if err != nil {
+		return nil, errors.Wrap(err, "error creating temp file")
+	}
+	w := gzip.NewWriter(file)
+
+	logger := logging.DefaultLogger(logLevel)
+	logger.Out = io.MultiWriter(os.Stdout, w)
+
+	return &restoreLogger{
+		FieldLogger: logger.WithField("restore", kubeutil.NamespaceAndName(restore)),
+		file:        file,
+		w:           w,
+	}, nil
+}
+
+// done stops the restoreLogger from being able to be written to, and returns
+// an io.Reader for getting the content of the logger. Any attempts to use
+// restoreLogger to log after calling done will panic.
+func (l *restoreLogger) done(log logrus.FieldLogger) (io.Reader, error) {
+	l.FieldLogger = nil
+
+	if err := l.w.Close(); err != nil {
+		log.WithError(errors.WithStack(err)).Error("error closing gzip writer")
+	}
+
+	if _, err := l.file.Seek(0, 0); err != nil {
+		return nil, errors.Wrap(err, "error resetting log file offset to 0")
+	}
+
+	return l.file, nil
+}
+
+// closeAndRemove removes the logger's underlying temporary storage. This
+// method should be called when all logging and reading from the logger is
+// complete.
+func (l *restoreLogger) closeAndRemove(log logrus.FieldLogger) {
+	closeAndRemoveFile(l.file, log)
+}
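
One subtlety worth calling out: done() nils out the embedded FieldLogger, so any later logging call through restoreLogger panics rather than silently writing to a log that has already been uploaded. A tiny runnable demo of that embedding trick (the wrapper type here is hypothetical, not Velero code):

package main

import (
	"fmt"

	"github.com/sirupsen/logrus"
)

// wrapper embeds a FieldLogger the same way restoreLogger does; method
// calls delegate to the embedded interface value.
type wrapper struct {
	logrus.FieldLogger
}

func main() {
	w := &wrapper{FieldLogger: logrus.New()}
	w.Info("logging works while the embedded logger is set")

	// Simulate done(): drop the embedded logger.
	w.FieldLogger = nil

	defer func() {
		if r := recover(); r != nil {
			fmt.Println("recovered: logging after done() panics, as documented")
		}
	}()
	w.Info("never printed: calling a method on a nil interface panics")
}
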
pkg/controller/restore_controller_test.go
@@ -150,7 +150,7 @@ func TestFetchBackupInfo(t *testing.T) {
 	}
 }
 
-func TestProcessRestoreSkips(t *testing.T) {
+func TestProcessQueueItemSkips(t *testing.T) {
 	tests := []struct {
 		name       string
 		restoreKey string
@@ -212,14 +212,14 @@ func TestProcessRestoreSkips(t *testing.T) {
 				sharedInformers.Velero().V1().Restores().Informer().GetStore().Add(test.restore)
 			}
 
-			err := c.processRestore(test.restoreKey)
+			err := c.processQueueItem(test.restoreKey)
 
 			assert.Equal(t, test.expectError, err != nil)
 		})
 	}
 }
 
-func TestProcessRestore(t *testing.T) {
+func TestProcessQueueItem(t *testing.T) {
 	tests := []struct {
 		name       string
 		restoreKey string
@@ -524,7 +524,7 @@ func TestProcessRestore(t *testing.T) {
 				pluginManager.On("CleanupClients")
 			}
 
-			err = c.processRestore(key)
+			err = c.processQueueItem(key)
 
 			assert.Equal(t, test.expectedErr, err != nil, "got error %v", err)
 			actions := client.Actions()
@@ -716,7 +716,6 @@ func TestValidateAndComplete(t *testing.T) {
 			assert.Equal(t, tc.expectedErrs, tc.restore.Status.ValidationErrors)
 		})
 	}
-
 }
 
 func TestvalidateAndCompleteWhenScheduleNameSpecified(t *testing.T) {