From 4daee74d9c6b1185e681b9aa5a58a45a42305765 Mon Sep 17 00:00:00 2001 From: William Banfield Date: Wed, 19 Oct 2022 21:05:04 -0400 Subject: [PATCH] create new map of channel id to message type --- p2p/peer.go | 3 +++ p2p/peer_test.go | 6 +++++- p2p/switch.go | 47 +++++++++++++++++++++++++++-------------------- p2p/test_util.go | 1 + p2p/transport.go | 9 ++++++--- 5 files changed, 42 insertions(+), 24 deletions(-) diff --git a/p2p/peer.go b/p2p/peer.go index c7db24893..b2f61048a 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -134,6 +134,7 @@ func newPeer( mConfig tmconn.MConnConfig, nodeInfo NodeInfo, reactorsByCh map[byte]Reactor, + msgTypeByChID map[byte]proto.Message, chDescs []*tmconn.ChannelDescriptor, onPeerError func(Peer, interface{}), options ...PeerOption, @@ -151,6 +152,7 @@ func newPeer( pc.conn, p, reactorsByCh, + msgTypeByChID, chDescs, onPeerError, mConfig, @@ -396,6 +398,7 @@ func createMConnection( conn net.Conn, p *peer, reactorsByCh map[byte]Reactor, + msgTypeByChID map[byte]proto.Message, chDescs []*tmconn.ChannelDescriptor, onPeerError func(Peer, interface{}), config tmconn.MConnConfig, diff --git a/p2p/peer_test.go b/p2p/peer_test.go index 0cc9de2a8..4551d2dba 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/cosmos/gogoproto/proto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -83,6 +84,9 @@ func createOutboundPeerAndPerformHandshake( {ID: testCh, Priority: 1}, } reactorsByCh := map[byte]Reactor{testCh: NewTestReactor(chDescs, true)} + msgTypeByChID := map[byte]proto.Message{ + testCh: &p2p.Message{}, + } pk := ed25519.GenPrivKey() pc, err := testOutboundPeerConn(addr, config, false, pk) if err != nil { @@ -95,7 +99,7 @@ func createOutboundPeerAndPerformHandshake( return nil, err } - p := newPeer(pc, mConfig, peerNodeInfo, reactorsByCh, chDescs, func(p Peer, r interface{}) {}) + p := newPeer(pc, mConfig, peerNodeInfo, reactorsByCh, msgTypeByChID, chDescs, func(p Peer, r interface{}) {}) p.SetLogger(log.TestingLogger().With("peer", addr)) return p, nil } diff --git a/p2p/switch.go b/p2p/switch.go index 7106bd935..8f0ae3fe5 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/cosmos/gogoproto/proto" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/libs/cmap" "github.com/tendermint/tendermint/libs/rand" @@ -68,16 +69,17 @@ type PeerFilterFunc func(IPeerSet, Peer) error type Switch struct { service.BaseService - config *config.P2PConfig - reactors map[string]Reactor - chDescs []*conn.ChannelDescriptor - reactorsByCh map[byte]Reactor - peers *PeerSet - dialing *cmap.CMap - reconnecting *cmap.CMap - nodeInfo NodeInfo // our node info - nodeKey *NodeKey // our node privkey - addrBook AddrBook + config *config.P2PConfig + reactors map[string]Reactor + chDescs []*conn.ChannelDescriptor + reactorsByCh map[byte]Reactor + msgTypeByChID map[byte]proto.Message + peers *PeerSet + dialing *cmap.CMap + reconnecting *cmap.CMap + nodeInfo NodeInfo // our node info + nodeKey *NodeKey // our node privkey + addrBook AddrBook // peers addresses with whom we'll maintain constant connection persistentPeersAddrs []*NetAddress unconditionalPeerIDs map[ID]struct{} @@ -112,6 +114,7 @@ func NewSwitch( reactors: make(map[string]Reactor), chDescs: make([]*conn.ChannelDescriptor, 0), reactorsByCh: make(map[byte]Reactor), + msgTypeByChID: make(map[byte]proto.Message), peers: NewPeerSet(), dialing: cmap.NewCMap(), reconnecting: cmap.NewCMap(), @@ -163,6 +166,7 @@ func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor { } sw.chDescs = append(sw.chDescs, chDesc) sw.reactorsByCh[chID] = reactor + sw.msgTypeByChID[chID] = chDesc.MessageType } sw.reactors[name] = reactor reactor.SetSwitch(sw) @@ -181,6 +185,7 @@ func (sw *Switch) RemoveReactor(name string, reactor Reactor) { } } delete(sw.reactorsByCh, chDesc.ID) + delete(sw.msgTypeByChID, chDesc.ID) } delete(sw.reactors, name) reactor.SetSwitch(nil) @@ -654,11 +659,12 @@ func (sw *Switch) IsPeerPersistent(na *NetAddress) bool { func (sw *Switch) acceptRoutine() { for { p, err := sw.transport.Accept(peerConfig{ - chDescs: sw.chDescs, - onPeerError: sw.StopPeerForError, - reactorsByCh: sw.reactorsByCh, - metrics: sw.metrics, - isPersistent: sw.IsPeerPersistent, + chDescs: sw.chDescs, + onPeerError: sw.StopPeerForError, + reactorsByCh: sw.reactorsByCh, + msgTypeByChID: sw.msgTypeByChID, + metrics: sw.metrics, + isPersistent: sw.IsPeerPersistent, }) if err != nil { switch err := err.(type) { @@ -757,11 +763,12 @@ func (sw *Switch) addOutboundPeerWithConfig( } p, err := sw.transport.Dial(*addr, peerConfig{ - chDescs: sw.chDescs, - onPeerError: sw.StopPeerForError, - isPersistent: sw.IsPeerPersistent, - reactorsByCh: sw.reactorsByCh, - metrics: sw.metrics, + chDescs: sw.chDescs, + onPeerError: sw.StopPeerForError, + isPersistent: sw.IsPeerPersistent, + reactorsByCh: sw.reactorsByCh, + msgTypeByChID: sw.msgTypeByChID, + metrics: sw.metrics, }) if err != nil { if e, ok := err.(ErrRejected); ok { diff --git a/p2p/test_util.go b/p2p/test_util.go index 4e56f0193..14af8c520 100644 --- a/p2p/test_util.go +++ b/p2p/test_util.go @@ -149,6 +149,7 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error { MConnConfig(sw.config), ni, sw.reactorsByCh, + sw.msgTypeByChID, sw.chDescs, sw.StopPeerForError, ) diff --git a/p2p/transport.go b/p2p/transport.go index e6e19a901..c16376fe8 100644 --- a/p2p/transport.go +++ b/p2p/transport.go @@ -8,6 +8,7 @@ import ( "golang.org/x/net/netutil" + "github.com/cosmos/gogoproto/proto" "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/libs/protoio" "github.com/tendermint/tendermint/p2p/conn" @@ -47,9 +48,10 @@ type peerConfig struct { // isPersistent allows you to set a function, which, given socket address // (for outbound peers) OR self-reported address (for inbound peers), tells // if the peer is persistent or not. - isPersistent func(*NetAddress) bool - reactorsByCh map[byte]Reactor - metrics *Metrics + isPersistent func(*NetAddress) bool + reactorsByCh map[byte]Reactor + msgTypeByChID map[byte]proto.Message + metrics *Metrics } // Transport emits and connects to Peers. The implementation of Peer is left to @@ -519,6 +521,7 @@ func (mt *MultiplexTransport) wrapPeer( mt.mConfig, ni, cfg.reactorsByCh, + cfg.msgTypeByChID, cfg.chDescs, cfg.onPeerError, PeerMetrics(cfg.metrics),