node: remove channel and peer update initialization from construction (#8238)

Author: Sam Kleinman
Date: 2022-04-05 09:26:53 -04:00 (committed by GitHub)
Parent: 97f7021712
Commit: 9d1e8eaad4
25 changed files with 545 additions and 609 deletions
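
The common thread across these files: reactor constructors no longer open p2p channels or subscribe to peer updates. Each reactor now stores a p2p.ChannelCreator and the new p2p.PeerEventSubscriber, and performs the fallible setup in OnStart, which is why NewReactor can stop returning an error. Below is a minimal sketch of the before/after shape, using a hypothetical exampleReactor rather than any reactor actually touched by this commit:

package example

import (
	"context"

	"github.com/tendermint/tendermint/internal/p2p"
)

// exampleReactor is illustrative only; the real reactors in this commit
// follow the same outline.
type exampleReactor struct {
	chCreator  p2p.ChannelCreator
	peerEvents p2p.PeerEventSubscriber
	desc       *p2p.ChannelDescriptor
}

// Before: the constructor opened the channel, so it had to return an error.
// After: the constructor only stores the factories and cannot fail.
func newExampleReactor(cc p2p.ChannelCreator, pe p2p.PeerEventSubscriber, desc *p2p.ChannelDescriptor) *exampleReactor {
	return &exampleReactor{chCreator: cc, peerEvents: pe, desc: desc}
}

// OnStart now does the work that used to live in the constructor: create the
// channel, subscribe to peer updates, and hand both to the worker goroutines.
func (r *exampleReactor) OnStart(ctx context.Context) error {
	ch, err := r.chCreator(ctx, r.desc)
	if err != nil {
		return err
	}
	go r.processCh(ctx, ch)
	go r.processPeerUpdates(ctx, r.peerEvents(ctx), ch)
	return nil
}

func (r *exampleReactor) processCh(ctx context.Context, ch *p2p.Channel) {}

func (r *exampleReactor) processPeerUpdates(ctx context.Context, pu *p2p.PeerUpdates, ch *p2p.Channel) {
}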

View File

@@ -11,6 +11,7 @@ import (
"github.com/tendermint/tendermint/internal/consensus"
"github.com/tendermint/tendermint/internal/eventbus"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/p2p/conn"
sm "github.com/tendermint/tendermint/internal/state"
"github.com/tendermint/tendermint/internal/store"
"github.com/tendermint/tendermint/libs/log"
@@ -80,8 +81,8 @@ type Reactor struct {
consReactor consensusReactor
blockSync *atomicBool
blockSyncCh *p2p.Channel
peerUpdates *p2p.PeerUpdates
chCreator p2p.ChannelCreator
peerEvents p2p.PeerEventSubscriber
requestsCh <-chan BlockRequest
errorsCh <-chan peerError
@@ -94,23 +95,17 @@ type Reactor struct {
// NewReactor returns a new reactor instance.
func NewReactor(
ctx context.Context,
logger log.Logger,
stateStore sm.Store,
blockExec *sm.BlockExecutor,
store *store.BlockStore,
consReactor consensusReactor,
channelCreator p2p.ChannelCreator,
peerUpdates *p2p.PeerUpdates,
peerEvents p2p.PeerEventSubscriber,
blockSync bool,
metrics *consensus.Metrics,
eventBus *eventbus.EventBus,
) (*Reactor, error) {
blockSyncCh, err := channelCreator(ctx, GetChannelDescriptor())
if err != nil {
return nil, err
}
) *Reactor {
r := &Reactor{
logger: logger,
stateStore: stateStore,
@@ -118,14 +113,14 @@ func NewReactor(
store: store,
consReactor: consReactor,
blockSync: newAtomicBool(blockSync),
blockSyncCh: blockSyncCh,
peerUpdates: peerUpdates,
chCreator: channelCreator,
peerEvents: peerEvents,
metrics: metrics,
eventBus: eventBus,
}
r.BaseService = *service.NewBaseService(logger, "BlockSync", r)
return r, nil
return r
}
// OnStart starts separate go routines for each p2p Channel and listens for
@@ -136,6 +131,12 @@ func NewReactor(
// If blockSync is enabled, we also start the pool and the pool processing
// goroutine. If the pool fails to start, an error is returned.
func (r *Reactor) OnStart(ctx context.Context) error {
blockSyncCh, err := r.chCreator(ctx, GetChannelDescriptor())
if err != nil {
return err
}
r.chCreator = func(context.Context, *conn.ChannelDescriptor) (*p2p.Channel, error) { return blockSyncCh, nil }
state, err := r.stateStore.Load()
if err != nil {
return err
@@ -161,13 +162,13 @@ func (r *Reactor) OnStart(ctx context.Context) error {
if err := r.pool.Start(ctx); err != nil {
return err
}
go r.requestRoutine(ctx)
go r.requestRoutine(ctx, blockSyncCh)
go r.poolRoutine(ctx, false)
go r.poolRoutine(ctx, false, blockSyncCh)
}
go r.processBlockSyncCh(ctx)
go r.processPeerUpdates(ctx)
go r.processBlockSyncCh(ctx, blockSyncCh)
go r.processPeerUpdates(ctx, r.peerEvents(ctx), blockSyncCh)
return nil
}
@@ -182,7 +183,7 @@ func (r *Reactor) OnStop() {
// respondToPeer loads a block and sends it to the requesting peer, if we have it.
// Otherwise, we'll respond saying we do not have it.
func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest, peerID types.NodeID) error {
func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest, peerID types.NodeID, blockSyncCh *p2p.Channel) error {
block := r.store.LoadBlock(msg.Height)
if block != nil {
blockProto, err := block.ToProto()
@@ -191,7 +192,7 @@ func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest,
return err
}
return r.blockSyncCh.Send(ctx, p2p.Envelope{
return blockSyncCh.Send(ctx, p2p.Envelope{
To: peerID,
Message: &bcproto.BlockResponse{Block: blockProto},
})
@@ -199,55 +200,16 @@ func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest,
r.logger.Info("peer requesting a block we do not have", "peer", peerID, "height", msg.Height)
return r.blockSyncCh.Send(ctx, p2p.Envelope{
return blockSyncCh.Send(ctx, p2p.Envelope{
To: peerID,
Message: &bcproto.NoBlockResponse{Height: msg.Height},
})
}
// handleBlockSyncMessage handles envelopes sent from peers on the
// BlockSyncChannel. It returns an error only if the Envelope.Message is unknown
// for this channel. This should never be called outside of handleMessage.
func (r *Reactor) handleBlockSyncMessage(ctx context.Context, envelope *p2p.Envelope) error {
logger := r.logger.With("peer", envelope.From)
switch msg := envelope.Message.(type) {
case *bcproto.BlockRequest:
return r.respondToPeer(ctx, msg, envelope.From)
case *bcproto.BlockResponse:
block, err := types.BlockFromProto(msg.Block)
if err != nil {
logger.Error("failed to convert block from proto", "err", err)
return err
}
r.pool.AddBlock(envelope.From, block, block.Size())
case *bcproto.StatusRequest:
return r.blockSyncCh.Send(ctx, p2p.Envelope{
To: envelope.From,
Message: &bcproto.StatusResponse{
Height: r.store.Height(),
Base: r.store.Base(),
},
})
case *bcproto.StatusResponse:
r.pool.SetPeerRange(envelope.From, msg.Base, msg.Height)
case *bcproto.NoBlockResponse:
logger.Debug("peer does not have the requested block", "height", msg.Height)
default:
return fmt.Errorf("received unknown message: %T", msg)
}
return nil
}
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
// It will handle errors and any possible panics gracefully. A caller can handle
// any error returned by sending a PeerError on the respective channel.
func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope) (err error) {
func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope, blockSyncCh *p2p.Channel) (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("panic in processing message: %v", e)
@@ -263,7 +225,39 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
switch chID {
case BlockSyncChannel:
err = r.handleBlockSyncMessage(ctx, envelope)
switch msg := envelope.Message.(type) {
case *bcproto.BlockRequest:
return r.respondToPeer(ctx, msg, envelope.From, blockSyncCh)
case *bcproto.BlockResponse:
block, err := types.BlockFromProto(msg.Block)
if err != nil {
r.logger.Error("failed to convert block from proto",
"peer", envelope.From,
"err", err)
return err
}
r.pool.AddBlock(envelope.From, block, block.Size())
case *bcproto.StatusRequest:
return blockSyncCh.Send(ctx, p2p.Envelope{
To: envelope.From,
Message: &bcproto.StatusResponse{
Height: r.store.Height(),
Base: r.store.Base(),
},
})
case *bcproto.StatusResponse:
r.pool.SetPeerRange(envelope.From, msg.Base, msg.Height)
case *bcproto.NoBlockResponse:
r.logger.Debug("peer does not have the requested block",
"peer", envelope.From,
"height", msg.Height)
default:
return fmt.Errorf("received unknown message: %T", msg)
}
default:
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
@@ -277,17 +271,17 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
// message execution will result in a PeerError being sent on the BlockSyncChannel.
// When the reactor is stopped, we will catch the signal and close the p2p Channel
// gracefully.
func (r *Reactor) processBlockSyncCh(ctx context.Context) {
iter := r.blockSyncCh.Receive(ctx)
func (r *Reactor) processBlockSyncCh(ctx context.Context, blockSyncCh *p2p.Channel) {
iter := blockSyncCh.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
if err := r.handleMessage(ctx, r.blockSyncCh.ID, envelope); err != nil {
if err := r.handleMessage(ctx, blockSyncCh.ID, envelope, blockSyncCh); err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
r.logger.Error("failed to process message", "ch_id", r.blockSyncCh.ID, "envelope", envelope, "err", err)
if serr := r.blockSyncCh.SendError(ctx, p2p.PeerError{
r.logger.Error("failed to process message", "ch_id", blockSyncCh.ID, "envelope", envelope, "err", err)
if serr := blockSyncCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
@@ -298,7 +292,7 @@ func (r *Reactor) processBlockSyncCh(ctx context.Context) {
}
// processPeerUpdate processes a PeerUpdate.
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate) {
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, blockSyncCh *p2p.Channel) {
r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
// XXX: Pool#RedoRequest can sometimes give us an empty peer.
@@ -309,7 +303,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
switch peerUpdate.Status {
case p2p.PeerStatusUp:
// send a status update to the newly added peer
if err := r.blockSyncCh.Send(ctx, p2p.Envelope{
if err := blockSyncCh.Send(ctx, p2p.Envelope{
To: peerUpdate.NodeID,
Message: &bcproto.StatusResponse{
Base: r.store.Base(),
@@ -317,7 +311,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
},
}); err != nil {
r.pool.RemovePeer(peerUpdate.NodeID)
if err := r.blockSyncCh.SendError(ctx, p2p.PeerError{
if err := blockSyncCh.SendError(ctx, p2p.PeerError{
NodeID: peerUpdate.NodeID,
Err: err,
}); err != nil {
@@ -333,13 +327,13 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
func (r *Reactor) processPeerUpdates(ctx context.Context) {
func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, blockSyncCh *p2p.Channel) {
for {
select {
case <-ctx.Done():
return
case peerUpdate := <-r.peerUpdates.Updates():
r.processPeerUpdate(ctx, peerUpdate)
case peerUpdate := <-peerUpdates.Updates():
r.processPeerUpdate(ctx, peerUpdate, blockSyncCh)
}
}
}
@@ -357,13 +351,18 @@ func (r *Reactor) SwitchToBlockSync(ctx context.Context, state sm.State) error {
r.syncStartTime = time.Now()
go r.requestRoutine(ctx)
go r.poolRoutine(ctx, true)
bsCh, err := r.chCreator(ctx, GetChannelDescriptor())
if err != nil {
return err
}
go r.requestRoutine(ctx, bsCh)
go r.poolRoutine(ctx, true, bsCh)
return nil
}
func (r *Reactor) requestRoutine(ctx context.Context) {
func (r *Reactor) requestRoutine(ctx context.Context, blockSyncCh *p2p.Channel) {
statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
defer statusUpdateTicker.Stop()
@@ -372,11 +371,11 @@ func (r *Reactor) requestRoutine(ctx context.Context) {
case <-ctx.Done():
return
case request := <-r.requestsCh:
if err := r.blockSyncCh.Send(ctx, p2p.Envelope{
if err := blockSyncCh.Send(ctx, p2p.Envelope{
To: request.PeerID,
Message: &bcproto.BlockRequest{Height: request.Height},
}); err != nil {
if err := r.blockSyncCh.SendError(ctx, p2p.PeerError{
if err := blockSyncCh.SendError(ctx, p2p.PeerError{
NodeID: request.PeerID,
Err: err,
}); err != nil {
@@ -384,14 +383,14 @@ func (r *Reactor) requestRoutine(ctx context.Context) {
}
}
case pErr := <-r.errorsCh:
if err := r.blockSyncCh.SendError(ctx, p2p.PeerError{
if err := blockSyncCh.SendError(ctx, p2p.PeerError{
NodeID: pErr.peerID,
Err: pErr.err,
}); err != nil {
return
}
case <-statusUpdateTicker.C:
if err := r.blockSyncCh.Send(ctx, p2p.Envelope{
if err := blockSyncCh.Send(ctx, p2p.Envelope{
Broadcast: true,
Message: &bcproto.StatusRequest{},
}); err != nil {
@@ -405,7 +404,7 @@ func (r *Reactor) requestRoutine(ctx context.Context) {
// do.
//
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool) {
func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh *p2p.Channel) {
var (
trySyncTicker = time.NewTicker(trySyncIntervalMS * time.Millisecond)
switchToConsensusTicker = time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
@@ -523,7 +522,7 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool) {
// NOTE: We've already removed the peer's request, but we still need
// to clean up the rest.
peerID := r.pool.RedoRequest(first.Height)
if serr := r.blockSyncCh.SendError(ctx, p2p.PeerError{
if serr := blockSyncCh.SendError(ctx, p2p.PeerError{
NodeID: peerID,
Err: err,
}); serr != nil {
@@ -532,7 +531,7 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool) {
peerID2 := r.pool.RedoRequest(second.Height)
if peerID2 != peerID {
if serr := r.blockSyncCh.SendError(ctx, p2p.PeerError{
if serr := blockSyncCh.SendError(ctx, p2p.PeerError{
NodeID: peerID2,
Err: err,
}); serr != nil {
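
One detail of the blocksync diff above worth calling out: after OnStart creates the block-sync channel once, it overwrites r.chCreator with a closure that always returns that channel, so the later chCreator call in SwitchToBlockSync reuses the existing channel instead of opening a second one. A small, hedged sketch of that memoization idiom in isolation (the types here are stand-ins, not the p2p package's):

package example

import "context"

// channel stands in for *p2p.Channel; creator stands in for p2p.ChannelCreator.
type channel struct{}

type creator func(ctx context.Context) (*channel, error)

// memoize returns a creator that builds the channel on first use and then
// hands back the same instance on every later call — the same effect OnStart
// achieves by reassigning r.chCreator after the first successful creation.
func memoize(c creator) creator {
	var cached *channel
	return func(ctx context.Context) (*channel, error) {
		if cached != nil {
			return cached, nil
		}
		ch, err := c(ctx)
		if err != nil {
			return nil, err
		}
		cached = ch
		return cached, nil
	}
}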

View File

@@ -190,20 +190,18 @@ func (rts *reactorTestSuite) addNode(
chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
return rts.blockSyncChannels[nodeID], nil
}
rts.reactors[nodeID], err = NewReactor(
ctx,
rts.reactors[nodeID] = NewReactor(
rts.logger.With("nodeID", nodeID),
stateStore,
blockExec,
blockStore,
nil,
chCreator,
rts.peerUpdates[nodeID],
func(ctx context.Context) *p2p.PeerUpdates { return rts.peerUpdates[nodeID] },
rts.blockSync,
consensus.NopMetrics(),
nil, // eventbus, can be nil
)
require.NoError(t, err)
require.NoError(t, rts.reactors[nodeID].Start(ctx))
require.True(t, rts.reactors[nodeID].IsRunning())

View File

@@ -141,8 +141,10 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
// send two votes to all peers (1st to one half, 2nd to another half)
i := 0
for _, ps := range bzReactor.peers {
voteCh := rts.voteChannels[bzNodeID]
if i < len(bzReactor.peers)/2 {
require.NoError(t, bzReactor.voteCh.Send(ctx,
require.NoError(t, voteCh.Send(ctx,
p2p.Envelope{
To: ps.peerID,
Message: &tmcons.Vote{
@@ -150,7 +152,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
},
}))
} else {
require.NoError(t, bzReactor.voteCh.Send(ctx,
require.NoError(t, voteCh.Send(ctx,
p2p.Envelope{
To: ps.peerID,
Message: &tmcons.Vote{

View File

@@ -57,7 +57,7 @@ func TestReactorInvalidPrecommit(t *testing.T) {
privVal := byzState.privValidator
byzState.doPrevote = func(ctx context.Context, height int64, round int32) {
defer close(signal)
invalidDoPrevoteFunc(ctx, t, height, round, byzState, byzReactor, privVal)
invalidDoPrevoteFunc(ctx, t, height, round, byzState, byzReactor, rts.voteChannels[node.NodeID], privVal)
}
byzState.mtx.Unlock()
@@ -107,6 +107,7 @@ func invalidDoPrevoteFunc(
round int32,
cs *State,
r *Reactor,
voteCh *p2p.Channel,
pv types.PrivValidator,
) {
// routine to:
@@ -155,7 +156,7 @@ func invalidDoPrevoteFunc(
count := 0
for _, peerID := range ids {
count++
err := r.voteCh.Send(ctx, p2p.Envelope{
err := voteCh.Send(ctx, p2p.Envelope{
To: peerID,
Message: &tmcons.Vote{
Vote: precommit.ToProto(),

View File

@@ -126,11 +126,8 @@ type Reactor struct {
rs *cstypes.RoundState
readySignal chan struct{} // closed when the node is ready to start consensus
stateCh *p2p.Channel
dataCh *p2p.Channel
voteCh *p2p.Channel
voteSetBitsCh *p2p.Channel
peerUpdates *p2p.PeerUpdates
peerEvents p2p.PeerEventSubscriber
chCreator p2p.ChannelCreator
}
// NewReactor returns a reference to a new consensus reactor, which implements
@@ -138,49 +135,25 @@ type Reactor struct {
// to relevant p2p Channels and a channel to listen for peer updates on. The
// reactor will close all p2p Channels when stopping.
func NewReactor(
ctx context.Context,
logger log.Logger,
cs *State,
channelCreator p2p.ChannelCreator,
peerUpdates *p2p.PeerUpdates,
peerEvents p2p.PeerEventSubscriber,
eventBus *eventbus.EventBus,
waitSync bool,
metrics *Metrics,
) (*Reactor, error) {
chans := getChannelDescriptors()
stateCh, err := channelCreator(ctx, chans[StateChannel])
if err != nil {
return nil, err
}
dataCh, err := channelCreator(ctx, chans[DataChannel])
if err != nil {
return nil, err
}
voteCh, err := channelCreator(ctx, chans[VoteChannel])
if err != nil {
return nil, err
}
voteSetBitsCh, err := channelCreator(ctx, chans[VoteSetBitsChannel])
if err != nil {
return nil, err
}
) *Reactor {
r := &Reactor{
logger: logger,
state: cs,
waitSync: waitSync,
rs: cs.GetRoundState(),
peers: make(map[types.NodeID]*PeerState),
eventBus: eventBus,
Metrics: metrics,
stateCh: stateCh,
dataCh: dataCh,
voteCh: voteCh,
voteSetBitsCh: voteSetBitsCh,
peerUpdates: peerUpdates,
readySignal: make(chan struct{}),
logger: logger,
state: cs,
waitSync: waitSync,
rs: cs.GetRoundState(),
peers: make(map[types.NodeID]*PeerState),
eventBus: eventBus,
Metrics: metrics,
peerEvents: peerEvents,
chCreator: channelCreator,
readySignal: make(chan struct{}),
}
r.BaseService = *service.NewBaseService(logger, "Consensus", r)
@@ -188,7 +161,14 @@ func NewReactor(
close(r.readySignal)
}
return r, nil
return r
}
type channelBundle struct {
state *p2p.Channel
data *p2p.Channel
vote *p2p.Channel
votSet *p2p.Channel
}
// OnStart starts separate go routines for each p2p Channel and listens for
@@ -198,13 +178,39 @@ func NewReactor(
func (r *Reactor) OnStart(ctx context.Context) error {
r.logger.Debug("consensus wait sync", "wait_sync", r.WaitSync())
peerUpdates := r.peerEvents(ctx)
var chBundle channelBundle
var err error
chans := getChannelDescriptors()
chBundle.state, err = r.chCreator(ctx, chans[StateChannel])
if err != nil {
return err
}
chBundle.data, err = r.chCreator(ctx, chans[DataChannel])
if err != nil {
return err
}
chBundle.vote, err = r.chCreator(ctx, chans[VoteChannel])
if err != nil {
return err
}
chBundle.votSet, err = r.chCreator(ctx, chans[VoteSetBitsChannel])
if err != nil {
return err
}
// start routine that computes peer statistics for evaluating peer quality
//
// TODO: Evaluate if we need this to be synchronized via WaitGroup as to not
// leak the goroutine when stopping the reactor.
go r.peerStatsRoutine(ctx)
go r.peerStatsRoutine(ctx, peerUpdates)
r.subscribeToBroadcastEvents()
r.subscribeToBroadcastEvents(chBundle.state)
go r.updateRoundStateRoutine()
if !r.WaitSync() {
@@ -213,11 +219,11 @@ func (r *Reactor) OnStart(ctx context.Context) error {
}
}
go r.processStateCh(ctx)
go r.processDataCh(ctx)
go r.processVoteCh(ctx)
go r.processVoteSetBitsCh(ctx)
go r.processPeerUpdates(ctx)
go r.processStateCh(ctx, chBundle)
go r.processDataCh(ctx, chBundle)
go r.processVoteCh(ctx, chBundle)
go r.processVoteSetBitsCh(ctx, chBundle)
go r.processPeerUpdates(ctx, peerUpdates, chBundle)
return nil
}
@@ -318,16 +324,16 @@ func (r *Reactor) GetPeerState(peerID types.NodeID) (*PeerState, bool) {
return ps, ok
}
func (r *Reactor) broadcastNewRoundStepMessage(ctx context.Context, rs *cstypes.RoundState) error {
return r.stateCh.Send(ctx, p2p.Envelope{
func (r *Reactor) broadcastNewRoundStepMessage(ctx context.Context, rs *cstypes.RoundState, stateCh *p2p.Channel) error {
return stateCh.Send(ctx, p2p.Envelope{
Broadcast: true,
Message: makeRoundStepMessage(rs),
})
}
func (r *Reactor) broadcastNewValidBlockMessage(ctx context.Context, rs *cstypes.RoundState) error {
func (r *Reactor) broadcastNewValidBlockMessage(ctx context.Context, rs *cstypes.RoundState, stateCh *p2p.Channel) error {
psHeader := rs.ProposalBlockParts.Header()
return r.stateCh.Send(ctx, p2p.Envelope{
return stateCh.Send(ctx, p2p.Envelope{
Broadcast: true,
Message: &tmcons.NewValidBlock{
Height: rs.Height,
@@ -339,8 +345,8 @@ func (r *Reactor) broadcastNewValidBlockMessage(ctx context.Context, rs *cstypes
})
}
func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote) error {
return r.stateCh.Send(ctx, p2p.Envelope{
func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote, stateCh *p2p.Channel) error {
return stateCh.Send(ctx, p2p.Envelope{
Broadcast: true,
Message: &tmcons.HasVote{
Height: vote.Height,
@@ -354,14 +360,14 @@ func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote)
// subscribeToBroadcastEvents subscribes for new round steps and votes using the
// internal pubsub defined in the consensus state to broadcast them to peers
// upon receiving.
func (r *Reactor) subscribeToBroadcastEvents() {
func (r *Reactor) subscribeToBroadcastEvents(stateCh *p2p.Channel) {
onStopCh := r.state.getOnStopCh()
err := r.state.evsw.AddListenerForEvent(
listenerIDConsensus,
types.EventNewRoundStepValue,
func(ctx context.Context, data tmevents.EventData) error {
if err := r.broadcastNewRoundStepMessage(ctx, data.(*cstypes.RoundState)); err != nil {
if err := r.broadcastNewRoundStepMessage(ctx, data.(*cstypes.RoundState), stateCh); err != nil {
return err
}
select {
@@ -382,7 +388,7 @@ func (r *Reactor) subscribeToBroadcastEvents() {
listenerIDConsensus,
types.EventValidBlockValue,
func(ctx context.Context, data tmevents.EventData) error {
return r.broadcastNewValidBlockMessage(ctx, data.(*cstypes.RoundState))
return r.broadcastNewValidBlockMessage(ctx, data.(*cstypes.RoundState), stateCh)
},
)
if err != nil {
@@ -393,7 +399,7 @@ func (r *Reactor) subscribeToBroadcastEvents() {
listenerIDConsensus,
types.EventVoteValue,
func(ctx context.Context, data tmevents.EventData) error {
return r.broadcastHasVoteMessage(ctx, data.(*types.Vote))
return r.broadcastHasVoteMessage(ctx, data.(*types.Vote), stateCh)
},
)
if err != nil {
@@ -411,8 +417,8 @@ func makeRoundStepMessage(rs *cstypes.RoundState) *tmcons.NewRoundStep {
}
}
func (r *Reactor) sendNewRoundStepMessage(ctx context.Context, peerID types.NodeID) error {
return r.stateCh.Send(ctx, p2p.Envelope{
func (r *Reactor) sendNewRoundStepMessage(ctx context.Context, peerID types.NodeID, stateCh *p2p.Channel) error {
return stateCh.Send(ctx, p2p.Envelope{
To: peerID,
Message: makeRoundStepMessage(r.getRoundState()),
})
@@ -438,7 +444,7 @@ func (r *Reactor) getRoundState() *cstypes.RoundState {
return r.rs
}
func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState) {
func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState, dataCh *p2p.Channel) {
logger := r.logger.With("height", prs.Height).With("peer", ps.peerID)
if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
@@ -487,7 +493,7 @@ func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundSta
}
logger.Debug("sending block part for catchup", "round", prs.Round, "index", index)
_ = r.dataCh.Send(ctx, p2p.Envelope{
_ = dataCh.Send(ctx, p2p.Envelope{
To: ps.peerID,
Message: &tmcons.BlockPart{
Height: prs.Height, // not our height, so it does not matter.
@@ -502,7 +508,7 @@ func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundSta
time.Sleep(r.state.config.PeerGossipSleepDuration)
}
func (r *Reactor) gossipDataRoutine(ctx context.Context, ps *PeerState) {
func (r *Reactor) gossipDataRoutine(ctx context.Context, ps *PeerState, dataCh *p2p.Channel) {
logger := r.logger.With("peer", ps.peerID)
timer := time.NewTimer(0)
@@ -534,7 +540,7 @@ OUTER_LOOP:
}
logger.Debug("sending block part", "height", prs.Height, "round", prs.Round)
if err := r.dataCh.Send(ctx, p2p.Envelope{
if err := dataCh.Send(ctx, p2p.Envelope{
To: ps.peerID,
Message: &tmcons.BlockPart{
Height: rs.Height, // this tells peer that this part applies to us
@@ -581,7 +587,7 @@ OUTER_LOOP:
continue OUTER_LOOP
}
r.gossipDataForCatchup(ctx, rs, prs, ps)
r.gossipDataForCatchup(ctx, rs, prs, ps, dataCh)
continue OUTER_LOOP
}
@@ -608,7 +614,7 @@ OUTER_LOOP:
propProto := rs.Proposal.ToProto()
logger.Debug("sending proposal", "height", prs.Height, "round", prs.Round)
if err := r.dataCh.Send(ctx, p2p.Envelope{
if err := dataCh.Send(ctx, p2p.Envelope{
To: ps.peerID,
Message: &tmcons.Proposal{
Proposal: *propProto,
@@ -631,7 +637,7 @@ OUTER_LOOP:
pPolProto := pPol.ToProto()
logger.Debug("sending POL", "height", prs.Height, "round", prs.Round)
if err := r.dataCh.Send(ctx, p2p.Envelope{
if err := dataCh.Send(ctx, p2p.Envelope{
To: ps.peerID,
Message: &tmcons.ProposalPOL{
Height: rs.Height,
@@ -659,14 +665,14 @@ OUTER_LOOP:
// pickSendVote picks a vote and sends it to the peer. It will return true if
// there is a vote to send and false otherwise.
func (r *Reactor) pickSendVote(ctx context.Context, ps *PeerState, votes types.VoteSetReader) (bool, error) {
func (r *Reactor) pickSendVote(ctx context.Context, ps *PeerState, votes types.VoteSetReader, voteCh *p2p.Channel) (bool, error) {
vote, ok := ps.PickVoteToSend(votes)
if !ok {
return false, nil
}
r.logger.Debug("sending vote message", "ps", ps, "vote", vote)
if err := r.voteCh.Send(ctx, p2p.Envelope{
if err := voteCh.Send(ctx, p2p.Envelope{
To: ps.peerID,
Message: &tmcons.Vote{
Vote: vote.ToProto(),
@@ -687,12 +693,13 @@ func (r *Reactor) gossipVotesForHeight(
rs *cstypes.RoundState,
prs *cstypes.PeerRoundState,
ps *PeerState,
voteCh *p2p.Channel,
) (bool, error) {
logger := r.logger.With("height", prs.Height).With("peer", ps.peerID)
// if there are lastCommits to send...
if prs.Step == cstypes.RoundStepNewHeight {
if ok, err := r.pickSendVote(ctx, ps, rs.LastCommit); err != nil {
if ok, err := r.pickSendVote(ctx, ps, rs.LastCommit, voteCh); err != nil {
return false, err
} else if ok {
logger.Debug("picked rs.LastCommit to send")
@@ -704,7 +711,7 @@ func (r *Reactor) gossipVotesForHeight(
// if there are POL prevotes to send...
if prs.Step <= cstypes.RoundStepPropose && prs.Round != -1 && prs.Round <= rs.Round && prs.ProposalPOLRound != -1 {
if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
if ok, err := r.pickSendVote(ctx, ps, polPrevotes); err != nil {
if ok, err := r.pickSendVote(ctx, ps, polPrevotes, voteCh); err != nil {
return false, err
} else if ok {
logger.Debug("picked rs.Prevotes(prs.ProposalPOLRound) to send", "round", prs.ProposalPOLRound)
@@ -715,7 +722,7 @@ func (r *Reactor) gossipVotesForHeight(
// if there are prevotes to send...
if prs.Step <= cstypes.RoundStepPrevoteWait && prs.Round != -1 && prs.Round <= rs.Round {
if ok, err := r.pickSendVote(ctx, ps, rs.Votes.Prevotes(prs.Round)); err != nil {
if ok, err := r.pickSendVote(ctx, ps, rs.Votes.Prevotes(prs.Round), voteCh); err != nil {
return false, err
} else if ok {
logger.Debug("picked rs.Prevotes(prs.Round) to send", "round", prs.Round)
@@ -725,7 +732,7 @@ func (r *Reactor) gossipVotesForHeight(
// if there are precommits to send...
if prs.Step <= cstypes.RoundStepPrecommitWait && prs.Round != -1 && prs.Round <= rs.Round {
if ok, err := r.pickSendVote(ctx, ps, rs.Votes.Precommits(prs.Round)); err != nil {
if ok, err := r.pickSendVote(ctx, ps, rs.Votes.Precommits(prs.Round), voteCh); err != nil {
return false, err
} else if ok {
logger.Debug("picked rs.Precommits(prs.Round) to send", "round", prs.Round)
@@ -735,7 +742,7 @@ func (r *Reactor) gossipVotesForHeight(
// if there are prevotes to send...(which are needed because of validBlock mechanism)
if prs.Round != -1 && prs.Round <= rs.Round {
if ok, err := r.pickSendVote(ctx, ps, rs.Votes.Prevotes(prs.Round)); err != nil {
if ok, err := r.pickSendVote(ctx, ps, rs.Votes.Prevotes(prs.Round), voteCh); err != nil {
return false, err
} else if ok {
logger.Debug("picked rs.Prevotes(prs.Round) to send", "round", prs.Round)
@@ -746,7 +753,7 @@ func (r *Reactor) gossipVotesForHeight(
// if there are POLPrevotes to send...
if prs.ProposalPOLRound != -1 {
if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
if ok, err := r.pickSendVote(ctx, ps, polPrevotes); err != nil {
if ok, err := r.pickSendVote(ctx, ps, polPrevotes, voteCh); err != nil {
return false, err
} else if ok {
logger.Debug("picked rs.Prevotes(prs.ProposalPOLRound) to send", "round", prs.ProposalPOLRound)
@@ -758,7 +765,7 @@ func (r *Reactor) gossipVotesForHeight(
return false, nil
}
func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState) {
func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh *p2p.Channel) {
logger := r.logger.With("peer", ps.peerID)
// XXX: simple hack to throttle logs upon sleep
@@ -790,7 +797,7 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState) {
// if height matches, then send LastCommit, Prevotes, and Precommits
if rs.Height == prs.Height {
if ok, err := r.gossipVotesForHeight(ctx, rs, prs, ps); err != nil {
if ok, err := r.gossipVotesForHeight(ctx, rs, prs, ps, voteCh); err != nil {
return
} else if ok {
continue
@@ -799,7 +806,7 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState) {
// special catchup logic -- if peer is lagging by height 1, send LastCommit
if prs.Height != 0 && rs.Height == prs.Height+1 {
if ok, err := r.pickSendVote(ctx, ps, rs.LastCommit); err != nil {
if ok, err := r.pickSendVote(ctx, ps, rs.LastCommit, voteCh); err != nil {
return
} else if ok {
logger.Debug("picked rs.LastCommit to send", "height", prs.Height)
@@ -813,7 +820,7 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState) {
// Load the block commit for prs.Height, which contains precommit
// signatures for prs.Height.
if commit := r.state.blockStore.LoadBlockCommit(prs.Height); commit != nil {
if ok, err := r.pickSendVote(ctx, ps, commit); err != nil {
if ok, err := r.pickSendVote(ctx, ps, commit, voteCh); err != nil {
return
} else if ok {
logger.Debug("picked Catchup commit to send", "height", prs.Height)
@@ -847,7 +854,7 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState) {
// NOTE: `queryMaj23Routine` has a simple crude design since it only comes
// into play for liveness when there's a signature DDoS attack happening.
func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState) {
func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState, stateCh *p2p.Channel) {
timer := time.NewTimer(0)
defer timer.Stop()
@@ -883,7 +890,7 @@ func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState) {
// maybe send Height/Round/Prevotes
if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok {
if err := r.stateCh.Send(ctx, p2p.Envelope{
if err := stateCh.Send(ctx, p2p.Envelope{
To: ps.peerID,
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
@@ -904,7 +911,7 @@ func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState) {
// maybe send Height/Round/ProposalPOL
if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok {
if err := r.stateCh.Send(ctx, p2p.Envelope{
if err := stateCh.Send(ctx, p2p.Envelope{
To: ps.peerID,
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
@@ -925,7 +932,7 @@ func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState) {
// maybe send Height/Round/Precommits
if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok {
if err := r.stateCh.Send(ctx, p2p.Envelope{
if err := stateCh.Send(ctx, p2p.Envelope{
To: ps.peerID,
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
@@ -950,7 +957,7 @@ func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState) {
if prs.Height <= r.state.blockStore.Height() && prs.Height >= r.state.blockStore.Base() {
// maybe send Height/CatchupCommitRound/CatchupCommit
if commit := r.state.LoadCommit(prs.Height); commit != nil {
if err := r.stateCh.Send(ctx, p2p.Envelope{
if err := stateCh.Send(ctx, p2p.Envelope{
To: ps.peerID,
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
@@ -983,7 +990,7 @@ func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState) {
// be the case, and we spawn all the relevant goroutines to broadcast messages to
// the peer. During peer removal, we remove the peer from our set of peers and
// signal to all spawned goroutines to gracefully exit in a non-blocking manner.
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate) {
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, chans channelBundle) {
r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
r.mtx.Lock()
@@ -1024,14 +1031,14 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
return
}
// start goroutines for this peer
go r.gossipDataRoutine(ctx, ps)
go r.gossipVotesRoutine(ctx, ps)
go r.queryMaj23Routine(ctx, ps)
go r.gossipDataRoutine(ctx, ps, chans.data)
go r.gossipVotesRoutine(ctx, ps, chans.vote)
go r.queryMaj23Routine(ctx, ps, chans.state)
// Send our state to the peer. If we're block-syncing, broadcast a
// RoundStepMessage later upon SwitchToConsensus().
if !r.WaitSync() {
go func() { _ = r.sendNewRoundStepMessage(ctx, ps.peerID) }()
go func() { _ = r.sendNewRoundStepMessage(ctx, ps.peerID, chans.state) }()
}
}()
@@ -1058,7 +1065,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// If we fail to find the peer state for the envelope sender, we perform a no-op
// and return. This can happen when we process the envelope after the peer is
// removed.
func (r *Reactor) handleStateMessage(ctx context.Context, envelope *p2p.Envelope, msgI Message) error {
func (r *Reactor) handleStateMessage(ctx context.Context, envelope *p2p.Envelope, msgI Message, voteSetCh *p2p.Channel) error {
ps, ok := r.GetPeerState(envelope.From)
if !ok || ps == nil {
r.logger.Debug("failed to find peer state", "peer", envelope.From, "ch_id", "StateChannel")
@@ -1128,7 +1135,7 @@ func (r *Reactor) handleStateMessage(ctx context.Context, envelope *p2p.Envelope
eMsg.Votes = *votesProto
}
if err := r.voteSetBitsCh.Send(ctx, p2p.Envelope{
if err := voteSetCh.Send(ctx, p2p.Envelope{
To: envelope.From,
Message: eMsg,
}); err != nil {
@@ -1296,7 +1303,7 @@ func (r *Reactor) handleVoteSetBitsMessage(ctx context.Context, envelope *p2p.En
// the p2p channel.
//
// NOTE: We block on consensus state for proposals, block parts, and votes.
func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope) (err error) {
func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope, chans channelBundle) (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("panic in processing message: %v", e)
@@ -1327,17 +1334,13 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
switch chID {
case StateChannel:
err = r.handleStateMessage(ctx, envelope, msgI)
err = r.handleStateMessage(ctx, envelope, msgI, chans.votSet)
case DataChannel:
err = r.handleDataMessage(ctx, envelope, msgI)
case VoteChannel:
err = r.handleVoteMessage(ctx, envelope, msgI)
case VoteSetBitsChannel:
err = r.handleVoteSetBitsMessage(ctx, envelope, msgI)
default:
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
}
@@ -1350,13 +1353,13 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
// execution will result in a PeerError being sent on the StateChannel. When
// the reactor is stopped, we will catch the signal and close the p2p Channel
// gracefully.
func (r *Reactor) processStateCh(ctx context.Context) {
iter := r.stateCh.Receive(ctx)
func (r *Reactor) processStateCh(ctx context.Context, chans channelBundle) {
iter := chans.state.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
if err := r.handleMessage(ctx, r.stateCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", r.stateCh.ID, "envelope", envelope, "err", err)
if serr := r.stateCh.SendError(ctx, p2p.PeerError{
if err := r.handleMessage(ctx, chans.state.ID, envelope, chans); err != nil {
r.logger.Error("failed to process message", "ch_id", chans.state.ID, "envelope", envelope, "err", err)
if serr := chans.state.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
@@ -1371,13 +1374,13 @@ func (r *Reactor) processStateCh(ctx context.Context) {
// execution will result in a PeerError being sent on the DataChannel. When
// the reactor is stopped, we will catch the signal and close the p2p Channel
// gracefully.
func (r *Reactor) processDataCh(ctx context.Context) {
iter := r.dataCh.Receive(ctx)
func (r *Reactor) processDataCh(ctx context.Context, chans channelBundle) {
iter := chans.data.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
if err := r.handleMessage(ctx, r.dataCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", r.dataCh.ID, "envelope", envelope, "err", err)
if serr := r.dataCh.SendError(ctx, p2p.PeerError{
if err := r.handleMessage(ctx, chans.data.ID, envelope, chans); err != nil {
r.logger.Error("failed to process message", "ch_id", chans.data.ID, "envelope", envelope, "err", err)
if serr := chans.data.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
@@ -1392,13 +1395,13 @@ func (r *Reactor) processDataCh(ctx context.Context) {
// execution will result in a PeerError being sent on the VoteChannel. When
// the reactor is stopped, we will catch the signal and close the p2p Channel
// gracefully.
func (r *Reactor) processVoteCh(ctx context.Context) {
iter := r.voteCh.Receive(ctx)
func (r *Reactor) processVoteCh(ctx context.Context, chans channelBundle) {
iter := chans.vote.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
if err := r.handleMessage(ctx, r.voteCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", r.voteCh.ID, "envelope", envelope, "err", err)
if serr := r.voteCh.SendError(ctx, p2p.PeerError{
if err := r.handleMessage(ctx, chans.vote.ID, envelope, chans); err != nil {
r.logger.Error("failed to process message", "ch_id", chans.vote.ID, "envelope", envelope, "err", err)
if serr := chans.vote.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
@@ -1413,18 +1416,18 @@ func (r *Reactor) processVoteCh(ctx context.Context) {
// execution will result in a PeerError being sent on the VoteSetBitsChannel.
// When the reactor is stopped, we will catch the signal and close the p2p
// Channel gracefully.
func (r *Reactor) processVoteSetBitsCh(ctx context.Context) {
iter := r.voteSetBitsCh.Receive(ctx)
func (r *Reactor) processVoteSetBitsCh(ctx context.Context, chans channelBundle) {
iter := chans.votSet.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
if err := r.handleMessage(ctx, r.voteSetBitsCh.ID, envelope); err != nil {
if err := r.handleMessage(ctx, chans.votSet.ID, envelope, chans); err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
r.logger.Error("failed to process message", "ch_id", r.voteSetBitsCh.ID, "envelope", envelope, "err", err)
if serr := r.voteSetBitsCh.SendError(ctx, p2p.PeerError{
r.logger.Error("failed to process message", "ch_id", chans.votSet.ID, "envelope", envelope, "err", err)
if serr := chans.votSet.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
@@ -1437,18 +1440,18 @@ func (r *Reactor) processVoteSetBitsCh(ctx context.Context) {
// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
func (r *Reactor) processPeerUpdates(ctx context.Context) {
func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, chans channelBundle) {
for {
select {
case <-ctx.Done():
return
case peerUpdate := <-r.peerUpdates.Updates():
r.processPeerUpdate(ctx, peerUpdate)
case peerUpdate := <-peerUpdates.Updates():
r.processPeerUpdate(ctx, peerUpdate, chans)
}
}
}
func (r *Reactor) peerStatsRoutine(ctx context.Context) {
func (r *Reactor) peerStatsRoutine(ctx context.Context, peerUpdates *p2p.PeerUpdates) {
for {
if !r.IsRunning() {
r.logger.Info("stopping peerStatsRoutine")
@@ -1466,7 +1469,7 @@ func (r *Reactor) peerStatsRoutine(ctx context.Context) {
switch msg.Msg.(type) {
case *VoteMessage:
if numVotes := ps.RecordVote(); numVotes%votesToContributeToBecomeGoodPeer == 0 {
r.peerUpdates.SendUpdate(ctx, p2p.PeerUpdate{
peerUpdates.SendUpdate(ctx, p2p.PeerUpdate{
NodeID: msg.PeerID,
Status: p2p.PeerStatusGood,
})
@@ -1474,7 +1477,7 @@ func (r *Reactor) peerStatsRoutine(ctx context.Context) {
case *BlockPartMessage:
if numParts := ps.RecordBlockPart(); numParts%blocksToContributeToBecomeGoodPeer == 0 {
r.peerUpdates.SendUpdate(ctx, p2p.PeerUpdate{
peerUpdates.SendUpdate(ctx, p2p.PeerUpdate{
NodeID: msg.PeerID,
Status: p2p.PeerStatusGood,
})
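
The channelBundle introduced above exists because a consensus handler often needs a different channel than the one the envelope arrived on — for example, handleStateMessage replies on the vote-set-bits channel (chans.votSet) to a message it received on the StateChannel. Passing the whole bundle keeps every handler able to reach whichever channel it must reply on. A trimmed-down sketch of that dispatch pattern, with illustrative names and channel IDs rather than the consensus package's real ones:

package example

import (
	"context"

	"github.com/tendermint/tendermint/internal/p2p"
)

// exampleStateChannel is an illustrative channel ID; the real values live in
// the consensus package.
const exampleStateChannel p2p.ChannelID = 1

// bundle is a trimmed-down stand-in for the consensus channelBundle.
type bundle struct {
	state       *p2p.Channel
	voteSetBits *p2p.Channel
}

// handle shows why handlers take the bundle: a message that arrives on the
// state channel may need its reply sent on the vote-set-bits channel.
func handle(ctx context.Context, chID p2p.ChannelID, e *p2p.Envelope, b bundle) error {
	switch chID {
	case exampleStateChannel:
		// reply Message elided in this sketch
		return b.voteSetBits.Send(ctx, p2p.Envelope{To: e.From})
	default:
		return nil
	}
}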

View File

@@ -104,16 +104,15 @@ func setup(
for nodeID, node := range rts.network.Nodes {
state := states[i]
reactor, err := NewReactor(ctx,
reactor := NewReactor(
state.logger.With("node", nodeID),
state,
chCreator(nodeID),
node.MakePeerUpdates(ctx, t),
func(ctx context.Context) *p2p.PeerUpdates { return node.MakePeerUpdates(ctx, t) },
state.eventBus,
true,
NopMetrics(),
)
require.NoError(t, err)
blocksSub, err := state.eventBus.SubscribeWithArgs(ctx, tmpubsub.SubscribeArgs{
ClientID: testSubscriber,

View File

@@ -47,9 +47,9 @@ type Reactor struct {
service.BaseService
logger log.Logger
evpool *Pool
evidenceCh *p2p.Channel
peerUpdates *p2p.PeerUpdates
evpool *Pool
chCreator p2p.ChannelCreator
peerEvents p2p.PeerEventSubscriber
mtx sync.Mutex
@@ -60,28 +60,22 @@ type Reactor struct {
// service.Service interface. It accepts a p2p Channel dedicated for handling
// envelopes with EvidenceList messages.
func NewReactor(
ctx context.Context,
logger log.Logger,
chCreator p2p.ChannelCreator,
peerUpdates *p2p.PeerUpdates,
peerEvents p2p.PeerEventSubscriber,
evpool *Pool,
) (*Reactor, error) {
evidenceCh, err := chCreator(ctx, GetChannelDescriptor())
if err != nil {
return nil, err
}
) *Reactor {
r := &Reactor{
logger: logger,
evpool: evpool,
evidenceCh: evidenceCh,
peerUpdates: peerUpdates,
chCreator: chCreator,
peerEvents: peerEvents,
peerRoutines: make(map[types.NodeID]context.CancelFunc),
}
r.BaseService = *service.NewBaseService(logger, "Evidence", r)
return r, err
return r
}
// OnStart starts separate go routines for each p2p Channel and listens for
@@ -89,18 +83,20 @@ func NewReactor(
// messages on that p2p channel accordingly. The caller must be sure to execute
// OnStop to ensure the outbound p2p Channels are closed. No error is returned.
func (r *Reactor) OnStart(ctx context.Context) error {
go r.processEvidenceCh(ctx)
go r.processPeerUpdates(ctx)
ch, err := r.chCreator(ctx, GetChannelDescriptor())
if err != nil {
return err
}
go r.processEvidenceCh(ctx, ch)
go r.processPeerUpdates(ctx, r.peerEvents(ctx), ch)
return nil
}
// OnStop stops the reactor by signaling to all spawned goroutines to exit and
// blocking until they all exit.
func (r *Reactor) OnStop() {
// Close the evidence db
r.evpool.Close()
}
func (r *Reactor) OnStop() { r.evpool.Close() }
// handleEvidenceMessage handles envelopes sent from peers on the EvidenceChannel.
// It returns an error only if the Envelope.Message is unknown for this channel
@@ -164,13 +160,13 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
// processEvidenceCh implements a blocking event loop where we listen for p2p
// Envelope messages from the evidenceCh.
func (r *Reactor) processEvidenceCh(ctx context.Context) {
iter := r.evidenceCh.Receive(ctx)
func (r *Reactor) processEvidenceCh(ctx context.Context, evidenceCh *p2p.Channel) {
iter := evidenceCh.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
if err := r.handleMessage(ctx, r.evidenceCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", r.evidenceCh.ID, "envelope", envelope, "err", err)
if serr := r.evidenceCh.SendError(ctx, p2p.PeerError{
if err := r.handleMessage(ctx, evidenceCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", evidenceCh.ID, "envelope", envelope, "err", err)
if serr := evidenceCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
@@ -191,7 +187,7 @@ func (r *Reactor) processEvidenceCh(ctx context.Context) {
// connects/disconnects frequently from the broadcasting peer(s).
//
// REF: https://github.com/tendermint/tendermint/issues/4727
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate) {
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, evidenceCh *p2p.Channel) {
r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
r.mtx.Lock()
@@ -214,7 +210,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
if !ok {
pctx, pcancel := context.WithCancel(ctx)
r.peerRoutines[peerUpdate.NodeID] = pcancel
go r.broadcastEvidenceLoop(pctx, peerUpdate.NodeID)
go r.broadcastEvidenceLoop(pctx, peerUpdate.NodeID, evidenceCh)
}
case p2p.PeerStatusDown:
@@ -232,11 +228,11 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
func (r *Reactor) processPeerUpdates(ctx context.Context) {
func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, evidenceCh *p2p.Channel) {
for {
select {
case peerUpdate := <-r.peerUpdates.Updates():
r.processPeerUpdate(ctx, peerUpdate)
case peerUpdate := <-peerUpdates.Updates():
r.processPeerUpdate(ctx, peerUpdate, evidenceCh)
case <-ctx.Done():
return
}
@@ -254,7 +250,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) {
// that the peer has already received or may not be ready for.
//
// REF: https://github.com/tendermint/tendermint/issues/4727
func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID) {
func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID, evidenceCh *p2p.Channel) {
var next *clist.CElement
defer func() {
@@ -301,7 +297,7 @@ func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID
// peer may receive this piece of evidence multiple times if it added and
// removed frequently from the broadcasting peer.
if err := r.evidenceCh.Send(ctx, p2p.Envelope{
if err := evidenceCh.Send(ctx, p2p.Envelope{
To: peerID,
Message: evProto,
}); err != nil {

View File

@@ -92,21 +92,20 @@ func setup(ctx context.Context, t *testing.T, stateStores []sm.Store, chBuf uint
require.NoError(t, err)
rts.peerChans[nodeID] = make(chan p2p.PeerUpdate)
rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)
rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID])
pu := p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)
rts.peerUpdates[nodeID] = pu
rts.network.Nodes[nodeID].PeerManager.Register(ctx, pu)
rts.nodes = append(rts.nodes, rts.network.Nodes[nodeID])
chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
return rts.evidenceChannels[nodeID], nil
}
rts.reactors[nodeID], err = evidence.NewReactor(
ctx,
rts.reactors[nodeID] = evidence.NewReactor(
logger,
chCreator,
rts.peerUpdates[nodeID],
func(ctx context.Context) *p2p.PeerUpdates { return pu },
rts.pools[nodeID])
require.NoError(t, err)
require.NoError(t, rts.reactors[nodeID].Start(ctx))
require.True(t, rts.reactors[nodeID].IsRunning())

View File

@@ -40,13 +40,9 @@ type Reactor struct {
mempool *TxMempool
ids *IDs
// XXX: Currently, this is the only way to get information about a peer. Ideally,
// we rely on message-oriented communication to get necessary peer data.
// ref: https://github.com/tendermint/tendermint/issues/5670
peerMgr PeerManager
mempoolCh *p2p.Channel
peerUpdates *p2p.PeerUpdates
getPeerHeight func(types.NodeID) int64
peerEvents p2p.PeerEventSubscriber
chCreator p2p.ChannelCreator
// observePanic is a function for observing panics that were recovered in methods on
// Reactor. observePanic is called with the recovered value.
@@ -58,34 +54,27 @@ type Reactor struct {
// NewReactor returns a reference to a new reactor.
func NewReactor(
ctx context.Context,
logger log.Logger,
cfg *config.MempoolConfig,
peerMgr PeerManager,
txmp *TxMempool,
chCreator p2p.ChannelCreator,
peerUpdates *p2p.PeerUpdates,
) (*Reactor, error) {
ch, err := chCreator(ctx, getChannelDescriptor(cfg))
if err != nil {
return nil, err
}
peerEvents p2p.PeerEventSubscriber,
getPeerHeight func(types.NodeID) int64,
) *Reactor {
r := &Reactor{
logger: logger,
cfg: cfg,
peerMgr: peerMgr,
mempool: txmp,
ids: NewMempoolIDs(),
mempoolCh: ch,
peerUpdates: peerUpdates,
peerRoutines: make(map[types.NodeID]context.CancelFunc),
observePanic: defaultObservePanic,
logger: logger,
cfg: cfg,
mempool: txmp,
ids: NewMempoolIDs(),
chCreator: chCreator,
peerEvents: peerEvents,
getPeerHeight: getPeerHeight,
peerRoutines: make(map[types.NodeID]context.CancelFunc),
observePanic: defaultObservePanic,
}
r.BaseService = *service.NewBaseService(logger, "Mempool", r)
return r, nil
return r
}
func defaultObservePanic(r interface{}) {}
@@ -119,8 +108,13 @@ func (r *Reactor) OnStart(ctx context.Context) error {
r.logger.Info("tx broadcasting is disabled")
}
go r.processMempoolCh(ctx)
go r.processPeerUpdates(ctx)
ch, err := r.chCreator(ctx, getChannelDescriptor(r.cfg))
if err != nil {
return err
}
go r.processMempoolCh(ctx, ch)
go r.processPeerUpdates(ctx, r.peerEvents(ctx), ch)
return nil
}
@@ -203,13 +197,13 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
// processMempoolCh implements a blocking event loop where we listen for p2p
// Envelope messages from the mempoolCh.
func (r *Reactor) processMempoolCh(ctx context.Context) {
iter := r.mempoolCh.Receive(ctx)
func (r *Reactor) processMempoolCh(ctx context.Context, mempoolCh *p2p.Channel) {
iter := mempoolCh.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
if err := r.handleMessage(ctx, r.mempoolCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", r.mempoolCh.ID, "envelope", envelope, "err", err)
if serr := r.mempoolCh.SendError(ctx, p2p.PeerError{
if err := r.handleMessage(ctx, mempoolCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", mempoolCh.ID, "envelope", envelope, "err", err)
if serr := mempoolCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
@@ -224,7 +218,7 @@ func (r *Reactor) processMempoolCh(ctx context.Context) {
// goroutine or not. If not, we start one for the newly added peer. For down or
// removed peers, we remove the peer from the mempool peer ID set and signal to
// stop the tx broadcasting goroutine.
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate) {
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, mempoolCh *p2p.Channel) {
r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
r.mtx.Lock()
@@ -252,7 +246,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
r.ids.ReserveForPeer(peerUpdate.NodeID)
// start a broadcast routine ensuring all txs are forwarded to the peer
go r.broadcastTxRoutine(pctx, peerUpdate.NodeID)
go r.broadcastTxRoutine(pctx, peerUpdate.NodeID, mempoolCh)
}
}
@@ -273,18 +267,18 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
func (r *Reactor) processPeerUpdates(ctx context.Context) {
func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, mempoolCh *p2p.Channel) {
for {
select {
case <-ctx.Done():
return
case peerUpdate := <-r.peerUpdates.Updates():
r.processPeerUpdate(ctx, peerUpdate)
case peerUpdate := <-peerUpdates.Updates():
r.processPeerUpdate(ctx, peerUpdate, mempoolCh)
}
}
}
func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID) {
func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, mempoolCh *p2p.Channel) {
peerMempoolID := r.ids.GetForPeer(peerID)
var nextGossipTx *clist.CElement
@@ -325,8 +319,8 @@ func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID) {
memTx := nextGossipTx.Value.(*WrappedTx)
if r.peerMgr != nil {
height := r.peerMgr.GetHeight(peerID)
if r.getPeerHeight != nil {
height := r.getPeerHeight(peerID)
if height > 0 && height < memTx.height-1 {
// allow for a lag of one block
time.Sleep(PeerCatchupSleepIntervalMS * time.Millisecond)
@@ -339,7 +333,7 @@ func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID) {
if ok := r.mempool.txStore.TxHasPeer(memTx.hash, peerMempoolID); !ok {
// Send the mempool tx to the corresponding peer. Note, the peer may be
// behind and thus would not be able to process the mempool tx correctly.
if err := r.mempoolCh.Send(ctx, p2p.Envelope{
if err := mempoolCh.Send(ctx, p2p.Envelope{
To: peerID,
Message: &protomem.Txs{
Txs: [][]byte{memTx.tx},

View File

@@ -79,17 +79,14 @@ func setupReactors(ctx context.Context, t *testing.T, logger log.Logger, numNode
return rts.mempoolChannels[nodeID], nil
}
rts.reactors[nodeID], err = NewReactor(
ctx,
rts.reactors[nodeID] = NewReactor(
rts.logger.With("nodeID", nodeID),
cfg.Mempool,
rts.network.Nodes[nodeID].PeerManager,
mempool,
chCreator,
rts.peerUpdates[nodeID],
func(ctx context.Context) *p2p.PeerUpdates { return rts.peerUpdates[nodeID] },
rts.network.Nodes[nodeID].PeerManager.GetHeight,
)
require.NoError(t, err)
rts.nodes = append(rts.nodes, nodeID)
require.NoError(t, rts.reactors[nodeID].Start(ctx))
@@ -179,7 +176,7 @@ func TestReactorBroadcastDoesNotPanic(t *testing.T) {
// run the router
rts.start(ctx, t)
go primaryReactor.broadcastTxRoutine(ctx, secondary)
go primaryReactor.broadcastTxRoutine(ctx, secondary, rts.mempoolChannels[primary])
wg := &sync.WaitGroup{}
for i := 0; i < 50; i++ {

View File

@@ -259,7 +259,6 @@ func (n *Network) MakeNode(ctx context.Context, t *testing.T, opts NodeOptions)
require.NoError(t, err)
router, err := p2p.NewRouter(
ctx,
n.logger,
p2p.NopMetrics(),
nodeInfo,

View File

@@ -828,6 +828,11 @@ func (m *PeerManager) Advertise(peerID types.NodeID, limit uint16) []NodeAddress
return addresses
}
// PeerEventSubscriber describes the type of the subscription method, to assist
// in isolating reactor-specific construction and lifecycle from the
// peer manager.
type PeerEventSubscriber func(context.Context) *PeerUpdates
// Subscribe subscribes to peer updates. The caller must consume the peer
// updates in a timely fashion and close the subscription when done, otherwise
// the PeerManager will halt.
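
With PeerEventSubscriber, a reactor receives the subscription function itself instead of an already-open *PeerUpdates, and only invokes it from OnStart. The test diffs in this commit wrap a pre-built PeerUpdates in a closure; production wiring would presumably hand over the peer manager's own subscribe method, assuming its signature matches this type. A small sketch of the test-style wiring (newPeerEvents is an illustrative helper, not part of the commit):

package example

import (
	"context"

	"github.com/tendermint/tendermint/internal/p2p"
)

// newPeerEvents wraps an existing *p2p.PeerUpdates so that every subscription
// returns the same instance — the shape used by the reactor tests in this
// commit.
func newPeerEvents(pu *p2p.PeerUpdates) p2p.PeerEventSubscriber {
	return func(ctx context.Context) *p2p.PeerUpdates { return pu }
}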

View File

@@ -80,9 +80,8 @@ type Reactor struct {
logger log.Logger
peerManager *p2p.PeerManager
pexCh *p2p.Channel
peerUpdates *p2p.PeerUpdates
chCreator p2p.ChannelCreator
peerEvents p2p.PeerEventSubscriber
// list of available peers to loop through and send peer requests to
availablePeers map[types.NodeID]struct{}
@@ -105,30 +104,23 @@ type Reactor struct {
// NewReactor returns a reference to a new reactor.
func NewReactor(
ctx context.Context,
logger log.Logger,
peerManager *p2p.PeerManager,
channelCreator p2p.ChannelCreator,
peerUpdates *p2p.PeerUpdates,
) (*Reactor, error) {
channel, err := channelCreator(ctx, ChannelDescriptor())
if err != nil {
return nil, err
}
peerEvents p2p.PeerEventSubscriber,
) *Reactor {
r := &Reactor{
logger: logger,
peerManager: peerManager,
pexCh: channel,
peerUpdates: peerUpdates,
chCreator: channelCreator,
peerEvents: peerEvents,
availablePeers: make(map[types.NodeID]struct{}),
requestsSent: make(map[types.NodeID]struct{}),
lastReceivedRequests: make(map[types.NodeID]time.Time),
}
r.BaseService = *service.NewBaseService(logger, "PEX", r)
return r, nil
return r
}
// OnStart starts separate go routines for each p2p Channel and listens for
@@ -136,8 +128,14 @@ func NewReactor(
// messages on that p2p channel accordingly. The caller must be sure to execute
// OnStop to ensure the outbound p2p Channels are closed.
func (r *Reactor) OnStart(ctx context.Context) error {
go r.processPexCh(ctx)
go r.processPeerUpdates(ctx)
channel, err := r.chCreator(ctx, ChannelDescriptor())
if err != nil {
return err
}
peerUpdates := r.peerEvents(ctx)
go r.processPexCh(ctx, channel)
go r.processPeerUpdates(ctx, peerUpdates)
return nil
}
@@ -147,11 +145,11 @@ func (r *Reactor) OnStop() {}
// processPexCh implements a blocking event loop where we listen for p2p
// Envelope messages from the pexCh.
func (r *Reactor) processPexCh(ctx context.Context) {
func (r *Reactor) processPexCh(ctx context.Context, pexCh *p2p.Channel) {
incoming := make(chan *p2p.Envelope)
go func() {
defer close(incoming)
iter := r.pexCh.Receive(ctx)
iter := pexCh.Receive(ctx)
for iter.Next(ctx) {
select {
case <-ctx.Done():
@@ -177,7 +175,7 @@ func (r *Reactor) processPexCh(ctx context.Context) {
case <-timer.C:
// Send a request for more peer addresses.
if err := r.sendRequestForPeers(ctx); err != nil {
if err := r.sendRequestForPeers(ctx, pexCh); err != nil {
return
// TODO(creachadair): Do we really want to stop processing the PEX
// channel just because of an error here?
@@ -192,11 +190,11 @@ func (r *Reactor) processPexCh(ctx context.Context) {
}
// A request from another peer, or a response to one of our requests.
dur, err := r.handlePexMessage(ctx, envelope)
dur, err := r.handlePexMessage(ctx, envelope, pexCh)
if err != nil {
r.logger.Error("failed to process message",
"ch_id", r.pexCh.ID, "envelope", envelope, "err", err)
if serr := r.pexCh.SendError(ctx, p2p.PeerError{
"ch_id", pexCh.ID, "envelope", envelope, "err", err)
if serr := pexCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
@@ -213,12 +211,12 @@ func (r *Reactor) processPexCh(ctx context.Context) {
// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
func (r *Reactor) processPeerUpdates(ctx context.Context) {
func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates) {
for {
select {
case <-ctx.Done():
return
case peerUpdate := <-r.peerUpdates.Updates():
case peerUpdate := <-peerUpdates.Updates():
r.processPeerUpdate(peerUpdate)
}
}
@@ -227,7 +225,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) {
// handlePexMessage handles envelopes sent from peers on the PexChannel.
// If an update was received, a new polling interval is returned; otherwise the
// duration is 0.
func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope) (time.Duration, error) {
func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope, pexCh *p2p.Channel) (time.Duration, error) {
logger := r.logger.With("peer", envelope.From)
switch msg := envelope.Message.(type) {
@@ -246,7 +244,7 @@ func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope)
URL: addr.String(),
}
}
return 0, r.pexCh.Send(ctx, p2p.Envelope{
return 0, pexCh.Send(ctx, p2p.Envelope{
To: envelope.From,
Message: &protop2p.PexResponse{Addresses: pexAddresses},
})
@@ -310,7 +308,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
// that peer a request for more peer addresses. The chosen peer is moved into
// the requestsSent bucket so that we will not attempt to contact them again
// until they've replied or updated.
func (r *Reactor) sendRequestForPeers(ctx context.Context) error {
func (r *Reactor) sendRequestForPeers(ctx context.Context, pexCh *p2p.Channel) error {
r.mtx.Lock()
defer r.mtx.Unlock()
if len(r.availablePeers) == 0 {
@@ -325,7 +323,7 @@ func (r *Reactor) sendRequestForPeers(ctx context.Context) error {
break
}
if err := r.pexCh.Send(ctx, p2p.Envelope{
if err := pexCh.Send(ctx, p2p.Envelope{
To: peerID,
Message: &protop2p.PexRequest{},
}); err != nil {

View File

@@ -23,8 +23,8 @@ import (
const (
checkFrequency = 500 * time.Millisecond
defaultBufferSize = 2
shortWait = 10 * time.Second
longWait = 60 * time.Second
shortWait = 5 * time.Second
longWait = 20 * time.Second
firstNode = 0
secondNode = 1
@@ -211,7 +211,8 @@ func TestReactorSmallPeerStoreInALargeNetwork(t *testing.T) {
require.Eventually(t, func() bool {
// nolint:scopelint
return testNet.network.Nodes[nodeID].PeerManager.PeerRatio() >= 0.9
}, longWait, checkFrequency)
}, longWait, checkFrequency,
"peer ratio is: %f", testNet.network.Nodes[nodeID].PeerManager.PeerRatio())
}
}
@@ -303,8 +304,7 @@ func setupSingle(ctx context.Context, t *testing.T) *singleTestReactor {
return pexCh, nil
}
reactor, err := pex.NewReactor(ctx, log.NewNopLogger(), peerManager, chCreator, peerUpdates)
require.NoError(t, err)
reactor := pex.NewReactor(log.NewNopLogger(), peerManager, chCreator, func(_ context.Context) *p2p.PeerUpdates { return peerUpdates })
require.NoError(t, reactor.Start(ctx))
t.Cleanup(reactor.Wait)
@@ -381,6 +381,10 @@ func setupNetwork(ctx context.Context, t *testing.T, opts testOptions) *reactorT
idx := 0
for nodeID := range rts.network.Nodes {
// copy the loop variable so the closures created later in this
// loop do not all capture the same, reused range variable:
nodeID := nodeID
rts.peerChans[nodeID] = make(chan p2p.PeerUpdate, chBuf)
rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], chBuf)
rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID])
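The copy matters because the peer-update adapters passed to pex.NewReactor later in this loop are closures; before Go 1.22 every iteration reuses the same nodeID variable, so without the copy each closure would observe the final value. A minimal, self-contained illustration, unrelated to the Tendermint code:

package main

import "fmt"

func main() {
	var funcs []func()
	for _, id := range []string{"a", "b", "c"} {
		id := id // drop this line and (pre-Go 1.22) every closure prints "c"
		funcs = append(funcs, func() { fmt.Println(id) })
	}
	for _, f := range funcs {
		f() // prints a, b, c
	}
}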
@@ -393,15 +397,12 @@ func setupNetwork(ctx context.Context, t *testing.T, opts testOptions) *reactorT
if idx < opts.MockNodes {
rts.mocks = append(rts.mocks, nodeID)
} else {
var err error
rts.reactors[nodeID], err = pex.NewReactor(
ctx,
rts.reactors[nodeID] = pex.NewReactor(
rts.logger.With("nodeID", nodeID),
rts.network.Nodes[nodeID].PeerManager,
chCreator,
rts.peerUpdates[nodeID],
func(_ context.Context) *p2p.PeerUpdates { return rts.peerUpdates[nodeID] },
)
require.NoError(t, err)
}
rts.nodes = append(rts.nodes, nodeID)
@@ -426,9 +427,10 @@ func setupNetwork(ctx context.Context, t *testing.T, opts testOptions) *reactorT
func (r *reactorTestSuite) start(ctx context.Context, t *testing.T) {
t.Helper()
for _, reactor := range r.reactors {
for name, reactor := range r.reactors {
require.NoError(t, reactor.Start(ctx))
require.True(t, reactor.IsRunning())
t.Log("started", name)
}
}
@@ -451,15 +453,12 @@ func (r *reactorTestSuite) addNodes(ctx context.Context, t *testing.T, nodes int
return r.pexChannels[nodeID], nil
}
var err error
r.reactors[nodeID], err = pex.NewReactor(
ctx,
r.reactors[nodeID] = pex.NewReactor(
r.logger.With("nodeID", nodeID),
r.network.Nodes[nodeID].PeerManager,
chCreator,
r.peerUpdates[nodeID],
func(_ context.Context) *p2p.PeerUpdates { return r.peerUpdates[nodeID] },
)
require.NoError(t, err)
r.nodes = append(r.nodes, nodeID)
r.total++
}

View File

@@ -177,7 +177,6 @@ type Router struct {
// listening on appropriate interfaces, and will be closed by the Router when it
// stops.
func NewRouter(
ctx context.Context,
logger log.Logger,
metrics *Metrics,
nodeInfo types.NodeInfo,
@@ -215,13 +214,6 @@ func NewRouter(
router.BaseService = service.NewBaseService(logger, "router", router)
qf, err := router.createQueueFactory(ctx)
if err != nil {
return nil, err
}
router.queueFactory = qf
for _, transport := range transports {
for _, protocol := range transport.Protocols() {
if _, ok := router.protocolTransports[protocol]; !ok {
@@ -941,8 +933,22 @@ func (r *Router) NodeInfo() types.NodeInfo {
return r.nodeInfo.Copy()
}
func (r *Router) setupQueueFactory(ctx context.Context) error {
qf, err := r.createQueueFactory(ctx)
if err != nil {
return err
}
r.queueFactory = qf
return nil
}
// OnStart implements service.Service.
func (r *Router) OnStart(ctx context.Context) error {
if err := r.setupQueueFactory(ctx); err != nil {
return err
}
for _, transport := range r.transports {
for _, endpoint := range r.endpoints {
if err := transport.Listen(endpoint); err != nil {

View File

@@ -23,29 +23,35 @@ func TestRouter_ConstructQueueFactory(t *testing.T) {
t.Run("Default", func(t *testing.T) {
require.Zero(t, os.Getenv("TM_P2P_QUEUE"))
opts := RouterOptions{}
r, err := NewRouter(ctx, log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts)
r, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts)
require.NoError(t, err)
require.NoError(t, r.setupQueueFactory(ctx))
_, ok := r.queueFactory(1).(*fifoQueue)
require.True(t, ok)
})
t.Run("Fifo", func(t *testing.T) {
opts := RouterOptions{QueueType: queueTypeFifo}
r, err := NewRouter(ctx, log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts)
r, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts)
require.NoError(t, err)
require.NoError(t, r.setupQueueFactory(ctx))
_, ok := r.queueFactory(1).(*fifoQueue)
require.True(t, ok)
})
t.Run("Priority", func(t *testing.T) {
opts := RouterOptions{QueueType: queueTypePriority}
r, err := NewRouter(ctx, log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts)
r, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts)
require.NoError(t, err)
require.NoError(t, r.setupQueueFactory(ctx))
q, ok := r.queueFactory(1).(*pqScheduler)
require.True(t, ok)
defer q.close()
})
t.Run("NonExistant", func(t *testing.T) {
opts := RouterOptions{QueueType: "fast"}
_, err := NewRouter(ctx, log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts)
_, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts)
require.Error(t, err)
require.Contains(t, err.Error(), "fast")
})

View File

@@ -106,7 +106,6 @@ func TestRouter_Channel_Basic(t *testing.T) {
require.NoError(t, err)
router, err := p2p.NewRouter(
ctx,
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
@@ -409,7 +408,6 @@ func TestRouter_AcceptPeers(t *testing.T) {
sub := peerManager.Subscribe(ctx)
router, err := p2p.NewRouter(
ctx,
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
@@ -464,7 +462,6 @@ func TestRouter_AcceptPeers_Error(t *testing.T) {
require.NoError(t, err)
router, err := p2p.NewRouter(
ctx,
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
@@ -502,7 +499,6 @@ func TestRouter_AcceptPeers_ErrorEOF(t *testing.T) {
require.NoError(t, err)
router, err := p2p.NewRouter(
ctx,
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
@@ -554,7 +550,6 @@ func TestRouter_AcceptPeers_HeadOfLineBlocking(t *testing.T) {
require.NoError(t, err)
router, err := p2p.NewRouter(
ctx,
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
@@ -658,7 +653,6 @@ func TestRouter_DialPeers(t *testing.T) {
sub := peerManager.Subscribe(ctx)
router, err := p2p.NewRouter(
ctx,
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
@@ -744,7 +738,6 @@ func TestRouter_DialPeers_Parallel(t *testing.T) {
require.True(t, added)
router, err := p2p.NewRouter(
ctx,
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
@@ -819,7 +812,6 @@ func TestRouter_EvictPeers(t *testing.T) {
sub := peerManager.Subscribe(ctx)
router, err := p2p.NewRouter(
ctx,
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
@@ -882,7 +874,6 @@ func TestRouter_ChannelCompatability(t *testing.T) {
require.NoError(t, err)
router, err := p2p.NewRouter(
ctx,
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
@@ -938,7 +929,6 @@ func TestRouter_DontSendOnInvalidChannel(t *testing.T) {
sub := peerManager.Subscribe(ctx)
router, err := p2p.NewRouter(
ctx,
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,

View File

@@ -16,7 +16,6 @@ import (
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/internal/eventbus"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/p2p/conn"
sm "github.com/tendermint/tendermint/internal/state"
"github.com/tendermint/tendermint/internal/store"
"github.com/tendermint/tendermint/libs/log"
@@ -139,13 +138,11 @@ type Reactor struct {
stateStore sm.Store
blockStore *store.BlockStore
conn abciclient.Client
tempDir string
snapshotCh *p2p.Channel
chunkCh *p2p.Channel
blockCh *p2p.Channel
paramsCh *p2p.Channel
peerUpdates *p2p.PeerUpdates
conn abciclient.Client
tempDir string
peerEvents p2p.PeerEventSubscriber
chCreator p2p.ChannelCreator
sendBlockError func(context.Context, p2p.PeerError) error
// Dispatcher is used to multiplex light block requests and responses over multiple
// peers used by the p2p state provider and in reverse sync.
@@ -155,10 +152,13 @@ type Reactor struct {
// These will only be set when a state sync is in progress. It is used to feed
// received snapshots and chunks into the syncer and manage incoming and outgoing
// providers.
mtx sync.RWMutex
syncer *syncer
providers map[types.NodeID]*BlockProvider
stateProvider StateProvider
mtx sync.RWMutex
initSyncer func() *syncer
requestSnaphot func() error
syncer *syncer
providers map[types.NodeID]*BlockProvider
initStateProvider func(ctx context.Context, chainID string, initialHeight int64) error
stateProvider StateProvider
eventBus *eventbus.EventBus
metrics *Metrics
@@ -178,32 +178,13 @@ func NewReactor(
logger log.Logger,
conn abciclient.Client,
channelCreator p2p.ChannelCreator,
peerUpdates *p2p.PeerUpdates,
peerEvents p2p.PeerEventSubscriber,
stateStore sm.Store,
blockStore *store.BlockStore,
tempDir string,
ssMetrics *Metrics,
eventBus *eventbus.EventBus,
) (*Reactor, error) {
chDesc := getChannelDescriptors()
snapshotCh, err := channelCreator(ctx, chDesc[SnapshotChannel])
if err != nil {
return nil, err
}
chunkCh, err := channelCreator(ctx, chDesc[ChunkChannel])
if err != nil {
return nil, err
}
blockCh, err := channelCreator(ctx, chDesc[LightBlockChannel])
if err != nil {
return nil, err
}
paramsCh, err := channelCreator(ctx, chDesc[ParamsChannel])
if err != nil {
return nil, err
}
) *Reactor {
r := &Reactor{
logger: logger,
@@ -211,23 +192,19 @@ func NewReactor(
initialHeight: initialHeight,
cfg: cfg,
conn: conn,
snapshotCh: snapshotCh,
chunkCh: chunkCh,
blockCh: blockCh,
paramsCh: paramsCh,
peerUpdates: peerUpdates,
chCreator: channelCreator,
peerEvents: peerEvents,
tempDir: tempDir,
stateStore: stateStore,
blockStore: blockStore,
peers: newPeerList(),
dispatcher: NewDispatcher(blockCh),
providers: make(map[types.NodeID]*BlockProvider),
metrics: ssMetrics,
eventBus: eventBus,
}
r.BaseService = *service.NewBaseService(logger, "StateSync", r)
return r, nil
return r
}
// OnStart starts separate go routines for each p2p Channel and listens for
@@ -237,8 +214,91 @@ func NewReactor(
// The caller must be sure to execute OnStop to ensure the outbound p2p Channels are
// closed. No error is returned.
func (r *Reactor) OnStart(ctx context.Context) error {
go r.processChannels(ctx, r.snapshotCh, r.chunkCh, r.blockCh, r.paramsCh)
go r.processPeerUpdates(ctx)
// construct channels
chDesc := getChannelDescriptors()
snapshotCh, err := r.chCreator(ctx, chDesc[SnapshotChannel])
if err != nil {
return err
}
chunkCh, err := r.chCreator(ctx, chDesc[ChunkChannel])
if err != nil {
return err
}
blockCh, err := r.chCreator(ctx, chDesc[LightBlockChannel])
if err != nil {
return err
}
paramsCh, err := r.chCreator(ctx, chDesc[ParamsChannel])
if err != nil {
return err
}
// Define constructor and helper closures that capture these channels for
// later use (e.g. by Sync and backfill); the channels only exist once
// OnStart has created them. This is not ideal.
r.initSyncer = func() *syncer {
return &syncer{
logger: r.logger,
stateProvider: r.stateProvider,
conn: r.conn,
snapshots: newSnapshotPool(),
snapshotCh: snapshotCh,
chunkCh: chunkCh,
tempDir: r.tempDir,
fetchers: r.cfg.Fetchers,
retryTimeout: r.cfg.ChunkRequestTimeout,
metrics: r.metrics,
}
}
r.dispatcher = NewDispatcher(blockCh)
r.requestSnaphot = func() error {
// request snapshots from all currently connected peers
return snapshotCh.Send(ctx, p2p.Envelope{
Broadcast: true,
Message: &ssproto.SnapshotsRequest{},
})
}
r.sendBlockError = blockCh.SendError
r.initStateProvider = func(ctx context.Context, chainID string, initialHeight int64) error {
to := light.TrustOptions{
Period: r.cfg.TrustPeriod,
Height: r.cfg.TrustHeight,
Hash: r.cfg.TrustHashBytes(),
}
spLogger := r.logger.With("module", "stateprovider")
spLogger.Info("initializing state provider", "trustPeriod", to.Period,
"trustHeight", to.Height, "useP2P", r.cfg.UseP2P)
if r.cfg.UseP2P {
if err := r.waitForEnoughPeers(ctx, 2); err != nil {
return err
}
peers := r.peers.All()
providers := make([]provider.Provider, len(peers))
for idx, p := range peers {
providers[idx] = NewBlockProvider(p, chainID, r.dispatcher)
}
stateProvider, err := NewP2PStateProvider(ctx, chainID, initialHeight, providers, to, paramsCh, r.logger.With("module", "stateprovider"))
if err != nil {
return fmt.Errorf("failed to initialize P2P state provider: %w", err)
}
r.stateProvider = stateProvider
return nil
}
stateProvider, err := NewRPCStateProvider(ctx, chainID, initialHeight, r.cfg.RPCServers, to, spLogger)
if err != nil {
return fmt.Errorf("failed to initialize RPC state provider: %w", err)
}
r.stateProvider = stateProvider
return nil
}
go r.processChannels(ctx, snapshotCh, chunkCh, blockCh, paramsCh)
go r.processPeerUpdates(ctx, r.peerEvents(ctx))
return nil
}
@@ -281,16 +341,7 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) {
return sm.State{}, err
}
r.syncer = newSyncer(
r.cfg,
r.logger,
r.conn,
r.stateProvider,
r.snapshotCh,
r.chunkCh,
r.tempDir,
r.metrics,
)
r.syncer = r.initSyncer()
r.mtx.Unlock()
defer func() {
@@ -301,15 +352,7 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) {
r.mtx.Unlock()
}()
requestSnapshotsHook := func() error {
// request snapshots from all currently connected peers
return r.snapshotCh.Send(ctx, p2p.Envelope{
Broadcast: true,
Message: &ssproto.SnapshotsRequest{},
})
}
state, commit, err := r.syncer.SyncAny(ctx, r.cfg.DiscoveryTime, requestSnapshotsHook)
state, commit, err := r.syncer.SyncAny(ctx, r.cfg.DiscoveryTime, r.requestSnaphot)
if err != nil {
return sm.State{}, err
}
@@ -436,7 +479,7 @@ func (r *Reactor) backfill(
r.logger.Info("backfill: fetched light block failed validate basic, removing peer...",
"err", err, "height", height)
queue.retry(height)
if serr := r.blockCh.SendError(ctx, p2p.PeerError{
if serr := r.sendBlockError(ctx, p2p.PeerError{
NodeID: peer,
Err: fmt.Errorf("received invalid light block: %w", err),
}); serr != nil {
@@ -473,7 +516,7 @@ func (r *Reactor) backfill(
if w, g := trustedBlockID.Hash, resp.block.Hash(); !bytes.Equal(w, g) {
r.logger.Info("received invalid light block. header hash doesn't match trusted LastBlockID",
"trustedHash", w, "receivedHash", g, "height", resp.block.Height)
if err := r.blockCh.SendError(ctx, p2p.PeerError{
if err := r.sendBlockError(ctx, p2p.PeerError{
NodeID: resp.peer,
Err: fmt.Errorf("received invalid light block. Expected hash %v, got: %v", w, g),
}); err != nil {
@@ -534,7 +577,7 @@ func (r *Reactor) backfill(
// handleSnapshotMessage handles envelopes sent from peers on the
// SnapshotChannel. It returns an error only if the Envelope.Message is unknown
// for this channel. This should never be called outside of handleMessage.
func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envelope) error {
func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envelope, snapshotCh *p2p.Channel) error {
logger := r.logger.With("peer", envelope.From)
switch msg := envelope.Message.(type) {
@@ -553,7 +596,7 @@ func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envel
"peer", envelope.From,
)
if err := r.snapshotCh.Send(ctx, p2p.Envelope{
if err := snapshotCh.Send(ctx, p2p.Envelope{
To: envelope.From,
Message: &ssproto.SnapshotsResponse{
Height: snapshot.Height,
@@ -589,8 +632,8 @@ func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envel
"failed to add snapshot",
"height", msg.Height,
"format", msg.Format,
"channel", snapshotCh.ID,
"err", err,
"channel", r.snapshotCh.ID,
)
return nil
}
@@ -606,7 +649,7 @@ func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envel
// handleChunkMessage handles envelopes sent from peers on the ChunkChannel.
// It returns an error only if the Envelope.Message is unknown for this channel.
// This should never be called outside of handleMessage.
func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope) error {
func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope, chunkCh *p2p.Channel) error {
switch msg := envelope.Message.(type) {
case *ssproto.ChunkRequest:
r.logger.Debug(
@@ -640,7 +683,7 @@ func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope
"chunk", msg.Index,
"peer", envelope.From,
)
if err := r.chunkCh.Send(ctx, p2p.Envelope{
if err := chunkCh.Send(ctx, p2p.Envelope{
To: envelope.From,
Message: &ssproto.ChunkResponse{
Height: msg.Height,
@@ -695,7 +738,7 @@ func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope
return nil
}
func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Envelope) error {
func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Envelope, blockCh *p2p.Channel) error {
switch msg := envelope.Message.(type) {
case *ssproto.LightBlockRequest:
r.logger.Info("received light block request", "height", msg.Height)
@@ -705,7 +748,7 @@ func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Env
return err
}
if lb == nil {
if err := r.blockCh.Send(ctx, p2p.Envelope{
if err := blockCh.Send(ctx, p2p.Envelope{
To: envelope.From,
Message: &ssproto.LightBlockResponse{
LightBlock: nil,
@@ -724,7 +767,7 @@ func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Env
// NOTE: If we don't have the light block, we will send a nil light block
// back to the requesting node, indicating that we don't have it.
if err := r.blockCh.Send(ctx, p2p.Envelope{
if err := blockCh.Send(ctx, p2p.Envelope{
To: envelope.From,
Message: &ssproto.LightBlockResponse{
LightBlock: lbproto,
@@ -752,7 +795,7 @@ func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Env
return nil
}
func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelope) error {
func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelope, paramsCh *p2p.Channel) error {
switch msg := envelope.Message.(type) {
case *ssproto.ParamsRequest:
r.logger.Debug("received consensus params request", "height", msg.Height)
@@ -763,7 +806,7 @@ func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelop
}
cpproto := cp.ToProto()
if err := r.paramsCh.Send(ctx, p2p.Envelope{
if err := paramsCh.Send(ctx, p2p.Envelope{
To: envelope.From,
Message: &ssproto.ParamsResponse{
Height: msg.Height,
@@ -801,7 +844,7 @@ func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelop
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
// It will handle errors and any possible panics gracefully. A caller can handle
// any error returned by sending a PeerError on the respective channel.
func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope) (err error) {
func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, chans map[p2p.ChannelID]*p2p.Channel) (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("panic in processing message: %v", e)
@@ -817,13 +860,13 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope) (er
switch envelope.ChannelID {
case SnapshotChannel:
err = r.handleSnapshotMessage(ctx, envelope)
err = r.handleSnapshotMessage(ctx, envelope, chans[SnapshotChannel])
case ChunkChannel:
err = r.handleChunkMessage(ctx, envelope)
err = r.handleChunkMessage(ctx, envelope, chans[ChunkChannel])
case LightBlockChannel:
err = r.handleLightBlockMessage(ctx, envelope)
err = r.handleLightBlockMessage(ctx, envelope, chans[LightBlockChannel])
case ParamsChannel:
err = r.handleParamsMessage(ctx, envelope)
err = r.handleParamsMessage(ctx, envelope, chans[ParamsChannel])
default:
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", envelope.ChannelID, envelope)
}
@@ -840,7 +883,7 @@ func (r *Reactor) processChannels(ctx context.Context, chs ...*p2p.Channel) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
chanTable := make(map[conn.ChannelID]*p2p.Channel, len(chs))
chanTable := make(map[p2p.ChannelID]*p2p.Channel, len(chs))
for idx := range chs {
ch := chs[idx]
chanTable[ch.ID] = ch
@@ -849,7 +892,7 @@ func (r *Reactor) processChannels(ctx context.Context, chs ...*p2p.Channel) {
iter := p2p.MergedChannelIterator(ctx, chs...)
for iter.Next(ctx) {
envelope := iter.Envelope()
if err := r.handleMessage(ctx, envelope); err != nil {
if err := r.handleMessage(ctx, envelope, chanTable); err != nil {
ch, ok := chanTable[envelope.ChannelID]
if !ok {
r.logger.Error("received impossible message",
@@ -930,12 +973,12 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
func (r *Reactor) processPeerUpdates(ctx context.Context) {
func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates) {
for {
select {
case <-ctx.Done():
return
case peerUpdate := <-r.peerUpdates.Updates():
case peerUpdate := <-peerUpdates.Updates():
r.processPeerUpdate(ctx, peerUpdate)
}
}
@@ -1040,41 +1083,6 @@ func (r *Reactor) waitForEnoughPeers(ctx context.Context, numPeers int) error {
return nil
}
func (r *Reactor) initStateProvider(ctx context.Context, chainID string, initialHeight int64) error {
var err error
to := light.TrustOptions{
Period: r.cfg.TrustPeriod,
Height: r.cfg.TrustHeight,
Hash: r.cfg.TrustHashBytes(),
}
spLogger := r.logger.With("module", "stateprovider")
spLogger.Info("initializing state provider", "trustPeriod", to.Period,
"trustHeight", to.Height, "useP2P", r.cfg.UseP2P)
if r.cfg.UseP2P {
if err := r.waitForEnoughPeers(ctx, 2); err != nil {
return err
}
peers := r.peers.All()
providers := make([]provider.Provider, len(peers))
for idx, p := range peers {
providers[idx] = NewBlockProvider(p, chainID, r.dispatcher)
}
r.stateProvider, err = NewP2PStateProvider(ctx, chainID, initialHeight, providers, to, r.paramsCh, spLogger)
if err != nil {
return fmt.Errorf("failed to initialize P2P state provider: %w", err)
}
} else {
r.stateProvider, err = NewRPCStateProvider(ctx, chainID, initialHeight, r.cfg.RPCServers, to, spLogger)
if err != nil {
return fmt.Errorf("failed to initialize RPC state provider: %w", err)
}
}
return nil
}
func (r *Reactor) TotalSnapshots() int64 {
r.mtx.RLock()
defer r.mtx.RUnlock()

View File

@@ -154,8 +154,7 @@ func setup(
logger := log.NewNopLogger()
var err error
rts.reactor, err = NewReactor(
rts.reactor = NewReactor(
ctx,
factory.DefaultTestChainID,
1,
@@ -163,25 +162,26 @@ func setup(
logger.With("component", "reactor"),
conn,
chCreator,
rts.peerUpdates,
func(context.Context) *p2p.PeerUpdates { return rts.peerUpdates },
rts.stateStore,
rts.blockStore,
"",
m,
nil, // eventbus can be nil
)
require.NoError(t, err)
rts.syncer = newSyncer(
*cfg,
logger.With("component", "syncer"),
conn,
stateProvider,
rts.snapshotChannel,
rts.chunkChannel,
"",
rts.reactor.metrics,
)
rts.syncer = &syncer{
logger: logger,
stateProvider: stateProvider,
conn: conn,
snapshots: newSnapshotPool(),
snapshotCh: rts.snapshotChannel,
chunkCh: rts.chunkChannel,
tempDir: t.TempDir(),
fetchers: cfg.Fetchers,
retryTimeout: cfg.ChunkRequestTimeout,
metrics: rts.reactor.metrics,
}
ctx, cancel := context.WithCancel(ctx)

View File

@@ -10,7 +10,6 @@ import (
abciclient "github.com/tendermint/tendermint/abci/client"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/proxy"
sm "github.com/tendermint/tendermint/internal/state"
@@ -72,31 +71,6 @@ type syncer struct {
processingSnapshot *snapshot
}
// newSyncer creates a new syncer.
func newSyncer(
cfg config.StateSyncConfig,
logger log.Logger,
conn abciclient.Client,
stateProvider StateProvider,
snapshotCh *p2p.Channel,
chunkCh *p2p.Channel,
tempDir string,
metrics *Metrics,
) *syncer {
return &syncer{
logger: logger,
stateProvider: stateProvider,
conn: conn,
snapshots: newSnapshotPool(),
snapshotCh: snapshotCh,
chunkCh: chunkCh,
tempDir: tempDir,
fetchers: cfg.Fetchers,
retryTimeout: cfg.ChunkRequestTimeout,
metrics: metrics,
}
}
// AddChunk adds a chunk to the chunk queue, if any. It returns false if the chunk has already
// been added to the queue, or an error if there's no sync in progress.
func (s *syncer) AddChunk(chunk *chunk) (bool, error) {

View File

@@ -88,12 +88,11 @@ func newDefaultNode(
}
if cfg.Mode == config.ModeSeed {
return makeSeedNode(
ctx,
logger,
cfg,
config.DefaultDBProvider,
nodeKey,
defaultGenesisDocProviderFunc(cfg),
logger,
)
}
pval, err := makeDefaultPrivval(cfg)
@@ -244,7 +243,7 @@ func makeNode(
// TODO: Fetch and provide real options and do proper p2p bootstrapping.
// TODO: Use a persistent peer database.
nodeInfo, err := makeNodeInfo(cfg, nodeKey, eventSinks, genDoc, state)
nodeInfo, err := makeNodeInfo(cfg, nodeKey, eventSinks, genDoc, state.Version.Consensus)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
}
@@ -257,24 +256,21 @@ func makeNode(
makeCloser(closers))
}
router, err := createRouter(ctx, logger, nodeMetrics.p2p, nodeInfo, nodeKey,
peerManager, cfg, proxyApp)
router, err := createRouter(logger, nodeMetrics.p2p, nodeInfo, nodeKey, peerManager, cfg, proxyApp)
if err != nil {
return nil, combineCloseError(
fmt.Errorf("failed to create router: %w", err),
makeCloser(closers))
}
mpReactor, mp, err := createMempoolReactor(ctx,
cfg, proxyApp, stateStore, nodeMetrics.mempool, peerManager, router, logger,
)
mpReactor, mp, err := createMempoolReactor(logger, cfg, proxyApp, stateStore, nodeMetrics.mempool,
peerManager.Subscribe, router.OpenChannel, peerManager.GetHeight)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
}
evReactor, evPool, edbCloser, err := createEvidenceReactor(ctx,
cfg, dbProvider, stateStore, blockStore, peerManager, router, logger, nodeMetrics.evidence, eventBus,
)
evReactor, evPool, edbCloser, err := createEvidenceReactor(logger, cfg, dbProvider,
stateStore, blockStore, peerManager.Subscribe, router.OpenChannel, nodeMetrics.evidence, eventBus)
closers = append(closers, edbCloser)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
@@ -295,11 +291,12 @@ func makeNode(
// Determine whether we should do block sync. This must happen after the handshake, since the
// app may modify the validator set, specifying ourself as the only validator.
blockSync := !onlyValidatorIsUs(state, pubKey)
waitSync := stateSync || blockSync
csReactor, csState, err := createConsensusReactor(ctx,
cfg, stateStore, blockExec, blockStore, mp, evPool,
privValidator, nodeMetrics.consensus, stateSync || blockSync, eventBus,
peerManager, router, logger,
privValidator, nodeMetrics.consensus, waitSync, eventBus,
peerManager, router.OpenChannel, logger,
)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
@@ -307,23 +304,18 @@ func makeNode(
// Create the blockchain reactor. Note, we do not start block sync if we're
// doing a state sync first.
bcReactor, err := blocksync.NewReactor(ctx,
bcReactor := blocksync.NewReactor(
logger.With("module", "blockchain"),
stateStore,
blockExec,
blockStore,
csReactor,
router.OpenChannel,
peerManager.Subscribe(ctx),
peerManager.Subscribe,
blockSync && !stateSync,
nodeMetrics.consensus,
eventBus,
)
if err != nil {
return nil, combineCloseError(
fmt.Errorf("could not create blocksync reactor: %w", err),
makeCloser(closers))
}
// Make ConsensusReactor. Don't enable fully if doing a state sync and/or block sync first.
// FIXME We need to update metrics here, since other reactors don't have access to them.
@@ -337,7 +329,7 @@ func makeNode(
// FIXME The way we do phased startups (e.g. replay -> block sync -> consensus) is very messy,
// we should clean this whole thing up. See:
// https://github.com/tendermint/tendermint/issues/4644
stateSyncReactor, err := statesync.NewReactor(
stateSyncReactor := statesync.NewReactor(
ctx,
genDoc.ChainID,
genDoc.InitialHeight,
@@ -345,23 +337,17 @@ func makeNode(
logger.With("module", "statesync"),
proxyApp,
router.OpenChannel,
peerManager.Subscribe(ctx),
peerManager.Subscribe,
stateStore,
blockStore,
cfg.StateSync.TempDir,
nodeMetrics.statesync,
eventBus,
)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
}
var pexReactor service.Service = service.NopService{}
if cfg.P2P.PexReactor {
pexReactor, err = pex.NewReactor(ctx, logger, peerManager, router.OpenChannel, peerManager.Subscribe(ctx))
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
}
pexReactor = pex.NewReactor(logger, peerManager, router.OpenChannel, peerManager.Subscribe)
}
node := &nodeImpl{
config: cfg,

View File

@@ -581,12 +581,12 @@ func TestNodeNewSeedNode(t *testing.T) {
logger := log.NewNopLogger()
ns, err := makeSeedNode(ctx,
ns, err := makeSeedNode(
logger,
cfg,
config.DefaultDBProvider,
nodeKey,
defaultGenesisDocProviderFunc(cfg),
logger,
)
t.Cleanup(ns.Wait)
t.Cleanup(leaktest.CheckTimeout(t, time.Second))

View File

@@ -68,7 +68,7 @@ func New(
config.DefaultDBProvider,
logger)
case config.ModeSeed:
return makeSeedNode(ctx, conf, config.DefaultDBProvider, nodeKey, genProvider, logger)
return makeSeedNode(logger, conf, config.DefaultDBProvider, nodeKey, genProvider)
default:
return nil, fmt.Errorf("%q is not a valid mode", conf.Mode)
}

View File

@@ -40,12 +40,11 @@ type seedNodeImpl struct {
// makeSeedNode returns a new seed node, containing only the p2p and PEX reactors.
func makeSeedNode(
ctx context.Context,
logger log.Logger,
cfg *config.Config,
dbProvider config.DBProvider,
nodeKey types.NodeKey,
genesisDocProvider genesisDocProvider,
logger log.Logger,
) (service.Service, error) {
if !cfg.P2P.PexReactor {
return nil, errors.New("cannot run seed nodes with PEX disabled")
@@ -76,19 +75,13 @@ func makeSeedNode(
closer)
}
router, err := createRouter(ctx, logger, p2pMetrics, nodeInfo, nodeKey,
peerManager, cfg, nil)
router, err := createRouter(logger, p2pMetrics, nodeInfo, nodeKey, peerManager, cfg, nil)
if err != nil {
return nil, combineCloseError(
fmt.Errorf("failed to create router: %w", err),
closer)
}
pexReactor, err := pex.NewReactor(ctx, logger, peerManager, router.OpenChannel, peerManager.Subscribe(ctx))
if err != nil {
return nil, combineCloseError(err, closer)
}
node := &seedNodeImpl{
config: cfg,
logger: logger,
@@ -101,7 +94,7 @@ func makeSeedNode(
shutdownOps: closer,
pexReactor: pexReactor,
pexReactor: pex.NewReactor(logger, peerManager, router.OpenChannel, peerManager.Subscribe),
}
node.BaseService = *service.NewBaseService(logger, "SeedNode", node)
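Because both injection points are plain function types, the production wiring above can pass method values directly, while the tests earlier in this diff adapt a pre-built *p2p.PeerUpdates with a closure. A hedged, hypothetical snippet showing why both forms compile, assuming peerManager, router, and peerUpdates are in scope as in makeSeedNode and the pex tests:

// Method values have exactly the shapes of the injected function types,
// so production code needs no adapters; tests wrap a fixed value instead.
var _ p2p.PeerEventSubscriber = peerManager.Subscribe
var _ p2p.ChannelCreator = router.OpenChannel
var _ p2p.PeerEventSubscriber = func(context.Context) *p2p.PeerUpdates { return peerUpdates }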

View File

@@ -169,14 +169,14 @@ func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool {
}
func createMempoolReactor(
ctx context.Context,
logger log.Logger,
cfg *config.Config,
appClient abciclient.Client,
store sm.Store,
memplMetrics *mempool.Metrics,
peerManager *p2p.PeerManager,
router *p2p.Router,
logger log.Logger,
peerEvents p2p.PeerEventSubscriber,
chCreator p2p.ChannelCreator,
peerHeight func(types.NodeID) int64,
) (service.Service, mempool.Mempool, error) {
logger = logger.With("module", "mempool")
@@ -189,18 +189,14 @@ func createMempoolReactor(
mempool.WithPostCheck(sm.TxPostCheckFromStore(store)),
)
reactor, err := mempool.NewReactor(
ctx,
reactor := mempool.NewReactor(
logger,
cfg.Mempool,
peerManager,
mp,
router.OpenChannel,
peerManager.Subscribe(ctx),
chCreator,
peerEvents,
peerHeight,
)
if err != nil {
return nil, nil, err
}
if cfg.Consensus.WaitForTxs() {
mp.EnableTxsAvailable()
@@ -210,14 +206,13 @@ func createMempoolReactor(
}
func createEvidenceReactor(
ctx context.Context,
logger log.Logger,
cfg *config.Config,
dbProvider config.DBProvider,
store sm.Store,
blockStore *store.BlockStore,
peerManager *p2p.PeerManager,
router *p2p.Router,
logger log.Logger,
peerEvents p2p.PeerEventSubscriber,
chCreator p2p.ChannelCreator,
metrics *evidence.Metrics,
eventBus *eventbus.EventBus,
) (*evidence.Reactor, *evidence.Pool, closer, error) {
@@ -231,16 +226,12 @@ func createEvidenceReactor(
evidencePool := evidence.NewPool(logger, evidenceDB, store, blockStore, metrics, eventBus)
evidenceReactor, err := evidence.NewReactor(
ctx,
evidenceReactor := evidence.NewReactor(
logger,
router.OpenChannel,
peerManager.Subscribe(ctx),
chCreator,
peerEvents,
evidencePool,
)
if err != nil {
return nil, nil, dbCloser, fmt.Errorf("creating evidence reactor: %w", err)
}
return evidenceReactor, evidencePool, dbCloser, nil
}
@@ -258,7 +249,7 @@ func createConsensusReactor(
waitSync bool,
eventBus *eventbus.EventBus,
peerManager *p2p.PeerManager,
router *p2p.Router,
chCreator p2p.ChannelCreator,
logger log.Logger,
) (*consensus.Reactor, *consensus.State, error) {
logger = logger.With("module", "consensus")
@@ -282,20 +273,15 @@ func createConsensusReactor(
consensusState.SetPrivValidator(ctx, privValidator)
}
reactor, err := consensus.NewReactor(
ctx,
reactor := consensus.NewReactor(
logger,
consensusState,
router.OpenChannel,
peerManager.Subscribe(ctx),
chCreator,
peerManager.Subscribe,
eventBus,
waitSync,
csMetrics,
)
if err != nil {
return nil, nil, err
}
return reactor, consensusState, nil
}
@@ -375,7 +361,6 @@ func createPeerManager(
}
func createRouter(
ctx context.Context,
logger log.Logger,
p2pMetrics *p2p.Metrics,
nodeInfo types.NodeInfo,
@@ -405,7 +390,6 @@ func createRouter(
}
return p2p.NewRouter(
ctx,
p2pLogger,
p2pMetrics,
nodeInfo,
@@ -422,7 +406,7 @@ func makeNodeInfo(
nodeKey types.NodeKey,
eventSinks []indexer.EventSink,
genDoc *types.GenesisDoc,
state sm.State,
versionInfo version.Consensus,
) (types.NodeInfo, error) {
txIndexerStatus := "off"
@@ -434,8 +418,8 @@ func makeNodeInfo(
nodeInfo := types.NodeInfo{
ProtocolVersion: types.ProtocolVersion{
P2P: version.P2PProtocol, // global
Block: state.Version.Consensus.Block,
App: state.Version.Consensus.App,
Block: versionInfo.Block,
App: versionInfo.App,
},
NodeID: nodeKey.ID,
Network: genDoc.ChainID,