mirror of
https://github.com/vmware-tanzu/velero.git
synced 2026-01-06 21:36:30 +00:00
configurable data path concurrency: all in cm
Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
This commit is contained in:
1
changelogs/unreleased/7059-Lyndon-Li
Normal file
1
changelogs/unreleased/7059-Lyndon-Li
Normal file
@@ -0,0 +1 @@
|
||||
Add the implementation for design #6950, configurable data path concurrency
|
||||
@@ -84,7 +84,6 @@ type nodeAgentServerConfig struct {
|
||||
metricsAddress string
|
||||
resourceTimeout time.Duration
|
||||
dataMoverPrepareTimeout time.Duration
|
||||
dataPathConcurrentNum int
|
||||
}
|
||||
|
||||
func NewServerCommand(f client.Factory) *cobra.Command {
|
||||
@@ -94,7 +93,6 @@ func NewServerCommand(f client.Factory) *cobra.Command {
|
||||
metricsAddress: defaultMetricsAddress,
|
||||
resourceTimeout: defaultResourceTimeout,
|
||||
dataMoverPrepareTimeout: defaultDataMoverPrepareTimeout,
|
||||
dataPathConcurrentNum: defaultDataPathConcurrentNum,
|
||||
}
|
||||
|
||||
command := &cobra.Command{
|
||||
@@ -122,7 +120,6 @@ func NewServerCommand(f client.Factory) *cobra.Command {
|
||||
command.Flags().DurationVar(&config.resourceTimeout, "resource-timeout", config.resourceTimeout, "How long to wait for resource processes which are not covered by other specific timeout parameters. Default is 10 minutes.")
|
||||
command.Flags().DurationVar(&config.dataMoverPrepareTimeout, "data-mover-prepare-timeout", config.dataMoverPrepareTimeout, "How long to wait for preparing a DataUpload/DataDownload. Default is 30 minutes.")
|
||||
command.Flags().StringVar(&config.metricsAddress, "metrics-address", config.metricsAddress, "The address to expose prometheus metrics")
|
||||
command.Flags().IntVar(&config.dataPathConcurrentNum, "data-path-concurrent-num", config.dataPathConcurrentNum, "The concurrent number of data path in the current node. Default is 1.")
|
||||
|
||||
return command
|
||||
}
|
||||
@@ -229,13 +226,7 @@ func newNodeAgentServer(logger logrus.FieldLogger, factory client.Factory, confi
|
||||
return nil, err
|
||||
}
|
||||
|
||||
dataPathConcurrentNum := config.dataPathConcurrentNum
|
||||
if dataPathConcurrentNum <= 0 {
|
||||
dataPathConcurrentNum = defaultDataPathConcurrentNum
|
||||
s.logger.Warnf("Data path concurrency number %v is invalid, use the default value %v", config.dataPathConcurrentNum, defaultDataPathConcurrentNum)
|
||||
}
|
||||
|
||||
dataPathConcurrentNum = s.getDataPathConcurrentNum(dataPathConcurrentNum, s.logger)
|
||||
dataPathConcurrentNum := s.getDataPathConcurrentNum(defaultDataPathConcurrentNum, s.logger)
|
||||
s.dataPathMgr = datapath.NewManager(dataPathConcurrentNum)
|
||||
|
||||
return s, nil
|
||||
@@ -498,15 +489,23 @@ func (s *nodeAgentServer) markInProgressPVRsFailed(client ctrlclient.Client) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *nodeAgentServer) getDataPathConcurrentNum(globalNum int, logger logrus.FieldLogger) int {
|
||||
func (s *nodeAgentServer) getDataPathConcurrentNum(defaultNum int, logger logrus.FieldLogger) int {
|
||||
configs, err := nodeagent.GetConfigs(s.ctx, s.namespace, s.kubeClient.CoreV1())
|
||||
if err != nil {
|
||||
logger.WithError(err).Warn("Failed to get node agent configs")
|
||||
return defaultNum
|
||||
}
|
||||
|
||||
if configs == nil || configs.DataPathConcurrency == nil {
|
||||
logger.Infof("Node agent configs are not found, use the global number %v", globalNum)
|
||||
return globalNum
|
||||
logger.Infof("Node agent configs are not found, use the default number %v", defaultNum)
|
||||
return defaultNum
|
||||
}
|
||||
|
||||
globalNum := configs.DataPathConcurrency.GlobalConfig
|
||||
|
||||
if globalNum <= 0 {
|
||||
logger.Warnf("Global number %v is invalid, use the default value %v", globalNum, defaultNum)
|
||||
globalNum = defaultNum
|
||||
}
|
||||
|
||||
curNode, err := s.kubeClient.CoreV1().Nodes().Get(s.ctx, s.nodeName, metav1.GetOptions{})
|
||||
@@ -515,27 +514,23 @@ func (s *nodeAgentServer) getDataPathConcurrentNum(globalNum int, logger logrus.
|
||||
return globalNum
|
||||
}
|
||||
|
||||
selectors := map[labels.Selector]int{}
|
||||
for rule, number := range configs.DataPathConcurrency.ConfigRules {
|
||||
selector, err := labels.Parse(rule)
|
||||
if err != nil {
|
||||
logger.WithError(err).Warnf("Failed to parse rule with label selector %s, skip it", rule)
|
||||
continue
|
||||
}
|
||||
|
||||
if number <= 0 {
|
||||
logger.Warnf("Rule with label selector %s is with an invalid number %v, skip it", rule, number)
|
||||
continue
|
||||
}
|
||||
|
||||
selectors[selector] = number
|
||||
}
|
||||
|
||||
concurrentNum := math.MaxInt32
|
||||
for selector, number := range selectors {
|
||||
|
||||
for _, rule := range configs.DataPathConcurrency.PerNodeConfig {
|
||||
selector, err := metav1.LabelSelectorAsSelector(&rule.NodeSelector)
|
||||
if err != nil {
|
||||
logger.WithError(err).Warnf("Failed to parse rule with label selector %s, skip it", rule.NodeSelector.String())
|
||||
continue
|
||||
}
|
||||
|
||||
if rule.Number <= 0 {
|
||||
logger.Warnf("Rule with label selector %s is with an invalid number %v, skip it", rule.NodeSelector.String(), rule.Number)
|
||||
continue
|
||||
}
|
||||
|
||||
if selector.Matches(labels.Set(curNode.GetLabels())) {
|
||||
if concurrentNum > number {
|
||||
concurrentNum = number
|
||||
if concurrentNum > rule.Number {
|
||||
concurrentNum = rule.Number
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -44,8 +44,19 @@ var (
|
||||
)
|
||||
|
||||
type DataPathConcurrency struct {
|
||||
// ConfigRules specifies the concurrency number to nodes matched by rules
|
||||
ConfigRules map[string]int `json:"configRules"`
|
||||
// GlobalConfig specifies the concurrency number to all nodes for which per-node config is not specified
|
||||
GlobalConfig int `json:"globalConfig,omitempty"`
|
||||
|
||||
// PerNodeConfig specifies the concurrency number to nodes matched by rules
|
||||
PerNodeConfig []RuledConfigs `json:"perNodeConfig,omitempty"`
|
||||
}
|
||||
|
||||
type RuledConfigs struct {
|
||||
// NodeSelector specifies the label selector to match nodes
|
||||
NodeSelector metav1.LabelSelector `json:"nodeSelector"`
|
||||
|
||||
// Number specifies the number value associated to the matched nodes
|
||||
Number int `json:"number"`
|
||||
}
|
||||
|
||||
type Configs struct {
|
||||
@@ -111,16 +122,16 @@ func GetConfigs(ctx context.Context, namespace string, cmClient corev1client.Con
|
||||
return nil, errors.Errorf("data is not available in config map %s", configName)
|
||||
}
|
||||
|
||||
jsonBytes, exist := cm.Data[dataPathConConfigName]
|
||||
jsonString, exist := cm.Data[dataPathConConfigName]
|
||||
if !exist {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
concurrencyConfigs := DataPathConcurrency{}
|
||||
err = json.Unmarshal([]byte(jsonBytes), &concurrencyConfigs)
|
||||
concurrencyConfigs := &DataPathConcurrency{}
|
||||
err = json.Unmarshal([]byte(jsonString), concurrencyConfigs)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "error to unmarshall data path concurrency configs from %s", configName)
|
||||
}
|
||||
|
||||
return &Configs{DataPathConcurrency: &concurrencyConfigs}, nil
|
||||
return &Configs{DataPathConcurrency: concurrencyConfigs}, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user