mirror of
https://github.com/tendermint/tendermint.git
synced 2026-06-03 04:46:27 +00:00
These are mostly the timeouts that I think we're still hitting in CI. At this point, the tests (on master) pass on my local machine (which is quite beefy) so I think this is just the first in (perhaps?) a sequence of changes that attempt to change timeouts and load patterns so that the tests pass in CI more reliably.
182 lines
4.7 KiB
Go
182 lines
4.7 KiB
Go
package main
|
|
|
|
import (
|
|
"container/ring"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"math/rand"
|
|
"time"
|
|
|
|
rpchttp "github.com/tendermint/tendermint/rpc/client/http"
|
|
e2e "github.com/tendermint/tendermint/test/e2e/pkg"
|
|
"github.com/tendermint/tendermint/types"
|
|
)
|
|
|
|
// Load generates transactions against the network until the given context is
|
|
// canceled.
|
|
func Load(ctx context.Context, testnet *e2e.Testnet) error {
|
|
// Since transactions are executed across all nodes in the network, we need
|
|
// to reduce transaction load for larger networks to avoid using too much
|
|
// CPU. This gives high-throughput small networks and low-throughput large ones.
|
|
// This also limits the number of TCP connections, since each worker has
|
|
// a connection to all nodes.
|
|
concurrency := 64 / len(testnet.Nodes)
|
|
if concurrency == 0 {
|
|
concurrency = 1
|
|
}
|
|
|
|
chTx := make(chan types.Tx)
|
|
chSuccess := make(chan int) // success counts per iteration
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
// Spawn job generator and processors.
|
|
logger.Info(fmt.Sprintf("Starting transaction load (%v workers)...", concurrency))
|
|
started := time.Now()
|
|
|
|
go loadGenerate(ctx, chTx, testnet.TxSize)
|
|
|
|
for w := 0; w < concurrency; w++ {
|
|
go loadProcess(ctx, testnet, chTx, chSuccess)
|
|
}
|
|
|
|
// Montior transaction to ensure load propagates to the network
|
|
//
|
|
// This loop doesn't check or time out for stalls, since a stall here just
|
|
// aborts the load generator sooner and could obscure backpressure
|
|
// from the test harness, and there are other checks for
|
|
// stalls in the framework. Ideally we should monitor latency as a guide
|
|
// for when to give up, but we don't have a good way to track that yet.
|
|
success := 0
|
|
for {
|
|
select {
|
|
case numSeen := <-chSuccess:
|
|
success += numSeen
|
|
case <-ctx.Done():
|
|
if success == 0 {
|
|
return errors.New("failed to submit any transactions")
|
|
}
|
|
rate := float64(success) / time.Since(started).Seconds()
|
|
|
|
logger.Info("ending transaction load",
|
|
"dur_secs", time.Since(started).Seconds(),
|
|
"txns", success,
|
|
"rate", rate,
|
|
"slow", rate < 1)
|
|
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// loadGenerate generates jobs until the context is canceled.
|
|
//
|
|
// The chTx has multiple consumers, thus the rate limiting of the load
|
|
// generation is primarily the result of backpressure from the
|
|
// broadcast transaction, though there is still some timer-based
|
|
// limiting.
|
|
func loadGenerate(ctx context.Context, chTx chan<- types.Tx, size int64) {
|
|
timer := time.NewTimer(0)
|
|
defer timer.Stop()
|
|
defer close(chTx)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-timer.C:
|
|
}
|
|
|
|
// We keep generating the same 100 keys over and over, with different values.
|
|
// This gives a reasonable load without putting too much data in the app.
|
|
id := rand.Int63() % 100 // nolint: gosec
|
|
|
|
bz := make([]byte, size)
|
|
_, err := rand.Read(bz) // nolint: gosec
|
|
if err != nil {
|
|
panic(fmt.Sprintf("Failed to read random bytes: %v", err))
|
|
}
|
|
tx := types.Tx(fmt.Sprintf("load-%X=%x", id, bz))
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case chTx <- tx:
|
|
// sleep for a bit before sending the
|
|
// next transaction.
|
|
waitTime := (50 * time.Millisecond) + time.Duration(rand.Int63n(int64(time.Second))) // nolint: gosec
|
|
timer.Reset(waitTime)
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
// loadProcess processes transactions
|
|
func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx, chSuccess chan<- int) {
|
|
// Each worker gets its own client to each usable node, which
|
|
// allows for some concurrency while still bounding it.
|
|
clients := make([]*rpchttp.HTTP, 0, len(testnet.Nodes))
|
|
|
|
for idx := range testnet.Nodes {
|
|
// Construct a list of usable nodes for the creating
|
|
// load. Don't send load through seed nodes because
|
|
// they do not provide the RPC endpoints required to
|
|
// broadcast transaction.
|
|
if testnet.Nodes[idx].Mode == e2e.ModeSeed {
|
|
continue
|
|
}
|
|
|
|
client, err := testnet.Nodes[idx].Client()
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
clients = append(clients, client)
|
|
}
|
|
|
|
if len(clients) == 0 {
|
|
panic("no clients to process load")
|
|
}
|
|
|
|
// Put the clients in a ring so they can be used in a
|
|
// round-robin fashion.
|
|
clientRing := ring.New(len(clients))
|
|
for idx := range clients {
|
|
clientRing.Value = clients[idx]
|
|
clientRing = clientRing.Next()
|
|
}
|
|
|
|
successes := 0
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case tx := <-chTx:
|
|
clientRing = clientRing.Next()
|
|
client := clientRing.Value.(*rpchttp.HTTP)
|
|
|
|
if status, err := client.Status(ctx); err != nil {
|
|
continue
|
|
} else if status.SyncInfo.CatchingUp {
|
|
continue
|
|
}
|
|
|
|
if _, err := client.BroadcastTxSync(ctx, tx); err != nil {
|
|
continue
|
|
}
|
|
successes++
|
|
|
|
select {
|
|
case chSuccess <- successes:
|
|
successes = 0 // reset counter for the next iteration
|
|
continue
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
}
|
|
}
|
|
}
|