Merge pull request #8594 from Lyndon-Li/data-mover-restore-for-windows

Data mover restore for Windows
lyndon-li
2025-01-13 13:04:29 +08:00
committed by GitHub
19 changed files with 294 additions and 102 deletions
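At a high level, this change brings the node-OS awareness that data mover backup already has to the restore path: DataDownload now carries a NodeOS field, and the reconciler resolves it (defaulting to linux for CRs created before the field existed), verifies that a node of that OS exists, and builds the expose parameters from the result. A minimal sketch of that resolution step, condensed from the setupExposeParam function in the diff below (resolveNodeOS is an illustrative helper name, not something this PR adds):

package sketch

import (
	"context"

	"github.com/pkg/errors"
	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"

	"github.com/vmware-tanzu/velero/pkg/util/kube"
)

// resolveNodeOS condenses the new restore-side logic: default an unset
// NodeOS to linux, then fail fast if the cluster has no node of that OS.
func resolveNodeOS(ctx context.Context, specNodeOS string, nodes corev1client.CoreV1Interface) (string, error) {
	nodeOS := specNodeOS
	if nodeOS == "" {
		// Legacy DataDownloads predate the NodeOS field; linux is the safe default.
		nodeOS = kube.NodeOSLinux
	}
	if err := kube.HasNodeWithOS(ctx, nodeOS, nodes); err != nil {
		return "", errors.Wrap(err, "no appropriate node to run datadownload")
	}
	return nodeOS, nil
}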


@@ -183,28 +183,15 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
return ctrl.Result{}, nil
}
hostingPodLabels := map[string]string{velerov1api.DataDownloadLabel: dd.Name}
for _, k := range util.ThirdPartyLabels {
if v, err := nodeagent.GetLabelValue(ctx, r.kubeClient, dd.Namespace, k, kube.NodeOSLinux); err != nil {
if err != nodeagent.ErrNodeAgentLabelNotFound {
log.WithError(err).Warnf("Failed to check node-agent label, skip adding host pod label %s", k)
}
} else {
hostingPodLabels[k] = v
}
exposeParam, err := r.setupExposeParam(dd)
if err != nil {
return r.errorOut(ctx, dd, err, "failed to set exposer parameters", log)
}
// Expose() triggers the creation of a pod whose volume is restored from the given
// volume snapshot, but that pod may not land on the same node as the current
// controller, so we need to return here; only the controller on the same node
// as the pod can carry out the rest of the work.
err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), exposer.GenericRestoreExposeParam{
TargetPVCName: dd.Spec.TargetVolume.PVC,
SourceNamespace: dd.Spec.TargetVolume.Namespace,
HostingPodLabels: hostingPodLabels,
Resources: r.podResources,
ExposeTimeout: dd.Spec.OperationTimeout.Duration,
RestorePVCConfig: r.restorePVCConfig,
})
err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), exposeParam)
if err != nil {
if err := r.client.Get(ctx, req.NamespacedName, dd); err != nil {
if !apierrors.IsNotFound(err) {
@@ -243,7 +230,7 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
log.Debugf("Data download %s is being canceled in Phase %s", dd.GetName(), dd.Status.Phase)
r.tryCancelAcceptedDataDownload(ctx, dd, "")
} else if peekErr := r.restoreExposer.PeekExposed(ctx, getDataDownloadOwnerObject(dd)); peekErr != nil {
r.tryCancelAcceptedDataDownload(ctx, dd, fmt.Sprintf("found a dataupload %s/%s with expose error: %s. mark it as cancel", dd.Namespace, dd.Name, peekErr))
r.tryCancelAcceptedDataDownload(ctx, dd, fmt.Sprintf("found a datadownload %s/%s with expose error: %s. mark it as cancel", dd.Namespace, dd.Name, peekErr))
log.Errorf("Cancel dd %s/%s because of expose error %s", dd.Namespace, dd.Name, peekErr)
} else if dd.Status.AcceptedTimestamp != nil {
if time.Since(dd.Status.AcceptedTimestamp.Time) >= r.preparingTimeout {
@@ -737,6 +724,42 @@ func (r *DataDownloadReconciler) closeDataPath(ctx context.Context, ddName strin
r.dataPathMgr.RemoveAsyncBR(ddName)
}
func (r *DataDownloadReconciler) setupExposeParam(dd *velerov2alpha1api.DataDownload) (exposer.GenericRestoreExposeParam, error) {
log := r.logger.WithField("datadownload", dd.Name)
nodeOS := string(dd.Spec.NodeOS)
if nodeOS == "" {
log.Info("nodeOS is empty in DD, falling back to linux")
nodeOS = kube.NodeOSLinux
}
if err := kube.HasNodeWithOS(context.Background(), nodeOS, r.kubeClient.CoreV1()); err != nil {
return exposer.GenericRestoreExposeParam{}, errors.Wrapf(err, "no appropriate node to run datadownload %s/%s", dd.Namespace, dd.Name)
}
hostingPodLabels := map[string]string{velerov1api.DataDownloadLabel: dd.Name}
for _, k := range util.ThirdPartyLabels {
if v, err := nodeagent.GetLabelValue(context.Background(), r.kubeClient, dd.Namespace, k, nodeOS); err != nil {
if err != nodeagent.ErrNodeAgentLabelNotFound {
log.WithError(err).Warnf("Failed to check node-agent label, skip adding host pod label %s", k)
}
} else {
hostingPodLabels[k] = v
}
}
return exposer.GenericRestoreExposeParam{
TargetPVCName: dd.Spec.TargetVolume.PVC,
TargetNamespace: dd.Spec.TargetVolume.Namespace,
HostingPodLabels: hostingPodLabels,
Resources: r.podResources,
OperationTimeout: dd.Spec.OperationTimeout.Duration,
ExposeTimeout: r.preparingTimeout,
NodeOS: nodeOS,
RestorePVCConfig: r.restorePVCConfig,
}, nil
}
func getDataDownloadOwnerObject(dd *velerov2alpha1api.DataDownload) v1.ObjectReference {
return v1.ObjectReference{
Kind: dd.Kind,

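For a Windows restore, the parameters handed to the generic restore exposer come out roughly as follows (values are made-up examples; note that the refactor also reworks the timeout wiring: the spec's OperationTimeout now populates OperationTimeout, while ExposeTimeout is driven by the controller's preparingTimeout instead of the spec value the old inline code passed):

param := exposer.GenericRestoreExposeParam{
	TargetPVCName:    "app-data", // dd.Spec.TargetVolume.PVC (example value)
	TargetNamespace:  "app-ns",   // dd.Spec.TargetVolume.Namespace (example value)
	HostingPodLabels: map[string]string{velerov1api.DataDownloadLabel: "dd-1"},
	Resources:        r.podResources,
	OperationTimeout: dd.Spec.OperationTimeout.Duration,
	ExposeTimeout:    r.preparingTimeout, // previously dd.Spec.OperationTimeout.Duration
	NodeOS:           "windows",          // steers the restore pod onto a Windows node
	RestorePVCConfig: r.restorePVCConfig,
}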

@@ -53,6 +53,7 @@ import (
"github.com/vmware-tanzu/velero/pkg/nodeagent"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/kube"
exposermockes "github.com/vmware-tanzu/velero/pkg/exposer/mocks"
)
@@ -67,7 +68,7 @@ func dataDownloadBuilder() *builder.DataDownloadBuilder {
PV: "test-pv",
PVC: "test-pvc",
Namespace: "test-ns",
})
}).NodeOS(velerov2alpha1api.NodeOS("linux"))
}
func initDataDownloadReconciler(objects []runtime.Object, needError ...bool) (*DataDownloadReconciler, error) {
@@ -167,6 +168,8 @@ func TestDataDownloadReconcile(t *testing.T) {
},
}
node := builder.ForNode("fake-node").Labels(map[string]string{kube.NodeOSLabel: kube.NodeOSLinux}).Result()
tests := []struct {
name string
dd *velerov2alpha1api.DataDownload
@@ -326,9 +329,15 @@ func TestDataDownloadReconcile(t *testing.T) {
},
{
name: "Restore is exposed",
dd: dataDownloadBuilder().Result(),
dd: dataDownloadBuilder().NodeOS(velerov2alpha1api.NodeOSLinux).Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
},
{
name: "Expected node doesn't exist",
dd: dataDownloadBuilder().NodeOS(velerov2alpha1api.NodeOSWindows).Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
expectedStatusMsg: "no appropriate node to run datadownload",
},
{
name: "Get empty restore exposer",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
@@ -388,9 +397,9 @@ func TestDataDownloadReconcile(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var objs []runtime.Object
objs := []runtime.Object{daemonSet, node}
if test.targetPVC != nil {
objs = []runtime.Object{test.targetPVC, daemonSet}
objs = append(objs, test.targetPVC)
}
r, err := initDataDownloadReconciler(objs, test.needErrs...)
require.NoError(t, err)

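The reconciler test now seeds a single linux node, which is exactly what makes the new "Expected node doesn't exist" case fail for a Windows DataDownload. Exercising the positive Windows path would only need a second seeded node, along the lines of this sketch (not part of this PR; "windows" is the standard kubernetes.io/os label value):

winNode := builder.ForNode("fake-win-node").
	Labels(map[string]string{kube.NodeOSLabel: "windows"}).
	Result()
objs = append(objs, winNode)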

@@ -285,6 +285,10 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, nil
}
if res.ByPod.NodeOS == nil {
return r.errorOut(ctx, du, errors.New("unsupported ambiguous node OS"), "invalid expose result", log)
}
log.Info("Exposed snapshot is ready and creating data path routine")
// Need to first create file system BR and get data path instance then update data upload status
@@ -317,6 +321,7 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
original := du.DeepCopy()
du.Status.Phase = velerov2alpha1api.DataUploadPhaseInProgress
du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
du.Status.NodeOS = velerov2alpha1api.NodeOS(*res.ByPod.NodeOS)
if err := r.client.Patch(ctx, du, client.MergeFrom(original)); err != nil {
log.WithError(err).Warnf("Failed to update dataupload %s to InProgress, will data path close and retry", du.Name)
@@ -792,6 +797,8 @@ func (r *DataUploadReconciler) closeDataPath(ctx context.Context, duName string)
}
func (r *DataUploadReconciler) setupExposeParam(du *velerov2alpha1api.DataUpload) (interface{}, error) {
log := r.logger.WithField("dataupload", du.Name)
if du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI {
pvc := &corev1.PersistentVolumeClaim{}
err := r.client.Get(context.Background(), types.NamespacedName{
@@ -803,7 +810,7 @@ func (r *DataUploadReconciler) setupExposeParam(du *velerov2alpha1api.DataUpload
return nil, errors.Wrapf(err, "failed to get PVC %s/%s", du.Spec.SourceNamespace, du.Spec.SourcePVC)
}
nodeOS, err := kube.GetPVCAttachingNodeOS(pvc, r.kubeClient.CoreV1(), r.kubeClient.StorageV1(), r.logger)
nodeOS, err := kube.GetPVCAttachingNodeOS(pvc, r.kubeClient.CoreV1(), r.kubeClient.StorageV1(), log)
if err != nil {
return nil, errors.Wrapf(err, "failed to get attaching node OS for PVC %s/%s", du.Spec.SourceNamespace, du.Spec.SourcePVC)
}
@@ -821,7 +828,7 @@ func (r *DataUploadReconciler) setupExposeParam(du *velerov2alpha1api.DataUpload
for _, k := range util.ThirdPartyLabels {
if v, err := nodeagent.GetLabelValue(context.Background(), r.kubeClient, du.Namespace, k, nodeOS); err != nil {
if err != nodeagent.ErrNodeAgentLabelNotFound {
r.logger.WithError(err).Warnf("Failed to check node-agent label, skip adding host pod label %s", k)
log.WithError(err).Warnf("Failed to check node-agent label, skip adding host pod label %s", k)
}
} else {
hostingPodLabels[k] = v
@@ -843,6 +850,7 @@ func (r *DataUploadReconciler) setupExposeParam(du *velerov2alpha1api.DataUpload
NodeOS: nodeOS,
}, nil
}
return nil, nil
}

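On the backup side, the expose result now reports the hosting pod's node OS through a pointer: a concrete value is copied into du.Status.NodeOS, while nil means the exposer could not determine the OS, and the reconciler fails the DataUpload rather than guessing. A producer-side sketch of that contract, mirroring the fake exposer in the tests below (pod and volumeName stand in for whatever the exposer has at hand):

// A concrete OS pins the data path to a known platform; leaving NodeOS
// nil signals ambiguity, which the reconciler rejects with
// "unsupported ambiguous node OS".
nodeOS := kube.NodeOSLinux
result := &exposer.ExposeResult{
	ByPod: exposer.ExposeByPod{
		HostingPod: pod,
		VolumeName: volumeName,
		NodeOS:     &nodeOS,
	},
}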

@@ -166,6 +166,7 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci
RestoreSize: &restoreSize,
},
}
daemonSet := &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "velero",
@@ -265,9 +266,10 @@ func dataUploadBuilder() *builder.DataUploadBuilder {
}
type fakeSnapshotExposer struct {
kubeClient kbclient.Client
clock clock.WithTickerAndDelayedExecution
peekErr error
kubeClient kbclient.Client
clock clock.WithTickerAndDelayedExecution
ambiguousNodeOS bool
peekErr error
}
func (f *fakeSnapshotExposer) Expose(ctx context.Context, ownerObject corev1.ObjectReference, param interface{}) error {
@@ -296,7 +298,13 @@ func (f *fakeSnapshotExposer) GetExposed(ctx context.Context, du corev1.ObjectRe
if err != nil {
return nil, err
}
return &exposer.ExposeResult{ByPod: exposer.ExposeByPod{HostingPod: pod, VolumeName: dataUploadName}}, nil
nodeOS := "linux"
pNodeOS := &nodeOS
if f.ambiguousNodeOS {
pNodeOS = nil
}
return &exposer.ExposeResult{ByPod: exposer.ExposeByPod{HostingPod: pod, VolumeName: dataUploadName, NodeOS: pNodeOS}}, nil
}
func (f *fakeSnapshotExposer) PeekExposed(ctx context.Context, ownerObject corev1.ObjectReference) error {
@@ -350,6 +358,8 @@ func TestReconcile(t *testing.T) {
expectedRequeue ctrl.Result
expectedErrMsg string
needErrs []bool
removeNode bool
ambiguousNodeOS bool
peekErr error
notCreateFSBR bool
fsBRInitErr error
@@ -359,25 +369,29 @@ func TestReconcile(t *testing.T) {
name: "Dataupload is not initialized",
du: builder.ForDataUpload("unknown-ns", "unknown-name").Result(),
expectedRequeue: ctrl.Result{},
}, {
},
{
name: "Error get Dataupload",
du: builder.ForDataUpload(velerov1api.DefaultNamespace, "unknown-name").Result(),
expectedRequeue: ctrl.Result{},
expectedErrMsg: "getting DataUpload: Get error",
needErrs: []bool{true, false, false, false},
}, {
},
{
name: "Unsupported data mover type",
du: dataUploadBuilder().DataMover("unknown type").Result(),
expected: dataUploadBuilder().Phase("").Result(),
expectedRequeue: ctrl.Result{},
}, {
},
{
name: "Unknown type of snapshot exposer is not initialized",
du: dataUploadBuilder().SnapshotType("unknown type").Result(),
expectedProcessed: true,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(),
expectedRequeue: ctrl.Result{},
expectedErrMsg: "unknown type type of snapshot exposer is not exist",
}, {
},
{
name: "Dataupload should be accepted",
du: dataUploadBuilder().Result(),
pod: builder.ForPod("fake-ns", dataUploadName).Volumes(&corev1.Volume{Name: "test-pvc"}).Result(),
@@ -394,6 +408,27 @@ func TestReconcile(t *testing.T) {
expectedRequeue: ctrl.Result{},
expectedErrMsg: "failed to get PVC",
},
{
name: "Dataupload should fail to get PVC attaching node",
du: dataUploadBuilder().Result(),
pod: builder.ForPod("fake-ns", dataUploadName).Volumes(&corev1.Volume{Name: "test-pvc"}).Result(),
pvc: builder.ForPersistentVolumeClaim("fake-ns", "test-pvc").StorageClass("fake-sc").Result(),
expectedProcessed: true,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(),
expectedRequeue: ctrl.Result{},
expectedErrMsg: "error to get storage class",
},
{
name: "Dataupload should fail because expected node doesn't exist",
du: dataUploadBuilder().Result(),
pod: builder.ForPod("fake-ns", dataUploadName).Volumes(&corev1.Volume{Name: "test-pvc"}).Result(),
pvc: builder.ForPersistentVolumeClaim("fake-ns", "test-pvc").Result(),
removeNode: true,
expectedProcessed: true,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(),
expectedRequeue: ctrl.Result{},
expectedErrMsg: "no appropriate node to run data upload",
},
{
name: "Dataupload should be prepared",
du: dataUploadBuilder().SnapshotType(fakeSnapshotType).Result(),
@@ -407,6 +442,15 @@ func TestReconcile(t *testing.T) {
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(),
expectedRequeue: ctrl.Result{},
},
{
name: "Dataupload should fail if expose returns ambiguous nodeOS",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(),
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(),
ambiguousNodeOS: true,
expectedProcessed: true,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(),
expectedErrMsg: "unsupported ambiguous node OS",
},
{
name: "Dataupload with not enabled cancel",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(),
@@ -557,6 +601,11 @@ func TestReconcile(t *testing.T) {
require.NoError(t, err)
}
if test.removeNode {
err = r.kubeClient.CoreV1().Nodes().Delete(ctx, "fake-node", metav1.DeleteOptions{})
require.NoError(t, err)
}
if test.dataMgr != nil {
r.dataPathMgr = test.dataMgr
} else {
@@ -564,7 +613,7 @@ func TestReconcile(t *testing.T) {
}
if test.du.Spec.SnapshotType == fakeSnapshotType {
r.snapshotExposerList = map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{fakeSnapshotType: &fakeSnapshotExposer{r.client, r.Clock, test.peekErr}}
r.snapshotExposerList = map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{fakeSnapshotType: &fakeSnapshotExposer{r.client, r.Clock, test.ambiguousNodeOS, test.peekErr}}
} else if test.du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI {
r.snapshotExposerList = map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{velerov2alpha1api.SnapshotTypeCSI: exposer.NewCSISnapshotExposer(r.kubeClient, r.csiSnapshotClient, velerotest.NewLogger())}
}
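One wiring detail worth noting: fakeSnapshotExposer is constructed positionally, so adding ambiguousNodeOS forced this call site to change, and every future field will do so again. A keyed literal would absorb such additions; an equivalent construction as a sketch:

r.snapshotExposerList = map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{
	fakeSnapshotType: &fakeSnapshotExposer{
		kubeClient:      r.client,
		clock:           r.Clock,
		ambiguousNodeOS: test.ambiguousNodeOS,
		peekErr:         test.peekErr,
	},
}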