configurable data path concurrency: UT

Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
This commit is contained in:
Lyndon-Li
2023-11-06 18:36:57 +08:00
parent 262f10ff49
commit 68579448d6
6 changed files with 622 additions and 12 deletions

View File

@@ -41,6 +41,11 @@ func ForNode(name string) *NodeBuilder {
}
}
func (b *NodeBuilder) Labels(labels map[string]string) *NodeBuilder {
b.object.Labels = labels
return b
}
// Result returns the built Node.
func (b *NodeBuilder) Result() *corev1api.Node {
return b.object

View File

@@ -101,3 +101,8 @@ func (b *PodBuilder) ContainerStatuses(containerStatuses ...*corev1api.Container
}
return b
}
func (b *PodBuilder) Phase(phase corev1api.PodPhase) *PodBuilder {
b.object.Status.Phase = phase
return b
}

View File

@@ -226,7 +226,7 @@ func newNodeAgentServer(logger logrus.FieldLogger, factory client.Factory, confi
return nil, err
}
dataPathConcurrentNum := s.getDataPathConcurrentNum(defaultDataPathConcurrentNum, s.logger)
dataPathConcurrentNum := s.getDataPathConcurrentNum(defaultDataPathConcurrentNum)
s.dataPathMgr = datapath.NewManager(dataPathConcurrentNum)
return s, nil
@@ -489,28 +489,34 @@ func (s *nodeAgentServer) markInProgressPVRsFailed(client ctrlclient.Client) {
}
}
func (s *nodeAgentServer) getDataPathConcurrentNum(defaultNum int, logger logrus.FieldLogger) int {
configs, err := nodeagent.GetConfigs(s.ctx, s.namespace, s.kubeClient)
var getConfigsFunc = nodeagent.GetConfigs
func (s *nodeAgentServer) getDataPathConcurrentNum(defaultNum int) int {
configs, err := getConfigsFunc(s.ctx, s.namespace, s.kubeClient)
if err != nil {
logger.WithError(err).Warn("Failed to get node agent configs")
s.logger.WithError(err).Warn("Failed to get node agent configs")
return defaultNum
}
if configs == nil || configs.DataPathConcurrency == nil {
logger.Infof("Node agent configs are not found, use the default number %v", defaultNum)
s.logger.Infof("Node agent configs are not found, use the default number %v", defaultNum)
return defaultNum
}
globalNum := configs.DataPathConcurrency.GlobalConfig
if globalNum <= 0 {
logger.Warnf("Global number %v is invalid, use the default value %v", globalNum, defaultNum)
s.logger.Warnf("Global number %v is invalid, use the default value %v", globalNum, defaultNum)
globalNum = defaultNum
}
if len(configs.DataPathConcurrency.PerNodeConfig) == 0 {
return globalNum
}
curNode, err := s.kubeClient.CoreV1().Nodes().Get(s.ctx, s.nodeName, metav1.GetOptions{})
if err != nil {
logger.WithError(err).Warnf("Failed to get node info for %s, use the global number %v", s.nodeName, globalNum)
s.logger.WithError(err).Warnf("Failed to get node info for %s, use the global number %v", s.nodeName, globalNum)
return globalNum
}
@@ -519,12 +525,12 @@ func (s *nodeAgentServer) getDataPathConcurrentNum(defaultNum int, logger logrus
for _, rule := range configs.DataPathConcurrency.PerNodeConfig {
selector, err := metav1.LabelSelectorAsSelector(&rule.NodeSelector)
if err != nil {
logger.WithError(err).Warnf("Failed to parse rule with label selector %s, skip it", rule.NodeSelector.String())
s.logger.WithError(err).Warnf("Failed to parse rule with label selector %s, skip it", rule.NodeSelector.String())
continue
}
if rule.Number <= 0 {
logger.Warnf("Rule with label selector %s is with an invalid number %v, skip it", rule.NodeSelector.String(), rule.Number)
s.logger.Warnf("Rule with label selector %s is with an invalid number %v, skip it", rule.NodeSelector.String(), rule.Number)
continue
}
@@ -536,10 +542,10 @@ func (s *nodeAgentServer) getDataPathConcurrentNum(defaultNum int, logger logrus
}
if concurrentNum == math.MaxInt32 {
logger.Infof("Per node number for node %s is not found, use the global number %v", s.nodeName, globalNum)
s.logger.Infof("Per node number for node %s is not found, use the global number %v", s.nodeName, globalNum)
concurrentNum = globalNum
} else {
logger.Infof("Use the per node number %v over global number %v for node %s", concurrentNum, globalNum, s.nodeName)
s.logger.Infof("Use the per node number %v over global number %v for node %s", concurrentNum, globalNum, s.nodeName)
}
return concurrentNum

View File

@@ -17,16 +17,22 @@ package nodeagent
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"testing"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/nodeagent"
testutil "github.com/vmware-tanzu/velero/pkg/test"
)
@@ -107,3 +113,259 @@ func Test_validatePodVolumesHostPath(t *testing.T) {
})
}
}
func Test_getDataPathConcurrentNum(t *testing.T) {
defaultNum := 100001
globalNum := 6
nodeName := "node-agent-node"
node1 := builder.ForNode("node-agent-node").Result()
node2 := builder.ForNode("node-agent-node").Labels(map[string]string{
"host-name": "node-1",
"xxxx": "yyyyy",
}).Result()
invalidLabelSelector := metav1.LabelSelector{
MatchLabels: map[string]string{
"inva/lid": "inva/lid",
},
}
validLabelSelector1 := metav1.LabelSelector{
MatchLabels: map[string]string{
"host-name": "node-1",
},
}
validLabelSelector2 := metav1.LabelSelector{
MatchLabels: map[string]string{
"xxxx": "yyyyy",
},
}
tests := []struct {
name string
getFunc func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error)
setKubeClient bool
kubeClientObj []runtime.Object
expectNum int
expectLog string
}{
{
name: "failed to get configs",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return nil, errors.New("fake-get-error")
},
expectLog: "Failed to get node agent configs",
expectNum: defaultNum,
},
{
name: "configs cm not found",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return nil, nil
},
expectLog: fmt.Sprintf("Node agent configs are not found, use the default number %v", defaultNum),
expectNum: defaultNum,
},
{
name: "configs cm's data path concurrency is nil",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{}, nil
},
expectLog: fmt.Sprintf("Node agent configs are not found, use the default number %v", defaultNum),
expectNum: defaultNum,
},
{
name: "global number is invalid",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
DataPathConcurrency: &nodeagent.DataPathConcurrency{
GlobalConfig: -1,
},
}, nil
},
expectLog: fmt.Sprintf("Global number %v is invalid, use the default value %v", -1, defaultNum),
expectNum: defaultNum,
},
{
name: "global number is valid",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
DataPathConcurrency: &nodeagent.DataPathConcurrency{
GlobalConfig: globalNum,
},
}, nil
},
expectNum: globalNum,
},
{
name: "node is not found",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
DataPathConcurrency: &nodeagent.DataPathConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
Number: 100,
},
},
},
}, nil
},
setKubeClient: true,
expectLog: fmt.Sprintf("Failed to get node info for %s, use the global number %v", nodeName, globalNum),
expectNum: globalNum,
},
{
name: "failed to get selector",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
DataPathConcurrency: &nodeagent.DataPathConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: invalidLabelSelector,
Number: 100,
},
},
},
}, nil
},
setKubeClient: true,
kubeClientObj: []runtime.Object{node1},
expectLog: fmt.Sprintf("Failed to parse rule with label selector %s, skip it", invalidLabelSelector.String()),
expectNum: globalNum,
},
{
name: "rule number is invalid",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
DataPathConcurrency: &nodeagent.DataPathConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: validLabelSelector1,
Number: -1,
},
},
},
}, nil
},
setKubeClient: true,
kubeClientObj: []runtime.Object{node1},
expectLog: fmt.Sprintf("Rule with label selector %s is with an invalid number %v, skip it", validLabelSelector1.String(), -1),
expectNum: globalNum,
},
{
name: "label doesn't match",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
DataPathConcurrency: &nodeagent.DataPathConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: validLabelSelector1,
Number: -1,
},
},
},
}, nil
},
setKubeClient: true,
kubeClientObj: []runtime.Object{node1},
expectLog: fmt.Sprintf("Per node number for node %s is not found, use the global number %v", nodeName, globalNum),
expectNum: globalNum,
},
{
name: "match one rule",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
DataPathConcurrency: &nodeagent.DataPathConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: validLabelSelector1,
Number: 66,
},
},
},
}, nil
},
setKubeClient: true,
kubeClientObj: []runtime.Object{node2},
expectLog: fmt.Sprintf("Use the per node number %v over global number %v for node %s", 66, globalNum, nodeName),
expectNum: 66,
},
{
name: "match multiple rules",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
DataPathConcurrency: &nodeagent.DataPathConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: validLabelSelector1,
Number: 66,
},
{
NodeSelector: validLabelSelector2,
Number: 36,
},
},
},
}, nil
},
setKubeClient: true,
kubeClientObj: []runtime.Object{node2},
expectLog: fmt.Sprintf("Use the per node number %v over global number %v for node %s", 36, globalNum, nodeName),
expectNum: 36,
},
{
name: "match multiple rules 2",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
DataPathConcurrency: &nodeagent.DataPathConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: validLabelSelector1,
Number: 36,
},
{
NodeSelector: validLabelSelector2,
Number: 66,
},
},
},
}, nil
},
setKubeClient: true,
kubeClientObj: []runtime.Object{node2},
expectLog: fmt.Sprintf("Use the per node number %v over global number %v for node %s", 36, globalNum, nodeName),
expectNum: 36,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...)
logBuffer := ""
s := &nodeAgentServer{
nodeName: nodeName,
logger: testutil.NewSingleLogger(&logBuffer),
}
if test.setKubeClient {
s.kubeClient = fakeKubeClient
}
getConfigsFunc = test.getFunc
num := s.getDataPathConcurrentNum(defaultNum)
assert.Equal(t, test.expectNum, num)
if test.expectLog == "" {
assert.Equal(t, "", logBuffer)
} else {
assert.True(t, strings.Contains(logBuffer, test.expectLog))
}
})
}
}

