Mirror of https://github.com/vmware-tanzu/velero.git
synced 2026-01-07 05:46:37 +00:00
RIAv2 async operations controller work
Signed-off-by: Scott Seago <sseago@redhat.com>
@@ -17,8 +17,8 @@ limitations under the License.
 package controller

 const (
-    BackupOperations    = "backup-operations"
     Backup              = "backup"
+    BackupOperations    = "backup-operations"
     BackupDeletion      = "backup-deletion"
     BackupFinalizer     = "backup-finalizer"
     BackupRepo          = "backup-repo"
@@ -29,14 +29,15 @@ const (
     PodVolumeBackup     = "pod-volume-backup"
     PodVolumeRestore    = "pod-volume-restore"
     Restore             = "restore"
+    RestoreOperations   = "restore-operations"
     Schedule            = "schedule"
     ServerStatusRequest = "server-status-request"
 )

 // DisableableControllers is a list of controllers that can be disabled
 var DisableableControllers = []string{
-    BackupOperations,
     Backup,
+    BackupOperations,
     BackupDeletion,
     BackupFinalizer,
     BackupSync,
@@ -44,6 +45,7 @@ var DisableableControllers = []string{
     GarbageCollection,
     BackupRepo,
     Restore,
+    RestoreOperations,
     Schedule,
     ServerStatusRequest,
 }
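Because the new name is appended to DisableableControllers, the restore-operations controller should be switchable off the same way as the existing ones, presumably through the server's existing --disable-controllers flag (for example, --disable-controllers=restore-operations).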
@@ -44,6 +44,8 @@ type downloadRequestReconciler struct {

     // used to force update of async backup item operations before processing download request
     backupItemOperationsMap *itemoperationmap.BackupItemOperationsMap
+    // used to force update of async restore item operations before processing download request
+    restoreItemOperationsMap *itemoperationmap.RestoreItemOperationsMap

     log logrus.FieldLogger
 }
@@ -56,14 +58,16 @@ func NewDownloadRequestReconciler(
     backupStoreGetter persistence.ObjectBackupStoreGetter,
     log logrus.FieldLogger,
     backupItemOperationsMap *itemoperationmap.BackupItemOperationsMap,
+    restoreItemOperationsMap *itemoperationmap.RestoreItemOperationsMap,
 ) *downloadRequestReconciler {
     return &downloadRequestReconciler{
-        client:                  client,
-        clock:                   clock,
-        newPluginManager:        newPluginManager,
-        backupStoreGetter:       backupStoreGetter,
-        backupItemOperationsMap: backupItemOperationsMap,
-        log:                     log,
+        client:                   client,
+        clock:                    clock,
+        newPluginManager:         newPluginManager,
+        backupStoreGetter:        backupStoreGetter,
+        backupItemOperationsMap:  backupItemOperationsMap,
+        restoreItemOperationsMap: restoreItemOperationsMap,
+        log:                      log,
     }
 }

@@ -129,7 +133,8 @@ func (r *downloadRequestReconciler) Reconcile(ctx context.Context, req ctrl.Requ

     if downloadRequest.Spec.Target.Kind == velerov1api.DownloadTargetKindRestoreLog ||
         downloadRequest.Spec.Target.Kind == velerov1api.DownloadTargetKindRestoreResults ||
-        downloadRequest.Spec.Target.Kind == velerov1api.DownloadTargetKindRestoreResourceList {
+        downloadRequest.Spec.Target.Kind == velerov1api.DownloadTargetKindRestoreResourceList ||
+        downloadRequest.Spec.Target.Kind == velerov1api.DownloadTargetKindRestoreItemOperations {
         restore := &velerov1api.Restore{}
         if err := r.client.Get(ctx, kbclient.ObjectKey{
             Namespace: downloadRequest.Namespace,
@@ -172,6 +177,13 @@ func (r *downloadRequestReconciler) Reconcile(ctx context.Context, req ctrl.Requ
         // ignore errors here. If we can't upload anything here, process the download as usual
         _ = r.backupItemOperationsMap.UpdateForBackup(backupStore, backupName)
     }
+    // If this is a request for restore item operations, force upload of in-memory operations that
+    // are not yet uploaded (if there are any)
+    if downloadRequest.Spec.Target.Kind == velerov1api.DownloadTargetKindRestoreItemOperations &&
+        r.restoreItemOperationsMap != nil {
+        // ignore errors here. If we can't upload anything here, process the download as usual
+        _ = r.restoreItemOperationsMap.UpdateForRestore(backupStore, downloadRequest.Spec.Target.Name)
+    }
     if downloadRequest.Status.DownloadURL, err = backupStore.GetDownloadURL(downloadRequest.Spec.Target); err != nil {
         return ctrl.Result{Requeue: true}, errors.WithStack(err)
     }
@@ -113,6 +113,7 @@ var _ = Describe("Download Request Reconciler", func() {
             NewFakeObjectBackupStoreGetter(backupStores),
             velerotest.NewLogger(),
             nil,
+            nil,
         )

         if test.backupLocation != nil && test.expectGetsURL {
@@ -40,6 +40,7 @@ import (

     "github.com/vmware-tanzu/velero/internal/hook"
     api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
+    "github.com/vmware-tanzu/velero/pkg/itemoperation"
     "github.com/vmware-tanzu/velero/pkg/label"
     "github.com/vmware-tanzu/velero/pkg/metrics"
     "github.com/vmware-tanzu/velero/pkg/persistence"
@@ -84,15 +85,16 @@ var nonRestorableResources = []string{
 }

 type restoreReconciler struct {
-    ctx             context.Context
-    namespace       string
-    restorer        pkgrestore.Restorer
-    kbClient        client.Client
-    restoreLogLevel logrus.Level
-    logger          logrus.FieldLogger
-    metrics         *metrics.ServerMetrics
-    logFormat       logging.Format
-    clock           clock.WithTickerAndDelayedExecution
+    ctx                         context.Context
+    namespace                   string
+    restorer                    pkgrestore.Restorer
+    kbClient                    client.Client
+    restoreLogLevel             logrus.Level
+    logger                      logrus.FieldLogger
+    metrics                     *metrics.ServerMetrics
+    logFormat                   logging.Format
+    clock                       clock.WithTickerAndDelayedExecution
+    defaultItemOperationTimeout time.Duration

     newPluginManager  func(logger logrus.FieldLogger) clientmgmt.Manager
     backupStoreGetter persistence.ObjectBackupStoreGetter
@@ -114,17 +116,19 @@ func NewRestoreReconciler(
     backupStoreGetter persistence.ObjectBackupStoreGetter,
     metrics *metrics.ServerMetrics,
     logFormat logging.Format,
+    defaultItemOperationTimeout time.Duration,
 ) *restoreReconciler {
     r := &restoreReconciler{
-        ctx:             ctx,
-        namespace:       namespace,
-        restorer:        restorer,
-        kbClient:        kbClient,
-        logger:          logger,
-        restoreLogLevel: restoreLogLevel,
-        metrics:         metrics,
-        logFormat:       logFormat,
-        clock:           &clock.RealClock{},
+        ctx:                         ctx,
+        namespace:                   namespace,
+        restorer:                    restorer,
+        kbClient:                    kbClient,
+        logger:                      logger,
+        restoreLogLevel:             restoreLogLevel,
+        metrics:                     metrics,
+        logFormat:                   logFormat,
+        clock:                       &clock.RealClock{},
+        defaultItemOperationTimeout: defaultItemOperationTimeout,

         // use variables to refer to these functions so they can be
         // replaced with fakes for testing.
@@ -172,6 +176,10 @@ func (r *restoreReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
         restore.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
         restore.Status.Phase = api.RestorePhaseInProgress
     }
+    if restore.Spec.ItemOperationTimeout.Duration == 0 {
+        // set default item operation timeout
+        restore.Spec.ItemOperationTimeout.Duration = r.defaultItemOperationTimeout
+    }

     // patch to update status and persist to API
     err = kubeutil.PatchResource(original, restore, r.kbClient)
@@ -194,17 +202,14 @@ func (r *restoreReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
         restore.Status.Phase = api.RestorePhaseFailed
         restore.Status.FailureReason = err.Error()
         r.metrics.RegisterRestoreFailed(backupScheduleName)
-    } else if restore.Status.Errors > 0 {
-        log.Debug("Restore partially failed")
-        restore.Status.Phase = api.RestorePhasePartiallyFailed
-        r.metrics.RegisterRestorePartialFailure(backupScheduleName)
-    } else {
-        log.Debug("Restore completed")
-        restore.Status.Phase = api.RestorePhaseCompleted
-        r.metrics.RegisterRestoreSuccess(backupScheduleName)
     }

-    restore.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
+    // mark completion if in terminal phase
+    if restore.Status.Phase == api.RestorePhaseFailed ||
+        restore.Status.Phase == api.RestorePhasePartiallyFailed ||
+        restore.Status.Phase == api.RestorePhaseCompleted {
+        restore.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
+    }
     log.Debug("Updating restore's final status")
     if err = kubeutil.PatchResource(original, restore, r.kbClient); err != nil {
         log.WithError(errors.WithStack(err)).Info("Error updating restore's final status")
@@ -375,15 +380,19 @@ func mostRecentCompletedBackup(backups []api.Backup) api.Backup {
 // fetchBackupInfo checks the backup lister for a backup that matches the given name. If it doesn't
 // find it, it returns an error.
 func (r *restoreReconciler) fetchBackupInfo(backupName string) (backupInfo, error) {
+    return fetchBackupInfoInternal(r.kbClient, r.namespace, backupName)
+}
+
+func fetchBackupInfoInternal(kbClient client.Client, namespace, backupName string) (backupInfo, error) {
     backup := &api.Backup{}
-    err := r.kbClient.Get(context.Background(), types.NamespacedName{Namespace: r.namespace, Name: backupName}, backup)
+    err := kbClient.Get(context.Background(), types.NamespacedName{Namespace: namespace, Name: backupName}, backup)
     if err != nil {
-        return backupInfo{}, err
+        return backupInfo{}, errors.Wrap(err, fmt.Sprintf("can't find backup %s/%s", namespace, backupName))
     }

     location := &api.BackupStorageLocation{}
-    if err := r.kbClient.Get(context.Background(), client.ObjectKey{
-        Namespace: r.namespace,
+    if err := kbClient.Get(context.Background(), client.ObjectKey{
+        Namespace: namespace,
         Name:      backup.Spec.StorageLocation,
     }, location); err != nil {
         return backupInfo{}, errors.WithStack(err)
@@ -469,6 +478,21 @@ func (r *restoreReconciler) runValidatedRestore(restore *api.Restore, info backu
     restoreWarnings, restoreErrors := r.restorer.RestoreWithResolvers(restoreReq, actionsResolver, snapshotItemResolver,
         pluginManager)

+    // Iterate over restore item operations and update progress.
+    // Any errors on operations at this point should be added to restore errors.
+    // If any operations are still not complete, then restore will not be set to
+    // Completed yet.
+    inProgressOperations, _, opsCompleted, opsFailed, errs := getRestoreItemOperationProgress(restoreReq.Restore, pluginManager, *restoreReq.GetItemOperationsList())
+    if len(errs) > 0 {
+        for err := range errs {
+            restoreLog.Error(err)
+        }
+    }
+
+    restore.Status.RestoreItemOperationsAttempted = len(*restoreReq.GetItemOperationsList())
+    restore.Status.RestoreItemOperationsCompleted = opsCompleted
+    restore.Status.RestoreItemOperationsFailed = opsFailed
+
     // log errors and warnings to the restore log
     for _, msg := range restoreErrors.Velero {
         restoreLog.Errorf("Velero restore error: %v", msg)
@@ -537,6 +561,29 @@ func (r *restoreReconciler) runValidatedRestore(restore *api.Restore, info backu
         r.logger.WithError(err).Error("Error uploading restored resource list to backup storage")
     }

+    if err := putOperationsForRestore(restore, *restoreReq.GetItemOperationsList(), backupStore); err != nil {
+        r.logger.WithError(err).Error("Error uploading restore item action operation resource list to backup storage")
+    }
+
+    if restore.Status.Errors > 0 {
+        if inProgressOperations {
+            r.logger.Debug("Restore WaitingForPluginOperationsPartiallyFailed")
+            restore.Status.Phase = api.RestorePhaseWaitingForPluginOperationsPartiallyFailed
+        } else {
+            r.logger.Debug("Restore partially failed")
+            restore.Status.Phase = api.RestorePhasePartiallyFailed
+            r.metrics.RegisterRestorePartialFailure(restore.Spec.ScheduleName)
+        }
+    } else {
+        if inProgressOperations {
+            r.logger.Debug("Restore WaitingForPluginOperations")
+            restore.Status.Phase = api.RestorePhaseWaitingForPluginOperations
+        } else {
+            r.logger.Debug("Restore completed")
+            restore.Status.Phase = api.RestorePhaseCompleted
+            r.metrics.RegisterRestoreSuccess(restore.Spec.ScheduleName)
+        }
+    }
     return nil
 }

@@ -603,6 +650,26 @@ func putRestoredResourceList(restore *api.Restore, list map[string][]string, bac
     return nil
 }

+func putOperationsForRestore(restore *api.Restore, operations []*itemoperation.RestoreOperation, backupStore persistence.BackupStore) error {
+    buf := new(bytes.Buffer)
+    gzw := gzip.NewWriter(buf)
+    defer gzw.Close()
+
+    if err := json.NewEncoder(gzw).Encode(operations); err != nil {
+        return errors.Wrap(err, "error encoding restore item operations list to JSON")
+    }
+
+    if err := gzw.Close(); err != nil {
+        return errors.Wrap(err, "error closing gzip writer")
+    }
+
+    if err := backupStore.PutRestoreItemOperations(restore.Name, buf); err != nil {
+        return err
+    }
+
+    return nil
+}
+
 func downloadToTempFile(backupName string, backupStore persistence.BackupStore, logger logrus.FieldLogger) (*os.File, error) {
     readCloser, err := backupStore.GetBackupContents(backupName)
     if err != nil {
@@ -111,6 +111,7 @@ func TestFetchBackupInfo(t *testing.T) {
             NewFakeSingleObjectBackupStoreGetter(backupStore),
             metrics.NewServerMetrics(),
             formatFlag,
+            60*time.Minute,
         )

         if test.backupStoreError == nil {
@@ -193,6 +194,7 @@ func TestProcessQueueItemSkips(t *testing.T) {
             nil, // backupStoreGetter
             metrics.NewServerMetrics(),
             formatFlag,
+            60*time.Minute,
         )

         _, err := r.Reconcile(context.Background(), ctrl.Request{NamespacedName: types.NamespacedName{
@@ -422,6 +424,7 @@ func TestRestoreReconcile(t *testing.T) {
             NewFakeSingleObjectBackupStoreGetter(backupStore),
             metrics.NewServerMetrics(),
             formatFlag,
+            60*time.Minute,
         )

         r.clock = clocktesting.NewFakeClock(now)
@@ -453,6 +456,7 @@

             backupStore.On("PutRestoreResults", test.backup.Name, test.restore.Name, mock.Anything).Return(nil)
             backupStore.On("PutRestoredResourceList", test.restore.Name, mock.Anything).Return(nil)
+            backupStore.On("PutRestoreItemOperations", mock.Anything, mock.Anything).Return(nil)

             volumeSnapshots := []*volume.Snapshot{
                 {
@@ -586,6 +590,7 @@ func TestValidateAndCompleteWhenScheduleNameSpecified(t *testing.T) {
             NewFakeSingleObjectBackupStoreGetter(backupStore),
             metrics.NewServerMetrics(),
             formatFlag,
+            60*time.Minute,
         )

         restore := &velerov1api.Restore{
@@ -746,7 +751,7 @@ func TestMostRecentCompletedBackup(t *testing.T) {
 }

 func NewRestore(ns, name, backup, includeNS, includeResource string, phase velerov1api.RestorePhase) *builder.RestoreBuilder {
-    restore := builder.ForRestore(ns, name).Phase(phase).Backup(backup)
+    restore := builder.ForRestore(ns, name).Phase(phase).Backup(backup).ItemOperationTimeout(60 * time.Minute)

     if includeNS != "" {
         restore = restore.IncludedNamespaces(includeNS)
pkg/controller/restore_operations_controller.go (new file, 352 lines)
@@ -0,0 +1,352 @@
/*
Copyright the Velero contributors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
    "context"
    "time"

    "github.com/pkg/errors"
    "github.com/sirupsen/logrus"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    clocks "k8s.io/utils/clock"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/builder"
    "sigs.k8s.io/controller-runtime/pkg/client"

    velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
    "github.com/vmware-tanzu/velero/pkg/itemoperation"
    "github.com/vmware-tanzu/velero/pkg/itemoperationmap"
    "github.com/vmware-tanzu/velero/pkg/metrics"
    "github.com/vmware-tanzu/velero/pkg/persistence"
    "github.com/vmware-tanzu/velero/pkg/plugin/clientmgmt"
    "github.com/vmware-tanzu/velero/pkg/util/kube"
)

const (
    defaultRestoreOperationsFrequency = 10 * time.Second
)

type restoreOperationsReconciler struct {
    client.Client
    namespace         string
    logger            logrus.FieldLogger
    clock             clocks.WithTickerAndDelayedExecution
    frequency         time.Duration
    itemOperationsMap *itemoperationmap.RestoreItemOperationsMap
    newPluginManager  func(logger logrus.FieldLogger) clientmgmt.Manager
    backupStoreGetter persistence.ObjectBackupStoreGetter
    metrics           *metrics.ServerMetrics
}

func NewRestoreOperationsReconciler(
    logger logrus.FieldLogger,
    namespace string,
    client client.Client,
    frequency time.Duration,
    newPluginManager func(logrus.FieldLogger) clientmgmt.Manager,
    backupStoreGetter persistence.ObjectBackupStoreGetter,
    metrics *metrics.ServerMetrics,
    itemOperationsMap *itemoperationmap.RestoreItemOperationsMap,
) *restoreOperationsReconciler {
    abor := &restoreOperationsReconciler{
        Client:            client,
        logger:            logger,
        namespace:         namespace,
        clock:             clocks.RealClock{},
        frequency:         frequency,
        itemOperationsMap: itemOperationsMap,
        newPluginManager:  newPluginManager,
        backupStoreGetter: backupStoreGetter,
        metrics:           metrics,
    }
    if abor.frequency <= 0 {
        abor.frequency = defaultRestoreOperationsFrequency
    }
    return abor
}

func (r *restoreOperationsReconciler) SetupWithManager(mgr ctrl.Manager) error {
    s := kube.NewPeriodicalEnqueueSource(r.logger, mgr.GetClient(), &velerov1api.RestoreList{}, r.frequency, kube.PeriodicalEnqueueSourceOption{})
    gp := kube.NewGenericEventPredicate(func(object client.Object) bool {
        restore := object.(*velerov1api.Restore)
        return (restore.Status.Phase == velerov1api.RestorePhaseWaitingForPluginOperations ||
            restore.Status.Phase == velerov1api.RestorePhaseWaitingForPluginOperationsPartiallyFailed)
    })
    return ctrl.NewControllerManagedBy(mgr).
        For(&velerov1api.Restore{}, builder.WithPredicates(kube.FalsePredicate{})).
        Watches(s, nil, builder.WithPredicates(gp)).
        Complete(r)
}

// +kubebuilder:rbac:groups=velero.io,resources=restores,verbs=get;list;watch;update
// +kubebuilder:rbac:groups=velero.io,resources=restores/status,verbs=get
// +kubebuilder:rbac:groups=velero.io,resources=restorestoragelocations,verbs=get

func (r *restoreOperationsReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    log := r.logger.WithField("restore operations for restore", req.String())
    log.Debug("restoreOperationsReconciler getting restore")

    original := &velerov1api.Restore{}
    if err := r.Get(ctx, req.NamespacedName, original); err != nil {
        if apierrors.IsNotFound(err) {
            log.WithError(err).Error("restore not found")
            return ctrl.Result{}, nil
        }
        return ctrl.Result{}, errors.Wrapf(err, "error getting restore %s", req.String())
    }
    restore := original.DeepCopy()
    log.Debugf("restore: %s", restore.Name)

    log = r.logger.WithFields(
        logrus.Fields{
            "restore": req.String(),
        },
    )

    switch restore.Status.Phase {
    case velerov1api.RestorePhaseWaitingForPluginOperations, velerov1api.RestorePhaseWaitingForPluginOperationsPartiallyFailed:
        // only process restores waiting for plugin operations to complete
    default:
        log.Debug("Restore has no ongoing plugin operations, skipping")
        return ctrl.Result{}, nil
    }

    info, err := r.fetchBackupInfo(restore.Spec.BackupName)
    if err != nil {
        log.Warnf("Cannot check progress on Restore operations because backup info is unavailable %s; marking restore PartiallyFailed", err.Error())
        restore.Status.Phase = velerov1api.RestorePhasePartiallyFailed
        err2 := r.updateRestoreAndOperationsJSON(ctx, original, restore, nil, &itemoperationmap.OperationsForRestore{ErrsSinceUpdate: []string{err.Error()}}, false, false)
        if err2 != nil {
            log.WithError(err2).Error("error updating Restore")
        }
        return ctrl.Result{}, errors.Wrap(err, "error getting backup info")
    }

    if info.location.Spec.AccessMode == velerov1api.BackupStorageLocationAccessModeReadOnly {
        log.Infof("Cannot check progress on Restore operations because backup storage location %s is currently in read-only mode; marking restore PartiallyFailed", info.location.Name)
        restore.Status.Phase = velerov1api.RestorePhasePartiallyFailed

        err := r.updateRestoreAndOperationsJSON(ctx, original, restore, nil, &itemoperationmap.OperationsForRestore{ErrsSinceUpdate: []string{"BSL is read-only"}}, false, false)
        if err != nil {
            log.WithError(err).Error("error updating Restore")
        }
        return ctrl.Result{}, nil
    }

    pluginManager := r.newPluginManager(r.logger)
    defer pluginManager.CleanupClients()
    backupStore, err := r.backupStoreGetter.Get(info.location, pluginManager, r.logger)
    if err != nil {
        return ctrl.Result{}, errors.Wrap(err, "error getting backup store")
    }

    operations, err := r.itemOperationsMap.GetOperationsForRestore(backupStore, restore.Name)
    if err != nil {
        err2 := r.updateRestoreAndOperationsJSON(ctx, original, restore, backupStore, &itemoperationmap.OperationsForRestore{ErrsSinceUpdate: []string{err.Error()}}, false, false)
        if err2 != nil {
            return ctrl.Result{}, errors.Wrap(err2, "error updating Restore")
        }
        return ctrl.Result{}, errors.Wrap(err, "error getting restore operations")
    }
    stillInProgress, changes, opsCompleted, opsFailed, errs := getRestoreItemOperationProgress(restore, pluginManager, operations.Operations)
    // if len(errs)>0, need to update restore errors and error log
    operations.ErrsSinceUpdate = append(operations.ErrsSinceUpdate, errs...)
    restore.Status.Errors += len(operations.ErrsSinceUpdate)
    completionChanges := false
    if restore.Status.RestoreItemOperationsCompleted != opsCompleted || restore.Status.RestoreItemOperationsFailed != opsFailed {
        completionChanges = true
        restore.Status.RestoreItemOperationsCompleted = opsCompleted
        restore.Status.RestoreItemOperationsFailed = opsFailed
    }
    if changes {
        operations.ChangesSinceUpdate = true
    }

    // if stillInProgress is false, restore moves to terminal phase and needs update
    // if operations.ErrsSinceUpdate is not empty, then restore phase needs to change to
    // RestorePhaseWaitingForPluginOperationsPartiallyFailed and needs update
    // If the only changes are incremental progress, then no write is necessary, progress can remain in memory
    if !stillInProgress {
        if len(operations.ErrsSinceUpdate) > 0 {
            restore.Status.Phase = velerov1api.RestorePhaseWaitingForPluginOperationsPartiallyFailed
        }
        if restore.Status.Phase == velerov1api.RestorePhaseWaitingForPluginOperations {
            log.Infof("Marking restore %s completed", restore.Name)
            restore.Status.Phase = velerov1api.RestorePhaseCompleted
            r.metrics.RegisterRestoreSuccess(restore.Spec.ScheduleName)
        } else {
            log.Infof("Marking restore %s FinalizingPartiallyFailed", restore.Name)
            restore.Status.Phase = velerov1api.RestorePhasePartiallyFailed
            r.metrics.RegisterRestorePartialFailure(restore.Spec.ScheduleName)
        }
    }
    err = r.updateRestoreAndOperationsJSON(ctx, original, restore, backupStore, operations, changes, completionChanges)
    if err != nil {
        return ctrl.Result{}, errors.Wrap(err, "error updating Restore")
    }
    return ctrl.Result{}, nil
}

// fetchBackupInfo checks the backup lister for a backup that matches the given name. If it doesn't
// find it, it returns an error.
func (r *restoreOperationsReconciler) fetchBackupInfo(backupName string) (backupInfo, error) {
    return fetchBackupInfoInternal(r.Client, r.namespace, backupName)
}

func (r *restoreOperationsReconciler) updateRestoreAndOperationsJSON(
    ctx context.Context,
    original, restore *velerov1api.Restore,
    backupStore persistence.BackupStore,
    operations *itemoperationmap.OperationsForRestore,
    changes bool,
    completionChanges bool) error {

    if len(operations.ErrsSinceUpdate) > 0 {
        // FIXME: download/upload results
    }
    removeIfComplete := true
    defer func() {
        // remove local operations list if complete
        if removeIfComplete && (restore.Status.Phase == velerov1api.RestorePhaseCompleted ||
            restore.Status.Phase == velerov1api.RestorePhasePartiallyFailed) {

            r.itemOperationsMap.DeleteOperationsForRestore(restore.Name)
        } else if changes {
            r.itemOperationsMap.PutOperationsForRestore(operations, restore.Name)
        }
    }()

    // update restore and upload progress if errs or complete
    if len(operations.ErrsSinceUpdate) > 0 ||
        restore.Status.Phase == velerov1api.RestorePhaseCompleted ||
        restore.Status.Phase == velerov1api.RestorePhasePartiallyFailed {
        // update file store
        if backupStore != nil {
            if err := r.itemOperationsMap.UploadProgressAndPutOperationsForRestore(backupStore, operations, restore.Name); err != nil {
                removeIfComplete = false
                return err
            }
        }
        // update restore
        err := r.Client.Patch(ctx, restore, client.MergeFrom(original))
        if err != nil {
            removeIfComplete = false
            return errors.Wrapf(err, "error updating Restore %s", restore.Name)
        }
    } else if completionChanges {
        // If restore is still incomplete and no new errors are found but there are some new operations
        // completed, patch restore to reflect new completion numbers, but don't upload detailed json file
        err := r.Client.Patch(ctx, restore, client.MergeFrom(original))
        if err != nil {
            return errors.Wrapf(err, "error updating Restore %s", restore.Name)
        }
    }
    return nil
}

func getRestoreItemOperationProgress(
    restore *velerov1api.Restore,
    pluginManager clientmgmt.Manager,
    operationsList []*itemoperation.RestoreOperation) (bool, bool, int, int, []string) {
    inProgressOperations := false
    changes := false
    var errs []string
    var completedCount, failedCount int

    for _, operation := range operationsList {
        if operation.Status.Phase == itemoperation.OperationPhaseInProgress {
            ria, err := pluginManager.GetRestoreItemActionV2(operation.Spec.RestoreItemAction)
            if err != nil {
                operation.Status.Phase = itemoperation.OperationPhaseFailed
                operation.Status.Error = err.Error()
                errs = append(errs, err.Error())
                changes = true
                failedCount++
                continue
            }
            operationProgress, err := ria.Progress(operation.Spec.OperationID, restore)
            if err != nil {
                operation.Status.Phase = itemoperation.OperationPhaseFailed
                operation.Status.Error = err.Error()
                errs = append(errs, err.Error())
                changes = true
                failedCount++
                continue
            }
            if operation.Status.NCompleted != operationProgress.NCompleted {
                operation.Status.NCompleted = operationProgress.NCompleted
                changes = true
            }
            if operation.Status.NTotal != operationProgress.NTotal {
                operation.Status.NTotal = operationProgress.NTotal
                changes = true
            }
            if operation.Status.OperationUnits != operationProgress.OperationUnits {
                operation.Status.OperationUnits = operationProgress.OperationUnits
                changes = true
            }
            if operation.Status.Description != operationProgress.Description {
                operation.Status.Description = operationProgress.Description
                changes = true
            }
            started := metav1.NewTime(operationProgress.Started)
            if operation.Status.Started == nil || *(operation.Status.Started) != started {
                operation.Status.Started = &started
                changes = true
            }
            updated := metav1.NewTime(operationProgress.Updated)
            if operation.Status.Updated == nil || *(operation.Status.Updated) != updated {
                operation.Status.Updated = &updated
                changes = true
            }

            if operationProgress.Completed {
                if operationProgress.Err != "" {
                    operation.Status.Phase = itemoperation.OperationPhaseFailed
                    operation.Status.Error = operationProgress.Err
                    errs = append(errs, operationProgress.Err)
                    changes = true
                    failedCount++
                    continue
                }
                operation.Status.Phase = itemoperation.OperationPhaseCompleted
                changes = true
                completedCount++
                continue
            }
            // cancel operation if past timeout period
            if operation.Status.Created.Time.Add(restore.Spec.ItemOperationTimeout.Duration).Before(time.Now()) {
                _ = ria.Cancel(operation.Spec.OperationID, restore)
                operation.Status.Phase = itemoperation.OperationPhaseFailed
                operation.Status.Error = "Asynchronous action timed out"
                errs = append(errs, operation.Status.Error)
                changes = true
                failedCount++
                continue
            }
            // if we reach this point, the operation is still running
            inProgressOperations = true
        } else if operation.Status.Phase == itemoperation.OperationPhaseCompleted {
            completedCount++
        } else if operation.Status.Phase == itemoperation.OperationPhaseFailed {
            failedCount++
        }
    }
    return inProgressOperations, changes, completedCount, failedCount, errs
}
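The reconciler above is built with NewRestoreOperationsReconciler and registered through SetupWithManager, mirroring the existing backup-operations controller. The sketch below is illustrative only; the server-side registration is not part of the hunks shown here, and the surrounding names (mgr, logger, namespace, newPluginManager, backupStoreGetter, serverMetrics) are assumed to exist in the Velero server setup code, with the pkg/controller and pkg/itemoperationmap packages imported.

// Hypothetical wiring sketch (assumed names; not taken from this commit's hunks).
restoreOpsMap := itemoperationmap.NewRestoreItemOperationsMap()
if err := controller.NewRestoreOperationsReconciler(
    logger,            // logrus.FieldLogger
    namespace,         // Velero install namespace
    mgr.GetClient(),   // controller-runtime client from the manager
    10*time.Second,    // poll frequency; <=0 falls back to defaultRestoreOperationsFrequency
    newPluginManager,  // func(logrus.FieldLogger) clientmgmt.Manager
    backupStoreGetter, // persistence.ObjectBackupStoreGetter
    serverMetrics,     // *metrics.ServerMetrics
    restoreOpsMap,
).SetupWithManager(mgr); err != nil {
    logger.WithError(err).Fatal("unable to set up restore operations controller")
}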
pkg/controller/restore_operations_controller_test.go (new file, 320 lines)
@@ -0,0 +1,320 @@
/*
Copyright the Velero contributors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
    "context"
    "testing"
    "time"

    "github.com/sirupsen/logrus"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/mock"
    "github.com/stretchr/testify/require"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/types"
    testclocks "k8s.io/utils/clock/testing"
    ctrl "sigs.k8s.io/controller-runtime"
    kbclient "sigs.k8s.io/controller-runtime/pkg/client"

    velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
    "github.com/vmware-tanzu/velero/pkg/builder"
    "github.com/vmware-tanzu/velero/pkg/itemoperation"
    "github.com/vmware-tanzu/velero/pkg/itemoperationmap"
    "github.com/vmware-tanzu/velero/pkg/kuberesource"
    "github.com/vmware-tanzu/velero/pkg/metrics"
    persistencemocks "github.com/vmware-tanzu/velero/pkg/persistence/mocks"
    "github.com/vmware-tanzu/velero/pkg/plugin/clientmgmt"
    pluginmocks "github.com/vmware-tanzu/velero/pkg/plugin/mocks"
    "github.com/vmware-tanzu/velero/pkg/plugin/velero"
    riav2mocks "github.com/vmware-tanzu/velero/pkg/plugin/velero/mocks/restoreitemaction/v2"
    velerotest "github.com/vmware-tanzu/velero/pkg/test"
)

var (
    restorePluginManager = &pluginmocks.Manager{}
    restoreBackupStore   = &persistencemocks.BackupStore{}
    ria                  = &riav2mocks.RestoreItemAction{}
)

func mockRestoreOperationsReconciler(fakeClient kbclient.Client, fakeClock *testclocks.FakeClock, freq time.Duration) *restoreOperationsReconciler {
    abor := NewRestoreOperationsReconciler(
        logrus.StandardLogger(),
        velerov1api.DefaultNamespace,
        fakeClient,
        freq,
        func(logrus.FieldLogger) clientmgmt.Manager { return restorePluginManager },
        NewFakeSingleObjectBackupStoreGetter(restoreBackupStore),
        metrics.NewServerMetrics(),
        itemoperationmap.NewRestoreItemOperationsMap(),
    )
    abor.clock = fakeClock
    return abor
}

func TestRestoreOperationsReconcile(t *testing.T) {
    fakeClock := testclocks.NewFakeClock(time.Now())
    metav1Now := metav1.NewTime(fakeClock.Now())

    defaultBackupLocation := builder.ForBackupStorageLocation(velerov1api.DefaultNamespace, "default").Result()

    tests := []struct {
        name              string
        restore           *velerov1api.Restore
        restoreOperations []*itemoperation.RestoreOperation
        backup            *velerov1api.Backup
        backupLocation    *velerov1api.BackupStorageLocation
        operationComplete bool
        operationErr      string
        expectError       bool
        expectPhase       velerov1api.RestorePhase
    }{
        {
            name: "WaitingForPluginOperations restore with completed operations is Completed",
            restore: builder.ForRestore(velerov1api.DefaultNamespace, "restore-11").
                Backup("backup-1").
                ItemOperationTimeout(60 * time.Minute).
                ObjectMeta(builder.WithUID("foo-11")).
                Phase(velerov1api.RestorePhaseWaitingForPluginOperations).Result(),
            backup:            defaultBackup().StorageLocation("default").Result(),
            backupLocation:    defaultBackupLocation,
            operationComplete: true,
            expectPhase:       velerov1api.RestorePhaseCompleted,
            restoreOperations: []*itemoperation.RestoreOperation{
                {
                    Spec: itemoperation.RestoreOperationSpec{
                        RestoreName:       "restore-11",
                        RestoreUID:        "foo-11",
                        RestoreItemAction: "foo-11",
                        ResourceIdentifier: velero.ResourceIdentifier{
                            GroupResource: kuberesource.Pods,
                            Namespace:     "ns-1",
                            Name:          "pod-1",
                        },
                        OperationID: "operation-11",
                    },
                    Status: itemoperation.OperationStatus{
                        Phase:   itemoperation.OperationPhaseInProgress,
                        Created: &metav1Now,
                    },
                },
            },
        },
        {
            name: "WaitingForPluginOperations restore with incomplete operations is still incomplete",
            restore: builder.ForRestore(velerov1api.DefaultNamespace, "restore-12").
                Backup("backup-1").
                ItemOperationTimeout(60 * time.Minute).
                ObjectMeta(builder.WithUID("foo-12")).
                Phase(velerov1api.RestorePhaseWaitingForPluginOperations).Result(),
            backup:            defaultBackup().StorageLocation("default").Result(),
            backupLocation:    defaultBackupLocation,
            operationComplete: false,
            expectPhase:       velerov1api.RestorePhaseWaitingForPluginOperations,
            restoreOperations: []*itemoperation.RestoreOperation{
                {
                    Spec: itemoperation.RestoreOperationSpec{
                        RestoreName:       "restore-12",
                        RestoreUID:        "foo-12",
                        RestoreItemAction: "foo-12",
                        ResourceIdentifier: velero.ResourceIdentifier{
                            GroupResource: kuberesource.Pods,
                            Namespace:     "ns-1",
                            Name:          "pod-1",
                        },
                        OperationID: "operation-12",
                    },
                    Status: itemoperation.OperationStatus{
                        Phase:   itemoperation.OperationPhaseInProgress,
                        Created: &metav1Now,
                    },
                },
            },
        },
        {
            name: "WaitingForPluginOperations restore with completed failed operations is PartiallyFailed",
            restore: builder.ForRestore(velerov1api.DefaultNamespace, "restore-13").
                Backup("backup-1").
                ItemOperationTimeout(60 * time.Minute).
                ObjectMeta(builder.WithUID("foo-13")).
                Phase(velerov1api.RestorePhaseWaitingForPluginOperations).Result(),
            backup:            defaultBackup().StorageLocation("default").Result(),
            backupLocation:    defaultBackupLocation,
            operationComplete: true,
            operationErr:      "failed",
            expectPhase:       velerov1api.RestorePhasePartiallyFailed,
            restoreOperations: []*itemoperation.RestoreOperation{
                {
                    Spec: itemoperation.RestoreOperationSpec{
                        RestoreName:       "restore-13",
                        RestoreUID:        "foo-13",
                        RestoreItemAction: "foo-13",
                        ResourceIdentifier: velero.ResourceIdentifier{
                            GroupResource: kuberesource.Pods,
                            Namespace:     "ns-1",
                            Name:          "pod-1",
                        },
                        OperationID: "operation-13",
                    },
                    Status: itemoperation.OperationStatus{
                        Phase:   itemoperation.OperationPhaseInProgress,
                        Created: &metav1Now,
                    },
                },
            },
        },
        {
            name: "WaitingForPluginOperationsPartiallyFailed restore with completed operations is PartiallyFailed",
            restore: builder.ForRestore(velerov1api.DefaultNamespace, "restore-14").
                Backup("backup-1").
                ItemOperationTimeout(60 * time.Minute).
                ObjectMeta(builder.WithUID("foo-14")).
                Phase(velerov1api.RestorePhaseWaitingForPluginOperationsPartiallyFailed).Result(),
            backup:            defaultBackup().StorageLocation("default").Result(),
            backupLocation:    defaultBackupLocation,
            operationComplete: true,
            expectPhase:       velerov1api.RestorePhasePartiallyFailed,
            restoreOperations: []*itemoperation.RestoreOperation{
                {
                    Spec: itemoperation.RestoreOperationSpec{
                        RestoreName:       "restore-14",
                        RestoreUID:        "foo-14",
                        RestoreItemAction: "foo-14",
                        ResourceIdentifier: velero.ResourceIdentifier{
                            GroupResource: kuberesource.Pods,
                            Namespace:     "ns-1",
                            Name:          "pod-1",
                        },
                        OperationID: "operation-14",
                    },
                    Status: itemoperation.OperationStatus{
                        Phase:   itemoperation.OperationPhaseInProgress,
                        Created: &metav1Now,
                    },
                },
            },
        },
        {
            name: "WaitingForPluginOperationsPartiallyFailed restore with incomplete operations is still incomplete",
            restore: builder.ForRestore(velerov1api.DefaultNamespace, "restore-15").
                Backup("backup-1").
                ItemOperationTimeout(60 * time.Minute).
                ObjectMeta(builder.WithUID("foo-15")).
                Phase(velerov1api.RestorePhaseWaitingForPluginOperationsPartiallyFailed).Result(),
            backup:            defaultBackup().StorageLocation("default").Result(),
            backupLocation:    defaultBackupLocation,
            operationComplete: false,
            expectPhase:       velerov1api.RestorePhaseWaitingForPluginOperationsPartiallyFailed,
            restoreOperations: []*itemoperation.RestoreOperation{
                {
                    Spec: itemoperation.RestoreOperationSpec{
                        RestoreName:       "restore-15",
                        RestoreUID:        "foo-15",
                        RestoreItemAction: "foo-15",
                        ResourceIdentifier: velero.ResourceIdentifier{
                            GroupResource: kuberesource.Pods,
                            Namespace:     "ns-1",
                            Name:          "pod-1",
                        },
                        OperationID: "operation-15",
                    },
                    Status: itemoperation.OperationStatus{
                        Phase:   itemoperation.OperationPhaseInProgress,
                        Created: &metav1Now,
                    },
                },
            },
        },
        {
            name: "WaitingForPluginOperationsPartiallyFailed restore with completed failed operations is PartiallyFailed",
            restore: builder.ForRestore(velerov1api.DefaultNamespace, "restore-16").
                Backup("backup-1").
                ItemOperationTimeout(60 * time.Minute).
                ObjectMeta(builder.WithUID("foo-16")).
                Phase(velerov1api.RestorePhaseWaitingForPluginOperationsPartiallyFailed).Result(),
            backup:            defaultBackup().StorageLocation("default").Result(),
            backupLocation:    defaultBackupLocation,
            operationComplete: true,
            operationErr:      "failed",
            expectPhase:       velerov1api.RestorePhasePartiallyFailed,
            restoreOperations: []*itemoperation.RestoreOperation{
                {
                    Spec: itemoperation.RestoreOperationSpec{
                        RestoreName:       "restore-16",
                        RestoreUID:        "foo-16",
                        RestoreItemAction: "foo-16",
                        ResourceIdentifier: velero.ResourceIdentifier{
                            GroupResource: kuberesource.Pods,
                            Namespace:     "ns-1",
                            Name:          "pod-1",
                        },
                        OperationID: "operation-16",
                    },
                    Status: itemoperation.OperationStatus{
                        Phase:   itemoperation.OperationPhaseInProgress,
                        Created: &metav1Now,
                    },
                },
            },
        },
    }

    for _, test := range tests {
        t.Run(test.name, func(t *testing.T) {
            if test.restore == nil {
                return
            }

            initObjs := []runtime.Object{}
            initObjs = append(initObjs, test.restore)
            initObjs = append(initObjs, test.backup)

            if test.backupLocation != nil {
                initObjs = append(initObjs, test.backupLocation)
            }

            fakeClient := velerotest.NewFakeControllerRuntimeClient(t, initObjs...)
            reconciler := mockRestoreOperationsReconciler(fakeClient, fakeClock, defaultRestoreOperationsFrequency)
            restorePluginManager.On("CleanupClients").Return(nil)
            restoreBackupStore.On("GetRestoreItemOperations", test.restore.Name).Return(test.restoreOperations, nil)
            restoreBackupStore.On("PutRestoreItemOperations", mock.Anything, mock.Anything).Return(nil)
            restoreBackupStore.On("PutRestoreMetadata", mock.Anything, mock.Anything).Return(nil)
            for _, operation := range test.restoreOperations {
                ria.On("Progress", operation.Spec.OperationID, mock.Anything).
                    Return(velero.OperationProgress{
                        Completed: test.operationComplete,
                        Err:       test.operationErr,
                    }, nil)
                restorePluginManager.On("GetRestoreItemActionV2", operation.Spec.RestoreItemAction).Return(ria, nil)
            }

            _, err := reconciler.Reconcile(context.TODO(), ctrl.Request{NamespacedName: types.NamespacedName{Namespace: test.restore.Namespace, Name: test.restore.Name}})
            gotErr := err != nil
            assert.Equal(t, test.expectError, gotErr)

            restoreAfter := velerov1api.Restore{}
            err = fakeClient.Get(context.TODO(), types.NamespacedName{
                Namespace: test.restore.Namespace,
                Name:      test.restore.Name,
            }, &restoreAfter)

            require.NoError(t, err)
            assert.Equal(t, test.expectPhase, restoreAfter.Status.Phase)
        })
    }
}