issue-7036: data mover load affinities

Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
Lyndon-Li
2024-01-29 18:25:43 +08:00
parent 1034d6aee0
commit 9a907a21f2
8 changed files with 227 additions and 125 deletions


@@ -138,6 +138,7 @@ type nodeAgentServer struct {
kubeClient kubernetes.Interface
csiSnapshotClient *snapshotv1client.Clientset
dataPathMgr *datapath.Manager
dataPathConfigs *nodeagent.Configs
}
func newNodeAgentServer(logger logrus.FieldLogger, factory client.Factory, config nodeAgentServerConfig) (*nodeAgentServer, error) {
@@ -226,8 +227,8 @@ func newNodeAgentServer(logger logrus.FieldLogger, factory client.Factory, confi
return nil, err
}
dataPathConcurrentNum := s.getDataPathConcurrentNum(defaultDataPathConcurrentNum)
s.dataPathMgr = datapath.NewManager(dataPathConcurrentNum)
s.getDataPathConfigs()
s.dataPathMgr = datapath.NewManager(s.getDataPathConcurrentNum(defaultDataPathConcurrentNum))
return s, nil
}
@@ -284,7 +285,11 @@ func (s *nodeAgentServer) run() {
s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller")
}
dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
var loadAffinity *nodeagent.LoadAffinity
if s.dataPathConfigs != nil {
loadAffinity = s.dataPathConfigs.LoadAffinity
}
dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
s.attemptDataUploadResume(dataUploadReconciler)
if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data upload controller")
@@ -454,13 +459,24 @@ func (s *nodeAgentServer) markInProgressPVRsFailed(client ctrlclient.Client) {
var getConfigsFunc = nodeagent.GetConfigs
func (s *nodeAgentServer) getDataPathConcurrentNum(defaultNum int) int {
func (s *nodeAgentServer) getDataPathConfigs() {
configs, err := getConfigsFunc(s.ctx, s.namespace, s.kubeClient)
if err != nil {
s.logger.WithError(err).Warn("Failed to get node agent configs")
return defaultNum
return
}
if configs == nil {
s.logger.Infof("Node agent configs are not found")
return
}
s.dataPathConfigs = configs
}
func (s *nodeAgentServer) getDataPathConcurrentNum(defaultNum int) int {
configs := s.dataPathConfigs
if configs == nil || configs.LoadConcurrency == nil {
s.logger.Infof("Concurrency configs are not found, use the default number %v", defaultNum)
return defaultNum
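
The server-side hunk above is truncated. Purely for orientation, the sketch below shows how the cached dataPathConfigs could be turned into a per-node concurrency number. The rule handling (skipping invalid selectors/numbers, the smallest matching per-node value winning over the global one) is an assumption inferred from the log messages and test names in this commit, not a verbatim copy of the shipped function.

```go
package sketch

import (
	"context"

	"github.com/sirupsen/logrus"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/kubernetes"

	"github.com/vmware-tanzu/velero/pkg/nodeagent"
)

// resolveConcurrentNum is a sketch of how the cached node-agent configs could
// yield the data path concurrency for the current node.
func resolveConcurrentNum(ctx context.Context, kubeClient kubernetes.Interface,
	configs *nodeagent.Configs, nodeName string, defaultNum int, log logrus.FieldLogger) int {
	if configs == nil || configs.LoadConcurrency == nil {
		log.Infof("Concurrency configs are not found, use the default number %v", defaultNum)
		return defaultNum
	}

	globalNum := configs.LoadConcurrency.GlobalConfig
	if globalNum <= 0 {
		log.Warnf("Global number %v is invalid, use the default value %v", globalNum, defaultNum)
		globalNum = defaultNum
	}

	if len(configs.LoadConcurrency.PerNodeConfig) == 0 {
		return globalNum
	}

	node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
	if err != nil {
		log.WithError(err).Warnf("Failed to get node info for %s, use the global number %v", nodeName, globalNum)
		return globalNum
	}

	effective, matched := globalNum, false
	for _, rule := range configs.LoadConcurrency.PerNodeConfig {
		selector, err := metav1.LabelSelectorAsSelector(&rule.NodeSelector)
		if err != nil || rule.Number <= 0 {
			continue // invalid selector or invalid number: the rule is ignored
		}
		// Assumption: when several rules match the node, the smallest number wins.
		if selector.Matches(labels.Set(node.GetLabels())) && (!matched || rule.Number < effective) {
			effective, matched = rule.Number, true
		}
	}

	return effective
}
```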


@@ -114,6 +114,64 @@ func Test_validatePodVolumesHostPath(t *testing.T) {
}
}
func Test_getDataPathConfigs(t *testing.T) {
configs := &nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: -1,
},
}
tests := []struct {
name string
getFunc func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error)
expectConfigs *nodeagent.Configs
expectLog string
}{
{
name: "failed to get configs",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return nil, errors.New("fake-get-error")
},
expectLog: "Failed to get node agent configs",
},
{
name: "configs cm not found",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return nil, nil
},
expectLog: "Node agent configs are not found",
},
{
name: "succeed",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return configs, nil
},
expectConfigs: configs,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
logBuffer := ""
s := &nodeAgentServer{
logger: testutil.NewSingleLogger(&logBuffer),
}
getConfigsFunc = test.getFunc
s.getDataPathConfigs()
assert.Equal(t, test.expectConfigs, s.dataPathConfigs)
if test.expectLog == "" {
assert.Equal(t, "", logBuffer)
} else {
assert.True(t, strings.Contains(logBuffer, test.expectLog))
}
})
}
}
func Test_getDataPathConcurrentNum(t *testing.T) {
defaultNum := 100001
globalNum := 6
@@ -142,72 +200,47 @@ func Test_getDataPathConcurrentNum(t *testing.T) {
tests := []struct {
name string
getFunc func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error)
configs nodeagent.Configs
setKubeClient bool
kubeClientObj []runtime.Object
expectNum int
expectLog string
}{
{
name: "failed to get configs",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return nil, errors.New("fake-get-error")
},
expectLog: "Failed to get node agent configs",
expectNum: defaultNum,
},
{
name: "configs cm not found",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return nil, nil
},
expectLog: fmt.Sprintf("Concurrency configs are not found, use the default number %v", defaultNum),
expectNum: defaultNum,
},
{
name: "configs cm's data path concurrency is nil",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{}, nil
},
name: "configs cm's data path concurrency is nil",
expectLog: fmt.Sprintf("Concurrency configs are not found, use the default number %v", defaultNum),
expectNum: defaultNum,
},
{
name: "global number is invalid",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: -1,
},
}, nil
configs: nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: -1,
},
},
expectLog: fmt.Sprintf("Global number %v is invalid, use the default value %v", -1, defaultNum),
expectNum: defaultNum,
},
{
name: "global number is valid",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: globalNum,
},
}, nil
configs: nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: globalNum,
},
},
expectNum: globalNum,
},
{
name: "node is not found",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
Number: 100,
},
configs: nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
Number: 100,
},
},
}, nil
},
},
setKubeClient: true,
expectLog: fmt.Sprintf("Failed to get node info for %s, use the global number %v", nodeName, globalNum),
@@ -215,18 +248,16 @@ func Test_getDataPathConcurrentNum(t *testing.T) {
},
{
name: "failed to get selector",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: invalidLabelSelector,
Number: 100,
},
configs: nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: invalidLabelSelector,
Number: 100,
},
},
}, nil
},
},
setKubeClient: true,
kubeClientObj: []runtime.Object{node1},
@@ -235,18 +266,16 @@ func Test_getDataPathConcurrentNum(t *testing.T) {
},
{
name: "rule number is invalid",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: validLabelSelector1,
Number: -1,
},
configs: nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: validLabelSelector1,
Number: -1,
},
},
}, nil
},
},
setKubeClient: true,
kubeClientObj: []runtime.Object{node1},
@@ -255,18 +284,16 @@ func Test_getDataPathConcurrentNum(t *testing.T) {
},
{
name: "label doesn't match",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: validLabelSelector1,
Number: -1,
},
configs: nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: validLabelSelector1,
Number: -1,
},
},
}, nil
},
},
setKubeClient: true,
kubeClientObj: []runtime.Object{node1},
@@ -275,18 +302,16 @@ func Test_getDataPathConcurrentNum(t *testing.T) {
},
{
name: "match one rule",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: validLabelSelector1,
Number: 66,
},
configs: nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: validLabelSelector1,
Number: 66,
},
},
}, nil
},
},
setKubeClient: true,
kubeClientObj: []runtime.Object{node2},
@@ -295,22 +320,20 @@ func Test_getDataPathConcurrentNum(t *testing.T) {
},
{
name: "match multiple rules",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: validLabelSelector1,
Number: 66,
},
{
NodeSelector: validLabelSelector2,
Number: 36,
},
configs: nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: validLabelSelector1,
Number: 66,
},
{
NodeSelector: validLabelSelector2,
Number: 36,
},
},
}, nil
},
},
setKubeClient: true,
kubeClientObj: []runtime.Object{node2},
@@ -319,22 +342,20 @@ func Test_getDataPathConcurrentNum(t *testing.T) {
},
{
name: "match multiple rules 2",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: validLabelSelector1,
Number: 36,
},
{
NodeSelector: validLabelSelector2,
Number: 66,
},
configs: nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: validLabelSelector1,
Number: 36,
},
{
NodeSelector: validLabelSelector2,
Number: 66,
},
},
}, nil
},
},
setKubeClient: true,
kubeClientObj: []runtime.Object{node2},
@@ -349,16 +370,15 @@ func Test_getDataPathConcurrentNum(t *testing.T) {
logBuffer := ""
s := &nodeAgentServer{
nodeName: nodeName,
logger: testutil.NewSingleLogger(&logBuffer),
nodeName: nodeName,
dataPathConfigs: &test.configs,
logger: testutil.NewSingleLogger(&logBuffer),
}
if test.setKubeClient {
s.kubeClient = fakeKubeClient
}
getConfigsFunc = test.getFunc
num := s.getDataPathConcurrentNum(defaultNum)
assert.Equal(t, test.expectNum, num)
if test.expectLog == "" {


@@ -49,6 +49,7 @@ import (
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/exposer"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/nodeagent"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
@@ -75,12 +76,13 @@ type DataUploadReconciler struct {
logger logrus.FieldLogger
snapshotExposerList map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer
dataPathMgr *datapath.Manager
loadAffinity *nodeagent.LoadAffinity
preparingTimeout time.Duration
metrics *metrics.ServerMetrics
}
func NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interface, csiSnapshotClient snapshotter.SnapshotV1Interface,
dataPathMgr *datapath.Manager, repoEnsurer *repository.Ensurer, clock clocks.WithTickerAndDelayedExecution,
dataPathMgr *datapath.Manager, loadAffinity *nodeagent.LoadAffinity, repoEnsurer *repository.Ensurer, clock clocks.WithTickerAndDelayedExecution,
cred *credentials.CredentialGetter, nodeName string, fs filesystem.Interface, preparingTimeout time.Duration, log logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataUploadReconciler {
return &DataUploadReconciler{
client: client,
@@ -94,6 +96,7 @@ func NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interfa
repoEnsurer: repoEnsurer,
snapshotExposerList: map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{velerov2alpha1api.SnapshotTypeCSI: exposer.NewCSISnapshotExposer(kubeClient, csiSnapshotClient, log)},
dataPathMgr: dataPathMgr,
loadAffinity: loadAffinity,
preparingTimeout: preparingTimeout,
metrics: metrics,
}
@@ -826,6 +829,7 @@ func (r *DataUploadReconciler) setupExposeParam(du *velerov2alpha1api.DataUpload
OperationTimeout: du.Spec.OperationTimeout.Duration,
ExposeTimeout: r.preparingTimeout,
VolumeSize: pvc.Spec.Resources.Requests[corev1.ResourceStorage],
Affinity: r.loadAffinity,
}, nil
}
return nil, nil


@@ -232,7 +232,7 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci
if err != nil {
return nil, err
}
return NewDataUploadReconciler(fakeClient, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), dataPathMgr, nil,
return NewDataUploadReconciler(fakeClient, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), dataPathMgr, nil, nil,
testclocks.NewFakeClock(now), &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", fakeFS, time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
}


@@ -23,6 +23,7 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/vmware-tanzu/velero/pkg/nodeagent"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
corev1 "k8s.io/api/core/v1"
@@ -67,6 +68,9 @@ type CSISnapshotExposeParam struct {
// VolumeSize specifies the size of the source volume
VolumeSize resource.Quantity
// Affinity specifies the node affinity of the backup pod
Affinity *nodeagent.LoadAffinity
}
// CSISnapshotExposeWaitParam define the input param for WaitExposed of CSI snapshots
@@ -189,12 +193,12 @@ func (e *csiSnapshotExposer) Expose(ctx context.Context, ownerObject corev1.Obje
}
}()
backupPod, err := e.createBackupPod(ctx, ownerObject, backupPVC, csiExposeParam.HostingPodLabels)
backupPod, err := e.createBackupPod(ctx, ownerObject, backupPVC, csiExposeParam.HostingPodLabels, csiExposeParam.Affinity)
if err != nil {
return errors.Wrap(err, "error to create backup pod")
}
curLog.WithField("pod name", backupPod.Name).Info("Backup pod is created")
curLog.WithField("pod name", backupPod.Name).WithField("affinity", csiExposeParam.Affinity).Info("Backup pod is created")
defer func() {
if err != nil {
@@ -382,7 +386,8 @@ func (e *csiSnapshotExposer) createBackupPVC(ctx context.Context, ownerObject co
return created, err
}
func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject corev1.ObjectReference, backupPVC *corev1.PersistentVolumeClaim, label map[string]string) (*corev1.Pod, error) {
func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject corev1.ObjectReference, backupPVC *corev1.PersistentVolumeClaim,
label map[string]string, affinity *nodeagent.LoadAffinity) (*corev1.Pod, error) {
podName := ownerObject.Name
volumeName := string(ownerObject.UID)
@@ -430,6 +435,7 @@ func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject co
},
},
},
Affinity: toSystemAffinity(affinity),
Containers: []corev1.Container{
{
Name: containerName,
@@ -455,3 +461,42 @@ func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject co
return e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Create(ctx, pod, metav1.CreateOptions{})
}
func toSystemAffinity(loadAffinity *nodeagent.LoadAffinity) *corev1.Affinity {
if loadAffinity == nil {
return nil
}
requirements := []corev1.NodeSelectorRequirement{}
for k, v := range loadAffinity.NodeSelector.MatchLabels {
requirements = append(requirements, corev1.NodeSelectorRequirement{
Key: k,
Values: []string{v},
Operator: corev1.NodeSelectorOpIn,
})
}
for _, exp := range loadAffinity.NodeSelector.MatchExpressions {
requirements = append(requirements, corev1.NodeSelectorRequirement{
Key: exp.Key,
Values: exp.Values,
Operator: corev1.NodeSelectorOperator(exp.Operator),
})
}
if len(requirements) == 0 {
return nil
}
return &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: requirements,
},
},
},
},
}
}
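
As a worked example of the conversion above (a sketch with made-up labels, not a test from the repository), the following shows the corev1.Affinity that toSystemAffinity would produce for a LoadAffinity mixing matchLabels and matchExpressions:

```go
package sketch

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/vmware-tanzu/velero/pkg/nodeagent"
)

// Input: a load affinity pinning backup pods to a (hypothetical) backup node
// pool while excluding two specific hosts.
var sampleAffinity = &nodeagent.LoadAffinity{
	NodeSelector: metav1.LabelSelector{
		MatchLabels: map[string]string{"node-role.kubernetes.io/backup": "true"},
		MatchExpressions: []metav1.LabelSelectorRequirement{
			{
				Key:      "kubernetes.io/hostname",
				Operator: metav1.LabelSelectorOpNotIn,
				Values:   []string{"node-1", "node-2"},
			},
		},
	},
}

// Expected result of toSystemAffinity for the input above: each matchLabels
// entry becomes an "In" requirement, matchExpressions keep their operator, and
// all requirements land in a single required node selector term.
var expectedAffinity = &corev1.Affinity{
	NodeAffinity: &corev1.NodeAffinity{
		RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
			NodeSelectorTerms: []corev1.NodeSelectorTerm{
				{
					MatchExpressions: []corev1.NodeSelectorRequirement{
						{Key: "node-role.kubernetes.io/backup", Operator: corev1.NodeSelectorOpIn, Values: []string{"true"}},
						{Key: "kubernetes.io/hostname", Operator: corev1.NodeSelectorOpNotIn, Values: []string{"node-1", "node-2"}},
					},
				},
			},
		},
	},
}
```

Because all requirements are placed in one NodeSelectorTerm, they are ANDed together; the conversion never produces multiple OR-able terms.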


@@ -50,6 +50,11 @@ type LoadConcurrency struct {
PerNodeConfig []RuledConfigs `json:"perNodeConfig,omitempty"`
}
type LoadAffinity struct {
// NodeSelector specifies the label selector to match nodes
NodeSelector metav1.LabelSelector `json:"nodeSelector"`
}
type RuledConfigs struct {
// NodeSelector specifies the label selector to match nodes
NodeSelector metav1.LabelSelector `json:"nodeSelector"`
@@ -61,6 +66,9 @@ type RuledConfigs struct {
type Configs struct {
// LoadConcurrency is the config for data path load concurrency per node.
LoadConcurrency *LoadConcurrency `json:"loadConcurrency,omitempty"`
// LoadAffinity is the config for data path load affinity.
LoadAffinity *LoadAffinity `json:"loadAffinity,omitempty"`
}
// IsRunning checks if the node agent daemonset is running properly. If not, return the error found
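
The node agent reads these types from its ConfigMap (the tests above call it the "configs cm"). As a hedged sketch, the snippet below builds a Configs value with both loadConcurrency and loadAffinity and prints the JSON the agent would be expected to parse; the sample labels and numbers are illustrative and not taken from this commit.

```go
package main

import (
	"encoding/json"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/vmware-tanzu/velero/pkg/nodeagent"
)

func main() {
	// Sketch: combine per-node load concurrency with a load affinity.
	configs := nodeagent.Configs{
		LoadConcurrency: &nodeagent.LoadConcurrency{
			GlobalConfig: 2,
			PerNodeConfig: []nodeagent.RuledConfigs{
				{
					NodeSelector: metav1.LabelSelector{
						MatchLabels: map[string]string{"node-role.kubernetes.io/backup": "true"},
					},
					Number: 6,
				},
			},
		},
		LoadAffinity: &nodeagent.LoadAffinity{
			NodeSelector: metav1.LabelSelector{
				MatchLabels: map[string]string{"kubernetes.io/os": "linux"},
			},
		},
	}

	// Prints JSON shaped like:
	//   {"loadConcurrency":{...,"perNodeConfig":[{"nodeSelector":{...},...}]},
	//    "loadAffinity":{"nodeSelector":{"matchLabels":{"kubernetes.io/os":"linux"}}}}
	out, err := json.MarshalIndent(configs, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```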


@@ -294,7 +294,7 @@ func TestGetConfigs(t *testing.T) {
kubeClientObj: []runtime.Object{
cmWithoutCocurrentData,
},
expectResult: &Configs{nil},
expectResult: &Configs{},
},
{
name: "success",


@@ -121,6 +121,15 @@ func IsPodUnrecoverable(pod *corev1api.Pod, log logrus.FieldLogger) (bool, strin
return true, fmt.Sprintf("Pod is in abnormal state %s", pod.Status.Phase)
}
if pod.Status.Phase == corev1api.PodPending && len(pod.Status.Conditions) > 0 {
for _, condition := range pod.Status.Conditions {
if condition.Type == "PodScheduled" && condition.Reason == "Unschedulable" {
log.Warnf("Pod is unschedulable %s", condition.Message)
return true, fmt.Sprintf("Pod is unschedulable %s", condition.Message)
}
}
}
// Check the Status field
for _, containerStatus := range pod.Status.ContainerStatuses {
// If the container's image state is ImagePullBackOff, it indicates an image pull failure
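
The IsPodUnrecoverable change above ties back to load affinity: if the configured affinity matches no schedulable node, the backup pod stays Pending with an Unschedulable condition, and the data mover can now treat it as unrecoverable instead of waiting out the prepare timeout. A minimal sketch of the new behavior (assuming the function lives in pkg/util/kube; the scheduler message is illustrative):

```go
package main

import (
	"fmt"

	"github.com/sirupsen/logrus"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/vmware-tanzu/velero/pkg/util/kube"
)

func main() {
	// A backup pod that the scheduler could not place, e.g. because the
	// loadAffinity selector matches no node.
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "backup-pod"},
		Status: corev1.PodStatus{
			Phase: corev1.PodPending,
			Conditions: []corev1.PodCondition{
				{
					Type:    corev1.PodScheduled,
					Status:  corev1.ConditionFalse,
					Reason:  "Unschedulable",
					Message: "0/3 nodes are available: 3 node(s) didn't match Pod's node affinity/selector.",
				},
			},
		},
	}

	unrecoverable, reason := kube.IsPodUnrecoverable(pod, logrus.New())
	fmt.Println(unrecoverable, reason) // true "Pod is unschedulable ..."
}
```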