fix issue 5935

Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
Author: Lyndon-Li
Date:   2023-03-06 17:29:52 +08:00
parent 94fec66bc8
commit 40aae5ebdd
13 changed files with 229 additions and 111 deletions
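Summary of the change: the backup and restore controllers each hand-rolled a gzipped temp-file logger (and the restore path carried a BackupStore inside backupInfo across the whole reconcile). This commit extracts that plumbing into a shared DualModeLogger in the logging package, and the restore reconciler now obtains its BackupStore inside runValidatedRestore, from a plugin manager that logs to the per-restore log.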

View File

@@ -1002,7 +1002,7 @@ func TestBackupResourceCohabitation(t *testing.T) {
},
},
{
name: "when deployments exist that are not in the cohabitating groups those are backed up along with apps/deployments",
name: "when deployments exist that are not in the cohabiting groups those are backed up along with apps/deployments",
backup: defaultBackup().Result(),
apiResources: []*test.APIResource{
test.VeleroDeployments(
@@ -1047,7 +1047,7 @@ func TestBackupResourceCohabitation(t *testing.T) {
}
// TestBackupUsesNewCohabitatingResourcesForEachBackup ensures that when two backups are
- // run that each include cohabitating resources, one copy of the relevant resources is
+ // run that each include cohabiting resources, one copy of the relevant resources is
// backed up in each backup. Verification is done by looking at the contents of the backup
// tarball. This covers a specific issue that was fixed by https://github.com/vmware-tanzu/velero/pull/485.
func TestBackupUsesNewCohabitatingResourcesForEachBackup(t *testing.T) {

View File

@@ -22,7 +22,6 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"sync"
@@ -591,25 +590,14 @@ func (c *backupController) validateAndGetSnapshotLocations(backup *velerov1api.B
func (c *backupController) runBackup(backup *pkgbackup.Request) error {
c.logger.WithField(Backup, kubeutil.NamespaceAndName(backup)).Info("Setting up backup log")
- logFile, err := ioutil.TempFile("", "")
- if err != nil {
- return errors.Wrap(err, "error creating temp file for backup log")
- }
- gzippedLogFile := gzip.NewWriter(logFile)
- // Assuming we successfully uploaded the log file, this will have already been closed below. It is safe to call
- // close multiple times. If we get an error closing this, there's not really anything we can do about it.
- defer gzippedLogFile.Close()
- defer closeAndRemoveFile(logFile, c.logger.WithField(Backup, kubeutil.NamespaceAndName(backup)))
// Log the backup to both a backup log file and to stdout. This will help see what happened if the upload of the
// backup log failed for whatever reason.
- logger := logging.DefaultLogger(c.backupLogLevel, c.formatFlag)
- logger.Out = io.MultiWriter(os.Stdout, gzippedLogFile)
logCounter := logging.NewLogHook()
- logger.Hooks.Add(logCounter)
- backupLog := logger.WithField(Backup, kubeutil.NamespaceAndName(backup))
+ backupLog, err := logging.NewTempFileLogger(c.backupLogLevel, c.formatFlag, logCounter, logrus.Fields{Backup: kubeutil.NamespaceAndName(backup)})
+ if err != nil {
+ return errors.Wrap(err, "error creating dual mode logger for backup")
+ }
+ defer backupLog.Dispose(c.logger.WithField(Backup, kubeutil.NamespaceAndName(backup)))
backupLog.Info("Setting up backup temp file")
backupFile, err := ioutil.TempFile("", "")
@@ -735,9 +723,7 @@ func (c *backupController) runBackup(backup *pkgbackup.Request) error {
"errors": backupErrors,
}
- if err := gzippedLogFile.Close(); err != nil {
- c.logger.WithField(Backup, kubeutil.NamespaceAndName(backup)).WithError(err).Error("error closing gzippedLogFile")
- }
+ backupLog.DoneForPersist(c.logger.WithField(Backup, kubeutil.NamespaceAndName(backup)))
// Assign finalize phase as close to end as possible so that any errors
// logged to backupLog are captured. This is done before uploading the
@@ -760,8 +746,12 @@ func (c *backupController) runBackup(backup *pkgbackup.Request) error {
return err
}
- if errs := persistBackup(backup, backupFile, logFile, backupStore, volumeSnapshots, volumeSnapshotContents, volumeSnapshotClasses, results); len(errs) > 0 {
- fatalErrs = append(fatalErrs, errs...)
+ if logFile, err := backupLog.GetPersistFile(); err != nil {
+ fatalErrs = append(fatalErrs, errors.Wrap(err, "error getting backup log file"))
+ } else {
+ if errs := persistBackup(backup, backupFile, logFile, backupStore, volumeSnapshots, volumeSnapshotContents, volumeSnapshotClasses, results); len(errs) > 0 {
+ fatalErrs = append(fatalErrs, errs...)
+ }
+ }
}
c.logger.WithField(Backup, kubeutil.NamespaceAndName(backup)).Info("Backup completed")
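Net effect in runBackup: closing the gzip writer becomes backupLog.DoneForPersist(...), and the controller no longer holds the temp file handle for the whole function; the log file is fetched via GetPersistFile only at persist time, and a failure there is recorded as fatal instead of persisting a possibly truncated log.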

View File

@@ -100,9 +100,8 @@ type restoreReconciler struct {
}
type backupInfo struct {
- backup      *api.Backup
- location    *api.BackupStorageLocation
- backupStore persistence.BackupStore
+ backup   *api.Backup
+ location *api.BackupStorageLocation
}
func NewRestoreReconciler(
@@ -160,13 +159,8 @@ func (r *restoreReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
// store a copy of the original restore for creating patch
original := restore.DeepCopy()
- // Validate the restore and fetch the backup. Note that the plugin
- // manager used here is not the same one used by c.runValidatedRestore,
- // since within that function we want the plugin manager to log to
- // our per-restore log (which is instantiated within c.runValidatedRestore).
- pluginManager := r.newPluginManager(r.logger)
- defer pluginManager.CleanupClients()
- info := r.validateAndComplete(restore, pluginManager)
+ // Validate the restore and fetch the backup
+ info := r.validateAndComplete(restore)
// Register attempts after validation so we don't have to fetch the backup multiple times
backupScheduleName := restore.Spec.ScheduleName
@@ -243,7 +237,7 @@ func (r *restoreReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}
- func (r *restoreReconciler) validateAndComplete(restore *api.Restore, pluginManager clientmgmt.Manager) backupInfo {
+ func (r *restoreReconciler) validateAndComplete(restore *api.Restore) backupInfo {
// add non-restorable resources to restore's excluded resources
excludedResources := sets.NewString(restore.Spec.ExcludedResources...)
for _, nonrestorable := range nonRestorableResources {
@@ -326,7 +320,7 @@ func (r *restoreReconciler) validateAndComplete(restore *api.Restore, pluginMana
}
}
- info, err := r.fetchBackupInfo(restore.Spec.BackupName, pluginManager)
+ info, err := r.fetchBackupInfo(restore.Spec.BackupName)
if err != nil {
restore.Status.ValidationErrors = append(restore.Status.ValidationErrors, fmt.Sprintf("Error retrieving backup: %v", err))
return backupInfo{}
@@ -381,7 +375,7 @@ func mostRecentCompletedBackup(backups []api.Backup) api.Backup {
// fetchBackupInfo checks the backup lister for a backup that matches the given name. If it doesn't
// find it, it returns an error.
- func (r *restoreReconciler) fetchBackupInfo(backupName string, pluginManager clientmgmt.Manager) (backupInfo, error) {
+ func (r *restoreReconciler) fetchBackupInfo(backupName string) (backupInfo, error) {
backup := &api.Backup{}
err := r.kbClient.Get(context.Background(), types.NamespacedName{Namespace: r.namespace, Name: backupName}, backup)
if err != nil {
@@ -396,15 +390,9 @@ func (r *restoreReconciler) fetchBackupInfo(backupName string, pluginManager cli
return backupInfo{}, errors.WithStack(err)
}
- backupStore, err := r.backupStoreGetter.Get(location, pluginManager, r.logger)
- if err != nil {
- return backupInfo{}, err
- }
return backupInfo{
- backup:      backup,
- location:    location,
- backupStore: backupStore,
+ backup:   backup,
+ location: location,
}, nil
}
@@ -415,15 +403,20 @@ func (r *restoreReconciler) fetchBackupInfo(backupName string, pluginManager cli
func (r *restoreReconciler) runValidatedRestore(restore *api.Restore, info backupInfo) error {
// instantiate the per-restore logger that will output both to a temp file
// (for upload to object storage) and to stdout.
- restoreLog, err := newRestoreLogger(restore, r.restoreLogLevel, r.logFormat)
+ restoreLog, err := logging.NewTempFileLogger(r.restoreLogLevel, r.logFormat, nil, logrus.Fields{"restore": kubeutil.NamespaceAndName(restore)})
if err != nil {
return err
}
- defer restoreLog.closeAndRemove(r.logger)
+ defer restoreLog.Dispose(r.logger)
pluginManager := r.newPluginManager(restoreLog)
defer pluginManager.CleanupClients()
+ backupStore, err := r.backupStoreGetter.Get(info.location, pluginManager, r.logger)
+ if err != nil {
+ return err
+ }
actions, err := pluginManager.GetRestoreItemActionsV2()
if err != nil {
return errors.Wrap(err, "error getting restore item actions")
@@ -436,7 +429,7 @@ func (r *restoreReconciler) runValidatedRestore(restore *api.Restore, info backu
}
snapshotItemResolver := framework.NewItemSnapshotterResolver(itemSnapshotters)
- backupFile, err := downloadToTempFile(restore.Spec.BackupName, info.backupStore, restoreLog)
+ backupFile, err := downloadToTempFile(restore.Spec.BackupName, backupStore, restoreLog)
if err != nil {
return errors.Wrap(err, "error downloading backup")
}
@@ -455,7 +448,7 @@ func (r *restoreReconciler) runValidatedRestore(restore *api.Restore, info backu
return errors.WithStack(err)
}
- volumeSnapshots, err := info.backupStore.GetBackupVolumeSnapshots(restore.Spec.BackupName)
+ volumeSnapshots, err := backupStore.GetBackupVolumeSnapshots(restore.Spec.BackupName)
if err != nil {
return errors.Wrap(err, "error fetching volume snapshots metadata")
}
@@ -502,17 +495,19 @@ func (r *restoreReconciler) runValidatedRestore(restore *api.Restore, info backu
}
restoreLog.Info("restore completed")
+ restoreLog.DoneForPersist(r.logger)
// re-instantiate the backup store because credentials could have changed since the original
// instantiation, if this was a long-running restore
- info.backupStore, err = r.backupStoreGetter.Get(info.location, pluginManager, r.logger)
+ backupStore, err = r.backupStoreGetter.Get(info.location, pluginManager, r.logger)
if err != nil {
return errors.Wrap(err, "error setting up backup store to persist log and results files")
}
- if logReader, err := restoreLog.done(r.logger); err != nil {
+ if logReader, err := restoreLog.GetPersistFile(); err != nil {
restoreErrors.Velero = append(restoreErrors.Velero, fmt.Sprintf("error getting restore log reader: %v", err))
} else {
- if err := info.backupStore.PutRestoreLog(restore.Spec.BackupName, restore.Name, logReader); err != nil {
+ if err := backupStore.PutRestoreLog(restore.Spec.BackupName, restore.Name, logReader); err != nil {
restoreErrors.Velero = append(restoreErrors.Velero, fmt.Sprintf("error uploading log file to backup storage: %v", err))
}
}
@@ -535,11 +530,11 @@ func (r *restoreReconciler) runValidatedRestore(restore *api.Restore, info backu
"errors": restoreErrors,
}
- if err := putResults(restore, m, info.backupStore); err != nil {
+ if err := putResults(restore, m, backupStore); err != nil {
r.logger.WithError(err).Error("Error uploading restore results to backup storage")
}
- if err := putRestoredResourceList(restore, restoreReq.RestoredResourceList(), info.backupStore); err != nil {
+ if err := putRestoredResourceList(restore, restoreReq.RestoredResourceList(), backupStore); err != nil {
r.logger.WithError(err).Error("Error uploading restored resource list to backup storage")
}
@@ -643,50 +638,3 @@ func downloadToTempFile(backupName string, backupStore persistence.BackupStore,
return file, nil
}
- type restoreLogger struct {
- logrus.FieldLogger
- file *os.File
- w *gzip.Writer
- }
- func newRestoreLogger(restore *api.Restore, logLevel logrus.Level, logFormat logging.Format) (*restoreLogger, error) {
- file, err := ioutil.TempFile("", "")
- if err != nil {
- return nil, errors.Wrap(err, "error creating temp file")
- }
- w := gzip.NewWriter(file)
- logger := logging.DefaultLogger(logLevel, logFormat)
- logger.Out = io.MultiWriter(os.Stdout, w)
- return &restoreLogger{
- FieldLogger: logger.WithField("restore", kubeutil.NamespaceAndName(restore)),
- file: file,
- w: w,
- }, nil
- }
- // done stops the restoreLogger from being able to be written to, and returns
- // an io.Reader for getting the content of the logger. Any attempts to use
- // restoreLogger to log after calling done will panic.
- func (l *restoreLogger) done(log logrus.FieldLogger) (io.Reader, error) {
- l.FieldLogger = nil
- if err := l.w.Close(); err != nil {
- log.WithError(errors.WithStack(err)).Error("error closing gzip writer")
- }
- if _, err := l.file.Seek(0, 0); err != nil {
- return nil, errors.Wrap(err, "error resetting log file offset to 0")
- }
- return l.file, nil
- }
- // closeAndRemove removes the logger's underlying temporary storage. This
- // method should be called when all logging and reading from the logger is
- // complete.
- func (l *restoreLogger) closeAndRemove(log logrus.FieldLogger) {
- closeAndRemoveFile(l.file, log)
- }
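The removed restoreLogger above is the direct ancestor of the new shared logger: done maps to DoneForPersist plus GetPersistFile, closeAndRemove maps to Dispose, and the gzip/temp-file wiring moves behind NewTempFileLogger so both controllers use the same implementation.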

View File

@@ -136,7 +136,7 @@ func TestFetchBackupInfo(t *testing.T) {
backupStore.On("GetBackupMetadata", test.backupName).Return(test.backupStoreBackup, nil).Maybe()
}
- info, err := r.fetchBackupInfo(test.backupName, pluginManager)
+ info, err := r.fetchBackupInfo(test.backupName)
require.Equal(t, test.expectedErr, err != nil)
if test.expectedRes != nil {
@@ -604,7 +604,7 @@ func TestValidateAndCompleteWhenScheduleNameSpecified(t *testing.T) {
Phase(velerov1api.BackupPhaseCompleted).
Result()))
- r.validateAndComplete(restore, pluginManager)
+ r.validateAndComplete(restore)
assert.Contains(t, restore.Status.ValidationErrors, "No backups found for schedule")
assert.Empty(t, restore.Spec.BackupName)
@@ -620,7 +620,7 @@ func TestValidateAndCompleteWhenScheduleNameSpecified(t *testing.T) {
Result(),
))
- r.validateAndComplete(restore, pluginManager)
+ r.validateAndComplete(restore)
assert.Contains(t, restore.Status.ValidationErrors, "No completed backups found for schedule")
assert.Empty(t, restore.Spec.BackupName)
@@ -651,7 +651,7 @@ func TestValidateAndCompleteWhenScheduleNameSpecified(t *testing.T) {
ScheduleName: "schedule-1",
},
}
- r.validateAndComplete(restore, pluginManager)
+ r.validateAndComplete(restore)
assert.Nil(t, restore.Status.ValidationErrors)
assert.Equal(t, "foo", restore.Spec.BackupName)
}

View File

@@ -0,0 +1,104 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package logging
import (
"compress/gzip"
"io"
"io/ioutil"
"os"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// DualModeLogger is a thread-safe logger interface that writes logs to dual targets, one of which
// is a persist file, so that the log can be transferred afterward.
type DualModeLogger interface {
logrus.FieldLogger
// DoneForPersist stops outputting logs to the persist file
DoneForPersist(log logrus.FieldLogger)
// GetPersistFile moves the persist file pointer to the beginning and returns it
GetPersistFile() (*os.File, error)
// Dispose closes the temp file pointer and removes the file
Dispose(log logrus.FieldLogger)
}
type tempFileLogger struct {
logrus.FieldLogger
logger *logrus.Logger
file *os.File
w *gzip.Writer
}
// NewTempFileLogger creates a DualModeLogger instance that writes logs to both Stdout and a file in the temp folder.
func NewTempFileLogger(logLevel logrus.Level, logFormat Format, hook *LogHook, fields logrus.Fields) (DualModeLogger, error) {
file, err := ioutil.TempFile("", "")
if err != nil {
return nil, errors.Wrap(err, "error creating temp file")
}
w := gzip.NewWriter(file)
logger := DefaultLogger(logLevel, logFormat)
logger.Out = io.MultiWriter(os.Stdout, w)
if hook != nil {
logger.Hooks.Add(hook)
}
return &tempFileLogger{
FieldLogger: logger.WithFields(fields),
logger: logger,
file: file,
w: w,
}, nil
}
func (p *tempFileLogger) DoneForPersist(log logrus.FieldLogger) {
p.logger.SetOutput(os.Stdout)
if err := p.w.Close(); err != nil {
log.WithError(err).Warn("error closing gzip writer")
}
}
func (p *tempFileLogger) GetPersistFile() (*os.File, error) {
if _, err := p.file.Seek(0, 0); err != nil {
return nil, errors.Wrap(err, "error resetting log file offset to 0")
}
return p.file, nil
}
func (p *tempFileLogger) Dispose(log logrus.FieldLogger) {
p.w.Close()
closeAndRemoveFile(p.file, log)
}
func closeAndRemoveFile(file *os.File, log logrus.FieldLogger) {
if file == nil {
log.Debug("Skipping removal of temp log file due to nil file pointer")
return
}
if err := file.Close(); err != nil {
log.WithError(err).WithField("file", file.Name()).Warn("error closing temp log file")
}
if err := os.Remove(file.Name()); err != nil {
log.WithError(err).WithField("file", file.Name()).Warn("error removing temp log file")
}
}
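For orientation, here is a minimal usage sketch of the lifecycle both controllers now follow (create, log, DoneForPersist, GetPersistFile, Dispose). The import path is an assumption based on the helpers this file shares a package with (DefaultLogger and NewLogHook live under velero's pkg/util/logging), and the io.Copy stands in for the real upload to object storage:

```go
package main

import (
	"io"

	"github.com/sirupsen/logrus"

	// Assumed import path: this file's package ("logging") sits alongside
	// DefaultLogger and NewLogHook in velero's pkg/util/logging.
	"github.com/vmware-tanzu/velero/pkg/util/logging"
)

func main() {
	parent := logrus.New() // stand-in for the controller's own logger

	// Phase 1: log lines fan out to stdout and to a gzipped temp file.
	log, err := logging.NewTempFileLogger(logrus.InfoLevel, logging.FormatText, nil, logrus.Fields{"backup": "velero/my-backup"})
	if err != nil {
		parent.WithError(err).Fatal("error creating dual mode logger")
	}
	defer log.Dispose(parent) // phase 3: close and delete the temp file

	log.Info("persisted: goes to stdout and to the gzip file")

	// Phase 2: detach the file; later lines go to stdout only.
	log.DoneForPersist(parent)
	log.Info("not persisted: stdout only")

	// Rewind the gzipped file and hand it to the uploader
	// (persistBackup / PutRestoreLog in the controllers above).
	f, err := log.GetPersistFile()
	if err != nil {
		parent.WithError(err).Fatal("error getting persist file")
	}
	n, err := io.Copy(io.Discard, f) // placeholder for the real upload
	if err != nil {
		parent.WithError(err).Error("error reading persist file")
	}
	parent.Infof("gzipped log is %d bytes", n)
}
```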

View File

@@ -0,0 +1,75 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package logging
import (
"compress/gzip"
"io"
"os"
"strings"
"testing"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
)
func TestDualModeLogger(t *testing.T) {
logMsgExpect := "Expected message in log"
logMsgUnexpect := "Unexpected message in log"
logger, err := NewTempFileLogger(logrus.DebugLevel, FormatText, nil, logrus.Fields{})
require.NoError(t, err)
logger.Info(logMsgExpect)
logger.DoneForPersist(velerotest.NewLogger())
logger.Info(logMsgUnexpect)
logFile, err := logger.GetPersistFile()
require.NoError(t, err)
logStr, err := readLogString(logFile)
require.NoError(t, err)
assert.Equal(t, true, strings.Contains(logStr, logMsgExpect))
assert.Equal(t, false, strings.Contains(logStr, logMsgUnexpect))
logger.Dispose(velerotest.NewLogger())
_, err = os.Stat(logFile.Name())
assert.Equal(t, true, os.IsNotExist(err))
}
func readLogString(file *os.File) (string, error) {
gzr, err := gzip.NewReader(file)
if err != nil {
return "", err
}
buffer := make([]byte, 1024)
_, err = gzr.Read(buffer)
if err != io.EOF {
return "", err
}
return string(buffer[:]), nil
}
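A caveat worth noting in readLogString: it relies on gzip returning io.EOF together with the final bytes of a short payload in a single 1 KiB read; if Read returned nil instead (a log longer than the buffer), the helper would return an empty string with no error, and it converts the full 1024-byte buffer rather than just the bytes read. That is fine for this test's tiny log, but a variant that decompresses logs of any length could look like this sketch (readLogAll is a hypothetical helper, not part of the commit):

```go
func readLogAll(file *os.File) (string, error) {
	gzr, err := gzip.NewReader(file)
	if err != nil {
		return "", err
	}
	defer gzr.Close()

	// io.ReadAll loops until io.EOF, so the result is complete no matter
	// how large the decompressed log is.
	data, err := io.ReadAll(gzr)
	if err != nil {
		return "", err
	}
	return string(data), nil
}
```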