mirror of
https://github.com/tendermint/tendermint.git
synced 2026-03-27 12:05:02 +00:00
p2p: channel shim cleanup (#7129)
This commit is contained in:
@@ -30,7 +30,7 @@ var (
|
||||
ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{
|
||||
BlockSyncChannel: {
|
||||
Descriptor: &p2p.ChannelDescriptor{
|
||||
ID: byte(BlockSyncChannel),
|
||||
ID: BlockSyncChannel,
|
||||
MessageType: new(bcproto.Message),
|
||||
Priority: 5,
|
||||
SendQueueCapacity: 1000,
|
||||
|
||||
@@ -65,7 +65,7 @@ func setup(
|
||||
blockSync: true,
|
||||
}
|
||||
|
||||
chDesc := p2p.ChannelDescriptor{ID: byte(BlockSyncChannel)}
|
||||
chDesc := p2p.ChannelDescriptor{ID: BlockSyncChannel}
|
||||
rts.blockSyncChannels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(bcproto.Message), int(chBuf))
|
||||
|
||||
i := 0
|
||||
|
||||
@@ -32,7 +32,7 @@ var (
|
||||
ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{
|
||||
StateChannel: {
|
||||
Descriptor: &p2p.ChannelDescriptor{
|
||||
ID: byte(StateChannel),
|
||||
ID: StateChannel,
|
||||
MessageType: new(tmcons.Message),
|
||||
Priority: 8,
|
||||
SendQueueCapacity: 64,
|
||||
@@ -45,7 +45,7 @@ var (
|
||||
// TODO: Consider a split between gossiping current block and catchup
|
||||
// stuff. Once we gossip the whole block there is nothing left to send
|
||||
// until next height or round.
|
||||
ID: byte(DataChannel),
|
||||
ID: DataChannel,
|
||||
MessageType: new(tmcons.Message),
|
||||
Priority: 12,
|
||||
SendQueueCapacity: 64,
|
||||
@@ -55,7 +55,7 @@ var (
|
||||
},
|
||||
VoteChannel: {
|
||||
Descriptor: &p2p.ChannelDescriptor{
|
||||
ID: byte(VoteChannel),
|
||||
ID: VoteChannel,
|
||||
MessageType: new(tmcons.Message),
|
||||
Priority: 10,
|
||||
SendQueueCapacity: 64,
|
||||
@@ -65,7 +65,7 @@ var (
|
||||
},
|
||||
VoteSetBitsChannel: {
|
||||
Descriptor: &p2p.ChannelDescriptor{
|
||||
ID: byte(VoteSetBitsChannel),
|
||||
ID: VoteSetBitsChannel,
|
||||
MessageType: new(tmcons.Message),
|
||||
Priority: 5,
|
||||
SendQueueCapacity: 8,
|
||||
|
||||
@@ -52,7 +52,7 @@ type reactorTestSuite struct {
|
||||
|
||||
func chDesc(chID p2p.ChannelID) p2p.ChannelDescriptor {
|
||||
return p2p.ChannelDescriptor{
|
||||
ID: byte(chID),
|
||||
ID: chID,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -28,7 +28,7 @@ var (
|
||||
ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{
|
||||
EvidenceChannel: {
|
||||
Descriptor: &p2p.ChannelDescriptor{
|
||||
ID: byte(EvidenceChannel),
|
||||
ID: EvidenceChannel,
|
||||
MessageType: new(tmproto.EvidenceList),
|
||||
Priority: 6,
|
||||
RecvMessageCapacity: maxMsgSize,
|
||||
|
||||
@@ -62,7 +62,7 @@ func setup(t *testing.T, stateStores []sm.Store, chBuf uint) *reactorTestSuite {
|
||||
peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numStateStores),
|
||||
}
|
||||
|
||||
chDesc := p2p.ChannelDescriptor{ID: byte(evidence.EvidenceChannel)}
|
||||
chDesc := p2p.ChannelDescriptor{ID: evidence.EvidenceChannel}
|
||||
rts.evidenceChannels = rts.network.MakeChannelsNoCleanup(t,
|
||||
chDesc,
|
||||
new(tmproto.EvidenceList),
|
||||
|
||||
@@ -101,7 +101,7 @@ func GetChannelShims(cfg *config.MempoolConfig) map[p2p.ChannelID]*p2p.ChannelDe
|
||||
return map[p2p.ChannelID]*p2p.ChannelDescriptorShim{
|
||||
mempool.MempoolChannel: {
|
||||
Descriptor: &p2p.ChannelDescriptor{
|
||||
ID: byte(mempool.MempoolChannel),
|
||||
ID: mempool.MempoolChannel,
|
||||
MessageType: new(protomem.Message),
|
||||
Priority: 5,
|
||||
RecvMessageCapacity: batchMsg.Size(),
|
||||
|
||||
@@ -50,7 +50,7 @@ func setup(t *testing.T, config *config.MempoolConfig, numNodes int, chBuf uint)
|
||||
peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes),
|
||||
}
|
||||
|
||||
chDesc := p2p.ChannelDescriptor{ID: byte(mempool.MempoolChannel)}
|
||||
chDesc := p2p.ChannelDescriptor{ID: mempool.MempoolChannel}
|
||||
rts.mempoolChnnels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(protomem.Message), int(chBuf))
|
||||
|
||||
for nodeID := range rts.network.Nodes {
|
||||
|
||||
@@ -108,7 +108,7 @@ func GetChannelShims(cfg *config.MempoolConfig) map[p2p.ChannelID]*p2p.ChannelDe
|
||||
return map[p2p.ChannelID]*p2p.ChannelDescriptorShim{
|
||||
mempool.MempoolChannel: {
|
||||
Descriptor: &p2p.ChannelDescriptor{
|
||||
ID: byte(mempool.MempoolChannel),
|
||||
ID: mempool.MempoolChannel,
|
||||
MessageType: new(protomem.Message),
|
||||
Priority: 5,
|
||||
RecvMessageCapacity: batchMsg.Size(),
|
||||
|
||||
@@ -52,7 +52,7 @@ func setupReactors(t *testing.T, numNodes int, chBuf uint) *reactorTestSuite {
|
||||
peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes),
|
||||
}
|
||||
|
||||
chDesc := p2p.ChannelDescriptor{ID: byte(mempool.MempoolChannel)}
|
||||
chDesc := p2p.ChannelDescriptor{ID: mempool.MempoolChannel}
|
||||
rts.mempoolChannels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(protomem.Message), int(chBuf))
|
||||
|
||||
for nodeID := range rts.network.Nodes {
|
||||
|
||||
@@ -48,7 +48,7 @@ const (
|
||||
defaultPongTimeout = 45 * time.Second
|
||||
)
|
||||
|
||||
type receiveCbFunc func(chID byte, msgBytes []byte)
|
||||
type receiveCbFunc func(chID ChannelID, msgBytes []byte)
|
||||
type errorCbFunc func(interface{})
|
||||
|
||||
/*
|
||||
@@ -82,7 +82,7 @@ type MConnection struct {
|
||||
send chan struct{}
|
||||
pong chan struct{}
|
||||
channels []*Channel
|
||||
channelsIdx map[byte]*Channel
|
||||
channelsIdx map[ChannelID]*Channel
|
||||
onReceive receiveCbFunc
|
||||
onError errorCbFunc
|
||||
errored uint32
|
||||
@@ -186,7 +186,7 @@ func NewMConnectionWithConfig(
|
||||
}
|
||||
|
||||
// Create channels
|
||||
var channelsIdx = map[byte]*Channel{}
|
||||
var channelsIdx = map[ChannelID]*Channel{}
|
||||
var channels = []*Channel{}
|
||||
|
||||
for _, desc := range chDescs {
|
||||
@@ -307,7 +307,7 @@ func (c *MConnection) stopForError(r interface{}) {
|
||||
}
|
||||
|
||||
// Queues a message to be sent to channel.
|
||||
func (c *MConnection) Send(chID byte, msgBytes []byte) bool {
|
||||
func (c *MConnection) Send(chID ChannelID, msgBytes []byte) bool {
|
||||
if !c.IsRunning() {
|
||||
return false
|
||||
}
|
||||
@@ -540,7 +540,7 @@ FOR_LOOP:
|
||||
// never block
|
||||
}
|
||||
case *tmp2p.Packet_PacketMsg:
|
||||
channelID := byte(pkt.PacketMsg.ChannelID)
|
||||
channelID := ChannelID(pkt.PacketMsg.ChannelID)
|
||||
channel, ok := c.channelsIdx[channelID]
|
||||
if pkt.PacketMsg.ChannelID < 0 || pkt.PacketMsg.ChannelID > math.MaxUint8 || !ok || channel == nil {
|
||||
err := fmt.Errorf("unknown channel %X", pkt.PacketMsg.ChannelID)
|
||||
@@ -607,9 +607,11 @@ type ChannelStatus struct {
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
// ChannelID is an arbitrary channel ID.
|
||||
type ChannelID uint16
|
||||
|
||||
type ChannelDescriptor struct {
|
||||
ID byte
|
||||
ID ChannelID
|
||||
Priority int
|
||||
|
||||
MessageType proto.Message
|
||||
|
||||
@@ -20,7 +20,7 @@ import (
|
||||
const maxPingPongPacketSize = 1024 // bytes
|
||||
|
||||
func createTestMConnection(conn net.Conn) *MConnection {
|
||||
onReceive := func(chID byte, msgBytes []byte) {
|
||||
onReceive := func(chID ChannelID, msgBytes []byte) {
|
||||
}
|
||||
onError := func(r interface{}) {
|
||||
}
|
||||
@@ -31,7 +31,7 @@ func createTestMConnection(conn net.Conn) *MConnection {
|
||||
|
||||
func createMConnectionWithCallbacks(
|
||||
conn net.Conn,
|
||||
onReceive func(chID byte, msgBytes []byte),
|
||||
onReceive func(chID ChannelID, msgBytes []byte),
|
||||
onError func(r interface{}),
|
||||
) *MConnection {
|
||||
cfg := DefaultMConnConfig()
|
||||
@@ -111,7 +111,7 @@ func TestMConnectionReceive(t *testing.T) {
|
||||
|
||||
receivedCh := make(chan []byte)
|
||||
errorsCh := make(chan interface{})
|
||||
onReceive := func(chID byte, msgBytes []byte) {
|
||||
onReceive := func(chID ChannelID, msgBytes []byte) {
|
||||
receivedCh <- msgBytes
|
||||
}
|
||||
onError := func(r interface{}) {
|
||||
@@ -146,7 +146,7 @@ func TestMConnectionPongTimeoutResultsInError(t *testing.T) {
|
||||
|
||||
receivedCh := make(chan []byte)
|
||||
errorsCh := make(chan interface{})
|
||||
onReceive := func(chID byte, msgBytes []byte) {
|
||||
onReceive := func(chID ChannelID, msgBytes []byte) {
|
||||
receivedCh <- msgBytes
|
||||
}
|
||||
onError := func(r interface{}) {
|
||||
@@ -184,7 +184,7 @@ func TestMConnectionMultiplePongsInTheBeginning(t *testing.T) {
|
||||
|
||||
receivedCh := make(chan []byte)
|
||||
errorsCh := make(chan interface{})
|
||||
onReceive := func(chID byte, msgBytes []byte) {
|
||||
onReceive := func(chID ChannelID, msgBytes []byte) {
|
||||
receivedCh <- msgBytes
|
||||
}
|
||||
onError := func(r interface{}) {
|
||||
@@ -238,7 +238,7 @@ func TestMConnectionMultiplePings(t *testing.T) {
|
||||
|
||||
receivedCh := make(chan []byte)
|
||||
errorsCh := make(chan interface{})
|
||||
onReceive := func(chID byte, msgBytes []byte) {
|
||||
onReceive := func(chID ChannelID, msgBytes []byte) {
|
||||
receivedCh <- msgBytes
|
||||
}
|
||||
onError := func(r interface{}) {
|
||||
@@ -285,7 +285,7 @@ func TestMConnectionPingPongs(t *testing.T) {
|
||||
|
||||
receivedCh := make(chan []byte)
|
||||
errorsCh := make(chan interface{})
|
||||
onReceive := func(chID byte, msgBytes []byte) {
|
||||
onReceive := func(chID ChannelID, msgBytes []byte) {
|
||||
receivedCh <- msgBytes
|
||||
}
|
||||
onError := func(r interface{}) {
|
||||
@@ -342,7 +342,7 @@ func TestMConnectionStopsAndReturnsError(t *testing.T) {
|
||||
|
||||
receivedCh := make(chan []byte)
|
||||
errorsCh := make(chan interface{})
|
||||
onReceive := func(chID byte, msgBytes []byte) {
|
||||
onReceive := func(chID ChannelID, msgBytes []byte) {
|
||||
receivedCh <- msgBytes
|
||||
}
|
||||
onError := func(r interface{}) {
|
||||
@@ -371,7 +371,7 @@ func TestMConnectionStopsAndReturnsError(t *testing.T) {
|
||||
func newClientAndServerConnsForReadErrors(t *testing.T, chOnErr chan struct{}) (*MConnection, *MConnection) {
|
||||
server, client := NetPipe()
|
||||
|
||||
onReceive := func(chID byte, msgBytes []byte) {}
|
||||
onReceive := func(chID ChannelID, msgBytes []byte) {}
|
||||
onError := func(r interface{}) {}
|
||||
|
||||
// create client conn with two channels
|
||||
@@ -443,7 +443,7 @@ func TestMConnectionReadErrorLongMessage(t *testing.T) {
|
||||
mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
|
||||
t.Cleanup(stopAll(t, mconnClient, mconnServer))
|
||||
|
||||
mconnServer.onReceive = func(chID byte, msgBytes []byte) {
|
||||
mconnServer.onReceive = func(chID ChannelID, msgBytes []byte) {
|
||||
chOnRcv <- struct{}{}
|
||||
}
|
||||
|
||||
@@ -538,7 +538,7 @@ func TestMConnectionChannelOverflow(t *testing.T) {
|
||||
mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
|
||||
t.Cleanup(stopAll(t, mconnClient, mconnServer))
|
||||
|
||||
mconnServer.onReceive = func(chID byte, msgBytes []byte) {
|
||||
mconnServer.onReceive = func(chID ChannelID, msgBytes []byte) {
|
||||
chOnRcv <- struct{}{}
|
||||
}
|
||||
|
||||
|
||||
@@ -5,9 +5,12 @@ package mocks
|
||||
import (
|
||||
context "context"
|
||||
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
conn "github.com/tendermint/tendermint/internal/p2p/conn"
|
||||
|
||||
crypto "github.com/tendermint/tendermint/crypto"
|
||||
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
|
||||
p2p "github.com/tendermint/tendermint/internal/p2p"
|
||||
|
||||
types "github.com/tendermint/tendermint/types"
|
||||
@@ -77,14 +80,14 @@ func (_m *Connection) LocalEndpoint() p2p.Endpoint {
|
||||
}
|
||||
|
||||
// ReceiveMessage provides a mock function with given fields:
|
||||
func (_m *Connection) ReceiveMessage() (p2p.ChannelID, []byte, error) {
|
||||
func (_m *Connection) ReceiveMessage() (conn.ChannelID, []byte, error) {
|
||||
ret := _m.Called()
|
||||
|
||||
var r0 p2p.ChannelID
|
||||
if rf, ok := ret.Get(0).(func() p2p.ChannelID); ok {
|
||||
var r0 conn.ChannelID
|
||||
if rf, ok := ret.Get(0).(func() conn.ChannelID); ok {
|
||||
r0 = rf()
|
||||
} else {
|
||||
r0 = ret.Get(0).(p2p.ChannelID)
|
||||
r0 = ret.Get(0).(conn.ChannelID)
|
||||
}
|
||||
|
||||
var r1 []byte
|
||||
@@ -121,11 +124,11 @@ func (_m *Connection) RemoteEndpoint() p2p.Endpoint {
|
||||
}
|
||||
|
||||
// SendMessage provides a mock function with given fields: _a0, _a1
|
||||
func (_m *Connection) SendMessage(_a0 p2p.ChannelID, _a1 []byte) error {
|
||||
func (_m *Connection) SendMessage(_a0 conn.ChannelID, _a1 []byte) error {
|
||||
ret := _m.Called(_a0, _a1)
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(p2p.ChannelID, []byte) error); ok {
|
||||
if rf, ok := ret.Get(0).(func(conn.ChannelID, []byte) error); ok {
|
||||
r0 = rf(_a0, _a1)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
|
||||
@@ -5,7 +5,10 @@ package mocks
|
||||
import (
|
||||
context "context"
|
||||
|
||||
conn "github.com/tendermint/tendermint/internal/p2p/conn"
|
||||
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
|
||||
p2p "github.com/tendermint/tendermint/internal/p2p"
|
||||
)
|
||||
|
||||
@@ -37,6 +40,11 @@ func (_m *Transport) Accept() (p2p.Connection, error) {
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// AddChannelDescriptors provides a mock function with given fields: _a0
|
||||
func (_m *Transport) AddChannelDescriptors(_a0 []*conn.ChannelDescriptor) {
|
||||
_m.Called(_a0)
|
||||
}
|
||||
|
||||
// Close provides a mock function with given fields:
|
||||
func (_m *Transport) Close() error {
|
||||
ret := _m.Called()
|
||||
|
||||
@@ -15,7 +15,7 @@ var (
|
||||
ctx = context.Background()
|
||||
chID = p2p.ChannelID(1)
|
||||
chDesc = p2p.ChannelDescriptor{
|
||||
ID: byte(chID),
|
||||
ID: chID,
|
||||
Priority: 5,
|
||||
SendQueueCapacity: 10,
|
||||
RecvMessageCapacity: 10,
|
||||
|
||||
@@ -285,7 +285,7 @@ func (n *Node) MakeChannel(t *testing.T, chDesc p2p.ChannelDescriptor,
|
||||
messageType proto.Message, size int) *p2p.Channel {
|
||||
channel, err := n.Router.OpenChannel(chDesc, messageType, size)
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, n.Router.NodeInfo().Channels, chDesc.ID)
|
||||
require.Contains(t, n.Router.NodeInfo().Channels, byte(chDesc.ID))
|
||||
t.Cleanup(func() {
|
||||
RequireEmpty(t, channel)
|
||||
channel.Close()
|
||||
@@ -335,7 +335,7 @@ func (n *Node) MakePeerUpdatesNoRequireEmpty(t *testing.T) *p2p.PeerUpdates {
|
||||
|
||||
func MakeChannelDesc(chID p2p.ChannelID) p2p.ChannelDescriptor {
|
||||
return p2p.ChannelDescriptor{
|
||||
ID: byte(chID),
|
||||
ID: chID,
|
||||
Priority: 5,
|
||||
SendQueueCapacity: 10,
|
||||
RecvMessageCapacity: 10,
|
||||
|
||||
@@ -25,7 +25,7 @@ var (
|
||||
// See https://github.com/tendermint/tendermint/issues/6371
|
||||
const (
|
||||
// PexChannel is a channel for PEX messages
|
||||
PexChannel = byte(0x00)
|
||||
PexChannel = 0x00
|
||||
|
||||
// over-estimate of max NetAddress size
|
||||
// hexID (40) + IP (16) + Port (2) + Name (100) ...
|
||||
|
||||
@@ -99,7 +99,7 @@ func newPQScheduler(
|
||||
)
|
||||
|
||||
for _, chDesc := range chDescsCopy {
|
||||
chID := ChannelID(chDesc.ID)
|
||||
chID := chDesc.ID
|
||||
chPriorities[chID] = uint(chDesc.Priority)
|
||||
sizes[uint(chDesc.Priority)] = 0
|
||||
}
|
||||
|
||||
@@ -23,9 +23,6 @@ const queueBufferDefault = 32
|
||||
|
||||
const dialRandomizerIntervalMillisecond = 3000
|
||||
|
||||
// ChannelID is an arbitrary channel ID.
|
||||
type ChannelID uint16
|
||||
|
||||
// Envelope contains a message with sender/receiver routing info.
|
||||
type Envelope struct {
|
||||
From types.NodeID // sender (empty if outbound)
|
||||
@@ -361,7 +358,7 @@ func (r *Router) OpenChannel(chDesc ChannelDescriptor, messageType proto.Message
|
||||
r.channelMtx.Lock()
|
||||
defer r.channelMtx.Unlock()
|
||||
|
||||
id := ChannelID(chDesc.ID)
|
||||
id := chDesc.ID
|
||||
if _, ok := r.channelQueues[id]; ok {
|
||||
return nil, fmt.Errorf("channel %v already exists", id)
|
||||
}
|
||||
@@ -383,6 +380,10 @@ func (r *Router) OpenChannel(chDesc ChannelDescriptor, messageType proto.Message
|
||||
// add the channel to the nodeInfo if it's not already there.
|
||||
r.nodeInfo.AddChannel(uint16(chDesc.ID))
|
||||
|
||||
for _, t := range r.transports {
|
||||
t.AddChannelDescriptors([]*ChannelDescriptor{&chDesc})
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
r.channelMtx.Lock()
|
||||
|
||||
@@ -121,17 +121,17 @@ func TestRouter_Channel_Basic(t *testing.T) {
|
||||
// Opening a channel should work.
|
||||
channel, err := router.OpenChannel(chDesc, &p2ptest.Message{}, 0)
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, router.NodeInfo().Channels, chDesc.ID)
|
||||
require.Contains(t, router.NodeInfo().Channels, byte(chDesc.ID))
|
||||
|
||||
// Opening the same channel again should fail.
|
||||
_, err = router.OpenChannel(chDesc, &p2ptest.Message{}, 0)
|
||||
require.Error(t, err)
|
||||
|
||||
// Opening a different channel should work.
|
||||
chDesc2 := p2p.ChannelDescriptor{ID: byte(2)}
|
||||
chDesc2 := p2p.ChannelDescriptor{ID: 2}
|
||||
_, err = router.OpenChannel(chDesc2, &p2ptest.Message{}, 0)
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, router.NodeInfo().Channels, chDesc2.ID)
|
||||
require.Contains(t, router.NodeInfo().Channels, byte(chDesc2.ID))
|
||||
|
||||
// Closing the channel, then opening it again should be fine.
|
||||
channel.Close()
|
||||
@@ -865,6 +865,7 @@ func TestRouter_DontSendOnInvalidChannel(t *testing.T) {
|
||||
mockConnection.On("ReceiveMessage").Return(chID, nil, io.EOF)
|
||||
|
||||
mockTransport := &mocks.Transport{}
|
||||
mockTransport.On("AddChannelDescriptors", mock.Anything).Return()
|
||||
mockTransport.On("String").Maybe().Return("mock")
|
||||
mockTransport.On("Protocols").Return([]p2p.Protocol{"mock"})
|
||||
mockTransport.On("Close").Return(nil)
|
||||
|
||||
@@ -1,11 +1,5 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
"sort"
|
||||
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
)
|
||||
|
||||
// ChannelDescriptorShim defines a shim wrapper around a legacy p2p channel
|
||||
// and the proto.Message the new p2p Channel is responsible for handling.
|
||||
// A ChannelDescriptorShim is not contained in ReactorShim, but is rather
|
||||
@@ -13,88 +7,3 @@ import (
|
||||
type ChannelDescriptorShim struct {
|
||||
Descriptor *ChannelDescriptor
|
||||
}
|
||||
|
||||
// ChannelShim defines a generic shim wrapper around a legacy p2p channel
|
||||
// and the new p2p Channel. It also includes the raw bi-directional Go channels
|
||||
// so we can proxy message delivery.
|
||||
type ChannelShim struct {
|
||||
Descriptor *ChannelDescriptor
|
||||
Channel *Channel
|
||||
inCh chan<- Envelope
|
||||
outCh <-chan Envelope
|
||||
errCh <-chan PeerError
|
||||
}
|
||||
|
||||
// ReactorShim defines a generic shim wrapper around a BaseReactor. It is
|
||||
// responsible for wiring up legacy p2p behavior to the new p2p semantics
|
||||
// (e.g. proxying Envelope messages to legacy peers).
|
||||
type ReactorShim struct {
|
||||
Name string
|
||||
PeerUpdates *PeerUpdates
|
||||
Channels map[ChannelID]*ChannelShim
|
||||
}
|
||||
|
||||
func NewReactorShim(logger log.Logger, name string, descriptors map[ChannelID]*ChannelDescriptorShim) *ReactorShim {
|
||||
channels := make(map[ChannelID]*ChannelShim)
|
||||
|
||||
for _, cds := range descriptors {
|
||||
chShim := NewChannelShim(cds, 0)
|
||||
channels[chShim.Channel.ID] = chShim
|
||||
}
|
||||
|
||||
rs := &ReactorShim{
|
||||
Name: name,
|
||||
PeerUpdates: NewPeerUpdates(make(chan PeerUpdate), 0),
|
||||
Channels: channels,
|
||||
}
|
||||
|
||||
return rs
|
||||
}
|
||||
|
||||
func NewChannelShim(cds *ChannelDescriptorShim, buf uint) *ChannelShim {
|
||||
inCh := make(chan Envelope, buf)
|
||||
outCh := make(chan Envelope, buf)
|
||||
errCh := make(chan PeerError, buf)
|
||||
return &ChannelShim{
|
||||
Descriptor: cds.Descriptor,
|
||||
Channel: NewChannel(
|
||||
ChannelID(cds.Descriptor.ID),
|
||||
cds.Descriptor.MessageType,
|
||||
inCh,
|
||||
outCh,
|
||||
errCh,
|
||||
),
|
||||
inCh: inCh,
|
||||
outCh: outCh,
|
||||
errCh: errCh,
|
||||
}
|
||||
}
|
||||
|
||||
// GetChannels implements the legacy Reactor interface for getting a slice of all
|
||||
// the supported ChannelDescriptors.
|
||||
func (rs *ReactorShim) GetChannels() []*ChannelDescriptor {
|
||||
sortedChIDs := make([]ChannelID, 0, len(rs.Channels))
|
||||
for cID := range rs.Channels {
|
||||
sortedChIDs = append(sortedChIDs, cID)
|
||||
}
|
||||
|
||||
sort.Slice(sortedChIDs, func(i, j int) bool { return sortedChIDs[i] < sortedChIDs[j] })
|
||||
|
||||
descriptors := make([]*ChannelDescriptor, len(rs.Channels))
|
||||
for i, cID := range sortedChIDs {
|
||||
descriptors[i] = rs.Channels[cID].Descriptor
|
||||
}
|
||||
|
||||
return descriptors
|
||||
}
|
||||
|
||||
// GetChannel returns a p2p Channel reference for a given ChannelID. If no
|
||||
// Channel exists, nil is returned.
|
||||
func (rs *ReactorShim) GetChannel(cID ChannelID) *Channel {
|
||||
channelShim, ok := rs.Channels[cID]
|
||||
if ok {
|
||||
return channelShim.Channel
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -44,6 +44,10 @@ type Transport interface {
|
||||
// Close stops accepting new connections, but does not close active connections.
|
||||
Close() error
|
||||
|
||||
// AddChannelDescriptors is only part of this interface
|
||||
// temporarily
|
||||
AddChannelDescriptors([]*ChannelDescriptor)
|
||||
|
||||
// Stringer is used to display the transport, e.g. in logs.
|
||||
//
|
||||
// Without this, the logger may use reflection to access and display
|
||||
|
||||
@@ -348,9 +348,9 @@ func (c *mConnConnection) handshake(
|
||||
}
|
||||
|
||||
// onReceive is a callback for MConnection received messages.
|
||||
func (c *mConnConnection) onReceive(chID byte, payload []byte) {
|
||||
func (c *mConnConnection) onReceive(chID ChannelID, payload []byte) {
|
||||
select {
|
||||
case c.receiveCh <- mConnMessage{channelID: ChannelID(chID), payload: payload}:
|
||||
case c.receiveCh <- mConnMessage{channelID: chID, payload: payload}:
|
||||
case <-c.closeCh:
|
||||
}
|
||||
}
|
||||
@@ -387,7 +387,7 @@ func (c *mConnConnection) SendMessage(chID ChannelID, msg []byte) error {
|
||||
case <-c.closeCh:
|
||||
return io.EOF
|
||||
default:
|
||||
if ok := c.mconn.Send(byte(chID), msg); !ok {
|
||||
if ok := c.mconn.Send(chID, msg); !ok {
|
||||
return errors.New("sending message timed out")
|
||||
}
|
||||
|
||||
|
||||
@@ -21,7 +21,7 @@ func init() {
|
||||
transport := p2p.NewMConnTransport(
|
||||
log.TestingLogger(),
|
||||
conn.DefaultMConnConfig(),
|
||||
[]*p2p.ChannelDescriptor{{ID: byte(chID), Priority: 1}},
|
||||
[]*p2p.ChannelDescriptor{{ID: chID, Priority: 1}},
|
||||
p2p.MConnTransportOptions{},
|
||||
)
|
||||
err := transport.Listen(p2p.Endpoint{
|
||||
@@ -43,7 +43,7 @@ func TestMConnTransport_AcceptBeforeListen(t *testing.T) {
|
||||
transport := p2p.NewMConnTransport(
|
||||
log.TestingLogger(),
|
||||
conn.DefaultMConnConfig(),
|
||||
[]*p2p.ChannelDescriptor{{ID: byte(chID), Priority: 1}},
|
||||
[]*p2p.ChannelDescriptor{{ID: chID, Priority: 1}},
|
||||
p2p.MConnTransportOptions{
|
||||
MaxAcceptedConnections: 2,
|
||||
},
|
||||
@@ -61,7 +61,7 @@ func TestMConnTransport_AcceptMaxAcceptedConnections(t *testing.T) {
|
||||
transport := p2p.NewMConnTransport(
|
||||
log.TestingLogger(),
|
||||
conn.DefaultMConnConfig(),
|
||||
[]*p2p.ChannelDescriptor{{ID: byte(chID), Priority: 1}},
|
||||
[]*p2p.ChannelDescriptor{{ID: chID, Priority: 1}},
|
||||
p2p.MConnTransportOptions{
|
||||
MaxAcceptedConnections: 2,
|
||||
},
|
||||
@@ -148,7 +148,7 @@ func TestMConnTransport_Listen(t *testing.T) {
|
||||
transport := p2p.NewMConnTransport(
|
||||
log.TestingLogger(),
|
||||
conn.DefaultMConnConfig(),
|
||||
[]*p2p.ChannelDescriptor{{ID: byte(chID), Priority: 1}},
|
||||
[]*p2p.ChannelDescriptor{{ID: chID, Priority: 1}},
|
||||
p2p.MConnTransportOptions{},
|
||||
)
|
||||
|
||||
|
||||
@@ -117,6 +117,8 @@ func (t *MemoryTransport) String() string {
|
||||
return string(MemoryProtocol)
|
||||
}
|
||||
|
||||
func (t *MemoryTransport) AddChannelDescriptors([]*ChannelDescriptor) {}
|
||||
|
||||
// Protocols implements Transport.
|
||||
func (t *MemoryTransport) Protocols() []Protocol {
|
||||
return []Protocol{MemoryProtocol}
|
||||
|
||||
@@ -5,3 +5,4 @@ import (
|
||||
)
|
||||
|
||||
type ChannelDescriptor = conn.ChannelDescriptor
|
||||
type ChannelID = conn.ChannelID
|
||||
|
||||
@@ -39,7 +39,7 @@ var (
|
||||
ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{
|
||||
SnapshotChannel: {
|
||||
Descriptor: &p2p.ChannelDescriptor{
|
||||
ID: byte(SnapshotChannel),
|
||||
ID: SnapshotChannel,
|
||||
MessageType: new(ssproto.Message),
|
||||
Priority: 6,
|
||||
SendQueueCapacity: 10,
|
||||
@@ -49,7 +49,7 @@ var (
|
||||
},
|
||||
ChunkChannel: {
|
||||
Descriptor: &p2p.ChannelDescriptor{
|
||||
ID: byte(ChunkChannel),
|
||||
ID: ChunkChannel,
|
||||
Priority: 3,
|
||||
MessageType: new(ssproto.Message),
|
||||
SendQueueCapacity: 4,
|
||||
@@ -59,7 +59,7 @@ var (
|
||||
},
|
||||
LightBlockChannel: {
|
||||
Descriptor: &p2p.ChannelDescriptor{
|
||||
ID: byte(LightBlockChannel),
|
||||
ID: LightBlockChannel,
|
||||
MessageType: new(ssproto.Message),
|
||||
Priority: 5,
|
||||
SendQueueCapacity: 10,
|
||||
@@ -69,7 +69,7 @@ var (
|
||||
},
|
||||
ParamsChannel: {
|
||||
Descriptor: &p2p.ChannelDescriptor{
|
||||
ID: byte(ParamsChannel),
|
||||
ID: ParamsChannel,
|
||||
MessageType: new(ssproto.Message),
|
||||
Priority: 2,
|
||||
SendQueueCapacity: 10,
|
||||
|
||||
30
node/node.go
30
node/node.go
@@ -19,7 +19,6 @@ import (
|
||||
"github.com/tendermint/tendermint/internal/consensus"
|
||||
"github.com/tendermint/tendermint/internal/mempool"
|
||||
"github.com/tendermint/tendermint/internal/p2p"
|
||||
"github.com/tendermint/tendermint/internal/p2p/pex"
|
||||
"github.com/tendermint/tendermint/internal/proxy"
|
||||
rpccore "github.com/tendermint/tendermint/internal/rpc/core"
|
||||
sm "github.com/tendermint/tendermint/internal/state"
|
||||
@@ -277,7 +276,7 @@ func makeNode(cfg *config.Config,
|
||||
makeCloser(closers))
|
||||
}
|
||||
|
||||
mpReactorShim, mpReactor, mp, err := createMempoolReactor(
|
||||
mpReactor, mp, err := createMempoolReactor(
|
||||
cfg, proxyApp, state, nodeMetrics.mempool, peerManager, router, logger,
|
||||
)
|
||||
if err != nil {
|
||||
@@ -285,7 +284,7 @@ func makeNode(cfg *config.Config,
|
||||
|
||||
}
|
||||
|
||||
evReactorShim, evReactor, evPool, err := createEvidenceReactor(
|
||||
evReactor, evPool, err := createEvidenceReactor(
|
||||
cfg, dbProvider, stateDB, blockStore, peerManager, router, logger,
|
||||
)
|
||||
if err != nil {
|
||||
@@ -304,7 +303,7 @@ func makeNode(cfg *config.Config,
|
||||
sm.BlockExecutorWithMetrics(nodeMetrics.state),
|
||||
)
|
||||
|
||||
csReactorShim, csReactor, csState := createConsensusReactor(
|
||||
csReactor, csState := createConsensusReactor(
|
||||
cfg, state, blockExec, blockStore, mp, evPool,
|
||||
privValidator, nodeMetrics.consensus, stateSync || blockSync, eventBus,
|
||||
peerManager, router, consensusLogger,
|
||||
@@ -312,7 +311,7 @@ func makeNode(cfg *config.Config,
|
||||
|
||||
// Create the blockchain reactor. Note, we do not start block sync if we're
|
||||
// doing a state sync first.
|
||||
bcReactorShim, bcReactor, err := createBlockchainReactor(
|
||||
bcReactor, err := createBlockchainReactor(
|
||||
logger, state, blockExec, blockStore, csReactor,
|
||||
peerManager, router, blockSync && !stateSync, nodeMetrics.consensus,
|
||||
)
|
||||
@@ -335,7 +334,6 @@ func makeNode(cfg *config.Config,
|
||||
// we should clean this whole thing up. See:
|
||||
// https://github.com/tendermint/tendermint/issues/4644
|
||||
ssLogger := logger.With("module", "statesync")
|
||||
ssReactorShim := p2p.NewReactorShim(ssLogger, "StateSyncShim", statesync.ChannelShims)
|
||||
channels := makeChannelsFromShims(router, statesync.ChannelShims)
|
||||
|
||||
peerUpdates := peerManager.Subscribe()
|
||||
@@ -357,16 +355,6 @@ func makeNode(cfg *config.Config,
|
||||
nodeMetrics.statesync,
|
||||
)
|
||||
|
||||
// add the channel descriptors to both the transports
|
||||
// FIXME: This should be removed when the legacy p2p stack is removed and
|
||||
// transports can either be agnostic to channel descriptors or can be
|
||||
// declared in the constructor.
|
||||
transport.AddChannelDescriptors(mpReactorShim.GetChannels())
|
||||
transport.AddChannelDescriptors(bcReactorShim.GetChannels())
|
||||
transport.AddChannelDescriptors(csReactorShim.GetChannels())
|
||||
transport.AddChannelDescriptors(evReactorShim.GetChannels())
|
||||
transport.AddChannelDescriptors(ssReactorShim.GetChannels())
|
||||
|
||||
// Optionally, start the pex reactor
|
||||
//
|
||||
// TODO:
|
||||
@@ -382,9 +370,6 @@ func makeNode(cfg *config.Config,
|
||||
|
||||
var pexReactor service.Service
|
||||
|
||||
pexCh := pex.ChannelDescriptor()
|
||||
transport.AddChannelDescriptors([]*p2p.ChannelDescriptor{&pexCh})
|
||||
|
||||
pexReactor, err = createPEXReactor(logger, peerManager, router)
|
||||
if err != nil {
|
||||
return nil, combineCloseError(err, makeCloser(closers))
|
||||
@@ -501,13 +486,6 @@ func makeSeedNode(cfg *config.Config,
|
||||
|
||||
var pexReactor service.Service
|
||||
|
||||
// add the pex reactor
|
||||
// FIXME: we add channel descriptors to both the router and the transport but only the router
|
||||
// should be aware of channel info. We should remove this from transport once the legacy
|
||||
// p2p stack is removed.
|
||||
pexCh := pex.ChannelDescriptor()
|
||||
transport.AddChannelDescriptors([]*p2p.ChannelDescriptor{&pexCh})
|
||||
|
||||
pexReactor, err = createPEXReactor(logger, peerManager, router)
|
||||
if err != nil {
|
||||
return nil, combineCloseError(err, closer)
|
||||
|
||||
@@ -196,11 +196,10 @@ func createMempoolReactor(
|
||||
peerManager *p2p.PeerManager,
|
||||
router *p2p.Router,
|
||||
logger log.Logger,
|
||||
) (*p2p.ReactorShim, service.Service, mempool.Mempool, error) {
|
||||
) (service.Service, mempool.Mempool, error) {
|
||||
|
||||
logger = logger.With("module", "mempool", "version", cfg.Mempool.Version)
|
||||
channelShims := mempoolv0.GetChannelShims(cfg.Mempool)
|
||||
reactorShim := p2p.NewReactorShim(logger, "MempoolShim", channelShims)
|
||||
|
||||
channels := makeChannelsFromShims(router, channelShims)
|
||||
peerUpdates := peerManager.Subscribe()
|
||||
@@ -231,7 +230,7 @@ func createMempoolReactor(
|
||||
mp.EnableTxsAvailable()
|
||||
}
|
||||
|
||||
return reactorShim, reactor, mp, nil
|
||||
return reactor, mp, nil
|
||||
|
||||
case config.MempoolV1:
|
||||
mp := mempoolv1.NewTxMempool(
|
||||
@@ -257,10 +256,10 @@ func createMempoolReactor(
|
||||
mp.EnableTxsAvailable()
|
||||
}
|
||||
|
||||
return reactorShim, reactor, mp, nil
|
||||
return reactor, mp, nil
|
||||
|
||||
default:
|
||||
return nil, nil, nil, fmt.Errorf("unknown mempool version: %s", cfg.Mempool.Version)
|
||||
return nil, nil, fmt.Errorf("unknown mempool version: %s", cfg.Mempool.Version)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -272,18 +271,17 @@ func createEvidenceReactor(
|
||||
peerManager *p2p.PeerManager,
|
||||
router *p2p.Router,
|
||||
logger log.Logger,
|
||||
) (*p2p.ReactorShim, *evidence.Reactor, *evidence.Pool, error) {
|
||||
) (*evidence.Reactor, *evidence.Pool, error) {
|
||||
evidenceDB, err := dbProvider(&config.DBContext{ID: "evidence", Config: cfg})
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
logger = logger.With("module", "evidence")
|
||||
reactorShim := p2p.NewReactorShim(logger, "EvidenceShim", evidence.ChannelShims)
|
||||
|
||||
evidencePool, err := evidence.NewPool(logger, evidenceDB, sm.NewStore(stateDB), blockStore)
|
||||
if err != nil {
|
||||
return nil, nil, nil, fmt.Errorf("creating evidence pool: %w", err)
|
||||
return nil, nil, fmt.Errorf("creating evidence pool: %w", err)
|
||||
}
|
||||
|
||||
evidenceReactor := evidence.NewReactor(
|
||||
@@ -293,7 +291,7 @@ func createEvidenceReactor(
|
||||
evidencePool,
|
||||
)
|
||||
|
||||
return reactorShim, evidenceReactor, evidencePool, nil
|
||||
return evidenceReactor, evidencePool, nil
|
||||
}
|
||||
|
||||
func createBlockchainReactor(
|
||||
@@ -306,11 +304,10 @@ func createBlockchainReactor(
|
||||
router *p2p.Router,
|
||||
blockSync bool,
|
||||
metrics *consensus.Metrics,
|
||||
) (*p2p.ReactorShim, service.Service, error) {
|
||||
) (service.Service, error) {
|
||||
|
||||
logger = logger.With("module", "blockchain")
|
||||
|
||||
reactorShim := p2p.NewReactorShim(logger, "BlockchainShim", blocksync.ChannelShims)
|
||||
channels := makeChannelsFromShims(router, blocksync.ChannelShims)
|
||||
peerUpdates := peerManager.Subscribe()
|
||||
|
||||
@@ -320,10 +317,10 @@ func createBlockchainReactor(
|
||||
metrics,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return reactorShim, reactor, nil
|
||||
return reactor, nil
|
||||
}
|
||||
|
||||
func createConsensusReactor(
|
||||
@@ -340,7 +337,7 @@ func createConsensusReactor(
|
||||
peerManager *p2p.PeerManager,
|
||||
router *p2p.Router,
|
||||
logger log.Logger,
|
||||
) (*p2p.ReactorShim, *consensus.Reactor, *consensus.State) {
|
||||
) (*consensus.Reactor, *consensus.State) {
|
||||
|
||||
consensusState := consensus.NewState(
|
||||
cfg.Consensus,
|
||||
@@ -356,8 +353,6 @@ func createConsensusReactor(
|
||||
consensusState.SetPrivValidator(privValidator)
|
||||
}
|
||||
|
||||
reactorShim := p2p.NewReactorShim(logger, "ConsensusShim", consensus.ChannelShims)
|
||||
|
||||
var (
|
||||
channels map[p2p.ChannelID]*p2p.Channel
|
||||
peerUpdates *p2p.PeerUpdates
|
||||
@@ -382,7 +377,7 @@ func createConsensusReactor(
|
||||
// consensusReactor will set it on consensusState and blockExecutor.
|
||||
reactor.SetEventBus(eventBus)
|
||||
|
||||
return reactorShim, reactor, consensusState
|
||||
return reactor, consensusState
|
||||
}
|
||||
|
||||
func createTransport(logger log.Logger, cfg *config.Config) *p2p.MConnTransport {
|
||||
|
||||
Reference in New Issue
Block a user