Merge branch 'master' into conr2d/doc-go-built-in

This commit is contained in:
Callum Waters
2021-08-27 15:19:14 +02:00
14 changed files with 167 additions and 122 deletions

View File

@@ -121,7 +121,7 @@ jobs:
- run: |
cat ./*profile.out | grep -v "mode: atomic" >> coverage.txt
if: env.GIT_DIFF
- uses: codecov/codecov-action@v2.0.2
- uses: codecov/codecov-action@v2.0.3
with:
file: ./coverage.txt
if: env.GIT_DIFF

View File

@@ -26,7 +26,7 @@ linters:
# - maligned
- nakedret
- prealloc
- scopelint
- exportloopref
- staticcheck
- structcheck
- stylecheck

View File

@@ -694,13 +694,14 @@ type P2PConfig struct { //nolint: maligned
// Force dial to fail
TestDialFail bool `mapstructure:"test-dial-fail"`
// DisableLegacy is used mostly for testing to enable or disable the legacy
// P2P stack.
DisableLegacy bool `mapstructure:"disable-legacy"`
// UseLegacy enables the "legacy" P2P implementation and
// disables the newer default implementation. This flag will
// be removed in a future release.
UseLegacy bool `mapstructure:"use-legacy"`
// Makes it possible to configure which queue backend the p2p
// layer uses. Options are: "fifo", "priority" and "wdrr",
// with the default being "fifo".
// with the default being "priority".
QueueType string `mapstructure:"queue-type"`
}
@@ -732,6 +733,7 @@ func DefaultP2PConfig() *P2PConfig {
DialTimeout: 3 * time.Second,
TestDialFail: false,
QueueType: "priority",
UseLegacy: false,
}
}

View File

@@ -271,7 +271,7 @@ pprof-laddr = "{{ .RPC.PprofListenAddress }}"
[p2p]
# Enable the new p2p layer.
disable-legacy = {{ .P2P.DisableLegacy }}
use-legacy = {{ .P2P.UseLegacy }}
# Select the p2p internal queue
queue-type = "{{ .P2P.QueueType }}"

View File

@@ -916,8 +916,8 @@ func (cs *State) handleMsg(mi msgInfo) {
"height", cs.Height,
"round", cs.Round,
"peer", peerID,
"msg_type", fmt.Sprintf("%T", msg),
"err", err,
"msg", msg,
)
}
}

View File

@@ -319,12 +319,12 @@ func makeNode(config *cfg.Config,
stateSyncReactorShim = p2p.NewReactorShim(logger.With("module", "statesync"), "StateSyncShim", statesync.ChannelShims)
if config.P2P.DisableLegacy {
channels = makeChannelsFromShims(router, statesync.ChannelShims)
peerUpdates = peerManager.Subscribe()
} else {
if config.P2P.UseLegacy {
channels = getChannelsFromShim(stateSyncReactorShim)
peerUpdates = stateSyncReactorShim.PeerUpdates
} else {
channels = makeChannelsFromShims(router, statesync.ChannelShims)
peerUpdates = peerManager.Subscribe()
}
stateSyncReactor = statesync.NewReactor(
@@ -373,13 +373,7 @@ func makeNode(config *cfg.Config,
pexCh := pex.ChannelDescriptor()
transport.AddChannelDescriptors([]*p2p.ChannelDescriptor{&pexCh})
if config.P2P.DisableLegacy {
addrBook = nil
pexReactor, err = createPEXReactorV2(config, logger, peerManager, router)
if err != nil {
return nil, err
}
} else {
if config.P2P.UseLegacy {
// setup Transport and Switch
sw = createSwitch(
config, transport, p2pMetrics, mpReactorShim, bcReactorForSwitch,
@@ -402,6 +396,12 @@ func makeNode(config *cfg.Config,
}
pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger)
} else {
addrBook = nil
pexReactor, err = createPEXReactorV2(config, logger, peerManager, router)
if err != nil {
return nil, err
}
}
if config.RPC.PprofListenAddress != "" {
@@ -461,6 +461,17 @@ func makeNode(config *cfg.Config,
},
}
// this is a terrible, because typed nil interfaces are not ==
// nil, so this is just cleanup to avoid having a non-nil
// value in the RPC environment that has the semantic
// properties of nil.
if sw == nil {
node.rpcEnv.P2PPeers = nil
} else if peerManager == nil {
node.rpcEnv.PeerManager = nil
}
// end hack
node.rpcEnv.P2PTransport = node
node.BaseService = *service.NewBaseService(logger, "Node", node)
@@ -519,12 +530,8 @@ func makeSeedNode(config *cfg.Config,
// p2p stack is removed.
pexCh := pex.ChannelDescriptor()
transport.AddChannelDescriptors([]*p2p.ChannelDescriptor{&pexCh})
if config.P2P.DisableLegacy {
pexReactor, err = createPEXReactorV2(config, logger, peerManager, router)
if err != nil {
return nil, err
}
} else {
if config.P2P.UseLegacy {
sw = createSwitch(
config, transport, p2pMetrics, nil, nil,
nil, nil, nil, nil, nodeInfo, nodeKey, p2pLogger,
@@ -546,6 +553,11 @@ func makeSeedNode(config *cfg.Config,
}
pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger)
} else {
pexReactor, err = createPEXReactorV2(config, logger, peerManager, router)
if err != nil {
return nil, err
}
}
if config.RPC.PprofListenAddress != "" {
@@ -608,18 +620,16 @@ func (n *nodeImpl) OnStart() error {
}
n.isListening = true
n.Logger.Info("p2p service", "legacy_enabled", !n.config.P2P.DisableLegacy)
n.Logger.Info("p2p service", "legacy_enabled", n.config.P2P.UseLegacy)
if n.config.P2P.DisableLegacy {
if err = n.router.Start(); err != nil {
return err
}
} else {
if n.config.P2P.UseLegacy {
// Add private IDs to addrbook to block those peers being added
n.addrBook.AddPrivateIDs(strings.SplitAndTrimEmpty(n.config.P2P.PrivatePeerIDs, ",", " "))
if err = n.sw.Start(); err != nil {
return err
}
} else if err = n.router.Start(); err != nil {
return err
}
if n.config.Mode != cfg.ModeSeed {
@@ -650,16 +660,14 @@ func (n *nodeImpl) OnStart() error {
}
}
if n.config.P2P.DisableLegacy {
if err := n.pexReactor.Start(); err != nil {
return err
}
} else {
if n.config.P2P.UseLegacy {
// Always connect to persistent peers
err = n.sw.DialPeersAsync(strings.SplitAndTrimEmpty(n.config.P2P.PersistentPeers, ",", " "))
if err != nil {
return fmt.Errorf("could not dial peers from persistent-peers field: %w", err)
}
} else if err := n.pexReactor.Start(); err != nil {
return err
}
// Run state sync
@@ -738,14 +746,14 @@ func (n *nodeImpl) OnStop() {
n.Logger.Error("failed to stop the PEX v2 reactor", "err", err)
}
if n.config.P2P.DisableLegacy {
if err := n.router.Stop(); err != nil {
n.Logger.Error("failed to stop router", "err", err)
}
} else {
if n.config.P2P.UseLegacy {
if err := n.sw.Stop(); err != nil {
n.Logger.Error("failed to stop switch", "err", err)
}
} else {
if err := n.router.Stop(); err != nil {
n.Logger.Error("failed to stop router", "err", err)
}
}
if err := n.transport.Close(); err != nil {

View File

@@ -43,6 +43,7 @@ import (
func TestNodeStartStop(t *testing.T) {
config := cfg.ResetTestRoot("node_node_test")
defer os.RemoveAll(config.RootDir)
// create & start node
@@ -53,8 +54,6 @@ func TestNodeStartStop(t *testing.T) {
n, ok := ns.(*nodeImpl)
require.True(t, ok)
t.Logf("Started node %v", n.sw.NodeInfo())
// wait for the node to produce a block
blocksSub, err := n.EventBus().Subscribe(context.Background(), "node_test", types.EventQueryNewBlock)
require.NoError(t, err)

View File

@@ -166,12 +166,12 @@ func createMempoolReactor(
peerUpdates *p2p.PeerUpdates
)
if config.P2P.DisableLegacy {
channels = makeChannelsFromShims(router, channelShims)
peerUpdates = peerManager.Subscribe()
} else {
if config.P2P.UseLegacy {
channels = getChannelsFromShim(reactorShim)
peerUpdates = reactorShim.PeerUpdates
} else {
channels = makeChannelsFromShims(router, channelShims)
peerUpdates = peerManager.Subscribe()
}
switch config.Mempool.Version {
@@ -260,12 +260,12 @@ func createEvidenceReactor(
peerUpdates *p2p.PeerUpdates
)
if config.P2P.DisableLegacy {
channels = makeChannelsFromShims(router, evidence.ChannelShims)
peerUpdates = peerManager.Subscribe()
} else {
if config.P2P.UseLegacy {
channels = getChannelsFromShim(reactorShim)
peerUpdates = reactorShim.PeerUpdates
} else {
channels = makeChannelsFromShims(router, evidence.ChannelShims)
peerUpdates = peerManager.Subscribe()
}
evidenceReactor := evidence.NewReactor(
@@ -302,12 +302,12 @@ func createBlockchainReactor(
peerUpdates *p2p.PeerUpdates
)
if config.P2P.DisableLegacy {
channels = makeChannelsFromShims(router, bcv0.ChannelShims)
peerUpdates = peerManager.Subscribe()
} else {
if config.P2P.UseLegacy {
channels = getChannelsFromShim(reactorShim)
peerUpdates = reactorShim.PeerUpdates
} else {
channels = makeChannelsFromShims(router, bcv0.ChannelShims)
peerUpdates = peerManager.Subscribe()
}
reactor, err := bcv0.NewReactor(
@@ -366,12 +366,12 @@ func createConsensusReactor(
peerUpdates *p2p.PeerUpdates
)
if config.P2P.DisableLegacy {
channels = makeChannelsFromShims(router, cs.ChannelShims)
peerUpdates = peerManager.Subscribe()
} else {
if config.P2P.UseLegacy {
channels = getChannelsFromShim(reactorShim)
peerUpdates = reactorShim.PeerUpdates
} else {
channels = makeChannelsFromShims(router, cs.ChannelShims)
peerUpdates = peerManager.Subscribe()
}
reactor := cs.NewReactor(

View File

@@ -107,11 +107,11 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
switch opt["p2p"].(P2PMode) {
case NewP2PMode:
manifest.DisableLegacyP2P = true
manifest.UseLegacyP2P = true
case LegacyP2PMode:
manifest.DisableLegacyP2P = false
manifest.UseLegacyP2P = false
case HybridP2PMode:
manifest.DisableLegacyP2P = false
manifest.UseLegacyP2P = true
p2pNodeFactor = 2
default:
return manifest, fmt.Errorf("unknown p2p mode %s", opt["p2p"])
@@ -138,9 +138,9 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
node := generateNode(r, e2e.ModeSeed, 0, manifest.InitialHeight, false)
if p2pNodeFactor == 0 {
node.DisableLegacyP2P = manifest.DisableLegacyP2P
node.UseLegacyP2P = manifest.UseLegacyP2P
} else if p2pNodeFactor%i == 0 {
node.DisableLegacyP2P = !manifest.DisableLegacyP2P
node.UseLegacyP2P = !manifest.UseLegacyP2P
}
manifest.Nodes[fmt.Sprintf("seed%02d", i)] = node
@@ -162,9 +162,9 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
r, e2e.ModeValidator, startAt, manifest.InitialHeight, i <= 2)
if p2pNodeFactor == 0 {
node.DisableLegacyP2P = manifest.DisableLegacyP2P
node.UseLegacyP2P = manifest.UseLegacyP2P
} else if p2pNodeFactor%i == 0 {
node.DisableLegacyP2P = !manifest.DisableLegacyP2P
node.UseLegacyP2P = !manifest.UseLegacyP2P
}
manifest.Nodes[name] = node
@@ -198,9 +198,9 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
node := generateNode(r, e2e.ModeFull, startAt, manifest.InitialHeight, false)
if p2pNodeFactor == 0 {
node.DisableLegacyP2P = manifest.DisableLegacyP2P
node.UseLegacyP2P = manifest.UseLegacyP2P
} else if p2pNodeFactor%i == 0 {
node.DisableLegacyP2P = !manifest.DisableLegacyP2P
node.UseLegacyP2P = !manifest.UseLegacyP2P
}
manifest.Nodes[fmt.Sprintf("full%02d", i)] = node
}

View File

@@ -59,8 +59,8 @@ type Manifest struct {
// by individual nodes.
LogLevel string `toml:"log_level"`
// DisableLegacyP2P enables use of the new p2p layer for all nodes in a test.
DisableLegacyP2P bool `toml:"disable_legacy_p2p"`
// UseLegacyP2P uses the legacy p2p layer for all nodes in a test.
UseLegacyP2P bool `toml:"use_legacy_p2p"`
// QueueType describes the type of queue that the system uses internally
QueueType string `toml:"queue_type"`
@@ -147,8 +147,8 @@ type ManifestNode struct {
// level.
LogLevel string `toml:"log_level"`
// UseNewP2P enables use of the new p2p layer for this node.
DisableLegacyP2P bool `toml:"disable_legacy_p2p"`
// UseLegacyP2P enables use of the legacy p2p layer for this node.
UseLegacyP2P bool `toml:"use_legacy_p2p"`
}
// Save saves the testnet manifest to a file.

View File

@@ -92,7 +92,7 @@ type Node struct {
PersistentPeers []*Node
Perturbations []Perturbation
LogLevel string
DisableLegacyP2P bool
UseLegacyP2P bool
QueueType string
}
@@ -177,7 +177,7 @@ func LoadTestnet(file string) (*Testnet, error) {
Perturbations: []Perturbation{},
LogLevel: manifest.LogLevel,
QueueType: manifest.QueueType,
DisableLegacyP2P: manifest.DisableLegacyP2P || nodeManifest.DisableLegacyP2P,
UseLegacyP2P: manifest.UseLegacyP2P && nodeManifest.UseLegacyP2P,
}
if node.StartAt == testnet.InitialHeight {
@@ -417,16 +417,6 @@ func (t Testnet) ArchiveNodes() []*Node {
return nodes
}
// RandomNode returns a random non-seed node.
func (t Testnet) RandomNode() *Node {
for {
node := t.Nodes[rand.Intn(len(t.Nodes))]
if node.Mode != ModeSeed {
return node
}
}
}
// IPv6 returns true if the testnet is an IPv6 network.
func (t Testnet) IPv6() bool {
return t.IP.IP.To4() == nil

View File

@@ -3,6 +3,7 @@ package main
import (
"bytes"
"context"
"errors"
"fmt"
"io/ioutil"
"math/rand"
@@ -29,7 +30,21 @@ const lightClientEvidenceRatio = 4
// DuplicateVoteEvidence.
func InjectEvidence(testnet *e2e.Testnet, amount int) error {
// select a random node
targetNode := testnet.RandomNode()
var targetNode *e2e.Node
for i := 0; i < len(testnet.Nodes)-1; i++ {
targetNode = testnet.Nodes[rand.Intn(len(testnet.Nodes))] // nolint: gosec
if targetNode.Mode == e2e.ModeSeed {
targetNode = nil
continue
}
break
}
if targetNode == nil {
return errors.New("could not find node to inject evidence into")
}
logger.Info(fmt.Sprintf("Injecting evidence through %v (amount: %d)...", targetNode.Name, amount))

View File

@@ -1,6 +1,7 @@
package main
import (
"container/ring"
"context"
"crypto/rand"
"errors"
@@ -93,34 +94,64 @@ func loadGenerate(ctx context.Context, chTx chan<- types.Tx, multiplier int, siz
// loadProcess processes transactions
func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx, chSuccess chan<- types.Tx) {
// Each worker gets its own client to each node, which allows for some
// concurrency while still bounding it.
clients := map[string]*rpchttp.HTTP{}
// Each worker gets its own client to each usable node, which
// allows for some concurrency while still bounding it.
clients := make([]*rpchttp.HTTP, 0, len(testnet.Nodes))
var err error
for tx := range chTx {
node := testnet.RandomNode()
client, ok := clients[node.Name]
if !ok {
client, err = node.Client()
if err != nil {
continue
}
// check that the node is up
_, err = client.Health(ctx)
if err != nil {
continue
}
clients[node.Name] = client
}
if _, err = client.BroadcastTxSync(ctx, tx); err != nil {
for idx := range testnet.Nodes {
// Construct a list of usable nodes for the creating
// load. Don't send load through seed nodes because
// they do not provide the RPC endpoints required to
// broadcast transaction.
if testnet.Nodes[idx].Mode == e2e.ModeSeed {
continue
}
chSuccess <- tx
client, err := testnet.Nodes[idx].Client()
if err != nil {
continue
}
clients = append(clients, client)
}
if len(clients) == 0 {
panic("no clients to process load")
}
// Put the clients in a ring so they can be used in a
// round-robin fashion.
clientRing := ring.New(len(clients))
for idx := range clients {
clientRing.Value = clients[idx]
clientRing = clientRing.Next()
}
var err error
for {
select {
case <-ctx.Done():
return
case tx := <-chTx:
clientRing = clientRing.Next()
client := clientRing.Value.(*rpchttp.HTTP)
if _, err := client.Health(ctx); err != nil {
continue
}
if _, err = client.BroadcastTxSync(ctx, tx); err != nil {
continue
}
select {
case chSuccess <- tx:
continue
case <-ctx.Done():
return
}
}
}
}

View File

@@ -238,7 +238,7 @@ func MakeConfig(node *e2e.Node) (*config.Config, error) {
cfg.RPC.PprofListenAddress = ":6060"
cfg.P2P.ExternalAddress = fmt.Sprintf("tcp://%v", node.AddressP2P(false))
cfg.P2P.AddrBookStrict = false
cfg.P2P.DisableLegacy = node.DisableLegacyP2P
cfg.P2P.UseLegacy = node.UseLegacyP2P
cfg.P2P.QueueType = node.QueueType
cfg.DBBackend = node.Database
cfg.StateSync.DiscoveryTime = 5 * time.Second
@@ -342,17 +342,17 @@ func MakeConfig(node *e2e.Node) (*config.Config, error) {
// MakeAppConfig generates an ABCI application config for a node.
func MakeAppConfig(node *e2e.Node) ([]byte, error) {
cfg := map[string]interface{}{
"chain_id": node.Testnet.Name,
"dir": "data/app",
"listen": AppAddressUNIX,
"mode": node.Mode,
"proxy_port": node.ProxyPort,
"protocol": "socket",
"persist_interval": node.PersistInterval,
"snapshot_interval": node.SnapshotInterval,
"retain_blocks": node.RetainBlocks,
"key_type": node.PrivvalKey.Type(),
"disable_legacy_p2p": node.DisableLegacyP2P,
"chain_id": node.Testnet.Name,
"dir": "data/app",
"listen": AppAddressUNIX,
"mode": node.Mode,
"proxy_port": node.ProxyPort,
"protocol": "socket",
"persist_interval": node.PersistInterval,
"snapshot_interval": node.SnapshotInterval,
"retain_blocks": node.RetainBlocks,
"key_type": node.PrivvalKey.Type(),
"use_legacy_p2p": node.UseLegacyP2P,
}
switch node.ABCIProtocol {
case e2e.ProtocolUNIX: