diff --git a/changelogs/unreleased/7437-Lyndon-Li b/changelogs/unreleased/7437-Lyndon-Li new file mode 100644 index 000000000..9e75bc58b --- /dev/null +++ b/changelogs/unreleased/7437-Lyndon-Li @@ -0,0 +1 @@ +Fix issue #7036. Add the implementation of node selection for data mover backups \ No newline at end of file diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go index c1bad203e..183403178 100644 --- a/pkg/cmd/cli/nodeagent/server.go +++ b/pkg/cmd/cli/nodeagent/server.go @@ -138,6 +138,7 @@ type nodeAgentServer struct { kubeClient kubernetes.Interface csiSnapshotClient *snapshotv1client.Clientset dataPathMgr *datapath.Manager + dataPathConfigs *nodeagent.Configs } func newNodeAgentServer(logger logrus.FieldLogger, factory client.Factory, config nodeAgentServerConfig) (*nodeAgentServer, error) { @@ -226,8 +227,8 @@ func newNodeAgentServer(logger logrus.FieldLogger, factory client.Factory, confi return nil, err } - dataPathConcurrentNum := s.getDataPathConcurrentNum(defaultDataPathConcurrentNum) - s.dataPathMgr = datapath.NewManager(dataPathConcurrentNum) + s.getDataPathConfigs() + s.dataPathMgr = datapath.NewManager(s.getDataPathConcurrentNum(defaultDataPathConcurrentNum)) return s, nil } @@ -284,7 +285,11 @@ func (s *nodeAgentServer) run() { s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller") } - dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) + var loadAffinity *nodeagent.LoadAffinity + if s.dataPathConfigs != nil && len(s.dataPathConfigs.LoadAffinity) > 0 { + loadAffinity = s.dataPathConfigs.LoadAffinity[0] + } + dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) s.attemptDataUploadResume(dataUploadReconciler) if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil { s.logger.WithError(err).Fatal("Unable to create the data upload controller") @@ -454,13 +459,24 @@ func (s *nodeAgentServer) markInProgressPVRsFailed(client ctrlclient.Client) { var getConfigsFunc = nodeagent.GetConfigs -func (s *nodeAgentServer) getDataPathConcurrentNum(defaultNum int) int { +func (s *nodeAgentServer) getDataPathConfigs() { configs, err := getConfigsFunc(s.ctx, s.namespace, s.kubeClient) if err != nil { s.logger.WithError(err).Warn("Failed to get node agent configs") - return defaultNum + return } + if configs == nil { + s.logger.Infof("Node agent configs are not found") + return + } + + s.dataPathConfigs = configs +} + +func (s *nodeAgentServer) getDataPathConcurrentNum(defaultNum int) int { + configs := s.dataPathConfigs + if configs == nil || configs.LoadConcurrency == nil { s.logger.Infof("Concurrency configs are not found, use the default number %v", defaultNum) return defaultNum diff --git a/pkg/cmd/cli/nodeagent/server_test.go b/pkg/cmd/cli/nodeagent/server_test.go index d062a7186..187cc6dc0 100644 --- a/pkg/cmd/cli/nodeagent/server_test.go +++ b/pkg/cmd/cli/nodeagent/server_test.go @@ -114,6 +114,64 @@ func Test_validatePodVolumesHostPath(t *testing.T) { } } +func Test_getDataPathConfigs(t *testing.T) { + configs := &nodeagent.Configs{ + LoadConcurrency: 
&nodeagent.LoadConcurrency{ + GlobalConfig: -1, + }, + } + + tests := []struct { + name string + getFunc func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) + expectConfigs *nodeagent.Configs + expectLog string + }{ + { + name: "failed to get configs", + getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { + return nil, errors.New("fake-get-error") + }, + expectLog: "Failed to get node agent configs", + }, + { + name: "configs cm not found", + getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { + return nil, nil + }, + expectLog: "Node agent configs are not found", + }, + + { + name: "succeed", + getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { + return configs, nil + }, + expectConfigs: configs, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + logBuffer := "" + + s := &nodeAgentServer{ + logger: testutil.NewSingleLogger(&logBuffer), + } + + getConfigsFunc = test.getFunc + + s.getDataPathConfigs() + assert.Equal(t, test.expectConfigs, s.dataPathConfigs) + if test.expectLog == "" { + assert.Equal(t, "", logBuffer) + } else { + assert.True(t, strings.Contains(logBuffer, test.expectLog)) + } + }) + } +} + func Test_getDataPathConcurrentNum(t *testing.T) { defaultNum := 100001 globalNum := 6 @@ -142,72 +200,47 @@ func Test_getDataPathConcurrentNum(t *testing.T) { tests := []struct { name string - getFunc func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) + configs nodeagent.Configs setKubeClient bool kubeClientObj []runtime.Object expectNum int expectLog string }{ { - name: "failed to get configs", - getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { - return nil, errors.New("fake-get-error") - }, - expectLog: "Failed to get node agent configs", - expectNum: defaultNum, - }, - { - name: "configs cm not found", - getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { - return nil, nil - }, - expectLog: fmt.Sprintf("Concurrency configs are not found, use the default number %v", defaultNum), - expectNum: defaultNum, - }, - { - name: "configs cm's data path concurrency is nil", - getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { - return &nodeagent.Configs{}, nil - }, + name: "configs cm's data path concurrency is nil", expectLog: fmt.Sprintf("Concurrency configs are not found, use the default number %v", defaultNum), expectNum: defaultNum, }, { name: "global number is invalid", - getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { - return &nodeagent.Configs{ - LoadConcurrency: &nodeagent.LoadConcurrency{ - GlobalConfig: -1, - }, - }, nil + configs: nodeagent.Configs{ + LoadConcurrency: &nodeagent.LoadConcurrency{ + GlobalConfig: -1, + }, }, expectLog: fmt.Sprintf("Global number %v is invalid, use the default value %v", -1, defaultNum), expectNum: defaultNum, }, { name: "global number is valid", - getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { - return &nodeagent.Configs{ - LoadConcurrency: &nodeagent.LoadConcurrency{ - GlobalConfig: globalNum, - }, - }, nil + configs: nodeagent.Configs{ + LoadConcurrency: &nodeagent.LoadConcurrency{ + GlobalConfig: globalNum, + }, }, expectNum: globalNum, }, { name: "node is not found", - getFunc: func(context.Context, string, kubernetes.Interface) 
(*nodeagent.Configs, error) { - return &nodeagent.Configs{ - LoadConcurrency: &nodeagent.LoadConcurrency{ - GlobalConfig: globalNum, - PerNodeConfig: []nodeagent.RuledConfigs{ - { - Number: 100, - }, + configs: nodeagent.Configs{ + LoadConcurrency: &nodeagent.LoadConcurrency{ + GlobalConfig: globalNum, + PerNodeConfig: []nodeagent.RuledConfigs{ + { + Number: 100, }, }, - }, nil + }, }, setKubeClient: true, expectLog: fmt.Sprintf("Failed to get node info for %s, use the global number %v", nodeName, globalNum), @@ -215,18 +248,16 @@ func Test_getDataPathConcurrentNum(t *testing.T) { }, { name: "failed to get selector", - getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { - return &nodeagent.Configs{ - LoadConcurrency: &nodeagent.LoadConcurrency{ - GlobalConfig: globalNum, - PerNodeConfig: []nodeagent.RuledConfigs{ - { - NodeSelector: invalidLabelSelector, - Number: 100, - }, + configs: nodeagent.Configs{ + LoadConcurrency: &nodeagent.LoadConcurrency{ + GlobalConfig: globalNum, + PerNodeConfig: []nodeagent.RuledConfigs{ + { + NodeSelector: invalidLabelSelector, + Number: 100, }, }, - }, nil + }, }, setKubeClient: true, kubeClientObj: []runtime.Object{node1}, @@ -235,18 +266,16 @@ func Test_getDataPathConcurrentNum(t *testing.T) { }, { name: "rule number is invalid", - getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { - return &nodeagent.Configs{ - LoadConcurrency: &nodeagent.LoadConcurrency{ - GlobalConfig: globalNum, - PerNodeConfig: []nodeagent.RuledConfigs{ - { - NodeSelector: validLabelSelector1, - Number: -1, - }, + configs: nodeagent.Configs{ + LoadConcurrency: &nodeagent.LoadConcurrency{ + GlobalConfig: globalNum, + PerNodeConfig: []nodeagent.RuledConfigs{ + { + NodeSelector: validLabelSelector1, + Number: -1, }, }, - }, nil + }, }, setKubeClient: true, kubeClientObj: []runtime.Object{node1}, @@ -255,18 +284,16 @@ func Test_getDataPathConcurrentNum(t *testing.T) { }, { name: "label doesn't match", - getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { - return &nodeagent.Configs{ - LoadConcurrency: &nodeagent.LoadConcurrency{ - GlobalConfig: globalNum, - PerNodeConfig: []nodeagent.RuledConfigs{ - { - NodeSelector: validLabelSelector1, - Number: -1, - }, + configs: nodeagent.Configs{ + LoadConcurrency: &nodeagent.LoadConcurrency{ + GlobalConfig: globalNum, + PerNodeConfig: []nodeagent.RuledConfigs{ + { + NodeSelector: validLabelSelector1, + Number: -1, }, }, - }, nil + }, }, setKubeClient: true, kubeClientObj: []runtime.Object{node1}, @@ -275,18 +302,16 @@ func Test_getDataPathConcurrentNum(t *testing.T) { }, { name: "match one rule", - getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { - return &nodeagent.Configs{ - LoadConcurrency: &nodeagent.LoadConcurrency{ - GlobalConfig: globalNum, - PerNodeConfig: []nodeagent.RuledConfigs{ - { - NodeSelector: validLabelSelector1, - Number: 66, - }, + configs: nodeagent.Configs{ + LoadConcurrency: &nodeagent.LoadConcurrency{ + GlobalConfig: globalNum, + PerNodeConfig: []nodeagent.RuledConfigs{ + { + NodeSelector: validLabelSelector1, + Number: 66, }, }, - }, nil + }, }, setKubeClient: true, kubeClientObj: []runtime.Object{node2}, @@ -295,22 +320,20 @@ func Test_getDataPathConcurrentNum(t *testing.T) { }, { name: "match multiple rules", - getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { - return &nodeagent.Configs{ - LoadConcurrency: 
&nodeagent.LoadConcurrency{ - GlobalConfig: globalNum, - PerNodeConfig: []nodeagent.RuledConfigs{ - { - NodeSelector: validLabelSelector1, - Number: 66, - }, - { - NodeSelector: validLabelSelector2, - Number: 36, - }, + configs: nodeagent.Configs{ + LoadConcurrency: &nodeagent.LoadConcurrency{ + GlobalConfig: globalNum, + PerNodeConfig: []nodeagent.RuledConfigs{ + { + NodeSelector: validLabelSelector1, + Number: 66, + }, + { + NodeSelector: validLabelSelector2, + Number: 36, }, }, - }, nil + }, }, setKubeClient: true, kubeClientObj: []runtime.Object{node2}, @@ -319,22 +342,20 @@ func Test_getDataPathConcurrentNum(t *testing.T) { }, { name: "match multiple rules 2", - getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { - return &nodeagent.Configs{ - LoadConcurrency: &nodeagent.LoadConcurrency{ - GlobalConfig: globalNum, - PerNodeConfig: []nodeagent.RuledConfigs{ - { - NodeSelector: validLabelSelector1, - Number: 36, - }, - { - NodeSelector: validLabelSelector2, - Number: 66, - }, + configs: nodeagent.Configs{ + LoadConcurrency: &nodeagent.LoadConcurrency{ + GlobalConfig: globalNum, + PerNodeConfig: []nodeagent.RuledConfigs{ + { + NodeSelector: validLabelSelector1, + Number: 36, + }, + { + NodeSelector: validLabelSelector2, + Number: 66, }, }, - }, nil + }, }, setKubeClient: true, kubeClientObj: []runtime.Object{node2}, @@ -349,16 +370,15 @@ func Test_getDataPathConcurrentNum(t *testing.T) { logBuffer := "" s := &nodeAgentServer{ - nodeName: nodeName, - logger: testutil.NewSingleLogger(&logBuffer), + nodeName: nodeName, + dataPathConfigs: &test.configs, + logger: testutil.NewSingleLogger(&logBuffer), } if test.setKubeClient { s.kubeClient = fakeKubeClient } - getConfigsFunc = test.getFunc - num := s.getDataPathConcurrentNum(defaultNum) assert.Equal(t, test.expectNum, num) if test.expectLog == "" { diff --git a/pkg/controller/data_download_controller.go b/pkg/controller/data_download_controller.go index f5f283ab5..68b8a17ed 100644 --- a/pkg/controller/data_download_controller.go +++ b/pkg/controller/data_download_controller.go @@ -214,7 +214,10 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) } else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted { if dd.Spec.Cancel { log.Debugf("Data download is been canceled %s in Phase %s", dd.GetName(), dd.Status.Phase) - r.TryCancelDataDownload(ctx, dd) + r.TryCancelDataDownload(ctx, dd, "") + } else if peekErr := r.restoreExposer.PeekExposed(ctx, getDataDownloadOwnerObject(dd)); peekErr != nil { + r.TryCancelDataDownload(ctx, dd, fmt.Sprintf("found a datadownload %s/%s with expose error: %s. 
mark it as cancel", dd.Namespace, dd.Name, peekErr)) + log.Errorf("Cancel dd %s/%s because of expose error %s", dd.Namespace, dd.Name, peekErr) } else if dd.Status.StartTimestamp != nil { if time.Since(dd.Status.StartTimestamp.Time) >= r.preparingTimeout { r.onPrepareTimeout(ctx, dd) @@ -418,7 +421,7 @@ func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, na } } -func (r *DataDownloadReconciler) TryCancelDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload) { +func (r *DataDownloadReconciler) TryCancelDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload, message string) { log := r.logger.WithField("datadownload", dd.Name) log.Warn("Async fs backup data path canceled") @@ -428,6 +431,7 @@ func (r *DataDownloadReconciler) TryCancelDataDownload(ctx context.Context, dd * dataDownload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} } dataDownload.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} + dataDownload.Status.Message = message }) if err != nil { diff --git a/pkg/controller/data_download_controller_test.go b/pkg/controller/data_download_controller_test.go index b0abc7a39..1e7268fca 100644 --- a/pkg/controller/data_download_controller_test.go +++ b/pkg/controller/data_download_controller_test.go @@ -174,6 +174,7 @@ func TestDataDownloadReconcile(t *testing.T) { needCreateFSBR bool isExposeErr bool isGetExposeErr bool + isPeekExposeErr bool isNilExposer bool isFSBRInitErr bool isFSBRRestoreErr bool @@ -302,6 +303,12 @@ func TestDataDownloadReconcile(t *testing.T) { dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).StartTimestamp(&metav1.Time{Time: time.Now().Add(-time.Minute * 5)}).Result(), expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseFailed).Result(), }, + { + name: "peek error", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(), + isPeekExposeErr: true, + expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseCanceled).Result(), + }, { name: "dataDownload with enabled cancel", dd: func() *velerov2alpha1api.DataDownload { @@ -369,7 +376,7 @@ func TestDataDownloadReconcile(t *testing.T) { return fsBR } - if test.isExposeErr || test.isGetExposeErr || test.isNilExposer || test.notNilExpose { + if test.isExposeErr || test.isGetExposeErr || test.isPeekExposeErr || test.isNilExposer || test.notNilExpose { if test.isNilExposer { r.restoreExposer = nil } else { @@ -383,6 +390,8 @@ func TestDataDownloadReconcile(t *testing.T) { ep.On("GetExposed", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&exposer.ExposeResult{ByPod: exposer.ExposeByPod{HostingPod: hostingPod, VolumeName: "test-pvc"}}, nil) } else if test.isGetExposeErr { ep.On("GetExposed", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("Error to get restore exposer")) + } else if test.isPeekExposeErr { + ep.On("PeekExposed", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("fake-peek-error")) } if !test.notMockCleanUp { @@ -801,7 +810,7 @@ func TestTryCancelDataDownload(t *testing.T) { err = r.client.Create(ctx, test.dd) require.NoError(t, err) - r.TryCancelDataDownload(ctx, test.dd) + r.TryCancelDataDownload(ctx, test.dd, "") if test.expectedErr == "" { assert.NoError(t, err) diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go index de476e9af..c2bfd368c 100644 --- 
a/pkg/controller/data_upload_controller.go +++ b/pkg/controller/data_upload_controller.go @@ -48,6 +48,7 @@ import ( "github.com/vmware-tanzu/velero/pkg/datapath" "github.com/vmware-tanzu/velero/pkg/exposer" "github.com/vmware-tanzu/velero/pkg/metrics" + "github.com/vmware-tanzu/velero/pkg/nodeagent" "github.com/vmware-tanzu/velero/pkg/repository" "github.com/vmware-tanzu/velero/pkg/uploader" "github.com/vmware-tanzu/velero/pkg/util/filesystem" @@ -74,12 +75,13 @@ type DataUploadReconciler struct { logger logrus.FieldLogger snapshotExposerList map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer dataPathMgr *datapath.Manager + loadAffinity *nodeagent.LoadAffinity preparingTimeout time.Duration metrics *metrics.ServerMetrics } func NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interface, csiSnapshotClient snapshotter.SnapshotV1Interface, - dataPathMgr *datapath.Manager, repoEnsurer *repository.Ensurer, clock clocks.WithTickerAndDelayedExecution, + dataPathMgr *datapath.Manager, loadAffinity *nodeagent.LoadAffinity, repoEnsurer *repository.Ensurer, clock clocks.WithTickerAndDelayedExecution, cred *credentials.CredentialGetter, nodeName string, fs filesystem.Interface, preparingTimeout time.Duration, log logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataUploadReconciler { return &DataUploadReconciler{ client: client, @@ -93,6 +95,7 @@ func NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interfa repoEnsurer: repoEnsurer, snapshotExposerList: map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{velerov2alpha1api.SnapshotTypeCSI: exposer.NewCSISnapshotExposer(kubeClient, csiSnapshotClient, log)}, dataPathMgr: dataPathMgr, + loadAffinity: loadAffinity, preparingTimeout: preparingTimeout, metrics: metrics, } @@ -224,7 +227,10 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) // we don't want to update CR into cancel status forcely as it may conflict with CR update in Expose action // we could retry when the CR requeue in periodcally log.Debugf("Data upload is been canceled %s in Phase %s", du.GetName(), du.Status.Phase) - r.TryCancelDataUpload(ctx, du) + r.TryCancelDataUpload(ctx, du, "") + } else if peekErr := ep.PeekExposed(ctx, getOwnerObject(du)); peekErr != nil { + r.TryCancelDataUpload(ctx, du, fmt.Sprintf("found a dataupload %s/%s with expose error: %s. 
mark it as cancel", du.Namespace, du.Name, peekErr)) + log.Errorf("Cancel du %s/%s because of expose error %s", du.Namespace, du.Name, peekErr) } else if du.Status.StartTimestamp != nil { if time.Since(du.Status.StartTimestamp.Time) >= r.preparingTimeout { r.onPrepareTimeout(ctx, du) @@ -440,7 +446,7 @@ func (r *DataUploadReconciler) OnDataUploadCancelled(ctx context.Context, namesp } // TryCancelDataUpload clear up resources only when update success -func (r *DataUploadReconciler) TryCancelDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload) { +func (r *DataUploadReconciler) TryCancelDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload, message string) { log := r.logger.WithField("dataupload", du.Name) log.Warn("Async fs backup data path canceled") succeeded, err := r.exclusiveUpdateDataUpload(ctx, du, func(dataUpload *velerov2alpha1api.DataUpload) { @@ -449,6 +455,7 @@ func (r *DataUploadReconciler) TryCancelDataUpload(ctx context.Context, du *vele dataUpload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} } dataUpload.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} + dataUpload.Status.Message = message }) if err != nil { @@ -825,6 +832,7 @@ func (r *DataUploadReconciler) setupExposeParam(du *velerov2alpha1api.DataUpload OperationTimeout: du.Spec.OperationTimeout.Duration, ExposeTimeout: r.preparingTimeout, VolumeSize: pvc.Spec.Resources.Requests[corev1.ResourceStorage], + Affinity: r.loadAffinity, }, nil } return nil, nil diff --git a/pkg/controller/data_upload_controller_test.go b/pkg/controller/data_upload_controller_test.go index eb440c768..11b76f5f7 100644 --- a/pkg/controller/data_upload_controller_test.go +++ b/pkg/controller/data_upload_controller_test.go @@ -232,7 +232,7 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci if err != nil { return nil, err } - return NewDataUploadReconciler(fakeClient, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), dataPathMgr, nil, + return NewDataUploadReconciler(fakeClient, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), dataPathMgr, nil, nil, testclocks.NewFakeClock(now), &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", fakeFS, time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil } @@ -252,6 +252,7 @@ func dataUploadBuilder() *builder.DataUploadBuilder { type fakeSnapshotExposer struct { kubeClient kbclient.Client clock clock.WithTickerAndDelayedExecution + peekErr error } func (f *fakeSnapshotExposer) Expose(ctx context.Context, ownerObject corev1.ObjectReference, param interface{}) error { @@ -283,6 +284,10 @@ func (f *fakeSnapshotExposer) GetExposed(ctx context.Context, du corev1.ObjectRe return &exposer.ExposeResult{ByPod: exposer.ExposeByPod{HostingPod: pod, VolumeName: dataUploadName}}, nil } +func (f *fakeSnapshotExposer) PeekExposed(ctx context.Context, ownerObject corev1.ObjectReference) error { + return f.peekErr +} + func (f *fakeSnapshotExposer) CleanUp(context.Context, corev1.ObjectReference, string, string) { } @@ -330,6 +335,7 @@ func TestReconcile(t *testing.T) { expectedRequeue ctrl.Result expectedErrMsg string needErrs []bool + peekErr error }{ { name: "Dataupload is not initialized", @@ -420,6 +426,13 @@ func TestReconcile(t *testing.T) { du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).SnapshotType(fakeSnapshotType).StartTimestamp(&metav1.Time{Time: time.Now().Add(-time.Minute * 5)}).Result(), expected: 
dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(), }, + { + name: "peek error", + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).SnapshotType(fakeSnapshotType).Result(), + peekErr: errors.New("fake-peek-error"), + expectedProcessed: true, + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCanceled).Result(), + }, { name: "Dataupload with enabled cancel", pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), @@ -494,7 +507,7 @@ func TestReconcile(t *testing.T) { } if test.du.Spec.SnapshotType == fakeSnapshotType { - r.snapshotExposerList = map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{fakeSnapshotType: &fakeSnapshotExposer{r.client, r.Clock}} + r.snapshotExposerList = map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{fakeSnapshotType: &fakeSnapshotExposer{r.client, r.Clock, test.peekErr}} } else if test.du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI { r.snapshotExposerList = map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{velerov2alpha1api.SnapshotTypeCSI: exposer.NewCSISnapshotExposer(r.kubeClient, r.csiSnapshotClient, velerotest.NewLogger())} } @@ -874,7 +887,7 @@ func TestTryCancelDataUpload(t *testing.T) { err = r.client.Create(ctx, test.dd) require.NoError(t, err) - r.TryCancelDataUpload(ctx, test.dd) + r.TryCancelDataUpload(ctx, test.dd, "") if test.expectedErr == "" { assert.NoError(t, err) diff --git a/pkg/exposer/csi_snapshot.go b/pkg/exposer/csi_snapshot.go index d82baea0f..637d0805d 100644 --- a/pkg/exposer/csi_snapshot.go +++ b/pkg/exposer/csi_snapshot.go @@ -23,6 +23,7 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" + "github.com/vmware-tanzu/velero/pkg/nodeagent" "github.com/vmware-tanzu/velero/pkg/util/boolptr" corev1 "k8s.io/api/core/v1" @@ -67,6 +68,9 @@ type CSISnapshotExposeParam struct { // VolumeSize specifies the size of the source volume VolumeSize resource.Quantity + + // Affinity specifies the node affinity of the backup pod + Affinity *nodeagent.LoadAffinity } // CSISnapshotExposeWaitParam define the input param for WaitExposed of CSI snapshots @@ -189,12 +193,12 @@ func (e *csiSnapshotExposer) Expose(ctx context.Context, ownerObject corev1.Obje } }() - backupPod, err := e.createBackupPod(ctx, ownerObject, backupPVC, csiExposeParam.HostingPodLabels) + backupPod, err := e.createBackupPod(ctx, ownerObject, backupPVC, csiExposeParam.HostingPodLabels, csiExposeParam.Affinity) if err != nil { return errors.Wrap(err, "error to create backup pod") } - curLog.WithField("pod name", backupPod.Name).Info("Backup pod is created") + curLog.WithField("pod name", backupPod.Name).WithField("affinity", csiExposeParam.Affinity).Info("Backup pod is created") defer func() { if err != nil { @@ -255,6 +259,30 @@ func (e *csiSnapshotExposer) GetExposed(ctx context.Context, ownerObject corev1. 
return &ExposeResult{ByPod: ExposeByPod{HostingPod: pod, VolumeName: volumeName}}, nil } +func (e *csiSnapshotExposer) PeekExposed(ctx context.Context, ownerObject corev1.ObjectReference) error { + backupPodName := ownerObject.Name + + curLog := e.log.WithFields(logrus.Fields{ + "owner": ownerObject.Name, + }) + + pod, err := e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Get(ctx, backupPodName, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + return nil + } + + if err != nil { + curLog.WithError(err).Warnf("error to peek backup pod %s", backupPodName) + return nil + } + + if podFailed, message := kube.IsPodUnrecoverable(pod, curLog); podFailed { + return errors.New(message) + } + + return nil +} + func (e *csiSnapshotExposer) CleanUp(ctx context.Context, ownerObject corev1.ObjectReference, vsName string, sourceNamespace string) { backupPodName := ownerObject.Name backupPVCName := ownerObject.Name @@ -382,7 +410,8 @@ func (e *csiSnapshotExposer) createBackupPVC(ctx context.Context, ownerObject co return created, err } -func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject corev1.ObjectReference, backupPVC *corev1.PersistentVolumeClaim, label map[string]string) (*corev1.Pod, error) { +func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject corev1.ObjectReference, backupPVC *corev1.PersistentVolumeClaim, + label map[string]string, affinity *nodeagent.LoadAffinity) (*corev1.Pod, error) { podName := ownerObject.Name volumeName := string(ownerObject.UID) @@ -430,6 +459,7 @@ func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject co }, }, }, + Affinity: toSystemAffinity(affinity), Containers: []corev1.Container{ { Name: containerName, @@ -455,3 +485,42 @@ func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject co return e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Create(ctx, pod, metav1.CreateOptions{}) } + +func toSystemAffinity(loadAffinity *nodeagent.LoadAffinity) *corev1.Affinity { + if loadAffinity == nil { + return nil + } + + requirements := []corev1.NodeSelectorRequirement{} + for k, v := range loadAffinity.NodeSelector.MatchLabels { + requirements = append(requirements, corev1.NodeSelectorRequirement{ + Key: k, + Values: []string{v}, + Operator: corev1.NodeSelectorOpIn, + }) + } + + for _, exp := range loadAffinity.NodeSelector.MatchExpressions { + requirements = append(requirements, corev1.NodeSelectorRequirement{ + Key: exp.Key, + Values: exp.Values, + Operator: corev1.NodeSelectorOperator(exp.Operator), + }) + } + + if len(requirements) == 0 { + return nil + } + + return &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: requirements, + }, + }, + }, + }, + } +} diff --git a/pkg/exposer/csi_snapshot_test.go b/pkg/exposer/csi_snapshot_test.go index 85e8e96da..36168823c 100644 --- a/pkg/exposer/csi_snapshot_test.go +++ b/pkg/exposer/csi_snapshot_test.go @@ -18,6 +18,7 @@ package exposer import ( "context" + "reflect" "testing" "time" @@ -35,6 +36,7 @@ import ( corev1 "k8s.io/api/core/v1" velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + "github.com/vmware-tanzu/velero/pkg/nodeagent" velerotest "github.com/vmware-tanzu/velero/pkg/test" "github.com/vmware-tanzu/velero/pkg/util/boolptr" @@ -620,3 +622,211 @@ func TestGetExpose(t *testing.T) { }) } } + +func TestPeekExpose(t *testing.T) { + backup := &velerov1.Backup{ + 
TypeMeta: metav1.TypeMeta{ + APIVersion: velerov1.SchemeGroupVersion.String(), + Kind: "Backup", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: velerov1.DefaultNamespace, + Name: "fake-backup", + UID: "fake-uid", + }, + } + + backupPodUrecoverable := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: backup.Namespace, + Name: backup.Name, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodPending, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodScheduled, + Reason: "Unschedulable", + Message: "unrecoverable", + }, + }, + }, + } + + backupPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: backup.Namespace, + Name: backup.Name, + }, + } + + scheme := runtime.NewScheme() + corev1.AddToScheme(scheme) + + tests := []struct { + name string + kubeClientObj []runtime.Object + ownerBackup *velerov1.Backup + err string + }{ + { + name: "backup pod is not found", + ownerBackup: backup, + }, + { + name: "pod is unrecoverable", + ownerBackup: backup, + kubeClientObj: []runtime.Object{ + backupPodUrecoverable, + }, + err: "Pod is unschedulable: unrecoverable", + }, + { + name: "succeed", + ownerBackup: backup, + kubeClientObj: []runtime.Object{ + backupPod, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...) + + exposer := csiSnapshotExposer{ + kubeClient: fakeKubeClient, + log: velerotest.NewLogger(), + } + + var ownerObject corev1.ObjectReference + if test.ownerBackup != nil { + ownerObject = corev1.ObjectReference{ + Kind: test.ownerBackup.Kind, + Namespace: test.ownerBackup.Namespace, + Name: test.ownerBackup.Name, + UID: test.ownerBackup.UID, + APIVersion: test.ownerBackup.APIVersion, + } + } + + err := exposer.PeekExposed(context.Background(), ownerObject) + if test.err == "" { + assert.NoError(t, err) + } else { + assert.EqualError(t, err, test.err) + } + }) + } +} + +func TestToSystemAffinity(t *testing.T) { + tests := []struct { + name string + loadAffinity *nodeagent.LoadAffinity + expected *corev1.Affinity + }{ + { + name: "loadAffinity is nil", + }, + { + name: "loadAffinity is empty", + loadAffinity: &nodeagent.LoadAffinity{}, + }, + { + name: "with match label", + loadAffinity: &nodeagent.LoadAffinity{ + NodeSelector: metav1.LabelSelector{ + MatchLabels: map[string]string{ + "key-1": "value-1", + "key-2": "value-2", + }, + }, + }, + expected: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "key-1", + Values: []string{"value-1"}, + Operator: corev1.NodeSelectorOpIn, + }, + { + Key: "key-2", + Values: []string{"value-2"}, + Operator: corev1.NodeSelectorOpIn, + }, + }, + }, + }, + }, + }, + }, + }, + { + name: "with match expression", + loadAffinity: &nodeagent.LoadAffinity{ + NodeSelector: metav1.LabelSelector{ + MatchLabels: map[string]string{ + "key-1": "value-1", + "key-2": "value-2", + }, + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "key-3", + Values: []string{"value-3-1", "value-3-2"}, + Operator: metav1.LabelSelectorOpNotIn, + }, + { + Key: "key-4", + Values: []string{"value-4-1", "value-4-2", "value-4-3"}, + Operator: metav1.LabelSelectorOpDoesNotExist, + }, + }, + }, + }, + expected: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: 
[]corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "key-1", + Values: []string{"value-1"}, + Operator: corev1.NodeSelectorOpIn, + }, + { + Key: "key-2", + Values: []string{"value-2"}, + Operator: corev1.NodeSelectorOpIn, + }, + { + Key: "key-3", + Values: []string{"value-3-1", "value-3-2"}, + Operator: corev1.NodeSelectorOpNotIn, + }, + { + Key: "key-4", + Values: []string{"value-4-1", "value-4-2", "value-4-3"}, + Operator: corev1.NodeSelectorOpDoesNotExist, + }, + }, + }, + }, + }, + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + affinity := toSystemAffinity(test.loadAffinity) + assert.Equal(t, true, reflect.DeepEqual(affinity, test.expected)) + }) + } +} diff --git a/pkg/exposer/generic_restore.go b/pkg/exposer/generic_restore.go index c73e4a4c3..a1ebb7245 100644 --- a/pkg/exposer/generic_restore.go +++ b/pkg/exposer/generic_restore.go @@ -44,6 +44,11 @@ type GenericRestoreExposer interface { // Otherwise, it returns nil as the expose result without an error. GetExposed(context.Context, corev1.ObjectReference, client.Client, string, time.Duration) (*ExposeResult, error) + // PeekExposed tests the status of the expose. + // If the expose is incomplete but not recoverable, it returns an error. + // Otherwise, it returns nil immediately. + PeekExposed(context.Context, corev1.ObjectReference) error + // RebindVolume unexposes the restored PV and rebind it to the target PVC RebindVolume(context.Context, corev1.ObjectReference, string, string, time.Duration) error @@ -160,6 +165,30 @@ func (e *genericRestoreExposer) GetExposed(ctx context.Context, ownerObject core return &ExposeResult{ByPod: ExposeByPod{HostingPod: pod, VolumeName: volumeName}}, nil } +func (e *genericRestoreExposer) PeekExposed(ctx context.Context, ownerObject corev1.ObjectReference) error { + restorePodName := ownerObject.Name + + curLog := e.log.WithFields(logrus.Fields{ + "owner": ownerObject.Name, + }) + + pod, err := e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Get(ctx, restorePodName, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + return nil + } + + if err != nil { + curLog.WithError(err).Warnf("error to peek restore pod %s", restorePodName) + return nil + } + + if podFailed, message := kube.IsPodUnrecoverable(pod, curLog); podFailed { + return errors.New(message) + } + + return nil +} + func (e *genericRestoreExposer) CleanUp(ctx context.Context, ownerObject corev1.ObjectReference) { restorePodName := ownerObject.Name restorePVCName := ownerObject.Name diff --git a/pkg/exposer/generic_restore_test.go b/pkg/exposer/generic_restore_test.go index 3e45d180e..7721effc1 100644 --- a/pkg/exposer/generic_restore_test.go +++ b/pkg/exposer/generic_restore_test.go @@ -409,3 +409,97 @@ func TestRebindVolume(t *testing.T) { }) } } + +func TestRestorePeekExpose(t *testing.T) { + restore := &velerov1.Restore{ + TypeMeta: metav1.TypeMeta{ + APIVersion: velerov1.SchemeGroupVersion.String(), + Kind: "Restore", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: velerov1.DefaultNamespace, + Name: "fake-restore", + UID: "fake-uid", + }, + } + + restorePodUrecoverable := &corev1api.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: restore.Namespace, + Name: restore.Name, + }, + Status: corev1api.PodStatus{ + Phase: corev1api.PodPending, + Conditions: []corev1api.PodCondition{ + { + Type: corev1api.PodScheduled, + Reason: "Unschedulable", + Message: "unrecoverable", + }, + }, + }, + } + + restorePod := &corev1api.Pod{ + ObjectMeta: 
metav1.ObjectMeta{ + Namespace: restore.Namespace, + Name: restore.Name, + }, + } + + tests := []struct { + name string + kubeClientObj []runtime.Object + ownerRestore *velerov1.Restore + err string + }{ + { + name: "restore pod is not found", + ownerRestore: restore, + }, + { + name: "pod is unrecoverable", + ownerRestore: restore, + kubeClientObj: []runtime.Object{ + restorePodUrecoverable, + }, + err: "Pod is unschedulable: unrecoverable", + }, + { + name: "succeed", + ownerRestore: restore, + kubeClientObj: []runtime.Object{ + restorePod, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...) + + exposer := genericRestoreExposer{ + kubeClient: fakeKubeClient, + log: velerotest.NewLogger(), + } + + var ownerObject corev1api.ObjectReference + if test.ownerRestore != nil { + ownerObject = corev1api.ObjectReference{ + Kind: test.ownerRestore.Kind, + Namespace: test.ownerRestore.Namespace, + Name: test.ownerRestore.Name, + UID: test.ownerRestore.UID, + APIVersion: test.ownerRestore.APIVersion, + } + } + + err := exposer.PeekExposed(context.Background(), ownerObject) + if test.err == "" { + assert.NoError(t, err) + } else { + assert.EqualError(t, err, test.err) + } + }) + } +} diff --git a/pkg/exposer/mocks/generic_restore.go b/pkg/exposer/mocks/generic_restore.go index a7d20f87c..6981f5cef 100644 --- a/pkg/exposer/mocks/generic_restore.go +++ b/pkg/exposer/mocks/generic_restore.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.20.0. DO NOT EDIT. +// Code generated by mockery v2.39.1. DO NOT EDIT. package mocks @@ -30,6 +30,10 @@ func (_m *GenericRestoreExposer) CleanUp(_a0 context.Context, _a1 v1.ObjectRefer func (_m *GenericRestoreExposer) Expose(_a0 context.Context, _a1 v1.ObjectReference, _a2 string, _a3 string, _a4 map[string]string, _a5 time.Duration) error { ret := _m.Called(_a0, _a1, _a2, _a3, _a4, _a5) + if len(ret) == 0 { + panic("no return value specified for Expose") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, v1.ObjectReference, string, string, map[string]string, time.Duration) error); ok { r0 = rf(_a0, _a1, _a2, _a3, _a4, _a5) @@ -44,6 +48,10 @@ func (_m *GenericRestoreExposer) Expose(_a0 context.Context, _a1 v1.ObjectRefere func (_m *GenericRestoreExposer) GetExposed(_a0 context.Context, _a1 v1.ObjectReference, _a2 client.Client, _a3 string, _a4 time.Duration) (*exposer.ExposeResult, error) { ret := _m.Called(_a0, _a1, _a2, _a3, _a4) + if len(ret) == 0 { + panic("no return value specified for GetExposed") + } + var r0 *exposer.ExposeResult var r1 error if rf, ok := ret.Get(0).(func(context.Context, v1.ObjectReference, client.Client, string, time.Duration) (*exposer.ExposeResult, error)); ok { @@ -66,10 +74,32 @@ func (_m *GenericRestoreExposer) GetExposed(_a0 context.Context, _a1 v1.ObjectRe return r0, r1 } +// PeekExposed provides a mock function with given fields: _a0, _a1 +func (_m *GenericRestoreExposer) PeekExposed(_a0 context.Context, _a1 v1.ObjectReference) error { + ret := _m.Called(_a0, _a1) + + if len(ret) == 0 { + panic("no return value specified for PeekExposed") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, v1.ObjectReference) error); ok { + r0 = rf(_a0, _a1) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // RebindVolume provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4 func (_m *GenericRestoreExposer) RebindVolume(_a0 context.Context, _a1 v1.ObjectReference, _a2 string, _a3 string, _a4 
time.Duration) error { ret := _m.Called(_a0, _a1, _a2, _a3, _a4) + if len(ret) == 0 { + panic("no return value specified for RebindVolume") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, v1.ObjectReference, string, string, time.Duration) error); ok { r0 = rf(_a0, _a1, _a2, _a3, _a4) @@ -80,13 +110,12 @@ func (_m *GenericRestoreExposer) RebindVolume(_a0 context.Context, _a1 v1.Object return r0 } -type mockConstructorTestingTNewGenericRestoreExposer interface { +// NewGenericRestoreExposer creates a new instance of GenericRestoreExposer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewGenericRestoreExposer(t interface { mock.TestingT Cleanup(func()) -} - -// NewGenericRestoreExposer creates a new instance of GenericRestoreExposer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewGenericRestoreExposer(t mockConstructorTestingTNewGenericRestoreExposer) *GenericRestoreExposer { +}) *GenericRestoreExposer { mock := &GenericRestoreExposer{} mock.Mock.Test(t) diff --git a/pkg/exposer/snapshot.go b/pkg/exposer/snapshot.go index 193684044..63fee5e3a 100644 --- a/pkg/exposer/snapshot.go +++ b/pkg/exposer/snapshot.go @@ -32,6 +32,11 @@ type SnapshotExposer interface { // Otherwise, it returns nil as the expose result without an error. GetExposed(context.Context, corev1.ObjectReference, time.Duration, interface{}) (*ExposeResult, error) + // PeekExposed tests the status of the expose. + // If the expose is incomplete but not recoverable, it returns an error. + // Otherwise, it returns nil immediately. + PeekExposed(context.Context, corev1.ObjectReference) error + // CleanUp cleans up any objects generated during the snapshot expose CleanUp(context.Context, corev1.ObjectReference, string, string) } diff --git a/pkg/nodeagent/node_agent.go b/pkg/nodeagent/node_agent.go index 67dbac17b..e3597e27a 100644 --- a/pkg/nodeagent/node_agent.go +++ b/pkg/nodeagent/node_agent.go @@ -50,6 +50,11 @@ type LoadConcurrency struct { PerNodeConfig []RuledConfigs `json:"perNodeConfig,omitempty"` } +type LoadAffinity struct { + // NodeSelector specifies the label selector to match nodes + NodeSelector metav1.LabelSelector `json:"nodeSelector"` +} + type RuledConfigs struct { // NodeSelector specifies the label selector to match nodes NodeSelector metav1.LabelSelector `json:"nodeSelector"` @@ -61,6 +66,9 @@ type RuledConfigs struct { type Configs struct { // LoadConcurrency is the config for data path load concurrency per node. LoadConcurrency *LoadConcurrency `json:"loadConcurrency,omitempty"` + + // LoadAffinity is the config for data path load affinity. + LoadAffinity []*LoadAffinity `json:"loadAffinity,omitempty"` } // IsRunning checks if the node agent daemonset is running properly. 
If not, return the error found diff --git a/pkg/nodeagent/node_agent_test.go b/pkg/nodeagent/node_agent_test.go index 0a7a2d431..2638b2213 100644 --- a/pkg/nodeagent/node_agent_test.go +++ b/pkg/nodeagent/node_agent_test.go @@ -294,7 +294,7 @@ func TestGetConfigs(t *testing.T) { kubeClientObj: []runtime.Object{ cmWithoutCocurrentData, }, - expectResult: &Configs{nil}, + expectResult: &Configs{}, }, { name: "success", diff --git a/pkg/util/kube/pod.go b/pkg/util/kube/pod.go index d15356ff3..03d1de1f9 100644 --- a/pkg/util/kube/pod.go +++ b/pkg/util/kube/pod.go @@ -121,6 +121,15 @@ func IsPodUnrecoverable(pod *corev1api.Pod, log logrus.FieldLogger) (bool, strin return true, fmt.Sprintf("Pod is in abnormal state %s", pod.Status.Phase) } + if pod.Status.Phase == corev1api.PodPending && len(pod.Status.Conditions) > 0 { + for _, condition := range pod.Status.Conditions { + if condition.Type == corev1api.PodScheduled && condition.Reason == "Unschedulable" { + log.Warnf("Pod is unschedulable %s", condition.Message) + return true, fmt.Sprintf("Pod is unschedulable: %s", condition.Message) + } + } + } + // Check the Status field for _, containerStatus := range pod.Status.ContainerStatuses { // If the container's image state is ImagePullBackOff, it indicates an image pull failure diff --git a/pkg/util/kube/pod_test.go b/pkg/util/kube/pod_test.go index f1cdac043..3dac79d31 100644 --- a/pkg/util/kube/pod_test.go +++ b/pkg/util/kube/pod_test.go @@ -401,6 +401,21 @@ func TestIsPodUnrecoverable(t *testing.T) { }, want: false, }, + { + name: "pod is unschedulable", + pod: &corev1api.Pod{ + Status: corev1api.PodStatus{ + Phase: corev1api.PodPending, + Conditions: []corev1api.PodCondition{ + { + Type: corev1api.PodScheduled, + Reason: "Unschedulable", + }, + }, + }, + }, + want: true, + }, { name: "pod is normal", pod: &corev1api.Pod{