node: cleanup pex initialization (#6467)

This commit is contained in:
Sam Kleinman
2021-05-14 10:05:59 -04:00
committed by GitHub
parent 15a67b37d8
commit fa891c5a4b
5 changed files with 67 additions and 56 deletions

View File

@@ -118,7 +118,7 @@ func NewRunNodeCmd(nodeProvider nm.Provider) *cobra.Command {
return fmt.Errorf("failed to start node: %w", err)
}
logger.Info("Started node", "nodeInfo", n.Switch().NodeInfo())
logger.Info("Started node", "nodeInfo", n.NodeInfo())
// Stop upon receiving SIGTERM or CTRL-C.
tmos.TrapSignal(logger, func() {

View File

@@ -348,27 +348,6 @@ func NewNode(config *cfg.Config,
transport.AddChannelDescriptors(evReactorShim.GetChannels())
transport.AddChannelDescriptors(stateSyncReactorShim.GetChannels())
// setup Transport and Switch
sw := createSwitch(
config, transport, p2pMetrics, mpReactorShim, bcReactorForSwitch,
stateSyncReactorShim, csReactorShim, evReactorShim, proxyApp, nodeInfo, nodeKey, p2pLogger,
)
err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peers from persistent-peers field: %w", err)
}
err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
}
addrBook, err := createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey)
if err != nil {
return nil, fmt.Errorf("could not create addrbook: %w", err)
}
// Optionally, start the pex reactor
//
// TODO:
@@ -381,22 +360,46 @@ func NewNode(config *cfg.Config,
//
// If PEX is on, it should handle dialing the seeds. Otherwise the switch does it.
// Note we currently use the addrBook regardless at least for AddOurAddress
var (
pexReactor *pex.Reactor
pexReactorV2 *pex.ReactorV2
sw *p2p.Switch
addrBook pex.AddrBook
)
if config.P2P.PexReactor {
pexCh := pex.ChannelDescriptor()
transport.AddChannelDescriptors([]*p2p.ChannelDescriptor{&pexCh})
if config.P2P.DisableLegacy {
pexReactorV2, err = createPEXReactorV2(config, logger, peerManager, router)
if err != nil {
return nil, err
}
} else {
pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger)
pexCh := pex.ChannelDescriptor()
transport.AddChannelDescriptors([]*p2p.ChannelDescriptor{&pexCh})
if config.P2P.DisableLegacy {
addrBook = nil
pexReactorV2, err = createPEXReactorV2(config, logger, peerManager, router)
if err != nil {
return nil, err
}
} else {
// setup Transport and Switch
sw = createSwitch(
config, transport, p2pMetrics, mpReactorShim, bcReactorForSwitch,
stateSyncReactorShim, csReactorShim, evReactorShim, proxyApp, nodeInfo, nodeKey, p2pLogger,
)
err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peers from persistent-peers field: %w", err)
}
err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
}
addrBook, err = createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey)
if err != nil {
return nil, fmt.Errorf("could not create addrbook: %w", err)
}
pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger)
}
if config.RPC.PprofListenAddress != "" {
@@ -609,9 +612,6 @@ func (n *Node) OnStart() error {
time.Sleep(genTime.Sub(now))
}
// Add private IDs to addrbook to block those peers being added
n.addrBook.AddPrivateIDs(strings.SplitAndTrimEmpty(n.config.P2P.PrivatePeerIDs, ",", " "))
// Start the RPC server before the P2P server
// so we can eg. receive txs for the first block
if n.config.RPC.ListenAddress != "" && n.config.Mode != cfg.ModeSeed {
@@ -643,6 +643,8 @@ func (n *Node) OnStart() error {
if n.config.P2P.DisableLegacy {
err = n.router.Start()
} else {
// Add private IDs to addrbook to block those peers being added
n.addrBook.AddPrivateIDs(strings.SplitAndTrimEmpty(n.config.P2P.PrivatePeerIDs, ",", " "))
err = n.sw.Start()
}
if err != nil {
@@ -682,12 +684,13 @@ func (n *Node) OnStart() error {
if err := n.pexReactorV2.Start(); err != nil {
return err
}
}
} else {
// Always connect to persistent peers
err = n.sw.DialPeersAsync(strings.SplitAndTrimEmpty(n.config.P2P.PersistentPeers, ",", " "))
if err != nil {
return fmt.Errorf("could not dial peers from persistent-peers field: %w", err)
}
// Always connect to persistent peers
err = n.sw.DialPeersAsync(strings.SplitAndTrimEmpty(n.config.P2P.PersistentPeers, ",", " "))
if err != nil {
return fmt.Errorf("could not dial peers from persistent-peers field: %w", err)
}
// Run state sync

View File

@@ -10,7 +10,6 @@ import (
"time"
"github.com/gogo/protobuf/proto"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
@@ -791,19 +790,27 @@ func (r *Router) routePeer(peerID NodeID, conn Connection, sendQueue queue) {
errCh <- r.sendPeer(peerID, conn, sendQueue)
}()
err := <-errCh
e1 := <-errCh
err := e1
_ = conn.Close()
sendQueue.close()
if e := <-errCh; err == nil {
e2 := <-errCh
if err == nil {
// The first err was nil, so we update it with the second err, which may
// or may not be nil.
err = e
err = e2
}
switch err {
case nil, io.EOF:
r.logger.Info("peer disconnected", "peer", peerID, "endpoint", conn)
r.logger.Info("peer disconnected",
"peer", peerID,
"endpoint", conn,
"err", e1,
"err2", e2,
)
default:
r.logger.Error("peer failure", "peer", peerID, "endpoint", conn, "err", err)

View File

@@ -33,7 +33,7 @@ var (
nodeABCIProtocols = uniformChoice{"unix", "tcp", "builtin"} // "grpc"
nodePrivvalProtocols = uniformChoice{"file", "unix", "tcp", "grpc"}
// FIXME: v2 disabled due to flake
nodeFastSyncs = uniformChoice{"", "v0"} // "v2"
nodeFastSyncs = uniformChoice{"v0"} // "v2"
nodeStateSyncs = uniformChoice{false, true}
nodePersistIntervals = uniformChoice{0, 1, 5}
nodeSnapshotIntervals = uniformChoice{0, 3}

View File

@@ -339,16 +339,17 @@ func MakeConfig(node *e2e.Node) (*config.Config, error) {
// MakeAppConfig generates an ABCI application config for a node.
func MakeAppConfig(node *e2e.Node) ([]byte, error) {
cfg := map[string]interface{}{
"chain_id": node.Testnet.Name,
"dir": "data/app",
"listen": AppAddressUNIX,
"mode": node.Mode,
"proxy_port": node.ProxyPort,
"protocol": "socket",
"persist_interval": node.PersistInterval,
"snapshot_interval": node.SnapshotInterval,
"retain_blocks": node.RetainBlocks,
"key_type": node.PrivvalKey.Type(),
"chain_id": node.Testnet.Name,
"dir": "data/app",
"listen": AppAddressUNIX,
"mode": node.Mode,
"proxy_port": node.ProxyPort,
"protocol": "socket",
"persist_interval": node.PersistInterval,
"snapshot_interval": node.SnapshotInterval,
"retain_blocks": node.RetainBlocks,
"key_type": node.PrivvalKey.Type(),
"disable_legacy_p2p": node.DisableLegacyP2P,
}
switch node.ABCIProtocol {
case e2e.ProtocolUNIX: