From 21b2801c6029c6d510d65cc1cc83fbf055ff618f Mon Sep 17 00:00:00 2001 From: William Banfield <4561443+williambanfield@users.noreply.github.com> Date: Wed, 30 Nov 2022 13:36:19 -0500 Subject: [PATCH] e2e: test runner generates loadtime formatted transactions. (#9779) --- test/e2e/networks/simple.toml | 1 - test/e2e/pkg/manifest.go | 9 +++ test/e2e/pkg/testnet.go | 23 ++++++ test/e2e/runner/load.go | 137 ++++++++++++++++++---------------- test/e2e/runner/main.go | 18 +---- 5 files changed, 110 insertions(+), 78 deletions(-) diff --git a/test/e2e/networks/simple.toml b/test/e2e/networks/simple.toml index 05cda1819..96b81f79f 100644 --- a/test/e2e/networks/simple.toml +++ b/test/e2e/networks/simple.toml @@ -2,4 +2,3 @@ [node.validator02] [node.validator03] [node.validator04] - diff --git a/test/e2e/pkg/manifest.go b/test/e2e/pkg/manifest.go index a91f21c63..402f64362 100644 --- a/test/e2e/pkg/manifest.go +++ b/test/e2e/pkg/manifest.go @@ -68,6 +68,10 @@ type Manifest struct { ProcessProposalDelay time.Duration `toml:"process_proposal_delay"` CheckTxDelay time.Duration `toml:"check_tx_delay"` // TODO: add vote extension and finalize block delay (@cmwaters) + + LoadTxSizeBytes int `toml:"load_tx_size_bytes"` + LoadTxBatchSize int `toml:"load_tx_batch_size"` + LoadTxConnections int `toml:"load_tx_connections"` } // ManifestNode represents a node in a testnet manifest. @@ -145,6 +149,11 @@ type ManifestNode struct { // pause: temporarily pauses (freezes) the node // restart: restarts the node, shutting it down with SIGTERM Perturb []string `toml:"perturb"` + + // SendNoLoad determines if the e2e test should send load to this node. + // It defaults to false, so unless configured otherwise, the node will + // receive load. + SendNoLoad bool `toml:"send_no_laod"` } // Save saves the testnet manifest to a file. 
diff --git a/test/e2e/pkg/testnet.go b/test/e2e/pkg/testnet.go index 030d542f7..8d0d674db 100644 --- a/test/e2e/pkg/testnet.go +++ b/test/e2e/pkg/testnet.go @@ -21,6 +21,10 @@ import ( const ( randomSeed int64 = 2308084734268 proxyPortFirst uint32 = 5701 + + defaultBatchSize = 2 + defaultConnections = 1 + defaultTxSizeBytes = 1024 ) type ( @@ -63,6 +67,9 @@ type Testnet struct { Nodes []*Node KeyType string Evidence int + LoadTxSizeBytes int + LoadTxBatchSize int + LoadTxConnections int ABCIProtocol string PrepareProposalDelay time.Duration ProcessProposalDelay time.Duration @@ -92,6 +99,9 @@ type Node struct { Seeds []*Node PersistentPeers []*Node Perturbations []Perturbation + + // SendNoLoad determines if the e2e test should send load to this node. + SendNoLoad bool } // LoadTestnet loads a testnet from a manifest file, using the filename to @@ -119,6 +129,9 @@ func LoadTestnet(manifest Manifest, fname string, ifd InfrastructureData) (*Test ValidatorUpdates: map[int64]map[*Node]int64{}, Nodes: []*Node{}, Evidence: manifest.Evidence, + LoadTxSizeBytes: manifest.LoadTxSizeBytes, + LoadTxBatchSize: manifest.LoadTxBatchSize, + LoadTxConnections: manifest.LoadTxConnections, ABCIProtocol: manifest.ABCIProtocol, PrepareProposalDelay: manifest.PrepareProposalDelay, ProcessProposalDelay: manifest.ProcessProposalDelay, @@ -133,6 +146,15 @@ func LoadTestnet(manifest Manifest, fname string, ifd InfrastructureData) (*Test if testnet.ABCIProtocol == "" { testnet.ABCIProtocol = string(ProtocolBuiltin) } + if testnet.LoadTxConnections == 0 { + testnet.LoadTxConnections = defaultConnections + } + if testnet.LoadTxBatchSize == 0 { + testnet.LoadTxBatchSize = defaultBatchSize + } + if testnet.LoadTxSizeBytes == 0 { + testnet.LoadTxSizeBytes = defaultTxSizeBytes + } // Set up nodes, in alphabetical order (IPs and ports get same order). 
nodeNames := []string{} @@ -167,6 +189,7 @@ func LoadTestnet(manifest Manifest, fname string, ifd InfrastructureData) (*Test SnapshotInterval: nodeManifest.SnapshotInterval, RetainBlocks: nodeManifest.RetainBlocks, Perturbations: []Perturbation{}, + SendNoLoad: nodeManifest.SendNoLoad, } if node.StartAt == testnet.InitialHeight { node.StartAt = 0 // normalize to 0 for initial nodes, since code expects this diff --git a/test/e2e/runner/load.go b/test/e2e/runner/load.go index 5f6d1e1ba..735af4821 100644 --- a/test/e2e/runner/load.go +++ b/test/e2e/runner/load.go @@ -2,47 +2,48 @@ package main import ( "context" - "crypto/rand" "errors" "fmt" - "math" + "sync" "time" + "github.com/google/uuid" "github.com/tendermint/tendermint/libs/log" rpchttp "github.com/tendermint/tendermint/rpc/client/http" e2e "github.com/tendermint/tendermint/test/e2e/pkg" + "github.com/tendermint/tendermint/test/loadtime/payload" "github.com/tendermint/tendermint/types" ) +const workerPoolSize = 16 + // Load generates transactions against the network until the given context is -// canceled. A multiplier of greater than one can be supplied if load needs to -// be generated beyond a minimum amount. -func Load(ctx context.Context, testnet *e2e.Testnet, multiplier int) error { - // Since transactions are executed across all nodes in the network, we need - // to reduce transaction load for larger networks to avoid using too much - // CPU. This gives high-throughput small networks and low-throughput large ones. - // This also limits the number of TCP connections, since each worker has - // a connection to all nodes. - concurrency := 64 / len(testnet.Nodes) - if concurrency == 0 { - concurrency = 1 - } +// canceled. 
+func Load(ctx context.Context, testnet *e2e.Testnet) error { initialTimeout := 1 * time.Minute stallTimeout := 30 * time.Second - - chTx := make(chan types.Tx) - chSuccess := make(chan types.Tx) + chSuccess := make(chan struct{}) ctx, cancel := context.WithCancel(ctx) defer cancel() - // Spawn job generator and processors. - logger.Info("load", "msg", log.NewLazySprintf("Starting transaction load (%v workers)...", concurrency)) started := time.Now() + u := [16]byte(uuid.New()) // generate run ID on startup - go loadGenerate(ctx, chTx, multiplier) + txCh := make(chan types.Tx) + go loadGenerate(ctx, txCh, testnet, u[:]) - for w := 0; w < concurrency; w++ { - go loadProcess(ctx, testnet, chTx, chSuccess) + for _, n := range testnet.Nodes { + if n.SendNoLoad { + continue + } + + for w := 0; w < testnet.LoadTxConnections; w++ { + cli, err := n.Client() + if err != nil { + return err + } + go loadProcess(ctx, txCh, chSuccess, cli) + } } // Monitor successful transactions, and abort on stalls. @@ -67,58 +68,68 @@ func Load(ctx context.Context, testnet *e2e.Testnet, multiplier int) error { } // loadGenerate generates jobs until the context is canceled -func loadGenerate(ctx context.Context, chTx chan<- types.Tx, multiplier int) { - for i := 0; i < math.MaxInt64; i++ { - // We keep generating the same 1000 keys over and over, with different values. - // This gives a reasonable load without putting too much data in the app. 
- id := i % 1000 - - bz := make([]byte, 1024) // 1kb hex-encoded - _, err := rand.Read(bz) - if err != nil { - panic(fmt.Sprintf("Failed to read random bytes: %v", err)) - } - tx := types.Tx(fmt.Sprintf("load-%X=%x", id, bz)) - +func loadGenerate(ctx context.Context, txCh chan<- types.Tx, testnet *e2e.Testnet, id []byte) { + t := time.NewTimer(0) + defer t.Stop() + for { select { - case chTx <- tx: - time.Sleep(time.Second / time.Duration(multiplier)) - + case <-t.C: case <-ctx.Done(): - close(chTx) + close(txCh) return } + t.Reset(time.Second) + + // A context with a timeout is created here to time the createTxBatch + // function out. If createTxBatch has not completed its work by the time + // the next batch is set to be sent out, then the context is canceled so that + // the current batch is halted, allowing the next batch to begin. + tctx, cf := context.WithTimeout(ctx, time.Second) + createTxBatch(tctx, txCh, testnet, id) + cf() } } -// loadProcess processes transactions -func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx, chSuccess chan<- types.Tx) { - // Each worker gets its own client to each node, which allows for some - // concurrency while still bounding it. - clients := map[string]*rpchttp.HTTP{} +// createTxBatch creates new transactions and sends them into the txCh. createTxBatch +// returns when either a full batch has been sent to the txCh or the context +// is canceled. 
+func createTxBatch(ctx context.Context, txCh chan<- types.Tx, testnet *e2e.Testnet, id []byte) { + wg := &sync.WaitGroup{} + for i := 0; i < workerPoolSize; i++ { + wg.Add(1) + go func() { + for i := 0; i < testnet.LoadTxBatchSize; i++ { + tx, err := payload.NewBytes(&payload.Payload{ + Id: id, + Size: uint64(testnet.LoadTxSizeBytes), + Rate: uint64(testnet.LoadTxBatchSize), + Connections: uint64(testnet.LoadTxConnections), + }) + if err != nil { + panic(fmt.Sprintf("Failed to generate tx: %v", err)) + } + select { + case txCh <- tx: + case <-ctx.Done(): + return + } + } + wg.Done() + }() + } + wg.Wait() +} + +// loadProcess processes transactions by sending transactions received on the txCh +// to the client. +func loadProcess(ctx context.Context, txCh <-chan types.Tx, chSuccess chan<- struct{}, client *rpchttp.HTTP) { var err error - for tx := range chTx { - node := testnet.RandomNode() - client, ok := clients[node.Name] - if !ok { - client, err = node.Client() - if err != nil { - continue - } - - // check that the node is up - _, err = client.Health(ctx) - if err != nil { - continue - } - - clients[node.Name] = client - } - + s := struct{}{} + for tx := range txCh { if _, err = client.BroadcastTxSync(ctx, tx); err != nil { continue } - chSuccess <- tx + chSuccess <- s } } diff --git a/test/e2e/runner/main.go b/test/e2e/runner/main.go index fdfec7bbe..3e9e2c66c 100644 --- a/test/e2e/runner/main.go +++ b/test/e2e/runner/main.go @@ -105,7 +105,7 @@ func NewCLI() *CLI { ctx, loadCancel := context.WithCancel(context.Background()) defer loadCancel() go func() { - err := Load(ctx, cli.testnet, 1) + err := Load(ctx, cli.testnet) if err != nil { logger.Error(fmt.Sprintf("Transaction load failed: %v", err.Error())) } @@ -216,20 +216,10 @@ func NewCLI() *CLI { }) cli.root.AddCommand(&cobra.Command{ - Use: "load [multiplier]", - Args: cobra.MaximumNArgs(1), + Use: "load", Short: "Generates transaction load until the command is canceled", RunE: func(cmd *cobra.Command, args 
[]string) (err error) { - m := 1 - - if len(args) == 1 { - m, err = strconv.Atoi(args[0]) - if err != nil { - return err - } - } - - return Load(context.Background(), cli.testnet, m) + return Load(context.Background(), cli.testnet) }, }) @@ -312,7 +302,7 @@ Does not run any perbutations. ctx, loadCancel := context.WithCancel(context.Background()) defer loadCancel() go func() { - err := Load(ctx, cli.testnet, 1) + err := Load(ctx, cli.testnet) if err != nil { logger.Error(fmt.Sprintf("Transaction load failed: %v", err.Error())) }