Merge pull request #250 from ncdc/backup-controller-do-as-much-as-possible

BackupController: do as much as possible
This commit is contained in:
Steve Kriss
2018-01-03 12:10:51 -08:00
committed by GitHub
5 changed files with 130 additions and 89 deletions

View File

@@ -21,6 +21,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"sync"
@@ -32,7 +33,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
kuberrs "k8s.io/apimachinery/pkg/util/errors"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
@@ -211,13 +212,14 @@ func (controller *backupController) processBackup(key string) error {
return errors.Wrap(err, "error getting backup")
}
// TODO I think this is now unnecessary. We only initially place
// item with Phase = ("" | New) into the queue. Items will only get
// re-queued if syncHandler returns an error, which will only
// happen if there's an error updating Phase from its initial
// state to something else. So any time it's re-queued it will
// still have its initial state, which we've already confirmed
// is ("" | New)
// Double-check we have the correct phase. In the unlikely event that multiple controller
// instances are running, it's possible for controller A to succeed in changing the phase to
// InProgress, while controller B's attempt to patch the phase fails. When controller B
// reprocesses the same backup, it will either show up as New (informer hasn't seen the update
// yet) or as InProgress. In the former case, the patch attempt will fail again, until the
// informer sees the update. In the latter case, after the informer has seen the update to
// InProgress, we still need this check so we can return nil to indicate we've finished processing
// this key (even though it was a no-op).
switch backup.Status.Phase {
case "", api.BackupPhaseNew:
// only process new backups
@@ -322,39 +324,20 @@ func (controller *backupController) getValidationErrors(itm *api.Backup) []strin
}
func (controller *backupController) runBackup(backup *api.Backup, bucket string) error {
backupFile, err := ioutil.TempFile("", "")
if err != nil {
return errors.Wrap(err, "error creating temp file for Backup")
}
log := controller.logger.WithField("backup", kubeutil.NamespaceAndName(backup))
log.Info("Starting backup")
logFile, err := ioutil.TempFile("", "")
if err != nil {
return errors.Wrap(err, "error creating temp file for Backup log")
return errors.Wrap(err, "error creating temp file for backup log")
}
defer closeAndRemoveFile(logFile, log)
defer func() {
var errs []error
// TODO should this be wrapped?
errs = append(errs, err)
if err := backupFile.Close(); err != nil {
errs = append(errs, errors.Wrap(err, "error closing Backup temp file"))
}
if err := os.Remove(backupFile.Name()); err != nil {
errs = append(errs, errors.Wrap(err, "error removing Backup temp file"))
}
if err := logFile.Close(); err != nil {
errs = append(errs, errors.Wrap(err, "error closing Backup log temp file"))
}
if err := os.Remove(logFile.Name()); err != nil {
errs = append(errs, errors.Wrap(err, "error removing Backup log temp file"))
}
err = kuberrs.NewAggregate(errs)
}()
backupFile, err := ioutil.TempFile("", "")
if err != nil {
return errors.Wrap(err, "error creating temp file for backup")
}
defer closeAndRemoveFile(backupFile, log)
actions, err := controller.pluginManager.GetBackupItemActions(backup.Name)
if err != nil {
@@ -362,30 +345,42 @@ func (controller *backupController) runBackup(backup *api.Backup, bucket string)
}
defer controller.pluginManager.CloseBackupItemActions(backup.Name)
controller.logger.WithField("backup", kubeutil.NamespaceAndName(backup)).Info("starting backup")
var errs []error
var backupJsonToUpload, backupFileToUpload io.Reader
// Do the actual backup
if err := controller.backupper.Backup(backup, backupFile, logFile, actions); err != nil {
return err
errs = append(errs, err)
backup.Status.Phase = api.BackupPhaseFailed
} else {
backup.Status.Phase = api.BackupPhaseCompleted
}
controller.logger.WithField("backup", kubeutil.NamespaceAndName(backup)).Info("backup completed")
// note: updating this here so the uploaded JSON shows "completed". If
// the upload fails, we'll alter the phase in the calling func.
backup.Status.Phase = api.BackupPhaseCompleted
buf := new(bytes.Buffer)
if err := encode.EncodeTo(backup, "json", buf); err != nil {
return errors.Wrap(err, "error encoding Backup")
backupJson := new(bytes.Buffer)
if err := encode.EncodeTo(backup, "json", backupJson); err != nil {
errs = append(errs, errors.Wrap(err, "error encoding backup"))
} else {
// Only upload the json and backup tarball if encoding to json succeeded.
backupJsonToUpload = backupJson
backupFileToUpload = backupFile
}
// re-set the files' offset to 0 for reading
if _, err = backupFile.Seek(0, 0); err != nil {
return errors.Wrap(err, "error resetting Backup file offset")
}
if _, err = logFile.Seek(0, 0); err != nil {
return errors.Wrap(err, "error resetting Backup log file offset")
if err := controller.backupService.UploadBackup(bucket, backup.Name, backupJsonToUpload, backupFileToUpload, logFile); err != nil {
errs = append(errs, err)
}
return controller.backupService.UploadBackup(bucket, backup.Name, buf, backupFile, logFile)
log.Info("Backup completed")
return kerrors.NewAggregate(errs)
}
func closeAndRemoveFile(file *os.File, log logrus.FieldLogger) {
if err := file.Close(); err != nil {
log.WithError(err).WithField("file", file.Name()).Error("error closing file")
}
if err := os.Remove(file.Name()); err != nil {
log.WithError(err).WithField("file", file.Name()).Error("error removing file")
}
}