create new map of channel id to message type

This commit is contained in:
William Banfield
2022-10-19 21:05:04 -04:00
parent 1053a49b4e
commit 4daee74d9c
5 changed files with 42 additions and 24 deletions

View File

@@ -134,6 +134,7 @@ func newPeer(
mConfig tmconn.MConnConfig,
nodeInfo NodeInfo,
reactorsByCh map[byte]Reactor,
msgTypeByChID map[byte]proto.Message,
chDescs []*tmconn.ChannelDescriptor,
onPeerError func(Peer, interface{}),
options ...PeerOption,
@@ -151,6 +152,7 @@ func newPeer(
pc.conn,
p,
reactorsByCh,
msgTypeByChID,
chDescs,
onPeerError,
mConfig,
@@ -396,6 +398,7 @@ func createMConnection(
conn net.Conn,
p *peer,
reactorsByCh map[byte]Reactor,
msgTypeByChID map[byte]proto.Message,
chDescs []*tmconn.ChannelDescriptor,
onPeerError func(Peer, interface{}),
config tmconn.MConnConfig,

View File

@@ -7,6 +7,7 @@ import (
"testing"
"time"
"github.com/cosmos/gogoproto/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -83,6 +84,9 @@ func createOutboundPeerAndPerformHandshake(
{ID: testCh, Priority: 1},
}
reactorsByCh := map[byte]Reactor{testCh: NewTestReactor(chDescs, true)}
msgTypeByChID := map[byte]proto.Message{
testCh: &p2p.Message{},
}
pk := ed25519.GenPrivKey()
pc, err := testOutboundPeerConn(addr, config, false, pk)
if err != nil {
@@ -95,7 +99,7 @@ func createOutboundPeerAndPerformHandshake(
return nil, err
}
p := newPeer(pc, mConfig, peerNodeInfo, reactorsByCh, chDescs, func(p Peer, r interface{}) {})
p := newPeer(pc, mConfig, peerNodeInfo, reactorsByCh, msgTypeByChID, chDescs, func(p Peer, r interface{}) {})
p.SetLogger(log.TestingLogger().With("peer", addr))
return p, nil
}

View File

@@ -6,6 +6,7 @@ import (
"sync"
"time"
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/cmap"
"github.com/tendermint/tendermint/libs/rand"
@@ -68,16 +69,17 @@ type PeerFilterFunc func(IPeerSet, Peer) error
type Switch struct {
service.BaseService
config *config.P2PConfig
reactors map[string]Reactor
chDescs []*conn.ChannelDescriptor
reactorsByCh map[byte]Reactor
peers *PeerSet
dialing *cmap.CMap
reconnecting *cmap.CMap
nodeInfo NodeInfo // our node info
nodeKey *NodeKey // our node privkey
addrBook AddrBook
config *config.P2PConfig
reactors map[string]Reactor
chDescs []*conn.ChannelDescriptor
reactorsByCh map[byte]Reactor
msgTypeByChID map[byte]proto.Message
peers *PeerSet
dialing *cmap.CMap
reconnecting *cmap.CMap
nodeInfo NodeInfo // our node info
nodeKey *NodeKey // our node privkey
addrBook AddrBook
// peers addresses with whom we'll maintain constant connection
persistentPeersAddrs []*NetAddress
unconditionalPeerIDs map[ID]struct{}
@@ -112,6 +114,7 @@ func NewSwitch(
reactors: make(map[string]Reactor),
chDescs: make([]*conn.ChannelDescriptor, 0),
reactorsByCh: make(map[byte]Reactor),
msgTypeByChID: make(map[byte]proto.Message),
peers: NewPeerSet(),
dialing: cmap.NewCMap(),
reconnecting: cmap.NewCMap(),
@@ -163,6 +166,7 @@ func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
}
sw.chDescs = append(sw.chDescs, chDesc)
sw.reactorsByCh[chID] = reactor
sw.msgTypeByChID[chID] = chDesc.MessageType
}
sw.reactors[name] = reactor
reactor.SetSwitch(sw)
@@ -181,6 +185,7 @@ func (sw *Switch) RemoveReactor(name string, reactor Reactor) {
}
}
delete(sw.reactorsByCh, chDesc.ID)
delete(sw.msgTypeByChID, chDesc.ID)
}
delete(sw.reactors, name)
reactor.SetSwitch(nil)
@@ -654,11 +659,12 @@ func (sw *Switch) IsPeerPersistent(na *NetAddress) bool {
func (sw *Switch) acceptRoutine() {
for {
p, err := sw.transport.Accept(peerConfig{
chDescs: sw.chDescs,
onPeerError: sw.StopPeerForError,
reactorsByCh: sw.reactorsByCh,
metrics: sw.metrics,
isPersistent: sw.IsPeerPersistent,
chDescs: sw.chDescs,
onPeerError: sw.StopPeerForError,
reactorsByCh: sw.reactorsByCh,
msgTypeByChID: sw.msgTypeByChID,
metrics: sw.metrics,
isPersistent: sw.IsPeerPersistent,
})
if err != nil {
switch err := err.(type) {
@@ -757,11 +763,12 @@ func (sw *Switch) addOutboundPeerWithConfig(
}
p, err := sw.transport.Dial(*addr, peerConfig{
chDescs: sw.chDescs,
onPeerError: sw.StopPeerForError,
isPersistent: sw.IsPeerPersistent,
reactorsByCh: sw.reactorsByCh,
metrics: sw.metrics,
chDescs: sw.chDescs,
onPeerError: sw.StopPeerForError,
isPersistent: sw.IsPeerPersistent,
reactorsByCh: sw.reactorsByCh,
msgTypeByChID: sw.msgTypeByChID,
metrics: sw.metrics,
})
if err != nil {
if e, ok := err.(ErrRejected); ok {

View File

@@ -149,6 +149,7 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
MConnConfig(sw.config),
ni,
sw.reactorsByCh,
sw.msgTypeByChID,
sw.chDescs,
sw.StopPeerForError,
)

View File

@@ -8,6 +8,7 @@ import (
"golang.org/x/net/netutil"
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/libs/protoio"
"github.com/tendermint/tendermint/p2p/conn"
@@ -47,9 +48,10 @@ type peerConfig struct {
// isPersistent allows you to set a function, which, given socket address
// (for outbound peers) OR self-reported address (for inbound peers), tells
// if the peer is persistent or not.
isPersistent func(*NetAddress) bool
reactorsByCh map[byte]Reactor
metrics *Metrics
isPersistent func(*NetAddress) bool
reactorsByCh map[byte]Reactor
msgTypeByChID map[byte]proto.Message
metrics *Metrics
}
// Transport emits and connects to Peers. The implementation of Peer is left to
@@ -519,6 +521,7 @@ func (mt *MultiplexTransport) wrapPeer(
mt.mConfig,
ni,
cfg.reactorsByCh,
cfg.msgTypeByChID,
cfg.chDescs,
cfg.onPeerError,
PeerMetrics(cfg.metrics),