From 97f7021712cbc0db8891a1f17c1c7406636e7c1a Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Mon, 4 Apr 2022 12:31:15 -0400 Subject: [PATCH] statesync: merge channel processing (#8240) --- internal/blocksync/reactor.go | 1 + internal/consensus/reactor.go | 4 +++ internal/evidence/reactor.go | 1 + internal/mempool/reactor.go | 1 + internal/p2p/channel.go | 3 ++ internal/p2p/conn/connection.go | 4 +++ internal/p2p/pex/reactor.go | 1 + internal/p2p/router.go | 1 + internal/statesync/reactor.go | 53 ++++++++++++++++++++++-------- internal/statesync/reactor_test.go | 41 ++++++++++++++--------- 10 files changed, 81 insertions(+), 29 deletions(-) diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go index b9c4e498c..34ed1f0fc 100644 --- a/internal/blocksync/reactor.go +++ b/internal/blocksync/reactor.go @@ -45,6 +45,7 @@ func GetChannelDescriptor() *p2p.ChannelDescriptor { SendQueueCapacity: 1000, RecvBufferCapacity: 1024, RecvMessageCapacity: MaxMsgSize, + Name: "blockSync", } } diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index c8d296ff9..b11775679 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -38,6 +38,7 @@ func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor { SendQueueCapacity: 64, RecvMessageCapacity: maxMsgSize, RecvBufferCapacity: 128, + Name: "state", }, DataChannel: { // TODO: Consider a split between gossiping current block and catchup @@ -49,6 +50,7 @@ func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor { SendQueueCapacity: 64, RecvBufferCapacity: 512, RecvMessageCapacity: maxMsgSize, + Name: "data", }, VoteChannel: { ID: VoteChannel, @@ -57,6 +59,7 @@ func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor { SendQueueCapacity: 64, RecvBufferCapacity: 128, RecvMessageCapacity: maxMsgSize, + Name: "vote", }, VoteSetBitsChannel: { ID: VoteSetBitsChannel, @@ -65,6 +68,7 @@ func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor { SendQueueCapacity: 8, RecvBufferCapacity: 128, RecvMessageCapacity: maxMsgSize, + Name: "voteSet", }, } } diff --git a/internal/evidence/reactor.go b/internal/evidence/reactor.go index 1011f732f..c52e7e32b 100644 --- a/internal/evidence/reactor.go +++ b/internal/evidence/reactor.go @@ -38,6 +38,7 @@ func GetChannelDescriptor() *p2p.ChannelDescriptor { Priority: 6, RecvMessageCapacity: maxMsgSize, RecvBufferCapacity: 32, + Name: "evidence", } } diff --git a/internal/mempool/reactor.go b/internal/mempool/reactor.go index 8f98d97f1..816589c9b 100644 --- a/internal/mempool/reactor.go +++ b/internal/mempool/reactor.go @@ -106,6 +106,7 @@ func getChannelDescriptor(cfg *config.MempoolConfig) *p2p.ChannelDescriptor { Priority: 5, RecvMessageCapacity: batchMsg.Size(), RecvBufferCapacity: 128, + Name: "mempool", } } diff --git a/internal/p2p/channel.go b/internal/p2p/channel.go index d7dad4d3b..8e6774612 100644 --- a/internal/p2p/channel.go +++ b/internal/p2p/channel.go @@ -60,6 +60,7 @@ type Channel struct { errCh chan<- PeerError // peer error reporting messageType proto.Message // the channel's message type, used for unmarshaling + name string } // NewChannel creates a new channel. It is primarily for internal and test @@ -102,6 +103,8 @@ func (ch *Channel) SendError(ctx context.Context, pe PeerError) error { } } +func (ch *Channel) String() string { return fmt.Sprintf("p2p.Channel<%d:%s>", ch.ID, ch.name) } + // Receive returns a new unbuffered iterator to receive messages from ch. // The iterator runs until ctx ends. func (ch *Channel) Receive(ctx context.Context) *ChannelIterator { diff --git a/internal/p2p/conn/connection.go b/internal/p2p/conn/connection.go index 4cbca7f19..c8fc21188 100644 --- a/internal/p2p/conn/connection.go +++ b/internal/p2p/conn/connection.go @@ -616,6 +616,10 @@ type ChannelDescriptor struct { // RecvBufferCapacity defines the max buffer size of inbound messages for a // given p2p Channel queue. RecvBufferCapacity int + + // Human readable name of the channel, used in logging and + // diagnostics. + Name string } func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) { diff --git a/internal/p2p/pex/reactor.go b/internal/p2p/pex/reactor.go index 2beaeaa17..178524af2 100644 --- a/internal/p2p/pex/reactor.go +++ b/internal/p2p/pex/reactor.go @@ -63,6 +63,7 @@ func ChannelDescriptor() *conn.ChannelDescriptor { SendQueueCapacity: 10, RecvMessageCapacity: maxMsgSize, RecvBufferCapacity: 128, + Name: "pex", } } diff --git a/internal/p2p/router.go b/internal/p2p/router.go index f9b3d1ad8..40c24d56b 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -281,6 +281,7 @@ func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (*C outCh := make(chan Envelope, chDesc.RecvBufferCapacity) errCh := make(chan PeerError, chDesc.RecvBufferCapacity) channel := NewChannel(id, messageType, queue.dequeue(), outCh, errCh) + channel.name = chDesc.Name var wrapper Wrapper if w, ok := messageType.(Wrapper); ok { diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index 51f626027..dba520a1c 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -16,6 +16,7 @@ import ( "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/internal/eventbus" "github.com/tendermint/tendermint/internal/p2p" + "github.com/tendermint/tendermint/internal/p2p/conn" sm "github.com/tendermint/tendermint/internal/state" "github.com/tendermint/tendermint/internal/store" "github.com/tendermint/tendermint/libs/log" @@ -75,13 +76,13 @@ const ( func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor { return map[p2p.ChannelID]*p2p.ChannelDescriptor{ SnapshotChannel: { - ID: SnapshotChannel, MessageType: new(ssproto.Message), Priority: 6, SendQueueCapacity: 10, RecvMessageCapacity: snapshotMsgSize, RecvBufferCapacity: 128, + Name: "snapshot", }, ChunkChannel: { ID: ChunkChannel, @@ -90,6 +91,7 @@ func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor { SendQueueCapacity: 4, RecvMessageCapacity: chunkMsgSize, RecvBufferCapacity: 128, + Name: "chunk", }, LightBlockChannel: { ID: LightBlockChannel, @@ -98,6 +100,7 @@ func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor { SendQueueCapacity: 10, RecvMessageCapacity: lightBlockMsgSize, RecvBufferCapacity: 128, + Name: "light-block", }, ParamsChannel: { ID: ParamsChannel, @@ -106,6 +109,7 @@ func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor { SendQueueCapacity: 10, RecvMessageCapacity: paramMsgSize, RecvBufferCapacity: 128, + Name: "params", }, } @@ -233,10 +237,7 @@ func NewReactor( // The caller must be sure to execute OnStop to ensure the outbound p2p Channels are // closed. No error is returned. func (r *Reactor) OnStart(ctx context.Context) error { - go r.processCh(ctx, r.snapshotCh, "snapshot") - go r.processCh(ctx, r.chunkCh, "chunk") - go r.processCh(ctx, r.blockCh, "light block") - go r.processCh(ctx, r.paramsCh, "consensus params") + go r.processChannels(ctx, r.snapshotCh, r.chunkCh, r.blockCh, r.paramsCh) go r.processPeerUpdates(ctx) return nil @@ -291,6 +292,7 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) { r.metrics, ) r.mtx.Unlock() + defer func() { r.mtx.Lock() // reset syncing objects at the close of Sync @@ -780,6 +782,8 @@ func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelop if sp, ok := r.stateProvider.(*stateProviderP2P); ok { select { case sp.paramsRecvCh <- cp: + case <-ctx.Done(): + return ctx.Err() case <-time.After(time.Second): return errors.New("failed to send consensus params, stateprovider not ready for response") } @@ -797,7 +801,7 @@ func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelop // handleMessage handles an Envelope sent from a peer on a specific p2p Channel. // It will handle errors and any possible panics gracefully. A caller can handle // any error returned by sending a PeerError on the respective channel. -func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope) (err error) { +func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope) (err error) { defer func() { if e := recover(); e != nil { err = fmt.Errorf("panic in processing message: %v", e) @@ -811,7 +815,7 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop r.logger.Debug("received message", "message", reflect.TypeOf(envelope.Message), "peer", envelope.From) - switch chID { + switch envelope.ChannelID { case SnapshotChannel: err = r.handleSnapshotMessage(ctx, envelope) case ChunkChannel: @@ -821,7 +825,7 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop case ParamsChannel: err = r.handleParamsMessage(ctx, envelope) default: - err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope) + err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", envelope.ChannelID, envelope) } return err @@ -831,15 +835,35 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop // encountered during message execution will result in a PeerError being sent on // the respective channel. When the reactor is stopped, we will catch the signal // and close the p2p Channel gracefully. -func (r *Reactor) processCh(ctx context.Context, ch *p2p.Channel, chName string) { - iter := ch.Receive(ctx) +func (r *Reactor) processChannels(ctx context.Context, chs ...*p2p.Channel) { + // make sure that the iterator gets cleaned up in case of error + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + chanTable := make(map[conn.ChannelID]*p2p.Channel, len(chs)) + for idx := range chs { + ch := chs[idx] + chanTable[ch.ID] = ch + } + + iter := p2p.MergedChannelIterator(ctx, chs...) for iter.Next(ctx) { envelope := iter.Envelope() - if err := r.handleMessage(ctx, ch.ID, envelope); err != nil { + if err := r.handleMessage(ctx, envelope); err != nil { + ch, ok := chanTable[envelope.ChannelID] + if !ok { + r.logger.Error("received impossible message", + "envelope_from", envelope.From, + "envelope_ch", envelope.ChannelID, + "num_chs", len(chanTable), + "err", err, + ) + return + } r.logger.Error("failed to process message", "err", err, - "channel", chName, - "ch_id", ch.ID, + "channel", ch.String(), + "ch_id", envelope.ChannelID, "envelope", envelope) if serr := ch.SendError(ctx, p2p.PeerError{ NodeID: envelope.From, @@ -875,14 +899,15 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda r.mtx.Lock() defer r.mtx.Unlock() + if r.syncer == nil { return } switch peerUpdate.Status { case p2p.PeerStatusUp: - newProvider := NewBlockProvider(peerUpdate.NodeID, r.chainID, r.dispatcher) + r.providers[peerUpdate.NodeID] = newProvider err := r.syncer.AddPeer(ctx, peerUpdate.NodeID) if err != nil { diff --git a/internal/statesync/reactor_test.go b/internal/statesync/reactor_test.go index c1ac1a048..709fe9a2c 100644 --- a/internal/statesync/reactor_test.go +++ b/internal/statesync/reactor_test.go @@ -257,8 +257,9 @@ func TestReactor_ChunkRequest_InvalidRequest(t *testing.T) { rts := setup(ctx, t, nil, nil, 2) rts.chunkInCh <- p2p.Envelope{ - From: types.NodeID("aa"), - Message: &ssproto.SnapshotsRequest{}, + From: types.NodeID("aa"), + ChannelID: ChunkChannel, + Message: &ssproto.SnapshotsRequest{}, } response := <-rts.chunkPeerErrCh @@ -315,8 +316,9 @@ func TestReactor_ChunkRequest(t *testing.T) { rts := setup(ctx, t, conn, nil, 2) rts.chunkInCh <- p2p.Envelope{ - From: types.NodeID("aa"), - Message: tc.request, + From: types.NodeID("aa"), + ChannelID: ChunkChannel, + Message: tc.request, } response := <-rts.chunkOutCh @@ -335,8 +337,9 @@ func TestReactor_SnapshotsRequest_InvalidRequest(t *testing.T) { rts := setup(ctx, t, nil, nil, 2) rts.snapshotInCh <- p2p.Envelope{ - From: types.NodeID("aa"), - Message: &ssproto.ChunkRequest{}, + From: types.NodeID("aa"), + ChannelID: SnapshotChannel, + Message: &ssproto.ChunkRequest{}, } response := <-rts.snapshotPeerErrCh @@ -400,8 +403,9 @@ func TestReactor_SnapshotsRequest(t *testing.T) { rts := setup(ctx, t, conn, nil, 100) rts.snapshotInCh <- p2p.Envelope{ - From: types.NodeID("aa"), - Message: &ssproto.SnapshotsRequest{}, + From: types.NodeID("aa"), + ChannelID: SnapshotChannel, + Message: &ssproto.SnapshotsRequest{}, } if len(tc.expectResponses) > 0 { @@ -457,7 +461,8 @@ func TestReactor_LightBlockResponse(t *testing.T) { rts.stateStore.On("LoadValidators", height).Return(vals, nil) rts.blockInCh <- p2p.Envelope{ - From: types.NodeID("aa"), + From: types.NodeID("aa"), + ChannelID: LightBlockChannel, Message: &ssproto.LightBlockRequest{ Height: 10, }, @@ -733,7 +738,8 @@ func handleLightBlockRequests( require.NoError(t, err) select { case sending <- p2p.Envelope{ - From: envelope.To, + From: envelope.To, + ChannelID: LightBlockChannel, Message: &ssproto.LightBlockResponse{ LightBlock: lb, }, @@ -750,7 +756,8 @@ func handleLightBlockRequests( require.NoError(t, err) select { case sending <- p2p.Envelope{ - From: envelope.To, + From: envelope.To, + ChannelID: LightBlockChannel, Message: &ssproto.LightBlockResponse{ LightBlock: differntLB, }, @@ -761,7 +768,8 @@ func handleLightBlockRequests( case 1: // send nil block i.e. pretend we don't have it select { case sending <- p2p.Envelope{ - From: envelope.To, + From: envelope.To, + ChannelID: LightBlockChannel, Message: &ssproto.LightBlockResponse{ LightBlock: nil, }, @@ -802,7 +810,8 @@ func handleConsensusParamsRequest( } select { case sending <- p2p.Envelope{ - From: envelope.To, + From: envelope.To, + ChannelID: ParamsChannel, Message: &ssproto.ParamsResponse{ Height: msg.Height, ConsensusParams: paramsProto, @@ -913,7 +922,8 @@ func handleSnapshotRequests( require.True(t, ok) for _, snapshot := range snapshots { sendingCh <- p2p.Envelope{ - From: envelope.To, + From: envelope.To, + ChannelID: SnapshotChannel, Message: &ssproto.SnapshotsResponse{ Height: snapshot.Height, Format: snapshot.Format, @@ -946,7 +956,8 @@ func handleChunkRequests( msg, ok := envelope.Message.(*ssproto.ChunkRequest) require.True(t, ok) sendingCh <- p2p.Envelope{ - From: envelope.To, + From: envelope.To, + ChannelID: ChunkChannel, Message: &ssproto.ChunkResponse{ Height: msg.Height, Format: msg.Format,