data mover ms new controller

Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
This commit is contained in:
Lyndon-Li
2024-07-31 11:52:26 +08:00
parent 7b26673b29
commit d48e9762eb
10 changed files with 97 additions and 101 deletions

View File

@@ -35,6 +35,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -67,6 +68,7 @@ type DataUploadReconciler struct {
client client.Client
kubeClient kubernetes.Interface
csiSnapshotClient snapshotter.SnapshotV1Interface
mgr manager.Manager
repoEnsurer *repository.Ensurer
Clock clocks.WithTickerAndDelayedExecution
credentialGetter *credentials.CredentialGetter
@@ -80,11 +82,12 @@ type DataUploadReconciler struct {
metrics *metrics.ServerMetrics
}
func NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interface, csiSnapshotClient snapshotter.SnapshotV1Interface,
func NewDataUploadReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, csiSnapshotClient snapshotter.SnapshotV1Interface,
dataPathMgr *datapath.Manager, loadAffinity *nodeagent.LoadAffinity, repoEnsurer *repository.Ensurer, clock clocks.WithTickerAndDelayedExecution,
cred *credentials.CredentialGetter, nodeName string, fs filesystem.Interface, preparingTimeout time.Duration, log logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataUploadReconciler {
return &DataUploadReconciler{
client: client,
mgr: mgr,
kubeClient: kubeClient,
csiSnapshotClient: csiSnapshotClient,
Clock: clock,
@@ -246,8 +249,8 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, nil
}
fsBackup := r.dataPathMgr.GetAsyncBR(du.Name)
if fsBackup != nil {
asyncBR := r.dataPathMgr.GetAsyncBR(du.Name)
if asyncBR != nil {
log.Info("Cancellable data path is already started")
return ctrl.Result{}, nil
}
@@ -270,7 +273,8 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
OnProgress: r.OnDataUploadProgress,
}
fsBackup, err = r.dataPathMgr.CreateFileSystemBR(du.Name, dataUploadDownloadRequestor, ctx, r.client, du.Namespace, callbacks, log)
asyncBR, err = r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, r.kubeClient, r.mgr, datapath.TaskTypeBackup,
du.Name, du.Namespace, res.ByPod.HostingPod.Name, res.ByPod.HostingContainer, du.Name, callbacks, false, log)
if err != nil {
if err == datapath.ConcurrentLimitExceed {
log.Info("Data path instance is concurrent limited requeue later")
@@ -288,7 +292,7 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
log.Info("Data upload is marked as in progress")
result, err := r.runCancelableDataUpload(ctx, fsBackup, du, res, log)
result, err := r.runCancelableDataUpload(ctx, asyncBR, du, res, log)
if err != nil {
log.Errorf("Failed to run cancelable data path for %s with err %v", du.Name, err)
r.closeDataPath(ctx, du.Name)
@@ -299,8 +303,8 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
if du.Spec.Cancel {
log.Info("Data upload is being canceled")
fsBackup := r.dataPathMgr.GetAsyncBR(du.Name)
if fsBackup == nil {
asyncBR := r.dataPathMgr.GetAsyncBR(du.Name)
if asyncBR == nil {
if du.Status.Node == r.nodeName {
r.OnDataUploadCancelled(ctx, du.GetNamespace(), du.GetName())
} else {
@@ -316,7 +320,7 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
log.WithError(err).Error("error updating data upload into canceling status")
return ctrl.Result{}, err
}
fsBackup.Cancel()
asyncBR.Cancel()
return ctrl.Result{}, nil
}
return ctrl.Result{}, nil
@@ -336,44 +340,22 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
}
func (r *DataUploadReconciler) runCancelableDataUpload(ctx context.Context, fsBackup datapath.AsyncBR, du *velerov2alpha1api.DataUpload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) {
func (r *DataUploadReconciler) runCancelableDataUpload(ctx context.Context, asyncBR datapath.AsyncBR, du *velerov2alpha1api.DataUpload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) {
log.Info("Run cancelable dataUpload")
path, err := exposer.GetPodVolumeHostPath(ctx, res.ByPod.HostingPod, res.ByPod.VolumeName, r.client, r.fileSystem, log)
if err != nil {
return r.errorOut(ctx, du, err, "error exposing host path for pod volume", log)
if err := asyncBR.Init(ctx, nil); err != nil {
return r.errorOut(ctx, du, err, "error to initialize asyncBR", log)
}
log.WithField("path", path.ByPath).Debug("Found host path")
log.Infof("async backup init for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName)
if err := fsBackup.Init(ctx, &datapath.FSBRInitParam{
BSLName: du.Spec.BackupStorageLocation,
SourceNamespace: du.Spec.SourceNamespace,
UploaderType: datamover.GetUploaderType(du.Spec.DataMover),
RepositoryType: velerov1api.BackupRepositoryTypeKopia,
RepoIdentifier: "",
RepositoryEnsurer: r.repoEnsurer,
CredentialGetter: r.credentialGetter,
}); err != nil {
return r.errorOut(ctx, du, err, "error to initialize data path", log)
if err := asyncBR.StartBackup(datapath.AccessPoint{
ByPath: res.ByPod.VolumeName,
}, du.Spec.DataMoverConfig, nil); err != nil {
return r.errorOut(ctx, du, err, fmt.Sprintf("error starting async backup for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName), log)
}
log.WithField("path", path.ByPath).Info("fs init")
tags := map[string]string{
velerov1api.AsyncOperationIDLabel: du.Labels[velerov1api.AsyncOperationIDLabel],
}
if err := fsBackup.StartBackup(path, du.Spec.DataMoverConfig, &datapath.FSBRStartParam{
RealSource: datamover.GetRealSource(du.Spec.SourceNamespace, du.Spec.SourcePVC),
ParentSnapshot: "",
ForceFull: false,
Tags: tags,
}); err != nil {
return r.errorOut(ctx, du, err, "error starting data path backup", log)
}
log.WithField("path", path.ByPath).Info("Async fs backup data path started")
log.Info("Async backup started for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName)
return ctrl.Result{}, nil
}
@@ -608,7 +590,7 @@ func (r *DataUploadReconciler) findDataUploadForPod(ctx context.Context, podObj
log.WithError(err).Warn("failed to cancel dataupload, and it will wait for prepare timeout")
return []reconcile.Request{}
}
log.Info("Exposed pod is in abnormal status and dataupload is marked as cancel")
log.Infof("Exposed pod is in abnormal status(reason %s) and dataupload is marked as cancel", reason)
} else {
return []reconcile.Request{}
}
@@ -820,9 +802,9 @@ func (r *DataUploadReconciler) exclusiveUpdateDataUpload(ctx context.Context, du
}
func (r *DataUploadReconciler) closeDataPath(ctx context.Context, duName string) {
fsBackup := r.dataPathMgr.GetAsyncBR(duName)
if fsBackup != nil {
fsBackup.Close(ctx)
asyncBR := r.dataPathMgr.GetAsyncBR(duName)
if asyncBR != nil {
asyncBR.Close(ctx)
}
r.dataPathMgr.RemoveAsyncBR(duName)