Compare commits

...

6 Commits

Author SHA1 Message Date
tycho garen
67fb3a5edb fix test fixtures 2021-06-23 11:44:56 -04:00
tycho garen
25c58c0a00 reduce pex sending buffer 2021-06-22 17:51:23 -04:00
tycho garen
d874f85710 keep state sync queue sizes smaller 2021-06-22 14:37:53 -04:00
tycho garen
a8fcc919e0 specify buffer on pex 2021-06-22 13:21:25 -04:00
tycho garen
8afa250f8d fix queues 2021-06-22 11:34:53 -04:00
tycho garen
3616717500 p2p: reduce buffering on channels 2021-06-21 16:45:52 -04:00
12 changed files with 39 additions and 41 deletions

View File

@@ -32,10 +32,9 @@ var (
ID: byte(BlockchainChannel),
Priority: 5,
SendQueueCapacity: 1000,
RecvBufferCapacity: 50 * 4096,
RecvBufferCapacity: 1024,
RecvMessageCapacity: bc.MaxMsgSize,
MaxSendBytes: 100,
MaxSendBytes: 100,
},
},
}

View File

@@ -63,7 +63,7 @@ func setup(
fastSync: true,
}
chDesc := p2p.ChannelDescriptor{ID: byte(BlockchainChannel)}
chDesc := p2p.ChannelDescriptor{ID: byte(BlockchainChannel), RecvBufferCapacity: 16}
rts.blockchainChannels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(bcproto.Message), int(chBuf))
i := 0

View File

@@ -583,7 +583,7 @@ func (r *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
ID: BlockchainChannel,
Priority: 5,
SendQueueCapacity: 2000,
RecvBufferCapacity: 50 * 4096,
RecvBufferCapacity: 1024,
RecvMessageCapacity: bc.MaxMsgSize,
},
}

View File

@@ -33,11 +33,11 @@ var (
MsgType: new(tmcons.Message),
Descriptor: &p2p.ChannelDescriptor{
ID: byte(StateChannel),
Priority: 6,
SendQueueCapacity: 100,
Priority: 8,
SendQueueCapacity: 64,
RecvMessageCapacity: maxMsgSize,
MaxSendBytes: 12000,
RecvBufferCapacity: 128,
MaxSendBytes: 12000,
},
},
DataChannel: {
@@ -47,36 +47,33 @@ var (
// stuff. Once we gossip the whole block there is nothing left to send
// until next height or round.
ID: byte(DataChannel),
Priority: 10,
SendQueueCapacity: 100,
RecvBufferCapacity: 50 * 4096,
Priority: 12,
SendQueueCapacity: 64,
RecvBufferCapacity: 512,
RecvMessageCapacity: maxMsgSize,
MaxSendBytes: 40000,
MaxSendBytes: 40000,
},
},
VoteChannel: {
MsgType: new(tmcons.Message),
Descriptor: &p2p.ChannelDescriptor{
ID: byte(VoteChannel),
Priority: 7,
SendQueueCapacity: 100,
RecvBufferCapacity: 100 * 100,
Priority: 10,
SendQueueCapacity: 64,
RecvBufferCapacity: 128,
RecvMessageCapacity: maxMsgSize,
MaxSendBytes: 150,
MaxSendBytes: 150,
},
},
VoteSetBitsChannel: {
MsgType: new(tmcons.Message),
Descriptor: &p2p.ChannelDescriptor{
ID: byte(VoteSetBitsChannel),
Priority: 1,
SendQueueCapacity: 2,
RecvBufferCapacity: 1024,
Priority: 5,
SendQueueCapacity: 8,
RecvBufferCapacity: 128,
RecvMessageCapacity: maxMsgSize,
MaxSendBytes: 50,
MaxSendBytes: 50,
},
},
}

View File

