From d5fb82e414b49d1bb27eabcaded6213fec06f5ac Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Mon, 11 Jul 2022 16:22:40 -0400 Subject: [PATCH] p2p: make p2p.Channel an interface (#8967) This is (#8446) pulled from the `main/libp2p` branch but without any of the libp2p content, and is perhaps the easiest first step to enable pluggability at the peer layer, and makes it possible to hoist shims (including for, say, 0.34) into tendermint without touching the reactors. --- internal/blocksync/reactor.go | 16 +++--- internal/blocksync/reactor_test.go | 6 +- internal/consensus/invalid_test.go | 2 +- internal/consensus/reactor.go | 32 +++++------ internal/consensus/reactor_test.go | 10 ++-- internal/evidence/reactor.go | 8 +-- internal/evidence/reactor_test.go | 4 +- internal/mempool/reactor.go | 8 +-- internal/mempool/reactor_test.go | 6 +- internal/p2p/channel.go | 81 +++++++++++++++++---------- internal/p2p/channel_test.go | 4 +- internal/p2p/p2ptest/network.go | 12 ++-- internal/p2p/p2ptest/require.go | 12 ++-- internal/p2p/pex/reactor.go | 9 ++- internal/p2p/pex/reactor_test.go | 17 +++--- internal/p2p/router.go | 11 ++-- internal/p2p/router_test.go | 2 +- internal/statesync/dispatcher.go | 4 +- internal/statesync/dispatcher_test.go | 4 +- internal/statesync/reactor.go | 18 +++--- internal/statesync/reactor_test.go | 14 +++-- internal/statesync/stateprovider.go | 4 +- internal/statesync/syncer.go | 4 +- 23 files changed, 157 insertions(+), 131 deletions(-) diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go index 6c1c060e7..c1b032b03 100644 --- a/internal/blocksync/reactor.go +++ b/internal/blocksync/reactor.go @@ -135,7 +135,7 @@ func (r *Reactor) OnStart(ctx context.Context) error { if err != nil { return err } - r.chCreator = func(context.Context, *conn.ChannelDescriptor) (*p2p.Channel, error) { return blockSyncCh, nil } + r.chCreator = func(context.Context, *conn.ChannelDescriptor) (p2p.Channel, error) { return blockSyncCh, nil } state, err := 
r.stateStore.Load() if err != nil { @@ -183,7 +183,7 @@ func (r *Reactor) OnStop() { // respondToPeer loads a block and sends it to the requesting peer, if we have it. // Otherwise, we'll respond saying we do not have it. -func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest, peerID types.NodeID, blockSyncCh *p2p.Channel) error { +func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest, peerID types.NodeID, blockSyncCh p2p.Channel) error { block := r.store.LoadBlock(msg.Height) if block == nil { r.logger.Info("peer requesting a block we do not have", "peer", peerID, "height", msg.Height) @@ -223,7 +223,7 @@ func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest, // handleMessage handles an Envelope sent from a peer on a specific p2p Channel. // It will handle errors and any possible panics gracefully. A caller can handle // any error returned by sending a PeerError on the respective channel. -func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, blockSyncCh *p2p.Channel) (err error) { +func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, blockSyncCh p2p.Channel) (err error) { defer func() { if e := recover(); e != nil { err = fmt.Errorf("panic in processing message: %v", e) @@ -298,7 +298,7 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, blo // message execution will result in a PeerError being sent on the BlockSyncChannel. // When the reactor is stopped, we will catch the signal and close the p2p Channel // gracefully. -func (r *Reactor) processBlockSyncCh(ctx context.Context, blockSyncCh *p2p.Channel) { +func (r *Reactor) processBlockSyncCh(ctx context.Context, blockSyncCh p2p.Channel) { iter := blockSyncCh.Receive(ctx) for iter.Next(ctx) { envelope := iter.Envelope() @@ -319,7 +319,7 @@ func (r *Reactor) processBlockSyncCh(ctx context.Context, blockSyncCh *p2p.Chann } // processPeerUpdate processes a PeerUpdate. 
-func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, blockSyncCh *p2p.Channel) { +func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, blockSyncCh p2p.Channel) { r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status) // XXX: Pool#RedoRequest can sometimes give us an empty peer. @@ -354,7 +354,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda // processPeerUpdates initiates a blocking process where we listen for and handle // PeerUpdate messages. When the reactor is stopped, we will catch the signal and // close the p2p PeerUpdatesCh gracefully. -func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, blockSyncCh *p2p.Channel) { +func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, blockSyncCh p2p.Channel) { for { select { case <-ctx.Done(): @@ -396,7 +396,7 @@ func (r *Reactor) SwitchToBlockSync(ctx context.Context, state sm.State) error { return nil } -func (r *Reactor) requestRoutine(ctx context.Context, blockSyncCh *p2p.Channel) { +func (r *Reactor) requestRoutine(ctx context.Context, blockSyncCh p2p.Channel) { statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second) defer statusUpdateTicker.Stop() @@ -438,7 +438,7 @@ func (r *Reactor) requestRoutine(ctx context.Context, blockSyncCh *p2p.Channel) // do. // // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down! 
-func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh *p2p.Channel) { +func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh p2p.Channel) { var ( trySyncTicker = time.NewTicker(trySyncIntervalMS * time.Millisecond) switchToConsensusTicker = time.NewTicker(switchToConsensusIntervalSeconds * time.Second) diff --git a/internal/blocksync/reactor_test.go b/internal/blocksync/reactor_test.go index 141eaf7ec..3ef2ec86f 100644 --- a/internal/blocksync/reactor_test.go +++ b/internal/blocksync/reactor_test.go @@ -37,7 +37,7 @@ type reactorTestSuite struct { reactors map[types.NodeID]*Reactor app map[types.NodeID]abciclient.Client - blockSyncChannels map[types.NodeID]*p2p.Channel + blockSyncChannels map[types.NodeID]p2p.Channel peerChans map[types.NodeID]chan p2p.PeerUpdate peerUpdates map[types.NodeID]*p2p.PeerUpdates } @@ -64,7 +64,7 @@ func setup( nodes: make([]types.NodeID, 0, numNodes), reactors: make(map[types.NodeID]*Reactor, numNodes), app: make(map[types.NodeID]abciclient.Client, numNodes), - blockSyncChannels: make(map[types.NodeID]*p2p.Channel, numNodes), + blockSyncChannels: make(map[types.NodeID]p2p.Channel, numNodes), peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numNodes), peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes), } @@ -177,7 +177,7 @@ func (rts *reactorTestSuite) addNode( rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1) rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID]) - chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (*p2p.Channel, error) { + chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (p2p.Channel, error) { return rts.blockSyncChannels[nodeID], nil } diff --git a/internal/consensus/invalid_test.go b/internal/consensus/invalid_test.go index 93c5cea1b..4685bb318 100644 --- a/internal/consensus/invalid_test.go +++ b/internal/consensus/invalid_test.go @@ -107,7 +107,7 @@ func 
invalidDoPrevoteFunc( round int32, cs *State, r *Reactor, - voteCh *p2p.Channel, + voteCh p2p.Channel, pv types.PrivValidator, ) { // routine to: diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index c353e0c73..3ba95c836 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -165,10 +165,10 @@ func NewReactor( } type channelBundle struct { - state *p2p.Channel - data *p2p.Channel - vote *p2p.Channel - votSet *p2p.Channel + state p2p.Channel + data p2p.Channel + vote p2p.Channel + votSet p2p.Channel } // OnStart starts separate go routines for each p2p Channel and listens for @@ -310,14 +310,14 @@ func (r *Reactor) GetPeerState(peerID types.NodeID) (*PeerState, bool) { return ps, ok } -func (r *Reactor) broadcastNewRoundStepMessage(ctx context.Context, rs *cstypes.RoundState, stateCh *p2p.Channel) error { +func (r *Reactor) broadcastNewRoundStepMessage(ctx context.Context, rs *cstypes.RoundState, stateCh p2p.Channel) error { return stateCh.Send(ctx, p2p.Envelope{ Broadcast: true, Message: makeRoundStepMessage(rs), }) } -func (r *Reactor) broadcastNewValidBlockMessage(ctx context.Context, rs *cstypes.RoundState, stateCh *p2p.Channel) error { +func (r *Reactor) broadcastNewValidBlockMessage(ctx context.Context, rs *cstypes.RoundState, stateCh p2p.Channel) error { psHeader := rs.ProposalBlockParts.Header() return stateCh.Send(ctx, p2p.Envelope{ Broadcast: true, @@ -331,7 +331,7 @@ func (r *Reactor) broadcastNewValidBlockMessage(ctx context.Context, rs *cstypes }) } -func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote, stateCh *p2p.Channel) error { +func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote, stateCh p2p.Channel) error { return stateCh.Send(ctx, p2p.Envelope{ Broadcast: true, Message: &tmcons.HasVote{ @@ -346,7 +346,7 @@ func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote, // subscribeToBroadcastEvents subscribes for new 
round steps and votes using the // internal pubsub defined in the consensus state to broadcast them to peers // upon receiving. -func (r *Reactor) subscribeToBroadcastEvents(ctx context.Context, stateCh *p2p.Channel) { +func (r *Reactor) subscribeToBroadcastEvents(ctx context.Context, stateCh p2p.Channel) { onStopCh := r.state.getOnStopCh() err := r.state.evsw.AddListenerForEvent( @@ -403,7 +403,7 @@ func makeRoundStepMessage(rs *cstypes.RoundState) *tmcons.NewRoundStep { } } -func (r *Reactor) sendNewRoundStepMessage(ctx context.Context, peerID types.NodeID, stateCh *p2p.Channel) error { +func (r *Reactor) sendNewRoundStepMessage(ctx context.Context, peerID types.NodeID, stateCh p2p.Channel) error { return stateCh.Send(ctx, p2p.Envelope{ To: peerID, Message: makeRoundStepMessage(r.getRoundState()), @@ -433,7 +433,7 @@ func (r *Reactor) getRoundState() *cstypes.RoundState { return r.rs } -func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState, dataCh *p2p.Channel) { +func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState, dataCh p2p.Channel) { logger := r.logger.With("height", prs.Height).With("peer", ps.peerID) if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok { @@ -497,7 +497,7 @@ func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundSta time.Sleep(r.state.config.PeerGossipSleepDuration) } -func (r *Reactor) gossipDataRoutine(ctx context.Context, ps *PeerState, dataCh *p2p.Channel) { +func (r *Reactor) gossipDataRoutine(ctx context.Context, ps *PeerState, dataCh p2p.Channel) { logger := r.logger.With("peer", ps.peerID) timer := time.NewTimer(0) @@ -632,7 +632,7 @@ OUTER_LOOP: // pickSendVote picks a vote and sends it to the peer. It will return true if // there is a vote to send and false otherwise. 
-func (r *Reactor) pickSendVote(ctx context.Context, ps *PeerState, votes types.VoteSetReader, voteCh *p2p.Channel) (bool, error) { +func (r *Reactor) pickSendVote(ctx context.Context, ps *PeerState, votes types.VoteSetReader, voteCh p2p.Channel) (bool, error) { vote, ok := ps.PickVoteToSend(votes) if !ok { return false, nil @@ -660,7 +660,7 @@ func (r *Reactor) gossipVotesForHeight( rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState, - voteCh *p2p.Channel, + voteCh p2p.Channel, ) (bool, error) { logger := r.logger.With("height", prs.Height).With("peer", ps.peerID) @@ -732,7 +732,7 @@ func (r *Reactor) gossipVotesForHeight( return false, nil } -func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh *p2p.Channel) { +func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh p2p.Channel) { logger := r.logger.With("peer", ps.peerID) timer := time.NewTimer(0) @@ -804,7 +804,7 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh // NOTE: `queryMaj23Routine` has a simple crude design since it only comes // into play for liveness when there's a signature DDoS attack happening. -func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState, stateCh *p2p.Channel) { +func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState, stateCh p2p.Channel) { timer := time.NewTimer(0) defer timer.Stop() @@ -1015,7 +1015,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda // If we fail to find the peer state for the envelope sender, we perform a no-op // and return. This can happen when we process the envelope after the peer is // removed. 
-func (r *Reactor) handleStateMessage(ctx context.Context, envelope *p2p.Envelope, msgI Message, voteSetCh *p2p.Channel) error { +func (r *Reactor) handleStateMessage(ctx context.Context, envelope *p2p.Envelope, msgI Message, voteSetCh p2p.Channel) error { ps, ok := r.GetPeerState(envelope.From) if !ok || ps == nil { r.logger.Debug("failed to find peer state", "peer", envelope.From, "ch_id", "StateChannel") diff --git a/internal/consensus/reactor_test.go b/internal/consensus/reactor_test.go index 96cf800bd..d848f53e7 100644 --- a/internal/consensus/reactor_test.go +++ b/internal/consensus/reactor_test.go @@ -46,10 +46,10 @@ type reactorTestSuite struct { reactors map[types.NodeID]*Reactor subs map[types.NodeID]eventbus.Subscription blocksyncSubs map[types.NodeID]eventbus.Subscription - stateChannels map[types.NodeID]*p2p.Channel - dataChannels map[types.NodeID]*p2p.Channel - voteChannels map[types.NodeID]*p2p.Channel - voteSetBitsChannels map[types.NodeID]*p2p.Channel + stateChannels map[types.NodeID]p2p.Channel + dataChannels map[types.NodeID]p2p.Channel + voteChannels map[types.NodeID]p2p.Channel + voteSetBitsChannels map[types.NodeID]p2p.Channel } func chDesc(chID p2p.ChannelID, size int) *p2p.ChannelDescriptor { @@ -86,7 +86,7 @@ func setup( t.Cleanup(cancel) chCreator := func(nodeID types.NodeID) p2p.ChannelCreator { - return func(ctx context.Context, desc *p2p.ChannelDescriptor) (*p2p.Channel, error) { + return func(ctx context.Context, desc *p2p.ChannelDescriptor) (p2p.Channel, error) { switch desc.ID { case StateChannel: return rts.stateChannels[nodeID], nil diff --git a/internal/evidence/reactor.go b/internal/evidence/reactor.go index 1d952d30e..d0bc28b13 100644 --- a/internal/evidence/reactor.go +++ b/internal/evidence/reactor.go @@ -159,7 +159,7 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope) (er // processEvidenceCh implements a blocking event loop where we listen for p2p // Envelope messages from the evidenceCh. 
-func (r *Reactor) processEvidenceCh(ctx context.Context, evidenceCh *p2p.Channel) { +func (r *Reactor) processEvidenceCh(ctx context.Context, evidenceCh p2p.Channel) { iter := evidenceCh.Receive(ctx) for iter.Next(ctx) { envelope := iter.Envelope() @@ -186,7 +186,7 @@ func (r *Reactor) processEvidenceCh(ctx context.Context, evidenceCh *p2p.Channel // connects/disconnects frequently from the broadcasting peer(s). // // REF: https://github.com/tendermint/tendermint/issues/4727 -func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, evidenceCh *p2p.Channel) { +func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, evidenceCh p2p.Channel) { r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status) r.mtx.Lock() @@ -227,7 +227,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda // processPeerUpdates initiates a blocking process where we listen for and handle // PeerUpdate messages. When the reactor is stopped, we will catch the signal and // close the p2p PeerUpdatesCh gracefully. -func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, evidenceCh *p2p.Channel) { +func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, evidenceCh p2p.Channel) { for { select { case peerUpdate := <-peerUpdates.Updates(): @@ -249,7 +249,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerU // that the peer has already received or may not be ready for. 
// // REF: https://github.com/tendermint/tendermint/issues/4727 -func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID, evidenceCh *p2p.Channel) { +func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID, evidenceCh p2p.Channel) { var next *clist.CElement defer func() { diff --git a/internal/evidence/reactor_test.go b/internal/evidence/reactor_test.go index f23195fae..92566ccc8 100644 --- a/internal/evidence/reactor_test.go +++ b/internal/evidence/reactor_test.go @@ -38,7 +38,7 @@ type reactorTestSuite struct { logger log.Logger reactors map[types.NodeID]*evidence.Reactor pools map[types.NodeID]*evidence.Pool - evidenceChannels map[types.NodeID]*p2p.Channel + evidenceChannels map[types.NodeID]p2p.Channel peerUpdates map[types.NodeID]*p2p.PeerUpdates peerChans map[types.NodeID]chan p2p.PeerUpdate nodes []*p2ptest.Node @@ -96,7 +96,7 @@ func setup(ctx context.Context, t *testing.T, stateStores []sm.Store) *reactorTe rts.network.Nodes[nodeID].PeerManager.Register(ctx, pu) rts.nodes = append(rts.nodes, rts.network.Nodes[nodeID]) - chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (*p2p.Channel, error) { + chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (p2p.Channel, error) { return rts.evidenceChannels[nodeID], nil } diff --git a/internal/mempool/reactor.go b/internal/mempool/reactor.go index 28ee9e334..62cdf386c 100644 --- a/internal/mempool/reactor.go +++ b/internal/mempool/reactor.go @@ -194,7 +194,7 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope) (er // processMempoolCh implements a blocking event loop where we listen for p2p // Envelope messages from the mempoolCh. 
-func (r *Reactor) processMempoolCh(ctx context.Context, mempoolCh *p2p.Channel) { +func (r *Reactor) processMempoolCh(ctx context.Context, mempoolCh p2p.Channel) { iter := mempoolCh.Receive(ctx) for iter.Next(ctx) { envelope := iter.Envelope() @@ -215,7 +215,7 @@ func (r *Reactor) processMempoolCh(ctx context.Context, mempoolCh *p2p.Channel) // goroutine or not. If not, we start one for the newly added peer. For down or // removed peers, we remove the peer from the mempool peer ID set and signal to // stop the tx broadcasting goroutine. -func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, mempoolCh *p2p.Channel) { +func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, mempoolCh p2p.Channel) { r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status) r.mtx.Lock() @@ -264,7 +264,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda // processPeerUpdates initiates a blocking process where we listen for and handle // PeerUpdate messages. When the reactor is stopped, we will catch the signal and // close the p2p PeerUpdatesCh gracefully. 
-func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, mempoolCh *p2p.Channel) { +func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, mempoolCh p2p.Channel) { for { select { case <-ctx.Done(): @@ -275,7 +275,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerU } } -func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, mempoolCh *p2p.Channel) { +func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, mempoolCh p2p.Channel) { peerMempoolID := r.ids.GetForPeer(peerID) var nextGossipTx *clist.CElement diff --git a/internal/mempool/reactor_test.go b/internal/mempool/reactor_test.go index 034c5eaa2..ee7fe777f 100644 --- a/internal/mempool/reactor_test.go +++ b/internal/mempool/reactor_test.go @@ -30,7 +30,7 @@ type reactorTestSuite struct { logger log.Logger reactors map[types.NodeID]*Reactor - mempoolChannels map[types.NodeID]*p2p.Channel + mempoolChannels map[types.NodeID]p2p.Channel mempools map[types.NodeID]*TxMempool kvstores map[types.NodeID]*kvstore.Application @@ -51,7 +51,7 @@ func setupReactors(ctx context.Context, t *testing.T, logger log.Logger, numNode logger: log.NewNopLogger().With("testCase", t.Name()), network: p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: numNodes}), reactors: make(map[types.NodeID]*Reactor, numNodes), - mempoolChannels: make(map[types.NodeID]*p2p.Channel, numNodes), + mempoolChannels: make(map[types.NodeID]p2p.Channel, numNodes), mempools: make(map[types.NodeID]*TxMempool, numNodes), kvstores: make(map[types.NodeID]*kvstore.Application, numNodes), peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numNodes), @@ -75,7 +75,7 @@ func setupReactors(ctx context.Context, t *testing.T, logger log.Logger, numNode rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1) rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID]) - chCreator := 
func(ctx context.Context, chDesc *p2p.ChannelDescriptor) (*p2p.Channel, error) { + chCreator := func(ctx context.Context, chDesc *p2p.ChannelDescriptor) (p2p.Channel, error) { return rts.mempoolChannels[nodeID], nil } diff --git a/internal/p2p/channel.go b/internal/p2p/channel.go index e33e7faa7..394656632 100644 --- a/internal/p2p/channel.go +++ b/internal/p2p/channel.go @@ -2,6 +2,7 @@ package p2p import ( "context" + "errors" "fmt" "sync" @@ -37,6 +38,16 @@ type Wrapper interface { Unwrap() (proto.Message, error) } +type Channel interface { + fmt.Stringer + + Err() error + + Send(context.Context, Envelope) error + SendError(context.Context, PeerError) error + Receive(context.Context) *ChannelIterator +} + // PeerError is a peer error reported via Channel.Error. // // FIXME: This currently just disconnects the peer, which is too simplistic. @@ -56,9 +67,9 @@ type PeerError struct { func (pe PeerError) Error() string { return fmt.Sprintf("peer=%q: %s", pe.NodeID, pe.Err.Error()) } func (pe PeerError) Unwrap() error { return pe.Err } -// Channel is a bidirectional channel to exchange Protobuf messages with peers. +// legacyChannel is a bidirectional channel to exchange Protobuf messages with peers. // Each message is wrapped in an Envelope to specify its sender and receiver. -type Channel struct { +type legacyChannel struct { ID ChannelID inCh <-chan Envelope // inbound messages (peers to reactors) outCh chan<- Envelope // outbound messages (reactors to peers) @@ -69,9 +80,10 @@ type Channel struct { // NewChannel creates a new channel. It is primarily for internal and test // use, reactors should use Router.OpenChannel(). 
-func NewChannel(id ChannelID, inCh <-chan Envelope, outCh chan<- Envelope, errCh chan<- PeerError) *Channel { - return &Channel{ +func NewChannel(id ChannelID, name string, inCh <-chan Envelope, outCh chan<- Envelope, errCh chan<- PeerError) Channel { + return &legacyChannel{ ID: id, + name: name, inCh: inCh, outCh: outCh, errCh: errCh, @@ -80,7 +92,7 @@ func NewChannel(id ChannelID, inCh <-chan Envelope, outCh chan<- Envelope, errCh // Send blocks until the envelope has been sent, or until ctx ends. // An error only occurs if the context ends before the send completes. -func (ch *Channel) Send(ctx context.Context, envelope Envelope) error { +func (ch *legacyChannel) Send(ctx context.Context, envelope Envelope) error { select { case <-ctx.Done(): return ctx.Err() @@ -89,9 +101,15 @@ func (ch *Channel) Send(ctx context.Context, envelope Envelope) error { } } +func (ch *legacyChannel) Err() error { return nil } + // SendError blocks until the given error has been sent, or ctx ends. // An error only occurs if the context ends before the send completes. -func (ch *Channel) SendError(ctx context.Context, pe PeerError) error { +func (ch *legacyChannel) SendError(ctx context.Context, pe PeerError) error { + if errors.Is(pe.Err, context.Canceled) || errors.Is(pe.Err, context.DeadlineExceeded) { + return nil + } + select { case <-ctx.Done(): return ctx.Err() @@ -100,18 +118,29 @@ func (ch *Channel) SendError(ctx context.Context, pe PeerError) error { } } -func (ch *Channel) String() string { return fmt.Sprintf("p2p.Channel<%d:%s>", ch.ID, ch.name) } +func (ch *legacyChannel) String() string { return fmt.Sprintf("p2p.Channel<%d:%s>", ch.ID, ch.name) } // Receive returns a new unbuffered iterator to receive messages from ch. // The iterator runs until ctx ends. 
-func (ch *Channel) Receive(ctx context.Context) *ChannelIterator { +func (ch *legacyChannel) Receive(ctx context.Context) *ChannelIterator { iter := &ChannelIterator{ pipe: make(chan Envelope), // unbuffered } - go func() { + go func(pipe chan<- Envelope) { defer close(iter.pipe) - iteratorWorker(ctx, ch, iter.pipe) - }() + for { + select { + case <-ctx.Done(): + return + case envelope := <-ch.inCh: + select { + case <-ctx.Done(): + return + case pipe <- envelope: + } + } + } + }(iter.pipe) return iter } @@ -126,21 +155,6 @@ type ChannelIterator struct { current *Envelope } -func iteratorWorker(ctx context.Context, ch *Channel, pipe chan Envelope) { - for { - select { - case <-ctx.Done(): - return - case envelope := <-ch.inCh: - select { - case <-ctx.Done(): - return - case pipe <- envelope: - } - } - } -} - // Next returns true when the Envelope value has advanced, and false // when the context is canceled or iteration should stop. If an iterator has returned false, // it will never return true again. @@ -179,7 +193,7 @@ func (iter *ChannelIterator) Envelope() *Envelope { return iter.current } // // This allows the caller to consume messages from multiple channels // without needing to manage the concurrency separately. 
-func MergedChannelIterator(ctx context.Context, chs ...*Channel) *ChannelIterator { +func MergedChannelIterator(ctx context.Context, chs ...Channel) *ChannelIterator { iter := &ChannelIterator{ pipe: make(chan Envelope), // unbuffered } @@ -187,10 +201,17 @@ func MergedChannelIterator(ctx context.Context, chs ...*Channel) *ChannelIterato for _, ch := range chs { wg.Add(1) - go func(ch *Channel) { + go func(ch Channel, pipe chan<- Envelope) { defer wg.Done() - iteratorWorker(ctx, ch, iter.pipe) - }(ch) + iter := ch.Receive(ctx) + for iter.Next(ctx) { + select { + case <-ctx.Done(): + return + case pipe <- *iter.Envelope(): + } + } + }(ch, iter.pipe) } done := make(chan struct{}) diff --git a/internal/p2p/channel_test.go b/internal/p2p/channel_test.go index e06e3e77e..eeaf77db2 100644 --- a/internal/p2p/channel_test.go +++ b/internal/p2p/channel_test.go @@ -16,13 +16,13 @@ type channelInternal struct { Error chan PeerError } -func testChannel(size int) (*channelInternal, *Channel) { +func testChannel(size int) (*channelInternal, *legacyChannel) { in := &channelInternal{ In: make(chan Envelope, size), Out: make(chan Envelope, size), Error: make(chan PeerError, size), } - ch := &Channel{ + ch := &legacyChannel{ inCh: in.In, outCh: in.Out, errCh: in.Error, diff --git a/internal/p2p/p2ptest/network.go b/internal/p2p/p2ptest/network.go index 813344915..95c040b57 100644 --- a/internal/p2p/p2ptest/network.go +++ b/internal/p2p/p2ptest/network.go @@ -146,8 +146,8 @@ func (n *Network) MakeChannels( ctx context.Context, t *testing.T, chDesc *p2p.ChannelDescriptor, -) map[types.NodeID]*p2p.Channel { - channels := map[types.NodeID]*p2p.Channel{} +) map[types.NodeID]p2p.Channel { + channels := map[types.NodeID]p2p.Channel{} for _, node := range n.Nodes { channels[node.NodeID] = node.MakeChannel(ctx, t, chDesc) } @@ -161,8 +161,8 @@ func (n *Network) MakeChannelsNoCleanup( ctx context.Context, t *testing.T, chDesc *p2p.ChannelDescriptor, -) map[types.NodeID]*p2p.Channel { - 
channels := map[types.NodeID]*p2p.Channel{} +) map[types.NodeID]p2p.Channel { + channels := map[types.NodeID]p2p.Channel{} for _, node := range n.Nodes { channels[node.NodeID] = node.MakeChannelNoCleanup(ctx, t, chDesc) } @@ -304,7 +304,7 @@ func (n *Node) MakeChannel( ctx context.Context, t *testing.T, chDesc *p2p.ChannelDescriptor, -) *p2p.Channel { +) p2p.Channel { ctx, cancel := context.WithCancel(ctx) channel, err := n.Router.OpenChannel(ctx, chDesc) require.NoError(t, err) @@ -321,7 +321,7 @@ func (n *Node) MakeChannelNoCleanup( ctx context.Context, t *testing.T, chDesc *p2p.ChannelDescriptor, -) *p2p.Channel { +) p2p.Channel { channel, err := n.Router.OpenChannel(ctx, chDesc) require.NoError(t, err) return channel diff --git a/internal/p2p/p2ptest/require.go b/internal/p2p/p2ptest/require.go index 885e080d4..276bff390 100644 --- a/internal/p2p/p2ptest/require.go +++ b/internal/p2p/p2ptest/require.go @@ -15,7 +15,7 @@ import ( ) // RequireEmpty requires that the given channel is empty. -func RequireEmpty(ctx context.Context, t *testing.T, channels ...*p2p.Channel) { +func RequireEmpty(ctx context.Context, t *testing.T, channels ...p2p.Channel) { t.Helper() ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) @@ -32,7 +32,7 @@ func RequireEmpty(ctx context.Context, t *testing.T, channels ...*p2p.Channel) { } // RequireReceive requires that the given envelope is received on the channel. -func RequireReceive(ctx context.Context, t *testing.T, channel *p2p.Channel, expect p2p.Envelope) { +func RequireReceive(ctx context.Context, t *testing.T, channel p2p.Channel, expect p2p.Envelope) { t.Helper() ctx, cancel := context.WithTimeout(ctx, time.Second) @@ -54,7 +54,7 @@ func RequireReceive(ctx context.Context, t *testing.T, channel *p2p.Channel, exp // RequireReceiveUnordered requires that the given envelopes are all received on // the channel, ignoring order. 
-func RequireReceiveUnordered(ctx context.Context, t *testing.T, channel *p2p.Channel, expect []*p2p.Envelope) { +func RequireReceiveUnordered(ctx context.Context, t *testing.T, channel p2p.Channel, expect []*p2p.Envelope) { ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() @@ -75,7 +75,7 @@ func RequireReceiveUnordered(ctx context.Context, t *testing.T, channel *p2p.Cha } // RequireSend requires that the given envelope is sent on the channel. -func RequireSend(ctx context.Context, t *testing.T, channel *p2p.Channel, envelope p2p.Envelope) { +func RequireSend(ctx context.Context, t *testing.T, channel p2p.Channel, envelope p2p.Envelope) { tctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() @@ -93,7 +93,7 @@ func RequireSend(ctx context.Context, t *testing.T, channel *p2p.Channel, envelo func RequireSendReceive( ctx context.Context, t *testing.T, - channel *p2p.Channel, + channel p2p.Channel, peerID types.NodeID, send proto.Message, receive proto.Message, @@ -116,7 +116,7 @@ func RequireNoUpdates(ctx context.Context, t *testing.T, peerUpdates *p2p.PeerUp } // RequireError requires that the given peer error is submitted for a peer. -func RequireError(ctx context.Context, t *testing.T, channel *p2p.Channel, peerError p2p.PeerError) { +func RequireError(ctx context.Context, t *testing.T, channel p2p.Channel, peerError p2p.PeerError) { tctx, tcancel := context.WithTimeout(ctx, time.Second) defer tcancel() diff --git a/internal/p2p/pex/reactor.go b/internal/p2p/pex/reactor.go index bd4737326..87677799d 100644 --- a/internal/p2p/pex/reactor.go +++ b/internal/p2p/pex/reactor.go @@ -145,7 +145,7 @@ func (r *Reactor) OnStop() {} // processPexCh implements a blocking event loop where we listen for p2p // Envelope messages from the pexCh. 
-func (r *Reactor) processPexCh(ctx context.Context, pexCh *p2p.Channel) { +func (r *Reactor) processPexCh(ctx context.Context, pexCh p2p.Channel) { incoming := make(chan *p2p.Envelope) go func() { defer close(incoming) @@ -192,8 +192,7 @@ func (r *Reactor) processPexCh(ctx context.Context, pexCh *p2p.Channel) { // A request from another peer, or a response to one of our requests. dur, err := r.handlePexMessage(ctx, envelope, pexCh) if err != nil { - r.logger.Error("failed to process message", - "ch_id", envelope.ChannelID, "envelope", envelope, "err", err) + r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err) if serr := pexCh.SendError(ctx, p2p.PeerError{ NodeID: envelope.From, Err: err, @@ -225,7 +224,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerU // handlePexMessage handles envelopes sent from peers on the PexChannel. // If an update was received, a new polling interval is returned; otherwise the // duration is 0. -func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope, pexCh *p2p.Channel) (time.Duration, error) { +func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope, pexCh p2p.Channel) (time.Duration, error) { logger := r.logger.With("peer", envelope.From) switch msg := envelope.Message.(type) { @@ -308,7 +307,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) { // that peer a request for more peer addresses. The chosen peer is moved into // the requestsSent bucket so that we will not attempt to contact them again // until they've replied or updated. 
-func (r *Reactor) sendRequestForPeers(ctx context.Context, pexCh *p2p.Channel) error { +func (r *Reactor) sendRequestForPeers(ctx context.Context, pexCh p2p.Channel) error { r.mtx.Lock() defer r.mtx.Unlock() if len(r.availablePeers) == 0 { diff --git a/internal/p2p/pex/reactor_test.go b/internal/p2p/pex/reactor_test.go index ec2f03d83..07f49f0d6 100644 --- a/internal/p2p/pex/reactor_test.go +++ b/internal/p2p/pex/reactor_test.go @@ -275,7 +275,7 @@ type singleTestReactor struct { pexInCh chan p2p.Envelope pexOutCh chan p2p.Envelope pexErrCh chan p2p.PeerError - pexCh *p2p.Channel + pexCh p2p.Channel peerCh chan p2p.PeerUpdate manager *p2p.PeerManager } @@ -287,8 +287,11 @@ func setupSingle(ctx context.Context, t *testing.T) *singleTestReactor { pexInCh := make(chan p2p.Envelope, chBuf) pexOutCh := make(chan p2p.Envelope, chBuf) pexErrCh := make(chan p2p.PeerError, chBuf) + + chDesc := pex.ChannelDescriptor() pexCh := p2p.NewChannel( - p2p.ChannelID(pex.PexChannel), + chDesc.ID, + chDesc.Name, pexInCh, pexOutCh, pexErrCh, @@ -299,7 +302,7 @@ func setupSingle(ctx context.Context, t *testing.T) *singleTestReactor { peerManager, err := p2p.NewPeerManager(nodeID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) require.NoError(t, err) - chCreator := func(context.Context, *p2p.ChannelDescriptor) (*p2p.Channel, error) { + chCreator := func(context.Context, *p2p.ChannelDescriptor) (p2p.Channel, error) { return pexCh, nil } @@ -324,7 +327,7 @@ type reactorTestSuite struct { logger log.Logger reactors map[types.NodeID]*pex.Reactor - pexChannels map[types.NodeID]*p2p.Channel + pexChannels map[types.NodeID]p2p.Channel peerChans map[types.NodeID]chan p2p.PeerUpdate peerUpdates map[types.NodeID]*p2p.PeerUpdates @@ -367,7 +370,7 @@ func setupNetwork(ctx context.Context, t *testing.T, opts testOptions) *reactorT logger: log.NewNopLogger().With("testCase", t.Name()), network: p2ptest.MakeNetwork(ctx, t, networkOpts), reactors: make(map[types.NodeID]*pex.Reactor, realNodes), - 
pexChannels: make(map[types.NodeID]*p2p.Channel, opts.TotalNodes), + pexChannels: make(map[types.NodeID]p2p.Channel, opts.TotalNodes), peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, opts.TotalNodes), peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, opts.TotalNodes), total: opts.TotalNodes, @@ -388,7 +391,7 @@ func setupNetwork(ctx context.Context, t *testing.T, opts testOptions) *reactorT rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], chBuf) rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID]) - chCreator := func(context.Context, *p2p.ChannelDescriptor) (*p2p.Channel, error) { + chCreator := func(context.Context, *p2p.ChannelDescriptor) (p2p.Channel, error) { return rts.pexChannels[nodeID], nil } @@ -448,7 +451,7 @@ func (r *reactorTestSuite) addNodes(ctx context.Context, t *testing.T, nodes int r.peerUpdates[nodeID] = p2p.NewPeerUpdates(r.peerChans[nodeID], r.opts.BufferSize) r.network.Nodes[nodeID].PeerManager.Register(ctx, r.peerUpdates[nodeID]) - chCreator := func(context.Context, *p2p.ChannelDescriptor) (*p2p.Channel, error) { + chCreator := func(context.Context, *p2p.ChannelDescriptor) (p2p.Channel, error) { return r.pexChannels[nodeID], nil } diff --git a/internal/p2p/router.go b/internal/p2p/router.go index 0e55049c1..4f3af1346 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -239,7 +239,7 @@ func (r *Router) createQueueFactory(ctx context.Context) (func(int) queue, error // ChannelCreator allows routers to construct their own channels, // either by receiving a reference to Router.OpenChannel or using some // kind shim for testing purposes. -type ChannelCreator func(context.Context, *ChannelDescriptor) (*Channel, error) +type ChannelCreator func(context.Context, *ChannelDescriptor) (Channel, error) // OpenChannel opens a new channel for the given message type. The caller must // close the channel when done, before stopping the Router. 
messageType is the @@ -247,7 +247,7 @@ type ChannelCreator func(context.Context, *ChannelDescriptor) (*Channel, error) // implement Wrapper to automatically (un)wrap multiple message types in a // wrapper message. The caller may provide a size to make the channel buffered, // which internally makes the inbound, outbound, and error channel buffered. -func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (*Channel, error) { +func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (Channel, error) { r.channelMtx.Lock() defer r.channelMtx.Unlock() @@ -262,11 +262,10 @@ func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (*C queue := r.queueFactory(chDesc.RecvBufferCapacity) outCh := make(chan Envelope, chDesc.RecvBufferCapacity) errCh := make(chan PeerError, chDesc.RecvBufferCapacity) - channel := NewChannel(id, queue.dequeue(), outCh, errCh) - channel.name = chDesc.Name + channel := NewChannel(chDesc.ID, chDesc.Name, queue.dequeue(), outCh, errCh) var wrapper Wrapper - if w, ok := messageType.(Wrapper); ok { + if w, ok := chDesc.MessageType.(Wrapper); ok { wrapper = w } @@ -287,7 +286,7 @@ func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (*C queue.close() }() - r.routeChannel(ctx, id, outCh, errCh, wrapper) + r.routeChannel(ctx, chDesc.ID, outCh, errCh, wrapper) }() return channel, nil diff --git a/internal/p2p/router_test.go b/internal/p2p/router_test.go index 92f56f768..dd336510c 100644 --- a/internal/p2p/router_test.go +++ b/internal/p2p/router_test.go @@ -26,7 +26,7 @@ import ( "github.com/tendermint/tendermint/types" ) -func echoReactor(ctx context.Context, channel *p2p.Channel) { +func echoReactor(ctx context.Context, channel p2p.Channel) { iter := channel.Receive(ctx) for iter.Next(ctx) { envelope := iter.Envelope() diff --git a/internal/statesync/dispatcher.go b/internal/statesync/dispatcher.go index 9cdb34978..e7ad73148 100644 --- 
a/internal/statesync/dispatcher.go +++ b/internal/statesync/dispatcher.go @@ -26,14 +26,14 @@ var ( // NOTE: It is not the responsibility of the dispatcher to verify the light blocks. type Dispatcher struct { // the channel with which to send light block requests on - requestCh *p2p.Channel + requestCh p2p.Channel mtx sync.Mutex // all pending calls that have been dispatched and are awaiting an answer calls map[types.NodeID]chan *types.LightBlock } -func NewDispatcher(requestChannel *p2p.Channel) *Dispatcher { +func NewDispatcher(requestChannel p2p.Channel) *Dispatcher { return &Dispatcher{ requestCh: requestChannel, calls: make(map[types.NodeID]chan *types.LightBlock), diff --git a/internal/statesync/dispatcher_test.go b/internal/statesync/dispatcher_test.go index 8ec074bd1..8f6783e67 100644 --- a/internal/statesync/dispatcher_test.go +++ b/internal/statesync/dispatcher_test.go @@ -24,13 +24,13 @@ type channelInternal struct { Error chan p2p.PeerError } -func testChannel(size int) (*channelInternal, *p2p.Channel) { +func testChannel(size int) (*channelInternal, p2p.Channel) { in := &channelInternal{ In: make(chan p2p.Envelope, size), Out: make(chan p2p.Envelope, size), Error: make(chan p2p.PeerError, size), } - return in, p2p.NewChannel(0, in.In, in.Out, in.Error) + return in, p2p.NewChannel(0, "test", in.In, in.Out, in.Error) } func TestDispatcherBasic(t *testing.T) { diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index f4d72d017..deed8d0d3 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -305,7 +305,7 @@ func (r *Reactor) OnStart(ctx context.Context) error { return nil } - go r.processChannels(ctx, map[p2p.ChannelID]*p2p.Channel{ + go r.processChannels(ctx, map[p2p.ChannelID]p2p.Channel{ SnapshotChannel: snapshotCh, ChunkChannel: chunkCh, LightBlockChannel: blockCh, @@ -611,7 +611,7 @@ func (r *Reactor) backfill( // handleSnapshotMessage handles envelopes sent from peers on the // SnapshotChannel. 
It returns an error only if the Envelope.Message is unknown // for this channel. This should never be called outside of handleMessage. -func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envelope, snapshotCh *p2p.Channel) error { +func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envelope, snapshotCh p2p.Channel) error { logger := r.logger.With("peer", envelope.From) switch msg := envelope.Message.(type) { @@ -683,7 +683,7 @@ func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envel // handleChunkMessage handles envelopes sent from peers on the ChunkChannel. // It returns an error only if the Envelope.Message is unknown for this channel. // This should never be called outside of handleMessage. -func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope, chunkCh *p2p.Channel) error { +func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope, chunkCh p2p.Channel) error { switch msg := envelope.Message.(type) { case *ssproto.ChunkRequest: r.logger.Debug( @@ -772,7 +772,7 @@ func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope return nil } -func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Envelope, blockCh *p2p.Channel) error { +func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Envelope, blockCh p2p.Channel) error { switch msg := envelope.Message.(type) { case *ssproto.LightBlockRequest: r.logger.Info("received light block request", "height", msg.Height) @@ -829,7 +829,7 @@ func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Env return nil } -func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelope, paramsCh *p2p.Channel) error { +func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelope, paramsCh p2p.Channel) error { switch msg := envelope.Message.(type) { case *ssproto.ParamsRequest: 
r.logger.Debug("received consensus params request", "height", msg.Height) @@ -878,7 +878,7 @@ func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelop // handleMessage handles an Envelope sent from a peer on a specific p2p Channel. // It will handle errors and any possible panics gracefully. A caller can handle // any error returned by sending a PeerError on the respective channel. -func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, chans map[p2p.ChannelID]*p2p.Channel) (err error) { +func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, chans map[p2p.ChannelID]p2p.Channel) (err error) { defer func() { if e := recover(); e != nil { err = fmt.Errorf("panic in processing message: %v", e) @@ -912,12 +912,12 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, cha // encountered during message execution will result in a PeerError being sent on // the respective channel. When the reactor is stopped, we will catch the signal // and close the p2p Channel gracefully. 
-func (r *Reactor) processChannels(ctx context.Context, chanTable map[p2p.ChannelID]*p2p.Channel) { - // make sure that the iterator gets cleaned up in case of error +func (r *Reactor) processChannels(ctx context.Context, chanTable map[p2p.ChannelID]p2p.Channel) { + // make sure that the iterator gets cleaned up in case of error ctx, cancel := context.WithCancel(ctx) defer cancel() - chs := make([]*p2p.Channel, 0, len(chanTable)) + chs := make([]p2p.Channel, 0, len(chanTable)) for key := range chanTable { chs = append(chs, chanTable[key]) } diff --git a/internal/statesync/reactor_test.go b/internal/statesync/reactor_test.go index f57e228a7..b81c1ac2c 100644 --- a/internal/statesync/reactor_test.go +++ b/internal/statesync/reactor_test.go @@ -40,22 +40,22 @@ type reactorTestSuite struct { conn *clientmocks.Client stateProvider *mocks.StateProvider - snapshotChannel *p2p.Channel + snapshotChannel p2p.Channel snapshotInCh chan p2p.Envelope snapshotOutCh chan p2p.Envelope snapshotPeerErrCh chan p2p.PeerError - chunkChannel *p2p.Channel + chunkChannel p2p.Channel chunkInCh chan p2p.Envelope chunkOutCh chan p2p.Envelope chunkPeerErrCh chan p2p.PeerError - blockChannel *p2p.Channel + blockChannel p2p.Channel blockInCh chan p2p.Envelope blockOutCh chan p2p.Envelope blockPeerErrCh chan p2p.PeerError - paramsChannel *p2p.Channel + paramsChannel p2p.Channel paramsInCh chan p2p.Envelope paramsOutCh chan p2p.Envelope paramsPeerErrCh chan p2p.PeerError @@ -102,6 +102,7 @@ func setup( rts.snapshotChannel = p2p.NewChannel( SnapshotChannel, + "snapshot", rts.snapshotInCh, rts.snapshotOutCh, rts.snapshotPeerErrCh, @@ -109,6 +110,7 @@ func setup( rts.chunkChannel = p2p.NewChannel( ChunkChannel, + "chunk", rts.chunkInCh, rts.chunkOutCh, rts.chunkPeerErrCh, @@ -116,6 +118,7 @@ func setup( rts.blockChannel = p2p.NewChannel( LightBlockChannel, + "lightblock", rts.blockInCh, rts.blockOutCh, rts.blockPeerErrCh, @@ -123,6 +126,7 @@ func setup( rts.paramsChannel = p2p.NewChannel(
ParamsChannel, + "params", rts.paramsInCh, rts.paramsOutCh, rts.paramsPeerErrCh, @@ -133,7 +137,7 @@ func setup( cfg := config.DefaultStateSyncConfig() - chCreator := func(ctx context.Context, desc *p2p.ChannelDescriptor) (*p2p.Channel, error) { + chCreator := func(ctx context.Context, desc *p2p.ChannelDescriptor) (p2p.Channel, error) { switch desc.ID { case SnapshotChannel: return rts.snapshotChannel, nil diff --git a/internal/statesync/stateprovider.go b/internal/statesync/stateprovider.go index a796b0b2e..a8110b71b 100644 --- a/internal/statesync/stateprovider.go +++ b/internal/statesync/stateprovider.go @@ -208,7 +208,7 @@ type stateProviderP2P struct { sync.Mutex // light.Client is not concurrency-safe lc *light.Client initialHeight int64 - paramsSendCh *p2p.Channel + paramsSendCh p2p.Channel paramsRecvCh chan types.ConsensusParams } @@ -220,7 +220,7 @@ func NewP2PStateProvider( initialHeight int64, providers []lightprovider.Provider, trustOptions light.TrustOptions, - paramsSendCh *p2p.Channel, + paramsSendCh p2p.Channel, logger log.Logger, ) (StateProvider, error) { if len(providers) < 2 { diff --git a/internal/statesync/syncer.go b/internal/statesync/syncer.go index 47e058c19..a09b55892 100644 --- a/internal/statesync/syncer.go +++ b/internal/statesync/syncer.go @@ -56,8 +56,8 @@ type syncer struct { stateProvider StateProvider conn abciclient.Client snapshots *snapshotPool - snapshotCh *p2p.Channel - chunkCh *p2p.Channel + snapshotCh p2p.Channel + chunkCh p2p.Channel tempDir string fetchers int32 retryTimeout time.Duration