dm controller refactor for cancel

Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
Lyndon-Li
2025-05-20 20:39:05 +08:00
parent 0132d1127e
commit 38c927711a
7 changed files with 1039 additions and 859 deletions


@@ -56,36 +56,38 @@ import (
// DataDownloadReconciler reconciles a DataDownload object
type DataDownloadReconciler struct {
client client.Client
kubeClient kubernetes.Interface
mgr manager.Manager
logger logrus.FieldLogger
Clock clock.WithTickerAndDelayedExecution
restoreExposer exposer.GenericRestoreExposer
nodeName string
dataPathMgr *datapath.Manager
restorePVCConfig nodeagent.RestorePVC
podResources corev1api.ResourceRequirements
preparingTimeout time.Duration
metrics *metrics.ServerMetrics
client client.Client
kubeClient kubernetes.Interface
mgr manager.Manager
logger logrus.FieldLogger
Clock clock.WithTickerAndDelayedExecution
restoreExposer exposer.GenericRestoreExposer
nodeName string
dataPathMgr *datapath.Manager
restorePVCConfig nodeagent.RestorePVC
podResources corev1api.ResourceRequirements
preparingTimeout time.Duration
metrics *metrics.ServerMetrics
cancelledDataDownload map[string]time.Time
}
func NewDataDownloadReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager,
restorePVCConfig nodeagent.RestorePVC, podResources corev1api.ResourceRequirements, nodeName string, preparingTimeout time.Duration,
logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler {
return &DataDownloadReconciler{
client: client,
kubeClient: kubeClient,
mgr: mgr,
logger: logger.WithField("controller", "DataDownload"),
Clock: &clock.RealClock{},
nodeName: nodeName,
restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger),
restorePVCConfig: restorePVCConfig,
dataPathMgr: dataPathMgr,
podResources: podResources,
preparingTimeout: preparingTimeout,
metrics: metrics,
client: client,
kubeClient: kubeClient,
mgr: mgr,
logger: logger.WithField("controller", "DataDownload"),
Clock: &clock.RealClock{},
nodeName: nodeName,
restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger),
restorePVCConfig: restorePVCConfig,
dataPathMgr: dataPathMgr,
podResources: podResources,
preparingTimeout: preparingTimeout,
metrics: metrics,
cancelledDataDownload: make(map[string]time.Time),
}
}
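The only functional change in this hunk is the new cancelledDataDownload field and its make() initialization in the constructor. The initialization matters; a minimal, runnable sketch of why (the reconciler type below is a hypothetical stand-in, not Velero code):

    package main

    import (
        "fmt"
        "time"
    )

    // Hypothetical stand-in for DataDownloadReconciler, reduced to the new field.
    type reconciler struct {
        cancelledDataDownload map[string]time.Time
    }

    func main() {
        r := &reconciler{} // constructor omitted make(), so the field is a nil map
        defer func() {
            if v := recover(); v != nil {
                fmt.Println("recovered:", v) // assignment to entry in nil map
            }
        }()
        r.cancelledDataDownload["dd-1"] = time.Now() // panics on a nil map
    }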
@@ -118,42 +120,90 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
return ctrl.Result{}, nil
}
if r.restoreExposer == nil {
return r.errorOut(ctx, dd, errors.New("uninitialized generic exposer"), "uninitialized exposer", log)
}
// Logic for cleaning up resources when the datadownload has been deleted
if dd.DeletionTimestamp.IsZero() { // add finalizer for all cr at beginning
if !isDataDownloadInFinalState(dd) && !controllerutil.ContainsFinalizer(dd, DataUploadDownloadFinalizer) {
succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(dd *velerov2alpha1api.DataDownload) {
controllerutil.AddFinalizer(dd, DataUploadDownloadFinalizer)
})
if err != nil {
log.Errorf("failed to add finalizer with error %s for %s/%s", err.Error(), dd.Namespace, dd.Name)
return ctrl.Result{}, err
} else if !succeeded {
log.Warnf("failed to add finalizer for %s/%s and will requeue later", dd.Namespace, dd.Name)
return ctrl.Result{Requeue: true}, nil
}
}
} else if controllerutil.ContainsFinalizer(dd, DataUploadDownloadFinalizer) && !dd.Spec.Cancel && !isDataDownloadInFinalState(dd) {
// when the CR is deleted we need to clean up the internal resources created by Velero; here we use the cancel mechanism
// to clean up the resources instead of deleting them directly, in case of conflicts with the Expose action
log.Warnf("Cancel dd under phase %s because it is being deleted", dd.Status.Phase)
if err := UpdateDataDownloadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataDownload *velerov2alpha1api.DataDownload) bool {
if dataDownload.Spec.Cancel {
return false
}
dataDownload.Spec.Cancel = true
dataDownload.Status.Message = "Cancel datadownload because it is being deleted"
return true
}); err != nil {
log.Errorf("failed to set cancel flag with error %s for %s/%s", err.Error(), dd.Namespace, dd.Name)
return ctrl.Result{}, err
}
}
// Add finalizer
if !isDataDownloadInFinalState(dd) {
if !controllerutil.ContainsFinalizer(dd, DataUploadDownloadFinalizer) {
if err := UpdateDataDownloadWithRetry(ctx, r.client, req.NamespacedName, log, func(dd *velerov2alpha1api.DataDownload) bool {
if controllerutil.ContainsFinalizer(dd, DataUploadDownloadFinalizer) {
return false
}
controllerutil.AddFinalizer(dd, DataUploadDownloadFinalizer)
return true
}); err != nil {
log.WithError(err).Errorf("failed to add finalizer for dd %s/%s", dd.Namespace, dd.Name)
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
if !dd.DeletionTimestamp.IsZero() {
if !dd.Spec.Cancel {
// when the CR is deleted we need to clean up the internal resources created by Velero; here we use the cancel mechanism
// to clean up the resources instead of deleting them directly, in case of conflicts with the Expose action
log.Warnf("Cancel dd under phase %s because it is being deleted", dd.Status.Phase)
if err := UpdateDataDownloadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataDownload *velerov2alpha1api.DataDownload) bool {
if dataDownload.Spec.Cancel {
return false
}
dataDownload.Spec.Cancel = true
dataDownload.Status.Message = "Cancel datadownload because it is being deleted"
return true
}); err != nil {
log.WithError(err).Errorf("failed to set cancel flag for dd %s/%s", dd.Namespace, dd.Name)
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
}
} else {
delete(r.cancelledDataDownload, dd.Name)
// put the finalizer removal here because all CRs eventually reach a final status, so we can check and remove the finalizer in the final status
// instead of in intermediate states.
// remove the finalizer whether or not the CR is being deleted, since it is no longer needed once the internal resources are all cleaned up;
// also, a CR in final status won't block a direct delete of the velero namespace
if controllerutil.ContainsFinalizer(dd, DataUploadDownloadFinalizer) {
if err := UpdateDataDownloadWithRetry(ctx, r.client, req.NamespacedName, log, func(dd *velerov2alpha1api.DataDownload) bool {
if !controllerutil.ContainsFinalizer(dd, DataUploadDownloadFinalizer) {
return false
}
controllerutil.RemoveFinalizer(dd, DataUploadDownloadFinalizer)
return true
}); err != nil {
log.WithError(err).Error("error to remove finalizer")
return ctrl.Result{}, err
}
}
return ctrl.Result{}, nil
}
if dd.Spec.Cancel {
if spotted, found := r.cancelledDataDownload[dd.Name]; !found {
r.cancelledDataDownload[dd.Name] = r.Clock.Now()
} else {
delay := cancelDelayOthers
if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
delay = cancelDelayInProgress
}
if time.Since(spotted) > delay {
log.Infof("Data download %s is canceled in Phase %s but not handled in reasonable time", dd.GetName(), dd.Status.Phase)
if r.tryCancelDataDownload(ctx, dd, "") {
delete(r.cancelledDataDownload, dd.Name)
}
return ctrl.Result{}, nil
}
}
}
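The block above is the heart of the cancel refactor: the first Reconcile that sees Spec.Cancel only records a timestamp in cancelledDataDownload; a later pass force-cancels via tryCancelDataDownload if the cancel is still unhandled after a phase-dependent grace period (cancelDelayInProgress for InProgress CRs, cancelDelayOthers otherwise; both constants live elsewhere in this file). A runnable sketch of that debounce logic, with hypothetical names and delays:

    package main

    import (
        "fmt"
        "time"
    )

    // Hypothetical delays; the real values come from the package's
    // cancelDelayInProgress / cancelDelayOthers constants.
    const (
        delayInProgress = 50 * time.Millisecond
        delayOthers     = 10 * time.Millisecond
    )

    var cancelled = map[string]time.Time{}

    // forceCancelDue records the first sighting of a cancel request and reports
    // whether it has stayed unhandled past the phase-dependent grace period.
    func forceCancelDue(name string, inProgress bool) bool {
        spotted, found := cancelled[name]
        if !found {
            cancelled[name] = time.Now() // first sighting: start the clock
            return false
        }
        delay := delayOthers
        if inProgress {
            delay = delayInProgress
        }
        return time.Since(spotted) > delay
    }

    func main() {
        fmt.Println(forceCancelDue("dd-1", false)) // false: clock just started
        time.Sleep(20 * time.Millisecond)
        fmt.Println(forceCancelDue("dd-1", false)) // true: past delayOthers
    }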
@@ -167,7 +217,7 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
accepted, err := r.acceptDataDownload(ctx, dd)
if err != nil {
return r.errorOut(ctx, dd, err, "error to accept the data download", log)
return ctrl.Result{}, errors.Wrapf(err, "error accepting the data download %s", dd.Name)
}
if !accepted {
@@ -193,44 +243,14 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
// And then only the controller who is in the same node could do the rest work.
err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), exposeParam)
if err != nil {
return r.errorOut(ctx, dd, err, "error to expose snapshot", log)
if err := r.client.Get(ctx, req.NamespacedName, dd); err != nil {
if !apierrors.IsNotFound(err) {
return ctrl.Result{}, errors.Wrap(err, "getting DataDownload")
}
}
if isDataDownloadInFinalState(dd) {
log.Warnf("expose snapshot failed with err %v, but it may be caused by the resource clean-up of the cancel action", err)
r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd))
return ctrl.Result{}, nil
} else {
return r.errorOut(ctx, dd, err, "error to expose snapshot", log)
}
}
log.Info("Restore is exposed")
// we need to get the CR again because it may have been canceled by the datadownload controller on other
// nodes during the expose action; if a cancel is detected, we need to clean up the internal
// resources created by velero during the restore.
if err := r.client.Get(ctx, req.NamespacedName, dd); err != nil {
if apierrors.IsNotFound(err) {
log.Debug("Unable to find datadownload")
return ctrl.Result{}, nil
}
return ctrl.Result{}, errors.Wrap(err, "getting datadownload")
}
// we need to clean up resources again because the resources created in Expose may appear later than the cancel action
// or the prepare timeout, and would otherwise be left behind
if isDataDownloadInFinalState(dd) {
r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd))
}
return ctrl.Result{}, nil
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted {
if dd.Spec.Cancel {
log.Debugf("Data download is been canceled %s in Phase %s", dd.GetName(), dd.Status.Phase)
r.tryCancelAcceptedDataDownload(ctx, dd, "")
} else if peekErr := r.restoreExposer.PeekExposed(ctx, getDataDownloadOwnerObject(dd)); peekErr != nil {
r.tryCancelAcceptedDataDownload(ctx, dd, fmt.Sprintf("found a datadownload %s/%s with expose error: %s. mark it as cancel", dd.Namespace, dd.Name, peekErr))
if peekErr := r.restoreExposer.PeekExposed(ctx, getDataDownloadOwnerObject(dd)); peekErr != nil {
r.tryCancelDataDownload(ctx, dd, fmt.Sprintf("found a datadownload %s/%s with expose error: %s. mark it as cancel", dd.Namespace, dd.Name, peekErr))
log.Errorf("Cancel dd %s/%s because of expose error %s", dd.Namespace, dd.Name, peekErr)
} else if dd.Status.AcceptedTimestamp != nil {
if time.Since(dd.Status.AcceptedTimestamp.Time) >= r.preparingTimeout {
@@ -240,7 +260,11 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
return ctrl.Result{}, nil
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared {
log.Info("Data download is prepared")
log.Infof("Data download is prepared and should be processed by %s (%s)", dd.Status.Node, r.nodeName)
if dd.Status.Node != r.nodeName {
return ctrl.Result{}, nil
}
if dd.Spec.Cancel {
log.Debugf("Data download is been canceled %s in Phase %s", dd.GetName(), dd.Status.Phase)
@@ -259,13 +283,12 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
if err != nil {
return r.errorOut(ctx, dd, err, "restore exposer is not ready", log)
} else if result == nil {
log.Debug("Get empty restore exposer")
return ctrl.Result{}, nil
return r.errorOut(ctx, dd, errors.New("no expose result is available for the current node"), "exposed snapshot is not ready", log)
}
log.Info("Restore PVC is ready and creating data path routine")
// Need to first create file system BR and get data path instance then update data upload status
// Need to first create file system BR and get data path instance then update data download status
callbacks := datapath.Callbacks{
OnCompleted: r.OnDataDownloadCompleted,
OnFailed: r.OnDataDownloadFailed,
@@ -292,16 +315,30 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
}
// Update status to InProgress
original := dd.DeepCopy()
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress
dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil {
log.WithError(err).Warnf("Failed to update datadownload %s to InProgress, will close data path and retry", dd.Name)
terminated := false
if err := UpdateDataDownloadWithRetry(ctx, r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, log, func(dd *velerov2alpha1api.DataDownload) bool {
if isDataDownloadInFinalState(dd) {
terminated = true
return false
}
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress
dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
return true
}); err != nil {
log.WithError(err).Warnf("Failed to update datadownload %s to InProgress, will data path close and retry", dd.Name)
r.closeDataPath(ctx, dd.Name)
return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
}
if terminated {
log.Warnf("datadownload %s is terminated during transition from prepared", dd.Name)
r.closeDataPath(ctx, dd.Name)
return ctrl.Result{}, nil
}
log.Info("Data download is marked as in progress")
if err := r.startCancelableDataPath(asyncBR, dd, result, log); err != nil {
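A pattern worth calling out from the hunk above: UpdateDataDownloadWithRetry only reports errors, and the callback returns false to decline the write, so the terminated flag captured by the closure is how the caller learns that the skip happened because the CR already reached a final phase. A reduced, runnable sketch (all types and helpers below are stand-ins, not Velero APIs):

    package main

    import "fmt"

    type download struct{ phase string }

    func (d *download) final() bool {
        return d.phase == "Completed" || d.phase == "Failed" || d.phase == "Canceled"
    }

    // updateWithRetry mimics UpdateDataDownloadWithRetry's contract: fetch the
    // latest object, apply updateFunc, and persist only when it returns true.
    func updateWithRetry(d *download, updateFunc func(*download) bool) error {
        updateFunc(d) // the real helper would Get, Update, and retry on conflict
        return nil
    }

    func main() {
        d := &download{phase: "Canceled"}

        terminated := false
        _ = updateWithRetry(d, func(d *download) bool {
            if d.final() {
                terminated = true // smuggle the reason out of the closure
                return false      // decline the write
            }
            d.phase = "InProgress"
            return true
        })

        fmt.Println("terminated:", terminated) // terminated: true
    }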
@@ -313,45 +350,41 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
return ctrl.Result{}, nil
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
log.Info("Data download is in progress")
if dd.Spec.Cancel {
log.Info("Data download is being canceled")
if dd.Status.Node != r.nodeName {
return ctrl.Result{}, nil
}
log.Info("In progress data download is being canceled")
asyncBR := r.dataPathMgr.GetAsyncBR(dd.Name)
if asyncBR == nil {
if r.nodeName == dd.Status.Node {
r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName())
} else {
log.Info("Data path is not started in this node and will not canceled by current node")
}
r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName())
return ctrl.Result{}, nil
}
// Update status to Canceling.
original := dd.DeepCopy()
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceling
if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("error updating data download status")
if err := UpdateDataDownloadWithRetry(ctx, r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, log, func(dd *velerov2alpha1api.DataDownload) bool {
if isDataDownloadInFinalState(dd) {
log.Warnf("datadownload %s is terminated, abort setting it to canceling", dd.Name)
return false
}
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceling
return true
}); err != nil {
log.WithError(err).Error("error updating data download into canceling status")
return ctrl.Result{}, err
}
asyncBR.Cancel()
return ctrl.Result{}, nil
}
return ctrl.Result{}, nil
} else {
// put the finalizer remove action here for all cr will goes to the final status, we could check finalizer and do remove action in final status
// instead of intermediate state
// remove finalizer no matter whether the cr is being deleted or not for it is no longer needed when internal resources are all cleaned up
// also in final status cr won't block the direct delete of the velero namespace
if isDataDownloadInFinalState(dd) && controllerutil.ContainsFinalizer(dd, DataUploadDownloadFinalizer) {
original := dd.DeepCopy()
controllerutil.RemoveFinalizer(dd, DataUploadDownloadFinalizer)
if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("error to remove finalizer")
}
}
return ctrl.Result{}, nil
}
return ctrl.Result{}, nil
}
func (r *DataDownloadReconciler) initCancelableDataPath(ctx context.Context, asyncBR datapath.AsyncBR, res *exposer.ExposeResult, log logrus.FieldLogger) error {
@@ -451,15 +484,14 @@ func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, na
log.WithError(err).Error("error updating data download status")
} else {
r.metrics.RegisterDataDownloadCancel(r.nodeName)
delete(r.cancelledDataDownload, dd.Name)
}
}
}
func (r *DataDownloadReconciler) tryCancelAcceptedDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload, message string) {
func (r *DataDownloadReconciler) tryCancelDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload, message string) bool {
log := r.logger.WithField("datadownload", dd.Name)
log.Warn("Accepted data download is canceled")
succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(dataDownload *velerov2alpha1api.DataDownload) {
succeeded, err := funcExclusiveUpdateDataDownload(ctx, r.client, dd, func(dataDownload *velerov2alpha1api.DataDownload) {
dataDownload.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceled
if dataDownload.Status.StartTimestamp.IsZero() {
dataDownload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
@@ -473,31 +505,29 @@ func (r *DataDownloadReconciler) tryCancelAcceptedDataDownload(ctx context.Conte
if err != nil {
log.WithError(err).Error("error updating datadownload status")
return
return false
} else if !succeeded {
log.Warn("conflict in updating datadownload status and will try it again later")
return
return false
}
// success update
r.metrics.RegisterDataDownloadCancel(r.nodeName)
r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd))
log.Warn("data download is canceled")
return true
}
func (r *DataDownloadReconciler) OnDataDownloadProgress(ctx context.Context, namespace string, ddName string, progress *uploader.Progress) {
log := r.logger.WithField("datadownload", ddName)
var dd velerov2alpha1api.DataDownload
if err := r.client.Get(ctx, types.NamespacedName{Name: ddName, Namespace: namespace}, &dd); err != nil {
log.WithError(err).Warn("Failed to get data download on progress")
return
}
original := dd.DeepCopy()
dd.Status.Progress = shared.DataMoveOperationProgress{TotalBytes: progress.TotalBytes, BytesDone: progress.BytesDone}
if err := r.client.Patch(ctx, &dd, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("Failed to update restore snapshot progress")
if err := UpdateDataDownloadWithRetry(ctx, r.client, types.NamespacedName{Namespace: namespace, Name: ddName}, log, func(dd *velerov2alpha1api.DataDownload) bool {
dd.Status.Progress = shared.DataMoveOperationProgress{TotalBytes: progress.TotalBytes, BytesDone: progress.BytesDone}
return true
}); err != nil {
log.WithError(err).Error("Failed to update progress")
}
}
@@ -509,7 +539,19 @@ func (r *DataDownloadReconciler) OnDataDownloadProgress(ctx context.Context, nam
func (r *DataDownloadReconciler) SetupWithManager(mgr ctrl.Manager) error {
gp := kube.NewGenericEventPredicate(func(object client.Object) bool {
dd := object.(*velerov2alpha1api.DataDownload)
return (dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted)
if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted {
return true
}
if dd.Spec.Cancel && !isDataDownloadInFinalState(dd) {
return true
}
if isDataDownloadInFinalState(dd) && !dd.DeletionTimestamp.IsZero() {
return true
}
return false
})
s := kube.NewPeriodicalEnqueueSource(r.logger.WithField("controller", constant.ControllerDataDownload), r.client, &velerov2alpha1api.DataDownloadList{}, preparingMonitorFrequency, kube.PeriodicalEnqueueSourceOption{
Predicates: []predicate.Predicate{gp},
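For readers unfamiliar with the Velero helper: kube.NewGenericEventPredicate wraps a single callback that filters objects enqueued by the periodic source above. Expressed directly against controller-runtime, the widened filter is roughly the following (a sketch only, not the helper's implementation; it assumes this file's isDataDownloadInFinalState):

    import (
        velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
        "sigs.k8s.io/controller-runtime/pkg/event"
        "sigs.k8s.io/controller-runtime/pkg/predicate"
    )

    // Rough controller-runtime equivalent of the generic-event filter above.
    var enqueueFilter = predicate.Funcs{
        GenericFunc: func(e event.GenericEvent) bool {
            dd, ok := e.Object.(*velerov2alpha1api.DataDownload)
            if !ok {
                return false
            }
            switch {
            case dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted:
                return true // accepted CRs are polled for the prepare timeout
            case dd.Spec.Cancel && !isDataDownloadInFinalState(dd):
                return true // pending cancels feed the new delay bookkeeping
            case isDataDownloadInFinalState(dd) && !dd.DeletionTimestamp.IsZero():
                return true // final CRs being deleted still need finalizer removal
            default:
                return false
            }
        },
    }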
@@ -568,10 +610,17 @@ func (r *DataDownloadReconciler) findSnapshotRestoreForPod(ctx context.Context,
if pod.Status.Phase == corev1api.PodRunning {
log.Info("Preparing data download")
// we don't expect anyone else update the CR during the Prepare process
updated, err := r.exclusiveUpdateDataDownload(context.Background(), dd, r.prepareDataDownload)
if err != nil || !updated {
log.WithField("updated", updated).WithError(err).Warn("failed to update datadownload, prepare will halt for this datadownload")
if err = UpdateDataDownloadWithRetry(context.Background(), r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, log,
func(dd *velerov2alpha1api.DataDownload) bool {
if isDataDownloadInFinalState(dd) {
log.Warnf("datadownload %s is terminated, abort setting it to prepared", dd.Name)
return false
}
r.prepareDataDownload(dd)
return true
}); err != nil {
log.WithError(err).Warn("failed to update dataudownload, prepare will halt for this dataudownload")
return []reconcile.Request{}
}
} else if unrecoverable, reason := kube.IsPodUnrecoverable(pod, log); unrecoverable {
@@ -618,13 +667,19 @@ func (r *DataDownloadReconciler) errorOut(ctx context.Context, dd *velerov2alpha
}
func (r *DataDownloadReconciler) updateStatusToFailed(ctx context.Context, dd *velerov2alpha1api.DataDownload, err error, msg string, log logrus.FieldLogger) error {
log.Infof("update data download status to %v", dd.Status.Phase)
original := dd.DeepCopy()
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseFailed
dd.Status.Message = errors.WithMessage(err, msg).Error()
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
log.Info("update data download status to Failed")
if patchErr := r.client.Patch(ctx, dd, client.MergeFrom(original)); patchErr != nil {
if patchErr := UpdateDataDownloadWithRetry(ctx, r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, log, func(dd *velerov2alpha1api.DataDownload) bool {
if isDataDownloadInFinalState(dd) {
return false
}
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseFailed
dd.Status.Message = errors.WithMessage(err, msg).Error()
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
return true
}); patchErr != nil {
log.WithError(patchErr).Error("error updating DataDownload status")
} else {
r.metrics.RegisterDataDownloadFailure(r.nodeName)
@@ -647,7 +702,7 @@ func (r *DataDownloadReconciler) acceptDataDownload(ctx context.Context, dd *vel
datadownload.Status.AcceptedTimestamp = &metav1.Time{Time: r.Clock.Now()}
}
succeeded, err := r.exclusiveUpdateDataDownload(ctx, updated, updateFunc)
succeeded, err := funcExclusiveUpdateDataDownload(ctx, r.client, updated, updateFunc)
if err != nil {
return false, err
@@ -667,7 +722,7 @@ func (r *DataDownloadReconciler) onPrepareTimeout(ctx context.Context, dd *veler
log := r.logger.WithField("DataDownload", dd.Name)
log.Info("Timeout happened for preparing datadownload")
succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(dd *velerov2alpha1api.DataDownload) {
succeeded, err := funcExclusiveUpdateDataDownload(ctx, r.client, dd, func(dd *velerov2alpha1api.DataDownload) {
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseFailed
dd.Status.Message = "timeout on preparing data download"
})
@@ -678,7 +733,7 @@ func (r *DataDownloadReconciler) onPrepareTimeout(ctx context.Context, dd *veler
}
if !succeeded {
log.Warn("Dataupload has been updated by others")
log.Warn("Datadownload has been updated by others")
return
}
@@ -689,16 +744,18 @@ func (r *DataDownloadReconciler) onPrepareTimeout(ctx context.Context, dd *veler
r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd))
log.Info("Dataupload has been cleaned up")
log.Info("Datadownload has been cleaned up")
r.metrics.RegisterDataDownloadFailure(r.nodeName)
}
func (r *DataDownloadReconciler) exclusiveUpdateDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload,
var funcExclusiveUpdateDataDownload = exclusiveUpdateDataDownload
func exclusiveUpdateDataDownload(ctx context.Context, cli client.Client, dd *velerov2alpha1api.DataDownload,
updateFunc func(*velerov2alpha1api.DataDownload)) (bool, error) {
updateFunc(dd)
err := r.client.Update(ctx, dd)
err := cli.Update(ctx, dd)
if err == nil {
return true, nil
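Turning the method into a package-level function referenced through the funcExclusiveUpdateDataDownload variable looks like a test seam: production code calls through the variable, so a unit test can swap the implementation. A hypothetical illustration (not a test from this commit):

    // Hypothetical unit test showing the seam in use; imports assumed:
    // context, testing, github.com/pkg/errors, sigs.k8s.io/controller-runtime/pkg/client,
    // and the velero v2alpha1 API package.
    func TestTryCancelDataDownload_exclusiveUpdateFails(t *testing.T) {
        original := funcExclusiveUpdateDataDownload
        defer func() { funcExclusiveUpdateDataDownload = original }() // restore the seam

        // Inject a failure without touching a real API server.
        funcExclusiveUpdateDataDownload = func(ctx context.Context, cli client.Client,
            dd *velerov2alpha1api.DataDownload, updateFunc func(*velerov2alpha1api.DataDownload)) (bool, error) {
            return false, errors.New("injected update failure")
        }

        // ...build a DataDownloadReconciler with fakes and assert that
        // tryCancelDataDownload returns false on the injected error...
    }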
@@ -805,7 +862,7 @@ func isDataDownloadInFinalState(dd *velerov2alpha1api.DataDownload) bool {
dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseCompleted
}
func UpdateDataDownloadWithRetry(ctx context.Context, client client.Client, namespacedName types.NamespacedName, log *logrus.Entry, updateFunc func(*velerov2alpha1api.DataDownload) bool) error {
func UpdateDataDownloadWithRetry(ctx context.Context, client client.Client, namespacedName types.NamespacedName, log logrus.FieldLogger, updateFunc func(*velerov2alpha1api.DataDownload) bool) error {
return wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
dd := &velerov2alpha1api.DataDownload{}
if err := client.Get(ctx, namespacedName, dd); err != nil {
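The hunk shows only the head of UpdateDataDownloadWithRetry; the signature change merely widens the logger parameter from *logrus.Entry to the logrus.FieldLogger interface. For context, a plausible shape for the rest of the loop, extrapolated from the call sites elsewhere in this diff rather than copied from the commit:

    import (
        "context"
        "time"

        "github.com/pkg/errors"
        "github.com/sirupsen/logrus"
        apierrors "k8s.io/apimachinery/pkg/api/errors"
        "k8s.io/apimachinery/pkg/types"
        "k8s.io/apimachinery/pkg/util/wait"
        "sigs.k8s.io/controller-runtime/pkg/client"

        velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
    )

    // Sketch: conflict-tolerant read-modify-write, polled once per second until
    // the context is canceled. updateFunc returning false means "nothing to do".
    func updateDataDownloadWithRetrySketch(
        ctx context.Context,
        cli client.Client,
        key types.NamespacedName,
        log logrus.FieldLogger,
        updateFunc func(*velerov2alpha1api.DataDownload) bool,
    ) error {
        return wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
            dd := &velerov2alpha1api.DataDownload{}
            if err := cli.Get(ctx, key, dd); err != nil {
                return false, errors.Wrap(err, "getting DataDownload")
            }
            if !updateFunc(dd) {
                return true, nil // caller declined the update; stop polling
            }
            if err := cli.Update(ctx, dd); err != nil {
                if apierrors.IsConflict(err) {
                    log.Warnf("conflict updating DataDownload %s, will retry", key.String())
                    return false, nil // stale read; fetch and try again
                }
                return false, errors.Wrap(err, "updating DataDownload")
            }
            return true, nil
        })
    }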
@@ -839,11 +896,7 @@ func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context,
for i := range dataDownloads.Items {
dd := &dataDownloads.Items[i]
if dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared {
// keep doing nothing let controller re-download the data
// the Prepared CR could be still handled by datadownload controller after node-agent restart
logger.WithField("datadownload", dd.GetName()).Debug("find a datadownload with status prepared")
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
if dd.Status.Node != r.nodeName {
logger.WithField("dd", dd.Name).WithField("current node", r.nodeName).Infof("DD should be resumed by another node %s", dd.Status.Node)
continue
@@ -870,25 +923,12 @@ func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context,
return true
})
if err != nil {
logger.WithError(errors.WithStack(err)).WithError(errors.WithStack(err)).Error("Failed to trigger dataupload cancel")
}
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted {
r.logger.WithField("datadownload", dd.GetName()).Warn("Cancel dd under Accepted phase")
err := UpdateDataDownloadWithRetry(ctx, r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name},
r.logger.WithField("datadownload", dd.Name), func(dataDownload *velerov2alpha1api.DataDownload) bool {
if dataDownload.Spec.Cancel {
return false
}
dataDownload.Spec.Cancel = true
dataDownload.Status.Message = "Datadownload is in Accepted status during the node-agent starting, mark it as cancel"
return true
})
if err != nil {
r.logger.WithField("datadownload", dd.GetName()).WithError(err).Errorf("Failed to trigger dataupload cancel")
logger.WithError(errors.WithStack(err)).Error("Failed to trigger datadownload cancel")
}
} else {
// the Prepared CR could still be handled by the datadownload controller after node-agent restart
// the accepted CR may also survive a node-agent restart as long as the intermediate objects are all done
logger.WithField("datadownload", dd.GetName()).Infof("find a datadownload with status %s", dd.Status.Phase)
}
}
@@ -914,7 +954,7 @@ func (r *DataDownloadReconciler) resumeCancellableDataPath(ctx context.Context,
OnProgress: r.OnDataDownloadProgress,
}
asyncBR, err := r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, r.kubeClient, r.mgr, datapath.TaskTypeBackup, dd.Name, dd.Namespace, res.ByPod.HostingPod.Name, res.ByPod.HostingContainer, dd.Name, callbacks, true, log)
asyncBR, err := r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, r.kubeClient, r.mgr, datapath.TaskTypeRestore, dd.Name, dd.Namespace, res.ByPod.HostingPod.Name, res.ByPod.HostingContainer, dd.Name, callbacks, true, log)
if err != nil {
return errors.Wrapf(err, "error to create asyncBR watcher for dd %s", dd.Name)
}