View File

@@ -0,0 +1,332 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodeagent
import (
"context"
"testing"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
clientTesting "k8s.io/client-go/testing"
clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake"
"github.com/vmware-tanzu/velero/pkg/builder"
)
type reactor struct {
verb string
resource string
reactorFunc clientTesting.ReactionFunc
}
func TestIsRunning(t *testing.T) {
daemonSet := &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "fake-ns",
Name: "node-agent",
},
TypeMeta: metav1.TypeMeta{
Kind: "DaemonSet",
},
}
tests := []struct {
name string
kubeClientObj []runtime.Object
namespace string
kubeReactors []reactor
expectErr string
}{
{
name: "ds is not found",
namespace: "fake-ns",
expectErr: "daemonset not found",
},
{
name: "ds get error",
namespace: "fake-ns",
kubeReactors: []reactor{
{
verb: "get",
resource: "daemonsets",
reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.New("fake-get-error")
},
},
},
expectErr: "fake-get-error",
},
{
name: "succeed",
namespace: "fake-ns",
kubeClientObj: []runtime.Object{
daemonSet,
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...)
for _, reactor := range test.kubeReactors {
fakeKubeClient.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorFunc)
}
err := IsRunning(context.TODO(), fakeKubeClient, test.namespace)
if test.expectErr == "" {
assert.NoError(t, err)
} else {
assert.EqualError(t, err, test.expectErr)
}
})
}
}
func TestIsRunningInNode(t *testing.T) {
scheme := runtime.NewScheme()
corev1.AddToScheme(scheme)
nonNodeAgentPod := builder.ForPod("fake-ns", "fake-pod").Result()
nodeAgentPodNotRunning := builder.ForPod("fake-ns", "fake-pod").Labels(map[string]string{"name": "node-agent"}).Result()
nodeAgentPodRunning1 := builder.ForPod("fake-ns", "fake-pod-1").Labels(map[string]string{"name": "node-agent"}).Phase(corev1.PodRunning).Result()
nodeAgentPodRunning2 := builder.ForPod("fake-ns", "fake-pod-2").Labels(map[string]string{"name": "node-agent"}).Phase(corev1.PodRunning).Result()
nodeAgentPodRunning3 := builder.ForPod("fake-ns", "fake-pod-3").
Labels(map[string]string{"name": "node-agent"}).
Phase(corev1.PodRunning).
NodeName("fake-node").
Result()
tests := []struct {
name string
kubeClientObj []runtime.Object
nodeName string
expectErr string
}{
{
name: "node name is empty",
expectErr: "node name is empty",
},
{
name: "ds pod not found",
nodeName: "fake-node",
kubeClientObj: []runtime.Object{
nonNodeAgentPod,
},
expectErr: "daemonset pod not found in running state in node fake-node",
},
{
name: "ds po are not all running",
nodeName: "fake-node",
kubeClientObj: []runtime.Object{
nodeAgentPodNotRunning,
nodeAgentPodRunning1,
},
expectErr: "daemonset pod not found in running state in node fake-node",
},
{
name: "ds pods wrong node name",
nodeName: "fake-node",
kubeClientObj: []runtime.Object{
nodeAgentPodNotRunning,
nodeAgentPodRunning1,
nodeAgentPodRunning2,
},
expectErr: "daemonset pod not found in running state in node fake-node",
},
{
name: "succeed",
nodeName: "fake-node",
kubeClientObj: []runtime.Object{
nodeAgentPodNotRunning,
nodeAgentPodRunning1,
nodeAgentPodRunning2,
nodeAgentPodRunning3,
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeClientBuilder := clientFake.NewClientBuilder()
fakeClientBuilder = fakeClientBuilder.WithScheme(scheme)
fakeClient := fakeClientBuilder.WithRuntimeObjects(test.kubeClientObj...).Build()
err := IsRunningInNode(context.TODO(), "", test.nodeName, fakeClient)
if test.expectErr == "" {
assert.NoError(t, err)
} else {
assert.EqualError(t, err, test.expectErr)
}
})
}
}
func TestGetPodSpec(t *testing.T) {
podSpec := corev1.PodSpec{
NodeName: "fake-node",
}
daemonSet := &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "fake-ns",
Name: "node-agent",
},
TypeMeta: metav1.TypeMeta{
Kind: "DaemonSet",
},
Spec: appsv1.DaemonSetSpec{
Template: corev1.PodTemplateSpec{
Spec: podSpec,
},
},
}
tests := []struct {
name string
kubeClientObj []runtime.Object
namespace string
expectErr string
expectSpec corev1.PodSpec
}{
{
name: "ds is not found",
namespace: "fake-ns",
expectErr: "error to get node-agent daemonset: daemonsets.apps \"node-agent\" not found",
},
{
name: "succeed",
namespace: "fake-ns",
kubeClientObj: []runtime.Object{
daemonSet,
},
expectSpec: podSpec,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...)
spec, err := GetPodSpec(context.TODO(), fakeKubeClient, test.namespace)
if test.expectErr == "" {
assert.NoError(t, err)
assert.Equal(t, *spec, test.expectSpec)
} else {
assert.EqualError(t, err, test.expectErr)
}
})
}
}
func TestGetConfigs(t *testing.T) {
cm := builder.ForConfigMap("fake-ns", "node-agent-configs").Result()
cmWithInvalidData := builder.ForConfigMap("fake-ns", "node-agent-configs").Data("fake-key", "fake-value").Result()
cmWithInvalidDataFormat := builder.ForConfigMap("fake-ns", "node-agent-configs").Data("data-path-concurrency", "wrong").Result()
cmWithValidData := builder.ForConfigMap("fake-ns", "node-agent-configs").Data("data-path-concurrency", "{\"globalConfig\": 5}").Result()
tests := []struct {
name string
kubeClientObj []runtime.Object
namespace string
kubeReactors []reactor
expectResult *DataPathConcurrency
expectErr string
}{
{
name: "cm is not found",
namespace: "fake-ns",
},
{
name: "cm get error",
namespace: "fake-ns",
kubeReactors: []reactor{
{
verb: "get",
resource: "configmaps",
reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.New("fake-get-error")
},
},
},
expectErr: "error to get node agent configs node-agent-configs: fake-get-error",
},
{
name: "cm's data is nil",
namespace: "fake-ns",
kubeClientObj: []runtime.Object{
cm,
},
expectErr: "data is not available in config map node-agent-configs",
},
{
name: "cm's data is not found",
namespace: "fake-ns",
kubeClientObj: []runtime.Object{
cmWithInvalidData,
},
},
{
name: "cm's data is with invalid format",
namespace: "fake-ns",
kubeClientObj: []runtime.Object{
cmWithInvalidDataFormat,
},
expectErr: "error to unmarshall data path concurrency configs from node-agent-configs: invalid character 'w' looking for beginning of value",
},
{
name: "success",
namespace: "fake-ns",
kubeClientObj: []runtime.Object{
cmWithValidData,
},
expectResult: &DataPathConcurrency{
GlobalConfig: 5,
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...)
for _, reactor := range test.kubeReactors {
fakeKubeClient.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorFunc)
}
result, err := GetConfigs(context.TODO(), test.namespace, fakeKubeClient)
if test.expectErr == "" {
assert.NoError(t, err)
if test.expectResult == nil {
assert.Nil(t, result)
} else {
assert.Equal(t, *test.expectResult, *result.DataPathConcurrency)
}
} else {
assert.EqualError(t, err, test.expectErr)
}
})
}
}

View File

@@ -40,7 +40,7 @@ type singleLogRecorder struct {
}
func (s *singleLogRecorder) Write(p []byte) (n int, err error) {
*s.buffer = string(p[:])
*s.buffer = *s.buffer + string(p[:])
return len(p), nil
}