Move restore warnings/errors to object storage

If you have a large number of warnings and/or errors, the restore
object's size can exceed the maximum allowed by etcd. Move them to
object storage, and add a new describe command to fetch and display them
on the fly.

Signed-off-by: Andy Goldstein <andy.goldstein@gmail.com>
This commit is contained in:
Andy Goldstein
2017-10-26 11:24:16 -04:00
parent 203a9c6e05
commit b2d80471ac
20 changed files with 574 additions and 65 deletions

View File

@@ -17,7 +17,9 @@ limitations under the License.
package controller
import (
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
@@ -254,7 +256,17 @@ func (controller *restoreController) processRestore(key string) error {
logContext.Debug("Running restore")
// execution & upload of restore
restore.Status.Warnings, restore.Status.Errors = controller.runRestore(restore, controller.bucket)
restoreWarnings, restoreErrors := controller.runRestore(restore, controller.bucket)
restore.Status.Warnings = len(restoreWarnings.Ark) + len(restoreWarnings.Cluster)
for _, w := range restoreWarnings.Namespaces {
restore.Status.Warnings += len(w)
}
restore.Status.Errors = len(restoreErrors.Ark) + len(restoreErrors.Cluster)
for _, e := range restoreErrors.Namespaces {
restore.Status.Errors += len(e)
}
logContext.Debug("restore completed")
restore.Status.Phase = api.RestorePhaseCompleted
@@ -327,22 +339,29 @@ func (controller *restoreController) fetchBackup(bucket, name string) (*api.Back
return backup, nil
}
func (controller *restoreController) runRestore(restore *api.Restore, bucket string) (warnings, restoreErrors api.RestoreResult) {
logContext := controller.logger.WithField("restore", kubeutil.NamespaceAndName(restore))
func (controller *restoreController) runRestore(restore *api.Restore, bucket string) (restoreWarnings, restoreErrors api.RestoreResult) {
logContext := controller.logger.WithFields(
logrus.Fields{
"restore": kubeutil.NamespaceAndName(restore),
"backup": restore.Spec.BackupName,
})
backup, err := controller.fetchBackup(bucket, restore.Spec.BackupName)
if err != nil {
logContext.WithError(err).WithField("backup", restore.Spec.BackupName).Error("Error getting backup")
logContext.WithError(err).Error("Error getting backup")
restoreErrors.Ark = append(restoreErrors.Ark, err.Error())
return
}
tmpFile, err := downloadToTempFile(restore.Spec.BackupName, controller.backupService, bucket, controller.logger)
var tempFiles []*os.File
backupFile, err := downloadToTempFile(restore.Spec.BackupName, controller.backupService, bucket, controller.logger)
if err != nil {
logContext.WithError(err).WithField("backup", restore.Spec.BackupName).Error("Error downloading backup")
logContext.WithError(err).Error("Error downloading backup")
restoreErrors.Ark = append(restoreErrors.Ark, err.Error())
return
}
tempFiles = append(tempFiles, backupFile)
logFile, err := ioutil.TempFile("", "")
if err != nil {
@@ -350,26 +369,29 @@ func (controller *restoreController) runRestore(restore *api.Restore, bucket str
restoreErrors.Ark = append(restoreErrors.Ark, err.Error())
return
}
tempFiles = append(tempFiles, logFile)
resultsFile, err := ioutil.TempFile("", "")
if err != nil {
logContext.WithError(errors.WithStack(err)).Error("Error creating results temp file")
restoreErrors.Ark = append(restoreErrors.Ark, err.Error())
return
}
tempFiles = append(tempFiles, resultsFile)
defer func() {
if err := tmpFile.Close(); err != nil {
logContext.WithError(errors.WithStack(err)).WithField("file", tmpFile.Name()).Error("Error closing file")
}
for _, file := range tempFiles {
if err := file.Close(); err != nil {
logContext.WithError(errors.WithStack(err)).WithField("file", file.Name()).Error("Error closing file")
}
if err := os.Remove(tmpFile.Name()); err != nil {
logContext.WithError(errors.WithStack(err)).WithField("file", tmpFile.Name()).Error("Error removing file")
}
if err := logFile.Close(); err != nil {
logContext.WithError(errors.WithStack(err)).WithField("file", logFile.Name()).Error("Error closing file")
}
if err := os.Remove(logFile.Name()); err != nil {
logContext.WithError(errors.WithStack(err)).WithField("file", logFile.Name()).Error("Error removing file")
if err := os.Remove(file.Name()); err != nil {
logContext.WithError(errors.WithStack(err)).WithField("file", file.Name()).Error("Error removing file")
}
}
}()
warnings, restoreErrors = controller.restorer.Restore(restore, backup, tmpFile, logFile)
restoreWarnings, restoreErrors = controller.restorer.Restore(restore, backup, backupFile, logFile)
// Try to upload the log file. This is best-effort. If we fail, we'll add to the ark errors.
@@ -383,6 +405,23 @@ func (controller *restoreController) runRestore(restore *api.Restore, bucket str
restoreErrors.Ark = append(restoreErrors.Ark, fmt.Sprintf("error uploading log file to object storage: %v", err))
}
m := map[string]api.RestoreResult{
"warnings": restoreWarnings,
"errors": restoreErrors,
}
gzippedResultsFile := gzip.NewWriter(resultsFile)
if err := json.NewEncoder(gzippedResultsFile).Encode(m); err != nil {
logContext.WithError(errors.WithStack(err)).Error("Error encoding restore results")
return
}
gzippedResultsFile.Close()
if err := controller.backupService.UploadRestoreResults(bucket, restore.Spec.BackupName, restore.Name, resultsFile); err != nil {
logContext.WithError(errors.WithStack(err)).Error("Error uploading results files to object storage")
}
return
}

View File

@@ -189,9 +189,7 @@ func TestProcessRestore(t *testing.T) {
expectedRestoreUpdates: []*api.Restore{
NewRestore("foo", "bar", "backup-1", "ns-1", "", api.RestorePhaseInProgress).Restore,
NewRestore("foo", "bar", "backup-1", "ns-1", "", api.RestorePhaseCompleted).
WithErrors(api.RestoreResult{
Ark: []string{"no backup here"},
}).
WithErrors(1).
Restore,
},
},
@@ -204,11 +202,7 @@ func TestProcessRestore(t *testing.T) {
expectedRestoreUpdates: []*api.Restore{
NewRestore("foo", "bar", "backup-1", "ns-1", "", api.RestorePhaseInProgress).Restore,
NewRestore("foo", "bar", "backup-1", "ns-1", "", api.RestorePhaseCompleted).
WithErrors(api.RestoreResult{
Namespaces: map[string][]string{
"ns-1": {"blarg"},
},
}).
WithErrors(1).
Restore,
},
expectedRestorerCall: NewRestore("foo", "bar", "backup-1", "ns-1", "", api.RestorePhaseInProgress).Restore,
@@ -319,6 +313,7 @@ func TestProcessRestore(t *testing.T) {
backupSvc.On("DownloadBackup", mock.Anything, mock.Anything).Return(downloadedBackup, nil)
restorer.On("Restore", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(warnings, errors)
backupSvc.On("UploadRestoreLog", "bucket", test.restore.Spec.BackupName, test.restore.Name, mock.Anything).Return(test.uploadLogError)
backupSvc.On("UploadRestoreResults", "bucket", test.restore.Spec.BackupName, test.restore.Name, mock.Anything).Return(nil)
}
var (