New flag 'ip-list' for the 'load' command in e2e runner

This commit is contained in:
Sergio Mena
2022-05-25 19:01:31 +02:00
parent 9027401de4
commit fdf3449fff
3 changed files with 89 additions and 20 deletions

View File

@@ -462,9 +462,14 @@ func (n Node) AddressRPC() string {
return fmt.Sprintf("%v:26657", ip)
}
func NewClient(ip string, port uint32) (*rpchttp.HTTP, error) {
fmt.Printf("[NewClient] http://%v:%v\n", ip, port)
return rpchttp.New(fmt.Sprintf("http://%v:%v", ip, port))
}
// Client returns an RPC client for a node.
func (n Node) Client() (*rpchttp.HTTP, error) {
return rpchttp.New(fmt.Sprintf("http://127.0.0.1:%v", n.ProxyPort))
return NewClient("127.0.0.1", n.ProxyPort)
}
// Stateless returns true if the node is either a seed node or a light node

View File

@@ -14,15 +14,24 @@ import (
"github.com/tendermint/tendermint/types"
)
func nNodes(testnet *e2e.Testnet, ips []string) int {
if testnet == nil {
return len(ips)
} else {
return len(testnet.Nodes)
}
}
// Load generates transactions against the network until the given context is
// canceled.
func Load(ctx context.Context, logger log.Logger, r *rand.Rand, testnet *e2e.Testnet) error {
func Load(ctx context.Context, logger log.Logger, r *rand.Rand, testnet *e2e.Testnet, ips []string) error {
// Since transactions are executed across all nodes in the network, we need
// to reduce transaction load for larger networks to avoid using too much
// CPU. This gives high-throughput small networks and low-throughput large ones.
// This also limits the number of TCP connections, since each worker has
// a connection to all nodes.
concurrency := len(testnet.Nodes) * 2
concurrency := nNodes(testnet, ips) * 2
if concurrency > 32 {
concurrency = 32
}
@@ -33,17 +42,21 @@ func Load(ctx context.Context, logger log.Logger, r *rand.Rand, testnet *e2e.Tes
defer cancel()
// Spawn job generator and processors.
txSize := 0
if testnet != nil {
txSize = testnet.TxSize
}
logger.Info("starting transaction load",
"workers", concurrency,
"nodes", len(testnet.Nodes),
"tx", testnet.TxSize)
"nodes", nNodes(testnet, ips),
"tx", txSize)
started := time.Now()
go loadGenerate(ctx, r, chTx, testnet.TxSize, len(testnet.Nodes))
go loadGenerate(ctx, r, chTx, txSize, nNodes(testnet, ips))
for w := 0; w < concurrency; w++ {
go loadProcess(ctx, testnet, chTx, chSuccess)
go loadProcess(ctx, testnet, ips, chTx, chSuccess)
}
// Montior transaction to ensure load propagates to the network
@@ -132,10 +145,7 @@ func loadGenerateWaitTime(r *rand.Rand, size int) time.Duration {
return time.Duration(baseJitter + sizeJitter)
}
// loadProcess processes transactions
func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx, chSuccess chan<- int) {
// Each worker gets its own client to each usable node, which
// allows for some concurrency while still bounding it.
func prepareClients(testnet *e2e.Testnet) []*rpchttp.HTTP {
clients := make([]*rpchttp.HTTP, 0, len(testnet.Nodes))
for idx := range testnet.Nodes {
@@ -154,6 +164,35 @@ func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx
clients = append(clients, client)
}
return clients
}
func prepareClientsIp(ips []string) []*rpchttp.HTTP {
clients := make([]*rpchttp.HTTP, 0, len(ips))
for _, ip := range ips {
client, err := e2e.NewClient(ip, 6000)
if err != nil {
continue
}
clients = append(clients, client)
}
return clients
}
// loadProcess processes transactions
func loadProcess(ctx context.Context, testnet *e2e.Testnet, ips []string, chTx <-chan types.Tx, chSuccess chan<- int) {
// Each worker gets its own client to each usable node, which
// allows for some concurrency while still bounding it.
var clients []*rpchttp.HTTP
if testnet == nil {
fmt.Printf("[loadProcess] %v\n", ips)
clients = prepareClientsIp(ips)
} else {
clients = prepareClients(testnet)
}
if len(clients) == 0 {
panic("no clients to process load")

View File

@@ -7,6 +7,7 @@ import (
"math/rand"
"os"
"strconv"
"strings"
"time"
"github.com/spf13/cobra"
@@ -33,6 +34,7 @@ func main() {
type CLI struct {
root *cobra.Command
testnet *e2e.Testnet
ips []string
preserve bool
}
@@ -45,16 +47,23 @@ func NewCLI(logger log.Logger) *CLI {
SilenceUsage: true,
SilenceErrors: true, // we'll output them ourselves in Run()
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
file, err := cmd.Flags().GetString("file")
ips, err := cmd.Flags().GetString("ip-list")
if err != nil {
return err
}
testnet, err := e2e.LoadTestnet(file)
if err != nil {
return err
if len(ips) == 0 {
file, err := cmd.Flags().GetString("file")
if err != nil {
return err
}
testnet, err := e2e.LoadTestnet(file)
if err != nil {
return err
}
cli.testnet = testnet
return nil
}
cli.testnet = testnet
cli.ips = strings.Split(ips, ",")
return nil
},
RunE: func(cmd *cobra.Command, args []string) (err error) {
@@ -84,7 +93,7 @@ func NewCLI(logger log.Logger) *CLI {
lctx, loadCancel := context.WithCancel(ctx)
defer loadCancel()
go func() {
chLoadResult <- Load(lctx, logger, r, cli.testnet)
chLoadResult <- Load(lctx, logger, r, cli.testnet, nil)
}()
startAt := time.Now()
if err = Start(ctx, logger, cli.testnet); err != nil {
@@ -142,7 +151,7 @@ func NewCLI(logger log.Logger) *CLI {
}
cli.root.PersistentFlags().StringP("file", "f", "", "Testnet TOML manifest")
_ = cli.root.MarkPersistentFlagRequired("file")
cli.root.PersistentFlags().StringP("ip-list", "i", "", "Comma-separated list of ip addresses for load generation")
cli.root.Flags().BoolVarP(&cli.preserve, "preserve", "p", false,
"Preserves the running of the test net after tests are completed")
@@ -227,6 +236,22 @@ func NewCLI(logger log.Logger) *CLI {
logger,
rand.New(rand.NewSource(randomSeed)), // nolint: gosec
cli.testnet,
cli.ips,
)
},
})
cli.root.AddCommand(&cobra.Command{
Use: "load-ip [comma-separated list of IPs]",
Args: cobra.MaximumNArgs(1),
Short: "Generates transaction load to a list of IPs until the command is canceled",
RunE: func(cmd *cobra.Command, args []string) (err error) {
return Load(
cmd.Context(),
logger,
rand.New(rand.NewSource(randomSeed)), // nolint: gosec
nil,
cli.ips,
)
},
})
@@ -328,7 +353,7 @@ Does not run any perbutations.
lctx, loadCancel := context.WithCancel(ctx)
defer loadCancel()
go func() {
chLoadResult <- Load(lctx, logger, r, cli.testnet)
chLoadResult <- Load(lctx, logger, r, cli.testnet, nil)
}()
if err := Start(ctx, logger, cli.testnet); err != nil {