Merge branch 'main' into data-mover-ms-node-agent-resume

Signed-off-by: lyndon-li <98304688+Lyndon-Li@users.noreply.github.com>
This commit is contained in:
lyndon-li
2024-08-07 17:13:35 +08:00
committed by GitHub
12 changed files with 101 additions and 103 deletions

View File

@@ -257,8 +257,8 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, nil
}
fsBackup := r.dataPathMgr.GetAsyncBR(du.Name)
if fsBackup != nil {
asyncBR := r.dataPathMgr.GetAsyncBR(du.Name)
if asyncBR != nil {
log.Info("Cancellable data path is already started")
return ctrl.Result{}, nil
}
@@ -281,7 +281,8 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
OnProgress: r.OnDataUploadProgress,
}
fsBackup, err = r.dataPathMgr.CreateFileSystemBR(du.Name, dataUploadDownloadRequestor, ctx, r.client, du.Namespace, callbacks, log)
asyncBR, err = r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, r.kubeClient, r.mgr, datapath.TaskTypeBackup,
du.Name, du.Namespace, res.ByPod.HostingPod.Name, res.ByPod.HostingContainer, du.Name, callbacks, false, log)
if err != nil {
if err == datapath.ConcurrentLimitExceed {
log.Info("Data path instance is concurrent limited requeue later")
@@ -299,7 +300,7 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
log.Info("Data upload is marked as in progress")
result, err := r.runCancelableDataUpload(ctx, fsBackup, du, res, log)
result, err := r.runCancelableDataUpload(ctx, asyncBR, du, res, log)
if err != nil {
log.Errorf("Failed to run cancelable data path for %s with err %v", du.Name, err)
r.closeDataPath(ctx, du.Name)
@@ -310,8 +311,8 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
if du.Spec.Cancel {
log.Info("Data upload is being canceled")
fsBackup := r.dataPathMgr.GetAsyncBR(du.Name)
if fsBackup == nil {
asyncBR := r.dataPathMgr.GetAsyncBR(du.Name)
if asyncBR == nil {
if du.Status.Node == r.nodeName {
r.OnDataUploadCancelled(ctx, du.GetNamespace(), du.GetName())
} else {
@@ -327,7 +328,7 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
log.WithError(err).Error("error updating data upload into canceling status")
return ctrl.Result{}, err
}
fsBackup.Cancel()
asyncBR.Cancel()
return ctrl.Result{}, nil
}
return ctrl.Result{}, nil
@@ -347,49 +348,29 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
}
func (r *DataUploadReconciler) runCancelableDataUpload(ctx context.Context, fsBackup datapath.AsyncBR, du *velerov2alpha1api.DataUpload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) {
func (r *DataUploadReconciler) runCancelableDataUpload(ctx context.Context, asyncBR datapath.AsyncBR, du *velerov2alpha1api.DataUpload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) {
log.Info("Run cancelable dataUpload")
path, err := exposer.GetPodVolumeHostPath(ctx, res.ByPod.HostingPod, res.ByPod.VolumeName, r.client, r.fileSystem, log)
if err != nil {
return r.errorOut(ctx, du, err, "error exposing host path for pod volume", log)
if err := asyncBR.Init(ctx, nil); err != nil {
return r.errorOut(ctx, du, err, "error to initialize asyncBR", log)
}
log.WithField("path", path.ByPath).Debug("Found host path")
log.Infof("async backup init for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName)
if err := fsBackup.Init(ctx, &datapath.FSBRInitParam{
BSLName: du.Spec.BackupStorageLocation,
SourceNamespace: du.Spec.SourceNamespace,
UploaderType: datamover.GetUploaderType(du.Spec.DataMover),
RepositoryType: velerov1api.BackupRepositoryTypeKopia,
RepoIdentifier: "",
RepositoryEnsurer: r.repoEnsurer,
CredentialGetter: r.credentialGetter,
}); err != nil {
return r.errorOut(ctx, du, err, "error to initialize data path", log)
if err := asyncBR.StartBackup(datapath.AccessPoint{
ByPath: res.ByPod.VolumeName,
}, du.Spec.DataMoverConfig, nil); err != nil {
return r.errorOut(ctx, du, err, fmt.Sprintf("error starting async backup for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName), log)
}
log.WithField("path", path.ByPath).Info("fs init")
tags := map[string]string{
velerov1api.AsyncOperationIDLabel: du.Labels[velerov1api.AsyncOperationIDLabel],
}
if err := fsBackup.StartBackup(path, du.Spec.DataMoverConfig, &datapath.FSBRStartParam{
RealSource: datamover.GetRealSource(du.Spec.SourceNamespace, du.Spec.SourcePVC),
ParentSnapshot: "",
ForceFull: false,
Tags: tags,
}); err != nil {
return r.errorOut(ctx, du, err, "error starting data path backup", log)
}
log.WithField("path", path.ByPath).Info("Async fs backup data path started")
log.Infof("Async backup started for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName)
return ctrl.Result{}, nil
}
func (r *DataUploadReconciler) OnDataUploadCompleted(ctx context.Context, namespace string, duName string, result datapath.Result) {
defer r.closeDataPath(ctx, duName)
defer func() {
go r.closeDataPath(ctx, duName)
}()
log := r.logger.WithField("dataupload", duName)
@@ -433,7 +414,9 @@ func (r *DataUploadReconciler) OnDataUploadCompleted(ctx context.Context, namesp
}
func (r *DataUploadReconciler) OnDataUploadFailed(ctx context.Context, namespace, duName string, err error) {
defer r.closeDataPath(ctx, duName)
defer func() {
go r.closeDataPath(ctx, duName)
}()
log := r.logger.WithField("dataupload", duName)
@@ -450,7 +433,9 @@ func (r *DataUploadReconciler) OnDataUploadFailed(ctx context.Context, namespace
}
func (r *DataUploadReconciler) OnDataUploadCancelled(ctx context.Context, namespace string, duName string) {
defer r.closeDataPath(ctx, duName)
defer func() {
go r.closeDataPath(ctx, duName)
}()
log := r.logger.WithField("dataupload", duName)
@@ -625,7 +610,7 @@ func (r *DataUploadReconciler) findDataUploadForPod(ctx context.Context, podObj
log.WithError(err).Warn("failed to cancel dataupload, and it will wait for prepare timeout")
return []reconcile.Request{}
}
log.Info("Exposed pod is in abnormal status and dataupload is marked as cancel")
log.Infof("Exposed pod is in abnormal status(reason %s) and dataupload is marked as cancel", reason)
} else {
return []reconcile.Request{}
}
@@ -768,9 +753,9 @@ func (r *DataUploadReconciler) exclusiveUpdateDataUpload(ctx context.Context, du
}
func (r *DataUploadReconciler) closeDataPath(ctx context.Context, duName string) {
fsBackup := r.dataPathMgr.GetAsyncBR(duName)
if fsBackup != nil {
fsBackup.Close(ctx)
asyncBR := r.dataPathMgr.GetAsyncBR(duName)
if asyncBR != nil {
asyncBR.Close(ctx)
}
r.dataPathMgr.RemoveAsyncBR(duName)