From 0900ea8396359cdf72be555cddcf597349e39af1 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Fri, 15 Oct 2021 12:31:33 -0400 Subject: [PATCH] p2p: channel shim cleanup (#7129) --- internal/blocksync/reactor.go | 2 +- internal/blocksync/reactor_test.go | 2 +- internal/consensus/reactor.go | 8 +-- internal/consensus/reactor_test.go | 2 +- internal/evidence/reactor.go | 2 +- internal/evidence/reactor_test.go | 2 +- internal/mempool/v0/reactor.go | 2 +- internal/mempool/v0/reactor_test.go | 2 +- internal/mempool/v1/reactor.go | 2 +- internal/mempool/v1/reactor_test.go | 2 +- internal/p2p/conn/connection.go | 14 +++-- internal/p2p/conn/connection_test.go | 22 +++---- internal/p2p/mocks/connection.go | 17 +++--- internal/p2p/mocks/transport.go | 8 +++ internal/p2p/p2p_test.go | 2 +- internal/p2p/p2ptest/network.go | 4 +- internal/p2p/pex/reactor.go | 2 +- internal/p2p/pqueue.go | 2 +- internal/p2p/router.go | 9 +-- internal/p2p/router_test.go | 7 ++- internal/p2p/shim.go | 91 ---------------------------- internal/p2p/transport.go | 4 ++ internal/p2p/transport_mconn.go | 6 +- internal/p2p/transport_mconn_test.go | 8 +-- internal/p2p/transport_memory.go | 2 + internal/p2p/types.go | 1 + internal/statesync/reactor.go | 8 +-- node/node.go | 30 ++------- node/setup.go | 31 ++++------ 29 files changed, 99 insertions(+), 195 deletions(-) diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go index 5f4ce3029..86aaf79d3 100644 --- a/internal/blocksync/reactor.go +++ b/internal/blocksync/reactor.go @@ -30,7 +30,7 @@ var ( ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{ BlockSyncChannel: { Descriptor: &p2p.ChannelDescriptor{ - ID: byte(BlockSyncChannel), + ID: BlockSyncChannel, MessageType: new(bcproto.Message), Priority: 5, SendQueueCapacity: 1000, diff --git a/internal/blocksync/reactor_test.go b/internal/blocksync/reactor_test.go index 3824f1460..59889eec4 100644 --- a/internal/blocksync/reactor_test.go +++ b/internal/blocksync/reactor_test.go @@ -65,7 +65,7 @@ func setup( blockSync: true, } - chDesc := p2p.ChannelDescriptor{ID: byte(BlockSyncChannel)} + chDesc := p2p.ChannelDescriptor{ID: BlockSyncChannel} rts.blockSyncChannels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(bcproto.Message), int(chBuf)) i := 0 diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 27ecb4602..dde36306c 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -32,7 +32,7 @@ var ( ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{ StateChannel: { Descriptor: &p2p.ChannelDescriptor{ - ID: byte(StateChannel), + ID: StateChannel, MessageType: new(tmcons.Message), Priority: 8, SendQueueCapacity: 64, @@ -45,7 +45,7 @@ var ( // TODO: Consider a split between gossiping current block and catchup // stuff. Once we gossip the whole block there is nothing left to send // until next height or round. - ID: byte(DataChannel), + ID: DataChannel, MessageType: new(tmcons.Message), Priority: 12, SendQueueCapacity: 64, @@ -55,7 +55,7 @@ var ( }, VoteChannel: { Descriptor: &p2p.ChannelDescriptor{ - ID: byte(VoteChannel), + ID: VoteChannel, MessageType: new(tmcons.Message), Priority: 10, SendQueueCapacity: 64, @@ -65,7 +65,7 @@ var ( }, VoteSetBitsChannel: { Descriptor: &p2p.ChannelDescriptor{ - ID: byte(VoteSetBitsChannel), + ID: VoteSetBitsChannel, MessageType: new(tmcons.Message), Priority: 5, SendQueueCapacity: 8, diff --git a/internal/consensus/reactor_test.go b/internal/consensus/reactor_test.go index e536906a9..04bc3708e 100644 --- a/internal/consensus/reactor_test.go +++ b/internal/consensus/reactor_test.go @@ -52,7 +52,7 @@ type reactorTestSuite struct { func chDesc(chID p2p.ChannelID) p2p.ChannelDescriptor { return p2p.ChannelDescriptor{ - ID: byte(chID), + ID: chID, } } diff --git a/internal/evidence/reactor.go b/internal/evidence/reactor.go index 0090bd32f..0cf261441 100644 --- a/internal/evidence/reactor.go +++ b/internal/evidence/reactor.go @@ -28,7 +28,7 @@ var ( ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{ EvidenceChannel: { Descriptor: &p2p.ChannelDescriptor{ - ID: byte(EvidenceChannel), + ID: EvidenceChannel, MessageType: new(tmproto.EvidenceList), Priority: 6, RecvMessageCapacity: maxMsgSize, diff --git a/internal/evidence/reactor_test.go b/internal/evidence/reactor_test.go index b8055263d..7963ba959 100644 --- a/internal/evidence/reactor_test.go +++ b/internal/evidence/reactor_test.go @@ -62,7 +62,7 @@ func setup(t *testing.T, stateStores []sm.Store, chBuf uint) *reactorTestSuite { peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numStateStores), } - chDesc := p2p.ChannelDescriptor{ID: byte(evidence.EvidenceChannel)} + chDesc := p2p.ChannelDescriptor{ID: evidence.EvidenceChannel} rts.evidenceChannels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(tmproto.EvidenceList), diff --git a/internal/mempool/v0/reactor.go b/internal/mempool/v0/reactor.go index 1587c0778..010f98f5d 100644 --- a/internal/mempool/v0/reactor.go +++ b/internal/mempool/v0/reactor.go @@ -101,7 +101,7 @@ func GetChannelShims(cfg *config.MempoolConfig) map[p2p.ChannelID]*p2p.ChannelDe return map[p2p.ChannelID]*p2p.ChannelDescriptorShim{ mempool.MempoolChannel: { Descriptor: &p2p.ChannelDescriptor{ - ID: byte(mempool.MempoolChannel), + ID: mempool.MempoolChannel, MessageType: new(protomem.Message), Priority: 5, RecvMessageCapacity: batchMsg.Size(), diff --git a/internal/mempool/v0/reactor_test.go b/internal/mempool/v0/reactor_test.go index b0462a249..4ae2523a1 100644 --- a/internal/mempool/v0/reactor_test.go +++ b/internal/mempool/v0/reactor_test.go @@ -50,7 +50,7 @@ func setup(t *testing.T, config *config.MempoolConfig, numNodes int, chBuf uint) peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes), } - chDesc := p2p.ChannelDescriptor{ID: byte(mempool.MempoolChannel)} + chDesc := p2p.ChannelDescriptor{ID: mempool.MempoolChannel} rts.mempoolChnnels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(protomem.Message), int(chBuf)) for nodeID := range rts.network.Nodes { diff --git a/internal/mempool/v1/reactor.go b/internal/mempool/v1/reactor.go index e31977cc9..72154b4a8 100644 --- a/internal/mempool/v1/reactor.go +++ b/internal/mempool/v1/reactor.go @@ -108,7 +108,7 @@ func GetChannelShims(cfg *config.MempoolConfig) map[p2p.ChannelID]*p2p.ChannelDe return map[p2p.ChannelID]*p2p.ChannelDescriptorShim{ mempool.MempoolChannel: { Descriptor: &p2p.ChannelDescriptor{ - ID: byte(mempool.MempoolChannel), + ID: mempool.MempoolChannel, MessageType: new(protomem.Message), Priority: 5, RecvMessageCapacity: batchMsg.Size(), diff --git a/internal/mempool/v1/reactor_test.go b/internal/mempool/v1/reactor_test.go index 5934d534c..1449b20b1 100644 --- a/internal/mempool/v1/reactor_test.go +++ b/internal/mempool/v1/reactor_test.go @@ -52,7 +52,7 @@ func setupReactors(t *testing.T, numNodes int, chBuf uint) *reactorTestSuite { peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes), } - chDesc := p2p.ChannelDescriptor{ID: byte(mempool.MempoolChannel)} + chDesc := p2p.ChannelDescriptor{ID: mempool.MempoolChannel} rts.mempoolChannels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(protomem.Message), int(chBuf)) for nodeID := range rts.network.Nodes { diff --git a/internal/p2p/conn/connection.go b/internal/p2p/conn/connection.go index c6a9b206c..1e149a2e5 100644 --- a/internal/p2p/conn/connection.go +++ b/internal/p2p/conn/connection.go @@ -48,7 +48,7 @@ const ( defaultPongTimeout = 45 * time.Second ) -type receiveCbFunc func(chID byte, msgBytes []byte) +type receiveCbFunc func(chID ChannelID, msgBytes []byte) type errorCbFunc func(interface{}) /* @@ -82,7 +82,7 @@ type MConnection struct { send chan struct{} pong chan struct{} channels []*Channel - channelsIdx map[byte]*Channel + channelsIdx map[ChannelID]*Channel onReceive receiveCbFunc onError errorCbFunc errored uint32 @@ -186,7 +186,7 @@ func NewMConnectionWithConfig( } // Create channels - var channelsIdx = map[byte]*Channel{} + var channelsIdx = map[ChannelID]*Channel{} var channels = []*Channel{} for _, desc := range chDescs { @@ -307,7 +307,7 @@ func (c *MConnection) stopForError(r interface{}) { } // Queues a message to be sent to channel. -func (c *MConnection) Send(chID byte, msgBytes []byte) bool { +func (c *MConnection) Send(chID ChannelID, msgBytes []byte) bool { if !c.IsRunning() { return false } @@ -540,7 +540,7 @@ FOR_LOOP: // never block } case *tmp2p.Packet_PacketMsg: - channelID := byte(pkt.PacketMsg.ChannelID) + channelID := ChannelID(pkt.PacketMsg.ChannelID) channel, ok := c.channelsIdx[channelID] if pkt.PacketMsg.ChannelID < 0 || pkt.PacketMsg.ChannelID > math.MaxUint8 || !ok || channel == nil { err := fmt.Errorf("unknown channel %X", pkt.PacketMsg.ChannelID) @@ -607,9 +607,11 @@ type ChannelStatus struct { } //----------------------------------------------------------------------------- +// ChannelID is an arbitrary channel ID. +type ChannelID uint16 type ChannelDescriptor struct { - ID byte + ID ChannelID Priority int MessageType proto.Message diff --git a/internal/p2p/conn/connection_test.go b/internal/p2p/conn/connection_test.go index 6a9c0988f..1ed179ad8 100644 --- a/internal/p2p/conn/connection_test.go +++ b/internal/p2p/conn/connection_test.go @@ -20,7 +20,7 @@ import ( const maxPingPongPacketSize = 1024 // bytes func createTestMConnection(conn net.Conn) *MConnection { - onReceive := func(chID byte, msgBytes []byte) { + onReceive := func(chID ChannelID, msgBytes []byte) { } onError := func(r interface{}) { } @@ -31,7 +31,7 @@ func createTestMConnection(conn net.Conn) *MConnection { func createMConnectionWithCallbacks( conn net.Conn, - onReceive func(chID byte, msgBytes []byte), + onReceive func(chID ChannelID, msgBytes []byte), onError func(r interface{}), ) *MConnection { cfg := DefaultMConnConfig() @@ -111,7 +111,7 @@ func TestMConnectionReceive(t *testing.T) { receivedCh := make(chan []byte) errorsCh := make(chan interface{}) - onReceive := func(chID byte, msgBytes []byte) { + onReceive := func(chID ChannelID, msgBytes []byte) { receivedCh <- msgBytes } onError := func(r interface{}) { @@ -146,7 +146,7 @@ func TestMConnectionPongTimeoutResultsInError(t *testing.T) { receivedCh := make(chan []byte) errorsCh := make(chan interface{}) - onReceive := func(chID byte, msgBytes []byte) { + onReceive := func(chID ChannelID, msgBytes []byte) { receivedCh <- msgBytes } onError := func(r interface{}) { @@ -184,7 +184,7 @@ func TestMConnectionMultiplePongsInTheBeginning(t *testing.T) { receivedCh := make(chan []byte) errorsCh := make(chan interface{}) - onReceive := func(chID byte, msgBytes []byte) { + onReceive := func(chID ChannelID, msgBytes []byte) { receivedCh <- msgBytes } onError := func(r interface{}) { @@ -238,7 +238,7 @@ func TestMConnectionMultiplePings(t *testing.T) { receivedCh := make(chan []byte) errorsCh := make(chan interface{}) - onReceive := func(chID byte, msgBytes []byte) { + onReceive := func(chID ChannelID, msgBytes []byte) { receivedCh <- msgBytes } onError := func(r interface{}) { @@ -285,7 +285,7 @@ func TestMConnectionPingPongs(t *testing.T) { receivedCh := make(chan []byte) errorsCh := make(chan interface{}) - onReceive := func(chID byte, msgBytes []byte) { + onReceive := func(chID ChannelID, msgBytes []byte) { receivedCh <- msgBytes } onError := func(r interface{}) { @@ -342,7 +342,7 @@ func TestMConnectionStopsAndReturnsError(t *testing.T) { receivedCh := make(chan []byte) errorsCh := make(chan interface{}) - onReceive := func(chID byte, msgBytes []byte) { + onReceive := func(chID ChannelID, msgBytes []byte) { receivedCh <- msgBytes } onError := func(r interface{}) { @@ -371,7 +371,7 @@ func TestMConnectionStopsAndReturnsError(t *testing.T) { func newClientAndServerConnsForReadErrors(t *testing.T, chOnErr chan struct{}) (*MConnection, *MConnection) { server, client := NetPipe() - onReceive := func(chID byte, msgBytes []byte) {} + onReceive := func(chID ChannelID, msgBytes []byte) {} onError := func(r interface{}) {} // create client conn with two channels @@ -443,7 +443,7 @@ func TestMConnectionReadErrorLongMessage(t *testing.T) { mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr) t.Cleanup(stopAll(t, mconnClient, mconnServer)) - mconnServer.onReceive = func(chID byte, msgBytes []byte) { + mconnServer.onReceive = func(chID ChannelID, msgBytes []byte) { chOnRcv <- struct{}{} } @@ -538,7 +538,7 @@ func TestMConnectionChannelOverflow(t *testing.T) { mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr) t.Cleanup(stopAll(t, mconnClient, mconnServer)) - mconnServer.onReceive = func(chID byte, msgBytes []byte) { + mconnServer.onReceive = func(chID ChannelID, msgBytes []byte) { chOnRcv <- struct{}{} } diff --git a/internal/p2p/mocks/connection.go b/internal/p2p/mocks/connection.go index 03f7b5b33..65b9afafb 100644 --- a/internal/p2p/mocks/connection.go +++ b/internal/p2p/mocks/connection.go @@ -5,9 +5,12 @@ package mocks import ( context "context" - mock "github.com/stretchr/testify/mock" + conn "github.com/tendermint/tendermint/internal/p2p/conn" + crypto "github.com/tendermint/tendermint/crypto" + mock "github.com/stretchr/testify/mock" + p2p "github.com/tendermint/tendermint/internal/p2p" types "github.com/tendermint/tendermint/types" @@ -77,14 +80,14 @@ func (_m *Connection) LocalEndpoint() p2p.Endpoint { } // ReceiveMessage provides a mock function with given fields: -func (_m *Connection) ReceiveMessage() (p2p.ChannelID, []byte, error) { +func (_m *Connection) ReceiveMessage() (conn.ChannelID, []byte, error) { ret := _m.Called() - var r0 p2p.ChannelID - if rf, ok := ret.Get(0).(func() p2p.ChannelID); ok { + var r0 conn.ChannelID + if rf, ok := ret.Get(0).(func() conn.ChannelID); ok { r0 = rf() } else { - r0 = ret.Get(0).(p2p.ChannelID) + r0 = ret.Get(0).(conn.ChannelID) } var r1 []byte @@ -121,11 +124,11 @@ func (_m *Connection) RemoteEndpoint() p2p.Endpoint { } // SendMessage provides a mock function with given fields: _a0, _a1 -func (_m *Connection) SendMessage(_a0 p2p.ChannelID, _a1 []byte) error { +func (_m *Connection) SendMessage(_a0 conn.ChannelID, _a1 []byte) error { ret := _m.Called(_a0, _a1) var r0 error - if rf, ok := ret.Get(0).(func(p2p.ChannelID, []byte) error); ok { + if rf, ok := ret.Get(0).(func(conn.ChannelID, []byte) error); ok { r0 = rf(_a0, _a1) } else { r0 = ret.Error(0) diff --git a/internal/p2p/mocks/transport.go b/internal/p2p/mocks/transport.go index 82bd670cb..2fc7baa29 100644 --- a/internal/p2p/mocks/transport.go +++ b/internal/p2p/mocks/transport.go @@ -5,7 +5,10 @@ package mocks import ( context "context" + conn "github.com/tendermint/tendermint/internal/p2p/conn" + mock "github.com/stretchr/testify/mock" + p2p "github.com/tendermint/tendermint/internal/p2p" ) @@ -37,6 +40,11 @@ func (_m *Transport) Accept() (p2p.Connection, error) { return r0, r1 } +// AddChannelDescriptors provides a mock function with given fields: _a0 +func (_m *Transport) AddChannelDescriptors(_a0 []*conn.ChannelDescriptor) { + _m.Called(_a0) +} + // Close provides a mock function with given fields: func (_m *Transport) Close() error { ret := _m.Called() diff --git a/internal/p2p/p2p_test.go b/internal/p2p/p2p_test.go index 91e1c0824..f5ed5706c 100644 --- a/internal/p2p/p2p_test.go +++ b/internal/p2p/p2p_test.go @@ -15,7 +15,7 @@ var ( ctx = context.Background() chID = p2p.ChannelID(1) chDesc = p2p.ChannelDescriptor{ - ID: byte(chID), + ID: chID, Priority: 5, SendQueueCapacity: 10, RecvMessageCapacity: 10, diff --git a/internal/p2p/p2ptest/network.go b/internal/p2p/p2ptest/network.go index 21aa1a0ea..2ed888764 100644 --- a/internal/p2p/p2ptest/network.go +++ b/internal/p2p/p2ptest/network.go @@ -285,7 +285,7 @@ func (n *Node) MakeChannel(t *testing.T, chDesc p2p.ChannelDescriptor, messageType proto.Message, size int) *p2p.Channel { channel, err := n.Router.OpenChannel(chDesc, messageType, size) require.NoError(t, err) - require.Contains(t, n.Router.NodeInfo().Channels, chDesc.ID) + require.Contains(t, n.Router.NodeInfo().Channels, byte(chDesc.ID)) t.Cleanup(func() { RequireEmpty(t, channel) channel.Close() @@ -335,7 +335,7 @@ func (n *Node) MakePeerUpdatesNoRequireEmpty(t *testing.T) *p2p.PeerUpdates { func MakeChannelDesc(chID p2p.ChannelID) p2p.ChannelDescriptor { return p2p.ChannelDescriptor{ - ID: byte(chID), + ID: chID, Priority: 5, SendQueueCapacity: 10, RecvMessageCapacity: 10, diff --git a/internal/p2p/pex/reactor.go b/internal/p2p/pex/reactor.go index f313279c3..d43d836ce 100644 --- a/internal/p2p/pex/reactor.go +++ b/internal/p2p/pex/reactor.go @@ -25,7 +25,7 @@ var ( // See https://github.com/tendermint/tendermint/issues/6371 const ( // PexChannel is a channel for PEX messages - PexChannel = byte(0x00) + PexChannel = 0x00 // over-estimate of max NetAddress size // hexID (40) + IP (16) + Port (2) + Name (100) ... diff --git a/internal/p2p/pqueue.go b/internal/p2p/pqueue.go index b19436e15..fd0a43db6 100644 --- a/internal/p2p/pqueue.go +++ b/internal/p2p/pqueue.go @@ -99,7 +99,7 @@ func newPQScheduler( ) for _, chDesc := range chDescsCopy { - chID := ChannelID(chDesc.ID) + chID := chDesc.ID chPriorities[chID] = uint(chDesc.Priority) sizes[uint(chDesc.Priority)] = 0 } diff --git a/internal/p2p/router.go b/internal/p2p/router.go index f7d012b49..11d0514bb 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -23,9 +23,6 @@ const queueBufferDefault = 32 const dialRandomizerIntervalMillisecond = 3000 -// ChannelID is an arbitrary channel ID. -type ChannelID uint16 - // Envelope contains a message with sender/receiver routing info. type Envelope struct { From types.NodeID // sender (empty if outbound) @@ -361,7 +358,7 @@ func (r *Router) OpenChannel(chDesc ChannelDescriptor, messageType proto.Message r.channelMtx.Lock() defer r.channelMtx.Unlock() - id := ChannelID(chDesc.ID) + id := chDesc.ID if _, ok := r.channelQueues[id]; ok { return nil, fmt.Errorf("channel %v already exists", id) } @@ -383,6 +380,10 @@ func (r *Router) OpenChannel(chDesc ChannelDescriptor, messageType proto.Message // add the channel to the nodeInfo if it's not already there. r.nodeInfo.AddChannel(uint16(chDesc.ID)) + for _, t := range r.transports { + t.AddChannelDescriptors([]*ChannelDescriptor{&chDesc}) + } + go func() { defer func() { r.channelMtx.Lock() diff --git a/internal/p2p/router_test.go b/internal/p2p/router_test.go index c12f1a2f0..7f922c29d 100644 --- a/internal/p2p/router_test.go +++ b/internal/p2p/router_test.go @@ -121,17 +121,17 @@ func TestRouter_Channel_Basic(t *testing.T) { // Opening a channel should work. channel, err := router.OpenChannel(chDesc, &p2ptest.Message{}, 0) require.NoError(t, err) - require.Contains(t, router.NodeInfo().Channels, chDesc.ID) + require.Contains(t, router.NodeInfo().Channels, byte(chDesc.ID)) // Opening the same channel again should fail. _, err = router.OpenChannel(chDesc, &p2ptest.Message{}, 0) require.Error(t, err) // Opening a different channel should work. - chDesc2 := p2p.ChannelDescriptor{ID: byte(2)} + chDesc2 := p2p.ChannelDescriptor{ID: 2} _, err = router.OpenChannel(chDesc2, &p2ptest.Message{}, 0) require.NoError(t, err) - require.Contains(t, router.NodeInfo().Channels, chDesc2.ID) + require.Contains(t, router.NodeInfo().Channels, byte(chDesc2.ID)) // Closing the channel, then opening it again should be fine. channel.Close() @@ -865,6 +865,7 @@ func TestRouter_DontSendOnInvalidChannel(t *testing.T) { mockConnection.On("ReceiveMessage").Return(chID, nil, io.EOF) mockTransport := &mocks.Transport{} + mockTransport.On("AddChannelDescriptors", mock.Anything).Return() mockTransport.On("String").Maybe().Return("mock") mockTransport.On("Protocols").Return([]p2p.Protocol{"mock"}) mockTransport.On("Close").Return(nil) diff --git a/internal/p2p/shim.go b/internal/p2p/shim.go index f57ce18b9..8de0b71c7 100644 --- a/internal/p2p/shim.go +++ b/internal/p2p/shim.go @@ -1,11 +1,5 @@ package p2p -import ( - "sort" - - "github.com/tendermint/tendermint/libs/log" -) - // ChannelDescriptorShim defines a shim wrapper around a legacy p2p channel // and the proto.Message the new p2p Channel is responsible for handling. // A ChannelDescriptorShim is not contained in ReactorShim, but is rather @@ -13,88 +7,3 @@ import ( type ChannelDescriptorShim struct { Descriptor *ChannelDescriptor } - -// ChannelShim defines a generic shim wrapper around a legacy p2p channel -// and the new p2p Channel. It also includes the raw bi-directional Go channels -// so we can proxy message delivery. -type ChannelShim struct { - Descriptor *ChannelDescriptor - Channel *Channel - inCh chan<- Envelope - outCh <-chan Envelope - errCh <-chan PeerError -} - -// ReactorShim defines a generic shim wrapper around a BaseReactor. It is -// responsible for wiring up legacy p2p behavior to the new p2p semantics -// (e.g. proxying Envelope messages to legacy peers). -type ReactorShim struct { - Name string - PeerUpdates *PeerUpdates - Channels map[ChannelID]*ChannelShim -} - -func NewReactorShim(logger log.Logger, name string, descriptors map[ChannelID]*ChannelDescriptorShim) *ReactorShim { - channels := make(map[ChannelID]*ChannelShim) - - for _, cds := range descriptors { - chShim := NewChannelShim(cds, 0) - channels[chShim.Channel.ID] = chShim - } - - rs := &ReactorShim{ - Name: name, - PeerUpdates: NewPeerUpdates(make(chan PeerUpdate), 0), - Channels: channels, - } - - return rs -} - -func NewChannelShim(cds *ChannelDescriptorShim, buf uint) *ChannelShim { - inCh := make(chan Envelope, buf) - outCh := make(chan Envelope, buf) - errCh := make(chan PeerError, buf) - return &ChannelShim{ - Descriptor: cds.Descriptor, - Channel: NewChannel( - ChannelID(cds.Descriptor.ID), - cds.Descriptor.MessageType, - inCh, - outCh, - errCh, - ), - inCh: inCh, - outCh: outCh, - errCh: errCh, - } -} - -// GetChannels implements the legacy Reactor interface for getting a slice of all -// the supported ChannelDescriptors. -func (rs *ReactorShim) GetChannels() []*ChannelDescriptor { - sortedChIDs := make([]ChannelID, 0, len(rs.Channels)) - for cID := range rs.Channels { - sortedChIDs = append(sortedChIDs, cID) - } - - sort.Slice(sortedChIDs, func(i, j int) bool { return sortedChIDs[i] < sortedChIDs[j] }) - - descriptors := make([]*ChannelDescriptor, len(rs.Channels)) - for i, cID := range sortedChIDs { - descriptors[i] = rs.Channels[cID].Descriptor - } - - return descriptors -} - -// GetChannel returns a p2p Channel reference for a given ChannelID. If no -// Channel exists, nil is returned. -func (rs *ReactorShim) GetChannel(cID ChannelID) *Channel { - channelShim, ok := rs.Channels[cID] - if ok { - return channelShim.Channel - } - - return nil -} diff --git a/internal/p2p/transport.go b/internal/p2p/transport.go index b49b096bb..e78906362 100644 --- a/internal/p2p/transport.go +++ b/internal/p2p/transport.go @@ -44,6 +44,10 @@ type Transport interface { // Close stops accepting new connections, but does not close active connections. Close() error + // AddChannelDescriptors is only part of this interface + // temporarily + AddChannelDescriptors([]*ChannelDescriptor) + // Stringer is used to display the transport, e.g. in logs. // // Without this, the logger may use reflection to access and display diff --git a/internal/p2p/transport_mconn.go b/internal/p2p/transport_mconn.go index de643bdb9..3e0281c39 100644 --- a/internal/p2p/transport_mconn.go +++ b/internal/p2p/transport_mconn.go @@ -348,9 +348,9 @@ func (c *mConnConnection) handshake( } // onReceive is a callback for MConnection received messages. -func (c *mConnConnection) onReceive(chID byte, payload []byte) { +func (c *mConnConnection) onReceive(chID ChannelID, payload []byte) { select { - case c.receiveCh <- mConnMessage{channelID: ChannelID(chID), payload: payload}: + case c.receiveCh <- mConnMessage{channelID: chID, payload: payload}: case <-c.closeCh: } } @@ -387,7 +387,7 @@ func (c *mConnConnection) SendMessage(chID ChannelID, msg []byte) error { case <-c.closeCh: return io.EOF default: - if ok := c.mconn.Send(byte(chID), msg); !ok { + if ok := c.mconn.Send(chID, msg); !ok { return errors.New("sending message timed out") } diff --git a/internal/p2p/transport_mconn_test.go b/internal/p2p/transport_mconn_test.go index f4d7198ed..d33438109 100644 --- a/internal/p2p/transport_mconn_test.go +++ b/internal/p2p/transport_mconn_test.go @@ -21,7 +21,7 @@ func init() { transport := p2p.NewMConnTransport( log.TestingLogger(), conn.DefaultMConnConfig(), - []*p2p.ChannelDescriptor{{ID: byte(chID), Priority: 1}}, + []*p2p.ChannelDescriptor{{ID: chID, Priority: 1}}, p2p.MConnTransportOptions{}, ) err := transport.Listen(p2p.Endpoint{ @@ -43,7 +43,7 @@ func TestMConnTransport_AcceptBeforeListen(t *testing.T) { transport := p2p.NewMConnTransport( log.TestingLogger(), conn.DefaultMConnConfig(), - []*p2p.ChannelDescriptor{{ID: byte(chID), Priority: 1}}, + []*p2p.ChannelDescriptor{{ID: chID, Priority: 1}}, p2p.MConnTransportOptions{ MaxAcceptedConnections: 2, }, @@ -61,7 +61,7 @@ func TestMConnTransport_AcceptMaxAcceptedConnections(t *testing.T) { transport := p2p.NewMConnTransport( log.TestingLogger(), conn.DefaultMConnConfig(), - []*p2p.ChannelDescriptor{{ID: byte(chID), Priority: 1}}, + []*p2p.ChannelDescriptor{{ID: chID, Priority: 1}}, p2p.MConnTransportOptions{ MaxAcceptedConnections: 2, }, @@ -148,7 +148,7 @@ func TestMConnTransport_Listen(t *testing.T) { transport := p2p.NewMConnTransport( log.TestingLogger(), conn.DefaultMConnConfig(), - []*p2p.ChannelDescriptor{{ID: byte(chID), Priority: 1}}, + []*p2p.ChannelDescriptor{{ID: chID, Priority: 1}}, p2p.MConnTransportOptions{}, ) diff --git a/internal/p2p/transport_memory.go b/internal/p2p/transport_memory.go index 853288c59..b4161ecd6 100644 --- a/internal/p2p/transport_memory.go +++ b/internal/p2p/transport_memory.go @@ -117,6 +117,8 @@ func (t *MemoryTransport) String() string { return string(MemoryProtocol) } +func (t *MemoryTransport) AddChannelDescriptors([]*ChannelDescriptor) {} + // Protocols implements Transport. func (t *MemoryTransport) Protocols() []Protocol { return []Protocol{MemoryProtocol} diff --git a/internal/p2p/types.go b/internal/p2p/types.go index 388ff2253..bee99a4fe 100644 --- a/internal/p2p/types.go +++ b/internal/p2p/types.go @@ -5,3 +5,4 @@ import ( ) type ChannelDescriptor = conn.ChannelDescriptor +type ChannelID = conn.ChannelID diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index 3757d028d..86a46ea63 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -39,7 +39,7 @@ var ( ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{ SnapshotChannel: { Descriptor: &p2p.ChannelDescriptor{ - ID: byte(SnapshotChannel), + ID: SnapshotChannel, MessageType: new(ssproto.Message), Priority: 6, SendQueueCapacity: 10, @@ -49,7 +49,7 @@ var ( }, ChunkChannel: { Descriptor: &p2p.ChannelDescriptor{ - ID: byte(ChunkChannel), + ID: ChunkChannel, Priority: 3, MessageType: new(ssproto.Message), SendQueueCapacity: 4, @@ -59,7 +59,7 @@ var ( }, LightBlockChannel: { Descriptor: &p2p.ChannelDescriptor{ - ID: byte(LightBlockChannel), + ID: LightBlockChannel, MessageType: new(ssproto.Message), Priority: 5, SendQueueCapacity: 10, @@ -69,7 +69,7 @@ var ( }, ParamsChannel: { Descriptor: &p2p.ChannelDescriptor{ - ID: byte(ParamsChannel), + ID: ParamsChannel, MessageType: new(ssproto.Message), Priority: 2, SendQueueCapacity: 10, diff --git a/node/node.go b/node/node.go index 6a66f4224..61ba25749 100644 --- a/node/node.go +++ b/node/node.go @@ -19,7 +19,6 @@ import ( "github.com/tendermint/tendermint/internal/consensus" "github.com/tendermint/tendermint/internal/mempool" "github.com/tendermint/tendermint/internal/p2p" - "github.com/tendermint/tendermint/internal/p2p/pex" "github.com/tendermint/tendermint/internal/proxy" rpccore "github.com/tendermint/tendermint/internal/rpc/core" sm "github.com/tendermint/tendermint/internal/state" @@ -277,7 +276,7 @@ func makeNode(cfg *config.Config, makeCloser(closers)) } - mpReactorShim, mpReactor, mp, err := createMempoolReactor( + mpReactor, mp, err := createMempoolReactor( cfg, proxyApp, state, nodeMetrics.mempool, peerManager, router, logger, ) if err != nil { @@ -285,7 +284,7 @@ func makeNode(cfg *config.Config, } - evReactorShim, evReactor, evPool, err := createEvidenceReactor( + evReactor, evPool, err := createEvidenceReactor( cfg, dbProvider, stateDB, blockStore, peerManager, router, logger, ) if err != nil { @@ -304,7 +303,7 @@ func makeNode(cfg *config.Config, sm.BlockExecutorWithMetrics(nodeMetrics.state), ) - csReactorShim, csReactor, csState := createConsensusReactor( + csReactor, csState := createConsensusReactor( cfg, state, blockExec, blockStore, mp, evPool, privValidator, nodeMetrics.consensus, stateSync || blockSync, eventBus, peerManager, router, consensusLogger, @@ -312,7 +311,7 @@ func makeNode(cfg *config.Config, // Create the blockchain reactor. Note, we do not start block sync if we're // doing a state sync first. - bcReactorShim, bcReactor, err := createBlockchainReactor( + bcReactor, err := createBlockchainReactor( logger, state, blockExec, blockStore, csReactor, peerManager, router, blockSync && !stateSync, nodeMetrics.consensus, ) @@ -335,7 +334,6 @@ func makeNode(cfg *config.Config, // we should clean this whole thing up. See: // https://github.com/tendermint/tendermint/issues/4644 ssLogger := logger.With("module", "statesync") - ssReactorShim := p2p.NewReactorShim(ssLogger, "StateSyncShim", statesync.ChannelShims) channels := makeChannelsFromShims(router, statesync.ChannelShims) peerUpdates := peerManager.Subscribe() @@ -357,16 +355,6 @@ func makeNode(cfg *config.Config, nodeMetrics.statesync, ) - // add the channel descriptors to both the transports - // FIXME: This should be removed when the legacy p2p stack is removed and - // transports can either be agnostic to channel descriptors or can be - // declared in the constructor. - transport.AddChannelDescriptors(mpReactorShim.GetChannels()) - transport.AddChannelDescriptors(bcReactorShim.GetChannels()) - transport.AddChannelDescriptors(csReactorShim.GetChannels()) - transport.AddChannelDescriptors(evReactorShim.GetChannels()) - transport.AddChannelDescriptors(ssReactorShim.GetChannels()) - // Optionally, start the pex reactor // // TODO: @@ -382,9 +370,6 @@ func makeNode(cfg *config.Config, var pexReactor service.Service - pexCh := pex.ChannelDescriptor() - transport.AddChannelDescriptors([]*p2p.ChannelDescriptor{&pexCh}) - pexReactor, err = createPEXReactor(logger, peerManager, router) if err != nil { return nil, combineCloseError(err, makeCloser(closers)) @@ -501,13 +486,6 @@ func makeSeedNode(cfg *config.Config, var pexReactor service.Service - // add the pex reactor - // FIXME: we add channel descriptors to both the router and the transport but only the router - // should be aware of channel info. We should remove this from transport once the legacy - // p2p stack is removed. - pexCh := pex.ChannelDescriptor() - transport.AddChannelDescriptors([]*p2p.ChannelDescriptor{&pexCh}) - pexReactor, err = createPEXReactor(logger, peerManager, router) if err != nil { return nil, combineCloseError(err, closer) diff --git a/node/setup.go b/node/setup.go index a3491c995..fb5599dfc 100644 --- a/node/setup.go +++ b/node/setup.go @@ -196,11 +196,10 @@ func createMempoolReactor( peerManager *p2p.PeerManager, router *p2p.Router, logger log.Logger, -) (*p2p.ReactorShim, service.Service, mempool.Mempool, error) { +) (service.Service, mempool.Mempool, error) { logger = logger.With("module", "mempool", "version", cfg.Mempool.Version) channelShims := mempoolv0.GetChannelShims(cfg.Mempool) - reactorShim := p2p.NewReactorShim(logger, "MempoolShim", channelShims) channels := makeChannelsFromShims(router, channelShims) peerUpdates := peerManager.Subscribe() @@ -231,7 +230,7 @@ func createMempoolReactor( mp.EnableTxsAvailable() } - return reactorShim, reactor, mp, nil + return reactor, mp, nil case config.MempoolV1: mp := mempoolv1.NewTxMempool( @@ -257,10 +256,10 @@ func createMempoolReactor( mp.EnableTxsAvailable() } - return reactorShim, reactor, mp, nil + return reactor, mp, nil default: - return nil, nil, nil, fmt.Errorf("unknown mempool version: %s", cfg.Mempool.Version) + return nil, nil, fmt.Errorf("unknown mempool version: %s", cfg.Mempool.Version) } } @@ -272,18 +271,17 @@ func createEvidenceReactor( peerManager *p2p.PeerManager, router *p2p.Router, logger log.Logger, -) (*p2p.ReactorShim, *evidence.Reactor, *evidence.Pool, error) { +) (*evidence.Reactor, *evidence.Pool, error) { evidenceDB, err := dbProvider(&config.DBContext{ID: "evidence", Config: cfg}) if err != nil { - return nil, nil, nil, err + return nil, nil, err } logger = logger.With("module", "evidence") - reactorShim := p2p.NewReactorShim(logger, "EvidenceShim", evidence.ChannelShims) evidencePool, err := evidence.NewPool(logger, evidenceDB, sm.NewStore(stateDB), blockStore) if err != nil { - return nil, nil, nil, fmt.Errorf("creating evidence pool: %w", err) + return nil, nil, fmt.Errorf("creating evidence pool: %w", err) } evidenceReactor := evidence.NewReactor( @@ -293,7 +291,7 @@ func createEvidenceReactor( evidencePool, ) - return reactorShim, evidenceReactor, evidencePool, nil + return evidenceReactor, evidencePool, nil } func createBlockchainReactor( @@ -306,11 +304,10 @@ func createBlockchainReactor( router *p2p.Router, blockSync bool, metrics *consensus.Metrics, -) (*p2p.ReactorShim, service.Service, error) { +) (service.Service, error) { logger = logger.With("module", "blockchain") - reactorShim := p2p.NewReactorShim(logger, "BlockchainShim", blocksync.ChannelShims) channels := makeChannelsFromShims(router, blocksync.ChannelShims) peerUpdates := peerManager.Subscribe() @@ -320,10 +317,10 @@ func createBlockchainReactor( metrics, ) if err != nil { - return nil, nil, err + return nil, err } - return reactorShim, reactor, nil + return reactor, nil } func createConsensusReactor( @@ -340,7 +337,7 @@ func createConsensusReactor( peerManager *p2p.PeerManager, router *p2p.Router, logger log.Logger, -) (*p2p.ReactorShim, *consensus.Reactor, *consensus.State) { +) (*consensus.Reactor, *consensus.State) { consensusState := consensus.NewState( cfg.Consensus, @@ -356,8 +353,6 @@ func createConsensusReactor( consensusState.SetPrivValidator(privValidator) } - reactorShim := p2p.NewReactorShim(logger, "ConsensusShim", consensus.ChannelShims) - var ( channels map[p2p.ChannelID]*p2p.Channel peerUpdates *p2p.PeerUpdates @@ -382,7 +377,7 @@ func createConsensusReactor( // consensusReactor will set it on consensusState and blockExecutor. reactor.SetEventBus(eventBus) - return reactorShim, reactor, consensusState + return reactor, consensusState } func createTransport(logger log.Logger, cfg *config.Config) *p2p.MConnTransport {