From 9d1e8eaad46e78d6c9d1caf912ab181c11a780c6 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Tue, 5 Apr 2022 09:26:53 -0400 Subject: [PATCH] node: remove channel and peer update initialization from construction (#8238) --- internal/blocksync/reactor.go | 163 +++++++++-------- internal/blocksync/reactor_test.go | 6 +- internal/consensus/byzantine_test.go | 6 +- internal/consensus/invalid_test.go | 5 +- internal/consensus/reactor.go | 251 ++++++++++++++------------- internal/consensus/reactor_test.go | 5 +- internal/evidence/reactor.go | 60 +++---- internal/evidence/reactor_test.go | 11 +- internal/mempool/reactor.go | 80 ++++----- internal/mempool/reactor_test.go | 11 +- internal/p2p/p2ptest/network.go | 1 - internal/p2p/peermanager.go | 5 + internal/p2p/pex/reactor.go | 56 +++--- internal/p2p/pex/reactor_test.go | 31 ++-- internal/p2p/router.go | 22 ++- internal/p2p/router_init_test.go | 14 +- internal/p2p/router_test.go | 10 -- internal/statesync/reactor.go | 242 +++++++++++++------------- internal/statesync/reactor_test.go | 28 +-- internal/statesync/syncer.go | 26 --- node/node.go | 44 ++--- node/node_test.go | 4 +- node/public.go | 2 +- node/seed.go | 13 +- node/setup.go | 58 +++---- 25 files changed, 545 insertions(+), 609 deletions(-) diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go index 34ed1f0fc..6533fa046 100644 --- a/internal/blocksync/reactor.go +++ b/internal/blocksync/reactor.go @@ -11,6 +11,7 @@ import ( "github.com/tendermint/tendermint/internal/consensus" "github.com/tendermint/tendermint/internal/eventbus" "github.com/tendermint/tendermint/internal/p2p" + "github.com/tendermint/tendermint/internal/p2p/conn" sm "github.com/tendermint/tendermint/internal/state" "github.com/tendermint/tendermint/internal/store" "github.com/tendermint/tendermint/libs/log" @@ -80,8 +81,8 @@ type Reactor struct { consReactor consensusReactor blockSync *atomicBool - blockSyncCh *p2p.Channel - peerUpdates *p2p.PeerUpdates + chCreator p2p.ChannelCreator + peerEvents p2p.PeerEventSubscriber requestsCh <-chan BlockRequest errorsCh <-chan peerError @@ -94,23 +95,17 @@ type Reactor struct { // NewReactor returns new reactor instance. func NewReactor( - ctx context.Context, logger log.Logger, stateStore sm.Store, blockExec *sm.BlockExecutor, store *store.BlockStore, consReactor consensusReactor, channelCreator p2p.ChannelCreator, - peerUpdates *p2p.PeerUpdates, + peerEvents p2p.PeerEventSubscriber, blockSync bool, metrics *consensus.Metrics, eventBus *eventbus.EventBus, -) (*Reactor, error) { - blockSyncCh, err := channelCreator(ctx, GetChannelDescriptor()) - if err != nil { - return nil, err - } - +) *Reactor { r := &Reactor{ logger: logger, stateStore: stateStore, @@ -118,14 +113,14 @@ func NewReactor( store: store, consReactor: consReactor, blockSync: newAtomicBool(blockSync), - blockSyncCh: blockSyncCh, - peerUpdates: peerUpdates, + chCreator: channelCreator, + peerEvents: peerEvents, metrics: metrics, eventBus: eventBus, } r.BaseService = *service.NewBaseService(logger, "BlockSync", r) - return r, nil + return r } // OnStart starts separate go routines for each p2p Channel and listens for @@ -136,6 +131,12 @@ func NewReactor( // If blockSync is enabled, we also start the pool and the pool processing // goroutine. If the pool fails to start, an error is returned. 
func (r *Reactor) OnStart(ctx context.Context) error { + blockSyncCh, err := r.chCreator(ctx, GetChannelDescriptor()) + if err != nil { + return err + } + r.chCreator = func(context.Context, *conn.ChannelDescriptor) (*p2p.Channel, error) { return blockSyncCh, nil } + state, err := r.stateStore.Load() if err != nil { return err @@ -161,13 +162,13 @@ func (r *Reactor) OnStart(ctx context.Context) error { if err := r.pool.Start(ctx); err != nil { return err } - go r.requestRoutine(ctx) + go r.requestRoutine(ctx, blockSyncCh) - go r.poolRoutine(ctx, false) + go r.poolRoutine(ctx, false, blockSyncCh) } - go r.processBlockSyncCh(ctx) - go r.processPeerUpdates(ctx) + go r.processBlockSyncCh(ctx, blockSyncCh) + go r.processPeerUpdates(ctx, r.peerEvents(ctx), blockSyncCh) return nil } @@ -182,7 +183,7 @@ func (r *Reactor) OnStop() { // respondToPeer loads a block and sends it to the requesting peer, if we have it. // Otherwise, we'll respond saying we do not have it. -func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest, peerID types.NodeID) error { +func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest, peerID types.NodeID, blockSyncCh *p2p.Channel) error { block := r.store.LoadBlock(msg.Height) if block != nil { blockProto, err := block.ToProto() @@ -191,7 +192,7 @@ func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest, return err } - return r.blockSyncCh.Send(ctx, p2p.Envelope{ + return blockSyncCh.Send(ctx, p2p.Envelope{ To: peerID, Message: &bcproto.BlockResponse{Block: blockProto}, }) @@ -199,55 +200,16 @@ func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest, r.logger.Info("peer requesting a block we do not have", "peer", peerID, "height", msg.Height) - return r.blockSyncCh.Send(ctx, p2p.Envelope{ + return blockSyncCh.Send(ctx, p2p.Envelope{ To: peerID, Message: &bcproto.NoBlockResponse{Height: msg.Height}, }) } -// handleBlockSyncMessage handles envelopes sent from peers on the -// BlockSyncChannel. It returns an error only if the Envelope.Message is unknown -// for this channel. This should never be called outside of handleMessage. -func (r *Reactor) handleBlockSyncMessage(ctx context.Context, envelope *p2p.Envelope) error { - logger := r.logger.With("peer", envelope.From) - - switch msg := envelope.Message.(type) { - case *bcproto.BlockRequest: - return r.respondToPeer(ctx, msg, envelope.From) - case *bcproto.BlockResponse: - block, err := types.BlockFromProto(msg.Block) - if err != nil { - logger.Error("failed to convert block from proto", "err", err) - return err - } - - r.pool.AddBlock(envelope.From, block, block.Size()) - - case *bcproto.StatusRequest: - return r.blockSyncCh.Send(ctx, p2p.Envelope{ - To: envelope.From, - Message: &bcproto.StatusResponse{ - Height: r.store.Height(), - Base: r.store.Base(), - }, - }) - case *bcproto.StatusResponse: - r.pool.SetPeerRange(envelope.From, msg.Base, msg.Height) - - case *bcproto.NoBlockResponse: - logger.Debug("peer does not have the requested block", "height", msg.Height) - - default: - return fmt.Errorf("received unknown message: %T", msg) - } - - return nil -} - // handleMessage handles an Envelope sent from a peer on a specific p2p Channel. // It will handle errors and any possible panics gracefully. A caller can handle // any error returned by sending a PeerError on the respective channel. 
-func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope) (err error) { +func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope, blockSyncCh *p2p.Channel) (err error) { defer func() { if e := recover(); e != nil { err = fmt.Errorf("panic in processing message: %v", e) @@ -263,7 +225,39 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop switch chID { case BlockSyncChannel: - err = r.handleBlockSyncMessage(ctx, envelope) + switch msg := envelope.Message.(type) { + case *bcproto.BlockRequest: + return r.respondToPeer(ctx, msg, envelope.From, blockSyncCh) + case *bcproto.BlockResponse: + block, err := types.BlockFromProto(msg.Block) + if err != nil { + r.logger.Error("failed to convert block from proto", + "peer", envelope.From, + "err", err) + return err + } + + r.pool.AddBlock(envelope.From, block, block.Size()) + + case *bcproto.StatusRequest: + return blockSyncCh.Send(ctx, p2p.Envelope{ + To: envelope.From, + Message: &bcproto.StatusResponse{ + Height: r.store.Height(), + Base: r.store.Base(), + }, + }) + case *bcproto.StatusResponse: + r.pool.SetPeerRange(envelope.From, msg.Base, msg.Height) + + case *bcproto.NoBlockResponse: + r.logger.Debug("peer does not have the requested block", + "peer", envelope.From, + "height", msg.Height) + + default: + return fmt.Errorf("received unknown message: %T", msg) + } default: err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope) @@ -277,17 +271,17 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop // message execution will result in a PeerError being sent on the BlockSyncChannel. // When the reactor is stopped, we will catch the signal and close the p2p Channel // gracefully. -func (r *Reactor) processBlockSyncCh(ctx context.Context) { - iter := r.blockSyncCh.Receive(ctx) +func (r *Reactor) processBlockSyncCh(ctx context.Context, blockSyncCh *p2p.Channel) { + iter := blockSyncCh.Receive(ctx) for iter.Next(ctx) { envelope := iter.Envelope() - if err := r.handleMessage(ctx, r.blockSyncCh.ID, envelope); err != nil { + if err := r.handleMessage(ctx, blockSyncCh.ID, envelope, blockSyncCh); err != nil { if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { return } - r.logger.Error("failed to process message", "ch_id", r.blockSyncCh.ID, "envelope", envelope, "err", err) - if serr := r.blockSyncCh.SendError(ctx, p2p.PeerError{ + r.logger.Error("failed to process message", "ch_id", blockSyncCh.ID, "envelope", envelope, "err", err) + if serr := blockSyncCh.SendError(ctx, p2p.PeerError{ NodeID: envelope.From, Err: err, }); serr != nil { @@ -298,7 +292,7 @@ func (r *Reactor) processBlockSyncCh(ctx context.Context) { } // processPeerUpdate processes a PeerUpdate. -func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate) { +func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, blockSyncCh *p2p.Channel) { r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status) // XXX: Pool#RedoRequest can sometimes give us an empty peer. 
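With this change the block-sync reactor no longer opens its channel or subscribes to peer updates at construction time: the constructor only stores the p2p.ChannelCreator and p2p.PeerEventSubscriber, and OnStart resolves them. A minimal caller-side sketch of the new wiring, assuming a router whose OpenChannel satisfies p2p.ChannelCreator and a peer manager whose Subscribe satisfies p2p.PeerEventSubscriber (neither call appears in the hunks above, so treat those names as assumptions):

	reactor := blocksync.NewReactor(
		logger,
		stateStore,
		blockExec,
		blockStore,
		consReactor,
		router.OpenChannel,    // p2p.ChannelCreator, invoked inside OnStart
		peerManager.Subscribe, // p2p.PeerEventSubscriber, invoked inside OnStart
		blockSync,
		metrics,
		eventBus,
	)
	// NewReactor no longer returns an error; channel-creation failures now
	// surface from Start instead of from construction.
	if err := reactor.Start(ctx); err != nil {
		return err
	}
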
@@ -309,7 +303,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda switch peerUpdate.Status { case p2p.PeerStatusUp: // send a status update the newly added peer - if err := r.blockSyncCh.Send(ctx, p2p.Envelope{ + if err := blockSyncCh.Send(ctx, p2p.Envelope{ To: peerUpdate.NodeID, Message: &bcproto.StatusResponse{ Base: r.store.Base(), @@ -317,7 +311,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda }, }); err != nil { r.pool.RemovePeer(peerUpdate.NodeID) - if err := r.blockSyncCh.SendError(ctx, p2p.PeerError{ + if err := blockSyncCh.SendError(ctx, p2p.PeerError{ NodeID: peerUpdate.NodeID, Err: err, }); err != nil { @@ -333,13 +327,13 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda // processPeerUpdates initiates a blocking process where we listen for and handle // PeerUpdate messages. When the reactor is stopped, we will catch the signal and // close the p2p PeerUpdatesCh gracefully. -func (r *Reactor) processPeerUpdates(ctx context.Context) { +func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, blockSyncCh *p2p.Channel) { for { select { case <-ctx.Done(): return - case peerUpdate := <-r.peerUpdates.Updates(): - r.processPeerUpdate(ctx, peerUpdate) + case peerUpdate := <-peerUpdates.Updates(): + r.processPeerUpdate(ctx, peerUpdate, blockSyncCh) } } } @@ -357,13 +351,18 @@ func (r *Reactor) SwitchToBlockSync(ctx context.Context, state sm.State) error { r.syncStartTime = time.Now() - go r.requestRoutine(ctx) - go r.poolRoutine(ctx, true) + bsCh, err := r.chCreator(ctx, GetChannelDescriptor()) + if err != nil { + return err + } + + go r.requestRoutine(ctx, bsCh) + go r.poolRoutine(ctx, true, bsCh) return nil } -func (r *Reactor) requestRoutine(ctx context.Context) { +func (r *Reactor) requestRoutine(ctx context.Context, blockSyncCh *p2p.Channel) { statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second) defer statusUpdateTicker.Stop() @@ -372,11 +371,11 @@ func (r *Reactor) requestRoutine(ctx context.Context) { case <-ctx.Done(): return case request := <-r.requestsCh: - if err := r.blockSyncCh.Send(ctx, p2p.Envelope{ + if err := blockSyncCh.Send(ctx, p2p.Envelope{ To: request.PeerID, Message: &bcproto.BlockRequest{Height: request.Height}, }); err != nil { - if err := r.blockSyncCh.SendError(ctx, p2p.PeerError{ + if err := blockSyncCh.SendError(ctx, p2p.PeerError{ NodeID: request.PeerID, Err: err, }); err != nil { @@ -384,14 +383,14 @@ func (r *Reactor) requestRoutine(ctx context.Context) { } } case pErr := <-r.errorsCh: - if err := r.blockSyncCh.SendError(ctx, p2p.PeerError{ + if err := blockSyncCh.SendError(ctx, p2p.PeerError{ NodeID: pErr.peerID, Err: pErr.err, }); err != nil { return } case <-statusUpdateTicker.C: - if err := r.blockSyncCh.Send(ctx, p2p.Envelope{ + if err := blockSyncCh.Send(ctx, p2p.Envelope{ Broadcast: true, Message: &bcproto.StatusRequest{}, }); err != nil { @@ -405,7 +404,7 @@ func (r *Reactor) requestRoutine(ctx context.Context) { // do. // // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down! 
-func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool) { +func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh *p2p.Channel) { var ( trySyncTicker = time.NewTicker(trySyncIntervalMS * time.Millisecond) switchToConsensusTicker = time.NewTicker(switchToConsensusIntervalSeconds * time.Second) @@ -523,7 +522,7 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool) { // NOTE: We've already removed the peer's request, but we still need // to clean up the rest. peerID := r.pool.RedoRequest(first.Height) - if serr := r.blockSyncCh.SendError(ctx, p2p.PeerError{ + if serr := blockSyncCh.SendError(ctx, p2p.PeerError{ NodeID: peerID, Err: err, }); serr != nil { @@ -532,7 +531,7 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool) { peerID2 := r.pool.RedoRequest(second.Height) if peerID2 != peerID { - if serr := r.blockSyncCh.SendError(ctx, p2p.PeerError{ + if serr := blockSyncCh.SendError(ctx, p2p.PeerError{ NodeID: peerID2, Err: err, }); serr != nil { diff --git a/internal/blocksync/reactor_test.go b/internal/blocksync/reactor_test.go index e35d45a74..065d75301 100644 --- a/internal/blocksync/reactor_test.go +++ b/internal/blocksync/reactor_test.go @@ -190,20 +190,18 @@ func (rts *reactorTestSuite) addNode( chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (*p2p.Channel, error) { return rts.blockSyncChannels[nodeID], nil } - rts.reactors[nodeID], err = NewReactor( - ctx, + rts.reactors[nodeID] = NewReactor( rts.logger.With("nodeID", nodeID), stateStore, blockExec, blockStore, nil, chCreator, - rts.peerUpdates[nodeID], + func(ctx context.Context) *p2p.PeerUpdates { return rts.peerUpdates[nodeID] }, rts.blockSync, consensus.NopMetrics(), nil, // eventbus, can be nil ) - require.NoError(t, err) require.NoError(t, rts.reactors[nodeID].Start(ctx)) require.True(t, rts.reactors[nodeID].IsRunning()) diff --git a/internal/consensus/byzantine_test.go b/internal/consensus/byzantine_test.go index f631452bd..cfcf8b04f 100644 --- a/internal/consensus/byzantine_test.go +++ b/internal/consensus/byzantine_test.go @@ -141,8 +141,10 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { // send two votes to all peers (1st to one half, 2nd to another half) i := 0 for _, ps := range bzReactor.peers { + voteCh := rts.voteChannels[bzNodeID] if i < len(bzReactor.peers)/2 { - require.NoError(t, bzReactor.voteCh.Send(ctx, + + require.NoError(t, voteCh.Send(ctx, p2p.Envelope{ To: ps.peerID, Message: &tmcons.Vote{ @@ -150,7 +152,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { }, })) } else { - require.NoError(t, bzReactor.voteCh.Send(ctx, + require.NoError(t, voteCh.Send(ctx, p2p.Envelope{ To: ps.peerID, Message: &tmcons.Vote{ diff --git a/internal/consensus/invalid_test.go b/internal/consensus/invalid_test.go index f38598af9..93c5cea1b 100644 --- a/internal/consensus/invalid_test.go +++ b/internal/consensus/invalid_test.go @@ -57,7 +57,7 @@ func TestReactorInvalidPrecommit(t *testing.T) { privVal := byzState.privValidator byzState.doPrevote = func(ctx context.Context, height int64, round int32) { defer close(signal) - invalidDoPrevoteFunc(ctx, t, height, round, byzState, byzReactor, privVal) + invalidDoPrevoteFunc(ctx, t, height, round, byzState, byzReactor, rts.voteChannels[node.NodeID], privVal) } byzState.mtx.Unlock() @@ -107,6 +107,7 @@ func invalidDoPrevoteFunc( round int32, cs *State, r *Reactor, + voteCh *p2p.Channel, pv types.PrivValidator, ) { // routine to: @@ -155,7 +156,7 @@ func invalidDoPrevoteFunc( 
count := 0 for _, peerID := range ids { count++ - err := r.voteCh.Send(ctx, p2p.Envelope{ + err := voteCh.Send(ctx, p2p.Envelope{ To: peerID, Message: &tmcons.Vote{ Vote: precommit.ToProto(), diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index b11775679..4bc109515 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -126,11 +126,8 @@ type Reactor struct { rs *cstypes.RoundState readySignal chan struct{} // closed when the node is ready to start consensus - stateCh *p2p.Channel - dataCh *p2p.Channel - voteCh *p2p.Channel - voteSetBitsCh *p2p.Channel - peerUpdates *p2p.PeerUpdates + peerEvents p2p.PeerEventSubscriber + chCreator p2p.ChannelCreator } // NewReactor returns a reference to a new consensus reactor, which implements @@ -138,49 +135,25 @@ type Reactor struct { // to relevant p2p Channels and a channel to listen for peer updates on. The // reactor will close all p2p Channels when stopping. func NewReactor( - ctx context.Context, logger log.Logger, cs *State, channelCreator p2p.ChannelCreator, - peerUpdates *p2p.PeerUpdates, + peerEvents p2p.PeerEventSubscriber, eventBus *eventbus.EventBus, waitSync bool, metrics *Metrics, -) (*Reactor, error) { - chans := getChannelDescriptors() - stateCh, err := channelCreator(ctx, chans[StateChannel]) - if err != nil { - return nil, err - } - - dataCh, err := channelCreator(ctx, chans[DataChannel]) - if err != nil { - return nil, err - } - - voteCh, err := channelCreator(ctx, chans[VoteChannel]) - if err != nil { - return nil, err - } - - voteSetBitsCh, err := channelCreator(ctx, chans[VoteSetBitsChannel]) - if err != nil { - return nil, err - } +) *Reactor { r := &Reactor{ - logger: logger, - state: cs, - waitSync: waitSync, - rs: cs.GetRoundState(), - peers: make(map[types.NodeID]*PeerState), - eventBus: eventBus, - Metrics: metrics, - stateCh: stateCh, - dataCh: dataCh, - voteCh: voteCh, - voteSetBitsCh: voteSetBitsCh, - peerUpdates: peerUpdates, - readySignal: make(chan struct{}), + logger: logger, + state: cs, + waitSync: waitSync, + rs: cs.GetRoundState(), + peers: make(map[types.NodeID]*PeerState), + eventBus: eventBus, + Metrics: metrics, + peerEvents: peerEvents, + chCreator: channelCreator, + readySignal: make(chan struct{}), } r.BaseService = *service.NewBaseService(logger, "Consensus", r) @@ -188,7 +161,14 @@ func NewReactor( close(r.readySignal) } - return r, nil + return r +} + +type channelBundle struct { + state *p2p.Channel + data *p2p.Channel + vote *p2p.Channel + votSet *p2p.Channel } // OnStart starts separate go routines for each p2p Channel and listens for @@ -198,13 +178,39 @@ func NewReactor( func (r *Reactor) OnStart(ctx context.Context) error { r.logger.Debug("consensus wait sync", "wait_sync", r.WaitSync()) + peerUpdates := r.peerEvents(ctx) + + var chBundle channelBundle + var err error + + chans := getChannelDescriptors() + chBundle.state, err = r.chCreator(ctx, chans[StateChannel]) + if err != nil { + return err + } + + chBundle.data, err = r.chCreator(ctx, chans[DataChannel]) + if err != nil { + return err + } + + chBundle.vote, err = r.chCreator(ctx, chans[VoteChannel]) + if err != nil { + return err + } + + chBundle.votSet, err = r.chCreator(ctx, chans[VoteSetBitsChannel]) + if err != nil { + return err + } + // start routine that computes peer statistics for evaluating peer quality // // TODO: Evaluate if we need this to be synchronized via WaitGroup as to not // leak the goroutine when stopping the reactor. 
- go r.peerStatsRoutine(ctx) + go r.peerStatsRoutine(ctx, peerUpdates) - r.subscribeToBroadcastEvents() + r.subscribeToBroadcastEvents(chBundle.state) go r.updateRoundStateRoutine() if !r.WaitSync() { @@ -213,11 +219,11 @@ func (r *Reactor) OnStart(ctx context.Context) error { } } - go r.processStateCh(ctx) - go r.processDataCh(ctx) - go r.processVoteCh(ctx) - go r.processVoteSetBitsCh(ctx) - go r.processPeerUpdates(ctx) + go r.processStateCh(ctx, chBundle) + go r.processDataCh(ctx, chBundle) + go r.processVoteCh(ctx, chBundle) + go r.processVoteSetBitsCh(ctx, chBundle) + go r.processPeerUpdates(ctx, peerUpdates, chBundle) return nil } @@ -318,16 +324,16 @@ func (r *Reactor) GetPeerState(peerID types.NodeID) (*PeerState, bool) { return ps, ok } -func (r *Reactor) broadcastNewRoundStepMessage(ctx context.Context, rs *cstypes.RoundState) error { - return r.stateCh.Send(ctx, p2p.Envelope{ +func (r *Reactor) broadcastNewRoundStepMessage(ctx context.Context, rs *cstypes.RoundState, stateCh *p2p.Channel) error { + return stateCh.Send(ctx, p2p.Envelope{ Broadcast: true, Message: makeRoundStepMessage(rs), }) } -func (r *Reactor) broadcastNewValidBlockMessage(ctx context.Context, rs *cstypes.RoundState) error { +func (r *Reactor) broadcastNewValidBlockMessage(ctx context.Context, rs *cstypes.RoundState, stateCh *p2p.Channel) error { psHeader := rs.ProposalBlockParts.Header() - return r.stateCh.Send(ctx, p2p.Envelope{ + return stateCh.Send(ctx, p2p.Envelope{ Broadcast: true, Message: &tmcons.NewValidBlock{ Height: rs.Height, @@ -339,8 +345,8 @@ func (r *Reactor) broadcastNewValidBlockMessage(ctx context.Context, rs *cstypes }) } -func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote) error { - return r.stateCh.Send(ctx, p2p.Envelope{ +func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote, stateCh *p2p.Channel) error { + return stateCh.Send(ctx, p2p.Envelope{ Broadcast: true, Message: &tmcons.HasVote{ Height: vote.Height, @@ -354,14 +360,14 @@ func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote) // subscribeToBroadcastEvents subscribes for new round steps and votes using the // internal pubsub defined in the consensus state to broadcast them to peers // upon receiving. 
-func (r *Reactor) subscribeToBroadcastEvents() { +func (r *Reactor) subscribeToBroadcastEvents(stateCh *p2p.Channel) { onStopCh := r.state.getOnStopCh() err := r.state.evsw.AddListenerForEvent( listenerIDConsensus, types.EventNewRoundStepValue, func(ctx context.Context, data tmevents.EventData) error { - if err := r.broadcastNewRoundStepMessage(ctx, data.(*cstypes.RoundState)); err != nil { + if err := r.broadcastNewRoundStepMessage(ctx, data.(*cstypes.RoundState), stateCh); err != nil { return err } select { @@ -382,7 +388,7 @@ func (r *Reactor) subscribeToBroadcastEvents() { listenerIDConsensus, types.EventValidBlockValue, func(ctx context.Context, data tmevents.EventData) error { - return r.broadcastNewValidBlockMessage(ctx, data.(*cstypes.RoundState)) + return r.broadcastNewValidBlockMessage(ctx, data.(*cstypes.RoundState), stateCh) }, ) if err != nil { @@ -393,7 +399,7 @@ func (r *Reactor) subscribeToBroadcastEvents() { listenerIDConsensus, types.EventVoteValue, func(ctx context.Context, data tmevents.EventData) error { - return r.broadcastHasVoteMessage(ctx, data.(*types.Vote)) + return r.broadcastHasVoteMessage(ctx, data.(*types.Vote), stateCh) }, ) if err != nil { @@ -411,8 +417,8 @@ func makeRoundStepMessage(rs *cstypes.RoundState) *tmcons.NewRoundStep { } } -func (r *Reactor) sendNewRoundStepMessage(ctx context.Context, peerID types.NodeID) error { - return r.stateCh.Send(ctx, p2p.Envelope{ +func (r *Reactor) sendNewRoundStepMessage(ctx context.Context, peerID types.NodeID, stateCh *p2p.Channel) error { + return stateCh.Send(ctx, p2p.Envelope{ To: peerID, Message: makeRoundStepMessage(r.getRoundState()), }) @@ -438,7 +444,7 @@ func (r *Reactor) getRoundState() *cstypes.RoundState { return r.rs } -func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState) { +func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState, dataCh *p2p.Channel) { logger := r.logger.With("height", prs.Height).With("peer", ps.peerID) if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok { @@ -487,7 +493,7 @@ func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundSta } logger.Debug("sending block part for catchup", "round", prs.Round, "index", index) - _ = r.dataCh.Send(ctx, p2p.Envelope{ + _ = dataCh.Send(ctx, p2p.Envelope{ To: ps.peerID, Message: &tmcons.BlockPart{ Height: prs.Height, // not our height, so it does not matter. 
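The consensus reactor now asks a single p2p.ChannelCreator for all four channels inside OnStart and carries them through the gossip and processing routines as a channelBundle. For tests, the creator can simply dispatch on the descriptor ID. A sketch of such a stub, assuming pre-built channels keyed by p2p.ChannelID and a descriptor exposing an ID field (the chCreator helper referenced in reactor_test.go is not shown in this patch, so this is an illustrative stand-in):

	func stubChannelCreator(chans map[p2p.ChannelID]*p2p.Channel) p2p.ChannelCreator {
		return func(ctx context.Context, desc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
			// Hand back the pre-built channel for this descriptor, if any.
			ch, ok := chans[desc.ID]
			if !ok {
				return nil, fmt.Errorf("no stub channel for descriptor ID %v", desc.ID)
			}
			return ch, nil
		}
	}
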
@@ -502,7 +508,7 @@ func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundSta time.Sleep(r.state.config.PeerGossipSleepDuration) } -func (r *Reactor) gossipDataRoutine(ctx context.Context, ps *PeerState) { +func (r *Reactor) gossipDataRoutine(ctx context.Context, ps *PeerState, dataCh *p2p.Channel) { logger := r.logger.With("peer", ps.peerID) timer := time.NewTimer(0) @@ -534,7 +540,7 @@ OUTER_LOOP: } logger.Debug("sending block part", "height", prs.Height, "round", prs.Round) - if err := r.dataCh.Send(ctx, p2p.Envelope{ + if err := dataCh.Send(ctx, p2p.Envelope{ To: ps.peerID, Message: &tmcons.BlockPart{ Height: rs.Height, // this tells peer that this part applies to us @@ -581,7 +587,7 @@ OUTER_LOOP: continue OUTER_LOOP } - r.gossipDataForCatchup(ctx, rs, prs, ps) + r.gossipDataForCatchup(ctx, rs, prs, ps, dataCh) continue OUTER_LOOP } @@ -608,7 +614,7 @@ OUTER_LOOP: propProto := rs.Proposal.ToProto() logger.Debug("sending proposal", "height", prs.Height, "round", prs.Round) - if err := r.dataCh.Send(ctx, p2p.Envelope{ + if err := dataCh.Send(ctx, p2p.Envelope{ To: ps.peerID, Message: &tmcons.Proposal{ Proposal: *propProto, @@ -631,7 +637,7 @@ OUTER_LOOP: pPolProto := pPol.ToProto() logger.Debug("sending POL", "height", prs.Height, "round", prs.Round) - if err := r.dataCh.Send(ctx, p2p.Envelope{ + if err := dataCh.Send(ctx, p2p.Envelope{ To: ps.peerID, Message: &tmcons.ProposalPOL{ Height: rs.Height, @@ -659,14 +665,14 @@ OUTER_LOOP: // pickSendVote picks a vote and sends it to the peer. It will return true if // there is a vote to send and false otherwise. -func (r *Reactor) pickSendVote(ctx context.Context, ps *PeerState, votes types.VoteSetReader) (bool, error) { +func (r *Reactor) pickSendVote(ctx context.Context, ps *PeerState, votes types.VoteSetReader, voteCh *p2p.Channel) (bool, error) { vote, ok := ps.PickVoteToSend(votes) if !ok { return false, nil } r.logger.Debug("sending vote message", "ps", ps, "vote", vote) - if err := r.voteCh.Send(ctx, p2p.Envelope{ + if err := voteCh.Send(ctx, p2p.Envelope{ To: ps.peerID, Message: &tmcons.Vote{ Vote: vote.ToProto(), @@ -687,12 +693,13 @@ func (r *Reactor) gossipVotesForHeight( rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState, + voteCh *p2p.Channel, ) (bool, error) { logger := r.logger.With("height", prs.Height).With("peer", ps.peerID) // if there are lastCommits to send... if prs.Step == cstypes.RoundStepNewHeight { - if ok, err := r.pickSendVote(ctx, ps, rs.LastCommit); err != nil { + if ok, err := r.pickSendVote(ctx, ps, rs.LastCommit, voteCh); err != nil { return false, err } else if ok { logger.Debug("picked rs.LastCommit to send") @@ -704,7 +711,7 @@ func (r *Reactor) gossipVotesForHeight( // if there are POL prevotes to send... if prs.Step <= cstypes.RoundStepPropose && prs.Round != -1 && prs.Round <= rs.Round && prs.ProposalPOLRound != -1 { if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil { - if ok, err := r.pickSendVote(ctx, ps, polPrevotes); err != nil { + if ok, err := r.pickSendVote(ctx, ps, polPrevotes, voteCh); err != nil { return false, err } else if ok { logger.Debug("picked rs.Prevotes(prs.ProposalPOLRound) to send", "round", prs.ProposalPOLRound) @@ -715,7 +722,7 @@ func (r *Reactor) gossipVotesForHeight( // if there are prevotes to send... 
if prs.Step <= cstypes.RoundStepPrevoteWait && prs.Round != -1 && prs.Round <= rs.Round { - if ok, err := r.pickSendVote(ctx, ps, rs.Votes.Prevotes(prs.Round)); err != nil { + if ok, err := r.pickSendVote(ctx, ps, rs.Votes.Prevotes(prs.Round), voteCh); err != nil { return false, err } else if ok { logger.Debug("picked rs.Prevotes(prs.Round) to send", "round", prs.Round) @@ -725,7 +732,7 @@ func (r *Reactor) gossipVotesForHeight( // if there are precommits to send... if prs.Step <= cstypes.RoundStepPrecommitWait && prs.Round != -1 && prs.Round <= rs.Round { - if ok, err := r.pickSendVote(ctx, ps, rs.Votes.Precommits(prs.Round)); err != nil { + if ok, err := r.pickSendVote(ctx, ps, rs.Votes.Precommits(prs.Round), voteCh); err != nil { return false, err } else if ok { logger.Debug("picked rs.Precommits(prs.Round) to send", "round", prs.Round) @@ -735,7 +742,7 @@ func (r *Reactor) gossipVotesForHeight( // if there are prevotes to send...(which are needed because of validBlock mechanism) if prs.Round != -1 && prs.Round <= rs.Round { - if ok, err := r.pickSendVote(ctx, ps, rs.Votes.Prevotes(prs.Round)); err != nil { + if ok, err := r.pickSendVote(ctx, ps, rs.Votes.Prevotes(prs.Round), voteCh); err != nil { return false, err } else if ok { logger.Debug("picked rs.Prevotes(prs.Round) to send", "round", prs.Round) @@ -746,7 +753,7 @@ func (r *Reactor) gossipVotesForHeight( // if there are POLPrevotes to send... if prs.ProposalPOLRound != -1 { if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil { - if ok, err := r.pickSendVote(ctx, ps, polPrevotes); err != nil { + if ok, err := r.pickSendVote(ctx, ps, polPrevotes, voteCh); err != nil { return false, err } else if ok { logger.Debug("picked rs.Prevotes(prs.ProposalPOLRound) to send", "round", prs.ProposalPOLRound) @@ -758,7 +765,7 @@ func (r *Reactor) gossipVotesForHeight( return false, nil } -func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState) { +func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh *p2p.Channel) { logger := r.logger.With("peer", ps.peerID) // XXX: simple hack to throttle logs upon sleep @@ -790,7 +797,7 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState) { // if height matches, then send LastCommit, Prevotes, and Precommits if rs.Height == prs.Height { - if ok, err := r.gossipVotesForHeight(ctx, rs, prs, ps); err != nil { + if ok, err := r.gossipVotesForHeight(ctx, rs, prs, ps, voteCh); err != nil { return } else if ok { continue @@ -799,7 +806,7 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState) { // special catchup logic -- if peer is lagging by height 1, send LastCommit if prs.Height != 0 && rs.Height == prs.Height+1 { - if ok, err := r.pickSendVote(ctx, ps, rs.LastCommit); err != nil { + if ok, err := r.pickSendVote(ctx, ps, rs.LastCommit, voteCh); err != nil { return } else if ok { logger.Debug("picked rs.LastCommit to send", "height", prs.Height) @@ -813,7 +820,7 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState) { // Load the block commit for prs.Height, which contains precommit // signatures for prs.Height. 
if commit := r.state.blockStore.LoadBlockCommit(prs.Height); commit != nil { - if ok, err := r.pickSendVote(ctx, ps, commit); err != nil { + if ok, err := r.pickSendVote(ctx, ps, commit, voteCh); err != nil { return } else if ok { logger.Debug("picked Catchup commit to send", "height", prs.Height) @@ -847,7 +854,7 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState) { // NOTE: `queryMaj23Routine` has a simple crude design since it only comes // into play for liveness when there's a signature DDoS attack happening. -func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState) { +func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState, stateCh *p2p.Channel) { timer := time.NewTimer(0) defer timer.Stop() @@ -883,7 +890,7 @@ func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState) { // maybe send Height/Round/Prevotes if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok { - if err := r.stateCh.Send(ctx, p2p.Envelope{ + if err := stateCh.Send(ctx, p2p.Envelope{ To: ps.peerID, Message: &tmcons.VoteSetMaj23{ Height: prs.Height, @@ -904,7 +911,7 @@ func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState) { // maybe send Height/Round/ProposalPOL if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok { - if err := r.stateCh.Send(ctx, p2p.Envelope{ + if err := stateCh.Send(ctx, p2p.Envelope{ To: ps.peerID, Message: &tmcons.VoteSetMaj23{ Height: prs.Height, @@ -925,7 +932,7 @@ func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState) { // maybe send Height/Round/Precommits if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok { - if err := r.stateCh.Send(ctx, p2p.Envelope{ + if err := stateCh.Send(ctx, p2p.Envelope{ To: ps.peerID, Message: &tmcons.VoteSetMaj23{ Height: prs.Height, @@ -950,7 +957,7 @@ func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState) { if prs.Height <= r.state.blockStore.Height() && prs.Height >= r.state.blockStore.Base() { // maybe send Height/CatchupCommitRound/CatchupCommit if commit := r.state.LoadCommit(prs.Height); commit != nil { - if err := r.stateCh.Send(ctx, p2p.Envelope{ + if err := stateCh.Send(ctx, p2p.Envelope{ To: ps.peerID, Message: &tmcons.VoteSetMaj23{ Height: prs.Height, @@ -983,7 +990,7 @@ func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState) { // be the case, and we spawn all the relevant goroutine to broadcast messages to // the peer. During peer removal, we remove the peer for our set of peers and // signal to all spawned goroutines to gracefully exit in a non-blocking manner. -func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate) { +func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, chans channelBundle) { r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status) r.mtx.Lock() @@ -1024,14 +1031,14 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda return } // start goroutines for this peer - go r.gossipDataRoutine(ctx, ps) - go r.gossipVotesRoutine(ctx, ps) - go r.queryMaj23Routine(ctx, ps) + go r.gossipDataRoutine(ctx, ps, chans.data) + go r.gossipVotesRoutine(ctx, ps, chans.vote) + go r.queryMaj23Routine(ctx, ps, chans.state) // Send our state to the peer. If we're block-syncing, broadcast a // RoundStepMessage later upon SwitchToConsensus(). 
if !r.WaitSync() { - go func() { _ = r.sendNewRoundStepMessage(ctx, ps.peerID) }() + go func() { _ = r.sendNewRoundStepMessage(ctx, ps.peerID, chans.state) }() } }() @@ -1058,7 +1065,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda // If we fail to find the peer state for the envelope sender, we perform a no-op // and return. This can happen when we process the envelope after the peer is // removed. -func (r *Reactor) handleStateMessage(ctx context.Context, envelope *p2p.Envelope, msgI Message) error { +func (r *Reactor) handleStateMessage(ctx context.Context, envelope *p2p.Envelope, msgI Message, voteSetCh *p2p.Channel) error { ps, ok := r.GetPeerState(envelope.From) if !ok || ps == nil { r.logger.Debug("failed to find peer state", "peer", envelope.From, "ch_id", "StateChannel") @@ -1128,7 +1135,7 @@ func (r *Reactor) handleStateMessage(ctx context.Context, envelope *p2p.Envelope eMsg.Votes = *votesProto } - if err := r.voteSetBitsCh.Send(ctx, p2p.Envelope{ + if err := voteSetCh.Send(ctx, p2p.Envelope{ To: envelope.From, Message: eMsg, }); err != nil { @@ -1296,7 +1303,7 @@ func (r *Reactor) handleVoteSetBitsMessage(ctx context.Context, envelope *p2p.En // the p2p channel. // // NOTE: We block on consensus state for proposals, block parts, and votes. -func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope) (err error) { +func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope, chans channelBundle) (err error) { defer func() { if e := recover(); e != nil { err = fmt.Errorf("panic in processing message: %v", e) @@ -1327,17 +1334,13 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop switch chID { case StateChannel: - err = r.handleStateMessage(ctx, envelope, msgI) - + err = r.handleStateMessage(ctx, envelope, msgI, chans.votSet) case DataChannel: err = r.handleDataMessage(ctx, envelope, msgI) - case VoteChannel: err = r.handleVoteMessage(ctx, envelope, msgI) - case VoteSetBitsChannel: err = r.handleVoteSetBitsMessage(ctx, envelope, msgI) - default: err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope) } @@ -1350,13 +1353,13 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop // execution will result in a PeerError being sent on the StateChannel. When // the reactor is stopped, we will catch the signal and close the p2p Channel // gracefully. -func (r *Reactor) processStateCh(ctx context.Context) { - iter := r.stateCh.Receive(ctx) +func (r *Reactor) processStateCh(ctx context.Context, chans channelBundle) { + iter := chans.state.Receive(ctx) for iter.Next(ctx) { envelope := iter.Envelope() - if err := r.handleMessage(ctx, r.stateCh.ID, envelope); err != nil { - r.logger.Error("failed to process message", "ch_id", r.stateCh.ID, "envelope", envelope, "err", err) - if serr := r.stateCh.SendError(ctx, p2p.PeerError{ + if err := r.handleMessage(ctx, chans.state.ID, envelope, chans); err != nil { + r.logger.Error("failed to process message", "ch_id", chans.state.ID, "envelope", envelope, "err", err) + if serr := chans.state.SendError(ctx, p2p.PeerError{ NodeID: envelope.From, Err: err, }); serr != nil { @@ -1371,13 +1374,13 @@ func (r *Reactor) processStateCh(ctx context.Context) { // execution will result in a PeerError being sent on the DataChannel. When // the reactor is stopped, we will catch the signal and close the p2p Channel // gracefully. 
-func (r *Reactor) processDataCh(ctx context.Context) { - iter := r.dataCh.Receive(ctx) +func (r *Reactor) processDataCh(ctx context.Context, chans channelBundle) { + iter := chans.data.Receive(ctx) for iter.Next(ctx) { envelope := iter.Envelope() - if err := r.handleMessage(ctx, r.dataCh.ID, envelope); err != nil { - r.logger.Error("failed to process message", "ch_id", r.dataCh.ID, "envelope", envelope, "err", err) - if serr := r.dataCh.SendError(ctx, p2p.PeerError{ + if err := r.handleMessage(ctx, chans.data.ID, envelope, chans); err != nil { + r.logger.Error("failed to process message", "ch_id", chans.data.ID, "envelope", envelope, "err", err) + if serr := chans.data.SendError(ctx, p2p.PeerError{ NodeID: envelope.From, Err: err, }); serr != nil { @@ -1392,13 +1395,13 @@ func (r *Reactor) processDataCh(ctx context.Context) { // execution will result in a PeerError being sent on the VoteChannel. When // the reactor is stopped, we will catch the signal and close the p2p Channel // gracefully. -func (r *Reactor) processVoteCh(ctx context.Context) { - iter := r.voteCh.Receive(ctx) +func (r *Reactor) processVoteCh(ctx context.Context, chans channelBundle) { + iter := chans.vote.Receive(ctx) for iter.Next(ctx) { envelope := iter.Envelope() - if err := r.handleMessage(ctx, r.voteCh.ID, envelope); err != nil { - r.logger.Error("failed to process message", "ch_id", r.voteCh.ID, "envelope", envelope, "err", err) - if serr := r.voteCh.SendError(ctx, p2p.PeerError{ + if err := r.handleMessage(ctx, chans.vote.ID, envelope, chans); err != nil { + r.logger.Error("failed to process message", "ch_id", chans.vote.ID, "envelope", envelope, "err", err) + if serr := chans.vote.SendError(ctx, p2p.PeerError{ NodeID: envelope.From, Err: err, }); serr != nil { @@ -1413,18 +1416,18 @@ func (r *Reactor) processVoteCh(ctx context.Context) { // execution will result in a PeerError being sent on the VoteSetBitsChannel. // When the reactor is stopped, we will catch the signal and close the p2p // Channel gracefully. -func (r *Reactor) processVoteSetBitsCh(ctx context.Context) { - iter := r.voteSetBitsCh.Receive(ctx) +func (r *Reactor) processVoteSetBitsCh(ctx context.Context, chans channelBundle) { + iter := chans.votSet.Receive(ctx) for iter.Next(ctx) { envelope := iter.Envelope() - if err := r.handleMessage(ctx, r.voteSetBitsCh.ID, envelope); err != nil { + if err := r.handleMessage(ctx, chans.votSet.ID, envelope, chans); err != nil { if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { return } - r.logger.Error("failed to process message", "ch_id", r.voteSetBitsCh.ID, "envelope", envelope, "err", err) - if serr := r.voteSetBitsCh.SendError(ctx, p2p.PeerError{ + r.logger.Error("failed to process message", "ch_id", chans.votSet.ID, "envelope", envelope, "err", err) + if serr := chans.votSet.SendError(ctx, p2p.PeerError{ NodeID: envelope.From, Err: err, }); serr != nil { @@ -1437,18 +1440,18 @@ func (r *Reactor) processVoteSetBitsCh(ctx context.Context) { // processPeerUpdates initiates a blocking process where we listen for and handle // PeerUpdate messages. When the reactor is stopped, we will catch the signal and // close the p2p PeerUpdatesCh gracefully. 
-func (r *Reactor) processPeerUpdates(ctx context.Context) { +func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, chans channelBundle) { for { select { case <-ctx.Done(): return - case peerUpdate := <-r.peerUpdates.Updates(): - r.processPeerUpdate(ctx, peerUpdate) + case peerUpdate := <-peerUpdates.Updates(): + r.processPeerUpdate(ctx, peerUpdate, chans) } } } -func (r *Reactor) peerStatsRoutine(ctx context.Context) { +func (r *Reactor) peerStatsRoutine(ctx context.Context, peerUpdates *p2p.PeerUpdates) { for { if !r.IsRunning() { r.logger.Info("stopping peerStatsRoutine") @@ -1466,7 +1469,7 @@ func (r *Reactor) peerStatsRoutine(ctx context.Context) { switch msg.Msg.(type) { case *VoteMessage: if numVotes := ps.RecordVote(); numVotes%votesToContributeToBecomeGoodPeer == 0 { - r.peerUpdates.SendUpdate(ctx, p2p.PeerUpdate{ + peerUpdates.SendUpdate(ctx, p2p.PeerUpdate{ NodeID: msg.PeerID, Status: p2p.PeerStatusGood, }) @@ -1474,7 +1477,7 @@ func (r *Reactor) peerStatsRoutine(ctx context.Context) { case *BlockPartMessage: if numParts := ps.RecordBlockPart(); numParts%blocksToContributeToBecomeGoodPeer == 0 { - r.peerUpdates.SendUpdate(ctx, p2p.PeerUpdate{ + peerUpdates.SendUpdate(ctx, p2p.PeerUpdate{ NodeID: msg.PeerID, Status: p2p.PeerStatusGood, }) diff --git a/internal/consensus/reactor_test.go b/internal/consensus/reactor_test.go index 886e3794a..9c6af5c5b 100644 --- a/internal/consensus/reactor_test.go +++ b/internal/consensus/reactor_test.go @@ -104,16 +104,15 @@ func setup( for nodeID, node := range rts.network.Nodes { state := states[i] - reactor, err := NewReactor(ctx, + reactor := NewReactor( state.logger.With("node", nodeID), state, chCreator(nodeID), - node.MakePeerUpdates(ctx, t), + func(ctx context.Context) *p2p.PeerUpdates { return node.MakePeerUpdates(ctx, t) }, state.eventBus, true, NopMetrics(), ) - require.NoError(t, err) blocksSub, err := state.eventBus.SubscribeWithArgs(ctx, tmpubsub.SubscribeArgs{ ClientID: testSubscriber, diff --git a/internal/evidence/reactor.go b/internal/evidence/reactor.go index c52e7e32b..4809df32e 100644 --- a/internal/evidence/reactor.go +++ b/internal/evidence/reactor.go @@ -47,9 +47,9 @@ type Reactor struct { service.BaseService logger log.Logger - evpool *Pool - evidenceCh *p2p.Channel - peerUpdates *p2p.PeerUpdates + evpool *Pool + chCreator p2p.ChannelCreator + peerEvents p2p.PeerEventSubscriber mtx sync.Mutex @@ -60,28 +60,22 @@ type Reactor struct { // service.Service interface. It accepts a p2p Channel dedicated for handling // envelopes with EvidenceList messages. func NewReactor( - ctx context.Context, logger log.Logger, chCreator p2p.ChannelCreator, - peerUpdates *p2p.PeerUpdates, + peerEvents p2p.PeerEventSubscriber, evpool *Pool, -) (*Reactor, error) { - evidenceCh, err := chCreator(ctx, GetChannelDescriptor()) - if err != nil { - return nil, err - } - +) *Reactor { r := &Reactor{ logger: logger, evpool: evpool, - evidenceCh: evidenceCh, - peerUpdates: peerUpdates, + chCreator: chCreator, + peerEvents: peerEvents, peerRoutines: make(map[types.NodeID]context.CancelFunc), } r.BaseService = *service.NewBaseService(logger, "Evidence", r) - return r, err + return r } // OnStart starts separate go routines for each p2p Channel and listens for @@ -89,18 +83,20 @@ func NewReactor( // messages on that p2p channel accordingly. The caller must be sure to execute // OnStop to ensure the outbound p2p Channels are closed. No error is returned. 
func (r *Reactor) OnStart(ctx context.Context) error { - go r.processEvidenceCh(ctx) - go r.processPeerUpdates(ctx) + ch, err := r.chCreator(ctx, GetChannelDescriptor()) + if err != nil { + return err + } + + go r.processEvidenceCh(ctx, ch) + go r.processPeerUpdates(ctx, r.peerEvents(ctx), ch) return nil } // OnStop stops the reactor by signaling to all spawned goroutines to exit and // blocking until they all exit. -func (r *Reactor) OnStop() { - // Close the evidence db - r.evpool.Close() -} +func (r *Reactor) OnStop() { r.evpool.Close() } // handleEvidenceMessage handles envelopes sent from peers on the EvidenceChannel. // It returns an error only if the Envelope.Message is unknown for this channel @@ -164,13 +160,13 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop // processEvidenceCh implements a blocking event loop where we listen for p2p // Envelope messages from the evidenceCh. -func (r *Reactor) processEvidenceCh(ctx context.Context) { - iter := r.evidenceCh.Receive(ctx) +func (r *Reactor) processEvidenceCh(ctx context.Context, evidenceCh *p2p.Channel) { + iter := evidenceCh.Receive(ctx) for iter.Next(ctx) { envelope := iter.Envelope() - if err := r.handleMessage(ctx, r.evidenceCh.ID, envelope); err != nil { - r.logger.Error("failed to process message", "ch_id", r.evidenceCh.ID, "envelope", envelope, "err", err) - if serr := r.evidenceCh.SendError(ctx, p2p.PeerError{ + if err := r.handleMessage(ctx, evidenceCh.ID, envelope); err != nil { + r.logger.Error("failed to process message", "ch_id", evidenceCh.ID, "envelope", envelope, "err", err) + if serr := evidenceCh.SendError(ctx, p2p.PeerError{ NodeID: envelope.From, Err: err, }); serr != nil { @@ -191,7 +187,7 @@ func (r *Reactor) processEvidenceCh(ctx context.Context) { // connects/disconnects frequently from the broadcasting peer(s). // // REF: https://github.com/tendermint/tendermint/issues/4727 -func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate) { +func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, evidenceCh *p2p.Channel) { r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status) r.mtx.Lock() @@ -214,7 +210,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda if !ok { pctx, pcancel := context.WithCancel(ctx) r.peerRoutines[peerUpdate.NodeID] = pcancel - go r.broadcastEvidenceLoop(pctx, peerUpdate.NodeID) + go r.broadcastEvidenceLoop(pctx, peerUpdate.NodeID, evidenceCh) } case p2p.PeerStatusDown: @@ -232,11 +228,11 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda // processPeerUpdates initiates a blocking process where we listen for and handle // PeerUpdate messages. When the reactor is stopped, we will catch the signal and // close the p2p PeerUpdatesCh gracefully. -func (r *Reactor) processPeerUpdates(ctx context.Context) { +func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, evidenceCh *p2p.Channel) { for { select { - case peerUpdate := <-r.peerUpdates.Updates(): - r.processPeerUpdate(ctx, peerUpdate) + case peerUpdate := <-peerUpdates.Updates(): + r.processPeerUpdate(ctx, peerUpdate, evidenceCh) case <-ctx.Done(): return } @@ -254,7 +250,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) { // that the peer has already received or may not be ready for. 
// // REF: https://github.com/tendermint/tendermint/issues/4727 -func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID) { +func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID, evidenceCh *p2p.Channel) { var next *clist.CElement defer func() { @@ -301,7 +297,7 @@ func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID // peer may receive this piece of evidence multiple times if it added and // removed frequently from the broadcasting peer. - if err := r.evidenceCh.Send(ctx, p2p.Envelope{ + if err := evidenceCh.Send(ctx, p2p.Envelope{ To: peerID, Message: evProto, }); err != nil { diff --git a/internal/evidence/reactor_test.go b/internal/evidence/reactor_test.go index 0b2c2fb3b..2fdfa8c60 100644 --- a/internal/evidence/reactor_test.go +++ b/internal/evidence/reactor_test.go @@ -92,21 +92,20 @@ func setup(ctx context.Context, t *testing.T, stateStores []sm.Store, chBuf uint require.NoError(t, err) rts.peerChans[nodeID] = make(chan p2p.PeerUpdate) - rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1) - rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID]) + pu := p2p.NewPeerUpdates(rts.peerChans[nodeID], 1) + rts.peerUpdates[nodeID] = pu + rts.network.Nodes[nodeID].PeerManager.Register(ctx, pu) rts.nodes = append(rts.nodes, rts.network.Nodes[nodeID]) chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (*p2p.Channel, error) { return rts.evidenceChannels[nodeID], nil } - rts.reactors[nodeID], err = evidence.NewReactor( - ctx, + rts.reactors[nodeID] = evidence.NewReactor( logger, chCreator, - rts.peerUpdates[nodeID], + func(ctx context.Context) *p2p.PeerUpdates { return pu }, rts.pools[nodeID]) - require.NoError(t, err) require.NoError(t, rts.reactors[nodeID].Start(ctx)) require.True(t, rts.reactors[nodeID].IsRunning()) diff --git a/internal/mempool/reactor.go b/internal/mempool/reactor.go index 816589c9b..8f83f5006 100644 --- a/internal/mempool/reactor.go +++ b/internal/mempool/reactor.go @@ -40,13 +40,9 @@ type Reactor struct { mempool *TxMempool ids *IDs - // XXX: Currently, this is the only way to get information about a peer. Ideally, - // we rely on message-oriented communication to get necessary peer data. - // ref: https://github.com/tendermint/tendermint/issues/5670 - peerMgr PeerManager - - mempoolCh *p2p.Channel - peerUpdates *p2p.PeerUpdates + getPeerHeight func(types.NodeID) int64 + peerEvents p2p.PeerEventSubscriber + chCreator p2p.ChannelCreator // observePanic is a function for observing panics that were recovered in methods on // Reactor. observePanic is called with the recovered value. @@ -58,34 +54,27 @@ type Reactor struct { // NewReactor returns a reference to a new reactor. 
func NewReactor( - ctx context.Context, logger log.Logger, cfg *config.MempoolConfig, - peerMgr PeerManager, txmp *TxMempool, chCreator p2p.ChannelCreator, - peerUpdates *p2p.PeerUpdates, -) (*Reactor, error) { - - ch, err := chCreator(ctx, getChannelDescriptor(cfg)) - if err != nil { - return nil, err - } - + peerEvents p2p.PeerEventSubscriber, + getPeerHeight func(types.NodeID) int64, +) *Reactor { r := &Reactor{ - logger: logger, - cfg: cfg, - peerMgr: peerMgr, - mempool: txmp, - ids: NewMempoolIDs(), - mempoolCh: ch, - peerUpdates: peerUpdates, - peerRoutines: make(map[types.NodeID]context.CancelFunc), - observePanic: defaultObservePanic, + logger: logger, + cfg: cfg, + mempool: txmp, + ids: NewMempoolIDs(), + chCreator: chCreator, + peerEvents: peerEvents, + getPeerHeight: getPeerHeight, + peerRoutines: make(map[types.NodeID]context.CancelFunc), + observePanic: defaultObservePanic, } r.BaseService = *service.NewBaseService(logger, "Mempool", r) - return r, nil + return r } func defaultObservePanic(r interface{}) {} @@ -119,8 +108,13 @@ func (r *Reactor) OnStart(ctx context.Context) error { r.logger.Info("tx broadcasting is disabled") } - go r.processMempoolCh(ctx) - go r.processPeerUpdates(ctx) + ch, err := r.chCreator(ctx, getChannelDescriptor(r.cfg)) + if err != nil { + return err + } + + go r.processMempoolCh(ctx, ch) + go r.processPeerUpdates(ctx, r.peerEvents(ctx), ch) return nil } @@ -203,13 +197,13 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop // processMempoolCh implements a blocking event loop where we listen for p2p // Envelope messages from the mempoolCh. -func (r *Reactor) processMempoolCh(ctx context.Context) { - iter := r.mempoolCh.Receive(ctx) +func (r *Reactor) processMempoolCh(ctx context.Context, mempoolCh *p2p.Channel) { + iter := mempoolCh.Receive(ctx) for iter.Next(ctx) { envelope := iter.Envelope() - if err := r.handleMessage(ctx, r.mempoolCh.ID, envelope); err != nil { - r.logger.Error("failed to process message", "ch_id", r.mempoolCh.ID, "envelope", envelope, "err", err) - if serr := r.mempoolCh.SendError(ctx, p2p.PeerError{ + if err := r.handleMessage(ctx, mempoolCh.ID, envelope); err != nil { + r.logger.Error("failed to process message", "ch_id", mempoolCh.ID, "envelope", envelope, "err", err) + if serr := mempoolCh.SendError(ctx, p2p.PeerError{ NodeID: envelope.From, Err: err, }); serr != nil { @@ -224,7 +218,7 @@ func (r *Reactor) processMempoolCh(ctx context.Context) { // goroutine or not. If not, we start one for the newly added peer. For down or // removed peers, we remove the peer from the mempool peer ID set and signal to // stop the tx broadcasting goroutine. -func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate) { +func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, mempoolCh *p2p.Channel) { r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status) r.mtx.Lock() @@ -252,7 +246,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda r.ids.ReserveForPeer(peerUpdate.NodeID) // start a broadcast routine ensuring all txs are forwarded to the peer - go r.broadcastTxRoutine(pctx, peerUpdate.NodeID) + go r.broadcastTxRoutine(pctx, peerUpdate.NodeID, mempoolCh) } } @@ -273,18 +267,18 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda // processPeerUpdates initiates a blocking process where we listen for and handle // PeerUpdate messages. 
When the reactor is stopped, we will catch the signal and // close the p2p PeerUpdatesCh gracefully. -func (r *Reactor) processPeerUpdates(ctx context.Context) { +func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, mempoolCh *p2p.Channel) { for { select { case <-ctx.Done(): return - case peerUpdate := <-r.peerUpdates.Updates(): - r.processPeerUpdate(ctx, peerUpdate) + case peerUpdate := <-peerUpdates.Updates(): + r.processPeerUpdate(ctx, peerUpdate, mempoolCh) } } } -func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID) { +func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, mempoolCh *p2p.Channel) { peerMempoolID := r.ids.GetForPeer(peerID) var nextGossipTx *clist.CElement @@ -325,8 +319,8 @@ func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID) { memTx := nextGossipTx.Value.(*WrappedTx) - if r.peerMgr != nil { - height := r.peerMgr.GetHeight(peerID) + if r.getPeerHeight != nil { + height := r.getPeerHeight(peerID) if height > 0 && height < memTx.height-1 { // allow for a lag of one block time.Sleep(PeerCatchupSleepIntervalMS * time.Millisecond) @@ -339,7 +333,7 @@ func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID) { if ok := r.mempool.txStore.TxHasPeer(memTx.hash, peerMempoolID); !ok { // Send the mempool tx to the corresponding peer. Note, the peer may be // behind and thus would not be able to process the mempool tx correctly. - if err := r.mempoolCh.Send(ctx, p2p.Envelope{ + if err := mempoolCh.Send(ctx, p2p.Envelope{ To: peerID, Message: &protomem.Txs{ Txs: [][]byte{memTx.tx}, diff --git a/internal/mempool/reactor_test.go b/internal/mempool/reactor_test.go index 64a8adca1..82a97aeec 100644 --- a/internal/mempool/reactor_test.go +++ b/internal/mempool/reactor_test.go @@ -79,17 +79,14 @@ func setupReactors(ctx context.Context, t *testing.T, logger log.Logger, numNode return rts.mempoolChannels[nodeID], nil } - rts.reactors[nodeID], err = NewReactor( - ctx, + rts.reactors[nodeID] = NewReactor( rts.logger.With("nodeID", nodeID), cfg.Mempool, - rts.network.Nodes[nodeID].PeerManager, mempool, chCreator, - rts.peerUpdates[nodeID], + func(ctx context.Context) *p2p.PeerUpdates { return rts.peerUpdates[nodeID] }, + rts.network.Nodes[nodeID].PeerManager.GetHeight, ) - - require.NoError(t, err) rts.nodes = append(rts.nodes, nodeID) require.NoError(t, rts.reactors[nodeID].Start(ctx)) @@ -179,7 +176,7 @@ func TestReactorBroadcastDoesNotPanic(t *testing.T) { // run the router rts.start(ctx, t) - go primaryReactor.broadcastTxRoutine(ctx, secondary) + go primaryReactor.broadcastTxRoutine(ctx, secondary, rts.mempoolChannels[primary]) wg := &sync.WaitGroup{} for i := 0; i < 50; i++ { diff --git a/internal/p2p/p2ptest/network.go b/internal/p2p/p2ptest/network.go index fdfc0d45c..6016ce0a5 100644 --- a/internal/p2p/p2ptest/network.go +++ b/internal/p2p/p2ptest/network.go @@ -259,7 +259,6 @@ func (n *Network) MakeNode(ctx context.Context, t *testing.T, opts NodeOptions) require.NoError(t, err) router, err := p2p.NewRouter( - ctx, n.logger, p2p.NopMetrics(), nodeInfo, diff --git a/internal/p2p/peermanager.go b/internal/p2p/peermanager.go index 4044ad569..756551a49 100644 --- a/internal/p2p/peermanager.go +++ b/internal/p2p/peermanager.go @@ -828,6 +828,11 @@ func (m *PeerManager) Advertise(peerID types.NodeID, limit uint16) []NodeAddress return addresses } +// PeerEventSubscriber describes the type of the subscription method, to assist +// in isolating reactors 
specific construction and lifecycle from the +// peer manager. +type PeerEventSubscriber func(context.Context) *PeerUpdates + // Subscribe subscribes to peer updates. The caller must consume the peer // updates in a timely fashion and close the subscription when done, otherwise // the PeerManager will halt. diff --git a/internal/p2p/pex/reactor.go b/internal/p2p/pex/reactor.go index 178524af2..1c80763ee 100644 --- a/internal/p2p/pex/reactor.go +++ b/internal/p2p/pex/reactor.go @@ -80,9 +80,8 @@ type Reactor struct { logger log.Logger peerManager *p2p.PeerManager - pexCh *p2p.Channel - peerUpdates *p2p.PeerUpdates - + chCreator p2p.ChannelCreator + peerEvents p2p.PeerEventSubscriber // list of available peers to loop through and send peer requests to availablePeers map[types.NodeID]struct{} @@ -105,30 +104,23 @@ type Reactor struct { // NewReactor returns a reference to a new reactor. func NewReactor( - ctx context.Context, logger log.Logger, peerManager *p2p.PeerManager, channelCreator p2p.ChannelCreator, - peerUpdates *p2p.PeerUpdates, -) (*Reactor, error) { - - channel, err := channelCreator(ctx, ChannelDescriptor()) - if err != nil { - return nil, err - } - + peerEvents p2p.PeerEventSubscriber, +) *Reactor { r := &Reactor{ logger: logger, peerManager: peerManager, - pexCh: channel, - peerUpdates: peerUpdates, + chCreator: channelCreator, + peerEvents: peerEvents, availablePeers: make(map[types.NodeID]struct{}), requestsSent: make(map[types.NodeID]struct{}), lastReceivedRequests: make(map[types.NodeID]time.Time), } r.BaseService = *service.NewBaseService(logger, "PEX", r) - return r, nil + return r } // OnStart starts separate go routines for each p2p Channel and listens for @@ -136,8 +128,14 @@ func NewReactor( // messages on that p2p channel accordingly. The caller must be sure to execute // OnStop to ensure the outbound p2p Channels are closed. func (r *Reactor) OnStart(ctx context.Context) error { - go r.processPexCh(ctx) - go r.processPeerUpdates(ctx) + channel, err := r.chCreator(ctx, ChannelDescriptor()) + if err != nil { + return err + } + + peerUpdates := r.peerEvents(ctx) + go r.processPexCh(ctx, channel) + go r.processPeerUpdates(ctx, peerUpdates) return nil } @@ -147,11 +145,11 @@ func (r *Reactor) OnStop() {} // processPexCh implements a blocking event loop where we listen for p2p // Envelope messages from the pexCh. -func (r *Reactor) processPexCh(ctx context.Context) { +func (r *Reactor) processPexCh(ctx context.Context, pexCh *p2p.Channel) { incoming := make(chan *p2p.Envelope) go func() { defer close(incoming) - iter := r.pexCh.Receive(ctx) + iter := pexCh.Receive(ctx) for iter.Next(ctx) { select { case <-ctx.Done(): @@ -177,7 +175,7 @@ func (r *Reactor) processPexCh(ctx context.Context) { case <-timer.C: // Send a request for more peer addresses. - if err := r.sendRequestForPeers(ctx); err != nil { + if err := r.sendRequestForPeers(ctx, pexCh); err != nil { return // TODO(creachadair): Do we really want to stop processing the PEX // channel just because of an error here? @@ -192,11 +190,11 @@ func (r *Reactor) processPexCh(ctx context.Context) { } // A request from another peer, or a response to one of our requests. 
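Taken together, the changes above converge every reactor on the same wiring: the constructor only records a ChannelCreator and a PeerEventSubscriber, and OnStart opens the channel, subscribes to peer updates, and hands both to the processing goroutines. The following is a self-contained sketch of that shape using stand-in types; Channel, PeerUpdates, Reactor and the other names below are illustrative placeholders, not the real internal/p2p package.

package main

import (
	"context"
	"fmt"
)

// Stand-ins for the real p2p types; only the shape of the dependencies matters here.
type Channel struct{ ID uint16 }
type PeerUpdates struct{ ch chan string }

func (pu *PeerUpdates) Updates() <-chan string { return pu.ch }

// The two factory types the constructors now accept instead of live values.
type ChannelCreator func(context.Context) (*Channel, error)
type PeerEventSubscriber func(context.Context) *PeerUpdates

type Reactor struct {
	chCreator  ChannelCreator
	peerEvents PeerEventSubscriber
}

// NewReactor takes no context and cannot fail: it only records how the
// channel and the peer-update subscription will be obtained later.
func NewReactor(chCreator ChannelCreator, peerEvents PeerEventSubscriber) *Reactor {
	return &Reactor{chCreator: chCreator, peerEvents: peerEvents}
}

// OnStart does the work the constructors used to do: open the channel,
// subscribe to peer updates, and hand both to the processing goroutine.
func (r *Reactor) OnStart(ctx context.Context) error {
	ch, err := r.chCreator(ctx)
	if err != nil {
		return err
	}
	go r.processPeerUpdates(ctx, r.peerEvents(ctx), ch)
	return nil
}

func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *PeerUpdates, ch *Channel) {
	for {
		select {
		case <-ctx.Done():
			return
		case peer := <-peerUpdates.Updates():
			fmt.Println("peer update for", peer, "on channel", ch.ID)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	r := NewReactor(
		func(context.Context) (*Channel, error) { return &Channel{ID: 1}, nil },
		func(context.Context) *PeerUpdates { return &PeerUpdates{ch: make(chan string)} },
	)
	if err := r.OnStart(ctx); err != nil {
		panic(err)
	}
}

A test can wire the same thing up with canned values, which is what the reactor tests in this patch do by passing closures over pre-built channels and PeerUpdates.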
- dur, err := r.handlePexMessage(ctx, envelope) + dur, err := r.handlePexMessage(ctx, envelope, pexCh) if err != nil { r.logger.Error("failed to process message", - "ch_id", r.pexCh.ID, "envelope", envelope, "err", err) - if serr := r.pexCh.SendError(ctx, p2p.PeerError{ + "ch_id", pexCh.ID, "envelope", envelope, "err", err) + if serr := pexCh.SendError(ctx, p2p.PeerError{ NodeID: envelope.From, Err: err, }); serr != nil { @@ -213,12 +211,12 @@ func (r *Reactor) processPexCh(ctx context.Context) { // processPeerUpdates initiates a blocking process where we listen for and handle // PeerUpdate messages. When the reactor is stopped, we will catch the signal and // close the p2p PeerUpdatesCh gracefully. -func (r *Reactor) processPeerUpdates(ctx context.Context) { +func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates) { for { select { case <-ctx.Done(): return - case peerUpdate := <-r.peerUpdates.Updates(): + case peerUpdate := <-peerUpdates.Updates(): r.processPeerUpdate(peerUpdate) } } @@ -227,7 +225,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) { // handlePexMessage handles envelopes sent from peers on the PexChannel. // If an update was received, a new polling interval is returned; otherwise the // duration is 0. -func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope) (time.Duration, error) { +func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope, pexCh *p2p.Channel) (time.Duration, error) { logger := r.logger.With("peer", envelope.From) switch msg := envelope.Message.(type) { @@ -246,7 +244,7 @@ func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope) URL: addr.String(), } } - return 0, r.pexCh.Send(ctx, p2p.Envelope{ + return 0, pexCh.Send(ctx, p2p.Envelope{ To: envelope.From, Message: &protop2p.PexResponse{Addresses: pexAddresses}, }) @@ -310,7 +308,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) { // that peer a request for more peer addresses. The chosen peer is moved into // the requestsSent bucket so that we will not attempt to contact them again // until they've replied or updated. 
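Worth noting about the loop above: the Receive iterator blocks in Next, so it is drained on its own goroutine into a local incoming channel; that is what lets the main select also service the polling timer. Reduced to its skeleton as a sketch only: pollLoop and pollInterval are placeholders, the actual handling and interval calculation are elided, and the context, time, and p2p imports already used in this file are assumed.

func pollLoop(ctx context.Context, pexCh *p2p.Channel, pollInterval time.Duration) {
	incoming := make(chan *p2p.Envelope)
	go func() {
		defer close(incoming)
		iter := pexCh.Receive(ctx)
		for iter.Next(ctx) {
			select {
			case incoming <- iter.Envelope():
			case <-ctx.Done():
				return
			}
		}
	}()

	timer := time.NewTimer(0)
	defer timer.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			// time to ask a peer for more addresses
		case envelope, ok := <-incoming:
			if !ok {
				return // iterator finished; the reactor is shutting down
			}
			_ = envelope // a PEX request or response, handled by handlePexMessage
		}
		timer.Reset(pollInterval)
	}
}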
-func (r *Reactor) sendRequestForPeers(ctx context.Context) error { +func (r *Reactor) sendRequestForPeers(ctx context.Context, pexCh *p2p.Channel) error { r.mtx.Lock() defer r.mtx.Unlock() if len(r.availablePeers) == 0 { @@ -325,7 +323,7 @@ func (r *Reactor) sendRequestForPeers(ctx context.Context) error { break } - if err := r.pexCh.Send(ctx, p2p.Envelope{ + if err := pexCh.Send(ctx, p2p.Envelope{ To: peerID, Message: &protop2p.PexRequest{}, }); err != nil { diff --git a/internal/p2p/pex/reactor_test.go b/internal/p2p/pex/reactor_test.go index 288755e19..325ea72ab 100644 --- a/internal/p2p/pex/reactor_test.go +++ b/internal/p2p/pex/reactor_test.go @@ -23,8 +23,8 @@ import ( const ( checkFrequency = 500 * time.Millisecond defaultBufferSize = 2 - shortWait = 10 * time.Second - longWait = 60 * time.Second + shortWait = 5 * time.Second + longWait = 20 * time.Second firstNode = 0 secondNode = 1 @@ -211,7 +211,8 @@ func TestReactorSmallPeerStoreInALargeNetwork(t *testing.T) { require.Eventually(t, func() bool { // nolint:scopelint return testNet.network.Nodes[nodeID].PeerManager.PeerRatio() >= 0.9 - }, longWait, checkFrequency) + }, longWait, checkFrequency, + "peer ratio is: %f", testNet.network.Nodes[nodeID].PeerManager.PeerRatio()) } } @@ -303,8 +304,7 @@ func setupSingle(ctx context.Context, t *testing.T) *singleTestReactor { return pexCh, nil } - reactor, err := pex.NewReactor(ctx, log.NewNopLogger(), peerManager, chCreator, peerUpdates) - require.NoError(t, err) + reactor := pex.NewReactor(log.NewNopLogger(), peerManager, chCreator, func(_ context.Context) *p2p.PeerUpdates { return peerUpdates }) require.NoError(t, reactor.Start(ctx)) t.Cleanup(reactor.Wait) @@ -381,6 +381,10 @@ func setupNetwork(ctx context.Context, t *testing.T, opts testOptions) *reactorT idx := 0 for nodeID := range rts.network.Nodes { + // make a copy to avoid getting hit by the range ref + // confusion: + nodeID := nodeID + rts.peerChans[nodeID] = make(chan p2p.PeerUpdate, chBuf) rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], chBuf) rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID]) @@ -393,15 +397,12 @@ func setupNetwork(ctx context.Context, t *testing.T, opts testOptions) *reactorT if idx < opts.MockNodes { rts.mocks = append(rts.mocks, nodeID) } else { - var err error - rts.reactors[nodeID], err = pex.NewReactor( - ctx, + rts.reactors[nodeID] = pex.NewReactor( rts.logger.With("nodeID", nodeID), rts.network.Nodes[nodeID].PeerManager, chCreator, - rts.peerUpdates[nodeID], + func(_ context.Context) *p2p.PeerUpdates { return rts.peerUpdates[nodeID] }, ) - require.NoError(t, err) } rts.nodes = append(rts.nodes, nodeID) @@ -426,9 +427,10 @@ func setupNetwork(ctx context.Context, t *testing.T, opts testOptions) *reactorT func (r *reactorTestSuite) start(ctx context.Context, t *testing.T) { t.Helper() - for _, reactor := range r.reactors { + for name, reactor := range r.reactors { require.NoError(t, reactor.Start(ctx)) require.True(t, reactor.IsRunning()) + t.Log("started", name) } } @@ -451,15 +453,12 @@ func (r *reactorTestSuite) addNodes(ctx context.Context, t *testing.T, nodes int return r.pexChannels[nodeID], nil } - var err error - r.reactors[nodeID], err = pex.NewReactor( - ctx, + r.reactors[nodeID] = pex.NewReactor( r.logger.With("nodeID", nodeID), r.network.Nodes[nodeID].PeerManager, chCreator, - r.peerUpdates[nodeID], + func(_ context.Context) *p2p.PeerUpdates { return r.peerUpdates[nodeID] }, ) - require.NoError(t, err) r.nodes = append(r.nodes, nodeID) r.total++ 
} diff --git a/internal/p2p/router.go b/internal/p2p/router.go index 40c24d56b..025769592 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -177,7 +177,6 @@ type Router struct { // listening on appropriate interfaces, and will be closed by the Router when it // stops. func NewRouter( - ctx context.Context, logger log.Logger, metrics *Metrics, nodeInfo types.NodeInfo, @@ -215,13 +214,6 @@ func NewRouter( router.BaseService = service.NewBaseService(logger, "router", router) - qf, err := router.createQueueFactory(ctx) - if err != nil { - return nil, err - } - - router.queueFactory = qf - for _, transport := range transports { for _, protocol := range transport.Protocols() { if _, ok := router.protocolTransports[protocol]; !ok { @@ -941,8 +933,22 @@ func (r *Router) NodeInfo() types.NodeInfo { return r.nodeInfo.Copy() } +func (r *Router) setupQueueFactory(ctx context.Context) error { + qf, err := r.createQueueFactory(ctx) + if err != nil { + return err + } + + r.queueFactory = qf + return nil +} + // OnStart implements service.Service. func (r *Router) OnStart(ctx context.Context) error { + if err := r.setupQueueFactory(ctx); err != nil { + return err + } + for _, transport := range r.transports { for _, endpoint := range r.endpoints { if err := transport.Listen(endpoint); err != nil { diff --git a/internal/p2p/router_init_test.go b/internal/p2p/router_init_test.go index 19b4aa94c..d58c79487 100644 --- a/internal/p2p/router_init_test.go +++ b/internal/p2p/router_init_test.go @@ -23,29 +23,35 @@ func TestRouter_ConstructQueueFactory(t *testing.T) { t.Run("Default", func(t *testing.T) { require.Zero(t, os.Getenv("TM_P2P_QUEUE")) opts := RouterOptions{} - r, err := NewRouter(ctx, log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts) + r, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts) require.NoError(t, err) + require.NoError(t, r.setupQueueFactory(ctx)) + _, ok := r.queueFactory(1).(*fifoQueue) require.True(t, ok) }) t.Run("Fifo", func(t *testing.T) { opts := RouterOptions{QueueType: queueTypeFifo} - r, err := NewRouter(ctx, log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts) + r, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts) require.NoError(t, err) + require.NoError(t, r.setupQueueFactory(ctx)) + _, ok := r.queueFactory(1).(*fifoQueue) require.True(t, ok) }) t.Run("Priority", func(t *testing.T) { opts := RouterOptions{QueueType: queueTypePriority} - r, err := NewRouter(ctx, log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts) + r, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts) require.NoError(t, err) + require.NoError(t, r.setupQueueFactory(ctx)) + q, ok := r.queueFactory(1).(*pqScheduler) require.True(t, ok) defer q.close() }) t.Run("NonExistant", func(t *testing.T) { opts := RouterOptions{QueueType: "fast"} - _, err := NewRouter(ctx, log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts) + _, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts) require.Error(t, err) require.Contains(t, err.Error(), "fast") }) diff --git a/internal/p2p/router_test.go b/internal/p2p/router_test.go index 6142dc45f..5facfdaff 100644 --- a/internal/p2p/router_test.go +++ b/internal/p2p/router_test.go @@ -106,7 +106,6 @@ func TestRouter_Channel_Basic(t *testing.T) { require.NoError(t, err) router, err := p2p.NewRouter( - ctx, log.NewNopLogger(), p2p.NopMetrics(), selfInfo, @@ -409,7 +408,6 
@@ func TestRouter_AcceptPeers(t *testing.T) { sub := peerManager.Subscribe(ctx) router, err := p2p.NewRouter( - ctx, log.NewNopLogger(), p2p.NopMetrics(), selfInfo, @@ -464,7 +462,6 @@ func TestRouter_AcceptPeers_Error(t *testing.T) { require.NoError(t, err) router, err := p2p.NewRouter( - ctx, log.NewNopLogger(), p2p.NopMetrics(), selfInfo, @@ -502,7 +499,6 @@ func TestRouter_AcceptPeers_ErrorEOF(t *testing.T) { require.NoError(t, err) router, err := p2p.NewRouter( - ctx, log.NewNopLogger(), p2p.NopMetrics(), selfInfo, @@ -554,7 +550,6 @@ func TestRouter_AcceptPeers_HeadOfLineBlocking(t *testing.T) { require.NoError(t, err) router, err := p2p.NewRouter( - ctx, log.NewNopLogger(), p2p.NopMetrics(), selfInfo, @@ -658,7 +653,6 @@ func TestRouter_DialPeers(t *testing.T) { sub := peerManager.Subscribe(ctx) router, err := p2p.NewRouter( - ctx, log.NewNopLogger(), p2p.NopMetrics(), selfInfo, @@ -744,7 +738,6 @@ func TestRouter_DialPeers_Parallel(t *testing.T) { require.True(t, added) router, err := p2p.NewRouter( - ctx, log.NewNopLogger(), p2p.NopMetrics(), selfInfo, @@ -819,7 +812,6 @@ func TestRouter_EvictPeers(t *testing.T) { sub := peerManager.Subscribe(ctx) router, err := p2p.NewRouter( - ctx, log.NewNopLogger(), p2p.NopMetrics(), selfInfo, @@ -882,7 +874,6 @@ func TestRouter_ChannelCompatability(t *testing.T) { require.NoError(t, err) router, err := p2p.NewRouter( - ctx, log.NewNopLogger(), p2p.NopMetrics(), selfInfo, @@ -938,7 +929,6 @@ func TestRouter_DontSendOnInvalidChannel(t *testing.T) { sub := peerManager.Subscribe(ctx) router, err := p2p.NewRouter( - ctx, log.NewNopLogger(), p2p.NopMetrics(), selfInfo, diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index dba520a1c..15abf3ef0 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -16,7 +16,6 @@ import ( "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/internal/eventbus" "github.com/tendermint/tendermint/internal/p2p" - "github.com/tendermint/tendermint/internal/p2p/conn" sm "github.com/tendermint/tendermint/internal/state" "github.com/tendermint/tendermint/internal/store" "github.com/tendermint/tendermint/libs/log" @@ -139,13 +138,11 @@ type Reactor struct { stateStore sm.Store blockStore *store.BlockStore - conn abciclient.Client - tempDir string - snapshotCh *p2p.Channel - chunkCh *p2p.Channel - blockCh *p2p.Channel - paramsCh *p2p.Channel - peerUpdates *p2p.PeerUpdates + conn abciclient.Client + tempDir string + peerEvents p2p.PeerEventSubscriber + chCreator p2p.ChannelCreator + sendBlockError func(context.Context, p2p.PeerError) error // Dispatcher is used to multiplex light block requests and responses over multiple // peers used by the p2p state provider and in reverse sync. @@ -155,10 +152,13 @@ type Reactor struct { // These will only be set when a state sync is in progress. It is used to feed // received snapshots and chunks into the syncer and manage incoming and outgoing // providers. 
- mtx sync.RWMutex - syncer *syncer - providers map[types.NodeID]*BlockProvider - stateProvider StateProvider + mtx sync.RWMutex + initSyncer func() *syncer + requestSnaphot func() error + syncer *syncer + providers map[types.NodeID]*BlockProvider + initStateProvider func(ctx context.Context, chainID string, initialHeight int64) error + stateProvider StateProvider eventBus *eventbus.EventBus metrics *Metrics @@ -178,32 +178,13 @@ func NewReactor( logger log.Logger, conn abciclient.Client, channelCreator p2p.ChannelCreator, - peerUpdates *p2p.PeerUpdates, + peerEvents p2p.PeerEventSubscriber, stateStore sm.Store, blockStore *store.BlockStore, tempDir string, ssMetrics *Metrics, eventBus *eventbus.EventBus, -) (*Reactor, error) { - - chDesc := getChannelDescriptors() - - snapshotCh, err := channelCreator(ctx, chDesc[SnapshotChannel]) - if err != nil { - return nil, err - } - chunkCh, err := channelCreator(ctx, chDesc[ChunkChannel]) - if err != nil { - return nil, err - } - blockCh, err := channelCreator(ctx, chDesc[LightBlockChannel]) - if err != nil { - return nil, err - } - paramsCh, err := channelCreator(ctx, chDesc[ParamsChannel]) - if err != nil { - return nil, err - } +) *Reactor { r := &Reactor{ logger: logger, @@ -211,23 +192,19 @@ func NewReactor( initialHeight: initialHeight, cfg: cfg, conn: conn, - snapshotCh: snapshotCh, - chunkCh: chunkCh, - blockCh: blockCh, - paramsCh: paramsCh, - peerUpdates: peerUpdates, + chCreator: channelCreator, + peerEvents: peerEvents, tempDir: tempDir, stateStore: stateStore, blockStore: blockStore, peers: newPeerList(), - dispatcher: NewDispatcher(blockCh), providers: make(map[types.NodeID]*BlockProvider), metrics: ssMetrics, eventBus: eventBus, } r.BaseService = *service.NewBaseService(logger, "StateSync", r) - return r, nil + return r } // OnStart starts separate go routines for each p2p Channel and listens for @@ -237,8 +214,91 @@ func NewReactor( // The caller must be sure to execute OnStop to ensure the outbound p2p Channels are // closed. No error is returned. func (r *Reactor) OnStart(ctx context.Context) error { - go r.processChannels(ctx, r.snapshotCh, r.chunkCh, r.blockCh, r.paramsCh) - go r.processPeerUpdates(ctx) + // construct channels + chDesc := getChannelDescriptors() + snapshotCh, err := r.chCreator(ctx, chDesc[SnapshotChannel]) + if err != nil { + return err + } + chunkCh, err := r.chCreator(ctx, chDesc[ChunkChannel]) + if err != nil { + return err + } + blockCh, err := r.chCreator(ctx, chDesc[LightBlockChannel]) + if err != nil { + return err + } + paramsCh, err := r.chCreator(ctx, chDesc[ParamsChannel]) + if err != nil { + return err + } + + // define constructor and helper functions, that hold + // references to these channels for use later. This is not + // ideal. 
+ r.initSyncer = func() *syncer { + return &syncer{ + logger: r.logger, + stateProvider: r.stateProvider, + conn: r.conn, + snapshots: newSnapshotPool(), + snapshotCh: snapshotCh, + chunkCh: chunkCh, + tempDir: r.tempDir, + fetchers: r.cfg.Fetchers, + retryTimeout: r.cfg.ChunkRequestTimeout, + metrics: r.metrics, + } + } + r.dispatcher = NewDispatcher(blockCh) + r.requestSnaphot = func() error { + // request snapshots from all currently connected peers + return snapshotCh.Send(ctx, p2p.Envelope{ + Broadcast: true, + Message: &ssproto.SnapshotsRequest{}, + }) + } + r.sendBlockError = blockCh.SendError + + r.initStateProvider = func(ctx context.Context, chainID string, initialHeight int64) error { + to := light.TrustOptions{ + Period: r.cfg.TrustPeriod, + Height: r.cfg.TrustHeight, + Hash: r.cfg.TrustHashBytes(), + } + spLogger := r.logger.With("module", "stateprovider") + spLogger.Info("initializing state provider", "trustPeriod", to.Period, + "trustHeight", to.Height, "useP2P", r.cfg.UseP2P) + + if r.cfg.UseP2P { + if err := r.waitForEnoughPeers(ctx, 2); err != nil { + return err + } + + peers := r.peers.All() + providers := make([]provider.Provider, len(peers)) + for idx, p := range peers { + providers[idx] = NewBlockProvider(p, chainID, r.dispatcher) + } + + stateProvider, err := NewP2PStateProvider(ctx, chainID, initialHeight, providers, to, paramsCh, r.logger.With("module", "stateprovider")) + if err != nil { + return fmt.Errorf("failed to initialize P2P state provider: %w", err) + } + r.stateProvider = stateProvider + return nil + } + + stateProvider, err := NewRPCStateProvider(ctx, chainID, initialHeight, r.cfg.RPCServers, to, spLogger) + if err != nil { + return fmt.Errorf("failed to initialize RPC state provider: %w", err) + } + r.stateProvider = stateProvider + return nil + } + + go r.processChannels(ctx, snapshotCh, chunkCh, blockCh, paramsCh) + go r.processPeerUpdates(ctx, r.peerEvents(ctx)) return nil } @@ -281,16 +341,7 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) { return sm.State{}, err } - r.syncer = newSyncer( - r.cfg, - r.logger, - r.conn, - r.stateProvider, - r.snapshotCh, - r.chunkCh, - r.tempDir, - r.metrics, - ) + r.syncer = r.initSyncer() r.mtx.Unlock() defer func() { @@ -301,15 +352,7 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) { r.mtx.Unlock() }() - requestSnapshotsHook := func() error { - // request snapshots from all currently connected peers - return r.snapshotCh.Send(ctx, p2p.Envelope{ - Broadcast: true, - Message: &ssproto.SnapshotsRequest{}, - }) - } - - state, commit, err := r.syncer.SyncAny(ctx, r.cfg.DiscoveryTime, requestSnapshotsHook) + state, commit, err := r.syncer.SyncAny(ctx, r.cfg.DiscoveryTime, r.requestSnaphot) if err != nil { return sm.State{}, err } @@ -436,7 +479,7 @@ func (r *Reactor) backfill( r.logger.Info("backfill: fetched light block failed validate basic, removing peer...", "err", err, "height", height) queue.retry(height) - if serr := r.blockCh.SendError(ctx, p2p.PeerError{ + if serr := r.sendBlockError(ctx, p2p.PeerError{ NodeID: peer, Err: fmt.Errorf("received invalid light block: %w", err), }); serr != nil { @@ -473,7 +516,7 @@ func (r *Reactor) backfill( if w, g := trustedBlockID.Hash, resp.block.Hash(); !bytes.Equal(w, g) { r.logger.Info("received invalid light block. 
header hash doesn't match trusted LastBlockID", "trustedHash", w, "receivedHash", g, "height", resp.block.Height) - if err := r.blockCh.SendError(ctx, p2p.PeerError{ + if err := r.sendBlockError(ctx, p2p.PeerError{ NodeID: resp.peer, Err: fmt.Errorf("received invalid light block. Expected hash %v, got: %v", w, g), }); err != nil { @@ -534,7 +577,7 @@ func (r *Reactor) backfill( // handleSnapshotMessage handles envelopes sent from peers on the // SnapshotChannel. It returns an error only if the Envelope.Message is unknown // for this channel. This should never be called outside of handleMessage. -func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envelope) error { +func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envelope, snapshotCh *p2p.Channel) error { logger := r.logger.With("peer", envelope.From) switch msg := envelope.Message.(type) { @@ -553,7 +596,7 @@ func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envel "peer", envelope.From, ) - if err := r.snapshotCh.Send(ctx, p2p.Envelope{ + if err := snapshotCh.Send(ctx, p2p.Envelope{ To: envelope.From, Message: &ssproto.SnapshotsResponse{ Height: snapshot.Height, @@ -589,8 +632,8 @@ func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envel "failed to add snapshot", "height", msg.Height, "format", msg.Format, + "channel", snapshotCh.ID, "err", err, - "channel", r.snapshotCh.ID, ) return nil } @@ -606,7 +649,7 @@ func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envel // handleChunkMessage handles envelopes sent from peers on the ChunkChannel. // It returns an error only if the Envelope.Message is unknown for this channel. // This should never be called outside of handleMessage. -func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope) error { +func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope, chunkCh *p2p.Channel) error { switch msg := envelope.Message.(type) { case *ssproto.ChunkRequest: r.logger.Debug( @@ -640,7 +683,7 @@ func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope "chunk", msg.Index, "peer", envelope.From, ) - if err := r.chunkCh.Send(ctx, p2p.Envelope{ + if err := chunkCh.Send(ctx, p2p.Envelope{ To: envelope.From, Message: &ssproto.ChunkResponse{ Height: msg.Height, @@ -695,7 +738,7 @@ func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope return nil } -func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Envelope) error { +func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Envelope, blockCh *p2p.Channel) error { switch msg := envelope.Message.(type) { case *ssproto.LightBlockRequest: r.logger.Info("received light block request", "height", msg.Height) @@ -705,7 +748,7 @@ func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Env return err } if lb == nil { - if err := r.blockCh.Send(ctx, p2p.Envelope{ + if err := blockCh.Send(ctx, p2p.Envelope{ To: envelope.From, Message: &ssproto.LightBlockResponse{ LightBlock: nil, @@ -724,7 +767,7 @@ func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Env // NOTE: If we don't have the light block we will send a nil light block // back to the requested node, indicating that we don't have it. 
- if err := r.blockCh.Send(ctx, p2p.Envelope{ + if err := blockCh.Send(ctx, p2p.Envelope{ To: envelope.From, Message: &ssproto.LightBlockResponse{ LightBlock: lbproto, @@ -752,7 +795,7 @@ func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Env return nil } -func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelope) error { +func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelope, paramsCh *p2p.Channel) error { switch msg := envelope.Message.(type) { case *ssproto.ParamsRequest: r.logger.Debug("received consensus params request", "height", msg.Height) @@ -763,7 +806,7 @@ func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelop } cpproto := cp.ToProto() - if err := r.paramsCh.Send(ctx, p2p.Envelope{ + if err := paramsCh.Send(ctx, p2p.Envelope{ To: envelope.From, Message: &ssproto.ParamsResponse{ Height: msg.Height, @@ -801,7 +844,7 @@ func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelop // handleMessage handles an Envelope sent from a peer on a specific p2p Channel. // It will handle errors and any possible panics gracefully. A caller can handle // any error returned by sending a PeerError on the respective channel. -func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope) (err error) { +func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, chans map[p2p.ChannelID]*p2p.Channel) (err error) { defer func() { if e := recover(); e != nil { err = fmt.Errorf("panic in processing message: %v", e) @@ -817,13 +860,13 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope) (er switch envelope.ChannelID { case SnapshotChannel: - err = r.handleSnapshotMessage(ctx, envelope) + err = r.handleSnapshotMessage(ctx, envelope, chans[SnapshotChannel]) case ChunkChannel: - err = r.handleChunkMessage(ctx, envelope) + err = r.handleChunkMessage(ctx, envelope, chans[ChunkChannel]) case LightBlockChannel: - err = r.handleLightBlockMessage(ctx, envelope) + err = r.handleLightBlockMessage(ctx, envelope, chans[LightBlockChannel]) case ParamsChannel: - err = r.handleParamsMessage(ctx, envelope) + err = r.handleParamsMessage(ctx, envelope, chans[ParamsChannel]) default: err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", envelope.ChannelID, envelope) } @@ -840,7 +883,7 @@ func (r *Reactor) processChannels(ctx context.Context, chs ...*p2p.Channel) { ctx, cancel := context.WithCancel(ctx) defer cancel() - chanTable := make(map[conn.ChannelID]*p2p.Channel, len(chs)) + chanTable := make(map[p2p.ChannelID]*p2p.Channel, len(chs)) for idx := range chs { ch := chs[idx] chanTable[ch.ID] = ch @@ -849,7 +892,7 @@ func (r *Reactor) processChannels(ctx context.Context, chs ...*p2p.Channel) { iter := p2p.MergedChannelIterator(ctx, chs...) for iter.Next(ctx) { envelope := iter.Envelope() - if err := r.handleMessage(ctx, envelope); err != nil { + if err := r.handleMessage(ctx, envelope, chanTable); err != nil { ch, ok := chanTable[envelope.ChannelID] if !ok { r.logger.Error("received impossible message", @@ -930,12 +973,12 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda // processPeerUpdates initiates a blocking process where we listen for and handle // PeerUpdate messages. When the reactor is stopped, we will catch the signal and // close the p2p PeerUpdatesCh gracefully. 
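Since OnStart now opens all four channels itself, processChannels receives them as arguments, merges them into a single iterator, and dispatches on the envelope's channel ID through a small lookup table; the same table is what lets an error be reported back on the channel the message arrived on. The heart of that loop as a free-standing sketch, where handle is a placeholder for the reactor's handleMessage:

func processChannels(ctx context.Context, handle func(context.Context, *p2p.Envelope, map[p2p.ChannelID]*p2p.Channel) error, chs ...*p2p.Channel) {
	// index the channels by ID so handlers and error reporting can find the
	// channel an envelope arrived on
	chanTable := make(map[p2p.ChannelID]*p2p.Channel, len(chs))
	for _, ch := range chs {
		chanTable[ch.ID] = ch
	}

	iter := p2p.MergedChannelIterator(ctx, chs...)
	for iter.Next(ctx) {
		envelope := iter.Envelope()
		if err := handle(ctx, envelope, chanTable); err != nil {
			if ch, ok := chanTable[envelope.ChannelID]; ok {
				_ = ch.SendError(ctx, p2p.PeerError{NodeID: envelope.From, Err: err})
			}
		}
	}
}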
-func (r *Reactor) processPeerUpdates(ctx context.Context) { +func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates) { for { select { case <-ctx.Done(): return - case peerUpdate := <-r.peerUpdates.Updates(): + case peerUpdate := <-peerUpdates.Updates(): r.processPeerUpdate(ctx, peerUpdate) } } @@ -1040,41 +1083,6 @@ func (r *Reactor) waitForEnoughPeers(ctx context.Context, numPeers int) error { return nil } -func (r *Reactor) initStateProvider(ctx context.Context, chainID string, initialHeight int64) error { - var err error - to := light.TrustOptions{ - Period: r.cfg.TrustPeriod, - Height: r.cfg.TrustHeight, - Hash: r.cfg.TrustHashBytes(), - } - spLogger := r.logger.With("module", "stateprovider") - spLogger.Info("initializing state provider", "trustPeriod", to.Period, - "trustHeight", to.Height, "useP2P", r.cfg.UseP2P) - - if r.cfg.UseP2P { - if err := r.waitForEnoughPeers(ctx, 2); err != nil { - return err - } - - peers := r.peers.All() - providers := make([]provider.Provider, len(peers)) - for idx, p := range peers { - providers[idx] = NewBlockProvider(p, chainID, r.dispatcher) - } - - r.stateProvider, err = NewP2PStateProvider(ctx, chainID, initialHeight, providers, to, r.paramsCh, spLogger) - if err != nil { - return fmt.Errorf("failed to initialize P2P state provider: %w", err) - } - } else { - r.stateProvider, err = NewRPCStateProvider(ctx, chainID, initialHeight, r.cfg.RPCServers, to, spLogger) - if err != nil { - return fmt.Errorf("failed to initialize RPC state provider: %w", err) - } - } - return nil -} - func (r *Reactor) TotalSnapshots() int64 { r.mtx.RLock() defer r.mtx.RUnlock() diff --git a/internal/statesync/reactor_test.go b/internal/statesync/reactor_test.go index 709fe9a2c..f59a6e4ee 100644 --- a/internal/statesync/reactor_test.go +++ b/internal/statesync/reactor_test.go @@ -154,8 +154,7 @@ func setup( logger := log.NewNopLogger() - var err error - rts.reactor, err = NewReactor( + rts.reactor = NewReactor( ctx, factory.DefaultTestChainID, 1, @@ -163,25 +162,26 @@ func setup( logger.With("component", "reactor"), conn, chCreator, - rts.peerUpdates, + func(context.Context) *p2p.PeerUpdates { return rts.peerUpdates }, rts.stateStore, rts.blockStore, "", m, nil, // eventbus can be nil ) - require.NoError(t, err) - rts.syncer = newSyncer( - *cfg, - logger.With("component", "syncer"), - conn, - stateProvider, - rts.snapshotChannel, - rts.chunkChannel, - "", - rts.reactor.metrics, - ) + rts.syncer = &syncer{ + logger: logger, + stateProvider: stateProvider, + conn: conn, + snapshots: newSnapshotPool(), + snapshotCh: rts.snapshotChannel, + chunkCh: rts.chunkChannel, + tempDir: t.TempDir(), + fetchers: cfg.Fetchers, + retryTimeout: cfg.ChunkRequestTimeout, + metrics: rts.reactor.metrics, + } ctx, cancel := context.WithCancel(ctx) diff --git a/internal/statesync/syncer.go b/internal/statesync/syncer.go index 78eb8d53a..c2cca9a7c 100644 --- a/internal/statesync/syncer.go +++ b/internal/statesync/syncer.go @@ -10,7 +10,6 @@ import ( abciclient "github.com/tendermint/tendermint/abci/client" abci "github.com/tendermint/tendermint/abci/types" - "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/proxy" sm "github.com/tendermint/tendermint/internal/state" @@ -72,31 +71,6 @@ type syncer struct { processingSnapshot *snapshot } -// newSyncer creates a new syncer. 
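With newSyncer gone, the syncer (and likewise the snapshot-request hook, the block-error sender, and the state-provider initializer) is produced by a closure that OnStart installs on the reactor; the closure, not the struct, is what keeps the freshly opened channels reachable, which is the trade-off the "not ideal" comment above refers to. A tiny self-contained illustration of that mechanism with stand-in names; nothing here is statesync-specific:

package main

import "fmt"

type syncerStandIn struct{ ch chan string }

type reactorStandIn struct {
	// installed by onStart; the struct itself never holds the channel
	initSyncer func() *syncerStandIn
}

func (r *reactorStandIn) onStart() {
	ch := make(chan string, 1) // stands in for a p2p channel opened at start-up
	r.initSyncer = func() *syncerStandIn {
		return &syncerStandIn{ch: ch} // the closure keeps ch alive after onStart returns
	}
}

func main() {
	r := &reactorStandIn{}
	r.onStart()
	s := r.initSyncer() // constructed on demand, long after start-up
	s.ch <- "still usable"
	fmt.Println(<-s.ch)
}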
-func newSyncer( - cfg config.StateSyncConfig, - logger log.Logger, - conn abciclient.Client, - stateProvider StateProvider, - snapshotCh *p2p.Channel, - chunkCh *p2p.Channel, - tempDir string, - metrics *Metrics, -) *syncer { - return &syncer{ - logger: logger, - stateProvider: stateProvider, - conn: conn, - snapshots: newSnapshotPool(), - snapshotCh: snapshotCh, - chunkCh: chunkCh, - tempDir: tempDir, - fetchers: cfg.Fetchers, - retryTimeout: cfg.ChunkRequestTimeout, - metrics: metrics, - } -} - // AddChunk adds a chunk to the chunk queue, if any. It returns false if the chunk has already // been added to the queue, or an error if there's no sync in progress. func (s *syncer) AddChunk(chunk *chunk) (bool, error) { diff --git a/node/node.go b/node/node.go index 34350e09f..7c3ca1268 100644 --- a/node/node.go +++ b/node/node.go @@ -88,12 +88,11 @@ func newDefaultNode( } if cfg.Mode == config.ModeSeed { return makeSeedNode( - ctx, + logger, cfg, config.DefaultDBProvider, nodeKey, defaultGenesisDocProviderFunc(cfg), - logger, ) } pval, err := makeDefaultPrivval(cfg) @@ -244,7 +243,7 @@ func makeNode( // TODO: Fetch and provide real options and do proper p2p bootstrapping. // TODO: Use a persistent peer database. - nodeInfo, err := makeNodeInfo(cfg, nodeKey, eventSinks, genDoc, state) + nodeInfo, err := makeNodeInfo(cfg, nodeKey, eventSinks, genDoc, state.Version.Consensus) if err != nil { return nil, combineCloseError(err, makeCloser(closers)) } @@ -257,24 +256,21 @@ func makeNode( makeCloser(closers)) } - router, err := createRouter(ctx, logger, nodeMetrics.p2p, nodeInfo, nodeKey, - peerManager, cfg, proxyApp) + router, err := createRouter(logger, nodeMetrics.p2p, nodeInfo, nodeKey, peerManager, cfg, proxyApp) if err != nil { return nil, combineCloseError( fmt.Errorf("failed to create router: %w", err), makeCloser(closers)) } - mpReactor, mp, err := createMempoolReactor(ctx, - cfg, proxyApp, stateStore, nodeMetrics.mempool, peerManager, router, logger, - ) + mpReactor, mp, err := createMempoolReactor(logger, cfg, proxyApp, stateStore, nodeMetrics.mempool, + peerManager.Subscribe, router.OpenChannel, peerManager.GetHeight) if err != nil { return nil, combineCloseError(err, makeCloser(closers)) } - evReactor, evPool, edbCloser, err := createEvidenceReactor(ctx, - cfg, dbProvider, stateStore, blockStore, peerManager, router, logger, nodeMetrics.evidence, eventBus, - ) + evReactor, evPool, edbCloser, err := createEvidenceReactor(logger, cfg, dbProvider, + stateStore, blockStore, peerManager.Subscribe, router.OpenChannel, nodeMetrics.evidence, eventBus) closers = append(closers, edbCloser) if err != nil { return nil, combineCloseError(err, makeCloser(closers)) @@ -295,11 +291,12 @@ func makeNode( // Determine whether we should do block sync. This must happen after the handshake, since the // app may modify the validator set, specifying ourself as the only validator. blockSync := !onlyValidatorIsUs(state, pubKey) + waitSync := stateSync || blockSync csReactor, csState, err := createConsensusReactor(ctx, cfg, stateStore, blockExec, blockStore, mp, evPool, - privValidator, nodeMetrics.consensus, stateSync || blockSync, eventBus, - peerManager, router, logger, + privValidator, nodeMetrics.consensus, waitSync, eventBus, + peerManager, router.OpenChannel, logger, ) if err != nil { return nil, combineCloseError(err, makeCloser(closers)) @@ -307,23 +304,18 @@ func makeNode( // Create the blockchain reactor. Note, we do not start block sync if we're // doing a state sync first. 
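On the node side nothing new has to be built to satisfy these parameter types: the existing methods are passed as method values and each reactor invokes them in its own OnStart. A sketch of the assignments implied by the calls in makeNode, assuming peerManager *p2p.PeerManager, router *p2p.Router, and the other locals are in scope exactly as above:

// Method values satisfy the factory types directly; no adapters are needed.
var peerEvents p2p.PeerEventSubscriber = peerManager.Subscribe // func(context.Context) *p2p.PeerUpdates
var chCreator p2p.ChannelCreator = router.OpenChannel          // opens channels once the router is running
var peerHeight func(types.NodeID) int64 = peerManager.GetHeight

mpReactor, mp, err := createMempoolReactor(
	logger, cfg, proxyApp, stateStore, nodeMetrics.mempool,
	peerEvents, chCreator, peerHeight,
)

Because nothing is opened or subscribed at construction time, node setup no longer needs a context for these steps, and the reactors can be constructed before the router and peer manager are started.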
- bcReactor, err := blocksync.NewReactor(ctx, + bcReactor := blocksync.NewReactor( logger.With("module", "blockchain"), stateStore, blockExec, blockStore, csReactor, router.OpenChannel, - peerManager.Subscribe(ctx), + peerManager.Subscribe, blockSync && !stateSync, nodeMetrics.consensus, eventBus, ) - if err != nil { - return nil, combineCloseError( - fmt.Errorf("could not create blocksync reactor: %w", err), - makeCloser(closers)) - } // Make ConsensusReactor. Don't enable fully if doing a state sync and/or block sync first. // FIXME We need to update metrics here, since other reactors don't have access to them. @@ -337,7 +329,7 @@ func makeNode( // FIXME The way we do phased startups (e.g. replay -> block sync -> consensus) is very messy, // we should clean this whole thing up. See: // https://github.com/tendermint/tendermint/issues/4644 - stateSyncReactor, err := statesync.NewReactor( + stateSyncReactor := statesync.NewReactor( ctx, genDoc.ChainID, genDoc.InitialHeight, @@ -345,23 +337,17 @@ func makeNode( logger.With("module", "statesync"), proxyApp, router.OpenChannel, - peerManager.Subscribe(ctx), + peerManager.Subscribe, stateStore, blockStore, cfg.StateSync.TempDir, nodeMetrics.statesync, eventBus, ) - if err != nil { - return nil, combineCloseError(err, makeCloser(closers)) - } var pexReactor service.Service = service.NopService{} if cfg.P2P.PexReactor { - pexReactor, err = pex.NewReactor(ctx, logger, peerManager, router.OpenChannel, peerManager.Subscribe(ctx)) - if err != nil { - return nil, combineCloseError(err, makeCloser(closers)) - } + pexReactor = pex.NewReactor(logger, peerManager, router.OpenChannel, peerManager.Subscribe) } node := &nodeImpl{ config: cfg, diff --git a/node/node_test.go b/node/node_test.go index a03d7286e..1a1fa6f81 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -581,12 +581,12 @@ func TestNodeNewSeedNode(t *testing.T) { logger := log.NewNopLogger() - ns, err := makeSeedNode(ctx, + ns, err := makeSeedNode( + logger, cfg, config.DefaultDBProvider, nodeKey, defaultGenesisDocProviderFunc(cfg), - logger, ) t.Cleanup(ns.Wait) t.Cleanup(leaktest.CheckTimeout(t, time.Second)) diff --git a/node/public.go b/node/public.go index af3aece8e..db292833e 100644 --- a/node/public.go +++ b/node/public.go @@ -68,7 +68,7 @@ func New( config.DefaultDBProvider, logger) case config.ModeSeed: - return makeSeedNode(ctx, conf, config.DefaultDBProvider, nodeKey, genProvider, logger) + return makeSeedNode(logger, conf, config.DefaultDBProvider, nodeKey, genProvider) default: return nil, fmt.Errorf("%q is not a valid mode", conf.Mode) } diff --git a/node/seed.go b/node/seed.go index 970896cc6..6da7ff05f 100644 --- a/node/seed.go +++ b/node/seed.go @@ -40,12 +40,11 @@ type seedNodeImpl struct { // makeSeedNode returns a new seed node, containing only p2p, pex reactor func makeSeedNode( - ctx context.Context, + logger log.Logger, cfg *config.Config, dbProvider config.DBProvider, nodeKey types.NodeKey, genesisDocProvider genesisDocProvider, - logger log.Logger, ) (service.Service, error) { if !cfg.P2P.PexReactor { return nil, errors.New("cannot run seed nodes with PEX disabled") @@ -76,19 +75,13 @@ func makeSeedNode( closer) } - router, err := createRouter(ctx, logger, p2pMetrics, nodeInfo, nodeKey, - peerManager, cfg, nil) + router, err := createRouter(logger, p2pMetrics, nodeInfo, nodeKey, peerManager, cfg, nil) if err != nil { return nil, combineCloseError( fmt.Errorf("failed to create router: %w", err), closer) } - pexReactor, err := pex.NewReactor(ctx, logger, 
peerManager, router.OpenChannel, peerManager.Subscribe(ctx)) - if err != nil { - return nil, combineCloseError(err, closer) - } - node := &seedNodeImpl{ config: cfg, logger: logger, @@ -101,7 +94,7 @@ func makeSeedNode( shutdownOps: closer, - pexReactor: pexReactor, + pexReactor: pex.NewReactor(logger, peerManager, router.OpenChannel, peerManager.Subscribe), } node.BaseService = *service.NewBaseService(logger, "SeedNode", node) diff --git a/node/setup.go b/node/setup.go index 48ffcb073..07626d611 100644 --- a/node/setup.go +++ b/node/setup.go @@ -169,14 +169,14 @@ func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool { } func createMempoolReactor( - ctx context.Context, + logger log.Logger, cfg *config.Config, appClient abciclient.Client, store sm.Store, memplMetrics *mempool.Metrics, - peerManager *p2p.PeerManager, - router *p2p.Router, - logger log.Logger, + peerEvents p2p.PeerEventSubscriber, + chCreator p2p.ChannelCreator, + peerHeight func(types.NodeID) int64, ) (service.Service, mempool.Mempool, error) { logger = logger.With("module", "mempool") @@ -189,18 +189,14 @@ func createMempoolReactor( mempool.WithPostCheck(sm.TxPostCheckFromStore(store)), ) - reactor, err := mempool.NewReactor( - ctx, + reactor := mempool.NewReactor( logger, cfg.Mempool, - peerManager, mp, - router.OpenChannel, - peerManager.Subscribe(ctx), + chCreator, + peerEvents, + peerHeight, ) - if err != nil { - return nil, nil, err - } if cfg.Consensus.WaitForTxs() { mp.EnableTxsAvailable() @@ -210,14 +206,13 @@ func createMempoolReactor( } func createEvidenceReactor( - ctx context.Context, + logger log.Logger, cfg *config.Config, dbProvider config.DBProvider, store sm.Store, blockStore *store.BlockStore, - peerManager *p2p.PeerManager, - router *p2p.Router, - logger log.Logger, + peerEvents p2p.PeerEventSubscriber, + chCreator p2p.ChannelCreator, metrics *evidence.Metrics, eventBus *eventbus.EventBus, ) (*evidence.Reactor, *evidence.Pool, closer, error) { @@ -231,16 +226,12 @@ func createEvidenceReactor( evidencePool := evidence.NewPool(logger, evidenceDB, store, blockStore, metrics, eventBus) - evidenceReactor, err := evidence.NewReactor( - ctx, + evidenceReactor := evidence.NewReactor( logger, - router.OpenChannel, - peerManager.Subscribe(ctx), + chCreator, + peerEvents, evidencePool, ) - if err != nil { - return nil, nil, dbCloser, fmt.Errorf("creating evidence reactor: %w", err) - } return evidenceReactor, evidencePool, dbCloser, nil } @@ -258,7 +249,7 @@ func createConsensusReactor( waitSync bool, eventBus *eventbus.EventBus, peerManager *p2p.PeerManager, - router *p2p.Router, + chCreator p2p.ChannelCreator, logger log.Logger, ) (*consensus.Reactor, *consensus.State, error) { logger = logger.With("module", "consensus") @@ -282,20 +273,15 @@ func createConsensusReactor( consensusState.SetPrivValidator(ctx, privValidator) } - reactor, err := consensus.NewReactor( - ctx, + reactor := consensus.NewReactor( logger, consensusState, - router.OpenChannel, - peerManager.Subscribe(ctx), + chCreator, + peerManager.Subscribe, eventBus, waitSync, csMetrics, ) - if err != nil { - return nil, nil, err - } - return reactor, consensusState, nil } @@ -375,7 +361,6 @@ func createPeerManager( } func createRouter( - ctx context.Context, logger log.Logger, p2pMetrics *p2p.Metrics, nodeInfo types.NodeInfo, @@ -405,7 +390,6 @@ func createRouter( } return p2p.NewRouter( - ctx, p2pLogger, p2pMetrics, nodeInfo, @@ -422,7 +406,7 @@ func makeNodeInfo( nodeKey types.NodeKey, eventSinks []indexer.EventSink, genDoc 
*types.GenesisDoc, - state sm.State, + versionInfo version.Consensus, ) (types.NodeInfo, error) { txIndexerStatus := "off" @@ -434,8 +418,8 @@ func makeNodeInfo( nodeInfo := types.NodeInfo{ ProtocolVersion: types.ProtocolVersion{ P2P: version.P2PProtocol, // global - Block: state.Version.Consensus.Block, - App: state.Version.Consensus.App, + Block: versionInfo.Block, + App: versionInfo.App, }, NodeID: nodeKey.ID, Network: genDoc.ChainID,