From fdf3449fff3ce8388d7de0e7cbc2efd5a4dd08d0 Mon Sep 17 00:00:00 2001 From: Sergio Mena Date: Wed, 25 May 2022 19:01:31 +0200 Subject: [PATCH] New flag 'ip-list' for the 'load' command in e2e runner --- test/e2e/pkg/testnet.go | 7 ++++- test/e2e/runner/load.go | 59 ++++++++++++++++++++++++++++++++++------- test/e2e/runner/main.go | 43 +++++++++++++++++++++++------- 3 files changed, 89 insertions(+), 20 deletions(-) diff --git a/test/e2e/pkg/testnet.go b/test/e2e/pkg/testnet.go index ad79c99c6..0af15efa8 100644 --- a/test/e2e/pkg/testnet.go +++ b/test/e2e/pkg/testnet.go @@ -462,9 +462,14 @@ func (n Node) AddressRPC() string { return fmt.Sprintf("%v:26657", ip) } +func NewClient(ip string, port uint32) (*rpchttp.HTTP, error) { + fmt.Printf("[NewClient] http://%v:%v\n", ip, port) + return rpchttp.New(fmt.Sprintf("http://%v:%v", ip, port)) +} + // Client returns an RPC client for a node. func (n Node) Client() (*rpchttp.HTTP, error) { - return rpchttp.New(fmt.Sprintf("http://127.0.0.1:%v", n.ProxyPort)) + return NewClient("127.0.0.1", n.ProxyPort) } // Stateless returns true if the node is either a seed node or a light node diff --git a/test/e2e/runner/load.go b/test/e2e/runner/load.go index eac2d682a..17885b82f 100644 --- a/test/e2e/runner/load.go +++ b/test/e2e/runner/load.go @@ -14,15 +14,24 @@ import ( "github.com/tendermint/tendermint/types" ) +func nNodes(testnet *e2e.Testnet, ips []string) int { + if testnet == nil { + return len(ips) + } else { + return len(testnet.Nodes) + } +} + // Load generates transactions against the network until the given context is // canceled. -func Load(ctx context.Context, logger log.Logger, r *rand.Rand, testnet *e2e.Testnet) error { +func Load(ctx context.Context, logger log.Logger, r *rand.Rand, testnet *e2e.Testnet, ips []string) error { // Since transactions are executed across all nodes in the network, we need // to reduce transaction load for larger networks to avoid using too much // CPU. This gives high-throughput small networks and low-throughput large ones. // This also limits the number of TCP connections, since each worker has // a connection to all nodes. - concurrency := len(testnet.Nodes) * 2 + + concurrency := nNodes(testnet, ips) * 2 if concurrency > 32 { concurrency = 32 } @@ -33,17 +42,21 @@ func Load(ctx context.Context, logger log.Logger, r *rand.Rand, testnet *e2e.Tes defer cancel() // Spawn job generator and processors. + txSize := 0 + if testnet != nil { + txSize = testnet.TxSize + } logger.Info("starting transaction load", "workers", concurrency, - "nodes", len(testnet.Nodes), - "tx", testnet.TxSize) + "nodes", nNodes(testnet, ips), + "tx", txSize) started := time.Now() - go loadGenerate(ctx, r, chTx, testnet.TxSize, len(testnet.Nodes)) + go loadGenerate(ctx, r, chTx, txSize, nNodes(testnet, ips)) for w := 0; w < concurrency; w++ { - go loadProcess(ctx, testnet, chTx, chSuccess) + go loadProcess(ctx, testnet, ips, chTx, chSuccess) } // Montior transaction to ensure load propagates to the network @@ -132,10 +145,7 @@ func loadGenerateWaitTime(r *rand.Rand, size int) time.Duration { return time.Duration(baseJitter + sizeJitter) } -// loadProcess processes transactions -func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx, chSuccess chan<- int) { - // Each worker gets its own client to each usable node, which - // allows for some concurrency while still bounding it. +func prepareClients(testnet *e2e.Testnet) []*rpchttp.HTTP { clients := make([]*rpchttp.HTTP, 0, len(testnet.Nodes)) for idx := range testnet.Nodes { @@ -154,6 +164,35 @@ func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx clients = append(clients, client) } + return clients +} + +func prepareClientsIp(ips []string) []*rpchttp.HTTP { + clients := make([]*rpchttp.HTTP, 0, len(ips)) + + for _, ip := range ips { + client, err := e2e.NewClient(ip, 6000) + if err != nil { + continue + } + + clients = append(clients, client) + } + return clients +} + +// loadProcess processes transactions +func loadProcess(ctx context.Context, testnet *e2e.Testnet, ips []string, chTx <-chan types.Tx, chSuccess chan<- int) { + // Each worker gets its own client to each usable node, which + // allows for some concurrency while still bounding it. + + var clients []*rpchttp.HTTP + if testnet == nil { + fmt.Printf("[loadProcess] %v\n", ips) + clients = prepareClientsIp(ips) + } else { + clients = prepareClients(testnet) + } if len(clients) == 0 { panic("no clients to process load") diff --git a/test/e2e/runner/main.go b/test/e2e/runner/main.go index c4a73d33f..547532a5f 100644 --- a/test/e2e/runner/main.go +++ b/test/e2e/runner/main.go @@ -7,6 +7,7 @@ import ( "math/rand" "os" "strconv" + "strings" "time" "github.com/spf13/cobra" @@ -33,6 +34,7 @@ func main() { type CLI struct { root *cobra.Command testnet *e2e.Testnet + ips []string preserve bool } @@ -45,16 +47,23 @@ func NewCLI(logger log.Logger) *CLI { SilenceUsage: true, SilenceErrors: true, // we'll output them ourselves in Run() PersistentPreRunE: func(cmd *cobra.Command, args []string) error { - file, err := cmd.Flags().GetString("file") + ips, err := cmd.Flags().GetString("ip-list") if err != nil { return err } - testnet, err := e2e.LoadTestnet(file) - if err != nil { - return err + if len(ips) == 0 { + file, err := cmd.Flags().GetString("file") + if err != nil { + return err + } + testnet, err := e2e.LoadTestnet(file) + if err != nil { + return err + } + cli.testnet = testnet + return nil } - - cli.testnet = testnet + cli.ips = strings.Split(ips, ",") return nil }, RunE: func(cmd *cobra.Command, args []string) (err error) { @@ -84,7 +93,7 @@ func NewCLI(logger log.Logger) *CLI { lctx, loadCancel := context.WithCancel(ctx) defer loadCancel() go func() { - chLoadResult <- Load(lctx, logger, r, cli.testnet) + chLoadResult <- Load(lctx, logger, r, cli.testnet, nil) }() startAt := time.Now() if err = Start(ctx, logger, cli.testnet); err != nil { @@ -142,7 +151,7 @@ func NewCLI(logger log.Logger) *CLI { } cli.root.PersistentFlags().StringP("file", "f", "", "Testnet TOML manifest") - _ = cli.root.MarkPersistentFlagRequired("file") + cli.root.PersistentFlags().StringP("ip-list", "i", "", "Comma-separated list of ip addresses for load generation") cli.root.Flags().BoolVarP(&cli.preserve, "preserve", "p", false, "Preserves the running of the test net after tests are completed") @@ -227,6 +236,22 @@ func NewCLI(logger log.Logger) *CLI { logger, rand.New(rand.NewSource(randomSeed)), // nolint: gosec cli.testnet, + cli.ips, + ) + }, + }) + + cli.root.AddCommand(&cobra.Command{ + Use: "load-ip [comma-separated list of IPs]", + Args: cobra.MaximumNArgs(1), + Short: "Generates transaction load to a list of IPs until the command is canceled", + RunE: func(cmd *cobra.Command, args []string) (err error) { + return Load( + cmd.Context(), + logger, + rand.New(rand.NewSource(randomSeed)), // nolint: gosec + nil, + cli.ips, ) }, }) @@ -328,7 +353,7 @@ Does not run any perbutations. lctx, loadCancel := context.WithCancel(ctx) defer loadCancel() go func() { - chLoadResult <- Load(lctx, logger, r, cli.testnet) + chLoadResult <- Load(lctx, logger, r, cli.testnet, nil) }() if err := Start(ctx, logger, cli.testnet); err != nil {