mirror of https://github.com/vmware-tanzu/velero.git, synced 2025-12-23 06:15:21 +00:00
data mover ms node agent resume
Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
changelogs/unreleased/8085-Lyndon-Li (new file, 1 addition)
@@ -0,0 +1 @@
+According to design #7576, after node-agent restarts, if a DU/DD is in InProgress status, re-capture the data mover ms pod and continue the execution
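Before reading the hunks below, the per-phase recovery policy this commit implements can be summarized with a small illustrative sketch (Go, not part of the diff; the real logic is in AttemptDataUploadResume/AttemptDataDownloadResume further down, and resumeAction is a hypothetical helper):

package main

import "fmt"

// resumeAction summarizes, per DU/DD phase, what the node-agent does with a CR
// it finds after restarting. Illustration only.
func resumeAction(phase string) string {
	switch phase {
	case "Prepared":
		return "leave as-is; the controller re-handles the CR after restart"
	case "InProgress":
		return "re-capture the data mover ms pod and continue; cancel only if resume fails"
	case "Accepted":
		return "mark as cancel; the exposed resources may not be recoverable"
	default:
		return "no action"
	}
}

func main() {
	for _, p := range []string{"Prepared", "InProgress", "Accepted"} {
		fmt.Printf("%s -> %s\n", p, resumeAction(p))
	}
}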
@@ -19,6 +19,7 @@ package builder
 import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

+	"github.com/vmware-tanzu/velero/pkg/apis/velero/shared"
 	velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
 )

@@ -116,3 +117,27 @@ func (d *DataDownloadBuilder) StartTimestamp(startTime *metav1.Time) *DataDownlo
 	d.object.Status.StartTimestamp = startTime
 	return d
 }
+
+// CompletionTimestamp sets the DataDownload's CompletionTimestamp.
+func (d *DataDownloadBuilder) CompletionTimestamp(completionTimestamp *metav1.Time) *DataDownloadBuilder {
+	d.object.Status.CompletionTimestamp = completionTimestamp
+	return d
+}
+
+// Labels sets the DataDownload's Labels.
+func (d *DataDownloadBuilder) Labels(labels map[string]string) *DataDownloadBuilder {
+	d.object.Labels = labels
+	return d
+}
+
+// Progress sets the DataDownload's Progress.
+func (d *DataDownloadBuilder) Progress(progress shared.DataMoveOperationProgress) *DataDownloadBuilder {
+	d.object.Status.Progress = progress
+	return d
+}
+
+// Node sets the DataDownload's Node.
+func (d *DataDownloadBuilder) Node(node string) *DataDownloadBuilder {
+	d.object.Status.Node = node
+	return d
+}
@@ -133,7 +133,14 @@ func (d *DataUploadBuilder) Labels(labels map[string]string) *DataUploadBuilder
 	return d
 }

+// Progress sets the DataUpload's Progress.
+func (d *DataUploadBuilder) Progress(progress shared.DataMoveOperationProgress) *DataUploadBuilder {
+	d.object.Status.Progress = progress
+	return d
+}
+
+// Node sets the DataUpload's Node.
+func (d *DataUploadBuilder) Node(node string) *DataUploadBuilder {
+	d.object.Status.Node = node
+	return d
+}
@@ -292,18 +292,28 @@ func (s *nodeAgentServer) run() {
 	if s.dataPathConfigs != nil && len(s.dataPathConfigs.LoadAffinity) > 0 {
 		loadAffinity = s.dataPathConfigs.LoadAffinity[0]
 	}
-	dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
-	s.attemptDataUploadResume(dataUploadReconciler)
+	dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
 	if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil {
 		s.logger.WithError(err).Fatal("Unable to create the data upload controller")
 	}

-	dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
-	s.attemptDataDownloadResume(dataDownloadReconciler)
+	dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
 	if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil {
 		s.logger.WithError(err).Fatal("Unable to create the data download controller")
 	}

+	go func() {
+		s.mgr.GetCache().WaitForCacheSync(s.ctx)
+
+		if err := dataUploadReconciler.AttemptDataUploadResume(s.ctx, s.mgr.GetClient(), s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
+			s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data upload resume")
+		}
+
+		if err := dataDownloadReconciler.AttemptDataDownloadResume(s.ctx, s.mgr.GetClient(), s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
+			s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data download resume")
+		}
+	}()
+
 	s.logger.Info("Controllers starting...")

 	if err := s.mgr.Start(ctrl.SetupSignalHandler()); err != nil {
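The resume calls now run in a goroutine that first waits for the manager's informer caches, so they can use s.mgr.GetClient() instead of a hand-built client. A minimal sketch of that pattern (controller-runtime APIs; doResume is a hypothetical stand-in for the two Attempt*Resume calls):

package main

import (
	"context"

	"github.com/sirupsen/logrus"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

// startResume defers doResume until the manager's caches are ready, mirroring
// the goroutine added to nodeAgentServer.run above.
func startResume(ctx context.Context, mgr manager.Manager, doResume func(context.Context) error, log logrus.FieldLogger) {
	go func() {
		// Blocks until informers have synced (or ctx is canceled), so the
		// cached client returned by mgr.GetClient() is safe to read from.
		if !mgr.GetCache().WaitForCacheSync(ctx) {
			log.Error("cache did not sync before context was canceled")
			return
		}
		if err := doResume(ctx); err != nil {
			log.WithError(err).Error("resume failed")
		}
	}()
}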
@@ -373,31 +383,6 @@ func (s *nodeAgentServer) markInProgressCRsFailed() {
 	s.markInProgressPVRsFailed(client)
 }

-func (s *nodeAgentServer) attemptDataUploadResume(r *controller.DataUploadReconciler) {
-	// the function is called before starting the controller manager, the embedded client isn't ready to use, so create a new one here
-	client, err := ctrlclient.New(s.mgr.GetConfig(), ctrlclient.Options{Scheme: s.mgr.GetScheme()})
-	if err != nil {
-		s.logger.WithError(errors.WithStack(err)).Error("failed to create client")
-		return
-	}
-	if err := r.AttemptDataUploadResume(s.ctx, client, s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
-		s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data upload resume")
-	}
-}
-
-func (s *nodeAgentServer) attemptDataDownloadResume(r *controller.DataDownloadReconciler) {
-	// the function is called before starting the controller manager, the embedded client isn't ready to use, so create a new one here
-	client, err := ctrlclient.New(s.mgr.GetConfig(), ctrlclient.Options{Scheme: s.mgr.GetScheme()})
-	if err != nil {
-		s.logger.WithError(errors.WithStack(err)).Error("failed to create client")
-		return
-	}
-
-	if err := r.AttemptDataDownloadResume(s.ctx, client, s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
-		s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data download resume")
-	}
-}
-
 func (s *nodeAgentServer) markInProgressPVBsFailed(client ctrlclient.Client) {
 	pvbs := &velerov1api.PodVolumeBackupList{}
 	if err := client.List(s.ctx, pvbs, &ctrlclient.ListOptions{Namespace: s.namespace}); err != nil {
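For context on what the deleted helpers were doing: before mgr.Start runs, the manager's cached client is not usable, so each helper built a one-off direct client. A sketch of that old workaround (standard controller-runtime calls; the new code avoids it by waiting for cache sync instead):

package main

import (
	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

// directClient builds a non-cached client from the manager's rest.Config,
// usable before the manager has started any informers.
func directClient(mgr manager.Manager) (ctrlclient.Client, error) {
	return ctrlclient.New(mgr.GetConfig(), ctrlclient.Options{Scheme: mgr.GetScheme()})
}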
@@ -1148,9 +1148,15 @@ func markDataUploadsCancel(ctx context.Context, client ctrlclient.Client, backup
 			du.Status.Phase == velerov2alpha1api.DataUploadPhaseNew ||
 			du.Status.Phase == "" {
 			err := controller.UpdateDataUploadWithRetry(ctx, client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, log.WithField("dataupload", du.Name),
-				func(dataUpload *velerov2alpha1api.DataUpload) {
+				func(dataUpload *velerov2alpha1api.DataUpload) bool {
+					if dataUpload.Spec.Cancel {
+						return false
+					}
+
 					dataUpload.Spec.Cancel = true
-					dataUpload.Status.Message = fmt.Sprintf("found a dataupload with status %q during the velero server starting, mark it as cancel", du.Status.Phase)
+					dataUpload.Status.Message = fmt.Sprintf("Dataupload is in status %q during the velero server starting, mark it as cancel", du.Status.Phase)
+
+					return true
 				})

 			if err != nil {

@@ -1183,9 +1189,15 @@ func markDataDownloadsCancel(ctx context.Context, client ctrlclient.Client, rest
 			dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseNew ||
 			dd.Status.Phase == "" {
 			err := controller.UpdateDataDownloadWithRetry(ctx, client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, log.WithField("datadownload", dd.Name),
-				func(dataDownload *velerov2alpha1api.DataDownload) {
+				func(dataDownload *velerov2alpha1api.DataDownload) bool {
+					if dataDownload.Spec.Cancel {
+						return false
+					}
+
 					dataDownload.Spec.Cancel = true
-					dataDownload.Status.Message = fmt.Sprintf("found a datadownload with status %q during the velero server starting, mark it as cancel", dd.Status.Phase)
+					dataDownload.Status.Message = fmt.Sprintf("Datadownload is in status %q during the velero server starting, mark it as cancel", dd.Status.Phase)
+
+					return true
 				})

 			if err != nil {
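The updateFunc callbacks now return a bool: false means the CR already has the desired state (for example Spec.Cancel is already set) and the API update is skipped entirely; true means the mutation should be persisted. A toy reproduction of the contract (hypothetical names, velero types replaced by local stand-ins):

package main

import "fmt"

type spec struct{ Cancel bool }
type dataUpload struct{ Spec spec }

// apply mimics the Update*WithRetry contract: only persist when updateFunc
// reports that it actually changed something.
func apply(du *dataUpload, updateFunc func(*dataUpload) bool) bool {
	return updateFunc(du) // true would trigger client.Update in the real helper
}

func main() {
	du := &dataUpload{}
	markCancel := func(d *dataUpload) bool {
		if d.Spec.Cancel {
			return false // already canceled: no write, no conflict retry
		}
		d.Spec.Cancel = true
		return true
	}
	fmt.Println(apply(du, markCancel)) // true: first call mutates
	fmt.Println(apply(du, markCancel)) // false: second call is a no-op
}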
@@ -35,6 +35,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
 	"sigs.k8s.io/controller-runtime/pkg/event"
+	"sigs.k8s.io/controller-runtime/pkg/manager"
 	"sigs.k8s.io/controller-runtime/pkg/predicate"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -56,6 +57,7 @@
 type DataDownloadReconciler struct {
 	client           client.Client
 	kubeClient       kubernetes.Interface
+	mgr              manager.Manager
 	logger           logrus.FieldLogger
 	credentialGetter *credentials.CredentialGetter
 	fileSystem       filesystem.Interface

@@ -68,11 +70,12 @@ type DataDownloadReconciler struct {
 	metrics *metrics.ServerMetrics
 }

-func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager,
+func NewDataDownloadReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager,
 	repoEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler {
 	return &DataDownloadReconciler{
 		client:           client,
 		kubeClient:       kubeClient,
+		mgr:              mgr,
 		logger:           logger.WithField("controller", "DataDownload"),
 		credentialGetter: credentialGetter,
 		fileSystem:       filesystem.NewFileSystem(),
@@ -137,9 +140,17 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
 	} else if controllerutil.ContainsFinalizer(dd, DataUploadDownloadFinalizer) && !dd.Spec.Cancel && !isDataDownloadInFinalState(dd) {
 		// when delete cr we need to clear up internal resources created by Velero, here we use the cancel mechanism
 		// to help clear up resources instead of clear them directly in case of some conflict with Expose action
-		if err := UpdateDataDownloadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataDownload *velerov2alpha1api.DataDownload) {
+		log.Warnf("Cancel dd under phase %s because it is being deleted", dd.Status.Phase)
+
+		if err := UpdateDataDownloadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataDownload *velerov2alpha1api.DataDownload) bool {
+			if dataDownload.Spec.Cancel {
+				return false
+			}
+
 			dataDownload.Spec.Cancel = true
-			dataDownload.Status.Message = fmt.Sprintf("found a datadownload %s/%s is being deleted, mark it as cancel", dd.Namespace, dd.Name)
+			dataDownload.Status.Message = "Cancel datadownload because it is being deleted"
+
+			return true
 		}); err != nil {
 			log.Errorf("failed to set cancel flag with error %s for %s/%s", err.Error(), dd.Namespace, dd.Name)
 			return ctrl.Result{}, err
@@ -552,9 +563,15 @@ func (r *DataDownloadReconciler) findSnapshotRestoreForPod(ctx context.Context,
 		}
 	} else if unrecoverable, reason := kube.IsPodUnrecoverable(pod, log); unrecoverable {
 		err := UpdateDataDownloadWithRetry(context.Background(), r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, r.logger.WithField("datadownload", dd.Name),
-			func(dataDownload *velerov2alpha1api.DataDownload) {
+			func(dataDownload *velerov2alpha1api.DataDownload) bool {
+				if dataDownload.Spec.Cancel {
+					return false
+				}
+
 				dataDownload.Spec.Cancel = true
-				dataDownload.Status.Message = fmt.Sprintf("datadownload mark as cancel to failed early for exposing pod %s/%s is in abnormal status for %s", pod.Namespace, pod.Name, reason)
+				dataDownload.Status.Message = fmt.Sprintf("Cancel datadownload because the exposing pod %s/%s is in abnormal status for reason %s", pod.Namespace, pod.Name, reason)
+
+				return true
 			})

 		if err != nil {
@@ -575,75 +592,6 @@ func (r *DataDownloadReconciler) findSnapshotRestoreForPod(ctx context.Context,
 	return []reconcile.Request{request}
 }

-func (r *DataDownloadReconciler) FindDataDownloads(ctx context.Context, cli client.Client, ns string) ([]*velerov2alpha1api.DataDownload, error) {
-	pods := &v1.PodList{}
-	var dataDownloads []*velerov2alpha1api.DataDownload
-	if err := cli.List(ctx, pods, &client.ListOptions{Namespace: ns}); err != nil {
-		r.logger.WithError(errors.WithStack(err)).Error("failed to list pods on current node")
-		return nil, errors.Wrapf(err, "failed to list pods on current node")
-	}
-
-	for _, pod := range pods.Items {
-		if pod.Spec.NodeName != r.nodeName {
-			r.logger.Debugf("Pod %s related data download will not handled by %s nodes", pod.GetName(), r.nodeName)
-			continue
-		}
-		dd, err := findDataDownloadByPod(cli, pod)
-		if err != nil {
-			r.logger.WithError(errors.WithStack(err)).Error("failed to get dataDownload by pod")
-			continue
-		} else if dd != nil {
-			dataDownloads = append(dataDownloads, dd)
-		}
-	}
-	return dataDownloads, nil
-}
-
-func (r *DataDownloadReconciler) findAcceptDataDownloadsByNodeLabel(ctx context.Context, cli client.Client, ns string) ([]velerov2alpha1api.DataDownload, error) {
-	dataDownloads := &velerov2alpha1api.DataDownloadList{}
-	if err := cli.List(ctx, dataDownloads, &client.ListOptions{Namespace: ns}); err != nil {
-		r.logger.WithError(errors.WithStack(err)).Error("failed to list datauploads")
-		return nil, errors.Wrapf(err, "failed to list datauploads")
-	}
-
-	var result []velerov2alpha1api.DataDownload
-	for _, dd := range dataDownloads.Items {
-		if dd.Status.Phase != velerov2alpha1api.DataDownloadPhaseAccepted {
-			continue
-		}
-		if dd.Labels[acceptNodeLabelKey] == r.nodeName {
-			result = append(result, dd)
-		}
-	}
-	return result, nil
-}
-
-// CancelAcceptedDataDownload will cancel the accepted data download
-func (r *DataDownloadReconciler) CancelAcceptedDataDownload(ctx context.Context, cli client.Client, ns string) {
-	r.logger.Infof("Canceling accepted data for node %s", r.nodeName)
-	dataDownloads, err := r.findAcceptDataDownloadsByNodeLabel(ctx, cli, ns)
-	if err != nil {
-		r.logger.WithError(err).Error("failed to find data downloads")
-		return
-	}
-
-	for _, dd := range dataDownloads {
-		if dd.Spec.Cancel {
-			continue
-		}
-		err = UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name},
-			r.logger.WithField("dataupload", dd.Name), func(dataDownload *velerov2alpha1api.DataDownload) {
-				dataDownload.Spec.Cancel = true
-				dataDownload.Status.Message = fmt.Sprintf("found a datadownload with status %q during the node-agent starting, mark it as cancel", dd.Status.Phase)
-			})
-
-		r.logger.Warn(dd.Status.Message)
-		if err != nil {
-			r.logger.WithError(err).Errorf("failed to set cancel flag with error %s", err.Error())
-		}
-	}
-}
-
 func (r *DataDownloadReconciler) prepareDataDownload(ssb *velerov2alpha1api.DataDownload) {
 	ssb.Status.Phase = velerov2alpha1api.DataDownloadPhasePrepared
 	ssb.Status.Node = r.nodeName
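The deleted functions discovered resumable work indirectly, by listing pods on the node and mapping them back to CRs. The replacement in the next hunk lists the DataDownload CRs directly and filters on Status.Node. A sketch of that new-style filter, assuming the velero v2alpha1 API types (inProgressForNode is a hypothetical helper, not in the diff):

package main

import (
	velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
)

// inProgressForNode keeps only the InProgress DataDownloads owned by this node;
// other phases and other nodes' CRs are handled elsewhere or skipped.
func inProgressForNode(items []velerov2alpha1api.DataDownload, node string) []*velerov2alpha1api.DataDownload {
	var out []*velerov2alpha1api.DataDownload
	for i := range items {
		dd := &items[i] // address of the slice element, not of a loop variable
		if dd.Status.Phase != velerov2alpha1api.DataDownloadPhaseInProgress {
			continue
		}
		if dd.Status.Node != node {
			continue // another node's agent is responsible for resuming this CR
		}
		out = append(out, dd)
	}
	return out
}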
@@ -795,56 +743,139 @@ func isDataDownloadInFinalState(dd *velerov2alpha1api.DataDownload) bool {
 		dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseCompleted
 }

-func UpdateDataDownloadWithRetry(ctx context.Context, client client.Client, namespacedName types.NamespacedName, log *logrus.Entry, updateFunc func(dataDownload *velerov2alpha1api.DataDownload)) error {
-	return wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (done bool, err error) {
+func UpdateDataDownloadWithRetry(ctx context.Context, client client.Client, namespacedName types.NamespacedName, log *logrus.Entry, updateFunc func(*velerov2alpha1api.DataDownload) bool) error {
+	return wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
 		dd := &velerov2alpha1api.DataDownload{}
 		if err := client.Get(ctx, namespacedName, dd); err != nil {
 			return false, errors.Wrap(err, "getting DataDownload")
 		}

-		updateFunc(dd)
-		updateErr := client.Update(ctx, dd)
-		if updateErr != nil {
-			if apierrors.IsConflict(updateErr) {
-				log.Warnf("failed to update datadownload for %s/%s and will retry it", dd.Namespace, dd.Name)
-				return false, nil
+		if updateFunc(dd) {
+			err := client.Update(ctx, dd)
+			if err != nil {
+				if apierrors.IsConflict(err) {
+					log.Warnf("failed to update datadownload for %s/%s and will retry it", dd.Namespace, dd.Name)
+					return false, nil
+				} else {
+					return false, errors.Wrapf(err, "error updating datadownload %s/%s", dd.Namespace, dd.Name)
+				}
 			}
-			log.Errorf("failed to update datadownload with error %s for %s/%s", updateErr.Error(), dd.Namespace, dd.Name)
-			return false, err
 		}

 		return true, nil
 	})
 }

+var funcResumeCancellableDataRestore = (*DataDownloadReconciler).resumeCancellableDataPath
+
 func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context, cli client.Client, logger *logrus.Entry, ns string) error {
-	if dataDownloads, err := r.FindDataDownloads(ctx, cli, ns); err != nil {
-		return errors.Wrapf(err, "failed to find data downloads")
-	} else {
-		for i := range dataDownloads {
-			dd := dataDownloads[i]
-			if dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared {
-				// keep doing nothing let controller re-download the data
-				logger.WithField("datadownload", dd.GetName()).Debug("find a datadownload with status prepared")
-			} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
-				err = UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, logger.WithField("datadownload", dd.Name),
-					func(dataDownload *velerov2alpha1api.DataDownload) {
-						dataDownload.Spec.Cancel = true
-						dataDownload.Status.Message = fmt.Sprintf("found a datadownload with status %q during the node-agent starting, mark it as cancel", dd.Status.Phase)
-					})
-
-				if err != nil {
-					logger.WithError(errors.WithStack(err)).Errorf("failed to mark datadownload %q into canceled", dd.GetName())
-					continue
-				}
-				logger.WithField("datadownload", dd.GetName()).Debug("mark datadownload into canceled")
-			}
-		}
-	}
-
-	//If the data download is in Accepted status, the expoded PVC may be not created
-	// so we need to mark the data download as canceled for it may not be recoverable
-	r.CancelAcceptedDataDownload(ctx, cli, ns)
+	dataDownloads := &velerov2alpha1api.DataDownloadList{}
+	if err := cli.List(ctx, dataDownloads, &client.ListOptions{Namespace: ns}); err != nil {
+		r.logger.WithError(errors.WithStack(err)).Error("failed to list datadownloads")
+		return errors.Wrapf(err, "error to list datadownloads")
+	}
+
+	for i := range dataDownloads.Items {
+		dd := &dataDownloads.Items[i]
+		if dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared {
+			// the Prepared CR could be still handled by datadownload controller after node-agent restart
+			logger.WithField("datadownload", dd.GetName()).Debug("find a datadownload with status prepared")
+		} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
+			if dd.Status.Node != r.nodeName {
+				logger.WithField("dd", dd.Name).WithField("current node", r.nodeName).Infof("DD should be resumed by another node %s", dd.Status.Node)
+				continue
+			}
+
+			err := funcResumeCancellableDataRestore(r, ctx, dd, logger)
+			if err == nil {
+				continue
+			}
+
+			logger.WithField("datadownload", dd.GetName()).WithError(err).Warn("Failed to resume data path for dd, have to cancel it")
+
+			resumeErr := err
+			err = UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, logger.WithField("datadownload", dd.Name),
+				func(dataDownload *velerov2alpha1api.DataDownload) bool {
+					if dataDownload.Spec.Cancel {
+						return false
+					}
+
+					dataDownload.Spec.Cancel = true
+					dataDownload.Status.Message = fmt.Sprintf("Resume InProgress datadownload failed with error %v, mark it as cancel", resumeErr)
+
+					return true
+				})
+			if err != nil {
+				logger.WithError(errors.WithStack(err)).Error("Failed to trigger datadownload cancel")
+			}
+		} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted {
+			r.logger.WithField("datadownload", dd.GetName()).Warn("Cancel dd under Accepted phase")
+
+			err := UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name},
+				r.logger.WithField("datadownload", dd.Name), func(dataDownload *velerov2alpha1api.DataDownload) bool {
+					if dataDownload.Spec.Cancel {
+						return false
+					}
+
+					dataDownload.Spec.Cancel = true
+					dataDownload.Status.Message = "Datadownload is in Accepted status during the node-agent starting, mark it as cancel"
+
+					return true
+				})
+			if err != nil {
+				r.logger.WithField("datadownload", dd.GetName()).WithError(err).Error("Failed to trigger datadownload cancel")
+			}
+		}
+	}
+
 	return nil
 }

+func (r *DataDownloadReconciler) resumeCancellableDataPath(ctx context.Context, dd *velerov2alpha1api.DataDownload, log logrus.FieldLogger) error {
+	log.Info("Resume cancelable dataDownload")
+
+	res, err := r.restoreExposer.GetExposed(ctx, getDataDownloadOwnerObject(dd), r.client, r.nodeName, dd.Spec.OperationTimeout.Duration)
+	if err != nil {
+		return errors.Wrapf(err, "error to get exposed volume for dd %s", dd.Name)
+	}
+
+	if res == nil {
+		return errors.Errorf("expose info missed for dd %s", dd.Name)
+	}
+
+	callbacks := datapath.Callbacks{
+		OnCompleted: r.OnDataDownloadCompleted,
+		OnFailed:    r.OnDataDownloadFailed,
+		OnCancelled: r.OnDataDownloadCancelled,
+		OnProgress:  r.OnDataDownloadProgress,
+	}
+
+	asyncBR, err := r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, r.kubeClient, r.mgr, datapath.TaskTypeRestore, dd.Name, dd.Namespace, res.ByPod.HostingPod.Name, res.ByPod.HostingContainer, dd.Name, callbacks, true, log)
+	if err != nil {
+		return errors.Wrapf(err, "error to create asyncBR watcher for dd %s", dd.Name)
+	}
+
+	resumeComplete := false
+	defer func() {
+		if !resumeComplete {
+			r.closeDataPath(ctx, dd.Name)
+		}
+	}()
+
+	if err := asyncBR.Init(ctx, nil); err != nil {
+		return errors.Wrapf(err, "error to init asyncBR watcher for dd %s", dd.Name)
+	}
+
+	if err := asyncBR.StartRestore(dd.Spec.SnapshotID, datapath.AccessPoint{
+		ByPath: res.ByPod.VolumeName,
+	}, nil); err != nil {
+		return errors.Wrapf(err, "error to resume asyncBR watcher for dd %s", dd.Name)
+	}
+
+	resumeComplete = true
+
+	log.Infof("asyncBR is resumed for dd %s", dd.Name)
+
+	return nil
+}
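A side note on funcResumeCancellableDataRestore: binding a method expression to a package-level variable gives the tests a seam to stub the resume path without introducing an interface. A minimal self-contained reproduction of that trick:

package main

import "fmt"

type reconciler struct{}

func (r *reconciler) resume(name string) error {
	fmt.Println("real resume for", name)
	return nil
}

// Method expression: a plain func value whose first parameter is the receiver,
// the same shape as funcResumeCancellableDataRestore in the diff above.
var funcResume = (*reconciler).resume

func main() {
	r := &reconciler{}
	_ = funcResume(r, "dd-1") // production path

	// A test can swap in a stub without touching the reconciler type:
	funcResume = func(_ *reconciler, name string) error {
		fmt.Println("stubbed resume for", name)
		return nil
	}
	_ = funcResume(r, "dd-1")
}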
@@ -33,10 +33,12 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/kubernetes"
 	clientgofake "k8s.io/client-go/kubernetes/fake"
 	ctrl "sigs.k8s.io/controller-runtime"
 	kbclient "sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
+	"sigs.k8s.io/controller-runtime/pkg/manager"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"

 	"sigs.k8s.io/controller-runtime/pkg/client/fake"
@@ -149,7 +151,7 @@ func initDataDownloadReconcilerWithError(objects []runtime.Object, needError ...

 	dataPathMgr := datapath.NewManager(1)

-	return NewDataDownloadReconciler(fakeClient, fakeKubeClient, dataPathMgr, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
+	return NewDataDownloadReconciler(fakeClient, nil, fakeKubeClient, dataPathMgr, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
 }

 func TestDataDownloadReconcile(t *testing.T) {
@@ -869,12 +871,11 @@ func TestUpdateDataDownloadWithRetry(t *testing.T) {
 	testCases := []struct {
 		Name      string
 		needErrs  []bool
+		noChange  bool
 		ExpectErr bool
 	}{
 		{
 			Name:      "SuccessOnFirstAttempt",
 			needErrs:  []bool{false, false, false, false},
 			ExpectErr: false,
 		},
 		{
 			Name: "Error get",

@@ -886,6 +887,11 @@ func TestUpdateDataDownloadWithRetry(t *testing.T) {
 			needErrs:  []bool{false, false, true, false, false},
 			ExpectErr: true,
 		},
+		{
+			Name:     "no change",
+			noChange: true,
+			needErrs: []bool{false, false, true, false, false},
+		},
 		{
 			Name:     "Conflict with error timeout",
 			needErrs: []bool{false, false, false, false, true},
@@ -901,8 +907,14 @@ func TestUpdateDataDownloadWithRetry(t *testing.T) {
 			require.NoError(t, err)
 			err = r.client.Create(ctx, dataDownloadBuilder().Result())
 			require.NoError(t, err)
-			updateFunc := func(dataDownload *velerov2alpha1api.DataDownload) {
+			updateFunc := func(dataDownload *velerov2alpha1api.DataDownload) bool {
+				if tc.noChange {
+					return false
+				}
+
 				dataDownload.Spec.Cancel = true
+
+				return true
 			}
 			err = UpdateDataDownloadWithRetry(ctx, r.client, namespacedName, velerotest.NewLogger().WithField("name", tc.Name), updateFunc)
 			if tc.ExpectErr {
@@ -914,136 +926,115 @@ func TestUpdateDataDownloadWithRetry(t *testing.T) {
 	}
 }

-func TestFindDataDownloads(t *testing.T) {
-	tests := []struct {
-		name            string
-		pod             corev1.Pod
-		du              *velerov2alpha1api.DataDownload
-		expectedUploads []velerov2alpha1api.DataDownload
-		expectedError   bool
-	}{
-		// Test case 1: Pod with matching nodeName and DataDownload label
-		{
-			name: "MatchingPod",
-			pod: corev1.Pod{
-				ObjectMeta: metav1.ObjectMeta{
-					Namespace: "velero",
-					Name:      "pod-1",
-					Labels: map[string]string{
-						velerov1api.DataDownloadLabel: dataDownloadName,
-					},
-				},
-				Spec: corev1.PodSpec{
-					NodeName: "node-1",
-				},
-			},
-			du: dataDownloadBuilder().Result(),
-			expectedUploads: []velerov2alpha1api.DataDownload{
-				{
-					ObjectMeta: metav1.ObjectMeta{
-						Namespace: "velero",
-						Name:      dataDownloadName,
-					},
-				},
-			},
-			expectedError: false,
-		},
-		// Test case 2: Pod with non-matching nodeName
-		{
-			name: "NonMatchingNodePod",
-			pod: corev1.Pod{
-				ObjectMeta: metav1.ObjectMeta{
-					Namespace: "velero",
-					Name:      "pod-2",
-					Labels: map[string]string{
-						velerov1api.DataDownloadLabel: dataDownloadName,
-					},
-				},
-				Spec: corev1.PodSpec{
-					NodeName: "node-2",
-				},
-			},
-			du:              dataDownloadBuilder().Result(),
-			expectedUploads: []velerov2alpha1api.DataDownload{},
-			expectedError:   false,
-		},
-	}
-
-	for _, test := range tests {
-		t.Run(test.name, func(t *testing.T) {
-			r, err := initDataDownloadReconcilerWithError(nil)
-			require.NoError(t, err)
-			r.nodeName = "node-1"
-			err = r.client.Create(ctx, test.du)
-			require.NoError(t, err)
-			err = r.client.Create(ctx, &test.pod)
-			require.NoError(t, err)
-			uploads, err := r.FindDataDownloads(context.Background(), r.client, "velero")
-
-			if test.expectedError {
-				assert.Error(t, err)
-			} else {
-				assert.NoError(t, err)
-				assert.Equal(t, len(test.expectedUploads), len(uploads))
-			}
-		})
-	}
-}
+type ddResumeTestHelper struct {
+	resumeErr    error
+	getExposeErr error
+	exposeResult *exposer.ExposeResult
+	asyncBR      datapath.AsyncBR
+}
+
+func (dt *ddResumeTestHelper) resumeCancellableDataPath(_ *DataUploadReconciler, _ context.Context, _ *velerov2alpha1api.DataUpload, _ logrus.FieldLogger) error {
+	return dt.resumeErr
+}
+
+func (dt *ddResumeTestHelper) Expose(context.Context, corev1.ObjectReference, string, string, map[string]string, time.Duration) error {
+	return nil
+}
+
+func (dt *ddResumeTestHelper) GetExposed(context.Context, corev1.ObjectReference, kbclient.Client, string, time.Duration) (*exposer.ExposeResult, error) {
+	return dt.exposeResult, dt.getExposeErr
+}
+
+func (dt *ddResumeTestHelper) PeekExposed(context.Context, corev1.ObjectReference) error {
+	return nil
+}
+
+func (dt *ddResumeTestHelper) RebindVolume(context.Context, corev1.ObjectReference, string, string, time.Duration) error {
+	return nil
+}
+
+func (dt *ddResumeTestHelper) CleanUp(context.Context, corev1.ObjectReference) {}
+
+func (dt *ddResumeTestHelper) newMicroServiceBRWatcher(kbclient.Client, kubernetes.Interface, manager.Manager, string, string, string, string, string, string,
+	datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
+	return dt.asyncBR
+}

 func TestAttemptDataDownloadResume(t *testing.T) {
 	tests := []struct {
 		name                    string
-		dataUploads             []velerov2alpha1api.DataDownload
-		du                      *velerov2alpha1api.DataDownload
-		pod                     *corev1.Pod
+		dd                      *velerov2alpha1api.DataDownload
 		needErrs                []bool
+		resumeErr               error
 		acceptedDataDownloads   []string
 		prepareddDataDownloads  []string
 		cancelledDataDownloads  []string
-		expectedError           bool
+		inProgressDataDownloads []string
+		expectedError           string
 	}{
-		// Test case 1: Process Accepted DataDownload
 		{
-			name: "AcceptedDataDownload",
-			pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Volumes(&corev1.Volume{Name: dataDownloadName}).NodeName("node-1").Labels(map[string]string{
-				velerov1api.DataDownloadLabel: dataDownloadName,
-			}).Result(),
-			du:                     dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(),
+			name:                   "accepted DataDownload with no dd label",
+			dd:                     dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(),
 			cancelledDataDownloads: []string{dataDownloadName},
 			acceptedDataDownloads:  []string{dataDownloadName},
-			expectedError:          false,
 		},
-		// Test case 2: Cancel an Accepted DataDownload
 		{
-			name: "CancelAcceptedDataDownload",
-			du:   dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(),
+			name:                   "accepted DataDownload in the current node",
+			dd:                     dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Labels(map[string]string{acceptNodeLabelKey: "node-1"}).Result(),
 			cancelledDataDownloads: []string{dataDownloadName},
 			acceptedDataDownloads:  []string{dataDownloadName},
 		},
-		// Test case 3: Process Accepted Prepared DataDownload
 		{
-			name: "PreparedDataDownload",
-			pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Volumes(&corev1.Volume{Name: dataDownloadName}).NodeName("node-1").Labels(map[string]string{
-				velerov1api.DataDownloadLabel: dataDownloadName,
-			}).Result(),
-			du:                     dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
-			prepareddDataDownloads: []string{dataDownloadName},
+			name: "accepted DataDownload with dd label but is canceled",
+			dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Cancel(true).Labels(map[string]string{
+				acceptNodeLabelKey: "node-1",
+			}).Result(),
+			acceptedDataDownloads:  []string{dataDownloadName},
+			cancelledDataDownloads: []string{dataDownloadName},
 		},
-		// Test case 4: Process Accepted InProgress DataDownload
 		{
-			name: "InProgressDataDownload",
-			pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Volumes(&corev1.Volume{Name: dataDownloadName}).NodeName("node-1").Labels(map[string]string{
-				velerov1api.DataDownloadLabel: dataDownloadName,
-			}).Result(),
-			du:                     dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
-			prepareddDataDownloads: []string{dataDownloadName},
+			name: "accepted DataDownload with dd label but cancel fail",
+			dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Labels(map[string]string{
+				acceptNodeLabelKey: "node-1",
+			}).Result(),
+			needErrs:              []bool{false, false, true, false, false, false},
+			acceptedDataDownloads: []string{dataDownloadName},
 		},
-		// Test case 5: get resume error
 		{
-			name: "ResumeError",
-			pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Volumes(&corev1.Volume{Name: dataDownloadName}).NodeName("node-1").Labels(map[string]string{
-				velerov1api.DataDownloadLabel: dataDownloadName,
-			}).Result(),
+			name:                   "prepared DataDownload",
+			dd:                     dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
+			prepareddDataDownloads: []string{dataDownloadName},
+		},
+		{
+			name:                    "InProgress DataDownload, not the current node",
+			dd:                      dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Result(),
+			inProgressDataDownloads: []string{dataDownloadName},
+		},
+		{
+			name:                    "InProgress DataDownload, no resume error",
+			dd:                      dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Node("node-1").Result(),
+			inProgressDataDownloads: []string{dataDownloadName},
+		},
+		{
+			name:                    "InProgress DataDownload, resume error, cancel error",
+			dd:                      dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Node("node-1").Result(),
+			resumeErr:               errors.New("fake-resume-error"),
+			needErrs:                []bool{false, false, true, false, false, false},
+			inProgressDataDownloads: []string{dataDownloadName},
+		},
+		{
+			name:                    "InProgress DataDownload, resume error, cancel succeed",
+			dd:                      dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Node("node-1").Result(),
+			resumeErr:               errors.New("fake-resume-error"),
+			cancelledDataDownloads:  []string{dataDownloadName},
+			inProgressDataDownloads: []string{dataDownloadName},
 		},
 		{
 			name:     "Error",
 			needErrs: []bool{false, false, false, false, false, true},
-			du:            dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
-			expectedError: true,
+			dd:            dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
+			expectedError: "error to list datadownloads: List error",
 		},
 	}
@@ -1054,30 +1045,31 @@ func TestAttemptDataDownloadResume(t *testing.T) {
 			r.nodeName = "node-1"
 			require.NoError(t, err)
 			defer func() {
-				r.client.Delete(ctx, test.du, &kbclient.DeleteOptions{})
-				if test.pod != nil {
-					r.client.Delete(ctx, test.pod, &kbclient.DeleteOptions{})
-				}
+				r.client.Delete(ctx, test.dd, &kbclient.DeleteOptions{})
 			}()

-			assert.NoError(t, r.client.Create(ctx, test.du))
-			if test.pod != nil {
-				assert.NoError(t, r.client.Create(ctx, test.pod))
-			}
-			// Run the test
-			err = r.AttemptDataDownloadResume(ctx, r.client, r.logger.WithField("name", test.name), test.du.Namespace)
+			assert.NoError(t, r.client.Create(ctx, test.dd))

-			if test.expectedError {
-				assert.Error(t, err)
+			dt := &duResumeTestHelper{
+				resumeErr: test.resumeErr,
+			}
+
+			funcResumeCancellableDataBackup = dt.resumeCancellableDataPath
+
+			// Run the test
+			err = r.AttemptDataDownloadResume(ctx, r.client, r.logger.WithField("name", test.name), test.dd.Namespace)
+
+			if test.expectedError != "" {
+				assert.EqualError(t, err, test.expectedError)
 			} else {
 				assert.NoError(t, err)

 				// Verify DataDownload marked as Canceled
 				for _, duName := range test.cancelledDataDownloads {
-					dataUpload := &velerov2alpha1api.DataDownload{}
-					err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: duName}, dataUpload)
+					dataDownload := &velerov2alpha1api.DataDownload{}
+					err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: duName}, dataDownload)
 					require.NoError(t, err)
-					assert.Equal(t, velerov2alpha1api.DataDownloadPhaseCanceled, dataUpload.Status.Phase)
+					assert.True(t, dataDownload.Spec.Cancel)
 				}
 				// Verify DataDownload marked as Accepted
 				for _, duName := range test.acceptedDataDownloads {
@@ -1097,3 +1089,108 @@ func TestAttemptDataDownloadResume(t *testing.T) {
 		})
 	}
 }
+
+func TestResumeCancellableRestore(t *testing.T) {
+	tests := []struct {
+		name             string
+		dataDownloads    []velerov2alpha1api.DataDownload
+		dd               *velerov2alpha1api.DataDownload
+		getExposeErr     error
+		exposeResult     *exposer.ExposeResult
+		createWatcherErr error
+		initWatcherErr   error
+		startWatcherErr  error
+		mockInit         bool
+		mockStart        bool
+		mockClose        bool
+		expectedError    string
+	}{
+		{
+			name:          "get expose failed",
+			dd:            dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Result(),
+			getExposeErr:  errors.New("fake-expose-error"),
+			expectedError: fmt.Sprintf("error to get exposed volume for dd %s: fake-expose-error", dataDownloadName),
+		},
+		{
+			name:          "no expose",
+			dd:            dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Node("node-1").Result(),
+			expectedError: fmt.Sprintf("expose info missed for dd %s", dataDownloadName),
+		},
+		{
+			name: "watcher init error",
+			dd:   dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Node("node-1").Result(),
+			exposeResult: &exposer.ExposeResult{
+				ByPod: exposer.ExposeByPod{
+					HostingPod: &corev1.Pod{},
+				},
+			},
+			mockInit:       true,
+			mockClose:      true,
+			initWatcherErr: errors.New("fake-init-watcher-error"),
+			expectedError:  fmt.Sprintf("error to init asyncBR watcher for dd %s: fake-init-watcher-error", dataDownloadName),
+		},
+		{
+			name: "start watcher error",
+			dd:   dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Node("node-1").Result(),
+			exposeResult: &exposer.ExposeResult{
+				ByPod: exposer.ExposeByPod{
+					HostingPod: &corev1.Pod{},
+				},
+			},
+			mockInit:        true,
+			mockStart:       true,
+			mockClose:       true,
+			startWatcherErr: errors.New("fake-start-watcher-error"),
+			expectedError:   fmt.Sprintf("error to resume asyncBR watcher for dd %s: fake-start-watcher-error", dataDownloadName),
+		},
+		{
+			name: "succeed",
+			dd:   dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Node("node-1").Result(),
+			exposeResult: &exposer.ExposeResult{
+				ByPod: exposer.ExposeByPod{
+					HostingPod: &corev1.Pod{},
+				},
+			},
+			mockInit:  true,
+			mockStart: true,
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			ctx := context.TODO()
+			r, err := initDataDownloadReconciler(nil, false)
+			r.nodeName = "node-1"
+			require.NoError(t, err)
+
+			mockAsyncBR := datapathmocks.NewAsyncBR(t)
+
+			if test.mockInit {
+				mockAsyncBR.On("Init", mock.Anything, mock.Anything).Return(test.initWatcherErr)
+			}
+
+			if test.mockStart {
+				mockAsyncBR.On("StartRestore", mock.Anything, mock.Anything, mock.Anything).Return(test.startWatcherErr)
+			}
+
+			if test.mockClose {
+				mockAsyncBR.On("Close", mock.Anything).Return()
+			}
+
+			dt := &ddResumeTestHelper{
+				getExposeErr: test.getExposeErr,
+				exposeResult: test.exposeResult,
+				asyncBR:      mockAsyncBR,
+			}
+
+			r.restoreExposer = dt
+
+			datapath.MicroServiceBRWatcherCreator = dt.newMicroServiceBRWatcher
+
+			err = r.resumeCancellableDataPath(ctx, test.dd, velerotest.NewLogger())
+			if test.expectedError != "" {
+				assert.EqualError(t, err, test.expectedError)
+			}
+		})
+	}
+}
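The mockInit/mockStart/mockClose flags exist because testify mocks fail a test when an unregistered method is called, so each case registers expectations only for the calls its scenario reaches. A toy example of the same pattern (hand-rolled mock, not velero's generated one):

package example

import (
	"testing"

	"github.com/stretchr/testify/mock"
)

// fakeBR is a stand-in for the generated AsyncBR mock.
type fakeBR struct{ mock.Mock }

func (f *fakeBR) Init() error { return f.Called().Error(0) }

func TestConditionalExpectations(t *testing.T) {
	mockInit := true // mirrors the per-case flag in TestResumeCancellableRestore

	m := &fakeBR{}
	if mockInit {
		m.On("Init").Return(nil) // register only the calls this case will make
	}

	if mockInit {
		if err := m.Init(); err != nil {
			t.Fatal(err)
		}
	}
	m.AssertExpectations(t)
}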
@@ -35,6 +35,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
 	"sigs.k8s.io/controller-runtime/pkg/event"
+	"sigs.k8s.io/controller-runtime/pkg/manager"
 	"sigs.k8s.io/controller-runtime/pkg/predicate"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -67,6 +68,7 @@ type DataUploadReconciler struct {
 	client            client.Client
 	kubeClient        kubernetes.Interface
 	csiSnapshotClient snapshotter.SnapshotV1Interface
+	mgr               manager.Manager
 	repoEnsurer       *repository.Ensurer
 	Clock             clocks.WithTickerAndDelayedExecution
 	credentialGetter  *credentials.CredentialGetter

@@ -80,11 +82,12 @@ type DataUploadReconciler struct {
 	metrics *metrics.ServerMetrics
 }

-func NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interface, csiSnapshotClient snapshotter.SnapshotV1Interface,
+func NewDataUploadReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, csiSnapshotClient snapshotter.SnapshotV1Interface,
 	dataPathMgr *datapath.Manager, loadAffinity *nodeagent.LoadAffinity, repoEnsurer *repository.Ensurer, clock clocks.WithTickerAndDelayedExecution,
 	cred *credentials.CredentialGetter, nodeName string, fs filesystem.Interface, preparingTimeout time.Duration, log logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataUploadReconciler {
 	return &DataUploadReconciler{
 		client:            client,
+		mgr:               mgr,
 		kubeClient:        kubeClient,
 		csiSnapshotClient: csiSnapshotClient,
 		Clock:             clock,
@@ -150,9 +153,17 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 	} else if controllerutil.ContainsFinalizer(du, DataUploadDownloadFinalizer) && !du.Spec.Cancel && !isDataUploadInFinalState(du) {
 		// when delete cr we need to clear up internal resources created by Velero, here we use the cancel mechanism
 		// to help clear up resources instead of clear them directly in case of some conflict with Expose action
-		if err := UpdateDataUploadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataUpload *velerov2alpha1api.DataUpload) {
+		log.Warnf("Cancel du under phase %s because it is being deleted", du.Status.Phase)
+
+		if err := UpdateDataUploadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataUpload *velerov2alpha1api.DataUpload) bool {
+			if dataUpload.Spec.Cancel {
+				return false
+			}
+
 			dataUpload.Spec.Cancel = true
-			dataUpload.Status.Message = fmt.Sprintf("found a dataupload %s/%s is being deleted, mark it as cancel", du.Namespace, du.Name)
+			dataUpload.Status.Message = "Cancel dataupload because it is being deleted"
+
+			return true
 		}); err != nil {
 			log.Errorf("failed to set cancel flag with error %s for %s/%s", err.Error(), du.Namespace, du.Name)
 			return ctrl.Result{}, err
@@ -599,9 +610,15 @@ func (r *DataUploadReconciler) findDataUploadForPod(ctx context.Context, podObj
 		}
 	} else if unrecoverable, reason := kube.IsPodUnrecoverable(pod, log); unrecoverable { // let the abnormal backup pod failed early
 		err := UpdateDataUploadWithRetry(context.Background(), r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, r.logger.WithField("dataupload", du.Name),
-			func(dataUpload *velerov2alpha1api.DataUpload) {
+			func(dataUpload *velerov2alpha1api.DataUpload) bool {
+				if dataUpload.Spec.Cancel {
+					return false
+				}
+
 				dataUpload.Spec.Cancel = true
-				dataUpload.Status.Message = fmt.Sprintf("dataupload mark as cancel to failed early for exposing pod %s/%s is in abnormal status for reason %s", pod.Namespace, pod.Name, reason)
+				dataUpload.Status.Message = fmt.Sprintf("Cancel dataupload because the exposing pod %s/%s is in abnormal status for reason %s", pod.Namespace, pod.Name, reason)
+
+				return true
 			})

 		if err != nil {
@@ -622,75 +639,6 @@ func (r *DataUploadReconciler) findDataUploadForPod(ctx context.Context, podObj
 	return []reconcile.Request{request}
 }

-func (r *DataUploadReconciler) FindDataUploadsByPod(ctx context.Context, cli client.Client, ns string) ([]velerov2alpha1api.DataUpload, error) {
-	pods := &corev1.PodList{}
-	var dataUploads []velerov2alpha1api.DataUpload
-	if err := cli.List(ctx, pods, &client.ListOptions{Namespace: ns}); err != nil {
-		r.logger.WithError(errors.WithStack(err)).Error("failed to list pods on current node")
-		return nil, errors.Wrapf(err, "failed to list pods on current node")
-	}
-
-	for _, pod := range pods.Items {
-		if pod.Spec.NodeName != r.nodeName {
-			r.logger.Debugf("Pod %s related data upload will not handled by %s nodes", pod.GetName(), r.nodeName)
-			continue
-		}
-		du, err := findDataUploadByPod(cli, pod)
-		if err != nil {
-			r.logger.WithError(errors.WithStack(err)).Error("failed to get dataUpload by pod")
-			continue
-		} else if du != nil {
-			dataUploads = append(dataUploads, *du)
-		}
-	}
-	return dataUploads, nil
-}
-
-func (r *DataUploadReconciler) findAcceptDataUploadsByNodeLabel(ctx context.Context, cli client.Client, ns string) ([]velerov2alpha1api.DataUpload, error) {
-	dataUploads := &velerov2alpha1api.DataUploadList{}
-	if err := cli.List(ctx, dataUploads, &client.ListOptions{Namespace: ns}); err != nil {
-		r.logger.WithError(errors.WithStack(err)).Error("failed to list datauploads")
-		return nil, errors.Wrapf(err, "failed to list datauploads")
-	}
-
-	var result []velerov2alpha1api.DataUpload
-	for _, du := range dataUploads.Items {
-		if du.Status.Phase != velerov2alpha1api.DataUploadPhaseAccepted {
-			continue
-		}
-		if du.Labels[acceptNodeLabelKey] == r.nodeName {
-			result = append(result, du)
-		}
-	}
-	return result, nil
-}
-
-func (r *DataUploadReconciler) CancelAcceptedDataupload(ctx context.Context, cli client.Client, ns string) {
-	r.logger.Infof("Reset accepted dataupload for node %s", r.nodeName)
-	dataUploads, err := r.findAcceptDataUploadsByNodeLabel(ctx, cli, ns)
-	if err != nil {
-		r.logger.WithError(err).Error("failed to find dataupload")
-		return
-	}
-
-	for _, du := range dataUploads {
-		if du.Spec.Cancel {
-			continue
-		}
-		err = UpdateDataUploadWithRetry(ctx, cli, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, r.logger.WithField("dataupload", du.Name),
-			func(dataUpload *velerov2alpha1api.DataUpload) {
-				dataUpload.Spec.Cancel = true
-				dataUpload.Status.Message = fmt.Sprintf("found a dataupload with status %q during the node-agent starting, mark it as cancel", du.Status.Phase)
-			})
-
-		r.logger.WithField("dataupload", du.GetName()).Warn(du.Status.Message)
-		if err != nil {
-			r.logger.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", du.GetName())
-			continue
-		}
-	}
-}
-
 func (r *DataUploadReconciler) prepareDataUpload(du *velerov2alpha1api.DataUpload) {
 	du.Status.Phase = velerov2alpha1api.DataUploadPhasePrepared
 	du.Status.Node = r.nodeName
@@ -902,54 +850,145 @@ func isDataUploadInFinalState(du *velerov2alpha1api.DataUpload) bool {
|
||||
du.Status.Phase == velerov2alpha1api.DataUploadPhaseCompleted
|
||||
}
|
||||
|
||||
func UpdateDataUploadWithRetry(ctx context.Context, client client.Client, namespacedName types.NamespacedName, log *logrus.Entry, updateFunc func(dataUpload *velerov2alpha1api.DataUpload)) error {
|
||||
return wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (done bool, err error) {
|
||||
func UpdateDataUploadWithRetry(ctx context.Context, client client.Client, namespacedName types.NamespacedName, log *logrus.Entry, updateFunc func(*velerov2alpha1api.DataUpload) bool) error {
|
||||
return wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
|
||||
du := &velerov2alpha1api.DataUpload{}
|
||||
if err := client.Get(ctx, namespacedName, du); err != nil {
|
||||
return false, errors.Wrap(err, "getting DataUpload")
|
||||
}
|
||||
|
||||
updateFunc(du)
|
||||
updateErr := client.Update(ctx, du)
|
||||
if updateErr != nil {
|
||||
if apierrors.IsConflict(updateErr) {
|
||||
if updateFunc(du) {
|
||||
err := client.Update(ctx, du)
|
||||
if err != nil {
|
||||
if apierrors.IsConflict(err) {
|
||||
log.Warnf("failed to update dataupload for %s/%s and will retry it", du.Namespace, du.Name)
|
||||
return false, nil
|
||||
} else {
|
||||
return false, errors.Wrapf(err, "error updating dataupload with error %s/%s", du.Namespace, du.Name)
|
||||
}
|
||||
log.Errorf("failed to update dataupload with error %s for %s/%s", updateErr.Error(), du.Namespace, du.Name)
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
})
|
||||
}
|
||||
|
||||
var funcResumeCancellableDataBackup = (*DataUploadReconciler).resumeCancellableDataPath
|
||||
|
||||
func (r *DataUploadReconciler) AttemptDataUploadResume(ctx context.Context, cli client.Client, logger *logrus.Entry, ns string) error {
|
||||
if dataUploads, err := r.FindDataUploadsByPod(ctx, cli, ns); err != nil {
|
||||
return errors.Wrap(err, "failed to find data uploads")
|
||||
} else {
|
||||
for _, du := range dataUploads {
|
||||
dataUploads := &velerov2alpha1api.DataUploadList{}
|
||||
if err := cli.List(ctx, dataUploads, &client.ListOptions{Namespace: ns}); err != nil {
|
||||
r.logger.WithError(errors.WithStack(err)).Error("failed to list datauploads")
|
||||
return errors.Wrapf(err, "error to list datauploads")
|
||||
}
|
||||
|
||||
for i := range dataUploads.Items {
|
||||
du := &dataUploads.Items[i]
|
||||
if du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared {
|
||||
// keep doing nothing let controller re-download the data
|
||||
// the Prepared CR could be still handled by dataupload controller after node-agent restart
|
||||
logger.WithField("dataupload", du.GetName()).Debug("find a dataupload with status prepared")
|
||||
} else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress {
|
||||
err = UpdateDataUploadWithRetry(ctx, cli, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, logger.WithField("dataupload", du.Name),
|
||||
func(dataUpload *velerov2alpha1api.DataUpload) {
|
||||
dataUpload.Spec.Cancel = true
|
||||
dataUpload.Status.Message = fmt.Sprintf("found a dataupload with status %q during the node-agent starting, mark it as cancel", du.Status.Phase)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
logger.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q into canceled", du.GetName())
|
||||
if du.Status.Node != r.nodeName {
|
||||
logger.WithField("du", du.Name).WithField("current node", r.nodeName).Infof("DU should be resumed by another node %s", du.Status.Node)
|
||||
continue
|
||||
}
|
||||
logger.WithField("dataupload", du.GetName()).Debug("mark dataupload into canceled")
|
||||
|
||||
err := funcResumeCancellableDataBackup(r, ctx, du, logger)
|
||||
if err == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
logger.WithField("dataupload", du.GetName()).WithError(err).Warn("Failed to resume data path for du, have to cancel it")
|
||||
|
||||
resumeErr := err
|
||||
err = UpdateDataUploadWithRetry(ctx, cli, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, logger.WithField("dataupload", du.Name),
|
||||
func(dataUpload *velerov2alpha1api.DataUpload) bool {
|
||||
if dataUpload.Spec.Cancel {
|
||||
return false
|
||||
}
|
||||
|
||||
dataUpload.Spec.Cancel = true
|
||||
dataUpload.Status.Message = fmt.Sprintf("Resume InProgress dataupload failed with error %v, mark it as cancel", resumeErr)
|
||||
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
logger.WithField("dataupload", du.GetName()).WithError(errors.WithStack(err)).Error("Failed to trigger dataupload cancel")
|
||||
}
|
||||
} else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted {
|
||||
r.logger.WithField("dataupload", du.GetName()).Warn("Cancel du under Accepted phase")
|
||||
|
||||
err := UpdateDataUploadWithRetry(ctx, cli, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, r.logger.WithField("dataupload", du.Name),
|
||||
func(dataUpload *velerov2alpha1api.DataUpload) bool {
|
||||
if dataUpload.Spec.Cancel {
|
||||
return false
|
||||
}
|
||||
|
||||
dataUpload.Spec.Cancel = true
|
||||
dataUpload.Status.Message = "Dataupload is in Accepted status during the node-agent starting, mark it as cancel"
|
||||
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
r.logger.WithField("dataupload", du.GetName()).WithError(errors.WithStack(err)).Error("Failed to trigger dataupload cancel")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//If the data upload is in Accepted status, the volume snapshot may be deleted and the exposed pod may not be created
|
||||
// so we need to mark the data upload as canceled for it may not be recoverable
|
||||
r.CancelAcceptedDataupload(ctx, cli, ns)
|
||||
return nil
|
||||
}
|
||||
func (r *DataUploadReconciler) resumeCancellableDataPath(ctx context.Context, du *velerov2alpha1api.DataUpload, log logrus.FieldLogger) error {
log.Info("Resume cancelable dataUpload")

ep, ok := r.snapshotExposerList[du.Spec.SnapshotType]
if !ok {
return errors.Errorf("error to find exposer for du %s", du.Name)
}

waitExposePara := r.setupWaitExposePara(du)
res, err := ep.GetExposed(ctx, getOwnerObject(du), du.Spec.OperationTimeout.Duration, waitExposePara)
if err != nil {
return errors.Wrapf(err, "error to get exposed snapshot for du %s", du.Name)
}

if res == nil {
return errors.Errorf("expose info missed for du %s", du.Name)
}

callbacks := datapath.Callbacks{
OnCompleted: r.OnDataUploadCompleted,
OnFailed: r.OnDataUploadFailed,
OnCancelled: r.OnDataUploadCancelled,
OnProgress: r.OnDataUploadProgress,
}

asyncBR, err := r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, r.kubeClient, r.mgr, datapath.TaskTypeBackup, du.Name, du.Namespace, res.ByPod.HostingPod.Name, res.ByPod.HostingContainer, du.Name, callbacks, true, log)
if err != nil {
return errors.Wrapf(err, "error to create asyncBR watcher for du %s", du.Name)
}

resumeComplete := false
defer func() {
if !resumeComplete {
r.closeDataPath(ctx, du.Name)
}
}()

if err := asyncBR.Init(ctx, nil); err != nil {
return errors.Wrapf(err, "error to init asyncBR watcher for du %s", du.Name)
}

if err := asyncBR.StartBackup(datapath.AccessPoint{
ByPath: res.ByPod.VolumeName,
}, du.Spec.DataMoverConfig, nil); err != nil {
return errors.Wrapf(err, "error to resume asyncBR watcher for du %s", du.Name)
}

resumeComplete = true

log.Infof("asyncBR is resumed for du %s", du.Name)

return nil
}
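The test file below stubs this method through a package-level function variable, funcResumeCancellableDataBackup, a common Go seam for replacing a method in unit tests. A sketch of the assumed wiring (the variable name is taken from the test code; the production default is presumably the method expression, which turns the receiver into an explicit first argument):

// Assumed seam: tests assign a stub with the same shape to this variable.
var funcResumeCancellableDataBackup = (*DataUploadReconciler).resumeCancellableDataPath

// A call site inside AttemptDataUploadResume would then look like:
//   err := funcResumeCancellableDataBackup(r, ctx, du, log)

The helper's resumeCancellableDataPath method in the test file matches this signature exactly.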
@@ -27,6 +27,7 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
@@ -35,6 +36,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
clientgofake "k8s.io/client-go/kubernetes/fake"
"k8s.io/utils/clock"
testclocks "k8s.io/utils/clock/testing"
@@ -42,6 +44,7 @@ import (
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/vmware-tanzu/velero/internal/credentials"
@@ -49,6 +52,7 @@ import (
velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/datapath"
datapathmocks "github.com/vmware-tanzu/velero/pkg/datapath/mocks"
"github.com/vmware-tanzu/velero/pkg/exposer"
"github.com/vmware-tanzu/velero/pkg/metrics"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
@@ -241,7 +245,7 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci
if err != nil {
return nil, err
}
return NewDataUploadReconciler(fakeClient, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), dataPathMgr, nil, nil,
return NewDataUploadReconciler(fakeClient, nil, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), dataPathMgr, nil, nil,
testclocks.NewFakeClock(now), &credentials.CredentialGetter{FromFile: credentialFileStore}, "test-node", fakeFS, time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
}
@@ -944,12 +948,11 @@ func TestUpdateDataUploadWithRetry(t *testing.T) {
testCases := []struct {
Name string
needErrs []bool
noChange bool
ExpectErr bool
}{
{
Name: "SuccessOnFirstAttempt",
needErrs: []bool{false, false, false, false},
ExpectErr: false,
},
{
Name: "Error get",
@@ -961,6 +964,11 @@ func TestUpdateDataUploadWithRetry(t *testing.T) {
needErrs: []bool{false, false, true, false, false},
ExpectErr: true,
},
{
Name: "no change",
noChange: true,
needErrs: []bool{false, false, true, false, false},
},
{
Name: "Conflict with error timeout",
needErrs: []bool{false, false, false, false, true},
@@ -976,8 +984,13 @@ func TestUpdateDataUploadWithRetry(t *testing.T) {
require.NoError(t, err)
err = r.client.Create(ctx, dataUploadBuilder().Result())
require.NoError(t, err)
updateFunc := func(dataDownload *velerov2alpha1api.DataUpload) {
updateFunc := func(dataDownload *velerov2alpha1api.DataUpload) bool {
if tc.noChange {
return false
}

dataDownload.Spec.Cancel = true
return true
}
err = UpdateDataUploadWithRetry(ctx, r.client, namespacedName, velerotest.NewLogger().WithField("name", tc.Name), updateFunc)
if tc.ExpectErr {
@@ -989,135 +1002,107 @@ func TestUpdateDataUploadWithRetry(t *testing.T) {
}
}

func TestFindDataUploads(t *testing.T) {
tests := []struct {
name string
pod corev1.Pod
du *velerov2alpha1api.DataUpload
expectedUploads []velerov2alpha1api.DataUpload
expectedError bool
}{
// Test case 1: Pod with matching nodeName and DataUpload label
{
name: "MatchingPod",
pod: corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "velero",
Name: "pod-1",
Labels: map[string]string{
velerov1api.DataUploadLabel: dataUploadName,
},
},
Spec: corev1.PodSpec{
NodeName: "node-1",
},
},
du: dataUploadBuilder().Result(),
expectedUploads: []velerov2alpha1api.DataUpload{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "velero",
Name: dataUploadName,
},
},
},
expectedError: false,
},
// Test case 2: Pod with non-matching nodeName
{
name: "NonMatchingNodePod",
pod: corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "velero",
Name: "pod-2",
Labels: map[string]string{
velerov1api.DataUploadLabel: dataUploadName,
},
},
Spec: corev1.PodSpec{
NodeName: "node-2",
},
},
du: dataUploadBuilder().Result(),
expectedUploads: []velerov2alpha1api.DataUpload{},
expectedError: false,
},
type duResumeTestHelper struct {
resumeErr error
getExposeErr error
exposeResult *exposer.ExposeResult
asyncBR datapath.AsyncBR
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
r, err := initDataUploaderReconcilerWithError()
require.NoError(t, err)
r.nodeName = "node-1"
err = r.client.Create(ctx, test.du)
require.NoError(t, err)
err = r.client.Create(ctx, &test.pod)
require.NoError(t, err)
uploads, err := r.FindDataUploadsByPod(context.Background(), r.client, "velero")
func (dt *duResumeTestHelper) resumeCancellableDataPath(_ *DataUploadReconciler, _ context.Context, _ *velerov2alpha1api.DataUpload, _ logrus.FieldLogger) error {
return dt.resumeErr
}

if test.expectedError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, len(test.expectedUploads), len(uploads))
func (dt *duResumeTestHelper) Expose(context.Context, corev1.ObjectReference, interface{}) error {
return nil
}
})

func (dt *duResumeTestHelper) GetExposed(context.Context, corev1.ObjectReference, time.Duration, interface{}) (*exposer.ExposeResult, error) {
return dt.exposeResult, dt.getExposeErr
}

func (dt *duResumeTestHelper) PeekExposed(context.Context, corev1.ObjectReference) error {
return nil
}

func (dt *duResumeTestHelper) CleanUp(context.Context, corev1.ObjectReference, string, string) {}

func (dt *duResumeTestHelper) newMicroServiceBRWatcher(kbclient.Client, kubernetes.Interface, manager.Manager, string, string, string, string, string, string,
datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
return dt.asyncBR
}

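The duResumeTestHelper interleaved above (in place of the deleted TestFindDataUploads) wears three hats in the tests that follow: a stub for resumeCancellableDataPath, a fake snapshot exposer, and a factory for the mocked AsyncBR watcher. A hypothetical compile-time check that would make the exposer role explicit (the interface name is an assumption, not taken from the diff):

// Hypothetical role check for the test double (not in the diff):
var _ exposer.SnapshotExposer = (*duResumeTestHelper)(nil) // interface name assumed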
func TestAttemptDataUploadResume(t *testing.T) {
tests := []struct {
name string
dataUploads []velerov2alpha1api.DataUpload
du *velerov2alpha1api.DataUpload
pod *corev1.Pod
needErrs []bool
acceptedDataUploads []string
preparedDataUploads []string
cancelledDataUploads []string
expectedError bool
inProgressDataUploads []string
resumeErr error
expectedError string
}{
// Test case 1: Process Accepted DataUpload
{
name: "AcceptedDataUpload",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).NodeName("node-1").Labels(map[string]string{
velerov1api.DataUploadLabel: dataUploadName,
}).Result(),
name: "accepted DataUpload in other node",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(),
cancelledDataUploads: []string{dataUploadName},
acceptedDataUploads: []string{dataUploadName},
expectedError: false,
},
// Test case 2: Cancel an Accepted DataUpload
{
name: "CancelAcceptedDataUpload",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(),
name: "accepted DataUpload in the current node",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Labels(map[string]string{acceptNodeLabelKey: "node-1"}).Result(),
cancelledDataUploads: []string{dataUploadName},
acceptedDataUploads: []string{dataUploadName},
},
// Test case 3: Process Accepted Prepared DataUpload
{
name: "PreparedDataUpload",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).NodeName("node-1").Labels(map[string]string{
velerov1api.DataUploadLabel: dataUploadName,
}).Result(),
name: "accepted DataUpload in the current node but canceled",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Labels(map[string]string{acceptNodeLabelKey: "node-1"}).Cancel(true).Result(),
cancelledDataUploads: []string{dataUploadName},
acceptedDataUploads: []string{dataUploadName},
},
{
name: "accepted DataUpload in the current node but update error",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Labels(map[string]string{acceptNodeLabelKey: "node-1"}).Result(),
needErrs: []bool{false, false, true, false, false, false},
acceptedDataUploads: []string{dataUploadName},
},
{
name: "prepared DataUpload",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(),
preparedDataUploads: []string{dataUploadName},
},
// Test case 4: Process Accepted InProgress DataUpload
{
name: "InProgressDataUpload",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).NodeName("node-1").Labels(map[string]string{
velerov1api.DataUploadLabel: dataUploadName,
}).Result(),
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(),
preparedDataUploads: []string{dataUploadName},
name: "InProgress DataUpload, not the current node",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(),
inProgressDataUploads: []string{dataUploadName},
},
// Test case 5: get resume error
{
name: "ResumeError",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).NodeName("node-1").Labels(map[string]string{
velerov1api.DataUploadLabel: dataUploadName,
}).Result(),
name: "InProgress DataUpload, resume error and update error",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Node("node-1").Result(),
needErrs: []bool{false, false, true, false, false, false},
resumeErr: errors.New("fake-resume-error"),
inProgressDataUploads: []string{dataUploadName},
},
{
name: "InProgress DataUpload, resume error and update succeed",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Node("node-1").Result(),
resumeErr: errors.New("fake-resume-error"),
cancelledDataUploads: []string{dataUploadName},
inProgressDataUploads: []string{dataUploadName},
},
{
name: "InProgress DataUpload and resume succeed",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Node("node-1").Result(),
inProgressDataUploads: []string{dataUploadName},
},
{
name: "Error",
needErrs: []bool{false, false, false, false, false, true},
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(),
expectedError: true,
expectedError: "error to list datauploads: List error",
},
}

@@ -1127,22 +1112,20 @@ func TestAttemptDataUploadResume(t *testing.T) {
r, err := initDataUploaderReconciler(test.needErrs...)
r.nodeName = "node-1"
require.NoError(t, err)
defer func() {
r.client.Delete(ctx, test.du, &kbclient.DeleteOptions{})
if test.pod != nil {
r.client.Delete(ctx, test.pod, &kbclient.DeleteOptions{})
}
}()

assert.NoError(t, r.client.Create(ctx, test.du))
if test.pod != nil {
assert.NoError(t, r.client.Create(ctx, test.pod))

dt := &duResumeTestHelper{
resumeErr: test.resumeErr,
}

funcResumeCancellableDataBackup = dt.resumeCancellableDataPath

// Run the test
err = r.AttemptDataUploadResume(ctx, r.client, r.logger.WithField("name", test.name), test.du.Namespace)

if test.expectedError {
assert.Error(t, err)
if test.expectedError != "" {
assert.EqualError(t, err, test.expectedError)
} else {
assert.NoError(t, err)

@@ -1151,7 +1134,7 @@ func TestAttemptDataUploadResume(t *testing.T) {
dataUpload := &velerov2alpha1api.DataUpload{}
err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: duName}, dataUpload)
require.NoError(t, err)
assert.Equal(t, velerov2alpha1api.DataUploadPhaseCanceled, dataUpload.Status.Phase)
assert.True(t, dataUpload.Spec.Cancel)
}
// Verify DataUploads marked as Accepted
for _, duName := range test.acceptedDataUploads {
@@ -1167,6 +1150,123 @@ func TestAttemptDataUploadResume(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, velerov2alpha1api.DataUploadPhasePrepared, dataUpload.Status.Phase)
}
// Verify DataUploads marked as InProgress
for _, duName := range test.inProgressDataUploads {
dataUpload := &velerov2alpha1api.DataUpload{}
err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: duName}, dataUpload)
require.NoError(t, err)
assert.Equal(t, velerov2alpha1api.DataUploadPhaseInProgress, dataUpload.Status.Phase)
}
}
})
}
}

func TestResumeCancellableBackup(t *testing.T) {
tests := []struct {
name string
dataUploads []velerov2alpha1api.DataUpload
du *velerov2alpha1api.DataUpload
getExposeErr error
exposeResult *exposer.ExposeResult
createWatcherErr error
initWatcherErr error
startWatcherErr error
mockInit bool
mockStart bool
mockClose bool
expectedError string
}{
{
name: "not find exposer",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType("").Result(),
expectedError: fmt.Sprintf("error to find exposer for du %s", dataUploadName),
},
{
name: "get expose failed",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(velerov2alpha1api.SnapshotTypeCSI).Result(),
getExposeErr: errors.New("fake-expose-error"),
expectedError: fmt.Sprintf("error to get exposed snapshot for du %s: fake-expose-error", dataUploadName),
},
{
name: "no expose",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Node("node-1").Result(),
expectedError: fmt.Sprintf("expose info missed for du %s", dataUploadName),
},
{
name: "watcher init error",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Node("node-1").Result(),
exposeResult: &exposer.ExposeResult{
ByPod: exposer.ExposeByPod{
HostingPod: &corev1.Pod{},
},
},
mockInit: true,
mockClose: true,
initWatcherErr: errors.New("fake-init-watcher-error"),
expectedError: fmt.Sprintf("error to init asyncBR watcher for du %s: fake-init-watcher-error", dataUploadName),
},
{
name: "start watcher error",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Node("node-1").Result(),
exposeResult: &exposer.ExposeResult{
ByPod: exposer.ExposeByPod{
HostingPod: &corev1.Pod{},
},
},
mockInit: true,
mockStart: true,
mockClose: true,
startWatcherErr: errors.New("fake-start-watcher-error"),
expectedError: fmt.Sprintf("error to resume asyncBR watcher for du %s: fake-start-watcher-error", dataUploadName),
},
{
name: "succeed",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Node("node-1").Result(),
exposeResult: &exposer.ExposeResult{
ByPod: exposer.ExposeByPod{
HostingPod: &corev1.Pod{},
},
},
mockInit: true,
mockStart: true,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx := context.TODO()
r, err := initDataUploaderReconciler()
r.nodeName = "node-1"
require.NoError(t, err)

mockAsyncBR := datapathmocks.NewAsyncBR(t)

if test.mockInit {
mockAsyncBR.On("Init", mock.Anything, mock.Anything).Return(test.initWatcherErr)
}

if test.mockStart {
mockAsyncBR.On("StartBackup", mock.Anything, mock.Anything, mock.Anything).Return(test.startWatcherErr)
}

if test.mockClose {
mockAsyncBR.On("Close", mock.Anything).Return()
}

dt := &duResumeTestHelper{
getExposeErr: test.getExposeErr,
exposeResult: test.exposeResult,
asyncBR: mockAsyncBR,
}

r.snapshotExposerList[velerov2alpha1api.SnapshotTypeCSI] = dt

datapath.MicroServiceBRWatcherCreator = dt.newMicroServiceBRWatcher

err = r.resumeCancellableDataPath(ctx, test.du, velerotest.NewLogger())
if test.expectedError != "" {
assert.EqualError(t, err, test.expectedError)
}
})
}
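One observation on the wiring above: datapath.MicroServiceBRWatcherCreator (like funcResumeCancellableDataBackup earlier) is package-level state that each case overwrites and never restores. A small hygiene sketch, suggested here rather than taken from the diff, that would keep the stub from leaking into later tests:

// Suggested addition (not in the diff): save and restore the creator hook.
savedCreator := datapath.MicroServiceBRWatcherCreator
defer func() { datapath.MicroServiceBRWatcherCreator = savedCreator }()
datapath.MicroServiceBRWatcherCreator = dt.newMicroServiceBRWatcher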