Files
tendermint/test/e2e/runner/load.go
Erik Grinaker 250c3aa92e test: add end-to-end testing framework (#5435)
Partial fix for #5291. For details, see [README.md](https://github.com/tendermint/tendermint/blob/erik/e2e-tests/test/e2e/README.md) and [RFC-001](https://github.com/tendermint/tendermint/blob/master/docs/rfc/rfc-001-end-to-end-testing.md).

This only includes a single test case under `test/e2e/tests/`, as a proof of concept - additional test cases will be submitted separately. A randomized testnet generator will also be submitted separately; there are currently just a handful of static testnets under `test/e2e/networks/`. This will eventually replace the current P2P tests and run in CI.
2020-10-05 09:35:01 +00:00

107 lines
2.6 KiB
Go

package main
import (
"context"
"crypto/rand"
"errors"
"fmt"
"math"
"time"
rpchttp "github.com/tendermint/tendermint/rpc/client/http"
e2e "github.com/tendermint/tendermint/test/e2e/pkg"
"github.com/tendermint/tendermint/types"
)
// Load generates transactions against the network until the given
// context is cancelled.
//
// It starts one generator goroutine and a fixed number of worker goroutines,
// then counts the successes the workers report on an internal channel. It
// returns an error if no success is reported within the initial timeout, if
// successes subsequently stall for longer than the stall timeout, or if the
// context is cancelled before any transaction succeeded. Otherwise it logs
// the number of transactions and the rate, and returns nil once the context
// is cancelled.
func Load(ctx context.Context, testnet *e2e.Testnet) error {
	concurrency := 50
	initialTimeout := 1 * time.Minute
	stallTimeout := 15 * time.Second

	chTx := make(chan types.Tx)
	chSuccess := make(chan types.Tx)

	// The derived context is cancelled on every return path, including the
	// timeout error, so the goroutines spawned below are told to stop.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Spawn job generator and processors.
	logger.Info("Starting transaction load...")
	started := time.Now()

	go loadGenerate(ctx, chTx)

	for w := 0; w < concurrency; w++ {
		go loadProcess(ctx, testnet, chTx, chSuccess)
	}

	// Monitor successful transactions, and abort on stalls. A single timer is
	// reused and reset after each success rather than calling time.After on
	// every iteration, which would allocate a new timer per successful
	// transaction and leave each one pending until it expired.
	success := 0
	timeout := initialTimeout
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	for {
		select {
		case <-chSuccess:
			success++
			// After the first success the shorter stall timeout applies.
			timeout = stallTimeout
			// Stop and drain without blocking so that a tick which fired
			// concurrently cannot be mistaken for a stall after the reset.
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer.Reset(timeout)

		case <-timer.C:
			return fmt.Errorf("unable to submit transactions for %v", timeout)

		case <-ctx.Done():
			if success == 0 {
				return errors.New("failed to submit any transactions")
			}
			logger.Info(fmt.Sprintf("Ending transaction load after %v txs (%.1f tx/s)...",
				success, float64(success)/time.Since(started).Seconds()))
			return nil
		}
	}
}
// loadGenerate produces random transactions on chTx until the context is
// cancelled, at which point it closes chTx and returns. After each
// transaction is handed off it pauses briefly before building the next one.
// It panics if random bytes cannot be read.
func loadGenerate(ctx context.Context, chTx chan<- types.Tx) {
	const (
		keyCount   = 1000                  // number of distinct keys that get cycled through
		valueBytes = 2048                  // 4kb hex-encoded
		pause      = 10 * time.Millisecond // delay after each delivered transaction
	)

	for seq := 0; seq < math.MaxInt64; seq++ {
		// The same set of keys is rewritten again and again with fresh
		// values, which produces load without growing the app's data set
		// indefinitely.
		key := seq % keyCount

		value := make([]byte, valueBytes)
		if _, err := rand.Read(value); err != nil {
			panic(fmt.Sprintf("Failed to read random bytes: %v", err))
		}
		tx := types.Tx(fmt.Sprintf("load-%X=%x", key, value))

		select {
		case <-ctx.Done():
			close(chTx)
			return
		case chTx <- tx:
			time.Sleep(pause)
		}
	}
}
// loadProcess processes transactions.
//
// It receives transactions from chTx until that channel is closed or ctx is
// cancelled, broadcasts each one via BroadcastTxCommit to the node returned
// by testnet.RandomNode, and reports every transaction that was broadcast
// without error on chSuccess. Failures to obtain a client or to broadcast are
// skipped without being reported; the transaction is simply dropped.
func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx, chSuccess chan<- types.Tx) {
	// Each worker gets its own client to each node, which allows for some
	// concurrency while still bounding it. Clients are created on first use
	// and cached by node name.
	clients := map[string]*rpchttp.HTTP{}

	for tx := range chTx {
		node := testnet.RandomNode()

		client, ok := clients[node.Name]
		if !ok {
			var err error
			client, err = node.Client()
			if err != nil {
				continue
			}
			clients[node.Name] = client
		}

		if _, err := client.BroadcastTxCommit(ctx, tx); err != nil {
			continue
		}

		// Do not block forever on the success channel: once the context is
		// cancelled the receiver may already have returned, and an
		// unconditional send would leak this goroutine.
		select {
		case chSuccess <- tx:
		case <-ctx.Done():
			return
		}
	}
}