diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go index 37a641309..95d47ca8e 100644 --- a/pkg/cmd/cli/nodeagent/server.go +++ b/pkg/cmd/cli/nodeagent/server.go @@ -138,6 +138,7 @@ type nodeAgentServer struct { kubeClient kubernetes.Interface csiSnapshotClient *snapshotv1client.Clientset dataPathMgr *datapath.Manager + dataPathConfigs *nodeagent.Configs } func newNodeAgentServer(logger logrus.FieldLogger, factory client.Factory, config nodeAgentServerConfig) (*nodeAgentServer, error) { @@ -226,8 +227,8 @@ func newNodeAgentServer(logger logrus.FieldLogger, factory client.Factory, confi return nil, err } - dataPathConcurrentNum := s.getDataPathConcurrentNum(defaultDataPathConcurrentNum) - s.dataPathMgr = datapath.NewManager(dataPathConcurrentNum) + s.getDataPathConfigs() + s.dataPathMgr = datapath.NewManager(s.getDataPathConcurrentNum(defaultDataPathConcurrentNum)) return s, nil } @@ -284,7 +285,11 @@ func (s *nodeAgentServer) run() { s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller") } - dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) + var loadAffinity *nodeagent.LoadAffinity + if s.dataPathConfigs != nil { + loadAffinity = s.dataPathConfigs.LoadAffinity + } + dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) s.attemptDataUploadResume(dataUploadReconciler) if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil { s.logger.WithError(err).Fatal("Unable to create the data upload controller") @@ -454,13 +459,24 @@ func (s *nodeAgentServer) markInProgressPVRsFailed(client ctrlclient.Client) { var getConfigsFunc = nodeagent.GetConfigs -func (s *nodeAgentServer) getDataPathConcurrentNum(defaultNum int) int { +func (s *nodeAgentServer) getDataPathConfigs() { configs, err := getConfigsFunc(s.ctx, s.namespace, s.kubeClient) if err != nil { s.logger.WithError(err).Warn("Failed to get node agent configs") - return defaultNum + return } + if configs == nil { + s.logger.Infof("Node agent configs are not found") + return + } + + s.dataPathConfigs = configs +} + +func (s *nodeAgentServer) getDataPathConcurrentNum(defaultNum int) int { + configs := s.dataPathConfigs + if configs == nil || configs.LoadConcurrency == nil { s.logger.Infof("Concurrency configs are not found, use the default number %v", defaultNum) return defaultNum diff --git a/pkg/cmd/cli/nodeagent/server_test.go b/pkg/cmd/cli/nodeagent/server_test.go index d062a7186..187cc6dc0 100644 --- a/pkg/cmd/cli/nodeagent/server_test.go +++ b/pkg/cmd/cli/nodeagent/server_test.go @@ -114,6 +114,64 @@ func Test_validatePodVolumesHostPath(t *testing.T) { } } +func Test_getDataPathConfigs(t *testing.T) { + configs := &nodeagent.Configs{ + LoadConcurrency: &nodeagent.LoadConcurrency{ + GlobalConfig: -1, + }, + } + + tests := []struct { + name string + getFunc func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) + expectConfigs *nodeagent.Configs + expectLog string + }{ + { + name: "failed to get configs", + getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { + return nil, 
errors.New("fake-get-error") + }, + expectLog: "Failed to get node agent configs", + }, + { + name: "configs cm not found", + getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { + return nil, nil + }, + expectLog: "Node agent configs are not found", + }, + + { + name: "succeed", + getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { + return configs, nil + }, + expectConfigs: configs, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + logBuffer := "" + + s := &nodeAgentServer{ + logger: testutil.NewSingleLogger(&logBuffer), + } + + getConfigsFunc = test.getFunc + + s.getDataPathConfigs() + assert.Equal(t, test.expectConfigs, s.dataPathConfigs) + if test.expectLog == "" { + assert.Equal(t, "", logBuffer) + } else { + assert.True(t, strings.Contains(logBuffer, test.expectLog)) + } + }) + } +} + func Test_getDataPathConcurrentNum(t *testing.T) { defaultNum := 100001 globalNum := 6 @@ -142,72 +200,47 @@ func Test_getDataPathConcurrentNum(t *testing.T) { tests := []struct { name string - getFunc func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) + configs nodeagent.Configs setKubeClient bool kubeClientObj []runtime.Object expectNum int expectLog string }{ { - name: "failed to get configs", - getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { - return nil, errors.New("fake-get-error") - }, - expectLog: "Failed to get node agent configs", - expectNum: defaultNum, - }, - { - name: "configs cm not found", - getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { - return nil, nil - }, - expectLog: fmt.Sprintf("Concurrency configs are not found, use the default number %v", defaultNum), - expectNum: defaultNum, - }, - { - name: "configs cm's data path concurrency is nil", - getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { - return &nodeagent.Configs{}, nil - }, + name: "configs cm's data path concurrency is nil", expectLog: fmt.Sprintf("Concurrency configs are not found, use the default number %v", defaultNum), expectNum: defaultNum, }, { name: "global number is invalid", - getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { - return &nodeagent.Configs{ - LoadConcurrency: &nodeagent.LoadConcurrency{ - GlobalConfig: -1, - }, - }, nil + configs: nodeagent.Configs{ + LoadConcurrency: &nodeagent.LoadConcurrency{ + GlobalConfig: -1, + }, }, expectLog: fmt.Sprintf("Global number %v is invalid, use the default value %v", -1, defaultNum), expectNum: defaultNum, }, { name: "global number is valid", - getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { - return &nodeagent.Configs{ - LoadConcurrency: &nodeagent.LoadConcurrency{ - GlobalConfig: globalNum, - }, - }, nil + configs: nodeagent.Configs{ + LoadConcurrency: &nodeagent.LoadConcurrency{ + GlobalConfig: globalNum, + }, }, expectNum: globalNum, }, { name: "node is not found", - getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { - return &nodeagent.Configs{ - LoadConcurrency: &nodeagent.LoadConcurrency{ - GlobalConfig: globalNum, - PerNodeConfig: []nodeagent.RuledConfigs{ - { - Number: 100, - }, + configs: nodeagent.Configs{ + LoadConcurrency: &nodeagent.LoadConcurrency{ + GlobalConfig: globalNum, + PerNodeConfig: []nodeagent.RuledConfigs{ + { + Number: 100, }, }, - }, nil + }, }, 
setKubeClient: true, expectLog: fmt.Sprintf("Failed to get node info for %s, use the global number %v", nodeName, globalNum), @@ -215,18 +248,16 @@ func Test_getDataPathConcurrentNum(t *testing.T) { }, { name: "failed to get selector", - getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { - return &nodeagent.Configs{ - LoadConcurrency: &nodeagent.LoadConcurrency{ - GlobalConfig: globalNum, - PerNodeConfig: []nodeagent.RuledConfigs{ - { - NodeSelector: invalidLabelSelector, - Number: 100, - }, + configs: nodeagent.Configs{ + LoadConcurrency: &nodeagent.LoadConcurrency{ + GlobalConfig: globalNum, + PerNodeConfig: []nodeagent.RuledConfigs{ + { + NodeSelector: invalidLabelSelector, + Number: 100, }, }, - }, nil + }, }, setKubeClient: true, kubeClientObj: []runtime.Object{node1}, @@ -235,18 +266,16 @@ func Test_getDataPathConcurrentNum(t *testing.T) { }, { name: "rule number is invalid", - getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { - return &nodeagent.Configs{ - LoadConcurrency: &nodeagent.LoadConcurrency{ - GlobalConfig: globalNum, - PerNodeConfig: []nodeagent.RuledConfigs{ - { - NodeSelector: validLabelSelector1, - Number: -1, - }, + configs: nodeagent.Configs{ + LoadConcurrency: &nodeagent.LoadConcurrency{ + GlobalConfig: globalNum, + PerNodeConfig: []nodeagent.RuledConfigs{ + { + NodeSelector: validLabelSelector1, + Number: -1, }, }, - }, nil + }, }, setKubeClient: true, kubeClientObj: []runtime.Object{node1}, @@ -255,18 +284,16 @@ func Test_getDataPathConcurrentNum(t *testing.T) { }, { name: "label doesn't match", - getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { - return &nodeagent.Configs{ - LoadConcurrency: &nodeagent.LoadConcurrency{ - GlobalConfig: globalNum, - PerNodeConfig: []nodeagent.RuledConfigs{ - { - NodeSelector: validLabelSelector1, - Number: -1, - }, + configs: nodeagent.Configs{ + LoadConcurrency: &nodeagent.LoadConcurrency{ + GlobalConfig: globalNum, + PerNodeConfig: []nodeagent.RuledConfigs{ + { + NodeSelector: validLabelSelector1, + Number: -1, }, }, - }, nil + }, }, setKubeClient: true, kubeClientObj: []runtime.Object{node1}, @@ -275,18 +302,16 @@ func Test_getDataPathConcurrentNum(t *testing.T) { }, { name: "match one rule", - getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { - return &nodeagent.Configs{ - LoadConcurrency: &nodeagent.LoadConcurrency{ - GlobalConfig: globalNum, - PerNodeConfig: []nodeagent.RuledConfigs{ - { - NodeSelector: validLabelSelector1, - Number: 66, - }, + configs: nodeagent.Configs{ + LoadConcurrency: &nodeagent.LoadConcurrency{ + GlobalConfig: globalNum, + PerNodeConfig: []nodeagent.RuledConfigs{ + { + NodeSelector: validLabelSelector1, + Number: 66, }, }, - }, nil + }, }, setKubeClient: true, kubeClientObj: []runtime.Object{node2}, @@ -295,22 +320,20 @@ func Test_getDataPathConcurrentNum(t *testing.T) { }, { name: "match multiple rules", - getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { - return &nodeagent.Configs{ - LoadConcurrency: &nodeagent.LoadConcurrency{ - GlobalConfig: globalNum, - PerNodeConfig: []nodeagent.RuledConfigs{ - { - NodeSelector: validLabelSelector1, - Number: 66, - }, - { - NodeSelector: validLabelSelector2, - Number: 36, - }, + configs: nodeagent.Configs{ + LoadConcurrency: &nodeagent.LoadConcurrency{ + GlobalConfig: globalNum, + PerNodeConfig: []nodeagent.RuledConfigs{ + { + NodeSelector: validLabelSelector1, 
+ Number: 66, + }, + { + NodeSelector: validLabelSelector2, + Number: 36, }, }, - }, nil + }, }, setKubeClient: true, kubeClientObj: []runtime.Object{node2}, @@ -319,22 +342,20 @@ func Test_getDataPathConcurrentNum(t *testing.T) { }, { name: "match multiple rules 2", - getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { - return &nodeagent.Configs{ - LoadConcurrency: &nodeagent.LoadConcurrency{ - GlobalConfig: globalNum, - PerNodeConfig: []nodeagent.RuledConfigs{ - { - NodeSelector: validLabelSelector1, - Number: 36, - }, - { - NodeSelector: validLabelSelector2, - Number: 66, - }, + configs: nodeagent.Configs{ + LoadConcurrency: &nodeagent.LoadConcurrency{ + GlobalConfig: globalNum, + PerNodeConfig: []nodeagent.RuledConfigs{ + { + NodeSelector: validLabelSelector1, + Number: 36, + }, + { + NodeSelector: validLabelSelector2, + Number: 66, }, }, - }, nil + }, }, setKubeClient: true, kubeClientObj: []runtime.Object{node2}, @@ -349,16 +370,15 @@ func Test_getDataPathConcurrentNum(t *testing.T) { logBuffer := "" s := &nodeAgentServer{ - nodeName: nodeName, - logger: testutil.NewSingleLogger(&logBuffer), + nodeName: nodeName, + dataPathConfigs: &test.configs, + logger: testutil.NewSingleLogger(&logBuffer), } if test.setKubeClient { s.kubeClient = fakeKubeClient } - getConfigsFunc = test.getFunc - num := s.getDataPathConcurrentNum(defaultNum) assert.Equal(t, test.expectNum, num) if test.expectLog == "" { diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go index b7c18e51d..e50b33a97 100644 --- a/pkg/controller/data_upload_controller.go +++ b/pkg/controller/data_upload_controller.go @@ -49,6 +49,7 @@ import ( "github.com/vmware-tanzu/velero/pkg/datapath" "github.com/vmware-tanzu/velero/pkg/exposer" "github.com/vmware-tanzu/velero/pkg/metrics" + "github.com/vmware-tanzu/velero/pkg/nodeagent" "github.com/vmware-tanzu/velero/pkg/repository" "github.com/vmware-tanzu/velero/pkg/uploader" "github.com/vmware-tanzu/velero/pkg/util/filesystem" @@ -75,12 +76,13 @@ type DataUploadReconciler struct { logger logrus.FieldLogger snapshotExposerList map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer dataPathMgr *datapath.Manager + loadAffinity *nodeagent.LoadAffinity preparingTimeout time.Duration metrics *metrics.ServerMetrics } func NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interface, csiSnapshotClient snapshotter.SnapshotV1Interface, - dataPathMgr *datapath.Manager, repoEnsurer *repository.Ensurer, clock clocks.WithTickerAndDelayedExecution, + dataPathMgr *datapath.Manager, loadAffinity *nodeagent.LoadAffinity, repoEnsurer *repository.Ensurer, clock clocks.WithTickerAndDelayedExecution, cred *credentials.CredentialGetter, nodeName string, fs filesystem.Interface, preparingTimeout time.Duration, log logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataUploadReconciler { return &DataUploadReconciler{ client: client, @@ -94,6 +96,7 @@ func NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interfa repoEnsurer: repoEnsurer, snapshotExposerList: map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{velerov2alpha1api.SnapshotTypeCSI: exposer.NewCSISnapshotExposer(kubeClient, csiSnapshotClient, log)}, dataPathMgr: dataPathMgr, + loadAffinity: loadAffinity, preparingTimeout: preparingTimeout, metrics: metrics, } @@ -826,6 +829,7 @@ func (r *DataUploadReconciler) setupExposeParam(du *velerov2alpha1api.DataUpload OperationTimeout: du.Spec.OperationTimeout.Duration, ExposeTimeout: 
r.preparingTimeout, VolumeSize: pvc.Spec.Resources.Requests[corev1.ResourceStorage], + Affinity: r.loadAffinity, }, nil } return nil, nil diff --git a/pkg/controller/data_upload_controller_test.go b/pkg/controller/data_upload_controller_test.go index 8d6518864..3a0bb5aa8 100644 --- a/pkg/controller/data_upload_controller_test.go +++ b/pkg/controller/data_upload_controller_test.go @@ -232,7 +232,7 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci if err != nil { return nil, err } - return NewDataUploadReconciler(fakeClient, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), dataPathMgr, nil, + return NewDataUploadReconciler(fakeClient, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), dataPathMgr, nil, nil, testclocks.NewFakeClock(now), &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", fakeFS, time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil } diff --git a/pkg/exposer/csi_snapshot.go b/pkg/exposer/csi_snapshot.go index 7113f2fe8..ed5b1b89f 100644 --- a/pkg/exposer/csi_snapshot.go +++ b/pkg/exposer/csi_snapshot.go @@ -23,6 +23,7 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" + "github.com/vmware-tanzu/velero/pkg/nodeagent" "github.com/vmware-tanzu/velero/pkg/util/boolptr" corev1 "k8s.io/api/core/v1" @@ -67,6 +68,9 @@ type CSISnapshotExposeParam struct { // VolumeSize specifies the size of the source volume VolumeSize resource.Quantity + + // Affinity specifies the node affinity of the backup pod + Affinity *nodeagent.LoadAffinity } // CSISnapshotExposeWaitParam define the input param for WaitExposed of CSI snapshots @@ -189,12 +193,12 @@ func (e *csiSnapshotExposer) Expose(ctx context.Context, ownerObject corev1.Obje } }() - backupPod, err := e.createBackupPod(ctx, ownerObject, backupPVC, csiExposeParam.HostingPodLabels) + backupPod, err := e.createBackupPod(ctx, ownerObject, backupPVC, csiExposeParam.HostingPodLabels, csiExposeParam.Affinity) if err != nil { return errors.Wrap(err, "error to create backup pod") } - curLog.WithField("pod name", backupPod.Name).Info("Backup pod is created") + curLog.WithField("pod name", backupPod.Name).WithField("affinity", csiExposeParam.Affinity).Info("Backup pod is created") defer func() { if err != nil { @@ -382,7 +386,8 @@ func (e *csiSnapshotExposer) createBackupPVC(ctx context.Context, ownerObject co return created, err } -func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject corev1.ObjectReference, backupPVC *corev1.PersistentVolumeClaim, label map[string]string) (*corev1.Pod, error) { +func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject corev1.ObjectReference, backupPVC *corev1.PersistentVolumeClaim, + label map[string]string, affinity *nodeagent.LoadAffinity) (*corev1.Pod, error) { podName := ownerObject.Name volumeName := string(ownerObject.UID) @@ -430,6 +435,7 @@ func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject co }, }, }, + Affinity: toSystemAffinity(affinity), Containers: []corev1.Container{ { Name: containerName, @@ -455,3 +461,42 @@ func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject co return e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Create(ctx, pod, metav1.CreateOptions{}) } + +func toSystemAffinity(loadAffinity *nodeagent.LoadAffinity) *corev1.Affinity { + if loadAffinity == nil { + return nil + } + + requirements := []corev1.NodeSelectorRequirement{} + for k, v := range loadAffinity.NodeSelector.MatchLabels { + requirements = 
append(requirements, corev1.NodeSelectorRequirement{ + Key: k, + Values: []string{v}, + Operator: corev1.NodeSelectorOpIn, + }) + } + + for _, exp := range loadAffinity.NodeSelector.MatchExpressions { + requirements = append(requirements, corev1.NodeSelectorRequirement{ + Key: exp.Key, + Values: exp.Values, + Operator: corev1.NodeSelectorOperator(exp.Operator), + }) + } + + if len(requirements) == 0 { + return nil + } + + return &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: requirements, + }, + }, + }, + }, + } +} diff --git a/pkg/nodeagent/node_agent.go b/pkg/nodeagent/node_agent.go index 67dbac17b..7fad915ae 100644 --- a/pkg/nodeagent/node_agent.go +++ b/pkg/nodeagent/node_agent.go @@ -50,6 +50,11 @@ type LoadConcurrency struct { PerNodeConfig []RuledConfigs `json:"perNodeConfig,omitempty"` } +type LoadAffinity struct { + // NodeSelector specifies the label selector to match nodes + NodeSelector metav1.LabelSelector `json:"nodeSelector"` +} + type RuledConfigs struct { // NodeSelector specifies the label selector to match nodes NodeSelector metav1.LabelSelector `json:"nodeSelector"` @@ -61,6 +66,9 @@ type RuledConfigs struct { type Configs struct { // LoadConcurrency is the config for data path load concurrency per node. LoadConcurrency *LoadConcurrency `json:"loadConcurrency,omitempty"` + + // LoadAffinity is the config for data path load affinity. + LoadAffinity *LoadAffinity `json:"loadAffinity,omitempty"` } // IsRunning checks if the node agent daemonset is running properly. If not, return the error found diff --git a/pkg/nodeagent/node_agent_test.go b/pkg/nodeagent/node_agent_test.go index 0a7a2d431..2638b2213 100644 --- a/pkg/nodeagent/node_agent_test.go +++ b/pkg/nodeagent/node_agent_test.go @@ -294,7 +294,7 @@ func TestGetConfigs(t *testing.T) { kubeClientObj: []runtime.Object{ cmWithoutCocurrentData, }, - expectResult: &Configs{nil}, + expectResult: &Configs{}, }, { name: "success", diff --git a/pkg/util/kube/pod.go b/pkg/util/kube/pod.go index d93657878..892eb9338 100644 --- a/pkg/util/kube/pod.go +++ b/pkg/util/kube/pod.go @@ -121,6 +121,15 @@ func IsPodUnrecoverable(pod *corev1api.Pod, log logrus.FieldLogger) (bool, strin return true, fmt.Sprintf("Pod is in abnormal state %s", pod.Status.Phase) } + if pod.Status.Phase == corev1api.PodPending && len(pod.Status.Conditions) > 0 { + for _, condition := range pod.Status.Conditions { + if condition.Type == "PodScheduled" && condition.Reason == "Unschedulable" { + log.Warnf("Pod is unschedulable %s", condition.Message) + return true, fmt.Sprintf("Pod is unschedulable %s", condition.Message) + } + } + } + // Check the Status field for _, containerStatus := range pod.Status.ContainerStatuses { // If the container's image state is ImagePullBackOff, it indicates an image pull failure
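
Note: the diff above adds the toSystemAffinity helper to pkg/exposer but does not show a unit test for it. A minimal sketch of such a test (hypothetical name and placement, assumed to sit alongside csi_snapshot.go in package exposer) could look like the following; per the conversion rules in the helper, matchLabels entries become "In" requirements and matchExpressions are carried over unchanged:

package exposer

import (
	"testing"

	"github.com/stretchr/testify/assert"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/vmware-tanzu/velero/pkg/nodeagent"
)

// TestToSystemAffinity is a hypothetical sketch verifying that a LoadAffinity
// label selector is converted into a required node affinity term.
func TestToSystemAffinity(t *testing.T) {
	// A nil load affinity should produce no pod affinity at all.
	assert.Nil(t, toSystemAffinity(nil))

	affinity := toSystemAffinity(&nodeagent.LoadAffinity{
		NodeSelector: metav1.LabelSelector{
			MatchLabels: map[string]string{"kubernetes.io/arch": "amd64"},
			MatchExpressions: []metav1.LabelSelectorRequirement{
				{
					Key:      "topology.kubernetes.io/zone",
					Operator: metav1.LabelSelectorOpIn,
					Values:   []string{"us-east-1a"},
				},
			},
		},
	})

	// Both the label and the expression should end up in a single
	// required node selector term on the backup pod.
	terms := affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
	assert.Len(t, terms, 1)
	assert.ElementsMatch(t, []corev1.NodeSelectorRequirement{
		{Key: "kubernetes.io/arch", Operator: corev1.NodeSelectorOpIn, Values: []string{"amd64"}},
		{Key: "topology.kubernetes.io/zone", Operator: corev1.NodeSelectorOpIn, Values: []string{"us-east-1a"}},
	}, terms[0].MatchExpressions)
}

ElementsMatch is used rather than Equal so the assertion does not depend on map iteration order for the requirements derived from matchLabels.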