From 10402b728f6f9ec9cdb43ed103c31f621aed2fb1 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Fri, 7 Jan 2022 10:59:10 -0500 Subject: [PATCH] consensus+p2p: change how consensus reactor is constructed (#7525) --- internal/consensus/byzantine_test.go | 5 +++- internal/consensus/reactor.go | 41 ++++++++++++++++++++-------- internal/consensus/reactor_test.go | 25 +++++++++++++---- internal/p2p/router.go | 5 ++++ node/setup.go | 23 ++++------------ 5 files changed, 64 insertions(+), 35 deletions(-) diff --git a/internal/consensus/byzantine_test.go b/internal/consensus/byzantine_test.go index fa4074d2e..c69c81672 100644 --- a/internal/consensus/byzantine_test.go +++ b/internal/consensus/byzantine_test.go @@ -256,10 +256,13 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { } msg, err := s.Next(ctx) - if !assert.NoError(t, err) { + + assert.NoError(t, err) + if err != nil { cancel() return } + require.NotNil(t, msg) block := msg.Data().(types.EventDataNewBlock).Block if len(block.Evidence.Evidence) != 0 { diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 25a63d78b..6c1bbae5f 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -28,9 +28,9 @@ var ( // GetChannelDescriptor produces an instance of a descriptor for this // package's required channels. -func GetChannelDescriptors() []*p2p.ChannelDescriptor { - return []*p2p.ChannelDescriptor{ - { +func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor { + return map[p2p.ChannelID]*p2p.ChannelDescriptor{ + StateChannel: { ID: StateChannel, MessageType: new(tmcons.Message), Priority: 8, @@ -38,7 +38,7 @@ func GetChannelDescriptors() []*p2p.ChannelDescriptor { RecvMessageCapacity: maxMsgSize, RecvBufferCapacity: 128, }, - { + DataChannel: { // TODO: Consider a split between gossiping current block and catchup // stuff. Once we gossip the whole block there is nothing left to send // until next height or round. @@ -49,7 +49,7 @@ func GetChannelDescriptors() []*p2p.ChannelDescriptor { RecvBufferCapacity: 512, RecvMessageCapacity: maxMsgSize, }, - { + VoteChannel: { ID: VoteChannel, MessageType: new(tmcons.Message), Priority: 10, @@ -57,7 +57,7 @@ func GetChannelDescriptors() []*p2p.ChannelDescriptor { RecvBufferCapacity: 128, RecvMessageCapacity: maxMsgSize, }, - { + VoteSetBitsChannel: { ID: VoteSetBitsChannel, MessageType: new(tmcons.Message), Priority: 5, @@ -131,17 +131,34 @@ type Reactor struct { // to relevant p2p Channels and a channel to listen for peer updates on. The // reactor will close all p2p Channels when stopping. func NewReactor( + ctx context.Context, logger log.Logger, cs *State, - stateCh *p2p.Channel, - dataCh *p2p.Channel, - voteCh *p2p.Channel, - voteSetBitsCh *p2p.Channel, + channelCreator p2p.ChannelCreator, peerUpdates *p2p.PeerUpdates, waitSync bool, metrics *Metrics, -) *Reactor { +) (*Reactor, error) { + chans := getChannelDescriptors() + stateCh, err := channelCreator(ctx, chans[StateChannel]) + if err != nil { + return nil, err + } + dataCh, err := channelCreator(ctx, chans[DataChannel]) + if err != nil { + return nil, err + } + + voteCh, err := channelCreator(ctx, chans[VoteChannel]) + if err != nil { + return nil, err + } + + voteSetBitsCh, err := channelCreator(ctx, chans[VoteSetBitsChannel]) + if err != nil { + return nil, err + } r := &Reactor{ logger: logger, state: cs, @@ -156,7 +173,7 @@ func NewReactor( } r.BaseService = *service.NewBaseService(logger, "Consensus", r) - return r + return r, nil } // OnStart starts separate go routines for each p2p Channel and listens for diff --git a/internal/consensus/reactor_test.go b/internal/consensus/reactor_test.go index 5a9eda31c..a717c4b8b 100644 --- a/internal/consensus/reactor_test.go +++ b/internal/consensus/reactor_test.go @@ -83,21 +83,36 @@ func setup( ctx, cancel := context.WithCancel(ctx) // Canceled during cleanup (see below). + chCreator := func(nodeID types.NodeID) p2p.ChannelCreator { + return func(ctx context.Context, desc *p2p.ChannelDescriptor) (*p2p.Channel, error) { + switch desc.ID { + case StateChannel: + return rts.stateChannels[nodeID], nil + case DataChannel: + return rts.dataChannels[nodeID], nil + case VoteChannel: + return rts.voteChannels[nodeID], nil + case VoteSetBitsChannel: + return rts.voteSetBitsChannels[nodeID], nil + default: + return nil, fmt.Errorf("invalid channel; %v", desc.ID) + } + } + } + i := 0 for nodeID, node := range rts.network.Nodes { state := states[i] - reactor := NewReactor( + reactor, err := NewReactor(ctx, state.logger.With("node", nodeID), state, - rts.stateChannels[nodeID], - rts.dataChannels[nodeID], - rts.voteChannels[nodeID], - rts.voteSetBitsChannels[nodeID], + chCreator(nodeID), node.MakePeerUpdates(ctx, t), true, NopMetrics(), ) + require.NoError(t, err) reactor.SetEventBus(state.eventBus) diff --git a/internal/p2p/router.go b/internal/p2p/router.go index c18ac2c85..6c6bf78ed 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -254,6 +254,11 @@ func (r *Router) createQueueFactory(ctx context.Context) (func(int) queue, error } } +// ChannelCreator allows routers to construct their own channels, +// either by receiving a reference to Router.OpenChannel or using some +// kind shim for testing purposes. +type ChannelCreator func(context.Context, *ChannelDescriptor) (*Channel, error) + // OpenChannel opens a new channel for the given message type. The caller must // close the channel when done, before stopping the Router. messageType is the // type of message passed through the channel (used for unmarshaling), which can diff --git a/node/setup.go b/node/setup.go index 0d257440e..af8a4b789 100644 --- a/node/setup.go +++ b/node/setup.go @@ -313,29 +313,18 @@ func createConsensusReactor( consensusState.SetPrivValidator(ctx, privValidator) } - csChDesc := consensus.GetChannelDescriptors() - channels := make(map[p2p.ChannelID]*p2p.Channel, len(csChDesc)) - for idx := range csChDesc { - chd := csChDesc[idx] - ch, err := router.OpenChannel(ctx, chd) - if err != nil { - return nil, nil, err - } - - channels[ch.ID] = ch - } - - reactor := consensus.NewReactor( + reactor, err := consensus.NewReactor( + ctx, logger, consensusState, - channels[consensus.StateChannel], - channels[consensus.DataChannel], - channels[consensus.VoteChannel], - channels[consensus.VoteSetBitsChannel], + router.OpenChannel, peerManager.Subscribe(ctx), waitSync, csMetrics, ) + if err != nil { + return nil, nil, err + } // Services which will be publishing and/or subscribing for messages (events) // consensusReactor will set it on consensusState and blockExecutor.