Merge pull request #7437 from Lyndon-Li/issue-fix-7036

Issue 7036: node selection for data mover backup
This commit is contained in:
lyndon-li
2024-03-29 17:04:40 +08:00
committed by GitHub
17 changed files with 678 additions and 139 deletions

View File

@@ -138,6 +138,7 @@ type nodeAgentServer struct {
kubeClient kubernetes.Interface
csiSnapshotClient *snapshotv1client.Clientset
dataPathMgr *datapath.Manager
dataPathConfigs *nodeagent.Configs
}
func newNodeAgentServer(logger logrus.FieldLogger, factory client.Factory, config nodeAgentServerConfig) (*nodeAgentServer, error) {
@@ -226,8 +227,8 @@ func newNodeAgentServer(logger logrus.FieldLogger, factory client.Factory, confi
return nil, err
}
dataPathConcurrentNum := s.getDataPathConcurrentNum(defaultDataPathConcurrentNum)
s.dataPathMgr = datapath.NewManager(dataPathConcurrentNum)
s.getDataPathConfigs()
s.dataPathMgr = datapath.NewManager(s.getDataPathConcurrentNum(defaultDataPathConcurrentNum))
return s, nil
}
@@ -284,7 +285,11 @@ func (s *nodeAgentServer) run() {
s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller")
}
dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
var loadAffinity *nodeagent.LoadAffinity
if s.dataPathConfigs != nil && len(s.dataPathConfigs.LoadAffinity) > 0 {
loadAffinity = s.dataPathConfigs.LoadAffinity[0]
}
dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
s.attemptDataUploadResume(dataUploadReconciler)
if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data upload controller")
@@ -454,13 +459,24 @@ func (s *nodeAgentServer) markInProgressPVRsFailed(client ctrlclient.Client) {
var getConfigsFunc = nodeagent.GetConfigs
func (s *nodeAgentServer) getDataPathConcurrentNum(defaultNum int) int {
// getDataPathConfigs loads the node agent configs through getConfigsFunc and
// stores them in s.dataPathConfigs.
//
// The lookup is best-effort: if it fails, a warning is logged; if it returns
// no configs, an info message is logged. In both cases s.dataPathConfigs is
// left untouched and nothing is reported to the caller.
func (s *nodeAgentServer) getDataPathConfigs() {
	configs, err := getConfigsFunc(s.ctx, s.namespace, s.kubeClient)
	if err != nil {
		s.logger.WithError(err).Warn("Failed to get node agent configs")
		return
	}

	if configs == nil {
		s.logger.Infof("Node agent configs are not found")
		return
	}

	s.dataPathConfigs = configs
}
func (s *nodeAgentServer) getDataPathConcurrentNum(defaultNum int) int {
configs := s.dataPathConfigs
if configs == nil || configs.LoadConcurrency == nil {
s.logger.Infof("Concurrency configs are not found, use the default number %v", defaultNum)
return defaultNum

View File

@@ -114,6 +114,64 @@ func Test_validatePodVolumesHostPath(t *testing.T) {
}
}
// Test_getDataPathConfigs checks that getDataPathConfigs stores whatever
// getConfigsFunc returns on the server, and that it logs the expected message
// when the lookup fails or yields no configs.
func Test_getDataPathConfigs(t *testing.T) {
	configs := &nodeagent.Configs{
		LoadConcurrency: &nodeagent.LoadConcurrency{
			GlobalConfig: -1,
		},
	}

	tests := []struct {
		name          string
		getFunc       func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error)
		expectConfigs *nodeagent.Configs
		expectLog     string
	}{
		{
			name: "failed to get configs",
			getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
				return nil, errors.New("fake-get-error")
			},
			expectLog: "Failed to get node agent configs",
		},
		{
			name: "configs cm not found",
			getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
				return nil, nil
			},
			expectLog: "Node agent configs are not found",
		},
		{
			name: "succeed",
			getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
				return configs, nil
			},
			expectConfigs: configs,
		},
	}

	// getConfigsFunc is a package-level hook that each case overwrites; put the
	// original back once all subtests have run so the stub does not leak into
	// other tests in this package.
	origGetConfigsFunc := getConfigsFunc
	defer func() { getConfigsFunc = origGetConfigsFunc }()

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			logBuffer := ""

			s := &nodeAgentServer{
				logger: testutil.NewSingleLogger(&logBuffer),
			}

			getConfigsFunc = test.getFunc

			s.getDataPathConfigs()
			assert.Equal(t, test.expectConfigs, s.dataPathConfigs)

			if test.expectLog == "" {
				assert.Equal(t, "", logBuffer)
			} else {
				// Include both strings in the failure output; a bare
				// "Should be true" gives no hint about what was logged.
				assert.True(t, strings.Contains(logBuffer, test.expectLog),
					"log %q does not contain %q", logBuffer, test.expectLog)
			}
		})
	}
}
func Test_getDataPathConcurrentNum(t *testing.T) {
defaultNum := 100001
globalNum := 6
@@ -142,72 +200,47 @@ func Test_getDataPathConcurrentNum(t *testing.T) {
tests := []struct {
name string
getFunc func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error)
configs nodeagent.Configs
setKubeClient bool
kubeClientObj []runtime.Object
expectNum int
expectLog string
}{
{
name: "failed to get configs",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return nil, errors.New("fake-get-error")
},
expectLog: "Failed to get node agent configs",
expectNum: defaultNum,
},
{
name: "configs cm not found",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return nil, nil
},
expectLog: fmt.Sprintf("Concurrency configs are not found, use the default number %v", defaultNum),
expectNum: defaultNum,
},
{
name: "configs cm's data path concurrency is nil",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{}, nil
},
name: "configs cm's data path concurrency is nil",
expectLog: fmt.Sprintf("Concurrency configs are not found, use the default number %v", defaultNum),
expectNum: defaultNum,
},
{
name: "global number is invalid",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: -1,
},
}, nil
configs: nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: -1,
},
},
expectLog: fmt.Sprintf("Global number %v is invalid, use the default value %v", -1, defaultNum),
expectNum: defaultNum,
},
{
name: "global number is valid",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: globalNum,
},
}, nil
configs: nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: globalNum,
},
},
expectNum: globalNum,
},
{
name: "node is not found",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
Number: 100,
},
configs: nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
Number: 100,
},
},
}, nil
},
},
setKubeClient: true,
expectLog: fmt.Sprintf("Failed to get node info for %s, use the global number %v", nodeName, globalNum),
@@ -215,18 +248,16 @@ func Test_getDataPathConcurrentNum(t *testing.T) {
},
{
name: "failed to get selector",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: invalidLabelSelector,
Number: 100,
},
configs: nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: invalidLabelSelector,
Number: 100,
},
},
}, nil
},
},
setKubeClient: true,
kubeClientObj: []runtime.Object{node1},
@@ -235,18 +266,16 @@ func Test_getDataPathConcurrentNum(t *testing.T) {
},
{
name: "rule number is invalid",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: validLabelSelector1,
Number: -1,
},
configs: nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: validLabelSelector1,
Number: -1,
},
},
}, nil
},
},
setKubeClient: true,
kubeClientObj: []runtime.Object{node1},
@@ -255,18 +284,16 @@ func Test_getDataPathConcurrentNum(t *testing.T) {
},
{
name: "label doesn't match",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: validLabelSelector1,
Number: -1,
},
configs: nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: validLabelSelector1,
Number: -1,
},
},
}, nil
},
},
setKubeClient: true,
kubeClientObj: []runtime.Object{node1},
@@ -275,18 +302,16 @@ func Test_getDataPathConcurrentNum(t *testing.T) {
},
{
name: "match one rule",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: validLabelSelector1,
Number: 66,
},
configs: nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: validLabelSelector1,
Number: 66,
},
},
}, nil
},
},
setKubeClient: true,
kubeClientObj: []runtime.Object{node2},
@@ -295,22 +320,20 @@ func Test_getDataPathConcurrentNum(t *testing.T) {
},
{
name: "match multiple rules",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: validLabelSelector1,
Number: 66,
},
{
NodeSelector: validLabelSelector2,
Number: 36,
},
configs: nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: validLabelSelector1,
Number: 66,
},
{
NodeSelector: validLabelSelector2,
Number: 36,
},
},
}, nil
},
},
setKubeClient: true,
kubeClientObj: []runtime.Object{node2},
@@ -319,22 +342,20 @@ func Test_getDataPathConcurrentNum(t *testing.T) {
},
{
name: "match multiple rules 2",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: validLabelSelector1,
Number: 36,
},
{
NodeSelector: validLabelSelector2,
Number: 66,
},
configs: nodeagent.Configs{
LoadConcurrency: &nodeagent.LoadConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: validLabelSelector1,
Number: 36,
},
{
NodeSelector: validLabelSelector2,
Number: 66,
},
},
}, nil
},
},
setKubeClient: true,
kubeClientObj: []runtime.Object{node2},
@@ -349,16 +370,15 @@ func Test_getDataPathConcurrentNum(t *testing.T) {
logBuffer := ""
s := &nodeAgentServer{
nodeName: nodeName,
logger: testutil.NewSingleLogger(&logBuffer),
nodeName: nodeName,
dataPathConfigs: &test.configs,
logger: testutil.NewSingleLogger(&logBuffer),
}
if test.setKubeClient {
s.kubeClient = fakeKubeClient
}
getConfigsFunc = test.getFunc
num := s.getDataPathConcurrentNum(defaultNum)
assert.Equal(t, test.expectNum, num)
if test.expectLog == "" {