diff --git a/blockchain/v0/reactor_test.go b/blockchain/v0/reactor_test.go index a6ff8847d..686bb5510 100644 --- a/blockchain/v0/reactor_test.go +++ b/blockchain/v0/reactor_test.go @@ -62,7 +62,8 @@ func setup( fastSync: true, } - rts.blockchainChannels = rts.network.MakeChannelsNoCleanup(t, BlockchainChannel, new(bcproto.Message), int(chBuf)) + chDesc := p2p.ChannelDescriptor{ID: byte(BlockchainChannel)} + rts.blockchainChannels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(bcproto.Message), int(chBuf)) i := 0 for nodeID := range rts.network.Nodes { diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index 6e2c43df8..cc4c2feb3 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -47,6 +47,12 @@ type reactorTestSuite struct { voteSetBitsChannels map[p2p.NodeID]*p2p.Channel } +func chDesc(chID p2p.ChannelID) p2p.ChannelDescriptor { + return p2p.ChannelDescriptor{ + ID: byte(chID), + } +} + func setup(t *testing.T, numNodes int, states []*State, size int) *reactorTestSuite { t.Helper() @@ -57,10 +63,10 @@ func setup(t *testing.T, numNodes int, states []*State, size int) *reactorTestSu subs: make(map[p2p.NodeID]types.Subscription, numNodes), } - rts.stateChannels = rts.network.MakeChannelsNoCleanup(t, StateChannel, new(tmcons.Message), size) - rts.dataChannels = rts.network.MakeChannelsNoCleanup(t, DataChannel, new(tmcons.Message), size) - rts.voteChannels = rts.network.MakeChannelsNoCleanup(t, VoteChannel, new(tmcons.Message), size) - rts.voteSetBitsChannels = rts.network.MakeChannelsNoCleanup(t, VoteSetBitsChannel, new(tmcons.Message), size) + rts.stateChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(StateChannel), new(tmcons.Message), size) + rts.dataChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(DataChannel), new(tmcons.Message), size) + rts.voteChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(VoteChannel), new(tmcons.Message), size) + rts.voteSetBitsChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(VoteSetBitsChannel), new(tmcons.Message), size) i := 0 for nodeID, node := range rts.network.Nodes { diff --git a/evidence/reactor_test.go b/evidence/reactor_test.go index 684a9f406..455beb152 100644 --- a/evidence/reactor_test.go +++ b/evidence/reactor_test.go @@ -62,8 +62,9 @@ func setup(t *testing.T, stateStores []sm.Store, chBuf uint) *reactorTestSuite { peerChans: make(map[p2p.NodeID]chan p2p.PeerUpdate, numStateStores), } + chDesc := p2p.ChannelDescriptor{ID: byte(evidence.EvidenceChannel)} rts.evidenceChannels = rts.network.MakeChannelsNoCleanup(t, - evidence.EvidenceChannel, + chDesc, new(tmproto.EvidenceList), int(chBuf)) require.Len(t, rts.network.RandomNode().PeerManager.Peers(), 0) diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go index f9c609dac..71d572247 100644 --- a/mempool/reactor_test.go +++ b/mempool/reactor_test.go @@ -48,7 +48,8 @@ func setup(t *testing.T, cfg *cfg.MempoolConfig, numNodes int, chBuf uint) *reac peerUpdates: make(map[p2p.NodeID]*p2p.PeerUpdates, numNodes), } - rts.mempoolChnnels = rts.network.MakeChannelsNoCleanup(t, MempoolChannel, new(protomem.Message), int(chBuf)) + chDesc := p2p.ChannelDescriptor{ID: byte(MempoolChannel)} + rts.mempoolChnnels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(protomem.Message), int(chBuf)) for nodeID := range rts.network.Nodes { rts.kvstores[nodeID] = kvstore.NewApplication() diff --git a/node/node.go b/node/node.go index fe135a079..10322333c 100644 --- a/node/node.go +++ b/node/node.go @@ -829,7 +829,7 @@ func createPEXReactorV2( router *p2p.Router, ) (*pex.ReactorV2, error) { - channel, err := router.OpenChannel(p2p.ChannelID(pex.PexChannel), &protop2p.PexMessage{}, 4096) + channel, err := router.OpenChannel(pex.ChannelDescriptor(), &protop2p.PexMessage{}, 4096) if err != nil { return nil, err } @@ -961,8 +961,8 @@ func NewSeedNode(config *cfg.Config, // FIXME: we add channel descriptors to both the router and the transport but only the router // should be aware of channel info. We should remove this from transport once the legacy // p2p stack is removed. - router.AddChannelDescriptors(pex.ChannelDescriptors()) - transport.AddChannelDescriptors(pex.ChannelDescriptors()) + pexCh := pex.ChannelDescriptor() + transport.AddChannelDescriptors([]*p2p.ChannelDescriptor{&pexCh}) if config.P2P.DisableLegacy { pexReactorV2, err = createPEXReactorV2(config, logger, peerManager, router) if err != nil { @@ -1213,14 +1213,10 @@ func NewNode(config *cfg.Config, config.StateSync.TempDir, ) - // add the channel descriptors to both the router and the underlying - // transports - router.AddChannelDescriptors(mpReactorShim.GetChannels()) - router.AddChannelDescriptors(bcReactorForSwitch.GetChannels()) - router.AddChannelDescriptors(csReactorShim.GetChannels()) - router.AddChannelDescriptors(evReactorShim.GetChannels()) - router.AddChannelDescriptors(stateSyncReactorShim.GetChannels()) - + // add the channel descriptors to both the transports + // FIXME: This should be removed when the legacy p2p stack is removed and + // transports can either be agnostic to channel descriptors or can be + // declared in the constructor. transport.AddChannelDescriptors(mpReactorShim.GetChannels()) transport.AddChannelDescriptors(bcReactorForSwitch.GetChannels()) transport.AddChannelDescriptors(csReactorShim.GetChannels()) @@ -1266,8 +1262,8 @@ func NewNode(config *cfg.Config, ) if config.P2P.PexReactor { - router.AddChannelDescriptors(pex.ChannelDescriptors()) - transport.AddChannelDescriptors(pex.ChannelDescriptors()) + pexCh := pex.ChannelDescriptor() + transport.AddChannelDescriptors([]*p2p.ChannelDescriptor{&pexCh}) if config.P2P.DisableLegacy { pexReactorV2, err = createPEXReactorV2(config, logger, peerManager, router) if err != nil { @@ -2058,7 +2054,7 @@ func makeChannelsFromShims( channels := map[p2p.ChannelID]*p2p.Channel{} for chID, chShim := range chShims { - ch, err := router.OpenChannel(chID, chShim.MsgType, chShim.Descriptor.RecvBufferCapacity) + ch, err := router.OpenChannel(*chShim.Descriptor, chShim.MsgType, chShim.Descriptor.RecvBufferCapacity) if err != nil { panic(fmt.Sprintf("failed to open channel %v: %v", chID, err)) } diff --git a/p2p/p2p_test.go b/p2p/p2p_test.go index 9ab57e614..a046cb08e 100644 --- a/p2p/p2p_test.go +++ b/p2p/p2p_test.go @@ -11,8 +11,15 @@ import ( // Common setup for P2P tests. var ( - ctx = context.Background() - chID = p2p.ChannelID(1) + ctx = context.Background() + chID = p2p.ChannelID(1) + chDesc = p2p.ChannelDescriptor{ + ID: byte(chID), + Priority: 5, + SendQueueCapacity: 10, + RecvMessageCapacity: 10, + MaxSendBytes: 1000, + } selfKey crypto.PrivKey = ed25519.GenPrivKeyFromSecret([]byte{0xf9, 0x1b, 0x08, 0xaa, 0x38, 0xee, 0x34, 0xdd}) selfID = p2p.NodeIDFromPubKey(selfKey.PubKey()) diff --git a/p2p/p2ptest/network.go b/p2p/p2ptest/network.go index a5a858f66..ecaa1e592 100644 --- a/p2p/p2ptest/network.go +++ b/p2p/p2ptest/network.go @@ -135,13 +135,13 @@ func (n *Network) NodeIDs() []p2p.NodeID { // doing error checks and cleanups. func (n *Network) MakeChannels( t *testing.T, - chID p2p.ChannelID, + chDesc p2p.ChannelDescriptor, messageType proto.Message, size int, ) map[p2p.NodeID]*p2p.Channel { channels := map[p2p.NodeID]*p2p.Channel{} for _, node := range n.Nodes { - channels[node.NodeID] = node.MakeChannel(t, chID, messageType, size) + channels[node.NodeID] = node.MakeChannel(t, chDesc, messageType, size) } return channels } @@ -151,13 +151,13 @@ func (n *Network) MakeChannels( // all the channels. func (n *Network) MakeChannelsNoCleanup( t *testing.T, - chID p2p.ChannelID, + chDesc p2p.ChannelDescriptor, messageType proto.Message, size int, ) map[p2p.NodeID]*p2p.Channel { channels := map[p2p.NodeID]*p2p.Channel{} for _, node := range n.Nodes { - channels[node.NodeID] = node.MakeChannelNoCleanup(t, chID, messageType, size) + channels[node.NodeID] = node.MakeChannelNoCleanup(t, chDesc, messageType, size) } return channels } @@ -279,8 +279,9 @@ func (n *Network) MakeNode(t *testing.T, opts NodeOptions) *Node { // MakeChannel opens a channel, with automatic error handling and cleanup. On // test cleanup, it also checks that the channel is empty, to make sure // all expected messages have been asserted. -func (n *Node) MakeChannel(t *testing.T, chID p2p.ChannelID, messageType proto.Message, size int) *p2p.Channel { - channel, err := n.Router.OpenChannel(chID, messageType, size) +func (n *Node) MakeChannel(t *testing.T, chDesc p2p.ChannelDescriptor, + messageType proto.Message, size int) *p2p.Channel { + channel, err := n.Router.OpenChannel(chDesc, messageType, size) require.NoError(t, err) t.Cleanup(func() { RequireEmpty(t, channel) @@ -293,12 +294,12 @@ func (n *Node) MakeChannel(t *testing.T, chID p2p.ChannelID, messageType proto.M // caller must ensure proper cleanup of the channel. func (n *Node) MakeChannelNoCleanup( t *testing.T, - chID p2p.ChannelID, + chDesc p2p.ChannelDescriptor, messageType proto.Message, size int, ) *p2p.Channel { - channel, err := n.Router.OpenChannel(chID, messageType, size) + channel, err := n.Router.OpenChannel(chDesc, messageType, size) require.NoError(t, err) return channel } @@ -328,3 +329,13 @@ func (n *Node) MakePeerUpdatesNoRequireEmpty(t *testing.T) *p2p.PeerUpdates { return sub } + +func MakeChannelDesc(chID p2p.ChannelID) p2p.ChannelDescriptor { + return p2p.ChannelDescriptor{ + ID: byte(chID), + Priority: 5, + SendQueueCapacity: 10, + RecvMessageCapacity: 10, + MaxSendBytes: 1000, + } +} diff --git a/p2p/pex/reactor.go b/p2p/pex/reactor.go index b9281551d..a1229cb16 100644 --- a/p2p/pex/reactor.go +++ b/p2p/pex/reactor.go @@ -45,16 +45,14 @@ const ( // within each reactor (as they are now) or, considering that the reactor doesn't // really need to care about the channel descriptors, if they should be housed // in the node module. -func ChannelDescriptors() []*conn.ChannelDescriptor { - return []*conn.ChannelDescriptor{ - { - ID: PexChannel, - Priority: 1, - SendQueueCapacity: 10, - RecvMessageCapacity: maxMsgSize, +func ChannelDescriptor() conn.ChannelDescriptor { + return conn.ChannelDescriptor{ + ID: PexChannel, + Priority: 1, + SendQueueCapacity: 10, + RecvMessageCapacity: maxMsgSize, - MaxSendBytes: 200, - }, + MaxSendBytes: 200, } } diff --git a/p2p/pex/reactor_test.go b/p2p/pex/reactor_test.go index e716dfac8..8c2473d35 100644 --- a/p2p/pex/reactor_test.go +++ b/p2p/pex/reactor_test.go @@ -350,7 +350,7 @@ func setupNetwork(t *testing.T, opts testOptions) *reactorTestSuite { // NOTE: we don't assert that the channels get drained after stopping the // reactor rts.pexChannels = rts.network.MakeChannelsNoCleanup( - t, p2p.ChannelID(pex.PexChannel), new(proto.PexMessage), chBuf, + t, pex.ChannelDescriptor(), new(proto.PexMessage), chBuf, ) idx := 0 @@ -416,7 +416,7 @@ func (r *reactorTestSuite) addNodes(t *testing.T, nodes int) { r.network.Nodes[node.NodeID] = node nodeID := node.NodeID r.pexChannels[nodeID] = node.MakeChannelNoCleanup( - t, p2p.ChannelID(pex.PexChannel), new(proto.PexMessage), r.opts.BufferSize, + t, pex.ChannelDescriptor(), new(proto.PexMessage), r.opts.BufferSize, ) r.peerChans[nodeID] = make(chan p2p.PeerUpdate, r.opts.BufferSize) r.peerUpdates[nodeID] = p2p.NewPeerUpdates(r.peerChans[nodeID], r.opts.BufferSize) diff --git a/p2p/router.go b/p2p/router.go index afe696426..abf2d6c24 100644 --- a/p2p/router.go +++ b/p2p/router.go @@ -344,21 +344,13 @@ func (r *Router) createQueueFactory() (func(int) queue, error) { } } -// AddChannelDescriptors adds a set of ChannelDescriptors to the reactor. Note, -// this should be called before the router is started and any connections are made. -func (r *Router) AddChannelDescriptors(chDescs []*ChannelDescriptor) { - for _, chDesc := range chDescs { - r.chDescs = append(r.chDescs, *chDesc) - } -} - // OpenChannel opens a new channel for the given message type. The caller must // close the channel when done, before stopping the Router. messageType is the // type of message passed through the channel (used for unmarshaling), which can // implement Wrapper to automatically (un)wrap multiple message types in a // wrapper message. The caller may provide a size to make the channel buffered, // which internally makes the inbound, outbound, and error channel buffered. -func (r *Router) OpenChannel(id ChannelID, messageType proto.Message, size int) (*Channel, error) { +func (r *Router) OpenChannel(chDesc ChannelDescriptor, messageType proto.Message, size int) (*Channel, error) { if size == 0 { size = queueBufferDefault } @@ -366,9 +358,11 @@ func (r *Router) OpenChannel(id ChannelID, messageType proto.Message, size int) r.channelMtx.Lock() defer r.channelMtx.Unlock() + id := ChannelID(chDesc.ID) if _, ok := r.channelQueues[id]; ok { return nil, fmt.Errorf("channel %v already exists", id) } + r.chDescs = append(r.chDescs, chDesc) queue := r.queueFactory(size) outCh := make(chan Envelope, size) diff --git a/p2p/router_test.go b/p2p/router_test.go index 91f2474a8..aec16c5ad 100644 --- a/p2p/router_test.go +++ b/p2p/router_test.go @@ -49,7 +49,7 @@ func TestRouter_Network(t *testing.T) { local := network.RandomNode() peers := network.Peers(local.NodeID) - channels := network.MakeChannels(t, 1, &p2ptest.Message{}, 0) + channels := network.MakeChannels(t, chDesc, &p2ptest.Message{}, 0) channel := channels[local.NodeID] for _, peer := range peers { @@ -116,22 +116,23 @@ func TestRouter_Channel(t *testing.T) { }) // Opening a channel should work. - channel, err := router.OpenChannel(chID, &p2ptest.Message{}, 0) + channel, err := router.OpenChannel(chDesc, &p2ptest.Message{}, 0) require.NoError(t, err) // Opening the same channel again should fail. - _, err = router.OpenChannel(chID, &p2ptest.Message{}, 0) + _, err = router.OpenChannel(chDesc, &p2ptest.Message{}, 0) require.Error(t, err) // Opening a different channel should work. - _, err = router.OpenChannel(2, &p2ptest.Message{}, 0) + chDesc2 := p2p.ChannelDescriptor{ID: byte(2)} + _, err = router.OpenChannel(chDesc2, &p2ptest.Message{}, 0) require.NoError(t, err) // Closing the channel, then opening it again should be fine. channel.Close() time.Sleep(100 * time.Millisecond) // yes yes, but Close() is async... - channel, err = router.OpenChannel(chID, &p2ptest.Message{}, 0) + channel, err = router.OpenChannel(chDesc, &p2ptest.Message{}, 0) require.NoError(t, err) // We should be able to send on the channel, even though there are no peers. @@ -158,9 +159,9 @@ func TestRouter_Channel_SendReceive(t *testing.T) { ids := network.NodeIDs() aID, bID, cID := ids[0], ids[1], ids[2] - channels := network.MakeChannels(t, chID, &p2ptest.Message{}, 0) + channels := network.MakeChannels(t, chDesc, &p2ptest.Message{}, 0) a, b, c := channels[aID], channels[bID], channels[cID] - otherChannels := network.MakeChannels(t, 9, &p2ptest.Message{}, 0) + otherChannels := network.MakeChannels(t, p2ptest.MakeChannelDesc(9), &p2ptest.Message{}, 0) // Sending a message a->b should work, and not send anything // further to a, b, or c. @@ -216,7 +217,7 @@ func TestRouter_Channel_Broadcast(t *testing.T) { ids := network.NodeIDs() aID, bID, cID, dID := ids[0], ids[1], ids[2], ids[3] - channels := network.MakeChannels(t, 1, &p2ptest.Message{}, 0) + channels := network.MakeChannels(t, chDesc, &p2ptest.Message{}, 0) a, b, c, d := channels[aID], channels[bID], channels[cID], channels[dID] // Sending a broadcast from b should work. @@ -243,7 +244,7 @@ func TestRouter_Channel_Wrapper(t *testing.T) { ids := network.NodeIDs() aID, bID := ids[0], ids[1] - channels := network.MakeChannels(t, 1, &wrapperMessage{}, 0) + channels := network.MakeChannels(t, chDesc, &wrapperMessage{}, 0) a, b := channels[aID], channels[bID] // Since wrapperMessage implements p2p.Wrapper and handles Message, it @@ -301,7 +302,7 @@ func TestRouter_Channel_Error(t *testing.T) { ids := network.NodeIDs() aID, bID := ids[0], ids[1] - channels := network.MakeChannels(t, 1, &p2ptest.Message{}, 0) + channels := network.MakeChannels(t, chDesc, &p2ptest.Message{}, 0) a := channels[aID] // Erroring b should cause it to be disconnected. It will reconnect shortly after.