statesync: merge channel processing (#8240)

Sam Kleinman authored 2022-04-04 12:31:15 -04:00, committed by GitHub
parent cb8e6b1c1a
commit 97f7021712
10 changed files with 81 additions and 29 deletions

View File

@@ -45,6 +45,7 @@ func GetChannelDescriptor() *p2p.ChannelDescriptor {
SendQueueCapacity: 1000,
RecvBufferCapacity: 1024,
RecvMessageCapacity: MaxMsgSize,
Name: "blockSync",
}
}

View File

@@ -38,6 +38,7 @@ func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor {
SendQueueCapacity: 64,
RecvMessageCapacity: maxMsgSize,
RecvBufferCapacity: 128,
Name: "state",
},
DataChannel: {
// TODO: Consider a split between gossiping current block and catchup
@@ -49,6 +50,7 @@ func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor {
SendQueueCapacity: 64,
RecvBufferCapacity: 512,
RecvMessageCapacity: maxMsgSize,
Name: "data",
},
VoteChannel: {
ID: VoteChannel,
@@ -57,6 +59,7 @@ func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor {
SendQueueCapacity: 64,
RecvBufferCapacity: 128,
RecvMessageCapacity: maxMsgSize,
Name: "vote",
},
VoteSetBitsChannel: {
ID: VoteSetBitsChannel,
@@ -65,6 +68,7 @@ func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor {
SendQueueCapacity: 8,
RecvBufferCapacity: 128,
RecvMessageCapacity: maxMsgSize,
Name: "voteSet",
},
}
}

View File

@@ -38,6 +38,7 @@ func GetChannelDescriptor() *p2p.ChannelDescriptor {
Priority: 6,
RecvMessageCapacity: maxMsgSize,
RecvBufferCapacity: 32,
Name: "evidence",
}
}

View File

@@ -106,6 +106,7 @@ func getChannelDescriptor(cfg *config.MempoolConfig) *p2p.ChannelDescriptor {
Priority: 5,
RecvMessageCapacity: batchMsg.Size(),
RecvBufferCapacity: 128,
Name: "mempool",
}
}

View File

@@ -60,6 +60,7 @@ type Channel struct {
errCh chan<- PeerError // peer error reporting
messageType proto.Message // the channel's message type, used for unmarshaling
name string
}
// NewChannel creates a new channel. It is primarily for internal and test
@@ -102,6 +103,8 @@ func (ch *Channel) SendError(ctx context.Context, pe PeerError) error {
}
}
func (ch *Channel) String() string { return fmt.Sprintf("p2p.Channel<%d:%s>", ch.ID, ch.name) }
// Receive returns a new unbuffered iterator to receive messages from ch.
// The iterator runs until ctx ends.
func (ch *Channel) Receive(ctx context.Context) *ChannelIterator {

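With the descriptor Name plumbed into the channel by the router (below), the new String method lets log lines identify a channel by name rather than a bare numeric ID. A minimal, self-contained sketch of the resulting format; the local channel type and the ID value 96 are illustrative stand-ins, not the real internal/p2p definitions:

package main

import "fmt"

// channel mirrors only the two fields that p2p.Channel.String reads; the real
// struct also carries the queues, message type, and error channel.
type channel struct {
    id   uint16 // stands in for p2p.ChannelID
    name string // populated from ChannelDescriptor.Name when the router opens the channel
}

func (ch channel) String() string {
    return fmt.Sprintf("p2p.Channel<%d:%s>", ch.id, ch.name)
}

func main() {
    fmt.Println(channel{id: 96, name: "snapshot"}) // p2p.Channel<96:snapshot>
}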
View File

@@ -616,6 +616,10 @@ type ChannelDescriptor struct {
// RecvBufferCapacity defines the max buffer size of inbound messages for a
// given p2p Channel queue.
RecvBufferCapacity int
// Human readable name of the channel, used in logging and
// diagnostics.
Name string
}
func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) {

View File

@@ -63,6 +63,7 @@ func ChannelDescriptor() *conn.ChannelDescriptor {
SendQueueCapacity: 10,
RecvMessageCapacity: maxMsgSize,
RecvBufferCapacity: 128,
Name: "pex",
}
}

View File

@@ -281,6 +281,7 @@ func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (*C
outCh := make(chan Envelope, chDesc.RecvBufferCapacity)
errCh := make(chan PeerError, chDesc.RecvBufferCapacity)
channel := NewChannel(id, messageType, queue.dequeue(), outCh, errCh)
channel.name = chDesc.Name
var wrapper Wrapper
if w, ok := messageType.(Wrapper); ok {

View File

@@ -16,6 +16,7 @@ import (
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/internal/eventbus"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/p2p/conn"
sm "github.com/tendermint/tendermint/internal/state"
"github.com/tendermint/tendermint/internal/store"
"github.com/tendermint/tendermint/libs/log"
@@ -75,13 +76,13 @@ const (
func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor {
return map[p2p.ChannelID]*p2p.ChannelDescriptor{
SnapshotChannel: {
ID: SnapshotChannel,
MessageType: new(ssproto.Message),
Priority: 6,
SendQueueCapacity: 10,
RecvMessageCapacity: snapshotMsgSize,
RecvBufferCapacity: 128,
Name: "snapshot",
},
ChunkChannel: {
ID: ChunkChannel,
@@ -90,6 +91,7 @@ func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor {
SendQueueCapacity: 4,
RecvMessageCapacity: chunkMsgSize,
RecvBufferCapacity: 128,
Name: "chunk",
},
LightBlockChannel: {
ID: LightBlockChannel,
@@ -98,6 +100,7 @@ func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor {
SendQueueCapacity: 10,
RecvMessageCapacity: lightBlockMsgSize,
RecvBufferCapacity: 128,
Name: "light-block",
},
ParamsChannel: {
ID: ParamsChannel,
@@ -106,6 +109,7 @@ func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor {
SendQueueCapacity: 10,
RecvMessageCapacity: paramMsgSize,
RecvBufferCapacity: 128,
Name: "params",
},
}
@@ -233,10 +237,7 @@ func NewReactor(
// The caller must be sure to execute OnStop to ensure the outbound p2p Channels are
// closed. No error is returned.
func (r *Reactor) OnStart(ctx context.Context) error {
- go r.processCh(ctx, r.snapshotCh, "snapshot")
- go r.processCh(ctx, r.chunkCh, "chunk")
- go r.processCh(ctx, r.blockCh, "light block")
- go r.processCh(ctx, r.paramsCh, "consensus params")
+ go r.processChannels(ctx, r.snapshotCh, r.chunkCh, r.blockCh, r.paramsCh)
go r.processPeerUpdates(ctx)
return nil
@@ -291,6 +292,7 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) {
r.metrics,
)
r.mtx.Unlock()
defer func() {
r.mtx.Lock()
// reset syncing objects at the close of Sync
@@ -780,6 +782,8 @@ func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelop
if sp, ok := r.stateProvider.(*stateProviderP2P); ok {
select {
case sp.paramsRecvCh <- cp:
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Second):
return errors.New("failed to send consensus params, stateprovider not ready for response")
}
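The added ctx.Done() case lets handleParamsMessage stop waiting as soon as the reactor's context ends, instead of always sitting out the one-second timeout during shutdown. A self-contained sketch of the same three-way select, with a hypothetical deliver/recvCh standing in for the stateprovider's paramsRecvCh:

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// deliver hands value to recvCh, but gives up early if ctx ends and falls
// back to a timeout if the receiver never becomes ready.
func deliver(ctx context.Context, recvCh chan<- int, value int) error {
    select {
    case recvCh <- value:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    case <-time.After(time.Second):
        return errors.New("receiver not ready for response")
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    cancel() // simulate reactor shutdown: deliver returns immediately with ctx.Err()
    fmt.Println(deliver(ctx, make(chan int), 42)) // context canceled
}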
@@ -797,7 +801,7 @@ func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelop
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
// It will handle errors and any possible panics gracefully. A caller can handle
// any error returned by sending a PeerError on the respective channel.
- func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope) (err error) {
+ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope) (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("panic in processing message: %v", e)
@@ -811,7 +815,7 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
r.logger.Debug("received message", "message", reflect.TypeOf(envelope.Message), "peer", envelope.From)
- switch chID {
+ switch envelope.ChannelID {
case SnapshotChannel:
err = r.handleSnapshotMessage(ctx, envelope)
case ChunkChannel:
@@ -821,7 +825,7 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
case ParamsChannel:
err = r.handleParamsMessage(ctx, envelope)
default:
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", envelope.ChannelID, envelope)
}
return err
@@ -831,15 +835,35 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
// encountered during message execution will result in a PeerError being sent on
// the respective channel. When the reactor is stopped, we will catch the signal
// and close the p2p Channel gracefully.
- func (r *Reactor) processCh(ctx context.Context, ch *p2p.Channel, chName string) {
- iter := ch.Receive(ctx)
+ func (r *Reactor) processChannels(ctx context.Context, chs ...*p2p.Channel) {
+ // make sure that the iterator gets cleaned up in case of error
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+ chanTable := make(map[conn.ChannelID]*p2p.Channel, len(chs))
+ for idx := range chs {
+ ch := chs[idx]
+ chanTable[ch.ID] = ch
+ }
+ iter := p2p.MergedChannelIterator(ctx, chs...)
for iter.Next(ctx) {
envelope := iter.Envelope()
- if err := r.handleMessage(ctx, ch.ID, envelope); err != nil {
+ if err := r.handleMessage(ctx, envelope); err != nil {
+ ch, ok := chanTable[envelope.ChannelID]
+ if !ok {
+ r.logger.Error("received impossible message",
+ "envelope_from", envelope.From,
+ "envelope_ch", envelope.ChannelID,
+ "num_chs", len(chanTable),
+ "err", err,
+ )
+ return
+ }
r.logger.Error("failed to process message",
"err", err,
- "channel", chName,
- "ch_id", ch.ID,
+ "channel", ch.String(),
+ "ch_id", envelope.ChannelID,
"envelope", envelope)
if serr := ch.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
@@ -875,14 +899,15 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
r.mtx.Lock()
defer r.mtx.Unlock()
if r.syncer == nil {
return
}
switch peerUpdate.Status {
case p2p.PeerStatusUp:
newProvider := NewBlockProvider(peerUpdate.NodeID, r.chainID, r.dispatcher)
r.providers[peerUpdate.NodeID] = newProvider
err := r.syncer.AddPeer(ctx, peerUpdate.NodeID)
if err != nil {

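The new processChannels above replaces four per-channel goroutines with a single consumer: p2p.MergedChannelIterator fans the snapshot, chunk, light-block, and params channels into one envelope stream, handleMessage dispatches on envelope.ChannelID, and chanTable maps that ID back to the originating channel so peer errors still go to the right place. The sketch below is not the MergedChannelIterator implementation, just a self-contained illustration of the fan-in idea using plain Go channels and made-up envelope values:

package main

import (
    "fmt"
    "sync"
)

// envelope stands in for p2p.Envelope: a payload tagged with the channel ID
// it arrived on, which is what the merged handler switches over.
type envelope struct {
    ChannelID uint16
    Message   string
}

// merge fans several input streams into one output stream; the output closes
// only after every input has been drained, mirroring how a merged iterator
// ends when all underlying channels are finished.
func merge(inputs ...<-chan envelope) <-chan envelope {
    out := make(chan envelope)
    var wg sync.WaitGroup
    wg.Add(len(inputs))
    for _, in := range inputs {
        go func(in <-chan envelope) {
            defer wg.Done()
            for e := range in {
                out <- e
            }
        }(in)
    }
    go func() { wg.Wait(); close(out) }()
    return out
}

func main() {
    // Illustrative channel IDs only; the real values come from the statesync
    // channel descriptors.
    snapshots := make(chan envelope, 1)
    chunks := make(chan envelope, 1)
    snapshots <- envelope{ChannelID: 96, Message: "snapshots response"}
    chunks <- envelope{ChannelID: 97, Message: "chunk response"}
    close(snapshots)
    close(chunks)

    // One loop now serves every channel; dispatch happens per envelope.
    for e := range merge(snapshots, chunks) {
        switch e.ChannelID {
        case 96:
            fmt.Println("snapshot channel:", e.Message)
        case 97:
            fmt.Println("chunk channel:", e.Message)
        }
    }
}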
View File

@@ -257,8 +257,9 @@ func TestReactor_ChunkRequest_InvalidRequest(t *testing.T) {
rts := setup(ctx, t, nil, nil, 2)
rts.chunkInCh <- p2p.Envelope{
- From: types.NodeID("aa"),
- Message: &ssproto.SnapshotsRequest{},
+ From: types.NodeID("aa"),
+ ChannelID: ChunkChannel,
+ Message: &ssproto.SnapshotsRequest{},
}
response := <-rts.chunkPeerErrCh
@@ -315,8 +316,9 @@ func TestReactor_ChunkRequest(t *testing.T) {
rts := setup(ctx, t, conn, nil, 2)
rts.chunkInCh <- p2p.Envelope{
- From: types.NodeID("aa"),
- Message: tc.request,
+ From: types.NodeID("aa"),
+ ChannelID: ChunkChannel,
+ Message: tc.request,
}
response := <-rts.chunkOutCh
@@ -335,8 +337,9 @@ func TestReactor_SnapshotsRequest_InvalidRequest(t *testing.T) {
rts := setup(ctx, t, nil, nil, 2)
rts.snapshotInCh <- p2p.Envelope{
- From: types.NodeID("aa"),
- Message: &ssproto.ChunkRequest{},
+ From: types.NodeID("aa"),
+ ChannelID: SnapshotChannel,
+ Message: &ssproto.ChunkRequest{},
}
response := <-rts.snapshotPeerErrCh
@@ -400,8 +403,9 @@ func TestReactor_SnapshotsRequest(t *testing.T) {
rts := setup(ctx, t, conn, nil, 100)
rts.snapshotInCh <- p2p.Envelope{
- From: types.NodeID("aa"),
- Message: &ssproto.SnapshotsRequest{},
+ From: types.NodeID("aa"),
+ ChannelID: SnapshotChannel,
+ Message: &ssproto.SnapshotsRequest{},
}
if len(tc.expectResponses) > 0 {
@@ -457,7 +461,8 @@ func TestReactor_LightBlockResponse(t *testing.T) {
rts.stateStore.On("LoadValidators", height).Return(vals, nil)
rts.blockInCh <- p2p.Envelope{
- From: types.NodeID("aa"),
+ From: types.NodeID("aa"),
+ ChannelID: LightBlockChannel,
Message: &ssproto.LightBlockRequest{
Height: 10,
},
@@ -733,7 +738,8 @@ func handleLightBlockRequests(
require.NoError(t, err)
select {
case sending <- p2p.Envelope{
- From: envelope.To,
+ From: envelope.To,
+ ChannelID: LightBlockChannel,
Message: &ssproto.LightBlockResponse{
LightBlock: lb,
},
@@ -750,7 +756,8 @@ func handleLightBlockRequests(
require.NoError(t, err)
select {
case sending <- p2p.Envelope{
- From: envelope.To,
+ From: envelope.To,
+ ChannelID: LightBlockChannel,
Message: &ssproto.LightBlockResponse{
LightBlock: differntLB,
},
@@ -761,7 +768,8 @@ func handleLightBlockRequests(
case 1: // send nil block i.e. pretend we don't have it
select {
case sending <- p2p.Envelope{
- From: envelope.To,
+ From: envelope.To,
+ ChannelID: LightBlockChannel,
Message: &ssproto.LightBlockResponse{
LightBlock: nil,
},
@@ -802,7 +810,8 @@ func handleConsensusParamsRequest(
}
select {
case sending <- p2p.Envelope{
- From: envelope.To,
+ From: envelope.To,
+ ChannelID: ParamsChannel,
Message: &ssproto.ParamsResponse{
Height: msg.Height,
ConsensusParams: paramsProto,
@@ -913,7 +922,8 @@ func handleSnapshotRequests(
require.True(t, ok)
for _, snapshot := range snapshots {
sendingCh <- p2p.Envelope{
- From: envelope.To,
+ From: envelope.To,
+ ChannelID: SnapshotChannel,
Message: &ssproto.SnapshotsResponse{
Height: snapshot.Height,
Format: snapshot.Format,
@@ -946,7 +956,8 @@ func handleChunkRequests(
msg, ok := envelope.Message.(*ssproto.ChunkRequest)
require.True(t, ok)
sendingCh <- p2p.Envelope{
- From: envelope.To,
+ From: envelope.To,
+ ChannelID: ChunkChannel,
Message: &ssproto.ChunkResponse{
Height: msg.Height,
Format: msg.Format,