From 8afa250f8d08ca98038ea8c1eabdfbda395fd7a6 Mon Sep 17 00:00:00 2001 From: tycho garen Date: Tue, 22 Jun 2021 11:34:53 -0400 Subject: [PATCH] fix queues --- internal/blockchain/v0/reactor.go | 3 +-- internal/consensus/reactor.go | 13 +++++-------- internal/evidence/reactor.go | 4 ++-- internal/mempool/v0/reactor.go | 4 ++-- internal/mempool/v1/reactor.go | 4 ++-- internal/p2p/router.go | 2 +- internal/statesync/reactor.go | 12 ++++++------ 7 files changed, 19 insertions(+), 23 deletions(-) diff --git a/internal/blockchain/v0/reactor.go b/internal/blockchain/v0/reactor.go index cc61a157b..f8e3c4ee6 100644 --- a/internal/blockchain/v0/reactor.go +++ b/internal/blockchain/v0/reactor.go @@ -34,8 +34,7 @@ var ( SendQueueCapacity: 1000, RecvBufferCapacity: 1024, RecvMessageCapacity: bc.MaxMsgSize, - - MaxSendBytes: 100, + MaxSendBytes: 100, }, }, } diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index e7c1c03b8..820079613 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -36,8 +36,8 @@ var ( Priority: 8, SendQueueCapacity: 64, RecvMessageCapacity: maxMsgSize, - - MaxSendBytes: 12000, + RecvBufferCapacity: 128, + MaxSendBytes: 12000, }, }, DataChannel: { @@ -51,8 +51,7 @@ var ( SendQueueCapacity: 64, RecvBufferCapacity: 512, RecvMessageCapacity: maxMsgSize, - - MaxSendBytes: 40000, + MaxSendBytes: 40000, }, }, VoteChannel: { @@ -63,8 +62,7 @@ var ( SendQueueCapacity: 64, RecvBufferCapacity: 128, RecvMessageCapacity: maxMsgSize, - - MaxSendBytes: 150, + MaxSendBytes: 150, }, }, VoteSetBitsChannel: { @@ -75,8 +73,7 @@ var ( SendQueueCapacity: 8, RecvBufferCapacity: 128, RecvMessageCapacity: maxMsgSize, - - MaxSendBytes: 50, + MaxSendBytes: 50, }, }, } diff --git a/internal/evidence/reactor.go b/internal/evidence/reactor.go index 941c9dbc6..d2625309e 100644 --- a/internal/evidence/reactor.go +++ b/internal/evidence/reactor.go @@ -31,8 +31,8 @@ var ( ID: byte(EvidenceChannel), Priority: 6, RecvMessageCapacity: maxMsgSize, - - MaxSendBytes: 400, + RecvBufferCapacity: 32, + MaxSendBytes: 400, }, }, } diff --git a/internal/mempool/v0/reactor.go b/internal/mempool/v0/reactor.go index e2ff5090e..5a85a89b9 100644 --- a/internal/mempool/v0/reactor.go +++ b/internal/mempool/v0/reactor.go @@ -104,8 +104,8 @@ func GetChannelShims(config *cfg.MempoolConfig) map[p2p.ChannelID]*p2p.ChannelDe ID: byte(mempool.MempoolChannel), Priority: 5, RecvMessageCapacity: batchMsg.Size(), - - MaxSendBytes: 5000, + RecvBufferCapacity: 128, + MaxSendBytes: 5000, }, }, } diff --git a/internal/mempool/v1/reactor.go b/internal/mempool/v1/reactor.go index 27363c8e9..4443fe6dc 100644 --- a/internal/mempool/v1/reactor.go +++ b/internal/mempool/v1/reactor.go @@ -103,8 +103,8 @@ func GetChannelShims(config *cfg.MempoolConfig) map[p2p.ChannelID]*p2p.ChannelDe ID: byte(mempool.MempoolChannel), Priority: 5, RecvMessageCapacity: batchMsg.Size(), - - MaxSendBytes: 5000, + RecvBufferCapacity: 128, + MaxSendBytes: 5000, }, }, } diff --git a/internal/p2p/router.go b/internal/p2p/router.go index 499136b6c..8657309ba 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -365,7 +365,7 @@ func (r *Router) createQueueFactory() (func(int) queue, error) { // which internally makes the inbound, outbound, and error channel buffered. func (r *Router) OpenChannel(chDesc ChannelDescriptor, messageType proto.Message, size int) (*Channel, error) { if size == 0 { - size = queueBufferDefault + return nil, fmt.Errorf("must specify non-zero size for channel %d", chDesc.ID) } r.channelMtx.Lock() diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index 513000c1d..26ddf1809 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -41,8 +41,8 @@ var ( Priority: 6, SendQueueCapacity: 10, RecvMessageCapacity: snapshotMsgSize, - - MaxSendBytes: 400, + RecvBufferCapacity: 4096, + MaxSendBytes: 400, }, }, ChunkChannel: { @@ -52,8 +52,8 @@ var ( Priority: 3, SendQueueCapacity: 4, RecvMessageCapacity: chunkMsgSize, - - MaxSendBytes: 400, + RecvBufferCapacity: 4096, + MaxSendBytes: 400, }, }, LightBlockChannel: { @@ -63,8 +63,8 @@ var ( Priority: 2, SendQueueCapacity: 10, RecvMessageCapacity: lightBlockMsgSize, - - MaxSendBytes: 400, + RecvBufferCapacity: 4096, + MaxSendBytes: 400, }, }, }