Make data mover fail early

Signed-off-by: Ming <mqiu@vmware.com>
This commit is contained in:
Ming
2023-11-02 11:05:39 +00:00
committed by Ming Qiu
parent 6ac7ff1230
commit 03dff100a3
8 changed files with 170 additions and 40 deletions

View File

@@ -485,10 +485,6 @@ func (r *DataDownloadReconciler) SetupWithManager(mgr ctrl.Manager) error {
return false
}
if newObj.Status.Phase != v1.PodRunning {
return false
}
if newObj.Spec.NodeName == "" {
return false
}
@@ -510,43 +506,55 @@ func (r *DataDownloadReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *DataDownloadReconciler) findSnapshotRestoreForPod(podObj client.Object) []reconcile.Request {
pod := podObj.(*v1.Pod)
dd, err := findDataDownloadByPod(r.client, *pod)
log := r.logger.WithField("pod", pod.Name)
if err != nil {
r.logger.WithField("Restore pod", pod.Name).WithError(err).Error("unable to get DataDownload")
log.WithError(err).Error("unable to get DataDownload")
return []reconcile.Request{}
} else if dd == nil {
r.logger.WithField("Restore pod", pod.Name).Error("get empty DataDownload")
log.Error("get empty DataDownload")
return []reconcile.Request{}
}
log = log.WithFields(logrus.Fields{
"Dataddownload": dd.Name,
})
if dd.Status.Phase != velerov2alpha1api.DataDownloadPhaseAccepted {
return []reconcile.Request{}
}
requests := make([]reconcile.Request, 1)
if pod.Status.Phase == v1.PodRunning {
log.Info("Preparing data download")
// we don't expect anyone else update the CR during the Prepare process
updated, err := r.exclusiveUpdateDataDownload(context.Background(), dd, r.prepareDataDownload)
if err != nil || !updated {
log.WithField("updated", updated).WithError(err).Warn("failed to update datadownload, prepare will halt for this datadownload")
return []reconcile.Request{}
}
} else if unrecoverable, reason := kube.IsPodUnrecoverable(pod, log); unrecoverable {
err := UpdateDataDownloadWithRetry(context.Background(), r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, r.logger.WithField("datadownlad", dd.Name),
func(dataDownload *velerov2alpha1api.DataDownload) {
dataDownload.Spec.Cancel = true
dataDownload.Status.Message = fmt.Sprintf("datadownload mark as cancel to failed early for exposing pod %s/%s is in abnormal status for %s", pod.Namespace, pod.Name, reason)
})
r.logger.WithField("Restore pod", pod.Name).Infof("Preparing data download %s", dd.Name)
// we don't expect anyone else update the CR during the Prepare process
updated, err := r.exclusiveUpdateDataDownload(context.Background(), dd, r.prepareDataDownload)
if err != nil || !updated {
r.logger.WithFields(logrus.Fields{
"Datadownload": dd.Name,
"Restore pod": pod.Name,
"updated": updated,
}).WithError(err).Warn("failed to patch datadownload, prepare will halt for this datadownload")
if err != nil {
log.WithError(err).Warn("failed to cancel datadownload, and it will wait for prepare timeout")
return []reconcile.Request{}
}
log.Info("Exposed pod is in abnormal status, and datadownload is marked as cancel")
} else {
return []reconcile.Request{}
}
requests[0] = reconcile.Request{
request := reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: dd.Namespace,
Name: dd.Name,
},
}
return requests
return []reconcile.Request{request}
}
func (r *DataDownloadReconciler) FindDataDownloads(ctx context.Context, cli client.Client, ns string) ([]*velerov2alpha1api.DataDownload, error) {

View File

@@ -632,7 +632,7 @@ func TestFindDataDownloadForPod(t *testing.T) {
{
name: "find dataDownload for pod",
du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(),
pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Labels(map[string]string{velerov1api.DataDownloadLabel: dataDownloadName}).Result(),
pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Labels(map[string]string{velerov1api.DataDownloadLabel: dataDownloadName}).Status(corev1.PodStatus{Phase: corev1.PodRunning}).Result(),
checkFunc: func(du *velerov2alpha1api.DataDownload, requests []reconcile.Request) {
// Assert that the function returns a single request
assert.Len(t, requests, 1)

View File

@@ -521,10 +521,6 @@ func (r *DataUploadReconciler) SetupWithManager(mgr ctrl.Manager) error {
return false
}
if newObj.Status.Phase != corev1.PodRunning {
return false
}
if newObj.Spec.NodeName != r.nodeName {
return false
}
@@ -547,37 +543,56 @@ func (r *DataUploadReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *DataUploadReconciler) findDataUploadForPod(podObj client.Object) []reconcile.Request {
pod := podObj.(*corev1.Pod)
du, err := findDataUploadByPod(r.client, *pod)
log := r.logger.WithFields(logrus.Fields{
"Backup pod": pod.Name,
})
if err != nil {
r.logger.WithField("Backup pod", pod.Name).WithError(err).Error("unable to get dataupload")
log.WithError(err).Error("unable to get dataupload")
return []reconcile.Request{}
} else if du == nil {
r.logger.WithField("Backup pod", pod.Name).Error("get empty DataUpload")
log.Error("get empty DataUpload")
return []reconcile.Request{}
}
log = log.WithFields(logrus.Fields{
"Datadupload": du.Name,
})
if du.Status.Phase != velerov2alpha1api.DataUploadPhaseAccepted {
return []reconcile.Request{}
}
r.logger.WithField("Backup pod", pod.Name).Infof("Preparing dataupload %s", du.Name)
// we don't expect anyone else update the CR during the Prepare process
updated, err := r.exclusiveUpdateDataUpload(context.Background(), du, r.prepareDataUpload)
if err != nil || !updated {
r.logger.WithFields(logrus.Fields{
"Dataupload": du.Name,
"Backup pod": pod.Name,
"updated": updated,
}).WithError(err).Warn("failed to patch dataupload, prepare will halt for this dataupload")
if pod.Status.Phase == corev1.PodRunning {
log.Info("Preparing dataupload")
// we don't expect anyone else update the CR during the Prepare process
updated, err := r.exclusiveUpdateDataUpload(context.Background(), du, r.prepareDataUpload)
if err != nil || !updated {
log.WithField("updated", updated).WithError(err).Warn("failed to update dataupload, prepare will halt for this dataupload")
return []reconcile.Request{}
}
} else if unrecoverable, reason := kube.IsPodUnrecoverable(pod, log); unrecoverable { // let the abnormal backup pod failed early
err := UpdateDataUploadWithRetry(context.Background(), r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, r.logger.WithField("dataupload", du.Name),
func(dataUpload *velerov2alpha1api.DataUpload) {
dataUpload.Spec.Cancel = true
dataUpload.Status.Message = fmt.Sprintf("dataupload mark as cancel to failed early for exposing pod %s/%s is in abnormal status for reason %s", pod.Namespace, pod.Name, reason)
})
if err != nil {
log.WithError(err).Warn("failed to cancel dataupload, and it will wait for prepare timeout")
return []reconcile.Request{}
}
log.Info("Exposed pod is in abnormal status and dataupload is marked as cancel")
} else {
return []reconcile.Request{}
}
requests := reconcile.Request{
request := reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: du.Namespace,
Name: du.Name,
},
}
return []reconcile.Request{requests}
return []reconcile.Request{request}
}
func (r *DataUploadReconciler) FindDataUploadsByPod(ctx context.Context, cli client.Client, ns string) ([]velerov2alpha1api.DataUpload, error) {

View File

@@ -683,7 +683,7 @@ func TestFindDataUploadForPod(t *testing.T) {
{
name: "find dataUpload for pod",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(),
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Labels(map[string]string{velerov1api.DataUploadLabel: dataUploadName}).Result(),
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Labels(map[string]string{velerov1api.DataUploadLabel: dataUploadName}).Status(corev1.PodStatus{Phase: corev1.PodRunning}).Result(),
checkFunc: func(du *velerov2alpha1api.DataUpload, requests []reconcile.Request) {
// Assert that the function returns a single request
assert.Len(t, requests, 1)