data mover ms smoking test

Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
This commit is contained in:
Lyndon-Li
2024-08-09 14:34:48 +08:00
parent dd3d05bbac
commit 4dea3a48e8
14 changed files with 330 additions and 216 deletions

View File

@@ -233,6 +233,7 @@ func (s *dataMoverBackup) runDataPath() {
result, err := dpService.RunCancelableDataPath(s.ctx)
if err != nil {
dpService.Shutdown()
s.cancelFunc()
funcExitWithMessage(s.logger, false, "Failed to run data path service for DataUpload %s: %v", s.config.duName, err)
return

View File

@@ -223,6 +223,7 @@ func (s *dataMoverRestore) runDataPath() {
result, err := dpService.RunCancelableDataPath(s.ctx)
if err != nil {
s.cancelFunc()
dpService.Shutdown()
funcExitWithMessage(s.logger, false, "Failed to run data path service for DataDownload %s: %v", s.config.ddName, err)
return
}

View File

@@ -217,9 +217,9 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted {
if dd.Spec.Cancel {
log.Debugf("Data download is been canceled %s in Phase %s", dd.GetName(), dd.Status.Phase)
r.TryCancelDataDownload(ctx, dd, "")
r.tryCancelAcceptedDataDownload(ctx, dd, "")
} else if peekErr := r.restoreExposer.PeekExposed(ctx, getDataDownloadOwnerObject(dd)); peekErr != nil {
r.TryCancelDataDownload(ctx, dd, fmt.Sprintf("found a dataupload %s/%s with expose error: %s. mark it as cancel", dd.Namespace, dd.Name, peekErr))
r.tryCancelAcceptedDataDownload(ctx, dd, fmt.Sprintf("found a dataupload %s/%s with expose error: %s. mark it as cancel", dd.Namespace, dd.Name, peekErr))
log.Errorf("Cancel dd %s/%s because of expose error %s", dd.Namespace, dd.Name, peekErr)
} else if dd.Status.StartTimestamp != nil {
if time.Since(dd.Status.StartTimestamp.Time) >= r.preparingTimeout {
@@ -272,23 +272,35 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
return r.errorOut(ctx, dd, err, "error to create data path", log)
}
}
if err := r.initCancelableDataPath(ctx, asyncBR, result, log); err != nil {
log.WithError(err).Warnf("Failed to init cancelable data path for %s, will close and retry", dd.Name)
r.closeDataPath(ctx, dd.Name)
return r.errorOut(ctx, dd, err, "error initializing data path", log)
}
// Update status to InProgress
original := dd.DeepCopy()
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress
dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("Unable to update status to in progress")
return ctrl.Result{}, err
log.WithError(err).Warnf("Failed to update datadownload %s to InProgress, will close data path and retry", dd.Name)
r.closeDataPath(ctx, dd.Name)
return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
}
log.Info("Data download is marked as in progress")
reconcileResult, err := r.runCancelableDataPath(ctx, asyncBR, dd, result, log)
if err != nil {
log.Errorf("Failed to run cancelable data path for %s with err %v", dd.Name, err)
if err := r.startCancelableDataPath(asyncBR, dd, result, log); err != nil {
log.WithError(err).Errorf("Failed to start cancelable data path for %s", dd.Name)
r.closeDataPath(ctx, dd.Name)
return r.errorOut(ctx, dd, err, "error starting data path", log)
}
return reconcileResult, err
return ctrl.Result{}, nil
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
log.Info("Data download is in progress")
if dd.Spec.Cancel {
@@ -331,27 +343,33 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
}
}
func (r *DataDownloadReconciler) runCancelableDataPath(ctx context.Context, asyncBR datapath.AsyncBR, dd *velerov2alpha1api.DataDownload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) {
func (r *DataDownloadReconciler) initCancelableDataPath(ctx context.Context, asyncBR datapath.AsyncBR, res *exposer.ExposeResult, log logrus.FieldLogger) error {
log.Info("Init cancelable dataDownload")
if err := asyncBR.Init(ctx, nil); err != nil {
return r.errorOut(ctx, dd, err, "error to initialize asyncBR", log)
return errors.Wrap(err, "error initializing asyncBR")
}
log.Infof("async restore init for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName)
return nil
}
func (r *DataDownloadReconciler) startCancelableDataPath(asyncBR datapath.AsyncBR, dd *velerov2alpha1api.DataDownload, res *exposer.ExposeResult, log logrus.FieldLogger) error {
log.Info("Start cancelable dataDownload")
if err := asyncBR.StartRestore(dd.Spec.SnapshotID, datapath.AccessPoint{
ByPath: res.ByPod.VolumeName,
}, dd.Spec.DataMoverConfig); err != nil {
return r.errorOut(ctx, dd, err, fmt.Sprintf("error starting async restore for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName), log)
return errors.Wrapf(err, "error starting async restore for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName)
}
log.Infof("Async restore started for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName)
return ctrl.Result{}, nil
log.Info("Async restore started for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName)
return nil
}
func (r *DataDownloadReconciler) OnDataDownloadCompleted(ctx context.Context, namespace string, ddName string, result datapath.Result) {
defer func() {
go r.closeDataPath(ctx, ddName)
}()
defer r.dataPathMgr.RemoveAsyncBR(ddName)
log := r.logger.WithField("datadownload", ddName)
log.Info("Async fs restore data path completed")
@@ -384,9 +402,7 @@ func (r *DataDownloadReconciler) OnDataDownloadCompleted(ctx context.Context, na
}
func (r *DataDownloadReconciler) OnDataDownloadFailed(ctx context.Context, namespace string, ddName string, err error) {
defer func() {
go r.closeDataPath(ctx, ddName)
}()
defer r.dataPathMgr.RemoveAsyncBR(ddName)
log := r.logger.WithField("datadownload", ddName)
@@ -396,16 +412,12 @@ func (r *DataDownloadReconciler) OnDataDownloadFailed(ctx context.Context, names
if getErr := r.client.Get(ctx, types.NamespacedName{Name: ddName, Namespace: namespace}, &dd); getErr != nil {
log.WithError(getErr).Warn("Failed to get data download on failure")
} else {
if _, errOut := r.errorOut(ctx, &dd, err, "data path restore failed", log); err != nil {
log.WithError(err).Warnf("Failed to patch data download with err %v", errOut)
}
r.errorOut(ctx, &dd, err, "data path restore failed", log)
}
}
func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, namespace string, ddName string) {
defer func() {
go r.closeDataPath(ctx, ddName)
}()
defer r.dataPathMgr.RemoveAsyncBR(ddName)
log := r.logger.WithField("datadownload", ddName)
@@ -432,9 +444,9 @@ func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, na
}
}
func (r *DataDownloadReconciler) TryCancelDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload, message string) {
func (r *DataDownloadReconciler) tryCancelAcceptedDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload, message string) {
log := r.logger.WithField("datadownload", dd.Name)
log.Warn("Async fs backup data path canceled")
log.Warn("Accepted data download is canceled")
succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(dataDownload *velerov2alpha1api.DataDownload) {
dataDownload.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceled
@@ -442,7 +454,10 @@ func (r *DataDownloadReconciler) TryCancelDataDownload(ctx context.Context, dd *
dataDownload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
}
dataDownload.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
dataDownload.Status.Message = message
if message != "" {
dataDownload.Status.Message = message
}
})
if err != nil {
@@ -456,7 +471,6 @@ func (r *DataDownloadReconciler) TryCancelDataDownload(ctx context.Context, dd *
// success update
r.metrics.RegisterDataDownloadCancel(r.nodeName)
r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd))
r.closeDataPath(ctx, dd.Name)
}
func (r *DataDownloadReconciler) OnDataDownloadProgress(ctx context.Context, namespace string, ddName string, progress *uploader.Progress) {

View File

@@ -192,6 +192,10 @@ func TestDataDownloadReconcile(t *testing.T) {
isFSBRRestoreErr bool
notNilExpose bool
notMockCleanUp bool
mockInit bool
mockInitErr error
mockStart bool
mockStartErr error
mockCancel bool
mockClose bool
expected *velerov2alpha1api.DataDownload
@@ -264,13 +268,36 @@ func TestDataDownloadReconcile(t *testing.T) {
expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5},
},
{
name: "Unable to update status to in progress for data download",
name: "data path init error",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
needErrs: []bool{false, false, false, true},
mockInit: true,
mockInitErr: errors.New("fake-data-path-init-error"),
mockClose: true,
notNilExpose: true,
notMockCleanUp: true,
expectedStatusMsg: "Patch error",
expectedStatusMsg: "error initializing asyncBR: fake-data-path-init-error",
},
{
name: "Unable to update status to in progress for data download",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
needErrs: []bool{false, false, false, true},
mockInit: true,
mockClose: true,
notNilExpose: true,
notMockCleanUp: true,
expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5},
},
{
name: "data path start error",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
mockInit: true,
mockStart: true,
mockStartErr: errors.New("fake-data-path-start-error"),
mockClose: true,
notNilExpose: true,
expectedStatusMsg: "error starting async restore for pod test-name, volume test-pvc: fake-data-path-start-error",
},
{
name: "accept DataDownload error",
@@ -399,6 +426,14 @@ func TestDataDownloadReconcile(t *testing.T) {
datapath.MicroServiceBRWatcherCreator = func(kbclient.Client, kubernetes.Interface, manager.Manager, string, string,
string, string, string, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
asyncBR := datapathmockes.NewAsyncBR(t)
if test.mockInit {
asyncBR.On("Init", mock.Anything, mock.Anything).Return(test.mockInitErr)
}
if test.mockStart {
asyncBR.On("StartRestore", mock.Anything, mock.Anything, mock.Anything).Return(test.mockStartErr)
}
if test.mockCancel {
asyncBR.On("Cancel").Return()
}
@@ -488,6 +523,10 @@ func TestDataDownloadReconcile(t *testing.T) {
assert.True(t, true, apierrors.IsNotFound(err))
}
if !test.needCreateFSBR {
assert.Nil(t, r.dataPathMgr.GetAsyncBR(test.dd.Name))
}
t.Logf("%s: \n %v \n", test.name, dd)
})
}
@@ -845,7 +884,7 @@ func TestTryCancelDataDownload(t *testing.T) {
err = r.client.Create(ctx, test.dd)
require.NoError(t, err)
r.TryCancelDataDownload(ctx, test.dd, "")
r.tryCancelAcceptedDataDownload(ctx, test.dd, "")
if test.expectedErr == "" {
assert.NoError(t, err)

View File

@@ -230,9 +230,9 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// we don't want to update CR into cancel status forcely as it may conflict with CR update in Expose action
// we could retry when the CR requeue in periodcally
log.Debugf("Data upload is been canceled %s in Phase %s", du.GetName(), du.Status.Phase)
r.TryCancelDataUpload(ctx, du, "")
r.tryCancelAcceptedDataUpload(ctx, du, "")
} else if peekErr := ep.PeekExposed(ctx, getOwnerObject(du)); peekErr != nil {
r.TryCancelDataUpload(ctx, du, fmt.Sprintf("found a dataupload %s/%s with expose error: %s. mark it as cancel", du.Namespace, du.Name, peekErr))
r.tryCancelAcceptedDataUpload(ctx, du, fmt.Sprintf("found a dataupload %s/%s with expose error: %s. mark it as cancel", du.Namespace, du.Name, peekErr))
log.Errorf("Cancel du %s/%s because of expose error %s", du.Namespace, du.Name, peekErr)
} else if du.Status.StartTimestamp != nil {
if time.Since(du.Status.StartTimestamp.Time) >= r.preparingTimeout {
@@ -283,21 +283,35 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return r.errorOut(ctx, du, err, "error to create data path", log)
}
}
if err := r.initCancelableDataPath(ctx, asyncBR, res, log); err != nil {
log.WithError(err).Warnf("Failed to init cancelable data path for %s, will close and retry", du.Name)
r.closeDataPath(ctx, du.Name)
return r.errorOut(ctx, du, err, "error initializing data path", log)
}
// Update status to InProgress
original := du.DeepCopy()
du.Status.Phase = velerov2alpha1api.DataUploadPhaseInProgress
du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
if err := r.client.Patch(ctx, du, client.MergeFrom(original)); err != nil {
return r.errorOut(ctx, du, err, "error updating dataupload status", log)
log.WithError(err).Warnf("Failed to update dataupload %s to InProgress, will data path close and retry", du.Name)
r.closeDataPath(ctx, du.Name)
return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
}
log.Info("Data upload is marked as in progress")
result, err := r.runCancelableDataUpload(ctx, asyncBR, du, res, log)
if err != nil {
log.Errorf("Failed to run cancelable data path for %s with err %v", du.Name, err)
if err := r.startCancelableDataPath(asyncBR, du, res, log); err != nil {
log.WithError(err).Errorf("Failed to start cancelable data path for %s", du.Name)
r.closeDataPath(ctx, du.Name)
return r.errorOut(ctx, du, err, "error starting data path", log)
}
return result, err
return ctrl.Result{}, nil
} else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress {
log.Info("Data upload is in progress")
if du.Spec.Cancel {
@@ -340,29 +354,33 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
}
func (r *DataUploadReconciler) runCancelableDataUpload(ctx context.Context, asyncBR datapath.AsyncBR, du *velerov2alpha1api.DataUpload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) {
log.Info("Run cancelable dataUpload")
func (r *DataUploadReconciler) initCancelableDataPath(ctx context.Context, asyncBR datapath.AsyncBR, res *exposer.ExposeResult, log logrus.FieldLogger) error {
log.Info("Init cancelable dataUpload")
if err := asyncBR.Init(ctx, nil); err != nil {
return r.errorOut(ctx, du, err, "error to initialize asyncBR", log)
return errors.Wrap(err, "error initializing asyncBR")
}
log.Infof("async backup init for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName)
return nil
}
func (r *DataUploadReconciler) startCancelableDataPath(asyncBR datapath.AsyncBR, du *velerov2alpha1api.DataUpload, res *exposer.ExposeResult, log logrus.FieldLogger) error {
log.Info("Start cancelable dataUpload")
if err := asyncBR.StartBackup(datapath.AccessPoint{
ByPath: res.ByPod.VolumeName,
}, du.Spec.DataMoverConfig, nil); err != nil {
return r.errorOut(ctx, du, err, fmt.Sprintf("error starting async backup for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName), log)
return errors.Wrapf(err, "error starting async backup for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName)
}
log.Infof("Async backup started for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName)
return ctrl.Result{}, nil
log.Info("Async backup started for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName)
return nil
}
func (r *DataUploadReconciler) OnDataUploadCompleted(ctx context.Context, namespace string, duName string, result datapath.Result) {
defer func() {
go r.closeDataPath(ctx, duName)
}()
defer r.dataPathMgr.RemoveAsyncBR(duName)
log := r.logger.WithField("dataupload", duName)
@@ -406,9 +424,7 @@ func (r *DataUploadReconciler) OnDataUploadCompleted(ctx context.Context, namesp
}
func (r *DataUploadReconciler) OnDataUploadFailed(ctx context.Context, namespace, duName string, err error) {
defer func() {
go r.closeDataPath(ctx, duName)
}()
defer r.dataPathMgr.RemoveAsyncBR(duName)
log := r.logger.WithField("dataupload", duName)
@@ -418,16 +434,12 @@ func (r *DataUploadReconciler) OnDataUploadFailed(ctx context.Context, namespace
if getErr := r.client.Get(ctx, types.NamespacedName{Name: duName, Namespace: namespace}, &du); getErr != nil {
log.WithError(getErr).Warn("Failed to get dataupload on failure")
} else {
if _, errOut := r.errorOut(ctx, &du, err, "data path backup failed", log); err != nil {
log.WithError(err).Warnf("Failed to patch dataupload with err %v", errOut)
}
r.errorOut(ctx, &du, err, "data path backup failed", log)
}
}
func (r *DataUploadReconciler) OnDataUploadCancelled(ctx context.Context, namespace string, duName string) {
defer func() {
go r.closeDataPath(ctx, duName)
}()
defer r.dataPathMgr.RemoveAsyncBR(duName)
log := r.logger.WithField("dataupload", duName)
@@ -453,17 +465,19 @@ func (r *DataUploadReconciler) OnDataUploadCancelled(ctx context.Context, namesp
}
}
// TryCancelDataUpload clear up resources only when update success
func (r *DataUploadReconciler) TryCancelDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload, message string) {
func (r *DataUploadReconciler) tryCancelAcceptedDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload, message string) {
log := r.logger.WithField("dataupload", du.Name)
log.Warn("Async fs backup data path canceled")
log.Warn("Accepted data upload is canceled")
succeeded, err := r.exclusiveUpdateDataUpload(ctx, du, func(dataUpload *velerov2alpha1api.DataUpload) {
dataUpload.Status.Phase = velerov2alpha1api.DataUploadPhaseCanceled
if dataUpload.Status.StartTimestamp.IsZero() {
dataUpload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
}
dataUpload.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
dataUpload.Status.Message = message
if message != "" {
dataUpload.Status.Message = message
}
})
if err != nil {
@@ -478,7 +492,6 @@ func (r *DataUploadReconciler) TryCancelDataUpload(ctx context.Context, du *vele
r.metrics.RegisterDataUploadCancel(r.nodeName)
// cleans up any objects generated during the snapshot expose
r.cleanUp(ctx, du, log)
r.closeDataPath(ctx, du.Name)
}
func (r *DataUploadReconciler) cleanUp(ctx context.Context, du *velerov2alpha1api.DataUpload, log *logrus.Entry) {
@@ -692,7 +705,7 @@ func (r *DataUploadReconciler) errorOut(ctx context.Context, du *velerov2alpha1a
}
se.CleanUp(ctx, getOwnerObject(du), volumeSnapshotName, du.Spec.SourceNamespace)
} else {
err = errors.Wrapf(err, "failed to clean up exposed snapshot with could not find %s snapshot exposer", du.Spec.SnapshotType)
log.Warnf("failed to clean up exposed snapshot could not find %s snapshot exposer", du.Spec.SnapshotType)
}
return ctrl.Result{}, r.updateStatusToFailed(ctx, du, err, msg, log)

View File

@@ -306,20 +306,16 @@ type fakeDataUploadFSBR struct {
du *velerov2alpha1api.DataUpload
kubeClient kbclient.Client
clock clock.WithTickerAndDelayedExecution
initErr error
startErr error
}
func (f *fakeDataUploadFSBR) Init(ctx context.Context, param interface{}) error {
return nil
return f.initErr
}
func (f *fakeDataUploadFSBR) StartBackup(source datapath.AccessPoint, uploaderConfigs map[string]string, param interface{}) error {
du := f.du
original := f.du.DeepCopy()
du.Status.Phase = velerov2alpha1api.DataUploadPhaseCompleted
du.Status.CompletionTimestamp = &metav1.Time{Time: f.clock.Now()}
f.kubeClient.Patch(context.Background(), du, kbclient.MergeFrom(original))
return nil
return f.startErr
}
func (f *fakeDataUploadFSBR) StartRestore(snapshotID string, target datapath.AccessPoint, uploaderConfigs map[string]string) error {
@@ -348,27 +344,24 @@ func TestReconcile(t *testing.T) {
needErrs []bool
peekErr error
notCreateFSBR bool
fsBRInitErr error
fsBRStartErr error
}{
{
name: "Dataupload is not initialized",
du: builder.ForDataUpload("unknown-ns", "unknown-name").Result(),
expectedProcessed: false,
expected: nil,
expectedRequeue: ctrl.Result{},
name: "Dataupload is not initialized",
du: builder.ForDataUpload("unknown-ns", "unknown-name").Result(),
expectedRequeue: ctrl.Result{},
}, {
name: "Error get Dataupload",
du: builder.ForDataUpload(velerov1api.DefaultNamespace, "unknown-name").Result(),
expectedProcessed: false,
expected: nil,
expectedRequeue: ctrl.Result{},
expectedErrMsg: "getting DataUpload: Get error",
needErrs: []bool{true, false, false, false},
name: "Error get Dataupload",
du: builder.ForDataUpload(velerov1api.DefaultNamespace, "unknown-name").Result(),
expectedRequeue: ctrl.Result{},
expectedErrMsg: "getting DataUpload: Get error",
needErrs: []bool{true, false, false, false},
}, {
name: "Unsupported data mover type",
du: dataUploadBuilder().DataMover("unknown type").Result(),
expectedProcessed: false,
expected: dataUploadBuilder().Phase("").Result(),
expectedRequeue: ctrl.Result{},
name: "Unsupported data mover type",
du: dataUploadBuilder().DataMover("unknown type").Result(),
expected: dataUploadBuilder().Phase("").Result(),
expectedRequeue: ctrl.Result{},
}, {
name: "Unknown type of snapshot exposer is not initialized",
du: dataUploadBuilder().SnapshotType("unknown type").Result(),
@@ -377,13 +370,12 @@ func TestReconcile(t *testing.T) {
expectedRequeue: ctrl.Result{},
expectedErrMsg: "unknown type type of snapshot exposer is not exist",
}, {
name: "Dataupload should be accepted",
du: dataUploadBuilder().Result(),
pod: builder.ForPod("fake-ns", dataUploadName).Volumes(&corev1.Volume{Name: "test-pvc"}).Result(),
pvc: builder.ForPersistentVolumeClaim("fake-ns", "test-pvc").Result(),
expectedProcessed: false,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(),
expectedRequeue: ctrl.Result{},
name: "Dataupload should be accepted",
du: dataUploadBuilder().Result(),
pod: builder.ForPod("fake-ns", dataUploadName).Volumes(&corev1.Volume{Name: "test-pvc"}).Result(),
pvc: builder.ForPersistentVolumeClaim("fake-ns", "test-pvc").Result(),
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(),
expectedRequeue: ctrl.Result{},
},
{
name: "Dataupload should fail to get PVC information",
@@ -395,34 +387,31 @@ func TestReconcile(t *testing.T) {
expectedErrMsg: "failed to get PVC",
},
{
name: "Dataupload should be prepared",
du: dataUploadBuilder().SnapshotType(fakeSnapshotType).Result(),
expectedProcessed: false,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(),
expectedRequeue: ctrl.Result{},
}, {
name: "Dataupload prepared should be completed",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(),
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(),
expectedProcessed: true,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCompleted).Result(),
expectedRequeue: ctrl.Result{},
name: "Dataupload should be prepared",
du: dataUploadBuilder().SnapshotType(fakeSnapshotType).Result(),
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(),
expectedRequeue: ctrl.Result{},
},
{
name: "Dataupload with not enabled cancel",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(),
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(false).Result(),
expectedProcessed: false,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(),
expectedRequeue: ctrl.Result{},
name: "Dataupload prepared should be completed",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(),
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(),
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(),
expectedRequeue: ctrl.Result{},
},
{
name: "Dataupload should be cancel",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(),
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(true).Result(),
expectedProcessed: false,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCanceling).Result(),
expectedRequeue: ctrl.Result{},
name: "Dataupload with not enabled cancel",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(),
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(false).Result(),
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(),
expectedRequeue: ctrl.Result{},
},
{
name: "Dataupload should be cancel",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(),
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(true).Result(),
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCanceling).Result(),
expectedRequeue: ctrl.Result{},
},
{
name: "Dataupload should be cancel with match node",
@@ -445,19 +434,43 @@ func TestReconcile(t *testing.T) {
du.Status.Node = "different_node"
return du
}(),
expectedProcessed: false,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(),
expectedRequeue: ctrl.Result{},
notCreateFSBR: true,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(),
expectedRequeue: ctrl.Result{},
notCreateFSBR: true,
},
{
name: "runCancelableDataUpload is concurrent limited",
dataMgr: datapath.NewManager(0),
name: "runCancelableDataUpload is concurrent limited",
dataMgr: datapath.NewManager(0),
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(),
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(),
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(),
expectedRequeue: ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5},
},
{
name: "data path init error",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(),
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(),
expectedProcessed: false,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(),
expectedRequeue: ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5},
fsBRInitErr: errors.New("fake-data-path-init-error"),
expectedProcessed: true,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).SnapshotType(fakeSnapshotType).Result(),
expectedErrMsg: "error initializing asyncBR: fake-data-path-init-error",
},
{
name: "Unable to update status to in progress for data download",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(),
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(),
needErrs: []bool{false, false, false, true},
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(),
expectedRequeue: ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5},
},
{
name: "data path start error",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(),
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(),
fsBRStartErr: errors.New("fake-data-path-start-error"),
expectedProcessed: true,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).SnapshotType(fakeSnapshotType).Result(),
expectedErrMsg: "error starting async backup for pod dataupload-1, volume dataupload-1: fake-data-path-start-error",
},
{
name: "prepare timeout",
@@ -480,7 +493,6 @@ func TestReconcile(t *testing.T) {
du.DeletionTimestamp = &metav1.Time{Time: time.Now()}
return du
}(),
expectedProcessed: false,
checkFunc: func(du velerov2alpha1api.DataUpload) bool {
return du.Spec.Cancel
},
@@ -496,7 +508,6 @@ func TestReconcile(t *testing.T) {
du.DeletionTimestamp = &metav1.Time{Time: time.Now()}
return du
}(),
expectedProcessed: false,
checkFunc: func(du velerov2alpha1api.DataUpload) bool {
return !controllerutil.ContainsFinalizer(&du, DataUploadDownloadFinalizer)
},
@@ -555,12 +566,16 @@ func TestReconcile(t *testing.T) {
du: test.du,
kubeClient: r.client,
clock: r.Clock,
initErr: test.fsBRInitErr,
startErr: test.fsBRStartErr,
}
}
}
testCreateFsBR := false
if test.du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress && !test.notCreateFSBR {
if fsBR := r.dataPathMgr.GetAsyncBR(test.du.Name); fsBR == nil {
testCreateFsBR = true
_, err := r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, nil, nil, datapath.TaskTypeBackup, test.du.Name, velerov1api.DefaultNamespace, "", "", "", datapath.Callbacks{OnCancelled: r.OnDataUploadCancelled}, false, velerotest.NewLogger())
require.NoError(t, err)
}
@@ -605,6 +620,11 @@ func TestReconcile(t *testing.T) {
if test.checkFunc != nil {
assert.True(t, test.checkFunc(du))
}
if !testCreateFsBR && du.Status.Phase != velerov2alpha1api.DataUploadPhaseInProgress {
assert.Nil(t, r.dataPathMgr.GetAsyncBR(test.du.Name))
}
})
}
}
@@ -926,7 +946,7 @@ func TestTryCancelDataUpload(t *testing.T) {
err = r.client.Create(ctx, test.dd)
require.NoError(t, err)
r.TryCancelDataUpload(ctx, test.dd, "")
r.tryCancelAcceptedDataUpload(ctx, test.dd, "")
if test.expectedErr == "" {
assert.NoError(t, err)

View File

@@ -127,15 +127,13 @@ func (r *BackupMicroService) Init() error {
return err
}
var waitControllerTimeout time.Duration = time.Minute * 2
func (r *BackupMicroService) RunCancelableDataPath(ctx context.Context) (string, error) {
log := r.logger.WithFields(logrus.Fields{
"dataupload": r.dataUploadName,
})
du := &velerov2alpha1api.DataUpload{}
err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, waitControllerTimeout, true, func(ctx context.Context) (bool, error) {
err := wait.PollUntilContextCancel(ctx, 500*time.Millisecond, true, func(ctx context.Context) (bool, error) {
err := r.client.Get(ctx, types.NamespacedName{
Namespace: r.namespace,
Name: r.dataUploadName,
@@ -241,8 +239,6 @@ func (r *BackupMicroService) Shutdown() {
var funcMarshal = json.Marshal
func (r *BackupMicroService) OnDataUploadCompleted(ctx context.Context, namespace string, duName string, result datapath.Result) {
defer r.closeDataPath(ctx, duName)
log := r.logger.WithField("dataupload", duName)
backupBytes, err := funcMarshal(result.Backup)
@@ -262,8 +258,6 @@ func (r *BackupMicroService) OnDataUploadCompleted(ctx context.Context, namespac
}
func (r *BackupMicroService) OnDataUploadFailed(ctx context.Context, namespace string, duName string, err error) {
defer r.closeDataPath(ctx, duName)
log := r.logger.WithField("dataupload", duName)
log.WithError(err).Error("Async fs backup data path failed")
@@ -274,8 +268,6 @@ func (r *BackupMicroService) OnDataUploadFailed(ctx context.Context, namespace s
}
func (r *BackupMicroService) OnDataUploadCancelled(ctx context.Context, namespace string, duName string) {
defer r.closeDataPath(ctx, duName)
log := r.logger.WithField("dataupload", duName)
log.Warn("Async fs backup data path canceled")
@@ -296,8 +288,6 @@ func (r *BackupMicroService) OnDataUploadProgress(ctx context.Context, namespace
return
}
log.Infof("Sending event for progress %v (%s)", progress, string(progressBytes))
r.eventRecorder.Event(r.dataUpload, false, datapath.EventReasonProgress, string(progressBytes))
}
@@ -313,7 +303,7 @@ func (r *BackupMicroService) closeDataPath(ctx context.Context, duName string) {
func (r *BackupMicroService) cancelDataUpload(du *velerov2alpha1api.DataUpload) {
r.logger.WithField("DataUpload", du.Name).Info("Data upload is being canceled")
r.eventRecorder.Event(du, false, "Canceling", "Canceling for data upload %s", du.Name)
r.eventRecorder.Event(du, false, datapath.EventReasonCancelling, "Canceling for data upload %s", du.Name)
fsBackup := r.dataPathMgr.GetAsyncBR(du.Name)
if fsBackup == nil {

View File

@@ -301,12 +301,12 @@ func TestRunCancelableDataPath(t *testing.T) {
}{
{
name: "no du",
ctx: context.Background(),
ctx: ctxTimeout,
expectedErr: "error waiting for du: context deadline exceeded",
},
{
name: "du not in in-progress",
ctx: context.Background(),
ctx: ctxTimeout,
kubeClientObj: []runtime.Object{du},
expectedErr: "error waiting for du: context deadline exceeded",
},
@@ -412,8 +412,6 @@ func TestRunCancelableDataPath(t *testing.T) {
return fsBR
}
waitControllerTimeout = time.Second
if test.result != nil {
go func() {
time.Sleep(time.Millisecond * 500)

View File

@@ -122,7 +122,7 @@ func (r *RestoreMicroService) RunCancelableDataPath(ctx context.Context) (string
})
dd := &velerov2alpha1api.DataDownload{}
err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, waitControllerTimeout, true, func(ctx context.Context) (bool, error) {
err := wait.PollUntilContextCancel(ctx, 500*time.Millisecond, true, func(ctx context.Context) (bool, error) {
err := r.client.Get(ctx, types.NamespacedName{
Namespace: r.namespace,
Name: r.dataDownloadName,
@@ -214,8 +214,6 @@ func (r *RestoreMicroService) Shutdown() {
}
func (r *RestoreMicroService) OnDataDownloadCompleted(ctx context.Context, namespace string, ddName string, result datapath.Result) {
defer r.closeDataPath(ctx, ddName)
log := r.logger.WithField("datadownload", ddName)
restoreBytes, err := funcMarshal(result.Restore)
@@ -235,8 +233,6 @@ func (r *RestoreMicroService) OnDataDownloadCompleted(ctx context.Context, names
}
func (r *RestoreMicroService) OnDataDownloadFailed(ctx context.Context, namespace string, ddName string, err error) {
defer r.closeDataPath(ctx, ddName)
log := r.logger.WithField("datadownload", ddName)
log.WithError(err).Error("Async fs restore data path failed")
@@ -247,8 +243,6 @@ func (r *RestoreMicroService) OnDataDownloadFailed(ctx context.Context, namespac
}
func (r *RestoreMicroService) OnDataDownloadCancelled(ctx context.Context, namespace string, ddName string) {
defer r.closeDataPath(ctx, ddName)
log := r.logger.WithField("datadownload", ddName)
log.Warn("Async fs restore data path canceled")
@@ -284,7 +278,7 @@ func (r *RestoreMicroService) closeDataPath(ctx context.Context, ddName string)
func (r *RestoreMicroService) cancelDataDownload(dd *velerov2alpha1api.DataDownload) {
r.logger.WithField("DataDownload", dd.Name).Info("Data download is being canceled")
r.eventRecorder.Event(dd, false, "Canceling", "Canceling for data download %s", dd.Name)
r.eventRecorder.Event(dd, false, datapath.EventReasonCancelling, "Canceling for data download %s", dd.Name)
fsBackup := r.dataPathMgr.GetAsyncBR(dd.Name)
if fsBackup == nil {

View File

@@ -254,12 +254,12 @@ func TestRunCancelableRestore(t *testing.T) {
}{
{
name: "no dd",
ctx: context.Background(),
ctx: ctxTimeout,
expectedErr: "error waiting for dd: context deadline exceeded",
},
{
name: "dd not in in-progress",
ctx: context.Background(),
ctx: ctxTimeout,
kubeClientObj: []runtime.Object{dd},
expectedErr: "error waiting for dd: context deadline exceeded",
},
@@ -365,8 +365,6 @@ func TestRunCancelableRestore(t *testing.T) {
return fsBR
}
waitControllerTimeout = time.Second
if test.result != nil {
go func() {
time.Sleep(time.Millisecond * 500)

View File

@@ -18,6 +18,7 @@ package datapath
import (
"context"
"sync"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@@ -66,6 +67,8 @@ type fileSystemBR struct {
callbacks Callbacks
jobName string
requestorType string
wgDataPath sync.WaitGroup
dataPathLock sync.Mutex
}
func newFileSystemBR(jobName string, requestorType string, client client.Client, namespace string, callbacks Callbacks, log logrus.FieldLogger) AsyncBR {
@@ -75,6 +78,7 @@ func newFileSystemBR(jobName string, requestorType string, client client.Client,
client: client,
namespace: namespace,
callbacks: callbacks,
wgDataPath: sync.WaitGroup{},
log: log,
}
@@ -134,6 +138,23 @@ func (fs *fileSystemBR) Init(ctx context.Context, param interface{}) error {
}
func (fs *fileSystemBR) Close(ctx context.Context) {
if fs.cancel != nil {
fs.cancel()
}
fs.log.WithField("user", fs.jobName).Info("Closing FileSystemBR")
fs.wgDataPath.Wait()
fs.close(ctx)
fs.log.WithField("user", fs.jobName).Info("FileSystemBR is closed")
}
func (fs *fileSystemBR) close(ctx context.Context) {
fs.dataPathLock.Lock()
defer fs.dataPathLock.Unlock()
if fs.uploaderProv != nil {
if err := fs.uploaderProv.Close(ctx); err != nil {
fs.log.Errorf("failed to close uploader provider with error %v", err)
@@ -141,13 +162,6 @@ func (fs *fileSystemBR) Close(ctx context.Context) {
fs.uploaderProv = nil
}
if fs.cancel != nil {
fs.cancel()
fs.cancel = nil
}
fs.log.WithField("user", fs.jobName).Info("FileSystemBR is closed")
}
func (fs *fileSystemBR) StartBackup(source AccessPoint, uploaderConfig map[string]string, param interface{}) error {
@@ -155,9 +169,18 @@ func (fs *fileSystemBR) StartBackup(source AccessPoint, uploaderConfig map[strin
return errors.New("file system data path is not initialized")
}
fs.wgDataPath.Add(1)
backupParam := param.(*FSBRStartParam)
go func() {
fs.log.Info("Start data path backup")
defer func() {
fs.close(context.Background())
fs.wgDataPath.Done()
}()
snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, backupParam.RealSource, backupParam.Tags, backupParam.ForceFull,
backupParam.ParentSnapshot, source.VolMode, uploaderConfig, fs)
@@ -182,7 +205,16 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint, uplo
return errors.New("file system data path is not initialized")
}
fs.wgDataPath.Add(1)
go func() {
fs.log.Info("Start data path restore")
defer func() {
fs.close(context.Background())
fs.wgDataPath.Done()
}()
err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, target.VolMode, uploaderConfigs, fs)
if err == provider.ErrorCanceled {

View File

@@ -96,6 +96,7 @@ func TestAsyncBackup(t *testing.T) {
fs := newFileSystemBR("job-1", "test", nil, "velero", Callbacks{}, velerotest.NewLogger()).(*fileSystemBR)
mockProvider := providerMock.NewProvider(t)
mockProvider.On("RunBackup", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.result.Backup.SnapshotID, test.result.Backup.EmptySnapshot, test.err)
mockProvider.On("Close", mock.Anything).Return(nil)
fs.uploaderProv = mockProvider
fs.initialized = true
fs.callbacks = test.callbacks
@@ -179,6 +180,7 @@ func TestAsyncRestore(t *testing.T) {
fs := newFileSystemBR("job-1", "test", nil, "velero", Callbacks{}, velerotest.NewLogger()).(*fileSystemBR)
mockProvider := providerMock.NewProvider(t)
mockProvider.On("RunRestore", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.err)
mockProvider.On("Close", mock.Anything).Return(nil)
fs.uploaderProv = mockProvider
fs.initialized = true
fs.callbacks = test.callbacks

View File

@@ -46,11 +46,12 @@ const (
ErrCancelled = "data path is canceled"
EventReasonStarted = "Data-Path-Started"
EventReasonCompleted = "Data-Path-Completed"
EventReasonFailed = "Data-Path-Failed"
EventReasonCancelled = "Data-Path-Canceled"
EventReasonProgress = "Data-Path-Progress"
EventReasonStarted = "Data-Path-Started"
EventReasonCompleted = "Data-Path-Completed"
EventReasonFailed = "Data-Path-Failed"
EventReasonCancelled = "Data-Path-Canceled"
EventReasonProgress = "Data-Path-Progress"
EventReasonCancelling = "Data-Path-Canceling"
)
type microServiceBRWatcher struct {
@@ -76,6 +77,7 @@ type microServiceBRWatcher struct {
podInformer ctrlcache.Informer
eventHandler cache.ResourceEventHandlerRegistration
podHandler cache.ResourceEventHandlerRegistration
watcherLock sync.Mutex
}
func newMicroServiceBRWatcher(client client.Client, kubeClient kubernetes.Interface, mgr manager.Manager, taskType string, taskName string, namespace string,
@@ -121,8 +123,6 @@ func (ms *microServiceBRWatcher) Init(ctx context.Context, param interface{}) er
return
}
ms.log.Infof("Pushed adding event %s/%s, message %s for object %v", evt.Namespace, evt.Name, evt.Message, evt.InvolvedObject)
ms.eventCh <- evt
},
UpdateFunc: func(_, obj interface{}) {
@@ -131,8 +131,6 @@ func (ms *microServiceBRWatcher) Init(ctx context.Context, param interface{}) er
return
}
ms.log.Infof("Pushed updating event %s/%s, message %s for object %v", evt.Namespace, evt.Name, evt.Message, evt.InvolvedObject)
ms.eventCh <- evt
},
},
@@ -177,12 +175,9 @@ func (ms *microServiceBRWatcher) Init(ctx context.Context, param interface{}) er
}
}()
ms.log.WithFields(
logrus.Fields{
"taskType": ms.taskType,
"taskName": ms.taskName,
"thisPod": ms.thisPod,
}).Info("MicroServiceBR is initialized")
if err := ms.reEnsureThisPod(ctx); err != nil {
return err
}
ms.eventInformer = eventInformer
ms.podInformer = podInformer
@@ -191,63 +186,73 @@ func (ms *microServiceBRWatcher) Init(ctx context.Context, param interface{}) er
ms.ctx, ms.cancel = context.WithCancel(ctx)
ms.log.WithFields(
logrus.Fields{
"taskType": ms.taskType,
"taskName": ms.taskName,
"thisPod": ms.thisPod,
}).Info("MicroServiceBR is initialized")
succeeded = true
return nil
}
func (ms *microServiceBRWatcher) Close(ctx context.Context) {
if ms.cancel != nil {
ms.cancel()
ms.cancel = nil
}
ms.log.WithField("taskType", ms.taskType).WithField("taskName", ms.taskName).Info("Closing MicroServiceBR")
ms.wgWatcher.Wait()
if ms.eventInformer != nil && ms.eventHandler != nil {
if err := ms.eventInformer.RemoveEventHandler(ms.eventHandler); err != nil {
ms.log.WithError(err).Warn("Failed to remove event handler")
}
}
if ms.podInformer != nil && ms.podHandler != nil {
if err := ms.podInformer.RemoveEventHandler(ms.podHandler); err != nil {
ms.log.WithError(err).Warn("Failed to remove pod handler")
}
}
ms.close()
ms.log.WithField("taskType", ms.taskType).WithField("taskName", ms.taskName).Info("MicroServiceBR is closed")
}
func (ms *microServiceBRWatcher) StartBackup(source AccessPoint, uploaderConfig map[string]string, param interface{}) error {
ms.log.Infof("Start watching backup ms for source %v", source)
func (ms *microServiceBRWatcher) close() {
ms.watcherLock.Lock()
defer ms.watcherLock.Unlock()
if err := ms.reEnsureThisPod(); err != nil {
return err
if ms.eventHandler != nil {
if err := ms.eventInformer.RemoveEventHandler(ms.eventHandler); err != nil {
ms.log.WithError(err).Warn("Failed to remove event handler")
}
ms.eventHandler = nil
}
if ms.podHandler != nil {
if err := ms.podInformer.RemoveEventHandler(ms.podHandler); err != nil {
ms.log.WithError(err).Warn("Failed to remove pod handler")
}
ms.podHandler = nil
}
}
func (ms *microServiceBRWatcher) StartBackup(source AccessPoint, uploaderConfig map[string]string, param interface{}) error {
ms.log.Infof("Start watching backup ms for source %v", source.ByPath)
ms.startWatch()
return nil
}
func (ms *microServiceBRWatcher) StartRestore(snapshotID string, target AccessPoint, uploaderConfigs map[string]string) error {
ms.log.Infof("Start watching restore ms to target %v, from snapshot %s", target, snapshotID)
if err := ms.reEnsureThisPod(); err != nil {
return err
}
ms.log.Infof("Start watching restore ms to target %s, from snapshot %s", target.ByPath, snapshotID)
ms.startWatch()
return nil
}
func (ms *microServiceBRWatcher) reEnsureThisPod() error {
func (ms *microServiceBRWatcher) reEnsureThisPod(ctx context.Context) error {
thisPod := &v1.Pod{}
if err := ms.client.Get(ms.ctx, types.NamespacedName{
if err := ms.client.Get(ctx, types.NamespacedName{
Namespace: ms.namespace,
Name: ms.thisPod,
}, thisPod); err != nil {
@@ -275,6 +280,11 @@ func (ms *microServiceBRWatcher) startWatch() {
go func() {
ms.log.Info("Start watching data path pod")
defer func() {
ms.close()
ms.wgWatcher.Done()
}()
var lastPod *v1.Pod
watchLoop:
@@ -291,14 +301,16 @@ func (ms *microServiceBRWatcher) startWatch() {
}
if lastPod == nil {
ms.log.Warn("Data path pod watch loop is canceled")
ms.wgWatcher.Done()
ms.log.Warn("Watch loop is cancelled on waiting data path pod")
return
}
epilogLoop:
for !ms.startedFromEvent || !ms.terminatedFromEvent {
select {
case <-ms.ctx.Done():
ms.log.Warn("Watch loop is cancelled on waiting final event")
return
case <-time.After(eventWaitTimeout):
break epilogLoop
case evt := <-ms.eventCh:
@@ -339,8 +351,6 @@ func (ms *microServiceBRWatcher) startWatch() {
}
logger.Info("Complete callback on data path pod termination")
ms.wgWatcher.Done()
}()
}
@@ -348,20 +358,22 @@ func (ms *microServiceBRWatcher) onEvent(evt *v1.Event) {
switch evt.Reason {
case EventReasonStarted:
ms.startedFromEvent = true
ms.log.Infof("Received data path start message %s", evt.Message)
ms.log.Infof("Received data path start message: %s", evt.Message)
case EventReasonProgress:
ms.callbacks.OnProgress(ms.ctx, ms.namespace, ms.taskName, funcGetProgressFromMessage(evt.Message, ms.log))
case EventReasonCompleted:
ms.log.Infof("Received data path completed message %v", funcGetResultFromMessage(ms.taskType, evt.Message, ms.log))
ms.log.Infof("Received data path completed message: %v", funcGetResultFromMessage(ms.taskType, evt.Message, ms.log))
ms.terminatedFromEvent = true
case EventReasonCancelled:
ms.log.Infof("Received data path canceled message %s", evt.Message)
ms.log.Infof("Received data path canceled message: %s", evt.Message)
ms.terminatedFromEvent = true
case EventReasonFailed:
ms.log.Infof("Received data path failed message %s", evt.Message)
ms.log.Infof("Received data path failed message: %s", evt.Message)
ms.terminatedFromEvent = true
case EventReasonCancelling:
ms.log.Infof("Received data path canceling message: %s", evt.Message)
default:
ms.log.Debugf("Received event for data mover %s.[reason %s, message %s]", ms.taskName, evt.Reason, evt.Message)
ms.log.Infof("Received event for data path %s,reason: %s, message: %s", ms.taskName, evt.Reason, evt.Message)
}
}

View File

@@ -102,7 +102,7 @@ func TestReEnsureThisPod(t *testing.T) {
log: velerotest.NewLogger(),
}
err := ms.reEnsureThisPod()
err := ms.reEnsureThisPod(context.Background())
if test.expectErr != "" {
assert.EqualError(t, err, test.expectErr)
} else {