mirror of
https://github.com/vmware-tanzu/velero.git
synced 2026-01-06 21:36:30 +00:00
Node agent restart enhancement
Signed-off-by: Ming Qiu <mqiu@vmware.com>
This commit is contained in:
1
changelogs/unreleased/7130-qiuming-best
Normal file
1
changelogs/unreleased/7130-qiuming-best
Normal file
@@ -0,0 +1 @@
|
||||
Node agent restart enhancement
|
||||
@@ -285,13 +285,13 @@ func (s *nodeAgentServer) run() {
|
||||
}
|
||||
|
||||
dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
|
||||
s.markDataUploadsCancel(dataUploadReconciler)
|
||||
s.attemptDataUploadResume(dataUploadReconciler)
|
||||
if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil {
|
||||
s.logger.WithError(err).Fatal("Unable to create the data upload controller")
|
||||
}
|
||||
|
||||
dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
|
||||
s.markDataDownloadsCancel(dataDownloadReconciler)
|
||||
s.attemptDataDownloadResume(dataDownloadReconciler)
|
||||
if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil {
|
||||
s.logger.WithError(err).Fatal("Unable to create the data download controller")
|
||||
}
|
||||
@@ -365,65 +365,28 @@ func (s *nodeAgentServer) markInProgressCRsFailed() {
|
||||
s.markInProgressPVRsFailed(client)
|
||||
}
|
||||
|
||||
func (s *nodeAgentServer) markDataUploadsCancel(r *controller.DataUploadReconciler) {
|
||||
func (s *nodeAgentServer) attemptDataUploadResume(r *controller.DataUploadReconciler) {
|
||||
// the function is called before starting the controller manager, the embedded client isn't ready to use, so create a new one here
|
||||
client, err := ctrlclient.New(s.mgr.GetConfig(), ctrlclient.Options{Scheme: s.mgr.GetScheme()})
|
||||
if err != nil {
|
||||
s.logger.WithError(errors.WithStack(err)).Error("failed to create client")
|
||||
return
|
||||
}
|
||||
if dataUploads, err := r.FindDataUploads(s.ctx, client, s.namespace); err != nil {
|
||||
s.logger.WithError(errors.WithStack(err)).Error("failed to find data uploads")
|
||||
} else {
|
||||
for i := range dataUploads {
|
||||
du := dataUploads[i]
|
||||
if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted ||
|
||||
du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared ||
|
||||
du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress {
|
||||
err = controller.UpdateDataUploadWithRetry(s.ctx, client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, s.logger.WithField("dataupload", du.Name),
|
||||
func(dataUpload *velerov2alpha1api.DataUpload) {
|
||||
dataUpload.Spec.Cancel = true
|
||||
dataUpload.Status.Message = fmt.Sprintf("found a dataupload with status %q during the node-agent starting, mark it as cancel", du.Status.Phase)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
s.logger.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", du.GetName())
|
||||
continue
|
||||
}
|
||||
s.logger.WithField("dataupload", du.GetName()).Warn(du.Status.Message)
|
||||
}
|
||||
}
|
||||
if err := r.AttemptDataUploadResume(s.ctx, client, s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
|
||||
s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data upload resume")
|
||||
}
|
||||
}
|
||||
|
||||
func (s *nodeAgentServer) markDataDownloadsCancel(r *controller.DataDownloadReconciler) {
|
||||
func (s *nodeAgentServer) attemptDataDownloadResume(r *controller.DataDownloadReconciler) {
|
||||
// the function is called before starting the controller manager, the embedded client isn't ready to use, so create a new one here
|
||||
client, err := ctrlclient.New(s.mgr.GetConfig(), ctrlclient.Options{Scheme: s.mgr.GetScheme()})
|
||||
if err != nil {
|
||||
s.logger.WithError(errors.WithStack(err)).Error("failed to create client")
|
||||
return
|
||||
}
|
||||
if dataDownloads, err := r.FindDataDownloads(s.ctx, client, s.namespace); err != nil {
|
||||
s.logger.WithError(errors.WithStack(err)).Error("failed to find data downloads")
|
||||
} else {
|
||||
for i := range dataDownloads {
|
||||
dd := dataDownloads[i]
|
||||
if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted ||
|
||||
dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared ||
|
||||
dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
|
||||
err = controller.UpdateDataDownloadWithRetry(s.ctx, client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, s.logger.WithField("datadownload", dd.Name),
|
||||
func(dataDownload *velerov2alpha1api.DataDownload) {
|
||||
dataDownload.Spec.Cancel = true
|
||||
dataDownload.Status.Message = fmt.Sprintf("found a datadownload with status %q during the node-agent starting, mark it as cancel", dd.Status.Phase)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
s.logger.WithError(errors.WithStack(err)).Errorf("failed to mark datadownload %q cancel", dd.GetName())
|
||||
continue
|
||||
}
|
||||
s.logger.WithField("datadownload", dd.GetName()).Warn(dd.Status.Message)
|
||||
}
|
||||
}
|
||||
if err := r.AttemptDataDownloadResume(s.ctx, client, s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
|
||||
s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data download resume")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -140,7 +140,7 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
|
||||
// to help clear up resources instead of clear them directly in case of some conflict with Expose action
|
||||
if err := UpdateDataDownloadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataDownload *velerov2alpha1api.DataDownload) {
|
||||
dataDownload.Spec.Cancel = true
|
||||
dataDownload.Status.Message = fmt.Sprintf("found a dataupload %s/%s is being deleted, mark it as cancel", dd.Namespace, dd.Name)
|
||||
dataDownload.Status.Message = fmt.Sprintf("found a datadownload %s/%s is being deleted, mark it as cancel", dd.Namespace, dd.Name)
|
||||
}); err != nil {
|
||||
log.Errorf("failed to set cancel flag with error %s for %s/%s", err.Error(), dd.Namespace, dd.Name)
|
||||
return ctrl.Result{}, err
|
||||
@@ -192,7 +192,6 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
|
||||
return r.errorOut(ctx, dd, err, "error to expose snapshot", log)
|
||||
}
|
||||
}
|
||||
|
||||
log.Info("Restore is exposed")
|
||||
|
||||
// we need to get CR again for it may canceled by datadownload controller on other
|
||||
@@ -205,7 +204,6 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
|
||||
}
|
||||
return ctrl.Result{}, errors.Wrap(err, "getting datadownload")
|
||||
}
|
||||
|
||||
// we need to clean up resources as resources created in Expose it may later than cancel action or prepare time
|
||||
// and need to clean up resources again
|
||||
if isDataDownloadInFinalState(dd) {
|
||||
@@ -267,7 +265,6 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
|
||||
return r.errorOut(ctx, dd, err, "error to create data path", log)
|
||||
}
|
||||
}
|
||||
|
||||
// Update status to InProgress
|
||||
original := dd.DeepCopy()
|
||||
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress
|
||||
@@ -576,6 +573,51 @@ func (r *DataDownloadReconciler) FindDataDownloads(ctx context.Context, cli clie
|
||||
return dataDownloads, nil
|
||||
}
|
||||
|
||||
func (r *DataDownloadReconciler) findAcceptDataDownloadsByNodeLabel(ctx context.Context, cli client.Client, ns string) ([]velerov2alpha1api.DataDownload, error) {
|
||||
dataDownloads := &velerov2alpha1api.DataDownloadList{}
|
||||
if err := cli.List(ctx, dataDownloads, &client.ListOptions{Namespace: ns}); err != nil {
|
||||
r.logger.WithError(errors.WithStack(err)).Error("failed to list datauploads")
|
||||
return nil, errors.Wrapf(err, "failed to list datauploads")
|
||||
}
|
||||
|
||||
var result []velerov2alpha1api.DataDownload
|
||||
for _, dd := range dataDownloads.Items {
|
||||
if dd.Status.Phase != velerov2alpha1api.DataDownloadPhaseAccepted {
|
||||
continue
|
||||
}
|
||||
if dd.Labels[acceptNodeLabelKey] == r.nodeName {
|
||||
result = append(result, dd)
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// CancelAcceptedDataDownload will cancel the accepted data download
|
||||
func (r *DataDownloadReconciler) CancelAcceptedDataDownload(ctx context.Context, cli client.Client, ns string) {
|
||||
r.logger.Infof("Canceling accepted data for node %s", r.nodeName)
|
||||
dataDownloads, err := r.findAcceptDataDownloadsByNodeLabel(ctx, cli, ns)
|
||||
if err != nil {
|
||||
r.logger.WithError(err).Error("failed to find data downloads")
|
||||
return
|
||||
}
|
||||
|
||||
for _, dd := range dataDownloads {
|
||||
if dd.Spec.Cancel {
|
||||
continue
|
||||
}
|
||||
err = UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name},
|
||||
r.logger.WithField("dataupload", dd.Name), func(dataDownload *velerov2alpha1api.DataDownload) {
|
||||
dataDownload.Spec.Cancel = true
|
||||
dataDownload.Status.Message = fmt.Sprintf("found a datadownload with status %q during the node-agent starting, mark it as cancel", dd.Status.Phase)
|
||||
})
|
||||
|
||||
r.logger.Warn(dd.Status.Message)
|
||||
if err != nil {
|
||||
r.logger.WithError(err).Errorf("failed to set cancel flag with error %s", err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *DataDownloadReconciler) prepareDataDownload(ssb *velerov2alpha1api.DataDownload) {
|
||||
ssb.Status.Phase = velerov2alpha1api.DataDownloadPhasePrepared
|
||||
ssb.Status.Node = r.nodeName
|
||||
@@ -749,3 +791,35 @@ func UpdateDataDownloadWithRetry(ctx context.Context, client client.Client, name
|
||||
return true, nil
|
||||
})
|
||||
}
|
||||
|
||||
func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context, cli client.Client, logger *logrus.Entry, ns string) error {
|
||||
if dataDownloads, err := r.FindDataDownloads(ctx, cli, ns); err != nil {
|
||||
return errors.Wrapf(err, "failed to find data downloads")
|
||||
} else {
|
||||
for i := range dataDownloads {
|
||||
dd := dataDownloads[i]
|
||||
if dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared {
|
||||
// keep doing nothing let controller re-download the data
|
||||
// the Prepared CR could be still handled by datadownload controller after node-agent restart
|
||||
logger.WithField("datadownload", dd.GetName()).Debug("find a datadownload with status prepared")
|
||||
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
|
||||
err = UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, logger.WithField("datadownload", dd.Name),
|
||||
func(dataDownload *velerov2alpha1api.DataDownload) {
|
||||
dataDownload.Spec.Cancel = true
|
||||
dataDownload.Status.Message = fmt.Sprintf("found a datadownload with status %q during the node-agent starting, mark it as cancel", dd.Status.Phase)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
logger.WithError(errors.WithStack(err)).Errorf("failed to mark datadownload %q into canceled", dd.GetName())
|
||||
continue
|
||||
}
|
||||
logger.WithField("datadownload", dd.GetName()).Debug("mark datadownload into canceled")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//If the data download is in Accepted status, the expoded PVC may be not created
|
||||
// so we need to mark the data download as canceled for it may not be recoverable
|
||||
r.CancelAcceptedDataDownload(ctx, cli, ns)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -69,7 +69,7 @@ func dataDownloadBuilder() *builder.DataDownloadBuilder {
|
||||
}
|
||||
|
||||
func initDataDownloadReconciler(objects []runtime.Object, needError ...bool) (*DataDownloadReconciler, error) {
|
||||
var errs []error = make([]error, 5)
|
||||
var errs []error = make([]error, 6)
|
||||
for k, isError := range needError {
|
||||
if k == 0 && isError {
|
||||
errs[0] = fmt.Errorf("Get error")
|
||||
@@ -81,6 +81,8 @@ func initDataDownloadReconciler(objects []runtime.Object, needError ...bool) (*D
|
||||
errs[3] = fmt.Errorf("Patch error")
|
||||
} else if k == 4 && isError {
|
||||
errs[4] = apierrors.NewConflict(velerov2alpha1api.Resource("datadownload"), dataDownloadName, errors.New("conflict"))
|
||||
} else if k == 5 && isError {
|
||||
errs[5] = fmt.Errorf("List error")
|
||||
}
|
||||
}
|
||||
return initDataDownloadReconcilerWithError(objects, errs...)
|
||||
@@ -116,6 +118,8 @@ func initDataDownloadReconcilerWithError(objects []runtime.Object, needError ...
|
||||
fakeClient.patchError = needError[3]
|
||||
} else if k == 4 {
|
||||
fakeClient.updateConflict = needError[4]
|
||||
} else if k == 5 {
|
||||
fakeClient.listError = needError[5]
|
||||
}
|
||||
}
|
||||
|
||||
@@ -939,3 +943,111 @@ func TestFindDataDownloads(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestAttemptDataDownloadResume(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
dataUploads []velerov2alpha1api.DataDownload
|
||||
du *velerov2alpha1api.DataDownload
|
||||
pod *corev1.Pod
|
||||
needErrs []bool
|
||||
acceptedDataDownloads []string
|
||||
prepareddDataDownloads []string
|
||||
cancelledDataDownloads []string
|
||||
expectedError bool
|
||||
}{
|
||||
// Test case 1: Process Accepted DataDownload
|
||||
{
|
||||
name: "AcceptedDataDownload",
|
||||
pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Volumes(&corev1.Volume{Name: dataDownloadName}).NodeName("node-1").Labels(map[string]string{
|
||||
velerov1api.DataDownloadLabel: dataDownloadName,
|
||||
}).Result(),
|
||||
du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(),
|
||||
acceptedDataDownloads: []string{dataDownloadName},
|
||||
expectedError: false,
|
||||
},
|
||||
// Test case 2: Cancel an Accepted DataDownload
|
||||
{
|
||||
name: "CancelAcceptedDataDownload",
|
||||
du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(),
|
||||
},
|
||||
// Test case 3: Process Accepted Prepared DataDownload
|
||||
{
|
||||
name: "PreparedDataDownload",
|
||||
pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Volumes(&corev1.Volume{Name: dataDownloadName}).NodeName("node-1").Labels(map[string]string{
|
||||
velerov1api.DataDownloadLabel: dataDownloadName,
|
||||
}).Result(),
|
||||
du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
|
||||
prepareddDataDownloads: []string{dataDownloadName},
|
||||
},
|
||||
// Test case 4: Process Accepted InProgress DataDownload
|
||||
{
|
||||
name: "InProgressDataDownload",
|
||||
pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Volumes(&corev1.Volume{Name: dataDownloadName}).NodeName("node-1").Labels(map[string]string{
|
||||
velerov1api.DataDownloadLabel: dataDownloadName,
|
||||
}).Result(),
|
||||
du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
|
||||
prepareddDataDownloads: []string{dataDownloadName},
|
||||
},
|
||||
// Test case 5: get resume error
|
||||
{
|
||||
name: "ResumeError",
|
||||
pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Volumes(&corev1.Volume{Name: dataDownloadName}).NodeName("node-1").Labels(map[string]string{
|
||||
velerov1api.DataDownloadLabel: dataDownloadName,
|
||||
}).Result(),
|
||||
needErrs: []bool{false, false, false, false, false, true},
|
||||
du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
|
||||
expectedError: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
ctx := context.TODO()
|
||||
r, err := initDataDownloadReconciler(nil, test.needErrs...)
|
||||
r.nodeName = "node-1"
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
r.client.Delete(ctx, test.du, &kbclient.DeleteOptions{})
|
||||
if test.pod != nil {
|
||||
r.client.Delete(ctx, test.pod, &kbclient.DeleteOptions{})
|
||||
}
|
||||
}()
|
||||
|
||||
assert.NoError(t, r.client.Create(ctx, test.du))
|
||||
if test.pod != nil {
|
||||
assert.NoError(t, r.client.Create(ctx, test.pod))
|
||||
}
|
||||
// Run the test
|
||||
err = r.AttemptDataDownloadResume(ctx, r.client, r.logger.WithField("name", test.name), test.du.Namespace)
|
||||
|
||||
if test.expectedError {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Verify DataDownload marked as Cancelled
|
||||
for _, duName := range test.cancelledDataDownloads {
|
||||
dataUpload := &velerov2alpha1api.DataDownload{}
|
||||
err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: duName}, dataUpload)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, velerov2alpha1api.DataDownloadPhaseCanceled, dataUpload.Status.Phase)
|
||||
}
|
||||
// Verify DataDownload marked as Accepted
|
||||
for _, duName := range test.acceptedDataDownloads {
|
||||
dataUpload := &velerov2alpha1api.DataDownload{}
|
||||
err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: duName}, dataUpload)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, velerov2alpha1api.DataDownloadPhaseAccepted, dataUpload.Status.Phase)
|
||||
}
|
||||
// Verify DataDownload marked as Prepared
|
||||
for _, duName := range test.prepareddDataDownloads {
|
||||
dataUpload := &velerov2alpha1api.DataDownload{}
|
||||
err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: duName}, dataUpload)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, velerov2alpha1api.DataDownloadPhasePrepared, dataUpload.Status.Phase)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -274,7 +274,6 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
|
||||
return r.errorOut(ctx, du, err, "error to create data path", log)
|
||||
}
|
||||
}
|
||||
|
||||
// Update status to InProgress
|
||||
original := du.DeepCopy()
|
||||
du.Status.Phase = velerov2alpha1api.DataUploadPhaseInProgress
|
||||
@@ -581,7 +580,7 @@ func (r *DataUploadReconciler) findDataUploadForPod(podObj client.Object) []reco
|
||||
return []reconcile.Request{requests}
|
||||
}
|
||||
|
||||
func (r *DataUploadReconciler) FindDataUploads(ctx context.Context, cli client.Client, ns string) ([]velerov2alpha1api.DataUpload, error) {
|
||||
func (r *DataUploadReconciler) FindDataUploadsByPod(ctx context.Context, cli client.Client, ns string) ([]velerov2alpha1api.DataUpload, error) {
|
||||
pods := &corev1.PodList{}
|
||||
var dataUploads []velerov2alpha1api.DataUpload
|
||||
if err := cli.List(ctx, pods, &client.ListOptions{Namespace: ns}); err != nil {
|
||||
@@ -605,6 +604,51 @@ func (r *DataUploadReconciler) FindDataUploads(ctx context.Context, cli client.C
|
||||
return dataUploads, nil
|
||||
}
|
||||
|
||||
func (r *DataUploadReconciler) findAcceptDataUploadsByNodeLabel(ctx context.Context, cli client.Client, ns string) ([]velerov2alpha1api.DataUpload, error) {
|
||||
dataUploads := &velerov2alpha1api.DataUploadList{}
|
||||
if err := cli.List(ctx, dataUploads, &client.ListOptions{Namespace: ns}); err != nil {
|
||||
r.logger.WithError(errors.WithStack(err)).Error("failed to list datauploads")
|
||||
return nil, errors.Wrapf(err, "failed to list datauploads")
|
||||
}
|
||||
|
||||
var result []velerov2alpha1api.DataUpload
|
||||
for _, du := range dataUploads.Items {
|
||||
if du.Status.Phase != velerov2alpha1api.DataUploadPhaseAccepted {
|
||||
continue
|
||||
}
|
||||
if du.Labels[acceptNodeLabelKey] == r.nodeName {
|
||||
result = append(result, du)
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (r *DataUploadReconciler) CancelAcceptedDataupload(ctx context.Context, cli client.Client, ns string) {
|
||||
r.logger.Infof("Reset accepted dataupload for node %s", r.nodeName)
|
||||
dataUploads, err := r.findAcceptDataUploadsByNodeLabel(ctx, cli, ns)
|
||||
if err != nil {
|
||||
r.logger.WithError(err).Error("failed to find dataupload")
|
||||
return
|
||||
}
|
||||
|
||||
for _, du := range dataUploads {
|
||||
if du.Spec.Cancel {
|
||||
continue
|
||||
}
|
||||
err = UpdateDataUploadWithRetry(ctx, cli, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, r.logger.WithField("dataupload", du.Name),
|
||||
func(dataUpload *velerov2alpha1api.DataUpload) {
|
||||
dataUpload.Spec.Cancel = true
|
||||
dataUpload.Status.Message = fmt.Sprintf("found a dataupload with status %q during the node-agent starting, mark it as cancel", du.Status.Phase)
|
||||
})
|
||||
|
||||
r.logger.WithField("dataupload", du.GetName()).Warn(du.Status.Message)
|
||||
if err != nil {
|
||||
r.logger.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", du.GetName())
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *DataUploadReconciler) prepareDataUpload(du *velerov2alpha1api.DataUpload) {
|
||||
du.Status.Phase = velerov2alpha1api.DataUploadPhasePrepared
|
||||
du.Status.Node = r.nodeName
|
||||
@@ -833,3 +877,34 @@ func UpdateDataUploadWithRetry(ctx context.Context, client client.Client, namesp
|
||||
return true, nil
|
||||
})
|
||||
}
|
||||
|
||||
func (r *DataUploadReconciler) AttemptDataUploadResume(ctx context.Context, cli client.Client, logger *logrus.Entry, ns string) error {
|
||||
if dataUploads, err := r.FindDataUploadsByPod(ctx, cli, ns); err != nil {
|
||||
return errors.Wrap(err, "failed to find data uploads")
|
||||
} else {
|
||||
for _, du := range dataUploads {
|
||||
if du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared {
|
||||
// keep doing nothing let controller re-download the data
|
||||
// the Prepared CR could be still handled by dataupload controller after node-agent restart
|
||||
logger.WithField("dataupload", du.GetName()).Debug("find a dataupload with status prepared")
|
||||
} else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress {
|
||||
err = UpdateDataUploadWithRetry(ctx, cli, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, logger.WithField("dataupload", du.Name),
|
||||
func(dataUpload *velerov2alpha1api.DataUpload) {
|
||||
dataUpload.Spec.Cancel = true
|
||||
dataUpload.Status.Message = fmt.Sprintf("found a dataupload with status %q during the node-agent starting, mark it as cancel", du.Status.Phase)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
logger.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q into canceled", du.GetName())
|
||||
continue
|
||||
}
|
||||
logger.WithField("dataupload", du.GetName()).Debug("mark dataupload into canceled")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//If the data upload is in Accepted status, the volume snapshot may be deleted and the exposed pod may not be created
|
||||
// so we need to mark the data upload as canceled for it may not be recoverable
|
||||
r.CancelAcceptedDataupload(ctx, cli, ns)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -68,6 +68,7 @@ type FakeClient struct {
|
||||
updateError error
|
||||
patchError error
|
||||
updateConflict error
|
||||
listError error
|
||||
}
|
||||
|
||||
func (c *FakeClient) Get(ctx context.Context, key kbclient.ObjectKey, obj kbclient.Object) error {
|
||||
@@ -106,8 +107,16 @@ func (c *FakeClient) Patch(ctx context.Context, obj kbclient.Object, patch kbcli
|
||||
return c.Client.Patch(ctx, obj, patch, opts...)
|
||||
}
|
||||
|
||||
func (c *FakeClient) List(ctx context.Context, list kbclient.ObjectList, opts ...kbclient.ListOption) error {
|
||||
if c.listError != nil {
|
||||
return c.listError
|
||||
}
|
||||
|
||||
return c.Client.List(ctx, list, opts...)
|
||||
}
|
||||
|
||||
func initDataUploaderReconciler(needError ...bool) (*DataUploadReconciler, error) {
|
||||
var errs []error = make([]error, 5)
|
||||
var errs []error = make([]error, 6)
|
||||
for k, isError := range needError {
|
||||
if k == 0 && isError {
|
||||
errs[0] = fmt.Errorf("Get error")
|
||||
@@ -118,7 +127,9 @@ func initDataUploaderReconciler(needError ...bool) (*DataUploadReconciler, error
|
||||
} else if k == 3 && isError {
|
||||
errs[3] = fmt.Errorf("Patch error")
|
||||
} else if k == 4 && isError {
|
||||
errs[4] = apierrors.NewConflict(velerov2alpha1api.Resource("datadownload"), dataDownloadName, errors.New("conflict"))
|
||||
errs[4] = apierrors.NewConflict(velerov2alpha1api.Resource("dataupload"), dataUploadName, errors.New("conflict"))
|
||||
} else if k == 5 && isError {
|
||||
errs[5] = fmt.Errorf("List error")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -198,6 +209,8 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci
|
||||
fakeClient.patchError = needError[3]
|
||||
} else if k == 4 {
|
||||
fakeClient.updateConflict = needError[4]
|
||||
} else if k == 5 {
|
||||
fakeClient.listError = needError[5]
|
||||
}
|
||||
}
|
||||
|
||||
@@ -983,7 +996,7 @@ func TestFindDataUploads(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
err = r.client.Create(ctx, &test.pod)
|
||||
require.NoError(t, err)
|
||||
uploads, err := r.FindDataUploads(context.Background(), r.client, "velero")
|
||||
uploads, err := r.FindDataUploadsByPod(context.Background(), r.client, "velero")
|
||||
|
||||
if test.expectedError {
|
||||
assert.Error(t, err)
|
||||
@@ -994,3 +1007,110 @@ func TestFindDataUploads(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
func TestAttemptDataUploadResume(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
dataUploads []velerov2alpha1api.DataUpload
|
||||
du *velerov2alpha1api.DataUpload
|
||||
pod *corev1.Pod
|
||||
needErrs []bool
|
||||
acceptedDataUploads []string
|
||||
prepareddDataUploads []string
|
||||
cancelledDataUploads []string
|
||||
expectedError bool
|
||||
}{
|
||||
// Test case 1: Process Accepted DataUpload
|
||||
{
|
||||
name: "AcceptedDataUpload",
|
||||
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).NodeName("node-1").Labels(map[string]string{
|
||||
velerov1api.DataUploadLabel: dataUploadName,
|
||||
}).Result(),
|
||||
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(),
|
||||
acceptedDataUploads: []string{dataUploadName},
|
||||
expectedError: false,
|
||||
},
|
||||
// Test case 2: Cancel an Accepted DataUpload
|
||||
{
|
||||
name: "CancelAcceptedDataUpload",
|
||||
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(),
|
||||
},
|
||||
// Test case 3: Process Accepted Prepared DataUpload
|
||||
{
|
||||
name: "PreparedDataUpload",
|
||||
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).NodeName("node-1").Labels(map[string]string{
|
||||
velerov1api.DataUploadLabel: dataUploadName,
|
||||
}).Result(),
|
||||
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(),
|
||||
prepareddDataUploads: []string{dataUploadName},
|
||||
},
|
||||
// Test case 4: Process Accepted InProgress DataUpload
|
||||
{
|
||||
name: "InProgressDataUpload",
|
||||
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).NodeName("node-1").Labels(map[string]string{
|
||||
velerov1api.DataUploadLabel: dataUploadName,
|
||||
}).Result(),
|
||||
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(),
|
||||
prepareddDataUploads: []string{dataUploadName},
|
||||
},
|
||||
// Test case 5: get resume error
|
||||
{
|
||||
name: "ResumeError",
|
||||
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).NodeName("node-1").Labels(map[string]string{
|
||||
velerov1api.DataUploadLabel: dataUploadName,
|
||||
}).Result(),
|
||||
needErrs: []bool{false, false, false, false, false, true},
|
||||
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(),
|
||||
expectedError: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
ctx := context.TODO()
|
||||
r, err := initDataUploaderReconciler(test.needErrs...)
|
||||
r.nodeName = "node-1"
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
r.client.Delete(ctx, test.du, &kbclient.DeleteOptions{})
|
||||
if test.pod != nil {
|
||||
r.client.Delete(ctx, test.pod, &kbclient.DeleteOptions{})
|
||||
}
|
||||
}()
|
||||
|
||||
assert.NoError(t, r.client.Create(ctx, test.du))
|
||||
if test.pod != nil {
|
||||
assert.NoError(t, r.client.Create(ctx, test.pod))
|
||||
}
|
||||
// Run the test
|
||||
err = r.AttemptDataUploadResume(ctx, r.client, r.logger.WithField("name", test.name), test.du.Namespace)
|
||||
|
||||
if test.expectedError {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Verify DataUploads marked as Cancelled
|
||||
for _, duName := range test.cancelledDataUploads {
|
||||
dataUpload := &velerov2alpha1api.DataUpload{}
|
||||
err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: duName}, dataUpload)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, velerov2alpha1api.DataUploadPhaseCanceled, dataUpload.Status.Phase)
|
||||
}
|
||||
// Verify DataUploads marked as Accepted
|
||||
for _, duName := range test.acceptedDataUploads {
|
||||
dataUpload := &velerov2alpha1api.DataUpload{}
|
||||
err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: duName}, dataUpload)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, velerov2alpha1api.DataUploadPhaseAccepted, dataUpload.Status.Phase)
|
||||
}
|
||||
// Verify DataUploads marked as Prepared
|
||||
for _, duName := range test.prepareddDataUploads {
|
||||
dataUpload := &velerov2alpha1api.DataUpload{}
|
||||
err := r.client.Get(context.Background(), types.NamespacedName{Namespace: "velero", Name: duName}, dataUpload)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, velerov2alpha1api.DataUploadPhasePrepared, dataUpload.Status.Phase)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user