From f4a56f4034e04775b4157413c3a5bbf71eef3409 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Fri, 15 Oct 2021 11:45:12 -0400 Subject: [PATCH] p2p: refactor channel description (#7130) This is another small sliver of #7075, with the intention of removing the legacy shim layer related to channel registration. --- internal/blocksync/reactor.go | 3 +-- internal/consensus/reactor.go | 12 ++++-------- internal/evidence/reactor.go | 3 +-- internal/mempool/v0/reactor.go | 3 +-- internal/mempool/v1/reactor.go | 3 +-- internal/p2p/conn/connection.go | 6 ++---- internal/p2p/p2p_test.go | 1 - internal/p2p/p2ptest/network.go | 1 - internal/p2p/pex/reactor.go | 1 - internal/p2p/pqueue_test.go | 2 +- internal/p2p/shim.go | 4 +--- internal/statesync/reactor.go | 12 ++++-------- node/node.go | 2 +- 13 files changed, 17 insertions(+), 36 deletions(-) diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go index 0c193577e..5f4ce3029 100644 --- a/internal/blocksync/reactor.go +++ b/internal/blocksync/reactor.go @@ -29,14 +29,13 @@ var ( // ref: https://github.com/tendermint/tendermint/issues/5670 ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{ BlockSyncChannel: { - MsgType: new(bcproto.Message), Descriptor: &p2p.ChannelDescriptor{ ID: byte(BlockSyncChannel), + MessageType: new(bcproto.Message), Priority: 5, SendQueueCapacity: 1000, RecvBufferCapacity: 1024, RecvMessageCapacity: MaxMsgSize, - MaxSendBytes: 100, }, }, } diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 22bd7576e..27ecb4602 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -31,50 +31,46 @@ var ( // ref: https://github.com/tendermint/tendermint/issues/5670 ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{ StateChannel: { - MsgType: new(tmcons.Message), Descriptor: &p2p.ChannelDescriptor{ ID: byte(StateChannel), + MessageType: new(tmcons.Message), Priority: 8, SendQueueCapacity: 64, RecvMessageCapacity: maxMsgSize, RecvBufferCapacity: 128, - MaxSendBytes: 12000, }, }, DataChannel: { - MsgType: new(tmcons.Message), Descriptor: &p2p.ChannelDescriptor{ // TODO: Consider a split between gossiping current block and catchup // stuff. Once we gossip the whole block there is nothing left to send // until next height or round. ID: byte(DataChannel), + MessageType: new(tmcons.Message), Priority: 12, SendQueueCapacity: 64, RecvBufferCapacity: 512, RecvMessageCapacity: maxMsgSize, - MaxSendBytes: 40000, }, }, VoteChannel: { - MsgType: new(tmcons.Message), Descriptor: &p2p.ChannelDescriptor{ ID: byte(VoteChannel), + MessageType: new(tmcons.Message), Priority: 10, SendQueueCapacity: 64, RecvBufferCapacity: 128, RecvMessageCapacity: maxMsgSize, - MaxSendBytes: 150, }, }, VoteSetBitsChannel: { - MsgType: new(tmcons.Message), Descriptor: &p2p.ChannelDescriptor{ ID: byte(VoteSetBitsChannel), + MessageType: new(tmcons.Message), Priority: 5, SendQueueCapacity: 8, RecvBufferCapacity: 128, RecvMessageCapacity: maxMsgSize, - MaxSendBytes: 50, }, }, } diff --git a/internal/evidence/reactor.go b/internal/evidence/reactor.go index a454038fd..0090bd32f 100644 --- a/internal/evidence/reactor.go +++ b/internal/evidence/reactor.go @@ -27,13 +27,12 @@ var ( // ref: https://github.com/tendermint/tendermint/issues/5670 ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{ EvidenceChannel: { - MsgType: new(tmproto.EvidenceList), Descriptor: &p2p.ChannelDescriptor{ ID: byte(EvidenceChannel), + MessageType: new(tmproto.EvidenceList), Priority: 6, RecvMessageCapacity: maxMsgSize, RecvBufferCapacity: 32, - MaxSendBytes: 400, }, }, } diff --git a/internal/mempool/v0/reactor.go b/internal/mempool/v0/reactor.go index 86392e96f..1587c0778 100644 --- a/internal/mempool/v0/reactor.go +++ b/internal/mempool/v0/reactor.go @@ -100,13 +100,12 @@ func GetChannelShims(cfg *config.MempoolConfig) map[p2p.ChannelID]*p2p.ChannelDe return map[p2p.ChannelID]*p2p.ChannelDescriptorShim{ mempool.MempoolChannel: { - MsgType: new(protomem.Message), Descriptor: &p2p.ChannelDescriptor{ ID: byte(mempool.MempoolChannel), + MessageType: new(protomem.Message), Priority: 5, RecvMessageCapacity: batchMsg.Size(), RecvBufferCapacity: 128, - MaxSendBytes: 5000, }, }, } diff --git a/internal/mempool/v1/reactor.go b/internal/mempool/v1/reactor.go index f07808387..e31977cc9 100644 --- a/internal/mempool/v1/reactor.go +++ b/internal/mempool/v1/reactor.go @@ -107,13 +107,12 @@ func GetChannelShims(cfg *config.MempoolConfig) map[p2p.ChannelID]*p2p.ChannelDe return map[p2p.ChannelID]*p2p.ChannelDescriptorShim{ mempool.MempoolChannel: { - MsgType: new(protomem.Message), Descriptor: &p2p.ChannelDescriptor{ ID: byte(mempool.MempoolChannel), + MessageType: new(protomem.Message), Priority: 5, RecvMessageCapacity: batchMsg.Size(), RecvBufferCapacity: 128, - MaxSendBytes: 5000, }, }, } diff --git a/internal/p2p/conn/connection.go b/internal/p2p/conn/connection.go index 0ab7b7546..c6a9b206c 100644 --- a/internal/p2p/conn/connection.go +++ b/internal/p2p/conn/connection.go @@ -612,6 +612,8 @@ type ChannelDescriptor struct { ID byte Priority int + MessageType proto.Message + // TODO: Remove once p2p refactor is complete. SendQueueCapacity int RecvMessageCapacity int @@ -619,10 +621,6 @@ type ChannelDescriptor struct { // RecvBufferCapacity defines the max buffer size of inbound messages for a // given p2p Channel queue. RecvBufferCapacity int - - // MaxSendBytes defines the maximum number of bytes that can be sent at any - // given moment from a Channel to a peer. - MaxSendBytes uint } func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) { diff --git a/internal/p2p/p2p_test.go b/internal/p2p/p2p_test.go index 6e524d492..91e1c0824 100644 --- a/internal/p2p/p2p_test.go +++ b/internal/p2p/p2p_test.go @@ -19,7 +19,6 @@ var ( Priority: 5, SendQueueCapacity: 10, RecvMessageCapacity: 10, - MaxSendBytes: 1000, } selfKey crypto.PrivKey = ed25519.GenPrivKeyFromSecret([]byte{0xf9, 0x1b, 0x08, 0xaa, 0x38, 0xee, 0x34, 0xdd}) diff --git a/internal/p2p/p2ptest/network.go b/internal/p2p/p2ptest/network.go index 1daba3f14..21aa1a0ea 100644 --- a/internal/p2p/p2ptest/network.go +++ b/internal/p2p/p2ptest/network.go @@ -339,6 +339,5 @@ func MakeChannelDesc(chID p2p.ChannelID) p2p.ChannelDescriptor { Priority: 5, SendQueueCapacity: 10, RecvMessageCapacity: 10, - MaxSendBytes: 1000, } } diff --git a/internal/p2p/pex/reactor.go b/internal/p2p/pex/reactor.go index f5c03adf8..f313279c3 100644 --- a/internal/p2p/pex/reactor.go +++ b/internal/p2p/pex/reactor.go @@ -69,7 +69,6 @@ func ChannelDescriptor() conn.ChannelDescriptor { SendQueueCapacity: 10, RecvMessageCapacity: maxMsgSize, RecvBufferCapacity: 32, - MaxSendBytes: 200, } } diff --git a/internal/p2p/pqueue_test.go b/internal/p2p/pqueue_test.go index c038408ef..18f0c02e3 100644 --- a/internal/p2p/pqueue_test.go +++ b/internal/p2p/pqueue_test.go @@ -13,7 +13,7 @@ type testMessage = gogotypes.StringValue func TestCloseWhileDequeueFull(t *testing.T) { enqueueLength := 5 chDescs := []ChannelDescriptor{ - {ID: 0x01, Priority: 1, MaxSendBytes: 4}, + {ID: 0x01, Priority: 1}, } pqueue := newPQScheduler(log.NewNopLogger(), NopMetrics(), chDescs, uint(enqueueLength), 1, 120) diff --git a/internal/p2p/shim.go b/internal/p2p/shim.go index 9f82bc11a..f57ce18b9 100644 --- a/internal/p2p/shim.go +++ b/internal/p2p/shim.go @@ -3,7 +3,6 @@ package p2p import ( "sort" - "github.com/gogo/protobuf/proto" "github.com/tendermint/tendermint/libs/log" ) @@ -12,7 +11,6 @@ import ( // A ChannelDescriptorShim is not contained in ReactorShim, but is rather // used to construct a ReactorShim. type ChannelDescriptorShim struct { - MsgType proto.Message Descriptor *ChannelDescriptor } @@ -61,7 +59,7 @@ func NewChannelShim(cds *ChannelDescriptorShim, buf uint) *ChannelShim { Descriptor: cds.Descriptor, Channel: NewChannel( ChannelID(cds.Descriptor.ID), - cds.MsgType, + cds.Descriptor.MessageType, inCh, outCh, errCh, diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index 74803b3e2..3757d028d 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -38,47 +38,43 @@ var ( // ref: https://github.com/tendermint/tendermint/issues/5670 ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{ SnapshotChannel: { - MsgType: new(ssproto.Message), Descriptor: &p2p.ChannelDescriptor{ ID: byte(SnapshotChannel), + MessageType: new(ssproto.Message), Priority: 6, SendQueueCapacity: 10, RecvMessageCapacity: snapshotMsgSize, RecvBufferCapacity: 128, - MaxSendBytes: 400, }, }, ChunkChannel: { - MsgType: new(ssproto.Message), Descriptor: &p2p.ChannelDescriptor{ ID: byte(ChunkChannel), Priority: 3, + MessageType: new(ssproto.Message), SendQueueCapacity: 4, RecvMessageCapacity: chunkMsgSize, RecvBufferCapacity: 128, - MaxSendBytes: 400, }, }, LightBlockChannel: { - MsgType: new(ssproto.Message), Descriptor: &p2p.ChannelDescriptor{ ID: byte(LightBlockChannel), + MessageType: new(ssproto.Message), Priority: 5, SendQueueCapacity: 10, RecvMessageCapacity: lightBlockMsgSize, RecvBufferCapacity: 128, - MaxSendBytes: 400, }, }, ParamsChannel: { - MsgType: new(ssproto.Message), Descriptor: &p2p.ChannelDescriptor{ ID: byte(ParamsChannel), + MessageType: new(ssproto.Message), Priority: 2, SendQueueCapacity: 10, RecvMessageCapacity: paramMsgSize, RecvBufferCapacity: 128, - MaxSendBytes: 400, }, }, } diff --git a/node/node.go b/node/node.go index b6c47a9ce..6a66f4224 100644 --- a/node/node.go +++ b/node/node.go @@ -1119,7 +1119,7 @@ func makeChannelsFromShims( channels := map[p2p.ChannelID]*p2p.Channel{} for chID, chShim := range chShims { - ch, err := router.OpenChannel(*chShim.Descriptor, chShim.MsgType, chShim.Descriptor.RecvBufferCapacity) + ch, err := router.OpenChannel(*chShim.Descriptor, chShim.Descriptor.MessageType, chShim.Descriptor.RecvBufferCapacity) if err != nil { panic(fmt.Sprintf("failed to open channel %v: %v", chID, err)) }