diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go index 6c1c060e7..c1b032b03 100644 --- a/internal/blocksync/reactor.go +++ b/internal/blocksync/reactor.go @@ -135,7 +135,7 @@ func (r *Reactor) OnStart(ctx context.Context) error { if err != nil { return err } - r.chCreator = func(context.Context, *conn.ChannelDescriptor) (*p2p.Channel, error) { return blockSyncCh, nil } + r.chCreator = func(context.Context, *conn.ChannelDescriptor) (p2p.Channel, error) { return blockSyncCh, nil } state, err := r.stateStore.Load() if err != nil { @@ -183,7 +183,7 @@ func (r *Reactor) OnStop() { // respondToPeer loads a block and sends it to the requesting peer, if we have it. // Otherwise, we'll respond saying we do not have it. -func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest, peerID types.NodeID, blockSyncCh *p2p.Channel) error { +func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest, peerID types.NodeID, blockSyncCh p2p.Channel) error { block := r.store.LoadBlock(msg.Height) if block == nil { r.logger.Info("peer requesting a block we do not have", "peer", peerID, "height", msg.Height) @@ -223,7 +223,7 @@ func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest, // handleMessage handles an Envelope sent from a peer on a specific p2p Channel. // It will handle errors and any possible panics gracefully. A caller can handle // any error returned by sending a PeerError on the respective channel. -func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, blockSyncCh *p2p.Channel) (err error) { +func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, blockSyncCh p2p.Channel) (err error) { defer func() { if e := recover(); e != nil { err = fmt.Errorf("panic in processing message: %v", e) @@ -298,7 +298,7 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, blo // message execution will result in a PeerError being sent on the BlockSyncChannel. // When the reactor is stopped, we will catch the signal and close the p2p Channel // gracefully. -func (r *Reactor) processBlockSyncCh(ctx context.Context, blockSyncCh *p2p.Channel) { +func (r *Reactor) processBlockSyncCh(ctx context.Context, blockSyncCh p2p.Channel) { iter := blockSyncCh.Receive(ctx) for iter.Next(ctx) { envelope := iter.Envelope() @@ -319,7 +319,7 @@ func (r *Reactor) processBlockSyncCh(ctx context.Context, blockSyncCh *p2p.Chann } // processPeerUpdate processes a PeerUpdate. -func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, blockSyncCh *p2p.Channel) { +func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, blockSyncCh p2p.Channel) { r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status) // XXX: Pool#RedoRequest can sometimes give us an empty peer. @@ -354,7 +354,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda // processPeerUpdates initiates a blocking process where we listen for and handle // PeerUpdate messages. When the reactor is stopped, we will catch the signal and // close the p2p PeerUpdatesCh gracefully. -func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, blockSyncCh *p2p.Channel) { +func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, blockSyncCh p2p.Channel) { for { select { case <-ctx.Done(): @@ -396,7 +396,7 @@ func (r *Reactor) SwitchToBlockSync(ctx context.Context, state sm.State) error { return nil } -func (r *Reactor) requestRoutine(ctx context.Context, blockSyncCh *p2p.Channel) { +func (r *Reactor) requestRoutine(ctx context.Context, blockSyncCh p2p.Channel) { statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second) defer statusUpdateTicker.Stop() @@ -438,7 +438,7 @@ func (r *Reactor) requestRoutine(ctx context.Context, blockSyncCh *p2p.Channel) // do. // // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down! -func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh *p2p.Channel) { +func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh p2p.Channel) { var ( trySyncTicker = time.NewTicker(trySyncIntervalMS * time.Millisecond) switchToConsensusTicker = time.NewTicker(switchToConsensusIntervalSeconds * time.Second) diff --git a/internal/blocksync/reactor_test.go b/internal/blocksync/reactor_test.go index 141eaf7ec..3ef2ec86f 100644 --- a/internal/blocksync/reactor_test.go +++ b/internal/blocksync/reactor_test.go @@ -37,7 +37,7 @@ type reactorTestSuite struct { reactors map[types.NodeID]*Reactor app map[types.NodeID]abciclient.Client - blockSyncChannels map[types.NodeID]*p2p.Channel + blockSyncChannels map[types.NodeID]p2p.Channel peerChans map[types.NodeID]chan p2p.PeerUpdate peerUpdates map[types.NodeID]*p2p.PeerUpdates } @@ -64,7 +64,7 @@ func setup( nodes: make([]types.NodeID, 0, numNodes), reactors: make(map[types.NodeID]*Reactor, numNodes), app: make(map[types.NodeID]abciclient.Client, numNodes), - blockSyncChannels: make(map[types.NodeID]*p2p.Channel, numNodes), + blockSyncChannels: make(map[types.NodeID]p2p.Channel, numNodes), peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numNodes), peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes), } @@ -177,7 +177,7 @@ func (rts *reactorTestSuite) addNode( rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1) rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID]) - chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (*p2p.Channel, error) { + chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (p2p.Channel, error) { return rts.blockSyncChannels[nodeID], nil } diff --git a/internal/consensus/invalid_test.go b/internal/consensus/invalid_test.go index 93c5cea1b..4685bb318 100644 --- a/internal/consensus/invalid_test.go +++ b/internal/consensus/invalid_test.go @@ -107,7 +107,7 @@ func invalidDoPrevoteFunc( round int32, cs *State, r *Reactor, - voteCh *p2p.Channel, + voteCh p2p.Channel, pv types.PrivValidator, ) { // routine to: diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index c353e0c73..3ba95c836 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -165,10 +165,10 @@ func NewReactor( } type channelBundle struct { - state *p2p.Channel - data *p2p.Channel - vote *p2p.Channel - votSet *p2p.Channel + state p2p.Channel + data p2p.Channel + vote p2p.Channel + votSet p2p.Channel } // OnStart starts separate go routines for each p2p Channel and listens for @@ -310,14 +310,14 @@ func (r *Reactor) GetPeerState(peerID types.NodeID) (*PeerState, bool) { return ps, ok } -func (r *Reactor) broadcastNewRoundStepMessage(ctx context.Context, rs *cstypes.RoundState, stateCh *p2p.Channel) error { +func (r *Reactor) broadcastNewRoundStepMessage(ctx context.Context, rs *cstypes.RoundState, stateCh p2p.Channel) error { return stateCh.Send(ctx, p2p.Envelope{ Broadcast: true, Message: makeRoundStepMessage(rs), }) } -func (r *Reactor) broadcastNewValidBlockMessage(ctx context.Context, rs *cstypes.RoundState, stateCh *p2p.Channel) error { +func (r *Reactor) broadcastNewValidBlockMessage(ctx context.Context, rs *cstypes.RoundState, stateCh p2p.Channel) error { psHeader := rs.ProposalBlockParts.Header() return stateCh.Send(ctx, p2p.Envelope{ Broadcast: true, @@ -331,7 +331,7 @@ func (r *Reactor) broadcastNewValidBlockMessage(ctx context.Context, rs *cstypes }) } -func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote, stateCh *p2p.Channel) error { +func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote, stateCh p2p.Channel) error { return stateCh.Send(ctx, p2p.Envelope{ Broadcast: true, Message: &tmcons.HasVote{ @@ -346,7 +346,7 @@ func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote, // subscribeToBroadcastEvents subscribes for new round steps and votes using the // internal pubsub defined in the consensus state to broadcast them to peers // upon receiving. -func (r *Reactor) subscribeToBroadcastEvents(ctx context.Context, stateCh *p2p.Channel) { +func (r *Reactor) subscribeToBroadcastEvents(ctx context.Context, stateCh p2p.Channel) { onStopCh := r.state.getOnStopCh() err := r.state.evsw.AddListenerForEvent( @@ -403,7 +403,7 @@ func makeRoundStepMessage(rs *cstypes.RoundState) *tmcons.NewRoundStep { } } -func (r *Reactor) sendNewRoundStepMessage(ctx context.Context, peerID types.NodeID, stateCh *p2p.Channel) error { +func (r *Reactor) sendNewRoundStepMessage(ctx context.Context, peerID types.NodeID, stateCh p2p.Channel) error { return stateCh.Send(ctx, p2p.Envelope{ To: peerID, Message: makeRoundStepMessage(r.getRoundState()), @@ -433,7 +433,7 @@ func (r *Reactor) getRoundState() *cstypes.RoundState { return r.rs } -func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState, dataCh *p2p.Channel) { +func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState, dataCh p2p.Channel) { logger := r.logger.With("height", prs.Height).With("peer", ps.peerID) if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok { @@ -497,7 +497,7 @@ func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundSta time.Sleep(r.state.config.PeerGossipSleepDuration) } -func (r *Reactor) gossipDataRoutine(ctx context.Context, ps *PeerState, dataCh *p2p.Channel) { +func (r *Reactor) gossipDataRoutine(ctx context.Context, ps *PeerState, dataCh p2p.Channel) { logger := r.logger.With("peer", ps.peerID) timer := time.NewTimer(0) @@ -632,7 +632,7 @@ OUTER_LOOP: // pickSendVote picks a vote and sends it to the peer. It will return true if // there is a vote to send and false otherwise. -func (r *Reactor) pickSendVote(ctx context.Context, ps *PeerState, votes types.VoteSetReader, voteCh *p2p.Channel) (bool, error) { +func (r *Reactor) pickSendVote(ctx context.Context, ps *PeerState, votes types.VoteSetReader, voteCh p2p.Channel) (bool, error) { vote, ok := ps.PickVoteToSend(votes) if !ok { return false, nil @@ -660,7 +660,7 @@ func (r *Reactor) gossipVotesForHeight( rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState, - voteCh *p2p.Channel, + voteCh p2p.Channel, ) (bool, error) { logger := r.logger.With("height", prs.Height).With("peer", ps.peerID) @@ -732,7 +732,7 @@ func (r *Reactor) gossipVotesForHeight( return false, nil } -func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh *p2p.Channel) { +func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh p2p.Channel) { logger := r.logger.With("peer", ps.peerID) timer := time.NewTimer(0) @@ -804,7 +804,7 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh // NOTE: `queryMaj23Routine` has a simple crude design since it only comes // into play for liveness when there's a signature DDoS attack happening. -func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState, stateCh *p2p.Channel) { +func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState, stateCh p2p.Channel) { timer := time.NewTimer(0) defer timer.Stop() @@ -1015,7 +1015,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda // If we fail to find the peer state for the envelope sender, we perform a no-op // and return. This can happen when we process the envelope after the peer is // removed. -func (r *Reactor) handleStateMessage(ctx context.Context, envelope *p2p.Envelope, msgI Message, voteSetCh *p2p.Channel) error { +func (r *Reactor) handleStateMessage(ctx context.Context, envelope *p2p.Envelope, msgI Message, voteSetCh p2p.Channel) error { ps, ok := r.GetPeerState(envelope.From) if !ok || ps == nil { r.logger.Debug("failed to find peer state", "peer", envelope.From, "ch_id", "StateChannel") diff --git a/internal/consensus/reactor_test.go b/internal/consensus/reactor_test.go index 96cf800bd..d848f53e7 100644 --- a/internal/consensus/reactor_test.go +++ b/internal/consensus/reactor_test.go @@ -46,10 +46,10 @@ type reactorTestSuite struct { reactors map[types.NodeID]*Reactor subs map[types.NodeID]eventbus.Subscription blocksyncSubs map[types.NodeID]eventbus.Subscription - stateChannels map[types.NodeID]*p2p.Channel - dataChannels map[types.NodeID]*p2p.Channel - voteChannels map[types.NodeID]*p2p.Channel - voteSetBitsChannels map[types.NodeID]*p2p.Channel + stateChannels map[types.NodeID]p2p.Channel + dataChannels map[types.NodeID]p2p.Channel + voteChannels map[types.NodeID]p2p.Channel + voteSetBitsChannels map[types.NodeID]p2p.Channel } func chDesc(chID p2p.ChannelID, size int) *p2p.ChannelDescriptor { @@ -86,7 +86,7 @@ func setup( t.Cleanup(cancel) chCreator := func(nodeID types.NodeID) p2p.ChannelCreator { - return func(ctx context.Context, desc *p2p.ChannelDescriptor) (*p2p.Channel, error) { + return func(ctx context.Context, desc *p2p.ChannelDescriptor) (p2p.Channel, error) { switch desc.ID { case StateChannel: return rts.stateChannels[nodeID], nil diff --git a/internal/evidence/reactor.go b/internal/evidence/reactor.go index 1d952d30e..d0bc28b13 100644 --- a/internal/evidence/reactor.go +++ b/internal/evidence/reactor.go @@ -159,7 +159,7 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope) (er // processEvidenceCh implements a blocking event loop where we listen for p2p // Envelope messages from the evidenceCh. -func (r *Reactor) processEvidenceCh(ctx context.Context, evidenceCh *p2p.Channel) { +func (r *Reactor) processEvidenceCh(ctx context.Context, evidenceCh p2p.Channel) { iter := evidenceCh.Receive(ctx) for iter.Next(ctx) { envelope := iter.Envelope() @@ -186,7 +186,7 @@ func (r *Reactor) processEvidenceCh(ctx context.Context, evidenceCh *p2p.Channel // connects/disconnects frequently from the broadcasting peer(s). // // REF: https://github.com/tendermint/tendermint/issues/4727 -func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, evidenceCh *p2p.Channel) { +func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, evidenceCh p2p.Channel) { r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status) r.mtx.Lock() @@ -227,7 +227,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda // processPeerUpdates initiates a blocking process where we listen for and handle // PeerUpdate messages. When the reactor is stopped, we will catch the signal and // close the p2p PeerUpdatesCh gracefully. -func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, evidenceCh *p2p.Channel) { +func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, evidenceCh p2p.Channel) { for { select { case peerUpdate := <-peerUpdates.Updates(): @@ -249,7 +249,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerU // that the peer has already received or may not be ready for. // // REF: https://github.com/tendermint/tendermint/issues/4727 -func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID, evidenceCh *p2p.Channel) { +func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID, evidenceCh p2p.Channel) { var next *clist.CElement defer func() { diff --git a/internal/evidence/reactor_test.go b/internal/evidence/reactor_test.go index f23195fae..92566ccc8 100644 --- a/internal/evidence/reactor_test.go +++ b/internal/evidence/reactor_test.go @@ -38,7 +38,7 @@ type reactorTestSuite struct { logger log.Logger reactors map[types.NodeID]*evidence.Reactor pools map[types.NodeID]*evidence.Pool - evidenceChannels map[types.NodeID]*p2p.Channel + evidenceChannels map[types.NodeID]p2p.Channel peerUpdates map[types.NodeID]*p2p.PeerUpdates peerChans map[types.NodeID]chan p2p.PeerUpdate nodes []*p2ptest.Node @@ -96,7 +96,7 @@ func setup(ctx context.Context, t *testing.T, stateStores []sm.Store) *reactorTe rts.network.Nodes[nodeID].PeerManager.Register(ctx, pu) rts.nodes = append(rts.nodes, rts.network.Nodes[nodeID]) - chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (*p2p.Channel, error) { + chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (p2p.Channel, error) { return rts.evidenceChannels[nodeID], nil } diff --git a/internal/mempool/reactor.go b/internal/mempool/reactor.go index 28ee9e334..62cdf386c 100644 --- a/internal/mempool/reactor.go +++ b/internal/mempool/reactor.go @@ -194,7 +194,7 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope) (er // processMempoolCh implements a blocking event loop where we listen for p2p // Envelope messages from the mempoolCh. -func (r *Reactor) processMempoolCh(ctx context.Context, mempoolCh *p2p.Channel) { +func (r *Reactor) processMempoolCh(ctx context.Context, mempoolCh p2p.Channel) { iter := mempoolCh.Receive(ctx) for iter.Next(ctx) { envelope := iter.Envelope() @@ -215,7 +215,7 @@ func (r *Reactor) processMempoolCh(ctx context.Context, mempoolCh *p2p.Channel) // goroutine or not. If not, we start one for the newly added peer. For down or // removed peers, we remove the peer from the mempool peer ID set and signal to // stop the tx broadcasting goroutine. -func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, mempoolCh *p2p.Channel) { +func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, mempoolCh p2p.Channel) { r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status) r.mtx.Lock() @@ -264,7 +264,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda // processPeerUpdates initiates a blocking process where we listen for and handle // PeerUpdate messages. When the reactor is stopped, we will catch the signal and // close the p2p PeerUpdatesCh gracefully. -func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, mempoolCh *p2p.Channel) { +func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, mempoolCh p2p.Channel) { for { select { case <-ctx.Done(): @@ -275,7 +275,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerU } } -func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, mempoolCh *p2p.Channel) { +func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, mempoolCh p2p.Channel) { peerMempoolID := r.ids.GetForPeer(peerID) var nextGossipTx *clist.CElement diff --git a/internal/mempool/reactor_test.go b/internal/mempool/reactor_test.go index 034c5eaa2..ee7fe777f 100644 --- a/internal/mempool/reactor_test.go +++ b/internal/mempool/reactor_test.go @@ -30,7 +30,7 @@ type reactorTestSuite struct { logger log.Logger reactors map[types.NodeID]*Reactor - mempoolChannels map[types.NodeID]*p2p.Channel + mempoolChannels map[types.NodeID]p2p.Channel mempools map[types.NodeID]*TxMempool kvstores map[types.NodeID]*kvstore.Application @@ -51,7 +51,7 @@ func setupReactors(ctx context.Context, t *testing.T, logger log.Logger, numNode logger: log.NewNopLogger().With("testCase", t.Name()), network: p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: numNodes}), reactors: make(map[types.NodeID]*Reactor, numNodes), - mempoolChannels: make(map[types.NodeID]*p2p.Channel, numNodes), + mempoolChannels: make(map[types.NodeID]p2p.Channel, numNodes), mempools: make(map[types.NodeID]*TxMempool, numNodes), kvstores: make(map[types.NodeID]*kvstore.Application, numNodes), peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numNodes), @@ -75,7 +75,7 @@ func setupReactors(ctx context.Context, t *testing.T, logger log.Logger, numNode rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1) rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID]) - chCreator := func(ctx context.Context, chDesc *p2p.ChannelDescriptor) (*p2p.Channel, error) { + chCreator := func(ctx context.Context, chDesc *p2p.ChannelDescriptor) (p2p.Channel, error) { return rts.mempoolChannels[nodeID], nil } diff --git a/internal/p2p/channel.go b/internal/p2p/channel.go index e33e7faa7..394656632 100644 --- a/internal/p2p/channel.go +++ b/internal/p2p/channel.go @@ -2,6 +2,7 @@ package p2p import ( "context" + "errors" "fmt" "sync" @@ -37,6 +38,16 @@ type Wrapper interface { Unwrap() (proto.Message, error) } +type Channel interface { + fmt.Stringer + + Err() error + + Send(context.Context, Envelope) error + SendError(context.Context, PeerError) error + Receive(context.Context) *ChannelIterator +} + // PeerError is a peer error reported via Channel.Error. // // FIXME: This currently just disconnects the peer, which is too simplistic. @@ -56,9 +67,9 @@ type PeerError struct { func (pe PeerError) Error() string { return fmt.Sprintf("peer=%q: %s", pe.NodeID, pe.Err.Error()) } func (pe PeerError) Unwrap() error { return pe.Err } -// Channel is a bidirectional channel to exchange Protobuf messages with peers. +// legacyChannel is a bidirectional channel to exchange Protobuf messages with peers. // Each message is wrapped in an Envelope to specify its sender and receiver. -type Channel struct { +type legacyChannel struct { ID ChannelID inCh <-chan Envelope // inbound messages (peers to reactors) outCh chan<- Envelope // outbound messages (reactors to peers) @@ -69,9 +80,10 @@ type Channel struct { // NewChannel creates a new channel. It is primarily for internal and test // use, reactors should use Router.OpenChannel(). -func NewChannel(id ChannelID, inCh <-chan Envelope, outCh chan<- Envelope, errCh chan<- PeerError) *Channel { - return &Channel{ +func NewChannel(id ChannelID, name string, inCh <-chan Envelope, outCh chan<- Envelope, errCh chan<- PeerError) Channel { + return &legacyChannel{ ID: id, + name: name, inCh: inCh, outCh: outCh, errCh: errCh, @@ -80,7 +92,7 @@ func NewChannel(id ChannelID, inCh <-chan Envelope, outCh chan<- Envelope, errCh // Send blocks until the envelope has been sent, or until ctx ends. // An error only occurs if the context ends before the send completes. -func (ch *Channel) Send(ctx context.Context, envelope Envelope) error { +func (ch *legacyChannel) Send(ctx context.Context, envelope Envelope) error { select { case <-ctx.Done(): return ctx.Err() @@ -89,9 +101,15 @@ func (ch *Channel) Send(ctx context.Context, envelope Envelope) error { } } +func (ch *legacyChannel) Err() error { return nil } + // SendError blocks until the given error has been sent, or ctx ends. // An error only occurs if the context ends before the send completes. -func (ch *Channel) SendError(ctx context.Context, pe PeerError) error { +func (ch *legacyChannel) SendError(ctx context.Context, pe PeerError) error { + if errors.Is(pe.Err, context.Canceled) || errors.Is(pe.Err, context.DeadlineExceeded) { + return nil + } + select { case <-ctx.Done(): return ctx.Err() @@ -100,18 +118,29 @@ func (ch *Channel) SendError(ctx context.Context, pe PeerError) error { } } -func (ch *Channel) String() string { return fmt.Sprintf("p2p.Channel<%d:%s>", ch.ID, ch.name) } +func (ch *legacyChannel) String() string { return fmt.Sprintf("p2p.Channel<%d:%s>", ch.ID, ch.name) } // Receive returns a new unbuffered iterator to receive messages from ch. // The iterator runs until ctx ends. -func (ch *Channel) Receive(ctx context.Context) *ChannelIterator { +func (ch *legacyChannel) Receive(ctx context.Context) *ChannelIterator { iter := &ChannelIterator{ pipe: make(chan Envelope), // unbuffered } - go func() { + go func(pipe chan<- Envelope) { defer close(iter.pipe) - iteratorWorker(ctx, ch, iter.pipe) - }() + for { + select { + case <-ctx.Done(): + return + case envelope := <-ch.inCh: + select { + case <-ctx.Done(): + return + case pipe <- envelope: + } + } + } + }(iter.pipe) return iter } @@ -126,21 +155,6 @@ type ChannelIterator struct { current *Envelope } -func iteratorWorker(ctx context.Context, ch *Channel, pipe chan Envelope) { - for { - select { - case <-ctx.Done(): - return - case envelope := <-ch.inCh: - select { - case <-ctx.Done(): - return - case pipe <- envelope: - } - } - } -} - // Next returns true when the Envelope value has advanced, and false // when the context is canceled or iteration should stop. If an iterator has returned false, // it will never return true again. @@ -179,7 +193,7 @@ func (iter *ChannelIterator) Envelope() *Envelope { return iter.current } // // This allows the caller to consume messages from multiple channels // without needing to manage the concurrency separately. -func MergedChannelIterator(ctx context.Context, chs ...*Channel) *ChannelIterator { +func MergedChannelIterator(ctx context.Context, chs ...Channel) *ChannelIterator { iter := &ChannelIterator{ pipe: make(chan Envelope), // unbuffered } @@ -187,10 +201,17 @@ func MergedChannelIterator(ctx context.Context, chs ...*Channel) *ChannelIterato for _, ch := range chs { wg.Add(1) - go func(ch *Channel) { + go func(ch Channel, pipe chan<- Envelope) { defer wg.Done() - iteratorWorker(ctx, ch, iter.pipe) - }(ch) + iter := ch.Receive(ctx) + for iter.Next(ctx) { + select { + case <-ctx.Done(): + return + case pipe <- *iter.Envelope(): + } + } + }(ch, iter.pipe) } done := make(chan struct{}) diff --git a/internal/p2p/channel_test.go b/internal/p2p/channel_test.go index e06e3e77e..eeaf77db2 100644 --- a/internal/p2p/channel_test.go +++ b/internal/p2p/channel_test.go @@ -16,13 +16,13 @@ type channelInternal struct { Error chan PeerError } -func testChannel(size int) (*channelInternal, *Channel) { +func testChannel(size int) (*channelInternal, *legacyChannel) { in := &channelInternal{ In: make(chan Envelope, size), Out: make(chan Envelope, size), Error: make(chan PeerError, size), } - ch := &Channel{ + ch := &legacyChannel{ inCh: in.In, outCh: in.Out, errCh: in.Error, diff --git a/internal/p2p/p2ptest/network.go b/internal/p2p/p2ptest/network.go index 813344915..95c040b57 100644 --- a/internal/p2p/p2ptest/network.go +++ b/internal/p2p/p2ptest/network.go @@ -146,8 +146,8 @@ func (n *Network) MakeChannels( ctx context.Context, t *testing.T, chDesc *p2p.ChannelDescriptor, -) map[types.NodeID]*p2p.Channel { - channels := map[types.NodeID]*p2p.Channel{} +) map[types.NodeID]p2p.Channel { + channels := map[types.NodeID]p2p.Channel{} for _, node := range n.Nodes { channels[node.NodeID] = node.MakeChannel(ctx, t, chDesc) } @@ -161,8 +161,8 @@ func (n *Network) MakeChannelsNoCleanup( ctx context.Context, t *testing.T, chDesc *p2p.ChannelDescriptor, -) map[types.NodeID]*p2p.Channel { - channels := map[types.NodeID]*p2p.Channel{} +) map[types.NodeID]p2p.Channel { + channels := map[types.NodeID]p2p.Channel{} for _, node := range n.Nodes { channels[node.NodeID] = node.MakeChannelNoCleanup(ctx, t, chDesc) } @@ -304,7 +304,7 @@ func (n *Node) MakeChannel( ctx context.Context, t *testing.T, chDesc *p2p.ChannelDescriptor, -) *p2p.Channel { +) p2p.Channel { ctx, cancel := context.WithCancel(ctx) channel, err := n.Router.OpenChannel(ctx, chDesc) require.NoError(t, err) @@ -321,7 +321,7 @@ func (n *Node) MakeChannelNoCleanup( ctx context.Context, t *testing.T, chDesc *p2p.ChannelDescriptor, -) *p2p.Channel { +) p2p.Channel { channel, err := n.Router.OpenChannel(ctx, chDesc) require.NoError(t, err) return channel diff --git a/internal/p2p/p2ptest/require.go b/internal/p2p/p2ptest/require.go index 885e080d4..276bff390 100644 --- a/internal/p2p/p2ptest/require.go +++ b/internal/p2p/p2ptest/require.go @@ -15,7 +15,7 @@ import ( ) // RequireEmpty requires that the given channel is empty. -func RequireEmpty(ctx context.Context, t *testing.T, channels ...*p2p.Channel) { +func RequireEmpty(ctx context.Context, t *testing.T, channels ...p2p.Channel) { t.Helper() ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) @@ -32,7 +32,7 @@ func RequireEmpty(ctx context.Context, t *testing.T, channels ...*p2p.Channel) { } // RequireReceive requires that the given envelope is received on the channel. -func RequireReceive(ctx context.Context, t *testing.T, channel *p2p.Channel, expect p2p.Envelope) { +func RequireReceive(ctx context.Context, t *testing.T, channel p2p.Channel, expect p2p.Envelope) { t.Helper() ctx, cancel := context.WithTimeout(ctx, time.Second) @@ -54,7 +54,7 @@ func RequireReceive(ctx context.Context, t *testing.T, channel *p2p.Channel, exp // RequireReceiveUnordered requires that the given envelopes are all received on // the channel, ignoring order. -func RequireReceiveUnordered(ctx context.Context, t *testing.T, channel *p2p.Channel, expect []*p2p.Envelope) { +func RequireReceiveUnordered(ctx context.Context, t *testing.T, channel p2p.Channel, expect []*p2p.Envelope) { ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() @@ -75,7 +75,7 @@ func RequireReceiveUnordered(ctx context.Context, t *testing.T, channel *p2p.Cha } // RequireSend requires that the given envelope is sent on the channel. -func RequireSend(ctx context.Context, t *testing.T, channel *p2p.Channel, envelope p2p.Envelope) { +func RequireSend(ctx context.Context, t *testing.T, channel p2p.Channel, envelope p2p.Envelope) { tctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() @@ -93,7 +93,7 @@ func RequireSend(ctx context.Context, t *testing.T, channel *p2p.Channel, envelo func RequireSendReceive( ctx context.Context, t *testing.T, - channel *p2p.Channel, + channel p2p.Channel, peerID types.NodeID, send proto.Message, receive proto.Message, @@ -116,7 +116,7 @@ func RequireNoUpdates(ctx context.Context, t *testing.T, peerUpdates *p2p.PeerUp } // RequireError requires that the given peer error is submitted for a peer. -func RequireError(ctx context.Context, t *testing.T, channel *p2p.Channel, peerError p2p.PeerError) { +func RequireError(ctx context.Context, t *testing.T, channel p2p.Channel, peerError p2p.PeerError) { tctx, tcancel := context.WithTimeout(ctx, time.Second) defer tcancel() diff --git a/internal/p2p/pex/reactor.go b/internal/p2p/pex/reactor.go index bd4737326..87677799d 100644 --- a/internal/p2p/pex/reactor.go +++ b/internal/p2p/pex/reactor.go @@ -145,7 +145,7 @@ func (r *Reactor) OnStop() {} // processPexCh implements a blocking event loop where we listen for p2p // Envelope messages from the pexCh. -func (r *Reactor) processPexCh(ctx context.Context, pexCh *p2p.Channel) { +func (r *Reactor) processPexCh(ctx context.Context, pexCh p2p.Channel) { incoming := make(chan *p2p.Envelope) go func() { defer close(incoming) @@ -192,8 +192,7 @@ func (r *Reactor) processPexCh(ctx context.Context, pexCh *p2p.Channel) { // A request from another peer, or a response to one of our requests. dur, err := r.handlePexMessage(ctx, envelope, pexCh) if err != nil { - r.logger.Error("failed to process message", - "ch_id", envelope.ChannelID, "envelope", envelope, "err", err) + r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err) if serr := pexCh.SendError(ctx, p2p.PeerError{ NodeID: envelope.From, Err: err, @@ -225,7 +224,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerU // handlePexMessage handles envelopes sent from peers on the PexChannel. // If an update was received, a new polling interval is returned; otherwise the // duration is 0. -func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope, pexCh *p2p.Channel) (time.Duration, error) { +func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope, pexCh p2p.Channel) (time.Duration, error) { logger := r.logger.With("peer", envelope.From) switch msg := envelope.Message.(type) { @@ -308,7 +307,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) { // that peer a request for more peer addresses. The chosen peer is moved into // the requestsSent bucket so that we will not attempt to contact them again // until they've replied or updated. -func (r *Reactor) sendRequestForPeers(ctx context.Context, pexCh *p2p.Channel) error { +func (r *Reactor) sendRequestForPeers(ctx context.Context, pexCh p2p.Channel) error { r.mtx.Lock() defer r.mtx.Unlock() if len(r.availablePeers) == 0 { diff --git a/internal/p2p/pex/reactor_test.go b/internal/p2p/pex/reactor_test.go index ec2f03d83..07f49f0d6 100644 --- a/internal/p2p/pex/reactor_test.go +++ b/internal/p2p/pex/reactor_test.go @@ -275,7 +275,7 @@ type singleTestReactor struct { pexInCh chan p2p.Envelope pexOutCh chan p2p.Envelope pexErrCh chan p2p.PeerError - pexCh *p2p.Channel + pexCh p2p.Channel peerCh chan p2p.PeerUpdate manager *p2p.PeerManager } @@ -287,8 +287,11 @@ func setupSingle(ctx context.Context, t *testing.T) *singleTestReactor { pexInCh := make(chan p2p.Envelope, chBuf) pexOutCh := make(chan p2p.Envelope, chBuf) pexErrCh := make(chan p2p.PeerError, chBuf) + + chDesc := pex.ChannelDescriptor() pexCh := p2p.NewChannel( - p2p.ChannelID(pex.PexChannel), + chDesc.ID, + chDesc.Name, pexInCh, pexOutCh, pexErrCh, @@ -299,7 +302,7 @@ func setupSingle(ctx context.Context, t *testing.T) *singleTestReactor { peerManager, err := p2p.NewPeerManager(nodeID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) require.NoError(t, err) - chCreator := func(context.Context, *p2p.ChannelDescriptor) (*p2p.Channel, error) { + chCreator := func(context.Context, *p2p.ChannelDescriptor) (p2p.Channel, error) { return pexCh, nil } @@ -324,7 +327,7 @@ type reactorTestSuite struct { logger log.Logger reactors map[types.NodeID]*pex.Reactor - pexChannels map[types.NodeID]*p2p.Channel + pexChannels map[types.NodeID]p2p.Channel peerChans map[types.NodeID]chan p2p.PeerUpdate peerUpdates map[types.NodeID]*p2p.PeerUpdates @@ -367,7 +370,7 @@ func setupNetwork(ctx context.Context, t *testing.T, opts testOptions) *reactorT logger: log.NewNopLogger().With("testCase", t.Name()), network: p2ptest.MakeNetwork(ctx, t, networkOpts), reactors: make(map[types.NodeID]*pex.Reactor, realNodes), - pexChannels: make(map[types.NodeID]*p2p.Channel, opts.TotalNodes), + pexChannels: make(map[types.NodeID]p2p.Channel, opts.TotalNodes), peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, opts.TotalNodes), peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, opts.TotalNodes), total: opts.TotalNodes, @@ -388,7 +391,7 @@ func setupNetwork(ctx context.Context, t *testing.T, opts testOptions) *reactorT rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], chBuf) rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID]) - chCreator := func(context.Context, *p2p.ChannelDescriptor) (*p2p.Channel, error) { + chCreator := func(context.Context, *p2p.ChannelDescriptor) (p2p.Channel, error) { return rts.pexChannels[nodeID], nil } @@ -448,7 +451,7 @@ func (r *reactorTestSuite) addNodes(ctx context.Context, t *testing.T, nodes int r.peerUpdates[nodeID] = p2p.NewPeerUpdates(r.peerChans[nodeID], r.opts.BufferSize) r.network.Nodes[nodeID].PeerManager.Register(ctx, r.peerUpdates[nodeID]) - chCreator := func(context.Context, *p2p.ChannelDescriptor) (*p2p.Channel, error) { + chCreator := func(context.Context, *p2p.ChannelDescriptor) (p2p.Channel, error) { return r.pexChannels[nodeID], nil } diff --git a/internal/p2p/router.go b/internal/p2p/router.go index 0e55049c1..4f3af1346 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -239,7 +239,7 @@ func (r *Router) createQueueFactory(ctx context.Context) (func(int) queue, error // ChannelCreator allows routers to construct their own channels, // either by receiving a reference to Router.OpenChannel or using some // kind shim for testing purposes. -type ChannelCreator func(context.Context, *ChannelDescriptor) (*Channel, error) +type ChannelCreator func(context.Context, *ChannelDescriptor) (Channel, error) // OpenChannel opens a new channel for the given message type. The caller must // close the channel when done, before stopping the Router. messageType is the @@ -247,7 +247,7 @@ type ChannelCreator func(context.Context, *ChannelDescriptor) (*Channel, error) // implement Wrapper to automatically (un)wrap multiple message types in a // wrapper message. The caller may provide a size to make the channel buffered, // which internally makes the inbound, outbound, and error channel buffered. -func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (*Channel, error) { +func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (Channel, error) { r.channelMtx.Lock() defer r.channelMtx.Unlock() @@ -262,11 +262,10 @@ func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (*C queue := r.queueFactory(chDesc.RecvBufferCapacity) outCh := make(chan Envelope, chDesc.RecvBufferCapacity) errCh := make(chan PeerError, chDesc.RecvBufferCapacity) - channel := NewChannel(id, queue.dequeue(), outCh, errCh) - channel.name = chDesc.Name + channel := NewChannel(chDesc.ID, chDesc.Name, queue.dequeue(), outCh, errCh) var wrapper Wrapper - if w, ok := messageType.(Wrapper); ok { + if w, ok := chDesc.MessageType.(Wrapper); ok { wrapper = w } @@ -287,7 +286,7 @@ func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (*C queue.close() }() - r.routeChannel(ctx, id, outCh, errCh, wrapper) + r.routeChannel(ctx, chDesc.ID, outCh, errCh, wrapper) }() return channel, nil diff --git a/internal/p2p/router_test.go b/internal/p2p/router_test.go index 92f56f768..dd336510c 100644 --- a/internal/p2p/router_test.go +++ b/internal/p2p/router_test.go @@ -26,7 +26,7 @@ import ( "github.com/tendermint/tendermint/types" ) -func echoReactor(ctx context.Context, channel *p2p.Channel) { +func echoReactor(ctx context.Context, channel p2p.Channel) { iter := channel.Receive(ctx) for iter.Next(ctx) { envelope := iter.Envelope() diff --git a/internal/statesync/dispatcher.go b/internal/statesync/dispatcher.go index 9cdb34978..e7ad73148 100644 --- a/internal/statesync/dispatcher.go +++ b/internal/statesync/dispatcher.go @@ -26,14 +26,14 @@ var ( // NOTE: It is not the responsibility of the dispatcher to verify the light blocks. type Dispatcher struct { // the channel with which to send light block requests on - requestCh *p2p.Channel + requestCh p2p.Channel mtx sync.Mutex // all pending calls that have been dispatched and are awaiting an answer calls map[types.NodeID]chan *types.LightBlock } -func NewDispatcher(requestChannel *p2p.Channel) *Dispatcher { +func NewDispatcher(requestChannel p2p.Channel) *Dispatcher { return &Dispatcher{ requestCh: requestChannel, calls: make(map[types.NodeID]chan *types.LightBlock), diff --git a/internal/statesync/dispatcher_test.go b/internal/statesync/dispatcher_test.go index 8ec074bd1..8f6783e67 100644 --- a/internal/statesync/dispatcher_test.go +++ b/internal/statesync/dispatcher_test.go @@ -24,13 +24,13 @@ type channelInternal struct { Error chan p2p.PeerError } -func testChannel(size int) (*channelInternal, *p2p.Channel) { +func testChannel(size int) (*channelInternal, p2p.Channel) { in := &channelInternal{ In: make(chan p2p.Envelope, size), Out: make(chan p2p.Envelope, size), Error: make(chan p2p.PeerError, size), } - return in, p2p.NewChannel(0, in.In, in.Out, in.Error) + return in, p2p.NewChannel(0, "test", in.In, in.Out, in.Error) } func TestDispatcherBasic(t *testing.T) { diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index f4d72d017..deed8d0d3 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -305,7 +305,7 @@ func (r *Reactor) OnStart(ctx context.Context) error { return nil } - go r.processChannels(ctx, map[p2p.ChannelID]*p2p.Channel{ + go r.processChannels(ctx, map[p2p.ChannelID]p2p.Channel{ SnapshotChannel: snapshotCh, ChunkChannel: chunkCh, LightBlockChannel: blockCh, @@ -611,7 +611,7 @@ func (r *Reactor) backfill( // handleSnapshotMessage handles envelopes sent from peers on the // SnapshotChannel. It returns an error only if the Envelope.Message is unknown // for this channel. This should never be called outside of handleMessage. -func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envelope, snapshotCh *p2p.Channel) error { +func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envelope, snapshotCh p2p.Channel) error { logger := r.logger.With("peer", envelope.From) switch msg := envelope.Message.(type) { @@ -683,7 +683,7 @@ func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envel // handleChunkMessage handles envelopes sent from peers on the ChunkChannel. // It returns an error only if the Envelope.Message is unknown for this channel. // This should never be called outside of handleMessage. -func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope, chunkCh *p2p.Channel) error { +func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope, chunkCh p2p.Channel) error { switch msg := envelope.Message.(type) { case *ssproto.ChunkRequest: r.logger.Debug( @@ -772,7 +772,7 @@ func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope return nil } -func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Envelope, blockCh *p2p.Channel) error { +func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Envelope, blockCh p2p.Channel) error { switch msg := envelope.Message.(type) { case *ssproto.LightBlockRequest: r.logger.Info("received light block request", "height", msg.Height) @@ -829,7 +829,7 @@ func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Env return nil } -func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelope, paramsCh *p2p.Channel) error { +func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelope, paramsCh p2p.Channel) error { switch msg := envelope.Message.(type) { case *ssproto.ParamsRequest: r.logger.Debug("received consensus params request", "height", msg.Height) @@ -878,7 +878,7 @@ func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelop // handleMessage handles an Envelope sent from a peer on a specific p2p Channel. // It will handle errors and any possible panics gracefully. A caller can handle // any error returned by sending a PeerError on the respective channel. -func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, chans map[p2p.ChannelID]*p2p.Channel) (err error) { +func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, chans map[p2p.ChannelID]p2p.Channel) (err error) { defer func() { if e := recover(); e != nil { err = fmt.Errorf("panic in processing message: %v", e) @@ -912,12 +912,12 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, cha // encountered during message execution will result in a PeerError being sent on // the respective channel. When the reactor is stopped, we will catch the signal // and close the p2p Channel gracefully. -func (r *Reactor) processChannels(ctx context.Context, chanTable map[p2p.ChannelID]*p2p.Channel) { - // make sure that the iterator gets cleaned up in case of error +func (r *Reactor) processChannels(ctx context.Context, chanTable map[p2p.ChannelID]p2p.Channel) { + // make sure tht the iterator gets cleaned up in case of error ctx, cancel := context.WithCancel(ctx) defer cancel() - chs := make([]*p2p.Channel, 0, len(chanTable)) + chs := make([]p2p.Channel, 0, len(chanTable)) for key := range chanTable { chs = append(chs, chanTable[key]) } diff --git a/internal/statesync/reactor_test.go b/internal/statesync/reactor_test.go index f57e228a7..b81c1ac2c 100644 --- a/internal/statesync/reactor_test.go +++ b/internal/statesync/reactor_test.go @@ -40,22 +40,22 @@ type reactorTestSuite struct { conn *clientmocks.Client stateProvider *mocks.StateProvider - snapshotChannel *p2p.Channel + snapshotChannel p2p.Channel snapshotInCh chan p2p.Envelope snapshotOutCh chan p2p.Envelope snapshotPeerErrCh chan p2p.PeerError - chunkChannel *p2p.Channel + chunkChannel p2p.Channel chunkInCh chan p2p.Envelope chunkOutCh chan p2p.Envelope chunkPeerErrCh chan p2p.PeerError - blockChannel *p2p.Channel + blockChannel p2p.Channel blockInCh chan p2p.Envelope blockOutCh chan p2p.Envelope blockPeerErrCh chan p2p.PeerError - paramsChannel *p2p.Channel + paramsChannel p2p.Channel paramsInCh chan p2p.Envelope paramsOutCh chan p2p.Envelope paramsPeerErrCh chan p2p.PeerError @@ -102,6 +102,7 @@ func setup( rts.snapshotChannel = p2p.NewChannel( SnapshotChannel, + "snapshot", rts.snapshotInCh, rts.snapshotOutCh, rts.snapshotPeerErrCh, @@ -109,6 +110,7 @@ func setup( rts.chunkChannel = p2p.NewChannel( ChunkChannel, + "chunk", rts.chunkInCh, rts.chunkOutCh, rts.chunkPeerErrCh, @@ -116,6 +118,7 @@ func setup( rts.blockChannel = p2p.NewChannel( LightBlockChannel, + "lightblock", rts.blockInCh, rts.blockOutCh, rts.blockPeerErrCh, @@ -123,6 +126,7 @@ func setup( rts.paramsChannel = p2p.NewChannel( ParamsChannel, + "params", rts.paramsInCh, rts.paramsOutCh, rts.paramsPeerErrCh, @@ -133,7 +137,7 @@ func setup( cfg := config.DefaultStateSyncConfig() - chCreator := func(ctx context.Context, desc *p2p.ChannelDescriptor) (*p2p.Channel, error) { + chCreator := func(ctx context.Context, desc *p2p.ChannelDescriptor) (p2p.Channel, error) { switch desc.ID { case SnapshotChannel: return rts.snapshotChannel, nil diff --git a/internal/statesync/stateprovider.go b/internal/statesync/stateprovider.go index a796b0b2e..a8110b71b 100644 --- a/internal/statesync/stateprovider.go +++ b/internal/statesync/stateprovider.go @@ -208,7 +208,7 @@ type stateProviderP2P struct { sync.Mutex // light.Client is not concurrency-safe lc *light.Client initialHeight int64 - paramsSendCh *p2p.Channel + paramsSendCh p2p.Channel paramsRecvCh chan types.ConsensusParams } @@ -220,7 +220,7 @@ func NewP2PStateProvider( initialHeight int64, providers []lightprovider.Provider, trustOptions light.TrustOptions, - paramsSendCh *p2p.Channel, + paramsSendCh p2p.Channel, logger log.Logger, ) (StateProvider, error) { if len(providers) < 2 { diff --git a/internal/statesync/syncer.go b/internal/statesync/syncer.go index 47e058c19..a09b55892 100644 --- a/internal/statesync/syncer.go +++ b/internal/statesync/syncer.go @@ -56,8 +56,8 @@ type syncer struct { stateProvider StateProvider conn abciclient.Client snapshots *snapshotPool - snapshotCh *p2p.Channel - chunkCh *p2p.Channel + snapshotCh p2p.Channel + chunkCh p2p.Channel tempDir string fetchers int32 retryTimeout time.Duration