@@ -31,8 +31,8 @@ var (
ID: byte(EvidenceChannel),
Priority: 6,
RecvMessageCapacity: maxMsgSize,
MaxSendBytes: 400,
RecvBufferCapacity: 32,
MaxSendBytes: 400,
},
},
}

View File

@@ -104,8 +104,8 @@ func GetChannelShims(config *cfg.MempoolConfig) map[p2p.ChannelID]*p2p.ChannelDe
ID: byte(mempool.MempoolChannel),
Priority: 5,
RecvMessageCapacity: batchMsg.Size(),
MaxSendBytes: 5000,
RecvBufferCapacity: 128,
MaxSendBytes: 5000,
},
},
}

View File

@@ -103,8 +103,8 @@ func GetChannelShims(config *cfg.MempoolConfig) map[p2p.ChannelID]*p2p.ChannelDe
ID: byte(mempool.MempoolChannel),
Priority: 5,
RecvMessageCapacity: batchMsg.Size(),
MaxSendBytes: 5000,
RecvBufferCapacity: 128,
MaxSendBytes: 5000,
},
},
}

View File

@@ -18,6 +18,7 @@ var (
Priority: 5,
SendQueueCapacity: 10,
RecvMessageCapacity: 10,
RecvBufferCapacity: 8,
MaxSendBytes: 1000,
}

View File

@@ -50,8 +50,8 @@ func ChannelDescriptor() conn.ChannelDescriptor {
Priority: 1,
SendQueueCapacity: 10,
RecvMessageCapacity: maxMsgSize,
MaxSendBytes: 200,
RecvBufferCapacity: 32,
MaxSendBytes: 200,
}
}
@@ -416,6 +416,7 @@ func (r *ReactorV2) sendRequestForPeers() {
// no peers are available
r.Logger.Debug("no available peers to send request to, waiting...")
r.nextRequestTime = time.Now().Add(noAvailablePeersWaitPeriod)
return
}
var peerID p2p.NodeID

View File

@@ -18,7 +18,7 @@ import (
"github.com/tendermint/tendermint/libs/service"
)
const queueBufferDefault = 4096
const queueBufferDefault = 32
// ChannelID is an arbitrary channel ID.
type ChannelID uint16
@@ -365,7 +365,7 @@ func (r *Router) createQueueFactory() (func(int) queue, error) {
// which internally makes the inbound, outbound, and error channel buffered.
func (r *Router) OpenChannel(chDesc ChannelDescriptor, messageType proto.Message, size int) (*Channel, error) {
if size == 0 {
size = queueBufferDefault
return nil, fmt.Errorf("must specify non-zero size for channel %d", chDesc.ID)
}
r.channelMtx.Lock()

View File

@@ -41,8 +41,8 @@ var (
Priority: 6,
SendQueueCapacity: 10,
RecvMessageCapacity: snapshotMsgSize,
MaxSendBytes: 400,
RecvBufferCapacity: 128,
MaxSendBytes: 400,
},
},
ChunkChannel: {
@@ -52,8 +52,8 @@ var (
Priority: 3,
SendQueueCapacity: 4,
RecvMessageCapacity: chunkMsgSize,
MaxSendBytes: 400,
RecvBufferCapacity: 128,
MaxSendBytes: 400,
},
},
LightBlockChannel: {
@@ -63,8 +63,8 @@ var (
Priority: 2,
SendQueueCapacity: 10,
RecvMessageCapacity: lightBlockMsgSize,
MaxSendBytes: 400,
RecvBufferCapacity: 128,
MaxSendBytes: 400,
},
},
}

View File

@@ -696,7 +696,7 @@ func createPEXReactorV2(
router *p2p.Router,
) (*pex.ReactorV2, error) {
channel, err := router.OpenChannel(pex.ChannelDescriptor(), &protop2p.PexMessage{}, 4096)
channel, err := router.OpenChannel(pex.ChannelDescriptor(), &protop2p.PexMessage{}, 128)
if err != nil {
return nil, err
}