From 9d1e8eaad46e78d6c9d1caf912ab181c11a780c6 Mon Sep 17 00:00:00 2001
From: Sam Kleinman
Date: Tue, 5 Apr 2022 09:26:53 -0400
Subject: [PATCH 1/3] node: remove channel and peer update initialization from
construction (#8238)
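
Reactors no longer open their p2p channels or subscribe to peer updates at
construction time. The blocksync, consensus, evidence, mempool, pex, and
statesync reactors now store a p2p.ChannelCreator and the new
p2p.PeerEventSubscriber callback (added to internal/p2p/peermanager.go)
instead of *p2p.Channel and *p2p.PeerUpdates fields; channels are created
and the peer-update subscription is taken in OnStart, and both are passed
explicitly to the processing goroutines. As a result these constructors no
longer take a context and no longer return an error. The mempool reactor
additionally replaces its PeerManager dependency with a getPeerHeight
callback.

A minimal sketch of the pattern, assuming the p2p types shown in this diff
(ChannelCreator, PeerEventSubscriber, Channel, PeerUpdates); the package
name, reactor, and descriptor parameter below are illustrative and not code
from any of the touched packages:

    package examplereactor

    import (
        "context"

        "github.com/tendermint/tendermint/internal/p2p"
        "github.com/tendermint/tendermint/libs/log"
        "github.com/tendermint/tendermint/libs/service"
    )

    // Reactor is a hypothetical reactor showing the construction/OnStart split.
    type Reactor struct {
        service.BaseService
        logger     log.Logger
        chDesc     *p2p.ChannelDescriptor  // the real reactors obtain this from the package's GetChannelDescriptor()
        chCreator  p2p.ChannelCreator      // opens the reactor's channel in OnStart
        peerEvents p2p.PeerEventSubscriber // subscribes to peer updates in OnStart
    }

    // NewReactor only records its dependencies: no context, no I/O, no error return.
    func NewReactor(logger log.Logger, chDesc *p2p.ChannelDescriptor, chCreator p2p.ChannelCreator, peerEvents p2p.PeerEventSubscriber) *Reactor {
        r := &Reactor{logger: logger, chDesc: chDesc, chCreator: chCreator, peerEvents: peerEvents}
        r.BaseService = *service.NewBaseService(logger, "Example", r)
        return r
    }

    // OnStart creates the channel and takes the peer-update subscription,
    // then hands both to the processing goroutines explicitly rather than
    // through struct fields.
    func (r *Reactor) OnStart(ctx context.Context) error {
        ch, err := r.chCreator(ctx, r.chDesc)
        if err != nil {
            return err
        }
        go r.processCh(ctx, ch)
        go r.processPeerUpdates(ctx, r.peerEvents(ctx), ch)
        return nil
    }

    func (r *Reactor) OnStop() {}

    func (r *Reactor) processCh(ctx context.Context, ch *p2p.Channel) {
        iter := ch.Receive(ctx)
        for iter.Next(ctx) {
            envelope := iter.Envelope()
            r.logger.Debug("received message", "peer", envelope.From)
        }
    }

    func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, ch *p2p.Channel) {
        for {
            select {
            case <-ctx.Done():
                return
            case peerUpdate := <-peerUpdates.Updates():
                r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
                // send an initial message to the peer on ch here, as the
                // blocksync and consensus reactors do
            }
        }
    }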
---
internal/blocksync/reactor.go | 163 +++++++++--------
internal/blocksync/reactor_test.go | 6 +-
internal/consensus/byzantine_test.go | 6 +-
internal/consensus/invalid_test.go | 5 +-
internal/consensus/reactor.go | 251 ++++++++++++++-------------
internal/consensus/reactor_test.go | 5 +-
internal/evidence/reactor.go | 60 +++----
internal/evidence/reactor_test.go | 11 +-
internal/mempool/reactor.go | 80 ++++-----
internal/mempool/reactor_test.go | 11 +-
internal/p2p/p2ptest/network.go | 1 -
internal/p2p/peermanager.go | 5 +
internal/p2p/pex/reactor.go | 56 +++---
internal/p2p/pex/reactor_test.go | 31 ++--
internal/p2p/router.go | 22 ++-
internal/p2p/router_init_test.go | 14 +-
internal/p2p/router_test.go | 10 --
internal/statesync/reactor.go | 242 +++++++++++++-------------
internal/statesync/reactor_test.go | 28 +--
internal/statesync/syncer.go | 26 ---
node/node.go | 44 ++---
node/node_test.go | 4 +-
node/public.go | 2 +-
node/seed.go | 13 +-
node/setup.go | 58 +++----
25 files changed, 545 insertions(+), 609 deletions(-)
diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go
index 34ed1f0fc..6533fa046 100644
--- a/internal/blocksync/reactor.go
+++ b/internal/blocksync/reactor.go
@@ -11,6 +11,7 @@ import (
"github.com/tendermint/tendermint/internal/consensus"
"github.com/tendermint/tendermint/internal/eventbus"
"github.com/tendermint/tendermint/internal/p2p"
+ "github.com/tendermint/tendermint/internal/p2p/conn"
sm "github.com/tendermint/tendermint/internal/state"
"github.com/tendermint/tendermint/internal/store"
"github.com/tendermint/tendermint/libs/log"
@@ -80,8 +81,8 @@ type Reactor struct {
consReactor consensusReactor
blockSync *atomicBool
- blockSyncCh *p2p.Channel
- peerUpdates *p2p.PeerUpdates
+ chCreator p2p.ChannelCreator
+ peerEvents p2p.PeerEventSubscriber
requestsCh <-chan BlockRequest
errorsCh <-chan peerError
@@ -94,23 +95,17 @@ type Reactor struct {
// NewReactor returns new reactor instance.
func NewReactor(
- ctx context.Context,
logger log.Logger,
stateStore sm.Store,
blockExec *sm.BlockExecutor,
store *store.BlockStore,
consReactor consensusReactor,
channelCreator p2p.ChannelCreator,
- peerUpdates *p2p.PeerUpdates,
+ peerEvents p2p.PeerEventSubscriber,
blockSync bool,
metrics *consensus.Metrics,
eventBus *eventbus.EventBus,
-) (*Reactor, error) {
- blockSyncCh, err := channelCreator(ctx, GetChannelDescriptor())
- if err != nil {
- return nil, err
- }
-
+) *Reactor {
r := &Reactor{
logger: logger,
stateStore: stateStore,
@@ -118,14 +113,14 @@ func NewReactor(
store: store,
consReactor: consReactor,
blockSync: newAtomicBool(blockSync),
- blockSyncCh: blockSyncCh,
- peerUpdates: peerUpdates,
+ chCreator: channelCreator,
+ peerEvents: peerEvents,
metrics: metrics,
eventBus: eventBus,
}
r.BaseService = *service.NewBaseService(logger, "BlockSync", r)
- return r, nil
+ return r
}
// OnStart starts separate go routines for each p2p Channel and listens for
@@ -136,6 +131,12 @@ func NewReactor(
// If blockSync is enabled, we also start the pool and the pool processing
// goroutine. If the pool fails to start, an error is returned.
func (r *Reactor) OnStart(ctx context.Context) error {
+ blockSyncCh, err := r.chCreator(ctx, GetChannelDescriptor())
+ if err != nil {
+ return err
+ }
+ r.chCreator = func(context.Context, *conn.ChannelDescriptor) (*p2p.Channel, error) { return blockSyncCh, nil }
+
state, err := r.stateStore.Load()
if err != nil {
return err
@@ -161,13 +162,13 @@ func (r *Reactor) OnStart(ctx context.Context) error {
if err := r.pool.Start(ctx); err != nil {
return err
}
- go r.requestRoutine(ctx)
+ go r.requestRoutine(ctx, blockSyncCh)
- go r.poolRoutine(ctx, false)
+ go r.poolRoutine(ctx, false, blockSyncCh)
}
- go r.processBlockSyncCh(ctx)
- go r.processPeerUpdates(ctx)
+ go r.processBlockSyncCh(ctx, blockSyncCh)
+ go r.processPeerUpdates(ctx, r.peerEvents(ctx), blockSyncCh)
return nil
}
@@ -182,7 +183,7 @@ func (r *Reactor) OnStop() {
// respondToPeer loads a block and sends it to the requesting peer, if we have it.
// Otherwise, we'll respond saying we do not have it.
-func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest, peerID types.NodeID) error {
+func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest, peerID types.NodeID, blockSyncCh *p2p.Channel) error {
block := r.store.LoadBlock(msg.Height)
if block != nil {
blockProto, err := block.ToProto()
@@ -191,7 +192,7 @@ func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest,
return err
}
- return r.blockSyncCh.Send(ctx, p2p.Envelope{
+ return blockSyncCh.Send(ctx, p2p.Envelope{
To: peerID,
Message: &bcproto.BlockResponse{Block: blockProto},
})
@@ -199,55 +200,16 @@ func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest,
r.logger.Info("peer requesting a block we do not have", "peer", peerID, "height", msg.Height)
- return r.blockSyncCh.Send(ctx, p2p.Envelope{
+ return blockSyncCh.Send(ctx, p2p.Envelope{
To: peerID,
Message: &bcproto.NoBlockResponse{Height: msg.Height},
})
}
-// handleBlockSyncMessage handles envelopes sent from peers on the
-// BlockSyncChannel. It returns an error only if the Envelope.Message is unknown
-// for this channel. This should never be called outside of handleMessage.
-func (r *Reactor) handleBlockSyncMessage(ctx context.Context, envelope *p2p.Envelope) error {
- logger := r.logger.With("peer", envelope.From)
-
- switch msg := envelope.Message.(type) {
- case *bcproto.BlockRequest:
- return r.respondToPeer(ctx, msg, envelope.From)
- case *bcproto.BlockResponse:
- block, err := types.BlockFromProto(msg.Block)
- if err != nil {
- logger.Error("failed to convert block from proto", "err", err)
- return err
- }
-
- r.pool.AddBlock(envelope.From, block, block.Size())
-
- case *bcproto.StatusRequest:
- return r.blockSyncCh.Send(ctx, p2p.Envelope{
- To: envelope.From,
- Message: &bcproto.StatusResponse{
- Height: r.store.Height(),
- Base: r.store.Base(),
- },
- })
- case *bcproto.StatusResponse:
- r.pool.SetPeerRange(envelope.From, msg.Base, msg.Height)
-
- case *bcproto.NoBlockResponse:
- logger.Debug("peer does not have the requested block", "height", msg.Height)
-
- default:
- return fmt.Errorf("received unknown message: %T", msg)
- }
-
- return nil
-}
-
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
// It will handle errors and any possible panics gracefully. A caller can handle
// any error returned by sending a PeerError on the respective channel.
-func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope) (err error) {
+func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope, blockSyncCh *p2p.Channel) (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("panic in processing message: %v", e)
@@ -263,7 +225,39 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
switch chID {
case BlockSyncChannel:
- err = r.handleBlockSyncMessage(ctx, envelope)
+ switch msg := envelope.Message.(type) {
+ case *bcproto.BlockRequest:
+ return r.respondToPeer(ctx, msg, envelope.From, blockSyncCh)
+ case *bcproto.BlockResponse:
+ block, err := types.BlockFromProto(msg.Block)
+ if err != nil {
+ r.logger.Error("failed to convert block from proto",
+ "peer", envelope.From,
+ "err", err)
+ return err
+ }
+
+ r.pool.AddBlock(envelope.From, block, block.Size())
+
+ case *bcproto.StatusRequest:
+ return blockSyncCh.Send(ctx, p2p.Envelope{
+ To: envelope.From,
+ Message: &bcproto.StatusResponse{
+ Height: r.store.Height(),
+ Base: r.store.Base(),
+ },
+ })
+ case *bcproto.StatusResponse:
+ r.pool.SetPeerRange(envelope.From, msg.Base, msg.Height)
+
+ case *bcproto.NoBlockResponse:
+ r.logger.Debug("peer does not have the requested block",
+ "peer", envelope.From,
+ "height", msg.Height)
+
+ default:
+ return fmt.Errorf("received unknown message: %T", msg)
+ }
default:
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
@@ -277,17 +271,17 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
// message execution will result in a PeerError being sent on the BlockSyncChannel.
// When the reactor is stopped, we will catch the signal and close the p2p Channel
// gracefully.
-func (r *Reactor) processBlockSyncCh(ctx context.Context) {
- iter := r.blockSyncCh.Receive(ctx)
+func (r *Reactor) processBlockSyncCh(ctx context.Context, blockSyncCh *p2p.Channel) {
+ iter := blockSyncCh.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
- if err := r.handleMessage(ctx, r.blockSyncCh.ID, envelope); err != nil {
+ if err := r.handleMessage(ctx, blockSyncCh.ID, envelope, blockSyncCh); err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
- r.logger.Error("failed to process message", "ch_id", r.blockSyncCh.ID, "envelope", envelope, "err", err)
- if serr := r.blockSyncCh.SendError(ctx, p2p.PeerError{
+ r.logger.Error("failed to process message", "ch_id", blockSyncCh.ID, "envelope", envelope, "err", err)
+ if serr := blockSyncCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
@@ -298,7 +292,7 @@ func (r *Reactor) processBlockSyncCh(ctx context.Context) {
}
// processPeerUpdate processes a PeerUpdate.
-func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate) {
+func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, blockSyncCh *p2p.Channel) {
r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
// XXX: Pool#RedoRequest can sometimes give us an empty peer.
@@ -309,7 +303,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
switch peerUpdate.Status {
case p2p.PeerStatusUp:
// send a status update to the newly added peer
- if err := r.blockSyncCh.Send(ctx, p2p.Envelope{
+ if err := blockSyncCh.Send(ctx, p2p.Envelope{
To: peerUpdate.NodeID,
Message: &bcproto.StatusResponse{
Base: r.store.Base(),
@@ -317,7 +311,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
},
}); err != nil {
r.pool.RemovePeer(peerUpdate.NodeID)
- if err := r.blockSyncCh.SendError(ctx, p2p.PeerError{
+ if err := blockSyncCh.SendError(ctx, p2p.PeerError{
NodeID: peerUpdate.NodeID,
Err: err,
}); err != nil {
@@ -333,13 +327,13 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
-func (r *Reactor) processPeerUpdates(ctx context.Context) {
+func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, blockSyncCh *p2p.Channel) {
for {
select {
case <-ctx.Done():
return
- case peerUpdate := <-r.peerUpdates.Updates():
- r.processPeerUpdate(ctx, peerUpdate)
+ case peerUpdate := <-peerUpdates.Updates():
+ r.processPeerUpdate(ctx, peerUpdate, blockSyncCh)
}
}
}
@@ -357,13 +351,18 @@ func (r *Reactor) SwitchToBlockSync(ctx context.Context, state sm.State) error {
r.syncStartTime = time.Now()
- go r.requestRoutine(ctx)
- go r.poolRoutine(ctx, true)
+ bsCh, err := r.chCreator(ctx, GetChannelDescriptor())
+ if err != nil {
+ return err
+ }
+
+ go r.requestRoutine(ctx, bsCh)
+ go r.poolRoutine(ctx, true, bsCh)
return nil
}
-func (r *Reactor) requestRoutine(ctx context.Context) {
+func (r *Reactor) requestRoutine(ctx context.Context, blockSyncCh *p2p.Channel) {
statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
defer statusUpdateTicker.Stop()
@@ -372,11 +371,11 @@ func (r *Reactor) requestRoutine(ctx context.Context) {
case <-ctx.Done():
return
case request := <-r.requestsCh:
- if err := r.blockSyncCh.Send(ctx, p2p.Envelope{
+ if err := blockSyncCh.Send(ctx, p2p.Envelope{
To: request.PeerID,
Message: &bcproto.BlockRequest{Height: request.Height},
}); err != nil {
- if err := r.blockSyncCh.SendError(ctx, p2p.PeerError{
+ if err := blockSyncCh.SendError(ctx, p2p.PeerError{
NodeID: request.PeerID,
Err: err,
}); err != nil {
@@ -384,14 +383,14 @@ func (r *Reactor) requestRoutine(ctx context.Context) {
}
}
case pErr := <-r.errorsCh:
- if err := r.blockSyncCh.SendError(ctx, p2p.PeerError{
+ if err := blockSyncCh.SendError(ctx, p2p.PeerError{
NodeID: pErr.peerID,
Err: pErr.err,
}); err != nil {
return
}
case <-statusUpdateTicker.C:
- if err := r.blockSyncCh.Send(ctx, p2p.Envelope{
+ if err := blockSyncCh.Send(ctx, p2p.Envelope{
Broadcast: true,
Message: &bcproto.StatusRequest{},
}); err != nil {
@@ -405,7 +404,7 @@ func (r *Reactor) requestRoutine(ctx context.Context) {
// do.
//
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
-func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool) {
+func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh *p2p.Channel) {
var (
trySyncTicker = time.NewTicker(trySyncIntervalMS * time.Millisecond)
switchToConsensusTicker = time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
@@ -523,7 +522,7 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool) {
// NOTE: We've already removed the peer's request, but we still need
// to clean up the rest.
peerID := r.pool.RedoRequest(first.Height)
- if serr := r.blockSyncCh.SendError(ctx, p2p.PeerError{
+ if serr := blockSyncCh.SendError(ctx, p2p.PeerError{
NodeID: peerID,
Err: err,
}); serr != nil {
@@ -532,7 +531,7 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool) {
peerID2 := r.pool.RedoRequest(second.Height)
if peerID2 != peerID {
- if serr := r.blockSyncCh.SendError(ctx, p2p.PeerError{
+ if serr := blockSyncCh.SendError(ctx, p2p.PeerError{
NodeID: peerID2,
Err: err,
}); serr != nil {
diff --git a/internal/blocksync/reactor_test.go b/internal/blocksync/reactor_test.go
index e35d45a74..065d75301 100644
--- a/internal/blocksync/reactor_test.go
+++ b/internal/blocksync/reactor_test.go
@@ -190,20 +190,18 @@ func (rts *reactorTestSuite) addNode(
chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
return rts.blockSyncChannels[nodeID], nil
}
- rts.reactors[nodeID], err = NewReactor(
- ctx,
+ rts.reactors[nodeID] = NewReactor(
rts.logger.With("nodeID", nodeID),
stateStore,
blockExec,
blockStore,
nil,
chCreator,
- rts.peerUpdates[nodeID],
+ func(ctx context.Context) *p2p.PeerUpdates { return rts.peerUpdates[nodeID] },
rts.blockSync,
consensus.NopMetrics(),
nil, // eventbus, can be nil
)
- require.NoError(t, err)
require.NoError(t, rts.reactors[nodeID].Start(ctx))
require.True(t, rts.reactors[nodeID].IsRunning())
diff --git a/internal/consensus/byzantine_test.go b/internal/consensus/byzantine_test.go
index f631452bd..cfcf8b04f 100644
--- a/internal/consensus/byzantine_test.go
+++ b/internal/consensus/byzantine_test.go
@@ -141,8 +141,10 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
// send two votes to all peers (1st to one half, 2nd to another half)
i := 0
for _, ps := range bzReactor.peers {
+ voteCh := rts.voteChannels[bzNodeID]
if i < len(bzReactor.peers)/2 {
- require.NoError(t, bzReactor.voteCh.Send(ctx,
+
+ require.NoError(t, voteCh.Send(ctx,
p2p.Envelope{
To: ps.peerID,
Message: &tmcons.Vote{
@@ -150,7 +152,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
},
}))
} else {
- require.NoError(t, bzReactor.voteCh.Send(ctx,
+ require.NoError(t, voteCh.Send(ctx,
p2p.Envelope{
To: ps.peerID,
Message: &tmcons.Vote{
diff --git a/internal/consensus/invalid_test.go b/internal/consensus/invalid_test.go
index f38598af9..93c5cea1b 100644
--- a/internal/consensus/invalid_test.go
+++ b/internal/consensus/invalid_test.go
@@ -57,7 +57,7 @@ func TestReactorInvalidPrecommit(t *testing.T) {
privVal := byzState.privValidator
byzState.doPrevote = func(ctx context.Context, height int64, round int32) {
defer close(signal)
- invalidDoPrevoteFunc(ctx, t, height, round, byzState, byzReactor, privVal)
+ invalidDoPrevoteFunc(ctx, t, height, round, byzState, byzReactor, rts.voteChannels[node.NodeID], privVal)
}
byzState.mtx.Unlock()
@@ -107,6 +107,7 @@ func invalidDoPrevoteFunc(
round int32,
cs *State,
r *Reactor,
+ voteCh *p2p.Channel,
pv types.PrivValidator,
) {
// routine to:
@@ -155,7 +156,7 @@ func invalidDoPrevoteFunc(
count := 0
for _, peerID := range ids {
count++
- err := r.voteCh.Send(ctx, p2p.Envelope{
+ err := voteCh.Send(ctx, p2p.Envelope{
To: peerID,
Message: &tmcons.Vote{
Vote: precommit.ToProto(),
diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go
index b11775679..4bc109515 100644
--- a/internal/consensus/reactor.go
+++ b/internal/consensus/reactor.go
@@ -126,11 +126,8 @@ type Reactor struct {
rs *cstypes.RoundState
readySignal chan struct{} // closed when the node is ready to start consensus
- stateCh *p2p.Channel
- dataCh *p2p.Channel
- voteCh *p2p.Channel
- voteSetBitsCh *p2p.Channel
- peerUpdates *p2p.PeerUpdates
+ peerEvents p2p.PeerEventSubscriber
+ chCreator p2p.ChannelCreator
}
// NewReactor returns a reference to a new consensus reactor, which implements
@@ -138,49 +135,25 @@ type Reactor struct {
// to relevant p2p Channels and a channel to listen for peer updates on. The
// reactor will close all p2p Channels when stopping.
func NewReactor(
- ctx context.Context,
logger log.Logger,
cs *State,
channelCreator p2p.ChannelCreator,
- peerUpdates *p2p.PeerUpdates,
+ peerEvents p2p.PeerEventSubscriber,
eventBus *eventbus.EventBus,
waitSync bool,
metrics *Metrics,
-) (*Reactor, error) {
- chans := getChannelDescriptors()
- stateCh, err := channelCreator(ctx, chans[StateChannel])
- if err != nil {
- return nil, err
- }
-
- dataCh, err := channelCreator(ctx, chans[DataChannel])
- if err != nil {
- return nil, err
- }
-
- voteCh, err := channelCreator(ctx, chans[VoteChannel])
- if err != nil {
- return nil, err
- }
-
- voteSetBitsCh, err := channelCreator(ctx, chans[VoteSetBitsChannel])
- if err != nil {
- return nil, err
- }
+) *Reactor {
r := &Reactor{
- logger: logger,
- state: cs,
- waitSync: waitSync,
- rs: cs.GetRoundState(),
- peers: make(map[types.NodeID]*PeerState),
- eventBus: eventBus,
- Metrics: metrics,
- stateCh: stateCh,
- dataCh: dataCh,
- voteCh: voteCh,
- voteSetBitsCh: voteSetBitsCh,
- peerUpdates: peerUpdates,
- readySignal: make(chan struct{}),
+ logger: logger,
+ state: cs,
+ waitSync: waitSync,
+ rs: cs.GetRoundState(),
+ peers: make(map[types.NodeID]*PeerState),
+ eventBus: eventBus,
+ Metrics: metrics,
+ peerEvents: peerEvents,
+ chCreator: channelCreator,
+ readySignal: make(chan struct{}),
}
r.BaseService = *service.NewBaseService(logger, "Consensus", r)
@@ -188,7 +161,14 @@ func NewReactor(
close(r.readySignal)
}
- return r, nil
+ return r
+}
+
+type channelBundle struct {
+ state *p2p.Channel
+ data *p2p.Channel
+ vote *p2p.Channel
+ votSet *p2p.Channel
}
// OnStart starts separate go routines for each p2p Channel and listens for
@@ -198,13 +178,39 @@ func NewReactor(
func (r *Reactor) OnStart(ctx context.Context) error {
r.logger.Debug("consensus wait sync", "wait_sync", r.WaitSync())
+ peerUpdates := r.peerEvents(ctx)
+
+ var chBundle channelBundle
+ var err error
+
+ chans := getChannelDescriptors()
+ chBundle.state, err = r.chCreator(ctx, chans[StateChannel])
+ if err != nil {
+ return err
+ }
+
+ chBundle.data, err = r.chCreator(ctx, chans[DataChannel])
+ if err != nil {
+ return err
+ }
+
+ chBundle.vote, err = r.chCreator(ctx, chans[VoteChannel])
+ if err != nil {
+ return err
+ }
+
+ chBundle.votSet, err = r.chCreator(ctx, chans[VoteSetBitsChannel])
+ if err != nil {
+ return err
+ }
+
// start routine that computes peer statistics for evaluating peer quality
//
// TODO: Evaluate if we need this to be synchronized via WaitGroup as to not
// leak the goroutine when stopping the reactor.
- go r.peerStatsRoutine(ctx)
+ go r.peerStatsRoutine(ctx, peerUpdates)
- r.subscribeToBroadcastEvents()
+ r.subscribeToBroadcastEvents(chBundle.state)
go r.updateRoundStateRoutine()
if !r.WaitSync() {
@@ -213,11 +219,11 @@ func (r *Reactor) OnStart(ctx context.Context) error {
}
}
- go r.processStateCh(ctx)
- go r.processDataCh(ctx)
- go r.processVoteCh(ctx)
- go r.processVoteSetBitsCh(ctx)
- go r.processPeerUpdates(ctx)
+ go r.processStateCh(ctx, chBundle)
+ go r.processDataCh(ctx, chBundle)
+ go r.processVoteCh(ctx, chBundle)
+ go r.processVoteSetBitsCh(ctx, chBundle)
+ go r.processPeerUpdates(ctx, peerUpdates, chBundle)
return nil
}
@@ -318,16 +324,16 @@ func (r *Reactor) GetPeerState(peerID types.NodeID) (*PeerState, bool) {
return ps, ok
}
-func (r *Reactor) broadcastNewRoundStepMessage(ctx context.Context, rs *cstypes.RoundState) error {
- return r.stateCh.Send(ctx, p2p.Envelope{
+func (r *Reactor) broadcastNewRoundStepMessage(ctx context.Context, rs *cstypes.RoundState, stateCh *p2p.Channel) error {
+ return stateCh.Send(ctx, p2p.Envelope{
Broadcast: true,
Message: makeRoundStepMessage(rs),
})
}
-func (r *Reactor) broadcastNewValidBlockMessage(ctx context.Context, rs *cstypes.RoundState) error {
+func (r *Reactor) broadcastNewValidBlockMessage(ctx context.Context, rs *cstypes.RoundState, stateCh *p2p.Channel) error {
psHeader := rs.ProposalBlockParts.Header()
- return r.stateCh.Send(ctx, p2p.Envelope{
+ return stateCh.Send(ctx, p2p.Envelope{
Broadcast: true,
Message: &tmcons.NewValidBlock{
Height: rs.Height,
@@ -339,8 +345,8 @@ func (r *Reactor) broadcastNewValidBlockMessage(ctx context.Context, rs *cstypes
})
}
-func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote) error {
- return r.stateCh.Send(ctx, p2p.Envelope{
+func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote, stateCh *p2p.Channel) error {
+ return stateCh.Send(ctx, p2p.Envelope{
Broadcast: true,
Message: &tmcons.HasVote{
Height: vote.Height,
@@ -354,14 +360,14 @@ func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote)
// subscribeToBroadcastEvents subscribes for new round steps and votes using the
// internal pubsub defined in the consensus state to broadcast them to peers
// upon receiving.
-func (r *Reactor) subscribeToBroadcastEvents() {
+func (r *Reactor) subscribeToBroadcastEvents(stateCh *p2p.Channel) {
onStopCh := r.state.getOnStopCh()
err := r.state.evsw.AddListenerForEvent(
listenerIDConsensus,
types.EventNewRoundStepValue,
func(ctx context.Context, data tmevents.EventData) error {
- if err := r.broadcastNewRoundStepMessage(ctx, data.(*cstypes.RoundState)); err != nil {
+ if err := r.broadcastNewRoundStepMessage(ctx, data.(*cstypes.RoundState), stateCh); err != nil {
return err
}
select {
@@ -382,7 +388,7 @@ func (r *Reactor) subscribeToBroadcastEvents() {
listenerIDConsensus,
types.EventValidBlockValue,
func(ctx context.Context, data tmevents.EventData) error {
- return r.broadcastNewValidBlockMessage(ctx, data.(*cstypes.RoundState))
+ return r.broadcastNewValidBlockMessage(ctx, data.(*cstypes.RoundState), stateCh)
},
)
if err != nil {
@@ -393,7 +399,7 @@ func (r *Reactor) subscribeToBroadcastEvents() {
listenerIDConsensus,
types.EventVoteValue,
func(ctx context.Context, data tmevents.EventData) error {
- return r.broadcastHasVoteMessage(ctx, data.(*types.Vote))
+ return r.broadcastHasVoteMessage(ctx, data.(*types.Vote), stateCh)
},
)
if err != nil {
@@ -411,8 +417,8 @@ func makeRoundStepMessage(rs *cstypes.RoundState) *tmcons.NewRoundStep {
}
}
-func (r *Reactor) sendNewRoundStepMessage(ctx context.Context, peerID types.NodeID) error {
- return r.stateCh.Send(ctx, p2p.Envelope{
+func (r *Reactor) sendNewRoundStepMessage(ctx context.Context, peerID types.NodeID, stateCh *p2p.Channel) error {
+ return stateCh.Send(ctx, p2p.Envelope{
To: peerID,
Message: makeRoundStepMessage(r.getRoundState()),
})
@@ -438,7 +444,7 @@ func (r *Reactor) getRoundState() *cstypes.RoundState {
return r.rs
}
-func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState) {
+func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState, dataCh *p2p.Channel) {
logger := r.logger.With("height", prs.Height).With("peer", ps.peerID)
if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
@@ -487,7 +493,7 @@ func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundSta
}
logger.Debug("sending block part for catchup", "round", prs.Round, "index", index)
- _ = r.dataCh.Send(ctx, p2p.Envelope{
+ _ = dataCh.Send(ctx, p2p.Envelope{
To: ps.peerID,
Message: &tmcons.BlockPart{
Height: prs.Height, // not our height, so it does not matter.
@@ -502,7 +508,7 @@ func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundSta
time.Sleep(r.state.config.PeerGossipSleepDuration)
}
-func (r *Reactor) gossipDataRoutine(ctx context.Context, ps *PeerState) {
+func (r *Reactor) gossipDataRoutine(ctx context.Context, ps *PeerState, dataCh *p2p.Channel) {
logger := r.logger.With("peer", ps.peerID)
timer := time.NewTimer(0)
@@ -534,7 +540,7 @@ OUTER_LOOP:
}
logger.Debug("sending block part", "height", prs.Height, "round", prs.Round)
- if err := r.dataCh.Send(ctx, p2p.Envelope{
+ if err := dataCh.Send(ctx, p2p.Envelope{
To: ps.peerID,
Message: &tmcons.BlockPart{
Height: rs.Height, // this tells peer that this part applies to us
@@ -581,7 +587,7 @@ OUTER_LOOP:
continue OUTER_LOOP
}
- r.gossipDataForCatchup(ctx, rs, prs, ps)
+ r.gossipDataForCatchup(ctx, rs, prs, ps, dataCh)
continue OUTER_LOOP
}
@@ -608,7 +614,7 @@ OUTER_LOOP:
propProto := rs.Proposal.ToProto()
logger.Debug("sending proposal", "height", prs.Height, "round", prs.Round)
- if err := r.dataCh.Send(ctx, p2p.Envelope{
+ if err := dataCh.Send(ctx, p2p.Envelope{
To: ps.peerID,
Message: &tmcons.Proposal{
Proposal: *propProto,
@@ -631,7 +637,7 @@ OUTER_LOOP:
pPolProto := pPol.ToProto()
logger.Debug("sending POL", "height", prs.Height, "round", prs.Round)
- if err := r.dataCh.Send(ctx, p2p.Envelope{
+ if err := dataCh.Send(ctx, p2p.Envelope{
To: ps.peerID,
Message: &tmcons.ProposalPOL{
Height: rs.Height,
@@ -659,14 +665,14 @@ OUTER_LOOP:
// pickSendVote picks a vote and sends it to the peer. It will return true if
// there is a vote to send and false otherwise.
-func (r *Reactor) pickSendVote(ctx context.Context, ps *PeerState, votes types.VoteSetReader) (bool, error) {
+func (r *Reactor) pickSendVote(ctx context.Context, ps *PeerState, votes types.VoteSetReader, voteCh *p2p.Channel) (bool, error) {
vote, ok := ps.PickVoteToSend(votes)
if !ok {
return false, nil
}
r.logger.Debug("sending vote message", "ps", ps, "vote", vote)
- if err := r.voteCh.Send(ctx, p2p.Envelope{
+ if err := voteCh.Send(ctx, p2p.Envelope{
To: ps.peerID,
Message: &tmcons.Vote{
Vote: vote.ToProto(),
@@ -687,12 +693,13 @@ func (r *Reactor) gossipVotesForHeight(
rs *cstypes.RoundState,
prs *cstypes.PeerRoundState,
ps *PeerState,
+ voteCh *p2p.Channel,
) (bool, error) {
logger := r.logger.With("height", prs.Height).With("peer", ps.peerID)
// if there are lastCommits to send...
if prs.Step == cstypes.RoundStepNewHeight {
- if ok, err := r.pickSendVote(ctx, ps, rs.LastCommit); err != nil {
+ if ok, err := r.pickSendVote(ctx, ps, rs.LastCommit, voteCh); err != nil {
return false, err
} else if ok {
logger.Debug("picked rs.LastCommit to send")
@@ -704,7 +711,7 @@ func (r *Reactor) gossipVotesForHeight(
// if there are POL prevotes to send...
if prs.Step <= cstypes.RoundStepPropose && prs.Round != -1 && prs.Round <= rs.Round && prs.ProposalPOLRound != -1 {
if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
- if ok, err := r.pickSendVote(ctx, ps, polPrevotes); err != nil {
+ if ok, err := r.pickSendVote(ctx, ps, polPrevotes, voteCh); err != nil {
return false, err
} else if ok {
logger.Debug("picked rs.Prevotes(prs.ProposalPOLRound) to send", "round", prs.ProposalPOLRound)
@@ -715,7 +722,7 @@ func (r *Reactor) gossipVotesForHeight(
// if there are prevotes to send...
if prs.Step <= cstypes.RoundStepPrevoteWait && prs.Round != -1 && prs.Round <= rs.Round {
- if ok, err := r.pickSendVote(ctx, ps, rs.Votes.Prevotes(prs.Round)); err != nil {
+ if ok, err := r.pickSendVote(ctx, ps, rs.Votes.Prevotes(prs.Round), voteCh); err != nil {
return false, err
} else if ok {
logger.Debug("picked rs.Prevotes(prs.Round) to send", "round", prs.Round)
@@ -725,7 +732,7 @@ func (r *Reactor) gossipVotesForHeight(
// if there are precommits to send...
if prs.Step <= cstypes.RoundStepPrecommitWait && prs.Round != -1 && prs.Round <= rs.Round {
- if ok, err := r.pickSendVote(ctx, ps, rs.Votes.Precommits(prs.Round)); err != nil {
+ if ok, err := r.pickSendVote(ctx, ps, rs.Votes.Precommits(prs.Round), voteCh); err != nil {
return false, err
} else if ok {
logger.Debug("picked rs.Precommits(prs.Round) to send", "round", prs.Round)
@@ -735,7 +742,7 @@ func (r *Reactor) gossipVotesForHeight(
// if there are prevotes to send...(which are needed because of validBlock mechanism)
if prs.Round != -1 && prs.Round <= rs.Round {
- if ok, err := r.pickSendVote(ctx, ps, rs.Votes.Prevotes(prs.Round)); err != nil {
+ if ok, err := r.pickSendVote(ctx, ps, rs.Votes.Prevotes(prs.Round), voteCh); err != nil {
return false, err
} else if ok {
logger.Debug("picked rs.Prevotes(prs.Round) to send", "round", prs.Round)
@@ -746,7 +753,7 @@ func (r *Reactor) gossipVotesForHeight(
// if there are POLPrevotes to send...
if prs.ProposalPOLRound != -1 {
if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
- if ok, err := r.pickSendVote(ctx, ps, polPrevotes); err != nil {
+ if ok, err := r.pickSendVote(ctx, ps, polPrevotes, voteCh); err != nil {
return false, err
} else if ok {
logger.Debug("picked rs.Prevotes(prs.ProposalPOLRound) to send", "round", prs.ProposalPOLRound)
@@ -758,7 +765,7 @@ func (r *Reactor) gossipVotesForHeight(
return false, nil
}
-func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState) {
+func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh *p2p.Channel) {
logger := r.logger.With("peer", ps.peerID)
// XXX: simple hack to throttle logs upon sleep
@@ -790,7 +797,7 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState) {
// if height matches, then send LastCommit, Prevotes, and Precommits
if rs.Height == prs.Height {
- if ok, err := r.gossipVotesForHeight(ctx, rs, prs, ps); err != nil {
+ if ok, err := r.gossipVotesForHeight(ctx, rs, prs, ps, voteCh); err != nil {
return
} else if ok {
continue
@@ -799,7 +806,7 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState) {
// special catchup logic -- if peer is lagging by height 1, send LastCommit
if prs.Height != 0 && rs.Height == prs.Height+1 {
- if ok, err := r.pickSendVote(ctx, ps, rs.LastCommit); err != nil {
+ if ok, err := r.pickSendVote(ctx, ps, rs.LastCommit, voteCh); err != nil {
return
} else if ok {
logger.Debug("picked rs.LastCommit to send", "height", prs.Height)
@@ -813,7 +820,7 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState) {
// Load the block commit for prs.Height, which contains precommit
// signatures for prs.Height.
if commit := r.state.blockStore.LoadBlockCommit(prs.Height); commit != nil {
- if ok, err := r.pickSendVote(ctx, ps, commit); err != nil {
+ if ok, err := r.pickSendVote(ctx, ps, commit, voteCh); err != nil {
return
} else if ok {
logger.Debug("picked Catchup commit to send", "height", prs.Height)
@@ -847,7 +854,7 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState) {
// NOTE: `queryMaj23Routine` has a simple crude design since it only comes
// into play for liveness when there's a signature DDoS attack happening.
-func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState) {
+func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState, stateCh *p2p.Channel) {
timer := time.NewTimer(0)
defer timer.Stop()
@@ -883,7 +890,7 @@ func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState) {
// maybe send Height/Round/Prevotes
if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok {
- if err := r.stateCh.Send(ctx, p2p.Envelope{
+ if err := stateCh.Send(ctx, p2p.Envelope{
To: ps.peerID,
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
@@ -904,7 +911,7 @@ func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState) {
// maybe send Height/Round/ProposalPOL
if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok {
- if err := r.stateCh.Send(ctx, p2p.Envelope{
+ if err := stateCh.Send(ctx, p2p.Envelope{
To: ps.peerID,
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
@@ -925,7 +932,7 @@ func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState) {
// maybe send Height/Round/Precommits
if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok {
- if err := r.stateCh.Send(ctx, p2p.Envelope{
+ if err := stateCh.Send(ctx, p2p.Envelope{
To: ps.peerID,
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
@@ -950,7 +957,7 @@ func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState) {
if prs.Height <= r.state.blockStore.Height() && prs.Height >= r.state.blockStore.Base() {
// maybe send Height/CatchupCommitRound/CatchupCommit
if commit := r.state.LoadCommit(prs.Height); commit != nil {
- if err := r.stateCh.Send(ctx, p2p.Envelope{
+ if err := stateCh.Send(ctx, p2p.Envelope{
To: ps.peerID,
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
@@ -983,7 +990,7 @@ func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState) {
// be the case, and we spawn all the relevant goroutine to broadcast messages to
// the peer. During peer removal, we remove the peer for our set of peers and
// signal to all spawned goroutines to gracefully exit in a non-blocking manner.
-func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate) {
+func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, chans channelBundle) {
r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
r.mtx.Lock()
@@ -1024,14 +1031,14 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
return
}
// start goroutines for this peer
- go r.gossipDataRoutine(ctx, ps)
- go r.gossipVotesRoutine(ctx, ps)
- go r.queryMaj23Routine(ctx, ps)
+ go r.gossipDataRoutine(ctx, ps, chans.data)
+ go r.gossipVotesRoutine(ctx, ps, chans.vote)
+ go r.queryMaj23Routine(ctx, ps, chans.state)
// Send our state to the peer. If we're block-syncing, broadcast a
// RoundStepMessage later upon SwitchToConsensus().
if !r.WaitSync() {
- go func() { _ = r.sendNewRoundStepMessage(ctx, ps.peerID) }()
+ go func() { _ = r.sendNewRoundStepMessage(ctx, ps.peerID, chans.state) }()
}
}()
@@ -1058,7 +1065,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// If we fail to find the peer state for the envelope sender, we perform a no-op
// and return. This can happen when we process the envelope after the peer is
// removed.
-func (r *Reactor) handleStateMessage(ctx context.Context, envelope *p2p.Envelope, msgI Message) error {
+func (r *Reactor) handleStateMessage(ctx context.Context, envelope *p2p.Envelope, msgI Message, voteSetCh *p2p.Channel) error {
ps, ok := r.GetPeerState(envelope.From)
if !ok || ps == nil {
r.logger.Debug("failed to find peer state", "peer", envelope.From, "ch_id", "StateChannel")
@@ -1128,7 +1135,7 @@ func (r *Reactor) handleStateMessage(ctx context.Context, envelope *p2p.Envelope
eMsg.Votes = *votesProto
}
- if err := r.voteSetBitsCh.Send(ctx, p2p.Envelope{
+ if err := voteSetCh.Send(ctx, p2p.Envelope{
To: envelope.From,
Message: eMsg,
}); err != nil {
@@ -1296,7 +1303,7 @@ func (r *Reactor) handleVoteSetBitsMessage(ctx context.Context, envelope *p2p.En
// the p2p channel.
//
// NOTE: We block on consensus state for proposals, block parts, and votes.
-func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope) (err error) {
+func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope, chans channelBundle) (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("panic in processing message: %v", e)
@@ -1327,17 +1334,13 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
switch chID {
case StateChannel:
- err = r.handleStateMessage(ctx, envelope, msgI)
-
+ err = r.handleStateMessage(ctx, envelope, msgI, chans.votSet)
case DataChannel:
err = r.handleDataMessage(ctx, envelope, msgI)
-
case VoteChannel:
err = r.handleVoteMessage(ctx, envelope, msgI)
-
case VoteSetBitsChannel:
err = r.handleVoteSetBitsMessage(ctx, envelope, msgI)
-
default:
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
}
@@ -1350,13 +1353,13 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
// execution will result in a PeerError being sent on the StateChannel. When
// the reactor is stopped, we will catch the signal and close the p2p Channel
// gracefully.
-func (r *Reactor) processStateCh(ctx context.Context) {
- iter := r.stateCh.Receive(ctx)
+func (r *Reactor) processStateCh(ctx context.Context, chans channelBundle) {
+ iter := chans.state.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
- if err := r.handleMessage(ctx, r.stateCh.ID, envelope); err != nil {
- r.logger.Error("failed to process message", "ch_id", r.stateCh.ID, "envelope", envelope, "err", err)
- if serr := r.stateCh.SendError(ctx, p2p.PeerError{
+ if err := r.handleMessage(ctx, chans.state.ID, envelope, chans); err != nil {
+ r.logger.Error("failed to process message", "ch_id", chans.state.ID, "envelope", envelope, "err", err)
+ if serr := chans.state.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
@@ -1371,13 +1374,13 @@ func (r *Reactor) processStateCh(ctx context.Context) {
// execution will result in a PeerError being sent on the DataChannel. When
// the reactor is stopped, we will catch the signal and close the p2p Channel
// gracefully.
-func (r *Reactor) processDataCh(ctx context.Context) {
- iter := r.dataCh.Receive(ctx)
+func (r *Reactor) processDataCh(ctx context.Context, chans channelBundle) {
+ iter := chans.data.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
- if err := r.handleMessage(ctx, r.dataCh.ID, envelope); err != nil {
- r.logger.Error("failed to process message", "ch_id", r.dataCh.ID, "envelope", envelope, "err", err)
- if serr := r.dataCh.SendError(ctx, p2p.PeerError{
+ if err := r.handleMessage(ctx, chans.data.ID, envelope, chans); err != nil {
+ r.logger.Error("failed to process message", "ch_id", chans.data.ID, "envelope", envelope, "err", err)
+ if serr := chans.data.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
@@ -1392,13 +1395,13 @@ func (r *Reactor) processDataCh(ctx context.Context) {
// execution will result in a PeerError being sent on the VoteChannel. When
// the reactor is stopped, we will catch the signal and close the p2p Channel
// gracefully.
-func (r *Reactor) processVoteCh(ctx context.Context) {
- iter := r.voteCh.Receive(ctx)
+func (r *Reactor) processVoteCh(ctx context.Context, chans channelBundle) {
+ iter := chans.vote.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
- if err := r.handleMessage(ctx, r.voteCh.ID, envelope); err != nil {
- r.logger.Error("failed to process message", "ch_id", r.voteCh.ID, "envelope", envelope, "err", err)
- if serr := r.voteCh.SendError(ctx, p2p.PeerError{
+ if err := r.handleMessage(ctx, chans.vote.ID, envelope, chans); err != nil {
+ r.logger.Error("failed to process message", "ch_id", chans.vote.ID, "envelope", envelope, "err", err)
+ if serr := chans.vote.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
@@ -1413,18 +1416,18 @@ func (r *Reactor) processVoteCh(ctx context.Context) {
// execution will result in a PeerError being sent on the VoteSetBitsChannel.
// When the reactor is stopped, we will catch the signal and close the p2p
// Channel gracefully.
-func (r *Reactor) processVoteSetBitsCh(ctx context.Context) {
- iter := r.voteSetBitsCh.Receive(ctx)
+func (r *Reactor) processVoteSetBitsCh(ctx context.Context, chans channelBundle) {
+ iter := chans.votSet.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
- if err := r.handleMessage(ctx, r.voteSetBitsCh.ID, envelope); err != nil {
+ if err := r.handleMessage(ctx, chans.votSet.ID, envelope, chans); err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
- r.logger.Error("failed to process message", "ch_id", r.voteSetBitsCh.ID, "envelope", envelope, "err", err)
- if serr := r.voteSetBitsCh.SendError(ctx, p2p.PeerError{
+ r.logger.Error("failed to process message", "ch_id", chans.votSet.ID, "envelope", envelope, "err", err)
+ if serr := chans.votSet.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
@@ -1437,18 +1440,18 @@ func (r *Reactor) processVoteSetBitsCh(ctx context.Context) {
// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
-func (r *Reactor) processPeerUpdates(ctx context.Context) {
+func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, chans channelBundle) {
for {
select {
case <-ctx.Done():
return
- case peerUpdate := <-r.peerUpdates.Updates():
- r.processPeerUpdate(ctx, peerUpdate)
+ case peerUpdate := <-peerUpdates.Updates():
+ r.processPeerUpdate(ctx, peerUpdate, chans)
}
}
}
-func (r *Reactor) peerStatsRoutine(ctx context.Context) {
+func (r *Reactor) peerStatsRoutine(ctx context.Context, peerUpdates *p2p.PeerUpdates) {
for {
if !r.IsRunning() {
r.logger.Info("stopping peerStatsRoutine")
@@ -1466,7 +1469,7 @@ func (r *Reactor) peerStatsRoutine(ctx context.Context) {
switch msg.Msg.(type) {
case *VoteMessage:
if numVotes := ps.RecordVote(); numVotes%votesToContributeToBecomeGoodPeer == 0 {
- r.peerUpdates.SendUpdate(ctx, p2p.PeerUpdate{
+ peerUpdates.SendUpdate(ctx, p2p.PeerUpdate{
NodeID: msg.PeerID,
Status: p2p.PeerStatusGood,
})
@@ -1474,7 +1477,7 @@ func (r *Reactor) peerStatsRoutine(ctx context.Context) {
case *BlockPartMessage:
if numParts := ps.RecordBlockPart(); numParts%blocksToContributeToBecomeGoodPeer == 0 {
- r.peerUpdates.SendUpdate(ctx, p2p.PeerUpdate{
+ peerUpdates.SendUpdate(ctx, p2p.PeerUpdate{
NodeID: msg.PeerID,
Status: p2p.PeerStatusGood,
})
diff --git a/internal/consensus/reactor_test.go b/internal/consensus/reactor_test.go
index 886e3794a..9c6af5c5b 100644
--- a/internal/consensus/reactor_test.go
+++ b/internal/consensus/reactor_test.go
@@ -104,16 +104,15 @@ func setup(
for nodeID, node := range rts.network.Nodes {
state := states[i]
- reactor, err := NewReactor(ctx,
+ reactor := NewReactor(
state.logger.With("node", nodeID),
state,
chCreator(nodeID),
- node.MakePeerUpdates(ctx, t),
+ func(ctx context.Context) *p2p.PeerUpdates { return node.MakePeerUpdates(ctx, t) },
state.eventBus,
true,
NopMetrics(),
)
- require.NoError(t, err)
blocksSub, err := state.eventBus.SubscribeWithArgs(ctx, tmpubsub.SubscribeArgs{
ClientID: testSubscriber,
diff --git a/internal/evidence/reactor.go b/internal/evidence/reactor.go
index c52e7e32b..4809df32e 100644
--- a/internal/evidence/reactor.go
+++ b/internal/evidence/reactor.go
@@ -47,9 +47,9 @@ type Reactor struct {
service.BaseService
logger log.Logger
- evpool *Pool
- evidenceCh *p2p.Channel
- peerUpdates *p2p.PeerUpdates
+ evpool *Pool
+ chCreator p2p.ChannelCreator
+ peerEvents p2p.PeerEventSubscriber
mtx sync.Mutex
@@ -60,28 +60,22 @@ type Reactor struct {
// service.Service interface. It accepts a p2p Channel dedicated for handling
// envelopes with EvidenceList messages.
func NewReactor(
- ctx context.Context,
logger log.Logger,
chCreator p2p.ChannelCreator,
- peerUpdates *p2p.PeerUpdates,
+ peerEvents p2p.PeerEventSubscriber,
evpool *Pool,
-) (*Reactor, error) {
- evidenceCh, err := chCreator(ctx, GetChannelDescriptor())
- if err != nil {
- return nil, err
- }
-
+) *Reactor {
r := &Reactor{
logger: logger,
evpool: evpool,
- evidenceCh: evidenceCh,
- peerUpdates: peerUpdates,
+ chCreator: chCreator,
+ peerEvents: peerEvents,
peerRoutines: make(map[types.NodeID]context.CancelFunc),
}
r.BaseService = *service.NewBaseService(logger, "Evidence", r)
- return r, err
+ return r
}
// OnStart starts separate go routines for each p2p Channel and listens for
@@ -89,18 +83,20 @@ func NewReactor(
// messages on that p2p channel accordingly. The caller must be sure to execute
// OnStop to ensure the outbound p2p Channels are closed. No error is returned.
func (r *Reactor) OnStart(ctx context.Context) error {
- go r.processEvidenceCh(ctx)
- go r.processPeerUpdates(ctx)
+ ch, err := r.chCreator(ctx, GetChannelDescriptor())
+ if err != nil {
+ return err
+ }
+
+ go r.processEvidenceCh(ctx, ch)
+ go r.processPeerUpdates(ctx, r.peerEvents(ctx), ch)
return nil
}
// OnStop stops the reactor by signaling to all spawned goroutines to exit and
// blocking until they all exit.
-func (r *Reactor) OnStop() {
- // Close the evidence db
- r.evpool.Close()
-}
+func (r *Reactor) OnStop() { r.evpool.Close() }
// handleEvidenceMessage handles envelopes sent from peers on the EvidenceChannel.
// It returns an error only if the Envelope.Message is unknown for this channel
@@ -164,13 +160,13 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
// processEvidenceCh implements a blocking event loop where we listen for p2p
// Envelope messages from the evidenceCh.
-func (r *Reactor) processEvidenceCh(ctx context.Context) {
- iter := r.evidenceCh.Receive(ctx)
+func (r *Reactor) processEvidenceCh(ctx context.Context, evidenceCh *p2p.Channel) {
+ iter := evidenceCh.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
- if err := r.handleMessage(ctx, r.evidenceCh.ID, envelope); err != nil {
- r.logger.Error("failed to process message", "ch_id", r.evidenceCh.ID, "envelope", envelope, "err", err)
- if serr := r.evidenceCh.SendError(ctx, p2p.PeerError{
+ if err := r.handleMessage(ctx, evidenceCh.ID, envelope); err != nil {
+ r.logger.Error("failed to process message", "ch_id", evidenceCh.ID, "envelope", envelope, "err", err)
+ if serr := evidenceCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
@@ -191,7 +187,7 @@ func (r *Reactor) processEvidenceCh(ctx context.Context) {
// connects/disconnects frequently from the broadcasting peer(s).
//
// REF: https://github.com/tendermint/tendermint/issues/4727
-func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate) {
+func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, evidenceCh *p2p.Channel) {
r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
r.mtx.Lock()
@@ -214,7 +210,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
if !ok {
pctx, pcancel := context.WithCancel(ctx)
r.peerRoutines[peerUpdate.NodeID] = pcancel
- go r.broadcastEvidenceLoop(pctx, peerUpdate.NodeID)
+ go r.broadcastEvidenceLoop(pctx, peerUpdate.NodeID, evidenceCh)
}
case p2p.PeerStatusDown:
@@ -232,11 +228,11 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
-func (r *Reactor) processPeerUpdates(ctx context.Context) {
+func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, evidenceCh *p2p.Channel) {
for {
select {
- case peerUpdate := <-r.peerUpdates.Updates():
- r.processPeerUpdate(ctx, peerUpdate)
+ case peerUpdate := <-peerUpdates.Updates():
+ r.processPeerUpdate(ctx, peerUpdate, evidenceCh)
case <-ctx.Done():
return
}
@@ -254,7 +250,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) {
// that the peer has already received or may not be ready for.
//
// REF: https://github.com/tendermint/tendermint/issues/4727
-func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID) {
+func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID, evidenceCh *p2p.Channel) {
var next *clist.CElement
defer func() {
@@ -301,7 +297,7 @@ func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID
// peer may receive this piece of evidence multiple times if it added and
// removed frequently from the broadcasting peer.
- if err := r.evidenceCh.Send(ctx, p2p.Envelope{
+ if err := evidenceCh.Send(ctx, p2p.Envelope{
To: peerID,
Message: evProto,
}); err != nil {
diff --git a/internal/evidence/reactor_test.go b/internal/evidence/reactor_test.go
index 0b2c2fb3b..2fdfa8c60 100644
--- a/internal/evidence/reactor_test.go
+++ b/internal/evidence/reactor_test.go
@@ -92,21 +92,20 @@ func setup(ctx context.Context, t *testing.T, stateStores []sm.Store, chBuf uint
require.NoError(t, err)
rts.peerChans[nodeID] = make(chan p2p.PeerUpdate)
- rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)
- rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID])
+ pu := p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)
+ rts.peerUpdates[nodeID] = pu
+ rts.network.Nodes[nodeID].PeerManager.Register(ctx, pu)
rts.nodes = append(rts.nodes, rts.network.Nodes[nodeID])
chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
return rts.evidenceChannels[nodeID], nil
}
- rts.reactors[nodeID], err = evidence.NewReactor(
- ctx,
+ rts.reactors[nodeID] = evidence.NewReactor(
logger,
chCreator,
- rts.peerUpdates[nodeID],
+ func(ctx context.Context) *p2p.PeerUpdates { return pu },
rts.pools[nodeID])
- require.NoError(t, err)
require.NoError(t, rts.reactors[nodeID].Start(ctx))
require.True(t, rts.reactors[nodeID].IsRunning())
diff --git a/internal/mempool/reactor.go b/internal/mempool/reactor.go
index 816589c9b..8f83f5006 100644
--- a/internal/mempool/reactor.go
+++ b/internal/mempool/reactor.go
@@ -40,13 +40,9 @@ type Reactor struct {
mempool *TxMempool
ids *IDs
- // XXX: Currently, this is the only way to get information about a peer. Ideally,
- // we rely on message-oriented communication to get necessary peer data.
- // ref: https://github.com/tendermint/tendermint/issues/5670
- peerMgr PeerManager
-
- mempoolCh *p2p.Channel
- peerUpdates *p2p.PeerUpdates
+ getPeerHeight func(types.NodeID) int64
+ peerEvents p2p.PeerEventSubscriber
+ chCreator p2p.ChannelCreator
// observePanic is a function for observing panics that were recovered in methods on
// Reactor. observePanic is called with the recovered value.
@@ -58,34 +54,27 @@ type Reactor struct {
// NewReactor returns a reference to a new reactor.
func NewReactor(
- ctx context.Context,
logger log.Logger,
cfg *config.MempoolConfig,
- peerMgr PeerManager,
txmp *TxMempool,
chCreator p2p.ChannelCreator,
- peerUpdates *p2p.PeerUpdates,
-) (*Reactor, error) {
-
- ch, err := chCreator(ctx, getChannelDescriptor(cfg))
- if err != nil {
- return nil, err
- }
-
+ peerEvents p2p.PeerEventSubscriber,
+ getPeerHeight func(types.NodeID) int64,
+) *Reactor {
r := &Reactor{
- logger: logger,
- cfg: cfg,
- peerMgr: peerMgr,
- mempool: txmp,
- ids: NewMempoolIDs(),
- mempoolCh: ch,
- peerUpdates: peerUpdates,
- peerRoutines: make(map[types.NodeID]context.CancelFunc),
- observePanic: defaultObservePanic,
+ logger: logger,
+ cfg: cfg,
+ mempool: txmp,
+ ids: NewMempoolIDs(),
+ chCreator: chCreator,
+ peerEvents: peerEvents,
+ getPeerHeight: getPeerHeight,
+ peerRoutines: make(map[types.NodeID]context.CancelFunc),
+ observePanic: defaultObservePanic,
}
r.BaseService = *service.NewBaseService(logger, "Mempool", r)
- return r, nil
+ return r
}
func defaultObservePanic(r interface{}) {}
@@ -119,8 +108,13 @@ func (r *Reactor) OnStart(ctx context.Context) error {
r.logger.Info("tx broadcasting is disabled")
}
- go r.processMempoolCh(ctx)
- go r.processPeerUpdates(ctx)
+ ch, err := r.chCreator(ctx, getChannelDescriptor(r.cfg))
+ if err != nil {
+ return err
+ }
+
+ go r.processMempoolCh(ctx, ch)
+ go r.processPeerUpdates(ctx, r.peerEvents(ctx), ch)
return nil
}
@@ -203,13 +197,13 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
// processMempoolCh implements a blocking event loop where we listen for p2p
// Envelope messages from the mempoolCh.
-func (r *Reactor) processMempoolCh(ctx context.Context) {
- iter := r.mempoolCh.Receive(ctx)
+func (r *Reactor) processMempoolCh(ctx context.Context, mempoolCh *p2p.Channel) {
+ iter := mempoolCh.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
- if err := r.handleMessage(ctx, r.mempoolCh.ID, envelope); err != nil {
- r.logger.Error("failed to process message", "ch_id", r.mempoolCh.ID, "envelope", envelope, "err", err)
- if serr := r.mempoolCh.SendError(ctx, p2p.PeerError{
+ if err := r.handleMessage(ctx, mempoolCh.ID, envelope); err != nil {
+ r.logger.Error("failed to process message", "ch_id", mempoolCh.ID, "envelope", envelope, "err", err)
+ if serr := mempoolCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
@@ -224,7 +218,7 @@ func (r *Reactor) processMempoolCh(ctx context.Context) {
// goroutine or not. If not, we start one for the newly added peer. For down or
// removed peers, we remove the peer from the mempool peer ID set and signal to
// stop the tx broadcasting goroutine.
-func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate) {
+func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, mempoolCh *p2p.Channel) {
r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
r.mtx.Lock()
@@ -252,7 +246,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
r.ids.ReserveForPeer(peerUpdate.NodeID)
// start a broadcast routine ensuring all txs are forwarded to the peer
- go r.broadcastTxRoutine(pctx, peerUpdate.NodeID)
+ go r.broadcastTxRoutine(pctx, peerUpdate.NodeID, mempoolCh)
}
}
@@ -273,18 +267,18 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
-func (r *Reactor) processPeerUpdates(ctx context.Context) {
+func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, mempoolCh *p2p.Channel) {
for {
select {
case <-ctx.Done():
return
- case peerUpdate := <-r.peerUpdates.Updates():
- r.processPeerUpdate(ctx, peerUpdate)
+ case peerUpdate := <-peerUpdates.Updates():
+ r.processPeerUpdate(ctx, peerUpdate, mempoolCh)
}
}
}
-func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID) {
+func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, mempoolCh *p2p.Channel) {
peerMempoolID := r.ids.GetForPeer(peerID)
var nextGossipTx *clist.CElement
@@ -325,8 +319,8 @@ func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID) {
memTx := nextGossipTx.Value.(*WrappedTx)
- if r.peerMgr != nil {
- height := r.peerMgr.GetHeight(peerID)
+ if r.getPeerHeight != nil {
+ height := r.getPeerHeight(peerID)
if height > 0 && height < memTx.height-1 {
// allow for a lag of one block
time.Sleep(PeerCatchupSleepIntervalMS * time.Millisecond)
@@ -339,7 +333,7 @@ func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID) {
if ok := r.mempool.txStore.TxHasPeer(memTx.hash, peerMempoolID); !ok {
// Send the mempool tx to the corresponding peer. Note, the peer may be
// behind and thus would not be able to process the mempool tx correctly.
- if err := r.mempoolCh.Send(ctx, p2p.Envelope{
+ if err := mempoolCh.Send(ctx, p2p.Envelope{
To: peerID,
Message: &protomem.Txs{
Txs: [][]byte{memTx.tx},
diff --git a/internal/mempool/reactor_test.go b/internal/mempool/reactor_test.go
index 64a8adca1..82a97aeec 100644
--- a/internal/mempool/reactor_test.go
+++ b/internal/mempool/reactor_test.go
@@ -79,17 +79,14 @@ func setupReactors(ctx context.Context, t *testing.T, logger log.Logger, numNode
return rts.mempoolChannels[nodeID], nil
}
- rts.reactors[nodeID], err = NewReactor(
- ctx,
+ rts.reactors[nodeID] = NewReactor(
rts.logger.With("nodeID", nodeID),
cfg.Mempool,
- rts.network.Nodes[nodeID].PeerManager,
mempool,
chCreator,
- rts.peerUpdates[nodeID],
+ func(ctx context.Context) *p2p.PeerUpdates { return rts.peerUpdates[nodeID] },
+ rts.network.Nodes[nodeID].PeerManager.GetHeight,
)
-
- require.NoError(t, err)
rts.nodes = append(rts.nodes, nodeID)
require.NoError(t, rts.reactors[nodeID].Start(ctx))
@@ -179,7 +176,7 @@ func TestReactorBroadcastDoesNotPanic(t *testing.T) {
// run the router
rts.start(ctx, t)
- go primaryReactor.broadcastTxRoutine(ctx, secondary)
+ go primaryReactor.broadcastTxRoutine(ctx, secondary, rts.mempoolChannels[primary])
wg := &sync.WaitGroup{}
for i := 0; i < 50; i++ {
diff --git a/internal/p2p/p2ptest/network.go b/internal/p2p/p2ptest/network.go
index fdfc0d45c..6016ce0a5 100644
--- a/internal/p2p/p2ptest/network.go
+++ b/internal/p2p/p2ptest/network.go
@@ -259,7 +259,6 @@ func (n *Network) MakeNode(ctx context.Context, t *testing.T, opts NodeOptions)
require.NoError(t, err)
router, err := p2p.NewRouter(
- ctx,
n.logger,
p2p.NopMetrics(),
nodeInfo,
diff --git a/internal/p2p/peermanager.go b/internal/p2p/peermanager.go
index 4044ad569..756551a49 100644
--- a/internal/p2p/peermanager.go
+++ b/internal/p2p/peermanager.go
@@ -828,6 +828,11 @@ func (m *PeerManager) Advertise(peerID types.NodeID, limit uint16) []NodeAddress
return addresses
}
+// PeerEventSubscriber describes the type of the subscription method, to assist
+// in isolating a reactor's construction and lifecycle from the
+// peer manager.
+type PeerEventSubscriber func(context.Context) *PeerUpdates
+
// Subscribe subscribes to peer updates. The caller must consume the peer
// updates in a timely fashion and close the subscription when done, otherwise
// the PeerManager will halt.
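The new PeerEventSubscriber hook is what lets each reactor defer its peer-update subscription until OnStart, as the reactor diffs in this patch show. A minimal sketch of the consuming side, using a hypothetical exampleReactor rather than any of the real reactors, might look like:

```go
// Minimal sketch, not the actual Tendermint reactors: the subscriber is
// stored at construction and only invoked once the service starts.
package example

import (
	"context"

	"github.com/tendermint/tendermint/internal/p2p"
)

type exampleReactor struct {
	peerEvents p2p.PeerEventSubscriber // e.g. peerManager.Subscribe
}

func newExampleReactor(peerEvents p2p.PeerEventSubscriber) *exampleReactor {
	return &exampleReactor{peerEvents: peerEvents}
}

func (r *exampleReactor) OnStart(ctx context.Context) error {
	peerUpdates := r.peerEvents(ctx) // subscribe only now, not in the constructor
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case peerUpdate := <-peerUpdates.Updates():
				_ = peerUpdate // handle the update
			}
		}
	}()
	return nil
}
```

Callers pass peerManager.Subscribe directly, so constructing a reactor no longer needs a live context and no longer registers anything with the peer manager.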
diff --git a/internal/p2p/pex/reactor.go b/internal/p2p/pex/reactor.go
index 178524af2..1c80763ee 100644
--- a/internal/p2p/pex/reactor.go
+++ b/internal/p2p/pex/reactor.go
@@ -80,9 +80,8 @@ type Reactor struct {
logger log.Logger
peerManager *p2p.PeerManager
- pexCh *p2p.Channel
- peerUpdates *p2p.PeerUpdates
-
+ chCreator p2p.ChannelCreator
+ peerEvents p2p.PeerEventSubscriber
// list of available peers to loop through and send peer requests to
availablePeers map[types.NodeID]struct{}
@@ -105,30 +104,23 @@ type Reactor struct {
// NewReactor returns a reference to a new reactor.
func NewReactor(
- ctx context.Context,
logger log.Logger,
peerManager *p2p.PeerManager,
channelCreator p2p.ChannelCreator,
- peerUpdates *p2p.PeerUpdates,
-) (*Reactor, error) {
-
- channel, err := channelCreator(ctx, ChannelDescriptor())
- if err != nil {
- return nil, err
- }
-
+ peerEvents p2p.PeerEventSubscriber,
+) *Reactor {
r := &Reactor{
logger: logger,
peerManager: peerManager,
- pexCh: channel,
- peerUpdates: peerUpdates,
+ chCreator: channelCreator,
+ peerEvents: peerEvents,
availablePeers: make(map[types.NodeID]struct{}),
requestsSent: make(map[types.NodeID]struct{}),
lastReceivedRequests: make(map[types.NodeID]time.Time),
}
r.BaseService = *service.NewBaseService(logger, "PEX", r)
- return r, nil
+ return r
}
// OnStart starts separate go routines for each p2p Channel and listens for
@@ -136,8 +128,14 @@ func NewReactor(
// messages on that p2p channel accordingly. The caller must be sure to execute
// OnStop to ensure the outbound p2p Channels are closed.
func (r *Reactor) OnStart(ctx context.Context) error {
- go r.processPexCh(ctx)
- go r.processPeerUpdates(ctx)
+ channel, err := r.chCreator(ctx, ChannelDescriptor())
+ if err != nil {
+ return err
+ }
+
+ peerUpdates := r.peerEvents(ctx)
+ go r.processPexCh(ctx, channel)
+ go r.processPeerUpdates(ctx, peerUpdates)
return nil
}
@@ -147,11 +145,11 @@ func (r *Reactor) OnStop() {}
// processPexCh implements a blocking event loop where we listen for p2p
// Envelope messages from the pexCh.
-func (r *Reactor) processPexCh(ctx context.Context) {
+func (r *Reactor) processPexCh(ctx context.Context, pexCh *p2p.Channel) {
incoming := make(chan *p2p.Envelope)
go func() {
defer close(incoming)
- iter := r.pexCh.Receive(ctx)
+ iter := pexCh.Receive(ctx)
for iter.Next(ctx) {
select {
case <-ctx.Done():
@@ -177,7 +175,7 @@ func (r *Reactor) processPexCh(ctx context.Context) {
case <-timer.C:
// Send a request for more peer addresses.
- if err := r.sendRequestForPeers(ctx); err != nil {
+ if err := r.sendRequestForPeers(ctx, pexCh); err != nil {
return
// TODO(creachadair): Do we really want to stop processing the PEX
// channel just because of an error here?
@@ -192,11 +190,11 @@ func (r *Reactor) processPexCh(ctx context.Context) {
}
// A request from another peer, or a response to one of our requests.
- dur, err := r.handlePexMessage(ctx, envelope)
+ dur, err := r.handlePexMessage(ctx, envelope, pexCh)
if err != nil {
r.logger.Error("failed to process message",
- "ch_id", r.pexCh.ID, "envelope", envelope, "err", err)
- if serr := r.pexCh.SendError(ctx, p2p.PeerError{
+ "ch_id", pexCh.ID, "envelope", envelope, "err", err)
+ if serr := pexCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
@@ -213,12 +211,12 @@ func (r *Reactor) processPexCh(ctx context.Context) {
// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
-func (r *Reactor) processPeerUpdates(ctx context.Context) {
+func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates) {
for {
select {
case <-ctx.Done():
return
- case peerUpdate := <-r.peerUpdates.Updates():
+ case peerUpdate := <-peerUpdates.Updates():
r.processPeerUpdate(peerUpdate)
}
}
@@ -227,7 +225,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) {
// handlePexMessage handles envelopes sent from peers on the PexChannel.
// If an update was received, a new polling interval is returned; otherwise the
// duration is 0.
-func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope) (time.Duration, error) {
+func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope, pexCh *p2p.Channel) (time.Duration, error) {
logger := r.logger.With("peer", envelope.From)
switch msg := envelope.Message.(type) {
@@ -246,7 +244,7 @@ func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope)
URL: addr.String(),
}
}
- return 0, r.pexCh.Send(ctx, p2p.Envelope{
+ return 0, pexCh.Send(ctx, p2p.Envelope{
To: envelope.From,
Message: &protop2p.PexResponse{Addresses: pexAddresses},
})
@@ -310,7 +308,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
// that peer a request for more peer addresses. The chosen peer is moved into
// the requestsSent bucket so that we will not attempt to contact them again
// until they've replied or updated.
-func (r *Reactor) sendRequestForPeers(ctx context.Context) error {
+func (r *Reactor) sendRequestForPeers(ctx context.Context, pexCh *p2p.Channel) error {
r.mtx.Lock()
defer r.mtx.Unlock()
if len(r.availablePeers) == 0 {
@@ -325,7 +323,7 @@ func (r *Reactor) sendRequestForPeers(ctx context.Context) error {
break
}
- if err := r.pexCh.Send(ctx, p2p.Envelope{
+ if err := pexCh.Send(ctx, p2p.Envelope{
To: peerID,
Message: &protop2p.PexRequest{},
}); err != nil {
diff --git a/internal/p2p/pex/reactor_test.go b/internal/p2p/pex/reactor_test.go
index 288755e19..325ea72ab 100644
--- a/internal/p2p/pex/reactor_test.go
+++ b/internal/p2p/pex/reactor_test.go
@@ -23,8 +23,8 @@ import (
const (
checkFrequency = 500 * time.Millisecond
defaultBufferSize = 2
- shortWait = 10 * time.Second
- longWait = 60 * time.Second
+ shortWait = 5 * time.Second
+ longWait = 20 * time.Second
firstNode = 0
secondNode = 1
@@ -211,7 +211,8 @@ func TestReactorSmallPeerStoreInALargeNetwork(t *testing.T) {
require.Eventually(t, func() bool {
// nolint:scopelint
return testNet.network.Nodes[nodeID].PeerManager.PeerRatio() >= 0.9
- }, longWait, checkFrequency)
+ }, longWait, checkFrequency,
+ "peer ratio is: %f", testNet.network.Nodes[nodeID].PeerManager.PeerRatio())
}
}
@@ -303,8 +304,7 @@ func setupSingle(ctx context.Context, t *testing.T) *singleTestReactor {
return pexCh, nil
}
- reactor, err := pex.NewReactor(ctx, log.NewNopLogger(), peerManager, chCreator, peerUpdates)
- require.NoError(t, err)
+ reactor := pex.NewReactor(log.NewNopLogger(), peerManager, chCreator, func(_ context.Context) *p2p.PeerUpdates { return peerUpdates })
require.NoError(t, reactor.Start(ctx))
t.Cleanup(reactor.Wait)
@@ -381,6 +381,10 @@ func setupNetwork(ctx context.Context, t *testing.T, opts testOptions) *reactorT
idx := 0
for nodeID := range rts.network.Nodes {
+ // make a copy to avoid capturing the loop
+ // variable by reference:
+ nodeID := nodeID
+
rts.peerChans[nodeID] = make(chan p2p.PeerUpdate, chBuf)
rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], chBuf)
rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID])
@@ -393,15 +397,12 @@ func setupNetwork(ctx context.Context, t *testing.T, opts testOptions) *reactorT
if idx < opts.MockNodes {
rts.mocks = append(rts.mocks, nodeID)
} else {
- var err error
- rts.reactors[nodeID], err = pex.NewReactor(
- ctx,
+ rts.reactors[nodeID] = pex.NewReactor(
rts.logger.With("nodeID", nodeID),
rts.network.Nodes[nodeID].PeerManager,
chCreator,
- rts.peerUpdates[nodeID],
+ func(_ context.Context) *p2p.PeerUpdates { return rts.peerUpdates[nodeID] },
)
- require.NoError(t, err)
}
rts.nodes = append(rts.nodes, nodeID)
@@ -426,9 +427,10 @@ func setupNetwork(ctx context.Context, t *testing.T, opts testOptions) *reactorT
func (r *reactorTestSuite) start(ctx context.Context, t *testing.T) {
t.Helper()
- for _, reactor := range r.reactors {
+ for name, reactor := range r.reactors {
require.NoError(t, reactor.Start(ctx))
require.True(t, reactor.IsRunning())
+ t.Log("started", name)
}
}
@@ -451,15 +453,12 @@ func (r *reactorTestSuite) addNodes(ctx context.Context, t *testing.T, nodes int
return r.pexChannels[nodeID], nil
}
- var err error
- r.reactors[nodeID], err = pex.NewReactor(
- ctx,
+ r.reactors[nodeID] = pex.NewReactor(
r.logger.With("nodeID", nodeID),
r.network.Nodes[nodeID].PeerManager,
chCreator,
- r.peerUpdates[nodeID],
+ func(_ context.Context) *p2p.PeerUpdates { return r.peerUpdates[nodeID] },
)
- require.NoError(t, err)
r.nodes = append(r.nodes, nodeID)
r.total++
}
diff --git a/internal/p2p/router.go b/internal/p2p/router.go
index 40c24d56b..025769592 100644
--- a/internal/p2p/router.go
+++ b/internal/p2p/router.go
@@ -177,7 +177,6 @@ type Router struct {
// listening on appropriate interfaces, and will be closed by the Router when it
// stops.
func NewRouter(
- ctx context.Context,
logger log.Logger,
metrics *Metrics,
nodeInfo types.NodeInfo,
@@ -215,13 +214,6 @@ func NewRouter(
router.BaseService = service.NewBaseService(logger, "router", router)
- qf, err := router.createQueueFactory(ctx)
- if err != nil {
- return nil, err
- }
-
- router.queueFactory = qf
-
for _, transport := range transports {
for _, protocol := range transport.Protocols() {
if _, ok := router.protocolTransports[protocol]; !ok {
@@ -941,8 +933,22 @@ func (r *Router) NodeInfo() types.NodeInfo {
return r.nodeInfo.Copy()
}
+func (r *Router) setupQueueFactory(ctx context.Context) error {
+ qf, err := r.createQueueFactory(ctx)
+ if err != nil {
+ return err
+ }
+
+ r.queueFactory = qf
+ return nil
+}
+
// OnStart implements service.Service.
func (r *Router) OnStart(ctx context.Context) error {
+ if err := r.setupQueueFactory(ctx); err != nil {
+ return err
+ }
+
for _, transport := range r.transports {
for _, endpoint := range r.endpoints {
if err := transport.Listen(endpoint); err != nil {
diff --git a/internal/p2p/router_init_test.go b/internal/p2p/router_init_test.go
index 19b4aa94c..d58c79487 100644
--- a/internal/p2p/router_init_test.go
+++ b/internal/p2p/router_init_test.go
@@ -23,29 +23,35 @@ func TestRouter_ConstructQueueFactory(t *testing.T) {
t.Run("Default", func(t *testing.T) {
require.Zero(t, os.Getenv("TM_P2P_QUEUE"))
opts := RouterOptions{}
- r, err := NewRouter(ctx, log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts)
+ r, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts)
require.NoError(t, err)
+ require.NoError(t, r.setupQueueFactory(ctx))
+
_, ok := r.queueFactory(1).(*fifoQueue)
require.True(t, ok)
})
t.Run("Fifo", func(t *testing.T) {
opts := RouterOptions{QueueType: queueTypeFifo}
- r, err := NewRouter(ctx, log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts)
+ r, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts)
require.NoError(t, err)
+ require.NoError(t, r.setupQueueFactory(ctx))
+
_, ok := r.queueFactory(1).(*fifoQueue)
require.True(t, ok)
})
t.Run("Priority", func(t *testing.T) {
opts := RouterOptions{QueueType: queueTypePriority}
- r, err := NewRouter(ctx, log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts)
+ r, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts)
require.NoError(t, err)
+ require.NoError(t, r.setupQueueFactory(ctx))
+
q, ok := r.queueFactory(1).(*pqScheduler)
require.True(t, ok)
defer q.close()
})
t.Run("NonExistant", func(t *testing.T) {
opts := RouterOptions{QueueType: "fast"}
- _, err := NewRouter(ctx, log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts)
+ _, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts)
require.Error(t, err)
require.Contains(t, err.Error(), "fast")
})
diff --git a/internal/p2p/router_test.go b/internal/p2p/router_test.go
index 6142dc45f..5facfdaff 100644
--- a/internal/p2p/router_test.go
+++ b/internal/p2p/router_test.go
@@ -106,7 +106,6 @@ func TestRouter_Channel_Basic(t *testing.T) {
require.NoError(t, err)
router, err := p2p.NewRouter(
- ctx,
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
@@ -409,7 +408,6 @@ func TestRouter_AcceptPeers(t *testing.T) {
sub := peerManager.Subscribe(ctx)
router, err := p2p.NewRouter(
- ctx,
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
@@ -464,7 +462,6 @@ func TestRouter_AcceptPeers_Error(t *testing.T) {
require.NoError(t, err)
router, err := p2p.NewRouter(
- ctx,
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
@@ -502,7 +499,6 @@ func TestRouter_AcceptPeers_ErrorEOF(t *testing.T) {
require.NoError(t, err)
router, err := p2p.NewRouter(
- ctx,
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
@@ -554,7 +550,6 @@ func TestRouter_AcceptPeers_HeadOfLineBlocking(t *testing.T) {
require.NoError(t, err)
router, err := p2p.NewRouter(
- ctx,
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
@@ -658,7 +653,6 @@ func TestRouter_DialPeers(t *testing.T) {
sub := peerManager.Subscribe(ctx)
router, err := p2p.NewRouter(
- ctx,
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
@@ -744,7 +738,6 @@ func TestRouter_DialPeers_Parallel(t *testing.T) {
require.True(t, added)
router, err := p2p.NewRouter(
- ctx,
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
@@ -819,7 +812,6 @@ func TestRouter_EvictPeers(t *testing.T) {
sub := peerManager.Subscribe(ctx)
router, err := p2p.NewRouter(
- ctx,
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
@@ -882,7 +874,6 @@ func TestRouter_ChannelCompatability(t *testing.T) {
require.NoError(t, err)
router, err := p2p.NewRouter(
- ctx,
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
@@ -938,7 +929,6 @@ func TestRouter_DontSendOnInvalidChannel(t *testing.T) {
sub := peerManager.Subscribe(ctx)
router, err := p2p.NewRouter(
- ctx,
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go
index dba520a1c..15abf3ef0 100644
--- a/internal/statesync/reactor.go
+++ b/internal/statesync/reactor.go
@@ -16,7 +16,6 @@ import (
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/internal/eventbus"
"github.com/tendermint/tendermint/internal/p2p"
- "github.com/tendermint/tendermint/internal/p2p/conn"
sm "github.com/tendermint/tendermint/internal/state"
"github.com/tendermint/tendermint/internal/store"
"github.com/tendermint/tendermint/libs/log"
@@ -139,13 +138,11 @@ type Reactor struct {
stateStore sm.Store
blockStore *store.BlockStore
- conn abciclient.Client
- tempDir string
- snapshotCh *p2p.Channel
- chunkCh *p2p.Channel
- blockCh *p2p.Channel
- paramsCh *p2p.Channel
- peerUpdates *p2p.PeerUpdates
+ conn abciclient.Client
+ tempDir string
+ peerEvents p2p.PeerEventSubscriber
+ chCreator p2p.ChannelCreator
+ sendBlockError func(context.Context, p2p.PeerError) error
// Dispatcher is used to multiplex light block requests and responses over multiple
// peers used by the p2p state provider and in reverse sync.
@@ -155,10 +152,13 @@ type Reactor struct {
// These will only be set when a state sync is in progress. It is used to feed
// received snapshots and chunks into the syncer and manage incoming and outgoing
// providers.
- mtx sync.RWMutex
- syncer *syncer
- providers map[types.NodeID]*BlockProvider
- stateProvider StateProvider
+ mtx sync.RWMutex
+ initSyncer func() *syncer
+ requestSnapshot func() error
+ syncer *syncer
+ providers map[types.NodeID]*BlockProvider
+ initStateProvider func(ctx context.Context, chainID string, initialHeight int64) error
+ stateProvider StateProvider
eventBus *eventbus.EventBus
metrics *Metrics
@@ -178,32 +178,13 @@ func NewReactor(
logger log.Logger,
conn abciclient.Client,
channelCreator p2p.ChannelCreator,
- peerUpdates *p2p.PeerUpdates,
+ peerEvents p2p.PeerEventSubscriber,
stateStore sm.Store,
blockStore *store.BlockStore,
tempDir string,
ssMetrics *Metrics,
eventBus *eventbus.EventBus,
-) (*Reactor, error) {
-
- chDesc := getChannelDescriptors()
-
- snapshotCh, err := channelCreator(ctx, chDesc[SnapshotChannel])
- if err != nil {
- return nil, err
- }
- chunkCh, err := channelCreator(ctx, chDesc[ChunkChannel])
- if err != nil {
- return nil, err
- }
- blockCh, err := channelCreator(ctx, chDesc[LightBlockChannel])
- if err != nil {
- return nil, err
- }
- paramsCh, err := channelCreator(ctx, chDesc[ParamsChannel])
- if err != nil {
- return nil, err
- }
+) *Reactor {
r := &Reactor{
logger: logger,
@@ -211,23 +192,19 @@ func NewReactor(
initialHeight: initialHeight,
cfg: cfg,
conn: conn,
- snapshotCh: snapshotCh,
- chunkCh: chunkCh,
- blockCh: blockCh,
- paramsCh: paramsCh,
- peerUpdates: peerUpdates,
+ chCreator: channelCreator,
+ peerEvents: peerEvents,
tempDir: tempDir,
stateStore: stateStore,
blockStore: blockStore,
peers: newPeerList(),
- dispatcher: NewDispatcher(blockCh),
providers: make(map[types.NodeID]*BlockProvider),
metrics: ssMetrics,
eventBus: eventBus,
}
r.BaseService = *service.NewBaseService(logger, "StateSync", r)
- return r, nil
+ return r
}
// OnStart starts separate go routines for each p2p Channel and listens for
@@ -237,8 +214,91 @@ func NewReactor(
// The caller must be sure to execute OnStop to ensure the outbound p2p Channels are
// closed. No error is returned.
func (r *Reactor) OnStart(ctx context.Context) error {
- go r.processChannels(ctx, r.snapshotCh, r.chunkCh, r.blockCh, r.paramsCh)
- go r.processPeerUpdates(ctx)
+ // construct channels
+ chDesc := getChannelDescriptors()
+ snapshotCh, err := r.chCreator(ctx, chDesc[SnapshotChannel])
+ if err != nil {
+ return err
+ }
+ chunkCh, err := r.chCreator(ctx, chDesc[ChunkChannel])
+ if err != nil {
+ return err
+ }
+ blockCh, err := r.chCreator(ctx, chDesc[LightBlockChannel])
+ if err != nil {
+ return err
+ }
+ paramsCh, err := r.chCreator(ctx, chDesc[ParamsChannel])
+ if err != nil {
+ return err
+ }
+
+ // define constructor and helper functions that hold
+ // references to these channels for later use. This is
+ // not ideal.
+ r.initSyncer = func() *syncer {
+ return &syncer{
+ logger: r.logger,
+ stateProvider: r.stateProvider,
+ conn: r.conn,
+ snapshots: newSnapshotPool(),
+ snapshotCh: snapshotCh,
+ chunkCh: chunkCh,
+ tempDir: r.tempDir,
+ fetchers: r.cfg.Fetchers,
+ retryTimeout: r.cfg.ChunkRequestTimeout,
+ metrics: r.metrics,
+ }
+ }
+ r.dispatcher = NewDispatcher(blockCh)
+ r.requestSnapshot = func() error {
+ // request snapshots from all currently connected peers
+ return snapshotCh.Send(ctx, p2p.Envelope{
+ Broadcast: true,
+ Message: &ssproto.SnapshotsRequest{},
+ })
+ }
+ r.sendBlockError = blockCh.SendError
+
+ r.initStateProvider = func(ctx context.Context, chainID string, initialHeight int64) error {
+ to := light.TrustOptions{
+ Period: r.cfg.TrustPeriod,
+ Height: r.cfg.TrustHeight,
+ Hash: r.cfg.TrustHashBytes(),
+ }
+ spLogger := r.logger.With("module", "stateprovider")
+ spLogger.Info("initializing state provider", "trustPeriod", to.Period,
+ "trustHeight", to.Height, "useP2P", r.cfg.UseP2P)
+
+ if r.cfg.UseP2P {
+ if err := r.waitForEnoughPeers(ctx, 2); err != nil {
+ return err
+ }
+
+ peers := r.peers.All()
+ providers := make([]provider.Provider, len(peers))
+ for idx, p := range peers {
+ providers[idx] = NewBlockProvider(p, chainID, r.dispatcher)
+ }
+
+ stateProvider, err := NewP2PStateProvider(ctx, chainID, initialHeight, providers, to, paramsCh, spLogger)
+ if err != nil {
+ return fmt.Errorf("failed to initialize P2P state provider: %w", err)
+ }
+ r.stateProvider = stateProvider
+ return nil
+ }
+
+ stateProvider, err := NewRPCStateProvider(ctx, chainID, initialHeight, r.cfg.RPCServers, to, spLogger)
+ if err != nil {
+ return fmt.Errorf("failed to initialize RPC state provider: %w", err)
+ }
+ r.stateProvider = stateProvider
+ return nil
+ }
+
+ go r.processChannels(ctx, snapshotCh, chunkCh, blockCh, paramsCh)
+ go r.processPeerUpdates(ctx, r.peerEvents(ctx))
return nil
}
@@ -281,16 +341,7 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) {
return sm.State{}, err
}
- r.syncer = newSyncer(
- r.cfg,
- r.logger,
- r.conn,
- r.stateProvider,
- r.snapshotCh,
- r.chunkCh,
- r.tempDir,
- r.metrics,
- )
+ r.syncer = r.initSyncer()
r.mtx.Unlock()
defer func() {
@@ -301,15 +352,7 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) {
r.mtx.Unlock()
}()
- requestSnapshotsHook := func() error {
- // request snapshots from all currently connected peers
- return r.snapshotCh.Send(ctx, p2p.Envelope{
- Broadcast: true,
- Message: &ssproto.SnapshotsRequest{},
- })
- }
-
- state, commit, err := r.syncer.SyncAny(ctx, r.cfg.DiscoveryTime, requestSnapshotsHook)
+ state, commit, err := r.syncer.SyncAny(ctx, r.cfg.DiscoveryTime, r.requestSnapshot)
if err != nil {
return sm.State{}, err
}
@@ -436,7 +479,7 @@ func (r *Reactor) backfill(
r.logger.Info("backfill: fetched light block failed validate basic, removing peer...",
"err", err, "height", height)
queue.retry(height)
- if serr := r.blockCh.SendError(ctx, p2p.PeerError{
+ if serr := r.sendBlockError(ctx, p2p.PeerError{
NodeID: peer,
Err: fmt.Errorf("received invalid light block: %w", err),
}); serr != nil {
@@ -473,7 +516,7 @@ func (r *Reactor) backfill(
if w, g := trustedBlockID.Hash, resp.block.Hash(); !bytes.Equal(w, g) {
r.logger.Info("received invalid light block. header hash doesn't match trusted LastBlockID",
"trustedHash", w, "receivedHash", g, "height", resp.block.Height)
- if err := r.blockCh.SendError(ctx, p2p.PeerError{
+ if err := r.sendBlockError(ctx, p2p.PeerError{
NodeID: resp.peer,
Err: fmt.Errorf("received invalid light block. Expected hash %v, got: %v", w, g),
}); err != nil {
@@ -534,7 +577,7 @@ func (r *Reactor) backfill(
// handleSnapshotMessage handles envelopes sent from peers on the
// SnapshotChannel. It returns an error only if the Envelope.Message is unknown
// for this channel. This should never be called outside of handleMessage.
-func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envelope) error {
+func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envelope, snapshotCh *p2p.Channel) error {
logger := r.logger.With("peer", envelope.From)
switch msg := envelope.Message.(type) {
@@ -553,7 +596,7 @@ func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envel
"peer", envelope.From,
)
- if err := r.snapshotCh.Send(ctx, p2p.Envelope{
+ if err := snapshotCh.Send(ctx, p2p.Envelope{
To: envelope.From,
Message: &ssproto.SnapshotsResponse{
Height: snapshot.Height,
@@ -589,8 +632,8 @@ func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envel
"failed to add snapshot",
"height", msg.Height,
"format", msg.Format,
+ "channel", snapshotCh.ID,
"err", err,
- "channel", r.snapshotCh.ID,
)
return nil
}
@@ -606,7 +649,7 @@ func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envel
// handleChunkMessage handles envelopes sent from peers on the ChunkChannel.
// It returns an error only if the Envelope.Message is unknown for this channel.
// This should never be called outside of handleMessage.
-func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope) error {
+func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope, chunkCh *p2p.Channel) error {
switch msg := envelope.Message.(type) {
case *ssproto.ChunkRequest:
r.logger.Debug(
@@ -640,7 +683,7 @@ func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope
"chunk", msg.Index,
"peer", envelope.From,
)
- if err := r.chunkCh.Send(ctx, p2p.Envelope{
+ if err := chunkCh.Send(ctx, p2p.Envelope{
To: envelope.From,
Message: &ssproto.ChunkResponse{
Height: msg.Height,
@@ -695,7 +738,7 @@ func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope
return nil
}
-func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Envelope) error {
+func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Envelope, blockCh *p2p.Channel) error {
switch msg := envelope.Message.(type) {
case *ssproto.LightBlockRequest:
r.logger.Info("received light block request", "height", msg.Height)
@@ -705,7 +748,7 @@ func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Env
return err
}
if lb == nil {
- if err := r.blockCh.Send(ctx, p2p.Envelope{
+ if err := blockCh.Send(ctx, p2p.Envelope{
To: envelope.From,
Message: &ssproto.LightBlockResponse{
LightBlock: nil,
@@ -724,7 +767,7 @@ func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Env
// NOTE: If we don't have the light block we will send a nil light block
// back to the requested node, indicating that we don't have it.
- if err := r.blockCh.Send(ctx, p2p.Envelope{
+ if err := blockCh.Send(ctx, p2p.Envelope{
To: envelope.From,
Message: &ssproto.LightBlockResponse{
LightBlock: lbproto,
@@ -752,7 +795,7 @@ func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Env
return nil
}
-func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelope) error {
+func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelope, paramsCh *p2p.Channel) error {
switch msg := envelope.Message.(type) {
case *ssproto.ParamsRequest:
r.logger.Debug("received consensus params request", "height", msg.Height)
@@ -763,7 +806,7 @@ func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelop
}
cpproto := cp.ToProto()
- if err := r.paramsCh.Send(ctx, p2p.Envelope{
+ if err := paramsCh.Send(ctx, p2p.Envelope{
To: envelope.From,
Message: &ssproto.ParamsResponse{
Height: msg.Height,
@@ -801,7 +844,7 @@ func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelop
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
// It will handle errors and any possible panics gracefully. A caller can handle
// any error returned by sending a PeerError on the respective channel.
-func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope) (err error) {
+func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, chans map[p2p.ChannelID]*p2p.Channel) (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("panic in processing message: %v", e)
@@ -817,13 +860,13 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope) (er
switch envelope.ChannelID {
case SnapshotChannel:
- err = r.handleSnapshotMessage(ctx, envelope)
+ err = r.handleSnapshotMessage(ctx, envelope, chans[SnapshotChannel])
case ChunkChannel:
- err = r.handleChunkMessage(ctx, envelope)
+ err = r.handleChunkMessage(ctx, envelope, chans[ChunkChannel])
case LightBlockChannel:
- err = r.handleLightBlockMessage(ctx, envelope)
+ err = r.handleLightBlockMessage(ctx, envelope, chans[LightBlockChannel])
case ParamsChannel:
- err = r.handleParamsMessage(ctx, envelope)
+ err = r.handleParamsMessage(ctx, envelope, chans[ParamsChannel])
default:
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", envelope.ChannelID, envelope)
}
@@ -840,7 +883,7 @@ func (r *Reactor) processChannels(ctx context.Context, chs ...*p2p.Channel) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
- chanTable := make(map[conn.ChannelID]*p2p.Channel, len(chs))
+ chanTable := make(map[p2p.ChannelID]*p2p.Channel, len(chs))
for idx := range chs {
ch := chs[idx]
chanTable[ch.ID] = ch
@@ -849,7 +892,7 @@ func (r *Reactor) processChannels(ctx context.Context, chs ...*p2p.Channel) {
iter := p2p.MergedChannelIterator(ctx, chs...)
for iter.Next(ctx) {
envelope := iter.Envelope()
- if err := r.handleMessage(ctx, envelope); err != nil {
+ if err := r.handleMessage(ctx, envelope, chanTable); err != nil {
ch, ok := chanTable[envelope.ChannelID]
if !ok {
r.logger.Error("received impossible message",
@@ -930,12 +973,12 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
-func (r *Reactor) processPeerUpdates(ctx context.Context) {
+func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates) {
for {
select {
case <-ctx.Done():
return
- case peerUpdate := <-r.peerUpdates.Updates():
+ case peerUpdate := <-peerUpdates.Updates():
r.processPeerUpdate(ctx, peerUpdate)
}
}
@@ -1040,41 +1083,6 @@ func (r *Reactor) waitForEnoughPeers(ctx context.Context, numPeers int) error {
return nil
}
-func (r *Reactor) initStateProvider(ctx context.Context, chainID string, initialHeight int64) error {
- var err error
- to := light.TrustOptions{
- Period: r.cfg.TrustPeriod,
- Height: r.cfg.TrustHeight,
- Hash: r.cfg.TrustHashBytes(),
- }
- spLogger := r.logger.With("module", "stateprovider")
- spLogger.Info("initializing state provider", "trustPeriod", to.Period,
- "trustHeight", to.Height, "useP2P", r.cfg.UseP2P)
-
- if r.cfg.UseP2P {
- if err := r.waitForEnoughPeers(ctx, 2); err != nil {
- return err
- }
-
- peers := r.peers.All()
- providers := make([]provider.Provider, len(peers))
- for idx, p := range peers {
- providers[idx] = NewBlockProvider(p, chainID, r.dispatcher)
- }
-
- r.stateProvider, err = NewP2PStateProvider(ctx, chainID, initialHeight, providers, to, r.paramsCh, spLogger)
- if err != nil {
- return fmt.Errorf("failed to initialize P2P state provider: %w", err)
- }
- } else {
- r.stateProvider, err = NewRPCStateProvider(ctx, chainID, initialHeight, r.cfg.RPCServers, to, spLogger)
- if err != nil {
- return fmt.Errorf("failed to initialize RPC state provider: %w", err)
- }
- }
- return nil
-}
-
func (r *Reactor) TotalSnapshots() int64 {
r.mtx.RLock()
defer r.mtx.RUnlock()
diff --git a/internal/statesync/reactor_test.go b/internal/statesync/reactor_test.go
index 709fe9a2c..f59a6e4ee 100644
--- a/internal/statesync/reactor_test.go
+++ b/internal/statesync/reactor_test.go
@@ -154,8 +154,7 @@ func setup(
logger := log.NewNopLogger()
- var err error
- rts.reactor, err = NewReactor(
+ rts.reactor = NewReactor(
ctx,
factory.DefaultTestChainID,
1,
@@ -163,25 +162,26 @@ func setup(
logger.With("component", "reactor"),
conn,
chCreator,
- rts.peerUpdates,
+ func(context.Context) *p2p.PeerUpdates { return rts.peerUpdates },
rts.stateStore,
rts.blockStore,
"",
m,
nil, // eventbus can be nil
)
- require.NoError(t, err)
- rts.syncer = newSyncer(
- *cfg,
- logger.With("component", "syncer"),
- conn,
- stateProvider,
- rts.snapshotChannel,
- rts.chunkChannel,
- "",
- rts.reactor.metrics,
- )
+ rts.syncer = &syncer{
+ logger: logger,
+ stateProvider: stateProvider,
+ conn: conn,
+ snapshots: newSnapshotPool(),
+ snapshotCh: rts.snapshotChannel,
+ chunkCh: rts.chunkChannel,
+ tempDir: t.TempDir(),
+ fetchers: cfg.Fetchers,
+ retryTimeout: cfg.ChunkRequestTimeout,
+ metrics: rts.reactor.metrics,
+ }
ctx, cancel := context.WithCancel(ctx)
diff --git a/internal/statesync/syncer.go b/internal/statesync/syncer.go
index 78eb8d53a..c2cca9a7c 100644
--- a/internal/statesync/syncer.go
+++ b/internal/statesync/syncer.go
@@ -10,7 +10,6 @@ import (
abciclient "github.com/tendermint/tendermint/abci/client"
abci "github.com/tendermint/tendermint/abci/types"
- "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/proxy"
sm "github.com/tendermint/tendermint/internal/state"
@@ -72,31 +71,6 @@ type syncer struct {
processingSnapshot *snapshot
}
-// newSyncer creates a new syncer.
-func newSyncer(
- cfg config.StateSyncConfig,
- logger log.Logger,
- conn abciclient.Client,
- stateProvider StateProvider,
- snapshotCh *p2p.Channel,
- chunkCh *p2p.Channel,
- tempDir string,
- metrics *Metrics,
-) *syncer {
- return &syncer{
- logger: logger,
- stateProvider: stateProvider,
- conn: conn,
- snapshots: newSnapshotPool(),
- snapshotCh: snapshotCh,
- chunkCh: chunkCh,
- tempDir: tempDir,
- fetchers: cfg.Fetchers,
- retryTimeout: cfg.ChunkRequestTimeout,
- metrics: metrics,
- }
-}
-
// AddChunk adds a chunk to the chunk queue, if any. It returns false if the chunk has already
// been added to the queue, or an error if there's no sync in progress.
func (s *syncer) AddChunk(chunk *chunk) (bool, error) {
diff --git a/node/node.go b/node/node.go
index 34350e09f..7c3ca1268 100644
--- a/node/node.go
+++ b/node/node.go
@@ -88,12 +88,11 @@ func newDefaultNode(
}
if cfg.Mode == config.ModeSeed {
return makeSeedNode(
- ctx,
+ logger,
cfg,
config.DefaultDBProvider,
nodeKey,
defaultGenesisDocProviderFunc(cfg),
- logger,
)
}
pval, err := makeDefaultPrivval(cfg)
@@ -244,7 +243,7 @@ func makeNode(
// TODO: Fetch and provide real options and do proper p2p bootstrapping.
// TODO: Use a persistent peer database.
- nodeInfo, err := makeNodeInfo(cfg, nodeKey, eventSinks, genDoc, state)
+ nodeInfo, err := makeNodeInfo(cfg, nodeKey, eventSinks, genDoc, state.Version.Consensus)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
}
@@ -257,24 +256,21 @@ func makeNode(
makeCloser(closers))
}
- router, err := createRouter(ctx, logger, nodeMetrics.p2p, nodeInfo, nodeKey,
- peerManager, cfg, proxyApp)
+ router, err := createRouter(logger, nodeMetrics.p2p, nodeInfo, nodeKey, peerManager, cfg, proxyApp)
if err != nil {
return nil, combineCloseError(
fmt.Errorf("failed to create router: %w", err),
makeCloser(closers))
}
- mpReactor, mp, err := createMempoolReactor(ctx,
- cfg, proxyApp, stateStore, nodeMetrics.mempool, peerManager, router, logger,
- )
+ mpReactor, mp, err := createMempoolReactor(logger, cfg, proxyApp, stateStore, nodeMetrics.mempool,
+ peerManager.Subscribe, router.OpenChannel, peerManager.GetHeight)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
}
- evReactor, evPool, edbCloser, err := createEvidenceReactor(ctx,
- cfg, dbProvider, stateStore, blockStore, peerManager, router, logger, nodeMetrics.evidence, eventBus,
- )
+ evReactor, evPool, edbCloser, err := createEvidenceReactor(logger, cfg, dbProvider,
+ stateStore, blockStore, peerManager.Subscribe, router.OpenChannel, nodeMetrics.evidence, eventBus)
closers = append(closers, edbCloser)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
@@ -295,11 +291,12 @@ func makeNode(
// Determine whether we should do block sync. This must happen after the handshake, since the
// app may modify the validator set, specifying ourself as the only validator.
blockSync := !onlyValidatorIsUs(state, pubKey)
+ waitSync := stateSync || blockSync
csReactor, csState, err := createConsensusReactor(ctx,
cfg, stateStore, blockExec, blockStore, mp, evPool,
- privValidator, nodeMetrics.consensus, stateSync || blockSync, eventBus,
- peerManager, router, logger,
+ privValidator, nodeMetrics.consensus, waitSync, eventBus,
+ peerManager, router.OpenChannel, logger,
)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
@@ -307,23 +304,18 @@ func makeNode(
// Create the blockchain reactor. Note, we do not start block sync if we're
// doing a state sync first.
- bcReactor, err := blocksync.NewReactor(ctx,
+ bcReactor := blocksync.NewReactor(
logger.With("module", "blockchain"),
stateStore,
blockExec,
blockStore,
csReactor,
router.OpenChannel,
- peerManager.Subscribe(ctx),
+ peerManager.Subscribe,
blockSync && !stateSync,
nodeMetrics.consensus,
eventBus,
)
- if err != nil {
- return nil, combineCloseError(
- fmt.Errorf("could not create blocksync reactor: %w", err),
- makeCloser(closers))
- }
// Make ConsensusReactor. Don't enable fully if doing a state sync and/or block sync first.
// FIXME We need to update metrics here, since other reactors don't have access to them.
@@ -337,7 +329,7 @@ func makeNode(
// FIXME The way we do phased startups (e.g. replay -> block sync -> consensus) is very messy,
// we should clean this whole thing up. See:
// https://github.com/tendermint/tendermint/issues/4644
- stateSyncReactor, err := statesync.NewReactor(
+ stateSyncReactor := statesync.NewReactor(
ctx,
genDoc.ChainID,
genDoc.InitialHeight,
@@ -345,23 +337,17 @@ func makeNode(
logger.With("module", "statesync"),
proxyApp,
router.OpenChannel,
- peerManager.Subscribe(ctx),
+ peerManager.Subscribe,
stateStore,
blockStore,
cfg.StateSync.TempDir,
nodeMetrics.statesync,
eventBus,
)
- if err != nil {
- return nil, combineCloseError(err, makeCloser(closers))
- }
var pexReactor service.Service = service.NopService{}
if cfg.P2P.PexReactor {
- pexReactor, err = pex.NewReactor(ctx, logger, peerManager, router.OpenChannel, peerManager.Subscribe(ctx))
- if err != nil {
- return nil, combineCloseError(err, makeCloser(closers))
- }
+ pexReactor = pex.NewReactor(logger, peerManager, router.OpenChannel, peerManager.Subscribe)
}
node := &nodeImpl{
config: cfg,
diff --git a/node/node_test.go b/node/node_test.go
index a03d7286e..1a1fa6f81 100644
--- a/node/node_test.go
+++ b/node/node_test.go
@@ -581,12 +581,12 @@ func TestNodeNewSeedNode(t *testing.T) {
logger := log.NewNopLogger()
- ns, err := makeSeedNode(ctx,
+ ns, err := makeSeedNode(
+ logger,
cfg,
config.DefaultDBProvider,
nodeKey,
defaultGenesisDocProviderFunc(cfg),
- logger,
)
t.Cleanup(ns.Wait)
t.Cleanup(leaktest.CheckTimeout(t, time.Second))
diff --git a/node/public.go b/node/public.go
index af3aece8e..db292833e 100644
--- a/node/public.go
+++ b/node/public.go
@@ -68,7 +68,7 @@ func New(
config.DefaultDBProvider,
logger)
case config.ModeSeed:
- return makeSeedNode(ctx, conf, config.DefaultDBProvider, nodeKey, genProvider, logger)
+ return makeSeedNode(logger, conf, config.DefaultDBProvider, nodeKey, genProvider)
default:
return nil, fmt.Errorf("%q is not a valid mode", conf.Mode)
}
diff --git a/node/seed.go b/node/seed.go
index 970896cc6..6da7ff05f 100644
--- a/node/seed.go
+++ b/node/seed.go
@@ -40,12 +40,11 @@ type seedNodeImpl struct {
// makeSeedNode returns a new seed node, containing only p2p, pex reactor
func makeSeedNode(
- ctx context.Context,
+ logger log.Logger,
cfg *config.Config,
dbProvider config.DBProvider,
nodeKey types.NodeKey,
genesisDocProvider genesisDocProvider,
- logger log.Logger,
) (service.Service, error) {
if !cfg.P2P.PexReactor {
return nil, errors.New("cannot run seed nodes with PEX disabled")
@@ -76,19 +75,13 @@ func makeSeedNode(
closer)
}
- router, err := createRouter(ctx, logger, p2pMetrics, nodeInfo, nodeKey,
- peerManager, cfg, nil)
+ router, err := createRouter(logger, p2pMetrics, nodeInfo, nodeKey, peerManager, cfg, nil)
if err != nil {
return nil, combineCloseError(
fmt.Errorf("failed to create router: %w", err),
closer)
}
- pexReactor, err := pex.NewReactor(ctx, logger, peerManager, router.OpenChannel, peerManager.Subscribe(ctx))
- if err != nil {
- return nil, combineCloseError(err, closer)
- }
-
node := &seedNodeImpl{
config: cfg,
logger: logger,
@@ -101,7 +94,7 @@ func makeSeedNode(
shutdownOps: closer,
- pexReactor: pexReactor,
+ pexReactor: pex.NewReactor(logger, peerManager, router.OpenChannel, peerManager.Subscribe),
}
node.BaseService = *service.NewBaseService(logger, "SeedNode", node)
diff --git a/node/setup.go b/node/setup.go
index 48ffcb073..07626d611 100644
--- a/node/setup.go
+++ b/node/setup.go
@@ -169,14 +169,14 @@ func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool {
}
func createMempoolReactor(
- ctx context.Context,
+ logger log.Logger,
cfg *config.Config,
appClient abciclient.Client,
store sm.Store,
memplMetrics *mempool.Metrics,
- peerManager *p2p.PeerManager,
- router *p2p.Router,
- logger log.Logger,
+ peerEvents p2p.PeerEventSubscriber,
+ chCreator p2p.ChannelCreator,
+ peerHeight func(types.NodeID) int64,
) (service.Service, mempool.Mempool, error) {
logger = logger.With("module", "mempool")
@@ -189,18 +189,14 @@ func createMempoolReactor(
mempool.WithPostCheck(sm.TxPostCheckFromStore(store)),
)
- reactor, err := mempool.NewReactor(
- ctx,
+ reactor := mempool.NewReactor(
logger,
cfg.Mempool,
- peerManager,
mp,
- router.OpenChannel,
- peerManager.Subscribe(ctx),
+ chCreator,
+ peerEvents,
+ peerHeight,
)
- if err != nil {
- return nil, nil, err
- }
if cfg.Consensus.WaitForTxs() {
mp.EnableTxsAvailable()
@@ -210,14 +206,13 @@ func createMempoolReactor(
}
func createEvidenceReactor(
- ctx context.Context,
+ logger log.Logger,
cfg *config.Config,
dbProvider config.DBProvider,
store sm.Store,
blockStore *store.BlockStore,
- peerManager *p2p.PeerManager,
- router *p2p.Router,
- logger log.Logger,
+ peerEvents p2p.PeerEventSubscriber,
+ chCreator p2p.ChannelCreator,
metrics *evidence.Metrics,
eventBus *eventbus.EventBus,
) (*evidence.Reactor, *evidence.Pool, closer, error) {
@@ -231,16 +226,12 @@ func createEvidenceReactor(
evidencePool := evidence.NewPool(logger, evidenceDB, store, blockStore, metrics, eventBus)
- evidenceReactor, err := evidence.NewReactor(
- ctx,
+ evidenceReactor := evidence.NewReactor(
logger,
- router.OpenChannel,
- peerManager.Subscribe(ctx),
+ chCreator,
+ peerEvents,
evidencePool,
)
- if err != nil {
- return nil, nil, dbCloser, fmt.Errorf("creating evidence reactor: %w", err)
- }
return evidenceReactor, evidencePool, dbCloser, nil
}
@@ -258,7 +249,7 @@ func createConsensusReactor(
waitSync bool,
eventBus *eventbus.EventBus,
peerManager *p2p.PeerManager,
- router *p2p.Router,
+ chCreator p2p.ChannelCreator,
logger log.Logger,
) (*consensus.Reactor, *consensus.State, error) {
logger = logger.With("module", "consensus")
@@ -282,20 +273,15 @@ func createConsensusReactor(
consensusState.SetPrivValidator(ctx, privValidator)
}
- reactor, err := consensus.NewReactor(
- ctx,
+ reactor := consensus.NewReactor(
logger,
consensusState,
- router.OpenChannel,
- peerManager.Subscribe(ctx),
+ chCreator,
+ peerManager.Subscribe,
eventBus,
waitSync,
csMetrics,
)
- if err != nil {
- return nil, nil, err
- }
-
return reactor, consensusState, nil
}
@@ -375,7 +361,6 @@ func createPeerManager(
}
func createRouter(
- ctx context.Context,
logger log.Logger,
p2pMetrics *p2p.Metrics,
nodeInfo types.NodeInfo,
@@ -405,7 +390,6 @@ func createRouter(
}
return p2p.NewRouter(
- ctx,
p2pLogger,
p2pMetrics,
nodeInfo,
@@ -422,7 +406,7 @@ func makeNodeInfo(
nodeKey types.NodeKey,
eventSinks []indexer.EventSink,
genDoc *types.GenesisDoc,
- state sm.State,
+ versionInfo version.Consensus,
) (types.NodeInfo, error) {
txIndexerStatus := "off"
@@ -434,8 +418,8 @@ func makeNodeInfo(
nodeInfo := types.NodeInfo{
ProtocolVersion: types.ProtocolVersion{
P2P: version.P2PProtocol, // global
- Block: state.Version.Consensus.Block,
- App: state.Version.Consensus.App,
+ Block: versionInfo.Block,
+ App: versionInfo.App,
},
NodeID: nodeKey.ID,
Network: genDoc.ChainID,
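The same deferral applies to channels: reactors keep the p2p.ChannelCreator (typically router.OpenChannel) and open their channels inside OnStart, which is why the constructors in this patch drop the context argument and the error return. A rough sketch of that shape, with hypothetical names rather than the actual reactors, could be:

```go
// Rough sketch with hypothetical names: the channel creator is kept until
// OnStart, so construction cannot fail and needs no context.
package example

import (
	"context"

	"github.com/tendermint/tendermint/internal/p2p"
)

type channelReactor struct {
	chCreator p2p.ChannelCreator // e.g. router.OpenChannel
	desc      *p2p.ChannelDescriptor
}

func newChannelReactor(chCreator p2p.ChannelCreator, desc *p2p.ChannelDescriptor) *channelReactor {
	return &channelReactor{chCreator: chCreator, desc: desc}
}

func (r *channelReactor) OnStart(ctx context.Context) error {
	ch, err := r.chCreator(ctx, r.desc) // channel creation moved out of the constructor
	if err != nil {
		return err
	}
	go r.process(ctx, ch) // the processing loop takes the channel explicitly
	return nil
}

func (r *channelReactor) process(ctx context.Context, ch *p2p.Channel) {
	iter := ch.Receive(ctx)
	for iter.Next(ctx) {
		_ = iter.Envelope() // handle the incoming message
	}
}
```

Because the channel only exists after OnStart, the processing goroutines receive it as an argument instead of reading it from a struct field.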
From 0a23b1e51d03643a0e6103e027ae0bf522c47f96 Mon Sep 17 00:00:00 2001
From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com>
Date: Tue, 5 Apr 2022 15:47:24 +0000
Subject: [PATCH 2/3] build(deps): Bump github.com/vektra/mockery/v2 from
2.10.2 to 2.10.4 (#8250)
Bumps [github.com/vektra/mockery/v2](https://github.com/vektra/mockery) from 2.10.2 to 2.10.4.
Release notes (sourced from github.com/vektra/mockery/v2's releases):

v2.10.4
Changelog:
- c943e69 Merge pull request #441 from cfstras/fix/support-more-env-keys
- ed87cf6 fix: allow configuring flags with "-" as Env var
- 17abd96 fix: unused config field
Tags:
- 53114cf test: add test for env var configurations

v2.10.3
Changelog:
- ee25bcf Add/update mocks
- 4703d1a Merge pull request #444 from vektra/remove_need_deps
- ba1f213 Remove packages.NeedDeps
- ed38b20 Update go.sum
- See full diff in compare view
---
abci/client/mocks/client.go | 1 -
abci/types/mocks/application.go | 1 -
go.mod | 2 +-
go.sum | 4 ++--
internal/consensus/mocks/cons_sync_reactor.go | 1 -
internal/evidence/mocks/block_store.go | 1 -
internal/state/indexer/mocks/event_sink.go | 1 -
internal/state/mocks/evidence_pool.go | 1 -
internal/state/mocks/store.go | 1 -
internal/statesync/mocks/state_provider.go | 1 -
rpc/client/mocks/client.go | 23 +++++++++++++++++++
11 files changed, 26 insertions(+), 11 deletions(-)
diff --git a/abci/client/mocks/client.go b/abci/client/mocks/client.go
index 7743e5897..37df53979 100644
--- a/abci/client/mocks/client.go
+++ b/abci/client/mocks/client.go
@@ -6,7 +6,6 @@ import (
context "context"
mock "github.com/stretchr/testify/mock"
-
types "github.com/tendermint/tendermint/abci/types"
)
diff --git a/abci/types/mocks/application.go b/abci/types/mocks/application.go
index 45e2f1c45..30bf0f84c 100644
--- a/abci/types/mocks/application.go
+++ b/abci/types/mocks/application.go
@@ -4,7 +4,6 @@ package mocks
import (
mock "github.com/stretchr/testify/mock"
-
types "github.com/tendermint/tendermint/abci/types"
)
diff --git a/go.mod b/go.mod
index 7951b645f..4db965b87 100644
--- a/go.mod
+++ b/go.mod
@@ -41,7 +41,7 @@ require (
github.com/creachadair/atomicfile v0.2.4
github.com/golangci/golangci-lint v1.45.2
github.com/google/go-cmp v0.5.7
- github.com/vektra/mockery/v2 v2.10.2
+ github.com/vektra/mockery/v2 v2.10.4
gotest.tools v2.2.0+incompatible
)
diff --git a/go.sum b/go.sum
index 60b4842b6..94fc35977 100644
--- a/go.sum
+++ b/go.sum
@@ -1035,8 +1035,8 @@ github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyC
github.com/valyala/fasthttp v1.30.0/go.mod h1:2rsYD01CKFrjjsvFxx75KlEUNpWNBY9JWD3K/7o2Cus=
github.com/valyala/quicktemplate v1.7.0/go.mod h1:sqKJnoaOF88V07vkO+9FL8fb9uZg/VPSJnLYn+LmLk8=
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
-github.com/vektra/mockery/v2 v2.10.2 h1:ISAFkB3rQS6Y3aDZzAKtDwgeyDknwNa1aBE3Zgx0h+I=
-github.com/vektra/mockery/v2 v2.10.2/go.mod h1:m/WO2UzWzqgVX3nvqpRQq70I4Z7jbSCRhdmkgtp+Ab4=
+github.com/vektra/mockery/v2 v2.10.4 h1:nMdsCKIS7ZdNTRNS/77Bx6Q/UbasGcfc3Nx7JO7HGTg=
+github.com/vektra/mockery/v2 v2.10.4/go.mod h1:m/WO2UzWzqgVX3nvqpRQq70I4Z7jbSCRhdmkgtp+Ab4=
github.com/viki-org/dnscache v0.0.0-20130720023526-c70c1f23c5d8/go.mod h1:dniwbG03GafCjFohMDmz6Zc6oCuiqgH6tGNyXTkHzXE=
github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU=
diff --git a/internal/consensus/mocks/cons_sync_reactor.go b/internal/consensus/mocks/cons_sync_reactor.go
index b254fc701..5ac592f0d 100644
--- a/internal/consensus/mocks/cons_sync_reactor.go
+++ b/internal/consensus/mocks/cons_sync_reactor.go
@@ -4,7 +4,6 @@ package mocks
import (
mock "github.com/stretchr/testify/mock"
-
state "github.com/tendermint/tendermint/internal/state"
)
diff --git a/internal/evidence/mocks/block_store.go b/internal/evidence/mocks/block_store.go
index 5ea8d8344..ef3346b2a 100644
--- a/internal/evidence/mocks/block_store.go
+++ b/internal/evidence/mocks/block_store.go
@@ -4,7 +4,6 @@ package mocks
import (
mock "github.com/stretchr/testify/mock"
-
types "github.com/tendermint/tendermint/types"
)
diff --git a/internal/state/indexer/mocks/event_sink.go b/internal/state/indexer/mocks/event_sink.go
index 6173480dd..d5555a417 100644
--- a/internal/state/indexer/mocks/event_sink.go
+++ b/internal/state/indexer/mocks/event_sink.go
@@ -6,7 +6,6 @@ import (
context "context"
mock "github.com/stretchr/testify/mock"
-
indexer "github.com/tendermint/tendermint/internal/state/indexer"
query "github.com/tendermint/tendermint/internal/pubsub/query"
diff --git a/internal/state/mocks/evidence_pool.go b/internal/state/mocks/evidence_pool.go
index b4f42e580..04e8be7bc 100644
--- a/internal/state/mocks/evidence_pool.go
+++ b/internal/state/mocks/evidence_pool.go
@@ -6,7 +6,6 @@ import (
context "context"
mock "github.com/stretchr/testify/mock"
-
state "github.com/tendermint/tendermint/internal/state"
types "github.com/tendermint/tendermint/types"
diff --git a/internal/state/mocks/store.go b/internal/state/mocks/store.go
index b7a58e415..02c69d3e0 100644
--- a/internal/state/mocks/store.go
+++ b/internal/state/mocks/store.go
@@ -4,7 +4,6 @@ package mocks
import (
mock "github.com/stretchr/testify/mock"
-
state "github.com/tendermint/tendermint/internal/state"
tendermintstate "github.com/tendermint/tendermint/proto/tendermint/state"
diff --git a/internal/statesync/mocks/state_provider.go b/internal/statesync/mocks/state_provider.go
index b19a6787f..b8d681631 100644
--- a/internal/statesync/mocks/state_provider.go
+++ b/internal/statesync/mocks/state_provider.go
@@ -6,7 +6,6 @@ import (
context "context"
mock "github.com/stretchr/testify/mock"
-
state "github.com/tendermint/tendermint/internal/state"
types "github.com/tendermint/tendermint/types"
diff --git a/rpc/client/mocks/client.go b/rpc/client/mocks/client.go
index ffa1d1f29..d9049ea44 100644
--- a/rpc/client/mocks/client.go
+++ b/rpc/client/mocks/client.go
@@ -411,6 +411,29 @@ func (_m *Client) DumpConsensusState(_a0 context.Context) (*coretypes.ResultDump
return r0, r1
}
+// Events provides a mock function with given fields: ctx, req
+func (_m *Client) Events(ctx context.Context, req *coretypes.RequestEvents) (*coretypes.ResultEvents, error) {
+ ret := _m.Called(ctx, req)
+
+ var r0 *coretypes.ResultEvents
+ if rf, ok := ret.Get(0).(func(context.Context, *coretypes.RequestEvents) *coretypes.ResultEvents); ok {
+ r0 = rf(ctx, req)
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(*coretypes.ResultEvents)
+ }
+ }
+
+ var r1 error
+ if rf, ok := ret.Get(1).(func(context.Context, *coretypes.RequestEvents) error); ok {
+ r1 = rf(ctx, req)
+ } else {
+ r1 = ret.Error(1)
+ }
+
+ return r0, r1
+}
+
// Genesis provides a mock function with given fields: _a0
func (_m *Client) Genesis(_a0 context.Context) (*coretypes.ResultGenesis, error) {
ret := _m.Called(_a0)
From 60f88194ec0907ad546688cf22060a14b7996d39 Mon Sep 17 00:00:00 2001
From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com>
Date: Tue, 5 Apr 2022 16:56:48 +0000
Subject: [PATCH 3/3] build(deps): Bump github.com/BurntSushi/toml from 1.0.0
to 1.1.0 (#8251)
Bumps [github.com/BurntSushi/toml](https://github.com/BurntSushi/toml) from 1.0.0 to 1.1.0.
Release notes (sourced from github.com/BurntSushi/toml's releases):

v1.1.0
Just a few bugfixes:

- Skip fields with toml:"-" even when they're unsupported types. Previously something like this would fail to encode due to func being an unsupported type:

  struct {
      Str  string `toml:"str"`
      Func func() `toml:"-"`
  }

- Multiline strings can't end with \. This is valid:

  # Valid
  key = """ foo \
  """

  # Invalid
  key = """ foo \ """

- Don't quote values in TOMLMarshaler. Previously they would always include quoting (e.g. "value"), while the entire point of this interface is to bypass that.

Commits:
- 891d261 Don't error out if a multiline string ends with an incomplete UTF-8 sequence
- ef65e34 Don't run Unmarshal() through Decode()
- 573cad4 Merge pull request #347 from zhsj/fix-32
- f3633f4 Fix test on 32 bit arch
- 551f4a5 Merge pull request #344 from lucasbutn/hotfix-341-marshaler-shouldnot-writequ...
- dec5825 Removed write quote in marshal to allow write other types than strings
- 2249a9c Multiline strings can't end with "\"
- 51b22f2 Fix README
- 01e5516 Skip fields with toml:"-", even when they're unsupported types
- 87b9f05 Fix tests for older Go versions
- See full diff in compare view
---
go.mod | 2 +-
go.sum | 3 ++-
2 files changed, 3 insertions(+), 2 deletions(-)
diff --git a/go.mod b/go.mod
index 4db965b87..3b0004ba0 100644
--- a/go.mod
+++ b/go.mod
@@ -3,7 +3,7 @@ module github.com/tendermint/tendermint
go 1.17
require (
- github.com/BurntSushi/toml v1.0.0
+ github.com/BurntSushi/toml v1.1.0
github.com/adlio/schema v1.3.0
github.com/btcsuite/btcd v0.22.0-beta
github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce
diff --git a/go.sum b/go.sum
index 94fc35977..7374c90d0 100644
--- a/go.sum
+++ b/go.sum
@@ -68,8 +68,9 @@ github.com/Azure/azure-sdk-for-go/sdk/internal v0.7.0/go.mod h1:yqy467j36fJxcRV2
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8=
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
-github.com/BurntSushi/toml v1.0.0 h1:dtDWrepsVPfW9H/4y7dDgFc2MBUSeJhlaDtK13CxFlU=
github.com/BurntSushi/toml v1.0.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
+github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I=
+github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=