diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index 12dd504e3..7d312b4f8 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -121,7 +121,7 @@ jobs: - run: | cat ./*profile.out | grep -v "mode: atomic" >> coverage.txt if: env.GIT_DIFF - - uses: codecov/codecov-action@v2.0.2 + - uses: codecov/codecov-action@v2.0.3 with: file: ./coverage.txt if: env.GIT_DIFF diff --git a/.golangci.yml b/.golangci.yml index f05cde90c..574ed22b0 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -26,7 +26,7 @@ linters: # - maligned - nakedret - prealloc - - scopelint + - exportloopref - staticcheck - structcheck - stylecheck diff --git a/config/config.go b/config/config.go index 7d19616aa..7e8dd5976 100644 --- a/config/config.go +++ b/config/config.go @@ -694,13 +694,14 @@ type P2PConfig struct { //nolint: maligned // Force dial to fail TestDialFail bool `mapstructure:"test-dial-fail"` - // DisableLegacy is used mostly for testing to enable or disable the legacy - // P2P stack. - DisableLegacy bool `mapstructure:"disable-legacy"` + // UseLegacy enables the "legacy" P2P implementation and + // disables the newer default implementation. This flag will + // be removed in a future release. + UseLegacy bool `mapstructure:"use-legacy"` // Makes it possible to configure which queue backend the p2p // layer uses. Options are: "fifo", "priority" and "wdrr", - // with the default being "fifo". + // with the default being "priority". QueueType string `mapstructure:"queue-type"` } @@ -732,6 +733,7 @@ func DefaultP2PConfig() *P2PConfig { DialTimeout: 3 * time.Second, TestDialFail: false, QueueType: "priority", + UseLegacy: false, } } diff --git a/config/toml.go b/config/toml.go index edb192109..76058802c 100644 --- a/config/toml.go +++ b/config/toml.go @@ -271,7 +271,7 @@ pprof-laddr = "{{ .RPC.PprofListenAddress }}" [p2p] # Enable the new p2p layer. -disable-legacy = {{ .P2P.DisableLegacy }} +use-legacy = {{ .P2P.UseLegacy }} # Select the p2p internal queue queue-type = "{{ .P2P.QueueType }}" diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 4da989b40..6379b71d5 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -916,8 +916,8 @@ func (cs *State) handleMsg(mi msgInfo) { "height", cs.Height, "round", cs.Round, "peer", peerID, + "msg_type", fmt.Sprintf("%T", msg), "err", err, - "msg", msg, ) } } diff --git a/node/node.go b/node/node.go index 9cb5315bc..0dedd2861 100644 --- a/node/node.go +++ b/node/node.go @@ -319,12 +319,12 @@ func makeNode(config *cfg.Config, stateSyncReactorShim = p2p.NewReactorShim(logger.With("module", "statesync"), "StateSyncShim", statesync.ChannelShims) - if config.P2P.DisableLegacy { - channels = makeChannelsFromShims(router, statesync.ChannelShims) - peerUpdates = peerManager.Subscribe() - } else { + if config.P2P.UseLegacy { channels = getChannelsFromShim(stateSyncReactorShim) peerUpdates = stateSyncReactorShim.PeerUpdates + } else { + channels = makeChannelsFromShims(router, statesync.ChannelShims) + peerUpdates = peerManager.Subscribe() } stateSyncReactor = statesync.NewReactor( @@ -373,13 +373,7 @@ func makeNode(config *cfg.Config, pexCh := pex.ChannelDescriptor() transport.AddChannelDescriptors([]*p2p.ChannelDescriptor{&pexCh}) - if config.P2P.DisableLegacy { - addrBook = nil - pexReactor, err = createPEXReactorV2(config, logger, peerManager, router) - if err != nil { - return nil, err - } - } else { + if config.P2P.UseLegacy { // setup Transport and Switch sw = createSwitch( config, transport, p2pMetrics, mpReactorShim, bcReactorForSwitch, @@ -402,6 +396,12 @@ func makeNode(config *cfg.Config, } pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger) + } else { + addrBook = nil + pexReactor, err = createPEXReactorV2(config, logger, peerManager, router) + if err != nil { + return nil, err + } } if config.RPC.PprofListenAddress != "" { @@ -461,6 +461,17 @@ func makeNode(config *cfg.Config, }, } + // this is a terrible, because typed nil interfaces are not == + // nil, so this is just cleanup to avoid having a non-nil + // value in the RPC environment that has the semantic + // properties of nil. + if sw == nil { + node.rpcEnv.P2PPeers = nil + } else if peerManager == nil { + node.rpcEnv.PeerManager = nil + } + // end hack + node.rpcEnv.P2PTransport = node node.BaseService = *service.NewBaseService(logger, "Node", node) @@ -519,12 +530,8 @@ func makeSeedNode(config *cfg.Config, // p2p stack is removed. pexCh := pex.ChannelDescriptor() transport.AddChannelDescriptors([]*p2p.ChannelDescriptor{&pexCh}) - if config.P2P.DisableLegacy { - pexReactor, err = createPEXReactorV2(config, logger, peerManager, router) - if err != nil { - return nil, err - } - } else { + + if config.P2P.UseLegacy { sw = createSwitch( config, transport, p2pMetrics, nil, nil, nil, nil, nil, nil, nodeInfo, nodeKey, p2pLogger, @@ -546,6 +553,11 @@ func makeSeedNode(config *cfg.Config, } pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger) + } else { + pexReactor, err = createPEXReactorV2(config, logger, peerManager, router) + if err != nil { + return nil, err + } } if config.RPC.PprofListenAddress != "" { @@ -608,18 +620,16 @@ func (n *nodeImpl) OnStart() error { } n.isListening = true - n.Logger.Info("p2p service", "legacy_enabled", !n.config.P2P.DisableLegacy) + n.Logger.Info("p2p service", "legacy_enabled", n.config.P2P.UseLegacy) - if n.config.P2P.DisableLegacy { - if err = n.router.Start(); err != nil { - return err - } - } else { + if n.config.P2P.UseLegacy { // Add private IDs to addrbook to block those peers being added n.addrBook.AddPrivateIDs(strings.SplitAndTrimEmpty(n.config.P2P.PrivatePeerIDs, ",", " ")) if err = n.sw.Start(); err != nil { return err } + } else if err = n.router.Start(); err != nil { + return err } if n.config.Mode != cfg.ModeSeed { @@ -650,16 +660,14 @@ func (n *nodeImpl) OnStart() error { } } - if n.config.P2P.DisableLegacy { - if err := n.pexReactor.Start(); err != nil { - return err - } - } else { + if n.config.P2P.UseLegacy { // Always connect to persistent peers err = n.sw.DialPeersAsync(strings.SplitAndTrimEmpty(n.config.P2P.PersistentPeers, ",", " ")) if err != nil { return fmt.Errorf("could not dial peers from persistent-peers field: %w", err) } + } else if err := n.pexReactor.Start(); err != nil { + return err } // Run state sync @@ -738,14 +746,14 @@ func (n *nodeImpl) OnStop() { n.Logger.Error("failed to stop the PEX v2 reactor", "err", err) } - if n.config.P2P.DisableLegacy { - if err := n.router.Stop(); err != nil { - n.Logger.Error("failed to stop router", "err", err) - } - } else { + if n.config.P2P.UseLegacy { if err := n.sw.Stop(); err != nil { n.Logger.Error("failed to stop switch", "err", err) } + } else { + if err := n.router.Stop(); err != nil { + n.Logger.Error("failed to stop router", "err", err) + } } if err := n.transport.Close(); err != nil { diff --git a/node/node_test.go b/node/node_test.go index 64b28c0bb..885bddcfc 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -43,6 +43,7 @@ import ( func TestNodeStartStop(t *testing.T) { config := cfg.ResetTestRoot("node_node_test") + defer os.RemoveAll(config.RootDir) // create & start node @@ -53,8 +54,6 @@ func TestNodeStartStop(t *testing.T) { n, ok := ns.(*nodeImpl) require.True(t, ok) - t.Logf("Started node %v", n.sw.NodeInfo()) - // wait for the node to produce a block blocksSub, err := n.EventBus().Subscribe(context.Background(), "node_test", types.EventQueryNewBlock) require.NoError(t, err) diff --git a/node/setup.go b/node/setup.go index 6d2a7523b..00f8051f0 100644 --- a/node/setup.go +++ b/node/setup.go @@ -166,12 +166,12 @@ func createMempoolReactor( peerUpdates *p2p.PeerUpdates ) - if config.P2P.DisableLegacy { - channels = makeChannelsFromShims(router, channelShims) - peerUpdates = peerManager.Subscribe() - } else { + if config.P2P.UseLegacy { channels = getChannelsFromShim(reactorShim) peerUpdates = reactorShim.PeerUpdates + } else { + channels = makeChannelsFromShims(router, channelShims) + peerUpdates = peerManager.Subscribe() } switch config.Mempool.Version { @@ -260,12 +260,12 @@ func createEvidenceReactor( peerUpdates *p2p.PeerUpdates ) - if config.P2P.DisableLegacy { - channels = makeChannelsFromShims(router, evidence.ChannelShims) - peerUpdates = peerManager.Subscribe() - } else { + if config.P2P.UseLegacy { channels = getChannelsFromShim(reactorShim) peerUpdates = reactorShim.PeerUpdates + } else { + channels = makeChannelsFromShims(router, evidence.ChannelShims) + peerUpdates = peerManager.Subscribe() } evidenceReactor := evidence.NewReactor( @@ -302,12 +302,12 @@ func createBlockchainReactor( peerUpdates *p2p.PeerUpdates ) - if config.P2P.DisableLegacy { - channels = makeChannelsFromShims(router, bcv0.ChannelShims) - peerUpdates = peerManager.Subscribe() - } else { + if config.P2P.UseLegacy { channels = getChannelsFromShim(reactorShim) peerUpdates = reactorShim.PeerUpdates + } else { + channels = makeChannelsFromShims(router, bcv0.ChannelShims) + peerUpdates = peerManager.Subscribe() } reactor, err := bcv0.NewReactor( @@ -366,12 +366,12 @@ func createConsensusReactor( peerUpdates *p2p.PeerUpdates ) - if config.P2P.DisableLegacy { - channels = makeChannelsFromShims(router, cs.ChannelShims) - peerUpdates = peerManager.Subscribe() - } else { + if config.P2P.UseLegacy { channels = getChannelsFromShim(reactorShim) peerUpdates = reactorShim.PeerUpdates + } else { + channels = makeChannelsFromShims(router, cs.ChannelShims) + peerUpdates = peerManager.Subscribe() } reactor := cs.NewReactor( diff --git a/test/e2e/generator/generate.go b/test/e2e/generator/generate.go index f699b1162..12997eb81 100644 --- a/test/e2e/generator/generate.go +++ b/test/e2e/generator/generate.go @@ -107,11 +107,11 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er switch opt["p2p"].(P2PMode) { case NewP2PMode: - manifest.DisableLegacyP2P = true + manifest.UseLegacyP2P = true case LegacyP2PMode: - manifest.DisableLegacyP2P = false + manifest.UseLegacyP2P = false case HybridP2PMode: - manifest.DisableLegacyP2P = false + manifest.UseLegacyP2P = true p2pNodeFactor = 2 default: return manifest, fmt.Errorf("unknown p2p mode %s", opt["p2p"]) @@ -138,9 +138,9 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er node := generateNode(r, e2e.ModeSeed, 0, manifest.InitialHeight, false) if p2pNodeFactor == 0 { - node.DisableLegacyP2P = manifest.DisableLegacyP2P + node.UseLegacyP2P = manifest.UseLegacyP2P } else if p2pNodeFactor%i == 0 { - node.DisableLegacyP2P = !manifest.DisableLegacyP2P + node.UseLegacyP2P = !manifest.UseLegacyP2P } manifest.Nodes[fmt.Sprintf("seed%02d", i)] = node @@ -162,9 +162,9 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er r, e2e.ModeValidator, startAt, manifest.InitialHeight, i <= 2) if p2pNodeFactor == 0 { - node.DisableLegacyP2P = manifest.DisableLegacyP2P + node.UseLegacyP2P = manifest.UseLegacyP2P } else if p2pNodeFactor%i == 0 { - node.DisableLegacyP2P = !manifest.DisableLegacyP2P + node.UseLegacyP2P = !manifest.UseLegacyP2P } manifest.Nodes[name] = node @@ -198,9 +198,9 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er node := generateNode(r, e2e.ModeFull, startAt, manifest.InitialHeight, false) if p2pNodeFactor == 0 { - node.DisableLegacyP2P = manifest.DisableLegacyP2P + node.UseLegacyP2P = manifest.UseLegacyP2P } else if p2pNodeFactor%i == 0 { - node.DisableLegacyP2P = !manifest.DisableLegacyP2P + node.UseLegacyP2P = !manifest.UseLegacyP2P } manifest.Nodes[fmt.Sprintf("full%02d", i)] = node } diff --git a/test/e2e/pkg/manifest.go b/test/e2e/pkg/manifest.go index 5711be37d..1b0fc8753 100644 --- a/test/e2e/pkg/manifest.go +++ b/test/e2e/pkg/manifest.go @@ -59,8 +59,8 @@ type Manifest struct { // by individual nodes. LogLevel string `toml:"log_level"` - // DisableLegacyP2P enables use of the new p2p layer for all nodes in a test. - DisableLegacyP2P bool `toml:"disable_legacy_p2p"` + // UseLegacyP2P uses the legacy p2p layer for all nodes in a test. + UseLegacyP2P bool `toml:"use_legacy_p2p"` // QueueType describes the type of queue that the system uses internally QueueType string `toml:"queue_type"` @@ -147,8 +147,8 @@ type ManifestNode struct { // level. LogLevel string `toml:"log_level"` - // UseNewP2P enables use of the new p2p layer for this node. - DisableLegacyP2P bool `toml:"disable_legacy_p2p"` + // UseLegacyP2P enables use of the legacy p2p layer for this node. + UseLegacyP2P bool `toml:"use_legacy_p2p"` } // Save saves the testnet manifest to a file. diff --git a/test/e2e/pkg/testnet.go b/test/e2e/pkg/testnet.go index cfeb54bde..e51fa859e 100644 --- a/test/e2e/pkg/testnet.go +++ b/test/e2e/pkg/testnet.go @@ -92,7 +92,7 @@ type Node struct { PersistentPeers []*Node Perturbations []Perturbation LogLevel string - DisableLegacyP2P bool + UseLegacyP2P bool QueueType string } @@ -177,7 +177,7 @@ func LoadTestnet(file string) (*Testnet, error) { Perturbations: []Perturbation{}, LogLevel: manifest.LogLevel, QueueType: manifest.QueueType, - DisableLegacyP2P: manifest.DisableLegacyP2P || nodeManifest.DisableLegacyP2P, + UseLegacyP2P: manifest.UseLegacyP2P && nodeManifest.UseLegacyP2P, } if node.StartAt == testnet.InitialHeight { @@ -417,16 +417,6 @@ func (t Testnet) ArchiveNodes() []*Node { return nodes } -// RandomNode returns a random non-seed node. -func (t Testnet) RandomNode() *Node { - for { - node := t.Nodes[rand.Intn(len(t.Nodes))] - if node.Mode != ModeSeed { - return node - } - } -} - // IPv6 returns true if the testnet is an IPv6 network. func (t Testnet) IPv6() bool { return t.IP.IP.To4() == nil diff --git a/test/e2e/runner/evidence.go b/test/e2e/runner/evidence.go index 6a246dcb5..30e8d9f0a 100644 --- a/test/e2e/runner/evidence.go +++ b/test/e2e/runner/evidence.go @@ -3,6 +3,7 @@ package main import ( "bytes" "context" + "errors" "fmt" "io/ioutil" "math/rand" @@ -29,7 +30,21 @@ const lightClientEvidenceRatio = 4 // DuplicateVoteEvidence. func InjectEvidence(testnet *e2e.Testnet, amount int) error { // select a random node - targetNode := testnet.RandomNode() + var targetNode *e2e.Node + + for i := 0; i < len(testnet.Nodes)-1; i++ { + targetNode = testnet.Nodes[rand.Intn(len(testnet.Nodes))] // nolint: gosec + if targetNode.Mode == e2e.ModeSeed { + targetNode = nil + continue + } + + break + } + + if targetNode == nil { + return errors.New("could not find node to inject evidence into") + } logger.Info(fmt.Sprintf("Injecting evidence through %v (amount: %d)...", targetNode.Name, amount)) diff --git a/test/e2e/runner/load.go b/test/e2e/runner/load.go index 518e32564..b57c96ddf 100644 --- a/test/e2e/runner/load.go +++ b/test/e2e/runner/load.go @@ -1,6 +1,7 @@ package main import ( + "container/ring" "context" "crypto/rand" "errors" @@ -93,34 +94,64 @@ func loadGenerate(ctx context.Context, chTx chan<- types.Tx, multiplier int, siz // loadProcess processes transactions func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx, chSuccess chan<- types.Tx) { - // Each worker gets its own client to each node, which allows for some - // concurrency while still bounding it. - clients := map[string]*rpchttp.HTTP{} + // Each worker gets its own client to each usable node, which + // allows for some concurrency while still bounding it. + clients := make([]*rpchttp.HTTP, 0, len(testnet.Nodes)) - var err error - for tx := range chTx { - node := testnet.RandomNode() - - client, ok := clients[node.Name] - if !ok { - client, err = node.Client() - if err != nil { - continue - } - - // check that the node is up - _, err = client.Health(ctx) - if err != nil { - continue - } - - clients[node.Name] = client - } - - if _, err = client.BroadcastTxSync(ctx, tx); err != nil { + for idx := range testnet.Nodes { + // Construct a list of usable nodes for the creating + // load. Don't send load through seed nodes because + // they do not provide the RPC endpoints required to + // broadcast transaction. + if testnet.Nodes[idx].Mode == e2e.ModeSeed { continue } - chSuccess <- tx + client, err := testnet.Nodes[idx].Client() + if err != nil { + continue + } + + clients = append(clients, client) + } + + if len(clients) == 0 { + panic("no clients to process load") + } + + // Put the clients in a ring so they can be used in a + // round-robin fashion. + clientRing := ring.New(len(clients)) + for idx := range clients { + clientRing.Value = clients[idx] + clientRing = clientRing.Next() + } + + var err error + + for { + select { + case <-ctx.Done(): + return + case tx := <-chTx: + clientRing = clientRing.Next() + client := clientRing.Value.(*rpchttp.HTTP) + + if _, err := client.Health(ctx); err != nil { + continue + } + + if _, err = client.BroadcastTxSync(ctx, tx); err != nil { + continue + } + + select { + case chSuccess <- tx: + continue + case <-ctx.Done(): + return + } + + } } } diff --git a/test/e2e/runner/setup.go b/test/e2e/runner/setup.go index c968ef306..a0bd4996a 100644 --- a/test/e2e/runner/setup.go +++ b/test/e2e/runner/setup.go @@ -238,7 +238,7 @@ func MakeConfig(node *e2e.Node) (*config.Config, error) { cfg.RPC.PprofListenAddress = ":6060" cfg.P2P.ExternalAddress = fmt.Sprintf("tcp://%v", node.AddressP2P(false)) cfg.P2P.AddrBookStrict = false - cfg.P2P.DisableLegacy = node.DisableLegacyP2P + cfg.P2P.UseLegacy = node.UseLegacyP2P cfg.P2P.QueueType = node.QueueType cfg.DBBackend = node.Database cfg.StateSync.DiscoveryTime = 5 * time.Second @@ -342,17 +342,17 @@ func MakeConfig(node *e2e.Node) (*config.Config, error) { // MakeAppConfig generates an ABCI application config for a node. func MakeAppConfig(node *e2e.Node) ([]byte, error) { cfg := map[string]interface{}{ - "chain_id": node.Testnet.Name, - "dir": "data/app", - "listen": AppAddressUNIX, - "mode": node.Mode, - "proxy_port": node.ProxyPort, - "protocol": "socket", - "persist_interval": node.PersistInterval, - "snapshot_interval": node.SnapshotInterval, - "retain_blocks": node.RetainBlocks, - "key_type": node.PrivvalKey.Type(), - "disable_legacy_p2p": node.DisableLegacyP2P, + "chain_id": node.Testnet.Name, + "dir": "data/app", + "listen": AppAddressUNIX, + "mode": node.Mode, + "proxy_port": node.ProxyPort, + "protocol": "socket", + "persist_interval": node.PersistInterval, + "snapshot_interval": node.SnapshotInterval, + "retain_blocks": node.RetainBlocks, + "key_type": node.PrivvalKey.Type(), + "use_legacy_p2p": node.UseLegacyP2P, } switch node.ABCIProtocol { case e2e.ProtocolUNIX: