Compare commits

...

9 Commits

Author SHA1 Message Date
William Banfield
3b65c906b5 use random seed to power node generator 2022-06-14 14:10:28 -04:00
Sergio Mena
51685158fe added logs to transaction processing 2022-05-31 17:37:52 +02:00
Sergio Mena
dfeaa62ca9 Added optional seed-delta parameter 2022-05-31 17:34:13 +02:00
Sergio Mena
3faddd4c9a Set default tx size 2022-05-31 15:29:44 +02:00
Sergio Mena
07fbed5214 Increase concurrency 2022-05-31 14:02:51 +02:00
Sergio Mena
8db919637b Changed hard-coded port to Tendermint's value in the default config 2022-05-27 15:34:33 +02:00
Sergio Mena
f83db7ff8c Various fixes 2022-05-27 15:17:46 +02:00
Sergio Mena
f50a32aa65 remove printf 2022-05-27 14:49:53 +02:00
Sergio Mena
fdf3449fff New flag 'ip-list' for the 'load' command in e2e runner 2022-05-27 14:49:53 +02:00
3 changed files with 89 additions and 24 deletions

View File

@@ -106,7 +106,7 @@ type Node struct {
// The testnet generation must be deterministic, since it is generated
// separately by the runner and the test cases. For this reason, testnets use a
// random seed to generate e.g. keys.
func LoadTestnet(file string) (*Testnet, error) {
func LoadTestnet(file string, seed int64) (*Testnet, error) {
manifest, err := LoadManifest(file)
if err != nil {
return nil, err
@@ -124,7 +124,7 @@ func LoadTestnet(file string) (*Testnet, error) {
}
ipGen := newIPGenerator(ipNet)
keyGen := newKeyGenerator(randomSeed)
keyGen := newKeyGenerator(seed)
proxyPortGen := newPortGenerator(proxyPortFirst)
testnet := &Testnet{
@@ -462,9 +462,13 @@ func (n Node) AddressRPC() string {
return fmt.Sprintf("%v:26657", ip)
}
func NewClient(ip string, port uint32) (*rpchttp.HTTP, error) {
return rpchttp.New(fmt.Sprintf("http://%v:%v", ip, port))
}
// Client returns an RPC client for a node.
func (n Node) Client() (*rpchttp.HTTP, error) {
return rpchttp.New(fmt.Sprintf("http://127.0.0.1:%v", n.ProxyPort))
return NewClient("127.0.0.1", n.ProxyPort)
}
// Stateless returns true if the node is either a seed node or a light node

View File

@@ -16,13 +16,20 @@ import (
// Load generates transactions against the network until the given context is
// canceled.
func Load(ctx context.Context, logger log.Logger, r *rand.Rand, testnet *e2e.Testnet) error {
func Load(ctx context.Context, logger log.Logger, r *rand.Rand, testnet *e2e.Testnet, ips []string) error {
// Since transactions are executed across all nodes in the network, we need
// to reduce transaction load for larger networks to avoid using too much
// CPU. This gives high-throughput small networks and low-throughput large ones.
// This also limits the number of TCP connections, since each worker has
// a connection to all nodes.
concurrency := len(testnet.Nodes) * 2
var nNodes int
if testnet == nil {
nNodes = len(ips)
} else {
nNodes = len(testnet.Nodes)
}
concurrency := nNodes * 20
if concurrency > 32 {
concurrency = 32
}
@@ -33,17 +40,21 @@ func Load(ctx context.Context, logger log.Logger, r *rand.Rand, testnet *e2e.Tes
defer cancel()
// Spawn job generator and processors.
txSize := 1024
if testnet != nil {
txSize = testnet.TxSize
}
logger.Info("starting transaction load",
"workers", concurrency,
"nodes", len(testnet.Nodes),
"tx", testnet.TxSize)
"nodes", nNodes,
"tx", txSize)
started := time.Now()
go loadGenerate(ctx, r, chTx, testnet.TxSize, len(testnet.Nodes))
go loadGenerate(ctx, r, chTx, txSize, nNodes)
for w := 0; w < concurrency; w++ {
go loadProcess(ctx, testnet, chTx, chSuccess)
go loadProcess(ctx, logger, testnet, ips, chTx, chSuccess)
}
// Montior transaction to ensure load propagates to the network
@@ -132,10 +143,7 @@ func loadGenerateWaitTime(r *rand.Rand, size int) time.Duration {
return time.Duration(baseJitter + sizeJitter)
}
// loadProcess processes transactions
func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx, chSuccess chan<- int) {
// Each worker gets its own client to each usable node, which
// allows for some concurrency while still bounding it.
func prepareClientsTestnet(testnet *e2e.Testnet) []*rpchttp.HTTP {
clients := make([]*rpchttp.HTTP, 0, len(testnet.Nodes))
for idx := range testnet.Nodes {
@@ -154,6 +162,34 @@ func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx
clients = append(clients, client)
}
return clients
}
func prepareClientsIps(ips []string) []*rpchttp.HTTP {
clients := make([]*rpchttp.HTTP, 0, len(ips))
for _, ip := range ips {
client, err := e2e.NewClient(ip, 26657) // Port is hard-coded for the moment
if err != nil {
continue
}
clients = append(clients, client)
}
return clients
}
// loadProcess processes transactions
func loadProcess(ctx context.Context, logger log.Logger, testnet *e2e.Testnet, ips []string, chTx <-chan types.Tx, chSuccess chan<- int) {
// Each worker gets its own client to each usable node, which
// allows for some concurrency while still bounding it.
var clients []*rpchttp.HTTP
if testnet == nil {
clients = prepareClientsIps(ips)
} else {
clients = prepareClientsTestnet(testnet)
}
if len(clients) == 0 {
panic("no clients to process load")
@@ -177,12 +213,18 @@ func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx
client := clientRing.Value.(*rpchttp.HTTP)
if status, err := client.Status(ctx); err != nil {
logger.Error("problem with client status",
"err", err)
continue
} else if status.SyncInfo.CatchingUp {
logger.Error("problem catching up")
continue
}
if _, err := client.BroadcastTxSync(ctx, tx); err != nil {
logger.Error("problem broadcasting txsync",
"tx", tx,
"err", err)
continue
}
successes++

View File

@@ -7,6 +7,7 @@ import (
"math/rand"
"os"
"strconv"
"strings"
"time"
"github.com/spf13/cobra"
@@ -31,9 +32,11 @@ func main() {
// CLI is the Cobra-based command-line interface.
type CLI struct {
root *cobra.Command
testnet *e2e.Testnet
preserve bool
root *cobra.Command
testnet *e2e.Testnet
ips []string
seedDelta int
preserve bool
}
// NewCLI sets up the CLI.
@@ -45,16 +48,30 @@ func NewCLI(logger log.Logger) *CLI {
SilenceUsage: true,
SilenceErrors: true, // we'll output them ourselves in Run()
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
file, err := cmd.Flags().GetString("file")
ips, err := cmd.Flags().GetString("ip-list")
if err != nil {
// If flag is absent, no error is returned, but the default value (empty string)
return err
}
testnet, err := e2e.LoadTestnet(file)
cli.seedDelta, err = cmd.Flags().GetInt("seed-delta")
seed := randomSeed + cli.seedDelta // nolint: gosec
if len(ips) == 0 {
file, err := cmd.Flags().GetString("file")
if err != nil {
return err
}
testnet, err := e2e.LoadTestnet(file, int64(seed))
if err != nil {
return err
}
cli.testnet = testnet
return nil
}
cli.ips = strings.Split(ips, ",")
if err != nil {
// If flag is absent, no error is returned, but the default value (empty string)
return err
}
cli.testnet = testnet
return nil
},
RunE: func(cmd *cobra.Command, args []string) (err error) {
@@ -84,7 +101,7 @@ func NewCLI(logger log.Logger) *CLI {
lctx, loadCancel := context.WithCancel(ctx)
defer loadCancel()
go func() {
chLoadResult <- Load(lctx, logger, r, cli.testnet)
chLoadResult <- Load(lctx, logger, r, cli.testnet, nil)
}()
startAt := time.Now()
if err = Start(ctx, logger, cli.testnet); err != nil {
@@ -142,7 +159,8 @@ func NewCLI(logger log.Logger) *CLI {
}
cli.root.PersistentFlags().StringP("file", "f", "", "Testnet TOML manifest")
_ = cli.root.MarkPersistentFlagRequired("file")
cli.root.PersistentFlags().StringP("ip-list", "i", "", "Comma-separated list of ip addresses for load generation")
cli.root.PersistentFlags().IntP("seed-delta", "s", 0, "Interger to be added to the initial hard-coded seed")
cli.root.Flags().BoolVarP(&cli.preserve, "preserve", "p", false,
"Preserves the running of the test net after tests are completed")
@@ -225,8 +243,9 @@ func NewCLI(logger log.Logger) *CLI {
return Load(
cmd.Context(),
logger,
rand.New(rand.NewSource(randomSeed)), // nolint: gosec
rand.New(rand.NewSource(randomSeed+int64(cli.seedDelta))), // nolint: gosec
cli.testnet,
cli.ips,
)
},
})
@@ -328,7 +347,7 @@ Does not run any perbutations.
lctx, loadCancel := context.WithCancel(ctx)
defer loadCancel()
go func() {
chLoadResult <- Load(lctx, logger, r, cli.testnet)
chLoadResult <- Load(lctx, logger, r, cli.testnet, nil)
}()
if err := Start(ctx, logger, cli.testnet); err != nil {