p2p: delete legacy stack initial pass (#7035)

A few notes:

- this is not all the deletion that we can do, but this is the most
  "simple" case: it leaves in shims, and there's some trivial
  additional cleanup to the transport that can happen but that
  requires writing more code, and I wanted this to be easy to review
  above all else.
  
- This should land *after* we cut the branch for 0.35, but I'm
  anticipating that to happen soon, and I wanted to run this through
  CI.
This commit is contained in:
Sam Kleinman
2021-10-05 09:40:32 -04:00
committed by GitHub
parent f5b9c210ca
commit 03ad7d6f20
54 changed files with 127 additions and 10207 deletions

View File

@@ -54,10 +54,8 @@ type nodeImpl struct {
// network
transport *p2p.MConnTransport
sw *p2p.Switch // p2p connections
peerManager *p2p.PeerManager
router *p2p.Router
addrBook pex.AddrBook // known peers
nodeInfo types.NodeInfo
nodeKey types.NodeKey // our node privkey
isListening bool
@@ -292,14 +290,6 @@ func makeNode(cfg *config.Config,
return nil, fmt.Errorf("could not create blockchain reactor: %w", err)
}
// TODO: Remove this once the switch is removed.
var bcReactorForSwitch p2p.Reactor
if bcReactorShim != nil {
bcReactorForSwitch = bcReactorShim
} else {
bcReactorForSwitch = bcReactor.(p2p.Reactor)
}
// Make ConsensusReactor. Don't enable fully if doing a state sync and/or block sync first.
// FIXME We need to update metrics here, since other reactors don't have access to them.
if stateSync {
@@ -312,29 +302,15 @@ func makeNode(cfg *config.Config,
// FIXME The way we do phased startups (e.g. replay -> block sync -> consensus) is very messy,
// we should clean this whole thing up. See:
// https://github.com/tendermint/tendermint/issues/4644
var (
stateSyncReactor *statesync.Reactor
stateSyncReactorShim *p2p.ReactorShim
channels map[p2p.ChannelID]*p2p.Channel
peerUpdates *p2p.PeerUpdates
)
stateSyncReactorShim = p2p.NewReactorShim(logger.With("module", "statesync"), "StateSyncShim", statesync.ChannelShims)
if cfg.P2P.UseLegacy {
channels = getChannelsFromShim(stateSyncReactorShim)
peerUpdates = stateSyncReactorShim.PeerUpdates
} else {
channels = makeChannelsFromShims(router, statesync.ChannelShims)
peerUpdates = peerManager.Subscribe()
}
stateSyncReactor = statesync.NewReactor(
ssLogger := logger.With("module", "statesync")
ssReactorShim := p2p.NewReactorShim(ssLogger, "StateSyncShim", statesync.ChannelShims)
channels := makeChannelsFromShims(router, statesync.ChannelShims)
peerUpdates := peerManager.Subscribe()
stateSyncReactor := statesync.NewReactor(
genDoc.ChainID,
genDoc.InitialHeight,
*cfg.StateSync,
stateSyncReactorShim.Logger,
ssLogger,
proxyApp.Snapshot(),
proxyApp.Query(),
channels[statesync.SnapshotChannel],
@@ -353,10 +329,10 @@ func makeNode(cfg *config.Config,
// transports can either be agnostic to channel descriptors or can be
// declared in the constructor.
transport.AddChannelDescriptors(mpReactorShim.GetChannels())
transport.AddChannelDescriptors(bcReactorForSwitch.GetChannels())
transport.AddChannelDescriptors(bcReactorShim.GetChannels())
transport.AddChannelDescriptors(csReactorShim.GetChannels())
transport.AddChannelDescriptors(evReactorShim.GetChannels())
transport.AddChannelDescriptors(stateSyncReactorShim.GetChannels())
transport.AddChannelDescriptors(ssReactorShim.GetChannels())
// Optionally, start the pex reactor
//
@@ -371,44 +347,14 @@ func makeNode(cfg *config.Config,
// If PEX is on, it should handle dialing the seeds. Otherwise the switch does it.
// Note we currently use the addrBook regardless at least for AddOurAddress
var (
pexReactor service.Service
sw *p2p.Switch
addrBook pex.AddrBook
)
var pexReactor service.Service
pexCh := pex.ChannelDescriptor()
transport.AddChannelDescriptors([]*p2p.ChannelDescriptor{&pexCh})
if cfg.P2P.UseLegacy {
// setup Transport and Switch
sw = createSwitch(
cfg, transport, nodeMetrics.p2p, mpReactorShim, bcReactorForSwitch,
stateSyncReactorShim, csReactorShim, evReactorShim, proxyApp, nodeInfo, nodeKey, p2pLogger,
)
err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(cfg.P2P.PersistentPeers, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peers from persistent-peers field: %w", err)
}
err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(cfg.P2P.UnconditionalPeerIDs, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
}
addrBook, err = createAddrBookAndSetOnSwitch(cfg, sw, p2pLogger, nodeKey)
if err != nil {
return nil, fmt.Errorf("could not create addrbook: %w", err)
}
pexReactor = createPEXReactorAndAddToSwitch(addrBook, cfg, sw, logger)
} else {
addrBook = nil
pexReactor, err = createPEXReactorV2(cfg, logger, peerManager, router)
if err != nil {
return nil, err
}
pexReactor, err = createPEXReactorV2(cfg, logger, peerManager, router)
if err != nil {
return nil, err
}
if cfg.RPC.PprofListenAddress != "" {
@@ -424,10 +370,8 @@ func makeNode(cfg *config.Config,
privValidator: privValidator,
transport: transport,
sw: sw,
peerManager: peerManager,
router: router,
addrBook: addrBook,
nodeInfo: nodeInfo,
nodeKey: nodeKey,
@@ -456,7 +400,6 @@ func makeNode(cfg *config.Config,
ConsensusReactor: csReactor,
BlockSyncReactor: bcReactor.(consensus.BlockSyncReactor),
P2PPeers: sw,
PeerManager: peerManager,
GenDoc: genDoc,
@@ -468,17 +411,6 @@ func makeNode(cfg *config.Config,
},
}
// this is a terrible, because typed nil interfaces are not ==
// nil, so this is just cleanup to avoid having a non-nil
// value in the RPC environment that has the semantic
// properties of nil.
if sw == nil {
node.rpcEnv.P2PPeers = nil
} else if peerManager == nil {
node.rpcEnv.PeerManager = nil
}
// end hack
node.rpcEnv.P2PTransport = node
node.BaseService = *service.NewBaseService(logger, "Node", node)
@@ -525,11 +457,7 @@ func makeSeedNode(cfg *config.Config,
return nil, fmt.Errorf("failed to create router: %w", err)
}
var (
pexReactor service.Service
sw *p2p.Switch
addrBook pex.AddrBook
)
var pexReactor service.Service
// add the pex reactor
// FIXME: we add channel descriptors to both the router and the transport but only the router
@@ -538,33 +466,9 @@ func makeSeedNode(cfg *config.Config,
pexCh := pex.ChannelDescriptor()
transport.AddChannelDescriptors([]*p2p.ChannelDescriptor{&pexCh})
if cfg.P2P.UseLegacy {
sw = createSwitch(
cfg, transport, p2pMetrics, nil, nil,
nil, nil, nil, nil, nodeInfo, nodeKey, p2pLogger,
)
err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(cfg.P2P.PersistentPeers, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peers from persistent_peers field: %w", err)
}
err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(cfg.P2P.UnconditionalPeerIDs, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
}
addrBook, err = createAddrBookAndSetOnSwitch(cfg, sw, p2pLogger, nodeKey)
if err != nil {
return nil, fmt.Errorf("could not create addrbook: %w", err)
}
pexReactor = createPEXReactorAndAddToSwitch(addrBook, cfg, sw, logger)
} else {
pexReactor, err = createPEXReactorV2(cfg, logger, peerManager, router)
if err != nil {
return nil, err
}
pexReactor, err = createPEXReactorV2(cfg, logger, peerManager, router)
if err != nil {
return nil, err
}
if cfg.RPC.PprofListenAddress != "" {
@@ -579,8 +483,6 @@ func makeSeedNode(cfg *config.Config,
genesisDoc: genDoc,
transport: transport,
sw: sw,
addrBook: addrBook,
nodeInfo: nodeInfo,
nodeKey: nodeKey,
peerManager: peerManager,
@@ -627,15 +529,8 @@ func (n *nodeImpl) OnStart() error {
}
n.isListening = true
n.Logger.Info("p2p service", "legacy_enabled", n.config.P2P.UseLegacy)
if n.config.P2P.UseLegacy {
// Add private IDs to addrbook to block those peers being added
n.addrBook.AddPrivateIDs(strings.SplitAndTrimEmpty(n.config.P2P.PrivatePeerIDs, ",", " "))
if err = n.sw.Start(); err != nil {
return err
}
} else if err = n.router.Start(); err != nil {
if err = n.router.Start(); err != nil {
return err
}
@@ -667,13 +562,7 @@ func (n *nodeImpl) OnStart() error {
}
}
if n.config.P2P.UseLegacy {
// Always connect to persistent peers
err = n.sw.DialPeersAsync(strings.SplitAndTrimEmpty(n.config.P2P.PersistentPeers, ",", " "))
if err != nil {
return fmt.Errorf("could not dial peers from persistent-peers field: %w", err)
}
} else if err := n.pexReactor.Start(); err != nil {
if err := n.pexReactor.Start(); err != nil {
return err
}
@@ -794,14 +683,8 @@ func (n *nodeImpl) OnStop() {
n.Logger.Error("failed to stop the PEX v2 reactor", "err", err)
}
if n.config.P2P.UseLegacy {
if err := n.sw.Stop(); err != nil {
n.Logger.Error("failed to stop switch", "err", err)
}
} else {
if err := n.router.Stop(); err != nil {
n.Logger.Error("failed to stop router", "err", err)
}
if err := n.router.Stop(); err != nil {
n.Logger.Error("failed to stop router", "err", err)
}
if err := n.transport.Close(); err != nil {
@@ -1216,12 +1099,3 @@ func makeChannelsFromShims(
return channels
}
func getChannelsFromShim(reactorShim *p2p.ReactorShim) map[p2p.ChannelID]*p2p.Channel {
channels := map[p2p.ChannelID]*p2p.Channel{}
for chID := range reactorShim.Channels {
channels[chID] = reactorShim.GetChannel(chID)
}
return channels
}

View File

@@ -2,16 +2,13 @@ package node
import (
"bytes"
"context"
"fmt"
"math"
"net"
"time"
dbm "github.com/tendermint/tm-db"
abciclient "github.com/tendermint/tendermint/abci/client"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/crypto"
bcv0 "github.com/tendermint/tendermint/internal/blocksync/v0"
@@ -161,18 +158,8 @@ func createMempoolReactor(
channelShims := mempoolv0.GetChannelShims(cfg.Mempool)
reactorShim := p2p.NewReactorShim(logger, "MempoolShim", channelShims)
var (
channels map[p2p.ChannelID]*p2p.Channel
peerUpdates *p2p.PeerUpdates
)
if cfg.P2P.UseLegacy {
channels = getChannelsFromShim(reactorShim)
peerUpdates = reactorShim.PeerUpdates
} else {
channels = makeChannelsFromShims(router, channelShims)
peerUpdates = peerManager.Subscribe()
}
channels := makeChannelsFromShims(router, channelShims)
peerUpdates := peerManager.Subscribe()
switch cfg.Mempool.Version {
case config.MempoolV0:
@@ -255,23 +242,10 @@ func createEvidenceReactor(
return nil, nil, nil, fmt.Errorf("creating evidence pool: %w", err)
}
var (
channels map[p2p.ChannelID]*p2p.Channel
peerUpdates *p2p.PeerUpdates
)
if cfg.P2P.UseLegacy {
channels = getChannelsFromShim(reactorShim)
peerUpdates = reactorShim.PeerUpdates
} else {
channels = makeChannelsFromShims(router, evidence.ChannelShims)
peerUpdates = peerManager.Subscribe()
}
evidenceReactor := evidence.NewReactor(
logger,
channels[evidence.EvidenceChannel],
peerUpdates,
makeChannelsFromShims(router, evidence.ChannelShims)[evidence.EvidenceChannel],
peerManager.Subscribe(),
evidencePool,
)
@@ -294,19 +268,8 @@ func createBlockchainReactor(
logger = logger.With("module", "blockchain")
reactorShim := p2p.NewReactorShim(logger, "BlockchainShim", bcv0.ChannelShims)
var (
channels map[p2p.ChannelID]*p2p.Channel
peerUpdates *p2p.PeerUpdates
)
if cfg.P2P.UseLegacy {
channels = getChannelsFromShim(reactorShim)
peerUpdates = reactorShim.PeerUpdates
} else {
channels = makeChannelsFromShims(router, bcv0.ChannelShims)
peerUpdates = peerManager.Subscribe()
}
channels := makeChannelsFromShims(router, bcv0.ChannelShims)
peerUpdates := peerManager.Subscribe()
reactor, err := bcv0.NewReactor(
logger, state.Copy(), blockExec, blockStore, csReactor,
@@ -357,13 +320,8 @@ func createConsensusReactor(
peerUpdates *p2p.PeerUpdates
)
if cfg.P2P.UseLegacy {
channels = getChannelsFromShim(reactorShim)
peerUpdates = reactorShim.PeerUpdates
} else {
channels = makeChannelsFromShims(router, consensus.ChannelShims)
peerUpdates = peerManager.Subscribe()
}
channels = makeChannelsFromShims(router, consensus.ChannelShims)
peerUpdates = peerManager.Subscribe()
reactor := consensus.NewReactor(
logger,
@@ -500,142 +458,6 @@ func createRouter(
)
}
func createSwitch(
cfg *config.Config,
transport p2p.Transport,
p2pMetrics *p2p.Metrics,
mempoolReactor *p2p.ReactorShim,
bcReactor p2p.Reactor,
stateSyncReactor *p2p.ReactorShim,
consensusReactor *p2p.ReactorShim,
evidenceReactor *p2p.ReactorShim,
proxyApp proxy.AppConns,
nodeInfo types.NodeInfo,
nodeKey types.NodeKey,
p2pLogger log.Logger,
) *p2p.Switch {
var (
connFilters = []p2p.ConnFilterFunc{}
peerFilters = []p2p.PeerFilterFunc{}
)
if !cfg.P2P.AllowDuplicateIP {
connFilters = append(connFilters, p2p.ConnDuplicateIPFilter)
}
// Filter peers by addr or pubkey with an ABCI query.
// If the query return code is OK, add peer.
if cfg.FilterPeers {
connFilters = append(
connFilters,
// ABCI query for address filtering.
func(_ p2p.ConnSet, c net.Conn, _ []net.IP) error {
res, err := proxyApp.Query().QuerySync(context.Background(), abci.RequestQuery{
Path: fmt.Sprintf("/p2p/filter/addr/%s", c.RemoteAddr().String()),
})
if err != nil {
return err
}
if res.IsErr() {
return fmt.Errorf("error querying abci app: %v", res)
}
return nil
},
)
peerFilters = append(
peerFilters,
// ABCI query for ID filtering.
func(_ p2p.IPeerSet, p p2p.Peer) error {
res, err := proxyApp.Query().QuerySync(context.Background(), abci.RequestQuery{
Path: fmt.Sprintf("/p2p/filter/id/%s", p.ID()),
})
if err != nil {
return err
}
if res.IsErr() {
return fmt.Errorf("error querying abci app: %v", res)
}
return nil
},
)
}
sw := p2p.NewSwitch(
cfg.P2P,
transport,
p2p.WithMetrics(p2pMetrics),
p2p.SwitchPeerFilters(peerFilters...),
p2p.SwitchConnFilters(connFilters...),
)
sw.SetLogger(p2pLogger)
if cfg.Mode != config.ModeSeed {
sw.AddReactor("MEMPOOL", mempoolReactor)
sw.AddReactor("BLOCKCHAIN", bcReactor)
sw.AddReactor("CONSENSUS", consensusReactor)
sw.AddReactor("EVIDENCE", evidenceReactor)
sw.AddReactor("STATESYNC", stateSyncReactor)
}
sw.SetNodeInfo(nodeInfo)
sw.SetNodeKey(nodeKey)
p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID, "file", cfg.NodeKeyFile())
return sw
}
func createAddrBookAndSetOnSwitch(cfg *config.Config, sw *p2p.Switch,
p2pLogger log.Logger, nodeKey types.NodeKey) (pex.AddrBook, error) {
addrBook := pex.NewAddrBook(cfg.P2P.AddrBookFile(), cfg.P2P.AddrBookStrict)
addrBook.SetLogger(p2pLogger.With("book", cfg.P2P.AddrBookFile()))
// Add ourselves to addrbook to prevent dialing ourselves
if cfg.P2P.ExternalAddress != "" {
addr, err := types.NewNetAddressString(nodeKey.ID.AddressString(cfg.P2P.ExternalAddress))
if err != nil {
return nil, fmt.Errorf("p2p.external_address is incorrect: %w", err)
}
addrBook.AddOurAddress(addr)
}
if cfg.P2P.ListenAddress != "" {
addr, err := types.NewNetAddressString(nodeKey.ID.AddressString(cfg.P2P.ListenAddress))
if err != nil {
return nil, fmt.Errorf("p2p.laddr is incorrect: %w", err)
}
addrBook.AddOurAddress(addr)
}
sw.SetAddrBook(addrBook)
return addrBook, nil
}
func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, cfg *config.Config,
sw *p2p.Switch, logger log.Logger) *pex.Reactor {
reactorConfig := &pex.ReactorConfig{
Seeds: tmstrings.SplitAndTrimEmpty(cfg.P2P.Seeds, ",", " "),
SeedMode: cfg.Mode == config.ModeSeed,
// See consensus/reactor.go: blocksToContributeToBecomeGoodPeer 10000
// blocks assuming 10s blocks ~ 28 hours.
// TODO (melekes): make it dynamic based on the actual block latencies
// from the live network.
// https://github.com/tendermint/tendermint/issues/3523
SeedDisconnectWaitPeriod: 28 * time.Hour,
PersistentPeersMaxDialPeriod: cfg.P2P.PersistentPeersMaxDialPeriod,
}
// TODO persistent peers ? so we can have their DNS addrs saved
pexReactor := pex.NewReactor(addrBook, reactorConfig)
pexReactor.SetLogger(logger.With("module", "pex"))
sw.AddReactor("PEX", pexReactor)
return pexReactor
}
func createPEXReactorV2(
cfg *config.Config,
logger log.Logger,