mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-15 17:22:50 +00:00
Compare commits
9 Commits
master
...
sergio/e2e
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3b65c906b5 | ||
|
|
51685158fe | ||
|
|
dfeaa62ca9 | ||
|
|
3faddd4c9a | ||
|
|
07fbed5214 | ||
|
|
8db919637b | ||
|
|
f83db7ff8c | ||
|
|
f50a32aa65 | ||
|
|
fdf3449fff |
@@ -106,7 +106,7 @@ type Node struct {
|
||||
// The testnet generation must be deterministic, since it is generated
|
||||
// separately by the runner and the test cases. For this reason, testnets use a
|
||||
// random seed to generate e.g. keys.
|
||||
func LoadTestnet(file string) (*Testnet, error) {
|
||||
func LoadTestnet(file string, seed int64) (*Testnet, error) {
|
||||
manifest, err := LoadManifest(file)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -124,7 +124,7 @@ func LoadTestnet(file string) (*Testnet, error) {
|
||||
}
|
||||
|
||||
ipGen := newIPGenerator(ipNet)
|
||||
keyGen := newKeyGenerator(randomSeed)
|
||||
keyGen := newKeyGenerator(seed)
|
||||
proxyPortGen := newPortGenerator(proxyPortFirst)
|
||||
|
||||
testnet := &Testnet{
|
||||
@@ -462,9 +462,13 @@ func (n Node) AddressRPC() string {
|
||||
return fmt.Sprintf("%v:26657", ip)
|
||||
}
|
||||
|
||||
func NewClient(ip string, port uint32) (*rpchttp.HTTP, error) {
|
||||
return rpchttp.New(fmt.Sprintf("http://%v:%v", ip, port))
|
||||
}
|
||||
|
||||
// Client returns an RPC client for a node.
|
||||
func (n Node) Client() (*rpchttp.HTTP, error) {
|
||||
return rpchttp.New(fmt.Sprintf("http://127.0.0.1:%v", n.ProxyPort))
|
||||
return NewClient("127.0.0.1", n.ProxyPort)
|
||||
}
|
||||
|
||||
// Stateless returns true if the node is either a seed node or a light node
|
||||
|
||||
@@ -16,13 +16,20 @@ import (
|
||||
|
||||
// Load generates transactions against the network until the given context is
|
||||
// canceled.
|
||||
func Load(ctx context.Context, logger log.Logger, r *rand.Rand, testnet *e2e.Testnet) error {
|
||||
func Load(ctx context.Context, logger log.Logger, r *rand.Rand, testnet *e2e.Testnet, ips []string) error {
|
||||
// Since transactions are executed across all nodes in the network, we need
|
||||
// to reduce transaction load for larger networks to avoid using too much
|
||||
// CPU. This gives high-throughput small networks and low-throughput large ones.
|
||||
// This also limits the number of TCP connections, since each worker has
|
||||
// a connection to all nodes.
|
||||
concurrency := len(testnet.Nodes) * 2
|
||||
var nNodes int
|
||||
if testnet == nil {
|
||||
nNodes = len(ips)
|
||||
} else {
|
||||
nNodes = len(testnet.Nodes)
|
||||
}
|
||||
|
||||
concurrency := nNodes * 20
|
||||
if concurrency > 32 {
|
||||
concurrency = 32
|
||||
}
|
||||
@@ -33,17 +40,21 @@ func Load(ctx context.Context, logger log.Logger, r *rand.Rand, testnet *e2e.Tes
|
||||
defer cancel()
|
||||
|
||||
// Spawn job generator and processors.
|
||||
txSize := 1024
|
||||
if testnet != nil {
|
||||
txSize = testnet.TxSize
|
||||
}
|
||||
logger.Info("starting transaction load",
|
||||
"workers", concurrency,
|
||||
"nodes", len(testnet.Nodes),
|
||||
"tx", testnet.TxSize)
|
||||
"nodes", nNodes,
|
||||
"tx", txSize)
|
||||
|
||||
started := time.Now()
|
||||
|
||||
go loadGenerate(ctx, r, chTx, testnet.TxSize, len(testnet.Nodes))
|
||||
go loadGenerate(ctx, r, chTx, txSize, nNodes)
|
||||
|
||||
for w := 0; w < concurrency; w++ {
|
||||
go loadProcess(ctx, testnet, chTx, chSuccess)
|
||||
go loadProcess(ctx, logger, testnet, ips, chTx, chSuccess)
|
||||
}
|
||||
|
||||
// Montior transaction to ensure load propagates to the network
|
||||
@@ -132,10 +143,7 @@ func loadGenerateWaitTime(r *rand.Rand, size int) time.Duration {
|
||||
return time.Duration(baseJitter + sizeJitter)
|
||||
}
|
||||
|
||||
// loadProcess processes transactions
|
||||
func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx, chSuccess chan<- int) {
|
||||
// Each worker gets its own client to each usable node, which
|
||||
// allows for some concurrency while still bounding it.
|
||||
func prepareClientsTestnet(testnet *e2e.Testnet) []*rpchttp.HTTP {
|
||||
clients := make([]*rpchttp.HTTP, 0, len(testnet.Nodes))
|
||||
|
||||
for idx := range testnet.Nodes {
|
||||
@@ -154,6 +162,34 @@ func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx
|
||||
|
||||
clients = append(clients, client)
|
||||
}
|
||||
return clients
|
||||
}
|
||||
|
||||
func prepareClientsIps(ips []string) []*rpchttp.HTTP {
|
||||
clients := make([]*rpchttp.HTTP, 0, len(ips))
|
||||
|
||||
for _, ip := range ips {
|
||||
client, err := e2e.NewClient(ip, 26657) // Port is hard-coded for the moment
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
clients = append(clients, client)
|
||||
}
|
||||
return clients
|
||||
}
|
||||
|
||||
// loadProcess processes transactions
|
||||
func loadProcess(ctx context.Context, logger log.Logger, testnet *e2e.Testnet, ips []string, chTx <-chan types.Tx, chSuccess chan<- int) {
|
||||
// Each worker gets its own client to each usable node, which
|
||||
// allows for some concurrency while still bounding it.
|
||||
|
||||
var clients []*rpchttp.HTTP
|
||||
if testnet == nil {
|
||||
clients = prepareClientsIps(ips)
|
||||
} else {
|
||||
clients = prepareClientsTestnet(testnet)
|
||||
}
|
||||
|
||||
if len(clients) == 0 {
|
||||
panic("no clients to process load")
|
||||
@@ -177,12 +213,18 @@ func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx
|
||||
client := clientRing.Value.(*rpchttp.HTTP)
|
||||
|
||||
if status, err := client.Status(ctx); err != nil {
|
||||
logger.Error("problem with client status",
|
||||
"err", err)
|
||||
continue
|
||||
} else if status.SyncInfo.CatchingUp {
|
||||
logger.Error("problem catching up")
|
||||
continue
|
||||
}
|
||||
|
||||
if _, err := client.BroadcastTxSync(ctx, tx); err != nil {
|
||||
logger.Error("problem broadcasting txsync",
|
||||
"tx", tx,
|
||||
"err", err)
|
||||
continue
|
||||
}
|
||||
successes++
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"math/rand"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
@@ -31,9 +32,11 @@ func main() {
|
||||
|
||||
// CLI is the Cobra-based command-line interface.
|
||||
type CLI struct {
|
||||
root *cobra.Command
|
||||
testnet *e2e.Testnet
|
||||
preserve bool
|
||||
root *cobra.Command
|
||||
testnet *e2e.Testnet
|
||||
ips []string
|
||||
seedDelta int
|
||||
preserve bool
|
||||
}
|
||||
|
||||
// NewCLI sets up the CLI.
|
||||
@@ -45,16 +48,30 @@ func NewCLI(logger log.Logger) *CLI {
|
||||
SilenceUsage: true,
|
||||
SilenceErrors: true, // we'll output them ourselves in Run()
|
||||
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
|
||||
file, err := cmd.Flags().GetString("file")
|
||||
ips, err := cmd.Flags().GetString("ip-list")
|
||||
if err != nil {
|
||||
// If flag is absent, no error is returned, but the default value (empty string)
|
||||
return err
|
||||
}
|
||||
testnet, err := e2e.LoadTestnet(file)
|
||||
cli.seedDelta, err = cmd.Flags().GetInt("seed-delta")
|
||||
seed := randomSeed + cli.seedDelta // nolint: gosec
|
||||
if len(ips) == 0 {
|
||||
file, err := cmd.Flags().GetString("file")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
testnet, err := e2e.LoadTestnet(file, int64(seed))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cli.testnet = testnet
|
||||
return nil
|
||||
}
|
||||
cli.ips = strings.Split(ips, ",")
|
||||
if err != nil {
|
||||
// If flag is absent, no error is returned, but the default value (empty string)
|
||||
return err
|
||||
}
|
||||
|
||||
cli.testnet = testnet
|
||||
return nil
|
||||
},
|
||||
RunE: func(cmd *cobra.Command, args []string) (err error) {
|
||||
@@ -84,7 +101,7 @@ func NewCLI(logger log.Logger) *CLI {
|
||||
lctx, loadCancel := context.WithCancel(ctx)
|
||||
defer loadCancel()
|
||||
go func() {
|
||||
chLoadResult <- Load(lctx, logger, r, cli.testnet)
|
||||
chLoadResult <- Load(lctx, logger, r, cli.testnet, nil)
|
||||
}()
|
||||
startAt := time.Now()
|
||||
if err = Start(ctx, logger, cli.testnet); err != nil {
|
||||
@@ -142,7 +159,8 @@ func NewCLI(logger log.Logger) *CLI {
|
||||
}
|
||||
|
||||
cli.root.PersistentFlags().StringP("file", "f", "", "Testnet TOML manifest")
|
||||
_ = cli.root.MarkPersistentFlagRequired("file")
|
||||
cli.root.PersistentFlags().StringP("ip-list", "i", "", "Comma-separated list of ip addresses for load generation")
|
||||
cli.root.PersistentFlags().IntP("seed-delta", "s", 0, "Interger to be added to the initial hard-coded seed")
|
||||
|
||||
cli.root.Flags().BoolVarP(&cli.preserve, "preserve", "p", false,
|
||||
"Preserves the running of the test net after tests are completed")
|
||||
@@ -225,8 +243,9 @@ func NewCLI(logger log.Logger) *CLI {
|
||||
return Load(
|
||||
cmd.Context(),
|
||||
logger,
|
||||
rand.New(rand.NewSource(randomSeed)), // nolint: gosec
|
||||
rand.New(rand.NewSource(randomSeed+int64(cli.seedDelta))), // nolint: gosec
|
||||
cli.testnet,
|
||||
cli.ips,
|
||||
)
|
||||
},
|
||||
})
|
||||
@@ -328,7 +347,7 @@ Does not run any perbutations.
|
||||
lctx, loadCancel := context.WithCancel(ctx)
|
||||
defer loadCancel()
|
||||
go func() {
|
||||
chLoadResult <- Load(lctx, logger, r, cli.testnet)
|
||||
chLoadResult <- Load(lctx, logger, r, cli.testnet, nil)
|
||||
}()
|
||||
|
||||
if err := Start(ctx, logger, cli.testnet); err != nil {
|
||||
|
||||
Reference in New Issue
Block a user