mirror of
https://github.com/vmware-tanzu/velero.git
synced 2026-01-03 11:45:20 +00:00
@@ -161,7 +161,25 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
log.Info("Restore PVC is ready")
|
||||
log.Info("Restore PVC is ready and creating data path routine")
|
||||
|
||||
// Need to first create file system BR and get data path instance then update data upload status
|
||||
callbacks := datapath.Callbacks{
|
||||
OnCompleted: r.OnDataDownloadCompleted,
|
||||
OnFailed: r.OnDataDownloadFailed,
|
||||
OnCancelled: r.OnDataDownloadCancelled,
|
||||
OnProgress: r.OnDataDownloadProgress,
|
||||
}
|
||||
|
||||
fsRestore, err = r.dataPathMgr.CreateFileSystemBR(dd.Name, dataUploadDownloadRequestor, ctx, r.client, dd.Namespace, callbacks, log)
|
||||
if err != nil {
|
||||
if err == datapath.ConcurrentLimitExceed {
|
||||
log.Info("Data path instance is concurrent limited requeue later")
|
||||
return ctrl.Result{Requeue: true, RequeueAfter: time.Minute}, nil
|
||||
} else {
|
||||
return r.errorOut(ctx, dd, err, "error to create data path", log)
|
||||
}
|
||||
}
|
||||
|
||||
// Update status to InProgress
|
||||
original := dd.DeepCopy()
|
||||
@@ -174,7 +192,12 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
|
||||
|
||||
log.Info("Data download is marked as in progress")
|
||||
|
||||
return r.runCancelableDataPath(ctx, dd, result, log)
|
||||
reconcileResult, err := r.runCancelableDataPath(ctx, fsRestore, dd, result, log)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to run cancelable data path for %s with err %v", dd.Name, err)
|
||||
r.closeDataPath(ctx, dd.Name)
|
||||
}
|
||||
return reconcileResult, err
|
||||
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
|
||||
log.Info("Data download is in progress")
|
||||
if dd.Spec.Cancel {
|
||||
@@ -203,25 +226,7 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
|
||||
}
|
||||
}
|
||||
|
||||
func (r *DataDownloadReconciler) runCancelableDataPath(ctx context.Context, dd *velerov2alpha1api.DataDownload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) {
|
||||
log.Info("Creating data path routine")
|
||||
callbacks := datapath.Callbacks{
|
||||
OnCompleted: r.OnDataDownloadCompleted,
|
||||
OnFailed: r.OnDataDownloadFailed,
|
||||
OnCancelled: r.OnDataDownloadCancelled,
|
||||
OnProgress: r.OnDataDownloadProgress,
|
||||
}
|
||||
|
||||
fsRestore, err := r.dataPathMgr.CreateFileSystemBR(dd.Name, dataUploadDownloadRequestor, ctx, r.client, dd.Namespace, callbacks, log)
|
||||
if err != nil {
|
||||
if err == datapath.ConcurrentLimitExceed {
|
||||
log.Info("runCancelableDataDownload is concurrent limited")
|
||||
return ctrl.Result{Requeue: true, RequeueAfter: time.Minute}, nil
|
||||
} else {
|
||||
return r.errorOut(ctx, dd, err, "error to create data path", log)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *DataDownloadReconciler) runCancelableDataPath(ctx context.Context, fsRestore datapath.AsyncBR, dd *velerov2alpha1api.DataDownload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) {
|
||||
path, err := exposer.GetPodVolumeHostPath(ctx, res.ByPod.HostingPod, res.ByPod.PVC, r.client, r.fileSystem, log)
|
||||
if err != nil {
|
||||
return r.errorOut(ctx, dd, err, "error exposing host path for pod volume", log)
|
||||
@@ -437,12 +442,11 @@ func (r *DataDownloadReconciler) updateStatusToFailed(ctx context.Context, dd *v
|
||||
dd.Status.Message = errors.WithMessage(err, msg).Error()
|
||||
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
|
||||
|
||||
if err = r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil {
|
||||
log.WithError(err).Error("error updating DataDownload status")
|
||||
return err
|
||||
if patchErr := r.client.Patch(ctx, dd, client.MergeFrom(original)); patchErr != nil {
|
||||
log.WithError(patchErr).Error("error updating DataDownload status")
|
||||
}
|
||||
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *DataDownloadReconciler) acceptDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload) (bool, error) {
|
||||
|
||||
@@ -192,7 +192,12 @@ func TestDataDownloadReconcile(t *testing.T) {
|
||||
},
|
||||
})
|
||||
|
||||
require.Nil(t, err)
|
||||
if test.isGetExposeErr {
|
||||
assert.Contains(t, err.Error(), test.expectedStatusMsg)
|
||||
} else {
|
||||
require.Nil(t, err)
|
||||
}
|
||||
|
||||
require.NotNil(t, actualResult)
|
||||
|
||||
dd := velerov2alpha1api.DataDownload{}
|
||||
|
||||
@@ -160,7 +160,25 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
log.Info("Exposed snapshot is ready")
|
||||
log.Info("Exposed snapshot is ready and creating data path routine")
|
||||
|
||||
// Need to first create file system BR and get data path instance then update data upload status
|
||||
callbacks := datapath.Callbacks{
|
||||
OnCompleted: r.OnDataUploadCompleted,
|
||||
OnFailed: r.OnDataUploadFailed,
|
||||
OnCancelled: r.OnDataUploadCancelled,
|
||||
OnProgress: r.OnDataUploadProgress,
|
||||
}
|
||||
|
||||
fsBackup, err = r.dataPathMgr.CreateFileSystemBR(du.Name, dataUploadDownloadRequestor, ctx, r.client, du.Namespace, callbacks, log)
|
||||
if err != nil {
|
||||
if err == datapath.ConcurrentLimitExceed {
|
||||
log.Info("Data path instance is concurrent limited requeue later")
|
||||
return ctrl.Result{Requeue: true, RequeueAfter: time.Minute}, nil
|
||||
} else {
|
||||
return r.errorOut(ctx, &du, err, "error to create data path", log)
|
||||
}
|
||||
}
|
||||
|
||||
// Update status to InProgress
|
||||
original := du.DeepCopy()
|
||||
@@ -171,7 +189,12 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
|
||||
}
|
||||
|
||||
log.Info("Data upload is marked as in progress")
|
||||
return r.runCancelableDataUpload(ctx, &du, res, log)
|
||||
result, err := r.runCancelableDataUpload(ctx, fsBackup, &du, res, log)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to run cancelable data path for %s with err %v", du.Name, err)
|
||||
r.closeDataPath(ctx, du.Name)
|
||||
}
|
||||
return result, err
|
||||
} else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress {
|
||||
log.Info("Data upload is in progress")
|
||||
if du.Spec.Cancel {
|
||||
@@ -198,25 +221,8 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *DataUploadReconciler) runCancelableDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) {
|
||||
log.Info("Creating data path routine")
|
||||
callbacks := datapath.Callbacks{
|
||||
OnCompleted: r.OnDataUploadCompleted,
|
||||
OnFailed: r.OnDataUploadFailed,
|
||||
OnCancelled: r.OnDataUploadCancelled,
|
||||
OnProgress: r.OnDataUploadProgress,
|
||||
}
|
||||
|
||||
fsBackup, err := r.dataPathMgr.CreateFileSystemBR(du.Name, dataUploadDownloadRequestor, ctx, r.client, du.Namespace, callbacks, log)
|
||||
if err != nil {
|
||||
if err == datapath.ConcurrentLimitExceed {
|
||||
log.Info("runCancelableDataUpload is concurrent limited")
|
||||
return ctrl.Result{Requeue: true, RequeueAfter: time.Minute}, nil
|
||||
} else {
|
||||
return r.errorOut(ctx, du, err, "error to create data path", log)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *DataUploadReconciler) runCancelableDataUpload(ctx context.Context, fsBackup datapath.AsyncBR, du *velerov2alpha1api.DataUpload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) {
|
||||
log.Info("Run cancelable dataUpload")
|
||||
path, err := exposer.GetPodVolumeHostPath(ctx, res.ByPod.HostingPod, res.ByPod.PVC, r.client, r.fileSystem, log)
|
||||
if err != nil {
|
||||
return r.errorOut(ctx, du, err, "error exposing host path for pod volume", log)
|
||||
@@ -460,12 +466,11 @@ func (r *DataUploadReconciler) updateStatusToFailed(ctx context.Context, du *vel
|
||||
}
|
||||
|
||||
du.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
|
||||
if err = r.client.Patch(ctx, du, client.MergeFrom(original)); err != nil {
|
||||
log.WithError(err).Error("error updating DataUpload status")
|
||||
return err
|
||||
if patchErr := r.client.Patch(ctx, du, client.MergeFrom(original)); patchErr != nil {
|
||||
log.WithError(patchErr).Error("error updating DataUpload status")
|
||||
}
|
||||
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *DataUploadReconciler) acceptDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload) (bool, error) {
|
||||
|
||||
@@ -291,8 +291,8 @@ func TestReconcile(t *testing.T) {
|
||||
expectedProcessed: true,
|
||||
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(),
|
||||
expectedRequeue: ctrl.Result{},
|
||||
},
|
||||
{
|
||||
expectedErrMsg: "unknown type type of snapshot exposer is not exist",
|
||||
}, {
|
||||
name: "Dataupload should be accepted",
|
||||
du: dataUploadBuilder().Result(),
|
||||
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(),
|
||||
@@ -336,7 +336,7 @@ func TestReconcile(t *testing.T) {
|
||||
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(),
|
||||
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Cancel(true).Result(),
|
||||
expectedProcessed: false,
|
||||
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(),
|
||||
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(),
|
||||
expectedRequeue: ctrl.Result{Requeue: true, RequeueAfter: time.Minute},
|
||||
},
|
||||
}
|
||||
@@ -400,7 +400,7 @@ func TestReconcile(t *testing.T) {
|
||||
if test.expectedErrMsg == "" {
|
||||
require.NoError(t, err)
|
||||
} else {
|
||||
assert.Equal(t, err.Error(), test.expectedErrMsg)
|
||||
assert.Contains(t, err.Error(), test.expectedErrMsg)
|
||||
}
|
||||
|
||||
du := velerov2alpha1api.DataUpload{}
|
||||
|
||||
Reference in New Issue
Block a user