mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-08 06:15:33 +00:00
e2e: test runner generates loadtime formatted transactions. (#9779)
This commit is contained in:
@@ -2,4 +2,3 @@
|
||||
[node.validator02]
|
||||
[node.validator03]
|
||||
[node.validator04]
|
||||
|
||||
|
||||
@@ -68,6 +68,10 @@ type Manifest struct {
|
||||
ProcessProposalDelay time.Duration `toml:"process_proposal_delay"`
|
||||
CheckTxDelay time.Duration `toml:"check_tx_delay"`
|
||||
// TODO: add vote extension and finalize block delay (@cmwaters)
|
||||
|
||||
LoadTxSizeBytes int `toml:"load_tx_size_bytes"`
|
||||
LoadTxBatchSize int `toml:"load_tx_batch_size"`
|
||||
LoadTxConnections int `toml:"load_tx_connections"`
|
||||
}
|
||||
|
||||
// ManifestNode represents a node in a testnet manifest.
|
||||
@@ -145,6 +149,11 @@ type ManifestNode struct {
|
||||
// pause: temporarily pauses (freezes) the node
|
||||
// restart: restarts the node, shutting it down with SIGTERM
|
||||
Perturb []string `toml:"perturb"`
|
||||
|
||||
// SendNoLoad determines if the e2e test should send load to this node.
|
||||
// It defaults to false so unless the configured, the node will
|
||||
// receive load.
|
||||
SendNoLoad bool `toml:"send_no_laod"`
|
||||
}
|
||||
|
||||
// Save saves the testnet manifest to a file.
|
||||
|
||||
@@ -21,6 +21,10 @@ import (
|
||||
const (
|
||||
randomSeed int64 = 2308084734268
|
||||
proxyPortFirst uint32 = 5701
|
||||
|
||||
defaultBatchSize = 2
|
||||
defaultConnections = 1
|
||||
defaultTxSizeBytes = 1024
|
||||
)
|
||||
|
||||
type (
|
||||
@@ -63,6 +67,9 @@ type Testnet struct {
|
||||
Nodes []*Node
|
||||
KeyType string
|
||||
Evidence int
|
||||
LoadTxSizeBytes int
|
||||
LoadTxBatchSize int
|
||||
LoadTxConnections int
|
||||
ABCIProtocol string
|
||||
PrepareProposalDelay time.Duration
|
||||
ProcessProposalDelay time.Duration
|
||||
@@ -92,6 +99,9 @@ type Node struct {
|
||||
Seeds []*Node
|
||||
PersistentPeers []*Node
|
||||
Perturbations []Perturbation
|
||||
|
||||
// SendNoLoad determines if the e2e test should send load to this node.
|
||||
SendNoLoad bool
|
||||
}
|
||||
|
||||
// LoadTestnet loads a testnet from a manifest file, using the filename to
|
||||
@@ -119,6 +129,9 @@ func LoadTestnet(manifest Manifest, fname string, ifd InfrastructureData) (*Test
|
||||
ValidatorUpdates: map[int64]map[*Node]int64{},
|
||||
Nodes: []*Node{},
|
||||
Evidence: manifest.Evidence,
|
||||
LoadTxSizeBytes: manifest.LoadTxSizeBytes,
|
||||
LoadTxBatchSize: manifest.LoadTxBatchSize,
|
||||
LoadTxConnections: manifest.LoadTxConnections,
|
||||
ABCIProtocol: manifest.ABCIProtocol,
|
||||
PrepareProposalDelay: manifest.PrepareProposalDelay,
|
||||
ProcessProposalDelay: manifest.ProcessProposalDelay,
|
||||
@@ -133,6 +146,15 @@ func LoadTestnet(manifest Manifest, fname string, ifd InfrastructureData) (*Test
|
||||
if testnet.ABCIProtocol == "" {
|
||||
testnet.ABCIProtocol = string(ProtocolBuiltin)
|
||||
}
|
||||
if testnet.LoadTxConnections == 0 {
|
||||
testnet.LoadTxConnections = defaultConnections
|
||||
}
|
||||
if testnet.LoadTxBatchSize == 0 {
|
||||
testnet.LoadTxBatchSize = defaultBatchSize
|
||||
}
|
||||
if testnet.LoadTxSizeBytes == 0 {
|
||||
testnet.LoadTxSizeBytes = defaultTxSizeBytes
|
||||
}
|
||||
|
||||
// Set up nodes, in alphabetical order (IPs and ports get same order).
|
||||
nodeNames := []string{}
|
||||
@@ -167,6 +189,7 @@ func LoadTestnet(manifest Manifest, fname string, ifd InfrastructureData) (*Test
|
||||
SnapshotInterval: nodeManifest.SnapshotInterval,
|
||||
RetainBlocks: nodeManifest.RetainBlocks,
|
||||
Perturbations: []Perturbation{},
|
||||
SendNoLoad: nodeManifest.SendNoLoad,
|
||||
}
|
||||
if node.StartAt == testnet.InitialHeight {
|
||||
node.StartAt = 0 // normalize to 0 for initial nodes, since code expects this
|
||||
|
||||
@@ -2,47 +2,48 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
rpchttp "github.com/tendermint/tendermint/rpc/client/http"
|
||||
e2e "github.com/tendermint/tendermint/test/e2e/pkg"
|
||||
"github.com/tendermint/tendermint/test/loadtime/payload"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
const workerPoolSize = 16
|
||||
|
||||
// Load generates transactions against the network until the given context is
|
||||
// canceled. A multiplier of greater than one can be supplied if load needs to
|
||||
// be generated beyond a minimum amount.
|
||||
func Load(ctx context.Context, testnet *e2e.Testnet, multiplier int) error {
|
||||
// Since transactions are executed across all nodes in the network, we need
|
||||
// to reduce transaction load for larger networks to avoid using too much
|
||||
// CPU. This gives high-throughput small networks and low-throughput large ones.
|
||||
// This also limits the number of TCP connections, since each worker has
|
||||
// a connection to all nodes.
|
||||
concurrency := 64 / len(testnet.Nodes)
|
||||
if concurrency == 0 {
|
||||
concurrency = 1
|
||||
}
|
||||
// canceled.
|
||||
func Load(ctx context.Context, testnet *e2e.Testnet) error {
|
||||
initialTimeout := 1 * time.Minute
|
||||
stallTimeout := 30 * time.Second
|
||||
|
||||
chTx := make(chan types.Tx)
|
||||
chSuccess := make(chan types.Tx)
|
||||
chSuccess := make(chan struct{})
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
// Spawn job generator and processors.
|
||||
logger.Info("load", "msg", log.NewLazySprintf("Starting transaction load (%v workers)...", concurrency))
|
||||
started := time.Now()
|
||||
u := [16]byte(uuid.New()) // generate run ID on startup
|
||||
|
||||
go loadGenerate(ctx, chTx, multiplier)
|
||||
txCh := make(chan types.Tx)
|
||||
go loadGenerate(ctx, txCh, testnet, u[:])
|
||||
|
||||
for w := 0; w < concurrency; w++ {
|
||||
go loadProcess(ctx, testnet, chTx, chSuccess)
|
||||
for _, n := range testnet.Nodes {
|
||||
if n.SendNoLoad {
|
||||
continue
|
||||
}
|
||||
|
||||
for w := 0; w < testnet.LoadTxConnections; w++ {
|
||||
cli, err := n.Client()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go loadProcess(ctx, txCh, chSuccess, cli)
|
||||
}
|
||||
}
|
||||
|
||||
// Monitor successful transactions, and abort on stalls.
|
||||
@@ -67,58 +68,68 @@ func Load(ctx context.Context, testnet *e2e.Testnet, multiplier int) error {
|
||||
}
|
||||
|
||||
// loadGenerate generates jobs until the context is canceled
|
||||
func loadGenerate(ctx context.Context, chTx chan<- types.Tx, multiplier int) {
|
||||
for i := 0; i < math.MaxInt64; i++ {
|
||||
// We keep generating the same 1000 keys over and over, with different values.
|
||||
// This gives a reasonable load without putting too much data in the app.
|
||||
id := i % 1000
|
||||
|
||||
bz := make([]byte, 1024) // 1kb hex-encoded
|
||||
_, err := rand.Read(bz)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Failed to read random bytes: %v", err))
|
||||
}
|
||||
tx := types.Tx(fmt.Sprintf("load-%X=%x", id, bz))
|
||||
|
||||
func loadGenerate(ctx context.Context, txCh chan<- types.Tx, testnet *e2e.Testnet, id []byte) {
|
||||
t := time.NewTimer(0)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case chTx <- tx:
|
||||
time.Sleep(time.Second / time.Duration(multiplier))
|
||||
|
||||
case <-t.C:
|
||||
case <-ctx.Done():
|
||||
close(chTx)
|
||||
close(txCh)
|
||||
return
|
||||
}
|
||||
t.Reset(time.Second)
|
||||
|
||||
// A context with a timeout is created here to time the createTxBatch
|
||||
// function out. If createTxBatch has not completed its work by the time
|
||||
// the next batch is set to be sent out, then the context is cancled so that
|
||||
// the current batch is halted, allowing the next batch to begin.
|
||||
tctx, cf := context.WithTimeout(ctx, time.Second)
|
||||
createTxBatch(tctx, txCh, testnet, id)
|
||||
cf()
|
||||
}
|
||||
}
|
||||
|
||||
// loadProcess processes transactions
|
||||
func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx, chSuccess chan<- types.Tx) {
|
||||
// Each worker gets its own client to each node, which allows for some
|
||||
// concurrency while still bounding it.
|
||||
clients := map[string]*rpchttp.HTTP{}
|
||||
// createTxBatch creates new transactions and sends them into the txCh. createTxBatch
|
||||
// returns when either a full batch has been sent to the txCh or the context
|
||||
// is canceled.
|
||||
func createTxBatch(ctx context.Context, txCh chan<- types.Tx, testnet *e2e.Testnet, id []byte) {
|
||||
wg := &sync.WaitGroup{}
|
||||
for i := 0; i < workerPoolSize; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
for i := 0; i < testnet.LoadTxBatchSize; i++ {
|
||||
tx, err := payload.NewBytes(&payload.Payload{
|
||||
Id: id,
|
||||
Size: uint64(testnet.LoadTxSizeBytes),
|
||||
Rate: uint64(testnet.LoadTxBatchSize),
|
||||
Connections: uint64(testnet.LoadTxConnections),
|
||||
})
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Failed to generate tx: %v", err))
|
||||
}
|
||||
|
||||
select {
|
||||
case txCh <- tx:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// loadProcess processes transactions by sending transactions received on the txCh
|
||||
// to the client.
|
||||
func loadProcess(ctx context.Context, txCh <-chan types.Tx, chSuccess chan<- struct{}, client *rpchttp.HTTP) {
|
||||
var err error
|
||||
for tx := range chTx {
|
||||
node := testnet.RandomNode()
|
||||
client, ok := clients[node.Name]
|
||||
if !ok {
|
||||
client, err = node.Client()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// check that the node is up
|
||||
_, err = client.Health(ctx)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
clients[node.Name] = client
|
||||
}
|
||||
|
||||
s := struct{}{}
|
||||
for tx := range txCh {
|
||||
if _, err = client.BroadcastTxSync(ctx, tx); err != nil {
|
||||
continue
|
||||
}
|
||||
chSuccess <- tx
|
||||
chSuccess <- s
|
||||
}
|
||||
}
|
||||
|
||||
@@ -105,7 +105,7 @@ func NewCLI() *CLI {
|
||||
ctx, loadCancel := context.WithCancel(context.Background())
|
||||
defer loadCancel()
|
||||
go func() {
|
||||
err := Load(ctx, cli.testnet, 1)
|
||||
err := Load(ctx, cli.testnet)
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("Transaction load failed: %v", err.Error()))
|
||||
}
|
||||
@@ -216,20 +216,10 @@ func NewCLI() *CLI {
|
||||
})
|
||||
|
||||
cli.root.AddCommand(&cobra.Command{
|
||||
Use: "load [multiplier]",
|
||||
Args: cobra.MaximumNArgs(1),
|
||||
Use: "load",
|
||||
Short: "Generates transaction load until the command is canceled",
|
||||
RunE: func(cmd *cobra.Command, args []string) (err error) {
|
||||
m := 1
|
||||
|
||||
if len(args) == 1 {
|
||||
m, err = strconv.Atoi(args[0])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return Load(context.Background(), cli.testnet, m)
|
||||
return Load(context.Background(), cli.testnet)
|
||||
},
|
||||
})
|
||||
|
||||
@@ -312,7 +302,7 @@ Does not run any perbutations.
|
||||
ctx, loadCancel := context.WithCancel(context.Background())
|
||||
defer loadCancel()
|
||||
go func() {
|
||||
err := Load(ctx, cli.testnet, 1)
|
||||
err := Load(ctx, cli.testnet)
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("Transaction load failed: %v", err.Error()))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user