Merge branch 'feature/abci++vef' into cal/finalize-block

This commit is contained in:
Callum Waters
2022-11-16 14:46:37 +01:00
199 changed files with 6654 additions and 3376 deletions

View File

@@ -7,7 +7,27 @@ make
./build/runner -f networks/ci.toml
```
This creates and runs a testnet named `ci` under `networks/ci/` (determined by the manifest filename).
This creates and runs a testnet named `ci` under `networks/ci/`.
## Conceptual Overview
End-to-end testnets are used to test Tendermint functionality as a user would use it, by spinning up a set of nodes with various configurations and making sure the nodes and network behave correctly. The background for the E2E test suite is outlined in [RFC-001](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-066-e2e-testing.md).
The end-to-end tests can be thought of in this manner:
1. Does a certain (valid!) testnet configuration result in a block-producing network where all nodes eventually reach the latest height?
2. If so, does each node in that network satisfy all invariants specified by the Go E2E tests?
The above should hold for any arbitrary, valid network configuration, and that configuration space should be searched and tested by randomly generating testnets.
A testnet configuration is specified as a TOML testnet manifest (see below). The testnet runner uses the manifest to configure a set of Docker containers and start them in some order. The manifests can be written manually (to test specific configurations) or generated randomly by the testnet generator (to test a wide range of configuration permutations).
When running a testnet, the runner will first start the Docker nodes in some sequence, submit random transactions, and wait for the nodes to come online and the first blocks to be produced. This may involve e.g. waiting for nodes to block sync and/or state sync. If specified, it will then run any misbehaviors (e.g. double-signing) and perturbations (e.g. killing or disconnecting nodes). It then waits for the testnet to stabilize, with all nodes online and having reached the latest height.
Once the testnet stabilizes, a set of Go end-to-end tests are run against the live testnet to verify network invariants (for example that blocks are identical across nodes). These use the RPC client to interact with the network, and should consider the entire network as a black box (i.e. it should not test any network or node internals, only externally visible behavior via RPC). The tests may use the `testNode()` helper to run parallel tests against each individual testnet node, and/or inspect the full blockchain history via `fetchBlockChain()`.
The tests must take into account the network and/or node configuration, and tolerate that the network is still live and producing blocks. For example, validator tests should only run against nodes that are actually validators, and take into account the node's block retention and/or state sync configuration to not query blocks that don't exist.
## Testnet Manifests

View File

@@ -22,7 +22,6 @@ const appVersion = 1
// Application is an ABCI application for use by end-to-end tests. It is a
// simple key/value store for strings, storing data in memory and persisting
// to disk as JSON, taking state sync snapshots if requested.
type Application struct {
abci.BaseApplication
logger log.Logger
@@ -91,7 +90,7 @@ func DefaultConfig(dir string) *Config {
}
// NewApplication creates the application.
func NewApplication(cfg *Config) (*Application, error) {
func NewApplication(cfg *Config) (abci.Application, error) {
state, err := NewState(cfg.Dir, cfg.PersistInterval)
if err != nil {
return nil, err

100
test/e2e/app/sync_app.go Normal file
View File

@@ -0,0 +1,100 @@
package app
import (
"context"
"sync"
abci "github.com/tendermint/tendermint/abci/types"
)
// SyncApplication wraps the e2e Application, managing its own synchronization. This
// allows it to be called from an unsynchronized local client, as it is
// implemented in a thread-safe way.
type SyncApplication struct {
mtx sync.RWMutex
app *Application
}
var _ abci.Application = (*SyncApplication)(nil)
func NewSyncApplication(cfg *Config) (abci.Application, error) {
app, err := NewApplication(cfg)
if err != nil {
return nil, err
}
return &SyncApplication{
app: app.(*Application),
}, nil
}
func (app *SyncApplication) Info(ctx context.Context, req *abci.RequestInfo) (*abci.ResponseInfo, error) {
app.mtx.RLock()
defer app.mtx.RUnlock()
return app.app.Info(ctx, req)
}
func (app *SyncApplication) InitChain(ctx context.Context, req *abci.RequestInitChain) (*abci.ResponseInitChain, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
return app.app.InitChain(ctx, req)
}
func (app *SyncApplication) CheckTx(ctx context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) {
app.mtx.RLock()
defer app.mtx.RUnlock()
return app.app.CheckTx(ctx, req)
}
func (app *SyncApplication) PrepareProposal(ctx context.Context, req *abci.RequestPrepareProposal) (*abci.ResponsePrepareProposal, error) {
// app.app.PrepareProposal does not modify state
app.mtx.RLock()
defer app.mtx.RUnlock()
return app.app.PrepareProposal(ctx, req)
}
func (app *SyncApplication) ProcessProposal(ctx context.Context, req *abci.RequestProcessProposal) (*abci.ResponseProcessProposal, error) {
// app.app.ProcessProposal does not modify state
app.mtx.RLock()
defer app.mtx.RUnlock()
return app.app.ProcessProposal(ctx, req)
}
func (app *SyncApplication) FinalizeBlock(ctx context.Context, req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
return app.app.FinalizeBlock(ctx, req)
}
func (app *SyncApplication) Commit(ctx context.Context, req *abci.RequestCommit) (*abci.ResponseCommit, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
return app.app.Commit(ctx, req)
}
func (app *SyncApplication) Query(ctx context.Context, req *abci.RequestQuery) (*abci.ResponseQuery, error) {
app.mtx.RLock()
defer app.mtx.RUnlock()
return app.app.Query(ctx, req)
}
func (app *SyncApplication) ApplySnapshotChunk(ctx context.Context, req *abci.RequestApplySnapshotChunk) (*abci.ResponseApplySnapshotChunk, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
return app.app.ApplySnapshotChunk(ctx, req)
}
func (app *SyncApplication) ListSnapshots(ctx context.Context, req *abci.RequestListSnapshots) (*abci.ResponseListSnapshots, error) {
// Calls app.snapshots.List(), which is thread-safe.
return app.app.ListSnapshots(ctx, req)
}
func (app *SyncApplication) LoadSnapshotChunk(ctx context.Context, req *abci.RequestLoadSnapshotChunk) (*abci.ResponseLoadSnapshotChunk, error) {
// Calls app.snapshots.LoadChunk, which is thread-safe.
return app.app.LoadSnapshotChunk(ctx, req)
}
func (app *SyncApplication) OfferSnapshot(ctx context.Context, req *abci.RequestOfferSnapshot) (*abci.ResponseOfferSnapshot, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
return app.app.OfferSnapshot(ctx, req)
}

View File

@@ -105,7 +105,7 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
// First we generate seed nodes, starting at the initial height.
for i := 1; i <= numSeeds; i++ {
manifest.Nodes[fmt.Sprintf("seed%02d", i)] = generateNode(
r, e2e.ModeSeed, 0, manifest.InitialHeight, false)
r, e2e.ModeSeed, false, 0, manifest.InitialHeight, false)
}
// Next, we generate validators. We make sure a BFT quorum of validators start
@@ -120,8 +120,12 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
nextStartAt += 5
}
name := fmt.Sprintf("validator%02d", i)
syncApp := false
if manifest.ABCIProtocol == string(e2e.ProtocolBuiltin) {
syncApp = r.Intn(100) >= 50
}
manifest.Nodes[name] = generateNode(
r, e2e.ModeValidator, startAt, manifest.InitialHeight, i <= 2)
r, e2e.ModeValidator, syncApp, startAt, manifest.InitialHeight, i <= 2)
if startAt == 0 {
(*manifest.Validators)[name] = int64(30 + r.Intn(71))
@@ -149,8 +153,12 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
startAt = nextStartAt
nextStartAt += 5
}
syncApp := false
if manifest.ABCIProtocol == string(e2e.ProtocolBuiltin) {
syncApp = r.Intn(100) >= 50
}
manifest.Nodes[fmt.Sprintf("full%02d", i)] = generateNode(
r, e2e.ModeFull, startAt, manifest.InitialHeight, false)
r, e2e.ModeFull, syncApp, startAt, manifest.InitialHeight, false)
}
// We now set up peer discovery for nodes. Seed nodes are fully meshed with
@@ -213,10 +221,11 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
// here, since we need to know the overall network topology and startup
// sequencing.
func generateNode(
r *rand.Rand, mode e2e.Mode, startAt int64, initialHeight int64, forceArchive bool,
r *rand.Rand, mode e2e.Mode, syncApp bool, startAt int64, initialHeight int64, forceArchive bool,
) *e2e.ManifestNode {
node := e2e.ManifestNode{
Mode: string(mode),
SyncApp: syncApp,
StartAt: startAt,
Database: nodeDatabases.Choose(r).(string),
PrivvalProtocol: nodePrivvalProtocols.Choose(r).(string),

View File

@@ -0,0 +1,26 @@
package main
import (
"fmt"
"math/rand"
"path/filepath"
"testing"
"github.com/stretchr/testify/require"
e2e "github.com/tendermint/tendermint/test/e2e/pkg"
)
// TestGenerator tests that only valid manifests are generated
func TestGenerator(t *testing.T) {
manifests, err := Generate(rand.New(rand.NewSource(randomSeed)))
require.NoError(t, err)
require.True(t, len(manifests) >= 24, "insufficient combinations %d", len(manifests))
for idx, m := range manifests {
t.Run(fmt.Sprintf("Case%04d", idx), func(t *testing.T) {
_, err := e2e.NewTestnetFromManifest(m, filepath.Join(t.TempDir(), fmt.Sprintf("Case%04d", idx)), e2e.InfrastructureData{})
require.NoError(t, err)
})
}
}

View File

@@ -60,6 +60,7 @@ perturb = ["kill"]
persistent_peers = ["validator01"]
database = "rocksdb"
abci_protocol = "builtin"
sync_app = true
perturb = ["pause"]
[node.validator05]

View File

@@ -1,5 +1,4 @@
[node.validator01]
[node.validator02]
[node.validator03]
[node.validator04]
[node.validator04]

View File

@@ -7,15 +7,17 @@ import (
"github.com/BurntSushi/toml"
"github.com/tendermint/tendermint/test/e2e/app"
e2e "github.com/tendermint/tendermint/test/e2e/pkg"
)
// Config is the application configuration.
type Config struct {
ChainID string `toml:"chain_id"`
Listen string
Protocol string
Dir string
ChainID string `toml:"chain_id"`
Listen string `toml:"listen"`
Protocol string `toml:"protocol"`
Dir string `toml:"dir"`
Mode string `toml:"mode"`
SyncApp bool `toml:"sync_app"`
PersistInterval uint64 `toml:"persist_interval"`
SnapshotInterval uint64 `toml:"snapshot_interval"`
RetainBlocks uint64 `toml:"retain_blocks"`
@@ -23,7 +25,6 @@ type Config struct {
PrivValServer string `toml:"privval_server"`
PrivValKey string `toml:"privval_key"`
PrivValState string `toml:"privval_state"`
Misbehaviors map[string]string `toml:"misbehaviors"`
KeyType string `toml:"key_type"`
}
@@ -63,6 +64,10 @@ func (cfg Config) Validate() error {
return errors.New("chain_id parameter is required")
case cfg.Listen == "" && cfg.Protocol != "builtin":
return errors.New("listen parameter is required")
case cfg.SyncApp && cfg.Protocol != string(e2e.ProtocolBuiltin):
return errors.New("sync_app parameter is only relevant for builtin applications")
case cfg.SyncApp && cfg.Mode != string(e2e.ModeFull) && cfg.Mode != string(e2e.ModeValidator):
return errors.New("sync_app parameter is only relevant to full nodes and validators")
default:
return nil
}

View File

@@ -113,9 +113,22 @@ func startApp(cfg *Config) error {
//
// FIXME There is no way to simply load the configuration from a file, so we need to pull in Viper.
func startNode(cfg *Config) error {
app, err := app.NewApplication(cfg.App())
if err != nil {
return err
var cc proxy.ClientCreator
if cfg.SyncApp {
app, err := app.NewSyncApplication(cfg.App())
if err != nil {
return err
}
cc = proxy.NewUnsyncLocalClientCreator(app)
logger.Info("Using synchronized app with unsynchronized local client")
} else {
app, err := app.NewApplication(cfg.App())
if err != nil {
return err
}
cc = proxy.NewLocalClientCreator(app)
logger.Info("Using regular app with synchronized (regular) local client")
}
tmcfg, nodeLogger, nodeKey, err := setupNode()
@@ -126,7 +139,7 @@ func startNode(cfg *Config) error {
n, err := node.NewNode(tmcfg,
privval.LoadOrGenFilePV(tmcfg.PrivValidatorKeyFile(), tmcfg.PrivValidatorStateFile()),
nodeKey,
proxy.NewLocalClientCreator(app),
cc,
node.DefaultGenesisDocProviderFunc(tmcfg),
node.DefaultDBProvider,
node.DefaultMetricsProvider(tmcfg.Instrumentation),

View File

@@ -0,0 +1,85 @@
package docker
import (
"bytes"
"os"
"path/filepath"
"text/template"
e2e "github.com/tendermint/tendermint/test/e2e/pkg"
"github.com/tendermint/tendermint/test/e2e/pkg/infra"
)
var _ infra.Provider = &Provider{}
// Provider implements a docker-compose backed infrastructure provider.
type Provider struct {
Testnet *e2e.Testnet
}
// Setup generates the docker-compose file and write it to disk, erroring if
// any of these operations fail.
func (p *Provider) Setup() error {
compose, err := dockerComposeBytes(p.Testnet)
if err != nil {
return err
}
//nolint: gosec
// G306: Expect WriteFile permissions to be 0600 or less
err = os.WriteFile(filepath.Join(p.Testnet.Dir, "docker-compose.yml"), compose, 0644)
if err != nil {
return err
}
return nil
}
// dockerComposeBytes generates a Docker Compose config file for a testnet and returns the
// file as bytes to be written out to disk.
func dockerComposeBytes(testnet *e2e.Testnet) ([]byte, error) {
// Must use version 2 Docker Compose format, to support IPv6.
tmpl, err := template.New("docker-compose").Parse(`version: '2.4'
networks:
{{ .Name }}:
labels:
e2e: true
driver: bridge
{{- if .IPv6 }}
enable_ipv6: true
{{- end }}
ipam:
driver: default
config:
- subnet: {{ .IP }}
services:
{{- range .Nodes }}
{{ .Name }}:
labels:
e2e: true
container_name: {{ .Name }}
image: tendermint/e2e-node
{{- if eq .ABCIProtocol "builtin" }}
entrypoint: /usr/bin/entrypoint-builtin
{{- end }}
init: true
ports:
- 26656
- {{ if .ProxyPort }}{{ .ProxyPort }}:{{ end }}26657
- 6060
volumes:
- ./{{ .Name }}:/tendermint
networks:
{{ $.Name }}:
ipv{{ if $.IPv6 }}6{{ else }}4{{ end}}_address: {{ .IP }}
{{end}}`)
if err != nil {
return nil, err
}
var buf bytes.Buffer
err = tmpl.Execute(&buf, testnet)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}

View File

@@ -0,0 +1,20 @@
package infra
// Provider defines an API for manipulating the infrastructure of a
// specific set of testnet infrastructure.
type Provider interface {
// Setup generates any necessary configuration for the infrastructure
// provider during testnet setup.
Setup() error
}
// NoopProvider implements the provider interface by performing noops for every
// interface method. This may be useful if the infrastructure is managed by a
// separate process.
type NoopProvider struct {
}
func (NoopProvider) Setup() error { return nil }
var _ Provider = NoopProvider{}

View File

@@ -0,0 +1,80 @@
package e2e
import (
"encoding/json"
"fmt"
"net"
"os"
)
const (
dockerIPv4CIDR = "10.186.73.0/24"
dockerIPv6CIDR = "fd80:b10c::/48"
globalIPv4CIDR = "0.0.0.0/0"
)
// InfrastructureData contains the relevant information for a set of existing
// infrastructure that is to be used for running a testnet.
type InfrastructureData struct {
// Provider is the name of infrastructure provider backing the testnet.
// For example, 'docker' if it is running locally in a docker network or
// 'digital-ocean', 'aws', 'google', etc. if it is from a cloud provider.
Provider string `json:"provider"`
// Instances is a map of all of the machine instances on which to run
// processes for a testnet.
// The key of the map is the name of the instance, which each must correspond
// to the names of one of the testnet nodes defined in the testnet manifest.
Instances map[string]InstanceData `json:"instances"`
// Network is the CIDR notation range of IP addresses that all of the instances'
// IP addresses are expected to be within.
Network string `json:"network"`
}
// InstanceData contains the relevant information for a machine instance backing
// one of the nodes in the testnet.
type InstanceData struct {
IPAddress net.IP `json:"ip_address"`
}
func NewDockerInfrastructureData(m Manifest) (InfrastructureData, error) {
netAddress := dockerIPv4CIDR
if m.IPv6 {
netAddress = dockerIPv6CIDR
}
_, ipNet, err := net.ParseCIDR(netAddress)
if err != nil {
return InfrastructureData{}, fmt.Errorf("invalid IP network address %q: %w", netAddress, err)
}
ipGen := newIPGenerator(ipNet)
ifd := InfrastructureData{
Provider: "docker",
Instances: make(map[string]InstanceData),
Network: netAddress,
}
for name := range m.Nodes {
ifd.Instances[name] = InstanceData{
IPAddress: ipGen.Next(),
}
}
return ifd, nil
}
func InfrastructureDataFromFile(p string) (InfrastructureData, error) {
ifd := InfrastructureData{}
b, err := os.ReadFile(p)
if err != nil {
return InfrastructureData{}, err
}
err = json.Unmarshal(b, &ifd)
if err != nil {
return InfrastructureData{}, err
}
if ifd.Network == "" {
ifd.Network = globalIPv4CIDR
}
return ifd, nil
}

View File

@@ -77,6 +77,15 @@ type ManifestNode struct {
// is generated), and seed nodes run in seed mode with the PEX reactor enabled.
Mode string `toml:"mode"`
// SyncApp specifies whether this node should use a synchronized application
// with an unsynchronized local client. By default this is `false`, meaning
// that the node will run an unsynchronized application with a synchronized
// local client.
//
// Only applies to validators and full nodes where their ABCI protocol is
// "builtin".
SyncApp bool `toml:"sync_app"`
// Seeds is the list of node names to use as P2P seed nodes. Defaults to none.
Seeds []string `toml:"seeds"`
@@ -124,8 +133,8 @@ type ManifestNode struct {
SnapshotInterval uint64 `toml:"snapshot_interval"`
// RetainBlocks specifies the number of recent blocks to retain. Defaults to
// 0, which retains all blocks. Must be greater that PersistInterval and
// SnapshotInterval.
// 0, which retains all blocks. Must be greater that PersistInterval,
// SnapshotInterval and EvidenceAgeHeight.
RetainBlocks uint64 `toml:"retain_blocks"`
// Perturb lists perturbations to apply to the node after it has been

View File

@@ -21,8 +21,6 @@ import (
const (
randomSeed int64 = 2308084734268
proxyPortFirst uint32 = 5701
networkIPv4 = "10.186.73.0/24"
networkIPv6 = "fd80:b10c::/48"
)
type (
@@ -76,6 +74,7 @@ type Node struct {
Name string
Testnet *Testnet
Mode Mode
SyncApp bool // Should we use a synchronized app with an unsynchronized local client?
PrivvalKey crypto.PrivKey
NodeKey crypto.PrivKey
IP net.IP
@@ -100,32 +99,30 @@ type Node struct {
// The testnet generation must be deterministic, since it is generated
// separately by the runner and the test cases. For this reason, testnets use a
// random seed to generate e.g. keys.
func LoadTestnet(file string) (*Testnet, error) {
func LoadTestnet(file string, ifd InfrastructureData) (*Testnet, error) {
manifest, err := LoadManifest(file)
if err != nil {
return nil, err
}
return NewTestnetFromManifest(manifest, file, ifd)
}
// NewTestnetFromManifest creates and validates a testnet from a manifest
func NewTestnetFromManifest(manifest Manifest, file string, ifd InfrastructureData) (*Testnet, error) {
dir := strings.TrimSuffix(file, filepath.Ext(file))
// Set up resource generators. These must be deterministic.
netAddress := networkIPv4
if manifest.IPv6 {
netAddress = networkIPv6
}
_, ipNet, err := net.ParseCIDR(netAddress)
if err != nil {
return nil, fmt.Errorf("invalid IP network address %q: %w", netAddress, err)
}
ipGen := newIPGenerator(ipNet)
keyGen := newKeyGenerator(randomSeed)
proxyPortGen := newPortGenerator(proxyPortFirst)
_, ipNet, err := net.ParseCIDR(ifd.Network)
if err != nil {
return nil, fmt.Errorf("invalid IP network address %q: %w", ifd.Network, err)
}
testnet := &Testnet{
Name: filepath.Base(dir),
File: file,
Dir: dir,
IP: ipGen.Network(),
IP: ipNet,
InitialHeight: 1,
InitialState: manifest.InitialState,
Validators: map[*Node]int64{},
@@ -156,14 +153,19 @@ func LoadTestnet(file string) (*Testnet, error) {
for _, name := range nodeNames {
nodeManifest := manifest.Nodes[name]
ind, ok := ifd.Instances[name]
if !ok {
return nil, fmt.Errorf("information for node '%s' missing from infrastucture data", name)
}
node := &Node{
Name: name,
Testnet: testnet,
PrivvalKey: keyGen.Generate(manifest.KeyType),
NodeKey: keyGen.Generate("ed25519"),
IP: ipGen.Next(),
IP: ind.IPAddress,
ProxyPort: proxyPortGen.Next(),
Mode: ModeValidator,
SyncApp: nodeManifest.SyncApp,
Database: "goleveldb",
ABCIProtocol: Protocol(testnet.ABCIProtocol),
PrivvalProtocol: ProtocolFile,

View File

@@ -19,7 +19,7 @@ FAILED=()
for MANIFEST in "$@"; do
START=$SECONDS
echo "==> Running testnet $MANIFEST..."
echo "==> Running testnet: $MANIFEST"
if ! ./build/runner -f "$MANIFEST"; then
echo "==> Testnet $MANIFEST failed, dumping manifest..."

View File

@@ -2,8 +2,10 @@ package main
import (
"context"
"encoding/json"
"fmt"
"math"
"path/filepath"
"time"
e2e "github.com/tendermint/tendermint/test/e2e/pkg"
@@ -19,26 +21,28 @@ import (
//
// Metrics are based of the `benchmarkLength`, the amount of consecutive blocks
// sampled from in the testnet
func Benchmark(testnet *e2e.Testnet, benchmarkLength int64) error {
block, _, err := waitForHeight(testnet, 0)
func Benchmark(ctx context.Context, testnet *e2e.Testnet, benchmarkLength int64) error {
block, _, err := waitForHeight(ctx, testnet, 0)
if err != nil {
return err
}
logger.Info("Beginning benchmark period...", "height", block.Height)
startAt := time.Now()
// wait for the length of the benchmark period in blocks to pass. We allow 5 seconds for each block
// which should be sufficient.
waitingTime := time.Duration(benchmarkLength*5) * time.Second
endHeight, err := waitForAllNodes(testnet, block.Height+benchmarkLength, waitingTime)
endHeight, err := waitForAllNodes(ctx, testnet, block.Height+benchmarkLength, waitingTime)
if err != nil {
return err
}
dur := time.Since(startAt)
logger.Info("Ending benchmark period", "height", endHeight)
// fetch a sample of blocks
blocks, err := fetchBlockChainSample(testnet, benchmarkLength)
blocks, err := fetchBlockChainSample(ctx, testnet, benchmarkLength)
if err != nil {
return err
}
@@ -46,18 +50,29 @@ func Benchmark(testnet *e2e.Testnet, benchmarkLength int64) error {
// slice into time intervals and collate data
timeIntervals := splitIntoBlockIntervals(blocks)
testnetStats := extractTestnetStats(timeIntervals)
testnetStats.populateTxns(blocks)
testnetStats.totalTime = dur
testnetStats.startHeight = blocks[0].Header.Height
testnetStats.endHeight = blocks[len(blocks)-1].Header.Height
// print and return
logger.Info(testnetStats.String())
logger.Info(testnetStats.OutputJSON(testnet))
return nil
}
func (t *testnetStats) populateTxns(blocks []*types.BlockMeta) {
t.numtxns = 0
for _, b := range blocks {
t.numtxns += int64(b.NumTxs)
}
}
type testnetStats struct {
startHeight int64
endHeight int64
numtxns int64
totalTime time.Duration
// average time to produce a block
mean time.Duration
// standard deviation of block production
@@ -68,6 +83,28 @@ type testnetStats struct {
min time.Duration
}
func (t *testnetStats) OutputJSON(net *e2e.Testnet) string {
jsn, err := json.Marshal(map[string]interface{}{
"case": filepath.Base(net.File),
"start_height": t.startHeight,
"end_height": t.endHeight,
"blocks": t.endHeight - t.startHeight,
"stddev": t.std,
"mean": t.mean.Seconds(),
"max": t.max.Seconds(),
"min": t.min.Seconds(),
"size": len(net.Nodes),
"txns": t.numtxns,
"dur": t.totalTime.Seconds(),
})
if err != nil {
return ""
}
return string(jsn)
}
func (t *testnetStats) String() string {
return fmt.Sprintf(`Benchmarked from height %v to %v
Mean Block Interval: %v
@@ -86,7 +123,7 @@ func (t *testnetStats) String() string {
// fetchBlockChainSample waits for `benchmarkLength` amount of blocks to pass, fetching
// all of the headers for these blocks from an archive node and returning it.
func fetchBlockChainSample(testnet *e2e.Testnet, benchmarkLength int64) ([]*types.BlockMeta, error) {
func fetchBlockChainSample(ctx context.Context, testnet *e2e.Testnet, benchmarkLength int64) ([]*types.BlockMeta, error) {
var blocks []*types.BlockMeta
// Find the first archive node
@@ -97,7 +134,6 @@ func fetchBlockChainSample(testnet *e2e.Testnet, benchmarkLength int64) ([]*type
}
// find the latest height
ctx := context.Background()
s, err := c.Status(ctx)
if err != nil {
return nil, err

View File

@@ -82,7 +82,7 @@ func InjectEvidence(ctx context.Context, r *rand.Rand, testnet *e2e.Testnet, amo
// wait for the node to reach the height above the forged height so that
// it is able to validate the evidence
_, err = waitForNode(targetNode, waitHeight, time.Minute)
_, err = waitForNode(ctx, targetNode, waitHeight, time.Minute)
if err != nil {
return err
}
@@ -110,7 +110,7 @@ func InjectEvidence(ctx context.Context, r *rand.Rand, testnet *e2e.Testnet, amo
// wait for the node to reach the height above the forged height so that
// it is able to validate the evidence
_, err = waitForNode(targetNode, blockRes.Block.Height+2, 30*time.Second)
_, err = waitForNode(ctx, targetNode, blockRes.Block.Height+2, 30*time.Second)
if err != nil {
return err
}

View File

@@ -2,6 +2,7 @@ package main
import (
"context"
"errors"
"fmt"
"math/rand"
"os"
@@ -11,6 +12,8 @@ import (
"github.com/tendermint/tendermint/libs/log"
e2e "github.com/tendermint/tendermint/test/e2e/pkg"
"github.com/tendermint/tendermint/test/e2e/pkg/infra"
"github.com/tendermint/tendermint/test/e2e/pkg/infra/docker"
)
const randomSeed = 2308084734268
@@ -26,6 +29,7 @@ type CLI struct {
root *cobra.Command
testnet *e2e.Testnet
preserve bool
infp infra.Provider
}
// NewCLI sets up the CLI.
@@ -41,19 +45,57 @@ func NewCLI() *CLI {
if err != nil {
return err
}
testnet, err := e2e.LoadTestnet(file)
m, err := e2e.LoadManifest(file)
if err != nil {
return err
}
inft, err := cmd.Flags().GetString("infrastructure-type")
if err != nil {
return err
}
var ifd e2e.InfrastructureData
switch inft {
case "docker":
var err error
ifd, err = e2e.NewDockerInfrastructureData(m)
if err != nil {
return err
}
case "digital-ocean":
p, err := cmd.Flags().GetString("infrastructure-data")
if err != nil {
return err
}
if p == "" {
return errors.New("'--infrastructure-data' must be set when using the 'digital-ocean' infrastructure-type")
}
ifd, err = e2e.InfrastructureDataFromFile(p)
if err != nil {
return fmt.Errorf("parsing infrastructure data: %s", err)
}
default:
return fmt.Errorf("unknown infrastructure type '%s'", inft)
}
testnet, err := e2e.LoadTestnet(file, ifd)
if err != nil {
return fmt.Errorf("loading testnet: %s", err)
}
cli.testnet = testnet
cli.infp = &infra.NoopProvider{}
if inft == "docker" {
cli.infp = &docker.Provider{Testnet: testnet}
}
return nil
},
RunE: func(cmd *cobra.Command, args []string) error {
if err := Cleanup(cli.testnet); err != nil {
return err
}
if err := Setup(cli.testnet); err != nil {
if err := Setup(cli.testnet, cli.infp); err != nil {
return err
}
@@ -70,19 +112,19 @@ func NewCLI() *CLI {
chLoadResult <- err
}()
if err := Start(cli.testnet); err != nil {
if err := Start(cmd.Context(), cli.testnet); err != nil {
return err
}
if err := Wait(cli.testnet, 5); err != nil { // allow some txs to go through
if err := Wait(cmd.Context(), cli.testnet, 5); err != nil { // allow some txs to go through
return err
}
if cli.testnet.HasPerturbations() {
if err := Perturb(cli.testnet); err != nil {
if err := Perturb(cmd.Context(), cli.testnet); err != nil {
return err
}
if err := Wait(cli.testnet, 5); err != nil { // allow some txs to go through
if err := Wait(cmd.Context(), cli.testnet, 5); err != nil { // allow some txs to go through
return err
}
}
@@ -91,7 +133,7 @@ func NewCLI() *CLI {
if err := InjectEvidence(ctx, r, cli.testnet, cli.testnet.Evidence); err != nil {
return err
}
if err := Wait(cli.testnet, 5); err != nil { // ensure chain progress
if err := Wait(cmd.Context(), cli.testnet, 5); err != nil { // ensure chain progress
return err
}
}
@@ -100,7 +142,7 @@ func NewCLI() *CLI {
if err := <-chLoadResult; err != nil {
return err
}
if err := Wait(cli.testnet, 5); err != nil { // wait for network to settle before tests
if err := Wait(cmd.Context(), cli.testnet, 5); err != nil { // wait for network to settle before tests
return err
}
if err := Test(cli.testnet); err != nil {
@@ -118,6 +160,10 @@ func NewCLI() *CLI {
cli.root.PersistentFlags().StringP("file", "f", "", "Testnet TOML manifest")
_ = cli.root.MarkPersistentFlagRequired("file")
cli.root.PersistentFlags().StringP("infrastructure-type", "", "docker", "Backing infrastructure used to run the testnet. Either 'digital-ocean' or 'docker'")
cli.root.PersistentFlags().StringP("infrastructure-data", "", "", "path to the json file containing the infrastructure data. Only used if the 'infrastructure-type' is set to a value other than 'docker'")
cli.root.Flags().BoolVarP(&cli.preserve, "preserve", "p", false,
"Preserves the running of the test net after tests are completed")
@@ -125,7 +171,7 @@ func NewCLI() *CLI {
Use: "setup",
Short: "Generates the testnet directory and configuration",
RunE: func(cmd *cobra.Command, args []string) error {
return Setup(cli.testnet)
return Setup(cli.testnet, cli.infp)
},
})
@@ -135,12 +181,12 @@ func NewCLI() *CLI {
RunE: func(cmd *cobra.Command, args []string) error {
_, err := os.Stat(cli.testnet.Dir)
if os.IsNotExist(err) {
err = Setup(cli.testnet)
err = Setup(cli.testnet, cli.infp)
}
if err != nil {
return err
}
return Start(cli.testnet)
return Start(cmd.Context(), cli.testnet)
},
})
@@ -148,7 +194,7 @@ func NewCLI() *CLI {
Use: "perturb",
Short: "Perturbs the Docker testnet, e.g. by restarting or disconnecting nodes",
RunE: func(cmd *cobra.Command, args []string) error {
return Perturb(cli.testnet)
return Perturb(cmd.Context(), cli.testnet)
},
})
@@ -156,7 +202,7 @@ func NewCLI() *CLI {
Use: "wait",
Short: "Waits for a few blocks to be produced and all nodes to catch up",
RunE: func(cmd *cobra.Command, args []string) error {
return Wait(cli.testnet, 5)
return Wait(cmd.Context(), cli.testnet, 5)
},
})
@@ -258,12 +304,12 @@ Does not run any perbutations.
if err := Cleanup(cli.testnet); err != nil {
return err
}
if err := Setup(cli.testnet); err != nil {
if err := Setup(cli.testnet, cli.infp); err != nil {
return err
}
chLoadResult := make(chan error)
ctx, loadCancel := context.WithCancel(context.Background())
ctx, loadCancel := context.WithCancel(cmd.Context())
defer loadCancel()
go func() {
err := Load(ctx, cli.testnet, 1)
@@ -273,16 +319,16 @@ Does not run any perbutations.
chLoadResult <- err
}()
if err := Start(cli.testnet); err != nil {
if err := Start(cmd.Context(), cli.testnet); err != nil {
return err
}
if err := Wait(cli.testnet, 5); err != nil { // allow some txs to go through
if err := Wait(cmd.Context(), cli.testnet, 5); err != nil { // allow some txs to go through
return err
}
// we benchmark performance over the next 100 blocks
if err := Benchmark(cli.testnet, 100); err != nil {
if err := Benchmark(cmd.Context(), cli.testnet, 100); err != nil {
return err
}

View File

@@ -1,6 +1,7 @@
package main
import (
"context"
"fmt"
"time"
@@ -10,10 +11,10 @@ import (
)
// Perturbs a running testnet.
func Perturb(testnet *e2e.Testnet) error {
func Perturb(ctx context.Context, testnet *e2e.Testnet) error {
for _, node := range testnet.Nodes {
for _, perturbation := range node.Perturbations {
_, err := PerturbNode(node, perturbation)
_, err := PerturbNode(ctx, node, perturbation)
if err != nil {
return err
}
@@ -25,7 +26,7 @@ func Perturb(testnet *e2e.Testnet) error {
// PerturbNode perturbs a node with a given perturbation, returning its status
// after recovering.
func PerturbNode(node *e2e.Node, perturbation e2e.Perturbation) (*rpctypes.ResultStatus, error) {
func PerturbNode(ctx context.Context, node *e2e.Node, perturbation e2e.Perturbation) (*rpctypes.ResultStatus, error) {
testnet := node.Testnet
switch perturbation {
case e2e.PerturbationDisconnect:
@@ -67,7 +68,7 @@ func PerturbNode(node *e2e.Node, perturbation e2e.Perturbation) (*rpctypes.Resul
return nil, fmt.Errorf("unexpected perturbation %q", perturbation)
}
status, err := waitForNode(node, 0, 20*time.Second)
status, err := waitForNode(ctx, node, 0, 20*time.Second)
if err != nil {
return nil, err
}

View File

@@ -15,7 +15,7 @@ import (
// waitForHeight waits for the network to reach a certain height (or above),
// returning the highest height seen. Errors if the network is not making
// progress at all.
func waitForHeight(testnet *e2e.Testnet, height int64) (*types.Block, *types.BlockID, error) {
func waitForHeight(ctx context.Context, testnet *e2e.Testnet, height int64) (*types.Block, *types.BlockID, error) {
var (
err error
maxResult *rpctypes.ResultBlock
@@ -23,73 +23,92 @@ func waitForHeight(testnet *e2e.Testnet, height int64) (*types.Block, *types.Blo
lastIncrease = time.Now()
)
timer := time.NewTimer(0)
defer timer.Stop()
for {
for _, node := range testnet.Nodes {
if node.Mode == e2e.ModeSeed {
continue
}
client, ok := clients[node.Name]
if !ok {
client, err = node.Client()
select {
case <-ctx.Done():
return nil, nil, ctx.Err()
case <-timer.C:
for _, node := range testnet.Nodes {
if node.Stateless() {
continue
}
client, ok := clients[node.Name]
if !ok {
client, err = node.Client()
if err != nil {
continue
}
clients[node.Name] = client
}
subctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
result, err := client.Block(subctx, nil)
if err == context.DeadlineExceeded || err == context.Canceled {
return nil, nil, ctx.Err()
}
if err != nil {
continue
}
clients[node.Name] = client
if result.Block != nil && (maxResult == nil || result.Block.Height >= maxResult.Block.Height) {
maxResult = result
lastIncrease = time.Now()
}
if maxResult != nil && maxResult.Block.Height >= height {
return maxResult.Block, &maxResult.BlockID, nil
}
}
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
result, err := client.Block(ctx, nil)
if err != nil {
continue
if len(clients) == 0 {
return nil, nil, errors.New("unable to connect to any network nodes")
}
if result.Block != nil && (maxResult == nil || result.Block.Height >= maxResult.Block.Height) {
maxResult = result
lastIncrease = time.Now()
}
if maxResult != nil && maxResult.Block.Height >= height {
return maxResult.Block, &maxResult.BlockID, nil
if time.Since(lastIncrease) >= 20*time.Second {
if maxResult == nil {
return nil, nil, errors.New("chain stalled at unknown height")
}
return nil, nil, fmt.Errorf("chain stalled at height %v", maxResult.Block.Height)
}
timer.Reset(1 * time.Second)
}
if len(clients) == 0 {
return nil, nil, errors.New("unable to connect to any network nodes")
}
if time.Since(lastIncrease) >= 20*time.Second {
if maxResult == nil {
return nil, nil, errors.New("chain stalled at unknown height")
}
return nil, nil, fmt.Errorf("chain stalled at height %v", maxResult.Block.Height)
}
time.Sleep(1 * time.Second)
}
}
// waitForNode waits for a node to become available and catch up to the given block height.
func waitForNode(node *e2e.Node, height int64, timeout time.Duration) (*rpctypes.ResultStatus, error) {
func waitForNode(ctx context.Context, node *e2e.Node, height int64, timeout time.Duration) (*rpctypes.ResultStatus, error) {
client, err := node.Client()
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
for {
status, err := client.Status(ctx)
switch {
case errors.Is(err, context.DeadlineExceeded):
return nil, fmt.Errorf("timed out waiting for %v to reach height %v", node.Name, height)
case errors.Is(err, context.Canceled):
return nil, err
case err == nil && status.SyncInfo.LatestBlockHeight >= height:
return status, nil
}
time.Sleep(300 * time.Millisecond)
timer := time.NewTimer(0)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-timer.C:
subctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
status, err := client.Status(subctx)
switch {
case errors.Is(err, context.DeadlineExceeded):
return nil, fmt.Errorf("timed out waiting for %v to reach height %v", node.Name, height)
case errors.Is(err, context.Canceled):
return nil, err
case err == nil && status.SyncInfo.LatestBlockHeight >= height:
return status, nil
}
timer.Reset(300 * time.Millisecond)
}
}
}
// waitForAllNodes waits for all nodes to become available and catch up to the given block height.
func waitForAllNodes(testnet *e2e.Testnet, height int64, timeout time.Duration) (int64, error) {
func waitForAllNodes(ctx context.Context, testnet *e2e.Testnet, height int64, timeout time.Duration) (int64, error) {
var lastHeight int64
for _, node := range testnet.Nodes {
@@ -97,7 +116,7 @@ func waitForAllNodes(testnet *e2e.Testnet, height int64, timeout time.Duration)
continue
}
status, err := waitForNode(node, height, timeout)
status, err := waitForNode(ctx, node, height, timeout)
if err != nil {
return 0, err
}

View File

@@ -10,9 +10,7 @@ import (
"path/filepath"
"regexp"
"sort"
"strconv"
"strings"
"text/template"
"time"
"github.com/BurntSushi/toml"
@@ -23,6 +21,7 @@ import (
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/privval"
e2e "github.com/tendermint/tendermint/test/e2e/pkg"
"github.com/tendermint/tendermint/test/e2e/pkg/infra"
"github.com/tendermint/tendermint/types"
)
@@ -39,7 +38,7 @@ const (
)
// Setup sets up the testnet configuration.
func Setup(testnet *e2e.Testnet) error {
func Setup(testnet *e2e.Testnet, infp infra.Provider) error {
logger.Info("setup", "msg", log.NewLazySprintf("Generating testnet files in %q", testnet.Dir))
err := os.MkdirAll(testnet.Dir, os.ModePerm)
@@ -47,11 +46,7 @@ func Setup(testnet *e2e.Testnet) error {
return err
}
compose, err := MakeDockerCompose(testnet)
if err != nil {
return err
}
err = os.WriteFile(filepath.Join(testnet.Dir, "docker-compose.yml"), compose, 0o644) //nolint:gosec
err = infp.Setup()
if err != nil {
return err
}
@@ -126,70 +121,6 @@ func Setup(testnet *e2e.Testnet) error {
return nil
}
// MakeDockerCompose generates a Docker Compose config for a testnet.
func MakeDockerCompose(testnet *e2e.Testnet) ([]byte, error) {
// Must use version 2 Docker Compose format, to support IPv6.
tmpl, err := template.New("docker-compose").Funcs(template.FuncMap{
"misbehaviorsToString": func(misbehaviors map[int64]string) string {
str := ""
for height, misbehavior := range misbehaviors {
// after the first behavior set, a comma must be prepended
if str != "" {
str += ","
}
heightString := strconv.Itoa(int(height))
str += misbehavior + "," + heightString
}
return str
},
}).Parse(`version: '2.4'
networks:
{{ .Name }}:
labels:
e2e: true
driver: bridge
{{- if .IPv6 }}
enable_ipv6: true
{{- end }}
ipam:
driver: default
config:
- subnet: {{ .IP }}
services:
{{- range .Nodes }}
{{ .Name }}:
labels:
e2e: true
container_name: {{ .Name }}
image: tendermint/e2e-node
{{- if eq .ABCIProtocol "builtin" }}
entrypoint: /usr/bin/entrypoint-builtin
{{- end }}
init: true
ports:
- 26656
- {{ if .ProxyPort }}{{ .ProxyPort }}:{{ end }}26657
- 6060
volumes:
- ./{{ .Name }}:/tendermint
networks:
{{ $.Name }}:
ipv{{ if $.IPv6 }}6{{ else }}4{{ end}}_address: {{ .IP }}
{{end}}`)
if err != nil {
return nil, err
}
var buf bytes.Buffer
err = tmpl.Execute(&buf, testnet)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// MakeGenesis generates a genesis document.
func MakeGenesis(testnet *e2e.Testnet) (types.GenesisDoc, error) {
genesis := types.GenesisDoc{
@@ -327,6 +258,7 @@ func MakeAppConfig(node *e2e.Node) ([]byte, error) {
"dir": "data/app",
"listen": AppAddressUNIX,
"mode": node.Mode,
"sync_app": node.SyncApp,
"proxy_port": node.ProxyPort,
"protocol": "socket",
"persist_interval": node.PersistInterval,

View File

@@ -1,6 +1,7 @@
package main
import (
"context"
"fmt"
"sort"
"time"
@@ -9,7 +10,7 @@ import (
e2e "github.com/tendermint/tendermint/test/e2e/pkg"
)
func Start(testnet *e2e.Testnet) error {
func Start(ctx context.Context, testnet *e2e.Testnet) error {
if len(testnet.Nodes) == 0 {
return fmt.Errorf("no nodes in testnet")
}
@@ -46,7 +47,7 @@ func Start(testnet *e2e.Testnet) error {
if err := execCompose(testnet.Dir, "up", "-d", node.Name); err != nil {
return err
}
if _, err := waitForNode(node, 0, 15*time.Second); err != nil {
if _, err := waitForNode(ctx, node, 0, 15*time.Second); err != nil {
return err
}
logger.Info("start", "msg", log.NewLazySprintf("Node %v up on http://127.0.0.1:%v", node.Name, node.ProxyPort))
@@ -60,7 +61,7 @@ func Start(testnet *e2e.Testnet) error {
"nodes", len(testnet.Nodes)-len(nodeQueue),
"pending", len(nodeQueue))
block, blockID, err := waitForHeight(testnet, networkHeight)
block, blockID, err := waitForHeight(ctx, testnet, networkHeight)
if err != nil {
return err
}
@@ -90,7 +91,7 @@ func Start(testnet *e2e.Testnet) error {
"node", node.Name,
"height", networkHeight)
if _, _, err := waitForHeight(testnet, networkHeight); err != nil {
if _, _, err := waitForHeight(ctx, testnet, networkHeight); err != nil {
return err
}
}
@@ -100,7 +101,7 @@ func Start(testnet *e2e.Testnet) error {
if err := execCompose(testnet.Dir, "up", "-d", node.Name); err != nil {
return err
}
status, err := waitForNode(node, node.StartAt, 3*time.Minute)
status, err := waitForNode(ctx, node, node.StartAt, 3*time.Minute)
if err != nil {
return err
}

View File

@@ -1,6 +1,7 @@
package main
import (
"context"
"time"
"github.com/tendermint/tendermint/libs/log"
@@ -9,18 +10,18 @@ import (
// Wait waits for a number of blocks to be produced, and for all nodes to catch
// up with it.
func Wait(testnet *e2e.Testnet, blocks int64) error {
block, _, err := waitForHeight(testnet, 0)
func Wait(ctx context.Context, testnet *e2e.Testnet, blocks int64) error {
block, _, err := waitForHeight(ctx, testnet, 0)
if err != nil {
return err
}
return WaitUntil(testnet, block.Height+blocks)
return WaitUntil(ctx, testnet, block.Height+blocks)
}
// WaitUntil waits until a given height has been reached.
func WaitUntil(testnet *e2e.Testnet, height int64) error {
func WaitUntil(ctx context.Context, testnet *e2e.Testnet, height int64) error {
logger.Info("wait until", "msg", log.NewLazySprintf("Waiting for all nodes to reach height %v...", height))
_, err := waitForAllNodes(testnet, height, waitingTime(len(testnet.Nodes)))
_, err := waitForAllNodes(ctx, testnet, height, waitingTime(len(testnet.Nodes)))
if err != nil {
return err
}

View File

@@ -66,23 +66,27 @@ func testNode(t *testing.T, testFunc func(*testing.T, e2e.Node)) {
func loadTestnet(t *testing.T) e2e.Testnet {
t.Helper()
manifest := os.Getenv("E2E_MANIFEST")
if manifest == "" {
manifestFile := os.Getenv("E2E_MANIFEST")
if manifestFile == "" {
t.Skip("E2E_MANIFEST not set, not an end-to-end test run")
}
if !filepath.IsAbs(manifest) {
manifest = filepath.Join("..", manifest)
if !filepath.IsAbs(manifestFile) {
manifestFile = filepath.Join("..", manifestFile)
}
testnetCacheMtx.Lock()
defer testnetCacheMtx.Unlock()
if testnet, ok := testnetCache[manifest]; ok {
if testnet, ok := testnetCache[manifestFile]; ok {
return testnet
}
testnet, err := e2e.LoadTestnet(manifest)
m, err := e2e.LoadManifest(manifestFile)
require.NoError(t, err)
testnetCache[manifest] = *testnet
ifd, err := e2e.NewDockerInfrastructureData(m)
require.NoError(t, err)
testnet, err := e2e.LoadTestnet(manifestFile, ifd)
require.NoError(t, err)
testnetCache[manifestFile] = *testnet
return *testnet
}

View File

@@ -21,7 +21,7 @@ func FuzzRPCJSONRPCServer(f *testing.F) {
I int `json:"i"`
}
var rpcFuncMap = map[string]*rpcserver.RPCFunc{
"c": rpcserver.NewRPCFunc(func(ctx *rpctypes.Context, args *args) (string, error) {
"c": rpcserver.NewRPCFunc(func(ctx *rpctypes.Context, args *args, options ...rpcserver.Option) (string, error) {
return "foo", nil
}, "args"),
}

View File

@@ -3,7 +3,7 @@ package payload
import (
"bytes"
"crypto/rand"
"errors"
"encoding/hex"
"fmt"
"math"
@@ -12,6 +12,7 @@ import (
)
const keyPrefix = "a="
const maxPayloadSize = 4 * 1024 * 1024
// NewBytes generates a new payload and returns the encoded representation of
// the payload as a slice of bytes. NewBytes uses the fields on the Options
@@ -25,10 +26,16 @@ func NewBytes(p *Payload) ([]byte, error) {
if err != nil {
return nil, err
}
if p.Size < uint64(us) {
return nil, fmt.Errorf("configured size %d not large enough to fit unpadded transaction of size %d", p.Size, us)
if p.Size > maxPayloadSize {
return nil, fmt.Errorf("configured size %d is too large (>%d)", p.Size, maxPayloadSize)
}
p.Padding = make([]byte, p.Size-uint64(us))
pSize := int(p.Size) // #nosec -- The "if" above makes this cast safe
if pSize < us {
return nil, fmt.Errorf("configured size %d not large enough to fit unpadded transaction of size %d", pSize, us)
}
// We halve the padding size because we transform the TX to hex
p.Padding = make([]byte, (pSize-us)/2)
_, err = rand.Read(p.Padding)
if err != nil {
return nil, err
@@ -37,22 +44,28 @@ func NewBytes(p *Payload) ([]byte, error) {
if err != nil {
return nil, err
}
h := []byte(hex.EncodeToString(b))
// prepend a single key so that the kv store only ever stores a single
// transaction instead of storing all tx and ballooning in size.
return append([]byte(keyPrefix), b...), nil
return append([]byte(keyPrefix), h...), nil
}
// FromBytes extracts a paylod from the byte representation of the payload.
// FromBytes leaves the padding untouched, returning it to the caller to handle
// or discard per their preference.
func FromBytes(b []byte) (*Payload, error) {
p := &Payload{}
tr := bytes.TrimPrefix(b, []byte(keyPrefix))
if bytes.Equal(b, tr) {
return nil, errors.New("payload bytes missing key prefix")
trH := bytes.TrimPrefix(b, []byte(keyPrefix))
if bytes.Equal(b, trH) {
return nil, fmt.Errorf("payload bytes missing key prefix '%s'", keyPrefix)
}
err := proto.Unmarshal(tr, p)
trB, err := hex.DecodeString(string(trH))
if err != nil {
return nil, err
}
p := &Payload{}
err = proto.Unmarshal(trB, p)
if err != nil {
return nil, err
}
@@ -83,5 +96,6 @@ func CalculateUnpaddedSize(p *Payload) (int, error) {
if err != nil {
return 0, err
}
return len(b) + len(keyPrefix), nil
h := []byte(hex.EncodeToString(b))
return len(h) + len(keyPrefix), nil
}