mirror of
https://github.com/tendermint/tendermint.git
synced 2026-06-08 15:22:43 +00:00
p2p: make p2p.Channel an interface (#8446)
This commit is contained in:
@@ -135,7 +135,7 @@ func (r *Reactor) OnStart(ctx context.Context) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.chCreator = func(context.Context, *conn.ChannelDescriptor) (*p2p.Channel, error) { return blockSyncCh, nil }
|
||||
r.chCreator = func(context.Context, *conn.ChannelDescriptor) (p2p.Channel, error) { return blockSyncCh, nil }
|
||||
|
||||
state, err := r.stateStore.Load()
|
||||
if err != nil {
|
||||
@@ -183,7 +183,7 @@ func (r *Reactor) OnStop() {
|
||||
|
||||
// respondToPeer loads a block and sends it to the requesting peer, if we have it.
|
||||
// Otherwise, we'll respond saying we do not have it.
|
||||
func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest, peerID types.NodeID, blockSyncCh *p2p.Channel) error {
|
||||
func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest, peerID types.NodeID, blockSyncCh p2p.Channel) error {
|
||||
block := r.store.LoadBlock(msg.Height)
|
||||
if block != nil {
|
||||
blockProto, err := block.ToProto()
|
||||
@@ -209,7 +209,7 @@ func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest,
|
||||
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
|
||||
// It will handle errors and any possible panics gracefully. A caller can handle
|
||||
// any error returned by sending a PeerError on the respective channel.
|
||||
func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, blockSyncCh *p2p.Channel) (err error) {
|
||||
func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, blockSyncCh p2p.Channel) (err error) {
|
||||
defer func() {
|
||||
if e := recover(); e != nil {
|
||||
err = fmt.Errorf("panic in processing message: %v", e)
|
||||
@@ -271,7 +271,7 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, blo
|
||||
// message execution will result in a PeerError being sent on the BlockSyncChannel.
|
||||
// When the reactor is stopped, we will catch the signal and close the p2p Channel
|
||||
// gracefully.
|
||||
func (r *Reactor) processBlockSyncCh(ctx context.Context, blockSyncCh *p2p.Channel) {
|
||||
func (r *Reactor) processBlockSyncCh(ctx context.Context, blockSyncCh p2p.Channel) {
|
||||
iter := blockSyncCh.Receive(ctx)
|
||||
for iter.Next(ctx) {
|
||||
envelope := iter.Envelope()
|
||||
@@ -292,7 +292,7 @@ func (r *Reactor) processBlockSyncCh(ctx context.Context, blockSyncCh *p2p.Chann
|
||||
}
|
||||
|
||||
// processPeerUpdate processes a PeerUpdate.
|
||||
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, blockSyncCh *p2p.Channel) {
|
||||
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, blockSyncCh p2p.Channel) {
|
||||
r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
|
||||
|
||||
// XXX: Pool#RedoRequest can sometimes give us an empty peer.
|
||||
@@ -327,7 +327,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
|
||||
// processPeerUpdates initiates a blocking process where we listen for and handle
|
||||
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
|
||||
// close the p2p PeerUpdatesCh gracefully.
|
||||
func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, blockSyncCh *p2p.Channel) {
|
||||
func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, blockSyncCh p2p.Channel) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -369,7 +369,7 @@ func (r *Reactor) SwitchToBlockSync(ctx context.Context, state sm.State) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Reactor) requestRoutine(ctx context.Context, blockSyncCh *p2p.Channel) {
|
||||
func (r *Reactor) requestRoutine(ctx context.Context, blockSyncCh p2p.Channel) {
|
||||
statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
|
||||
defer statusUpdateTicker.Stop()
|
||||
|
||||
@@ -411,7 +411,7 @@ func (r *Reactor) requestRoutine(ctx context.Context, blockSyncCh *p2p.Channel)
|
||||
// do.
|
||||
//
|
||||
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
|
||||
func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh *p2p.Channel) {
|
||||
func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh p2p.Channel) {
|
||||
var (
|
||||
trySyncTicker = time.NewTicker(trySyncIntervalMS * time.Millisecond)
|
||||
switchToConsensusTicker = time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
|
||||
|
||||
@@ -37,7 +37,7 @@ type reactorTestSuite struct {
|
||||
reactors map[types.NodeID]*Reactor
|
||||
app map[types.NodeID]abciclient.Client
|
||||
|
||||
blockSyncChannels map[types.NodeID]*p2p.Channel
|
||||
blockSyncChannels map[types.NodeID]p2p.Channel
|
||||
peerChans map[types.NodeID]chan p2p.PeerUpdate
|
||||
peerUpdates map[types.NodeID]*p2p.PeerUpdates
|
||||
|
||||
@@ -66,7 +66,7 @@ func setup(
|
||||
nodes: make([]types.NodeID, 0, numNodes),
|
||||
reactors: make(map[types.NodeID]*Reactor, numNodes),
|
||||
app: make(map[types.NodeID]abciclient.Client, numNodes),
|
||||
blockSyncChannels: make(map[types.NodeID]*p2p.Channel, numNodes),
|
||||
blockSyncChannels: make(map[types.NodeID]p2p.Channel, numNodes),
|
||||
peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numNodes),
|
||||
peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes),
|
||||
blockSync: true,
|
||||
@@ -186,7 +186,7 @@ func (rts *reactorTestSuite) addNode(
|
||||
rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)
|
||||
rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID])
|
||||
|
||||
chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
|
||||
chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (p2p.Channel, error) {
|
||||
return rts.blockSyncChannels[nodeID], nil
|
||||
}
|
||||
rts.reactors[nodeID] = NewReactor(
|
||||
|
||||
@@ -107,7 +107,7 @@ func invalidDoPrevoteFunc(
|
||||
round int32,
|
||||
cs *State,
|
||||
r *Reactor,
|
||||
voteCh *p2p.Channel,
|
||||
voteCh p2p.Channel,
|
||||
pv types.PrivValidator,
|
||||
) {
|
||||
// routine to:
|
||||
|
||||
@@ -165,10 +165,10 @@ func NewReactor(
|
||||
}
|
||||
|
||||
type channelBundle struct {
|
||||
state *p2p.Channel
|
||||
data *p2p.Channel
|
||||
vote *p2p.Channel
|
||||
votSet *p2p.Channel
|
||||
state p2p.Channel
|
||||
data p2p.Channel
|
||||
vote p2p.Channel
|
||||
votSet p2p.Channel
|
||||
}
|
||||
|
||||
// OnStart starts separate go routines for each p2p Channel and listens for
|
||||
@@ -308,14 +308,14 @@ func (r *Reactor) GetPeerState(peerID types.NodeID) (*PeerState, bool) {
|
||||
return ps, ok
|
||||
}
|
||||
|
||||
func (r *Reactor) broadcastNewRoundStepMessage(ctx context.Context, rs *cstypes.RoundState, stateCh *p2p.Channel) error {
|
||||
func (r *Reactor) broadcastNewRoundStepMessage(ctx context.Context, rs *cstypes.RoundState, stateCh p2p.Channel) error {
|
||||
return stateCh.Send(ctx, p2p.Envelope{
|
||||
Broadcast: true,
|
||||
Message: makeRoundStepMessage(rs),
|
||||
})
|
||||
}
|
||||
|
||||
func (r *Reactor) broadcastNewValidBlockMessage(ctx context.Context, rs *cstypes.RoundState, stateCh *p2p.Channel) error {
|
||||
func (r *Reactor) broadcastNewValidBlockMessage(ctx context.Context, rs *cstypes.RoundState, stateCh p2p.Channel) error {
|
||||
psHeader := rs.ProposalBlockParts.Header()
|
||||
return stateCh.Send(ctx, p2p.Envelope{
|
||||
Broadcast: true,
|
||||
@@ -329,7 +329,7 @@ func (r *Reactor) broadcastNewValidBlockMessage(ctx context.Context, rs *cstypes
|
||||
})
|
||||
}
|
||||
|
||||
func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote, stateCh *p2p.Channel) error {
|
||||
func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote, stateCh p2p.Channel) error {
|
||||
return stateCh.Send(ctx, p2p.Envelope{
|
||||
Broadcast: true,
|
||||
Message: &tmcons.HasVote{
|
||||
@@ -344,7 +344,7 @@ func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote,
|
||||
// subscribeToBroadcastEvents subscribes for new round steps and votes using the
|
||||
// internal pubsub defined in the consensus state to broadcast them to peers
|
||||
// upon receiving.
|
||||
func (r *Reactor) subscribeToBroadcastEvents(ctx context.Context, stateCh *p2p.Channel) {
|
||||
func (r *Reactor) subscribeToBroadcastEvents(ctx context.Context, stateCh p2p.Channel) {
|
||||
onStopCh := r.state.getOnStopCh()
|
||||
|
||||
err := r.state.evsw.AddListenerForEvent(
|
||||
@@ -401,7 +401,7 @@ func makeRoundStepMessage(rs *cstypes.RoundState) *tmcons.NewRoundStep {
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Reactor) sendNewRoundStepMessage(ctx context.Context, peerID types.NodeID, stateCh *p2p.Channel) error {
|
||||
func (r *Reactor) sendNewRoundStepMessage(ctx context.Context, peerID types.NodeID, stateCh p2p.Channel) error {
|
||||
return stateCh.Send(ctx, p2p.Envelope{
|
||||
To: peerID,
|
||||
Message: makeRoundStepMessage(r.getRoundState()),
|
||||
@@ -431,7 +431,7 @@ func (r *Reactor) getRoundState() *cstypes.RoundState {
|
||||
return r.rs
|
||||
}
|
||||
|
||||
func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState, dataCh *p2p.Channel) {
|
||||
func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState, dataCh p2p.Channel) {
|
||||
logger := r.logger.With("height", prs.Height).With("peer", ps.peerID)
|
||||
|
||||
if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
|
||||
@@ -495,7 +495,7 @@ func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundSta
|
||||
time.Sleep(r.state.config.PeerGossipSleepDuration)
|
||||
}
|
||||
|
||||
func (r *Reactor) gossipDataRoutine(ctx context.Context, ps *PeerState, dataCh *p2p.Channel) {
|
||||
func (r *Reactor) gossipDataRoutine(ctx context.Context, ps *PeerState, dataCh p2p.Channel) {
|
||||
logger := r.logger.With("peer", ps.peerID)
|
||||
|
||||
timer := time.NewTimer(0)
|
||||
@@ -652,7 +652,7 @@ OUTER_LOOP:
|
||||
|
||||
// pickSendVote picks a vote and sends it to the peer. It will return true if
|
||||
// there is a vote to send and false otherwise.
|
||||
func (r *Reactor) pickSendVote(ctx context.Context, ps *PeerState, votes types.VoteSetReader, voteCh *p2p.Channel) (bool, error) {
|
||||
func (r *Reactor) pickSendVote(ctx context.Context, ps *PeerState, votes types.VoteSetReader, voteCh p2p.Channel) (bool, error) {
|
||||
vote, ok := ps.PickVoteToSend(votes)
|
||||
if !ok {
|
||||
return false, nil
|
||||
@@ -680,7 +680,7 @@ func (r *Reactor) gossipVotesForHeight(
|
||||
rs *cstypes.RoundState,
|
||||
prs *cstypes.PeerRoundState,
|
||||
ps *PeerState,
|
||||
voteCh *p2p.Channel,
|
||||
voteCh p2p.Channel,
|
||||
) (bool, error) {
|
||||
logger := r.logger.With("height", prs.Height).With("peer", ps.peerID)
|
||||
|
||||
@@ -752,7 +752,7 @@ func (r *Reactor) gossipVotesForHeight(
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh *p2p.Channel) {
|
||||
func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh p2p.Channel) {
|
||||
logger := r.logger.With("peer", ps.peerID)
|
||||
|
||||
timer := time.NewTimer(0)
|
||||
@@ -817,7 +817,7 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh
|
||||
|
||||
// NOTE: `queryMaj23Routine` has a simple crude design since it only comes
|
||||
// into play for liveness when there's a signature DDoS attack happening.
|
||||
func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState, stateCh *p2p.Channel) {
|
||||
func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState, stateCh p2p.Channel) {
|
||||
timer := time.NewTimer(0)
|
||||
defer timer.Stop()
|
||||
|
||||
@@ -1028,7 +1028,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
|
||||
// If we fail to find the peer state for the envelope sender, we perform a no-op
|
||||
// and return. This can happen when we process the envelope after the peer is
|
||||
// removed.
|
||||
func (r *Reactor) handleStateMessage(ctx context.Context, envelope *p2p.Envelope, msgI Message, voteSetCh *p2p.Channel) error {
|
||||
func (r *Reactor) handleStateMessage(ctx context.Context, envelope *p2p.Envelope, msgI Message, voteSetCh p2p.Channel) error {
|
||||
ps, ok := r.GetPeerState(envelope.From)
|
||||
if !ok || ps == nil {
|
||||
r.logger.Debug("failed to find peer state", "peer", envelope.From, "ch_id", "StateChannel")
|
||||
|
||||
@@ -45,10 +45,10 @@ type reactorTestSuite struct {
|
||||
reactors map[types.NodeID]*Reactor
|
||||
subs map[types.NodeID]eventbus.Subscription
|
||||
blocksyncSubs map[types.NodeID]eventbus.Subscription
|
||||
stateChannels map[types.NodeID]*p2p.Channel
|
||||
dataChannels map[types.NodeID]*p2p.Channel
|
||||
voteChannels map[types.NodeID]*p2p.Channel
|
||||
voteSetBitsChannels map[types.NodeID]*p2p.Channel
|
||||
stateChannels map[types.NodeID]p2p.Channel
|
||||
dataChannels map[types.NodeID]p2p.Channel
|
||||
voteChannels map[types.NodeID]p2p.Channel
|
||||
voteSetBitsChannels map[types.NodeID]p2p.Channel
|
||||
}
|
||||
|
||||
func chDesc(chID p2p.ChannelID, size int) *p2p.ChannelDescriptor {
|
||||
@@ -85,7 +85,7 @@ func setup(
|
||||
t.Cleanup(cancel)
|
||||
|
||||
chCreator := func(nodeID types.NodeID) p2p.ChannelCreator {
|
||||
return func(ctx context.Context, desc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
|
||||
return func(ctx context.Context, desc *p2p.ChannelDescriptor) (p2p.Channel, error) {
|
||||
switch desc.ID {
|
||||
case StateChannel:
|
||||
return rts.stateChannels[nodeID], nil
|
||||
|
||||
@@ -159,7 +159,7 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope) (er
|
||||
|
||||
// processEvidenceCh implements a blocking event loop where we listen for p2p
|
||||
// Envelope messages from the evidenceCh.
|
||||
func (r *Reactor) processEvidenceCh(ctx context.Context, evidenceCh *p2p.Channel) {
|
||||
func (r *Reactor) processEvidenceCh(ctx context.Context, evidenceCh p2p.Channel) {
|
||||
iter := evidenceCh.Receive(ctx)
|
||||
for iter.Next(ctx) {
|
||||
envelope := iter.Envelope()
|
||||
@@ -186,7 +186,7 @@ func (r *Reactor) processEvidenceCh(ctx context.Context, evidenceCh *p2p.Channel
|
||||
// connects/disconnects frequently from the broadcasting peer(s).
|
||||
//
|
||||
// REF: https://github.com/tendermint/tendermint/issues/4727
|
||||
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, evidenceCh *p2p.Channel) {
|
||||
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, evidenceCh p2p.Channel) {
|
||||
r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
|
||||
|
||||
r.mtx.Lock()
|
||||
@@ -227,7 +227,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
|
||||
// processPeerUpdates initiates a blocking process where we listen for and handle
|
||||
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
|
||||
// close the p2p PeerUpdatesCh gracefully.
|
||||
func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, evidenceCh *p2p.Channel) {
|
||||
func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, evidenceCh p2p.Channel) {
|
||||
for {
|
||||
select {
|
||||
case peerUpdate := <-peerUpdates.Updates():
|
||||
@@ -249,7 +249,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerU
|
||||
// that the peer has already received or may not be ready for.
|
||||
//
|
||||
// REF: https://github.com/tendermint/tendermint/issues/4727
|
||||
func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID, evidenceCh *p2p.Channel) {
|
||||
func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID, evidenceCh p2p.Channel) {
|
||||
var next *clist.CElement
|
||||
|
||||
defer func() {
|
||||
|
||||
@@ -38,7 +38,7 @@ type reactorTestSuite struct {
|
||||
logger log.Logger
|
||||
reactors map[types.NodeID]*evidence.Reactor
|
||||
pools map[types.NodeID]*evidence.Pool
|
||||
evidenceChannels map[types.NodeID]*p2p.Channel
|
||||
evidenceChannels map[types.NodeID]p2p.Channel
|
||||
peerUpdates map[types.NodeID]*p2p.PeerUpdates
|
||||
peerChans map[types.NodeID]chan p2p.PeerUpdate
|
||||
nodes []*p2ptest.Node
|
||||
@@ -96,7 +96,7 @@ func setup(ctx context.Context, t *testing.T, stateStores []sm.Store) *reactorTe
|
||||
rts.network.Nodes[nodeID].PeerManager.Register(ctx, pu)
|
||||
rts.nodes = append(rts.nodes, rts.network.Nodes[nodeID])
|
||||
|
||||
chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
|
||||
chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (p2p.Channel, error) {
|
||||
return rts.evidenceChannels[nodeID], nil
|
||||
}
|
||||
|
||||
|
||||
@@ -196,7 +196,7 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope) (er
|
||||
|
||||
// processMempoolCh implements a blocking event loop where we listen for p2p
|
||||
// Envelope messages from the mempoolCh.
|
||||
func (r *Reactor) processMempoolCh(ctx context.Context, mempoolCh *p2p.Channel) {
|
||||
func (r *Reactor) processMempoolCh(ctx context.Context, mempoolCh p2p.Channel) {
|
||||
iter := mempoolCh.Receive(ctx)
|
||||
for iter.Next(ctx) {
|
||||
envelope := iter.Envelope()
|
||||
@@ -217,7 +217,7 @@ func (r *Reactor) processMempoolCh(ctx context.Context, mempoolCh *p2p.Channel)
|
||||
// goroutine or not. If not, we start one for the newly added peer. For down or
|
||||
// removed peers, we remove the peer from the mempool peer ID set and signal to
|
||||
// stop the tx broadcasting goroutine.
|
||||
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, mempoolCh *p2p.Channel) {
|
||||
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, mempoolCh p2p.Channel) {
|
||||
r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
|
||||
|
||||
r.mtx.Lock()
|
||||
@@ -266,7 +266,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
|
||||
// processPeerUpdates initiates a blocking process where we listen for and handle
|
||||
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
|
||||
// close the p2p PeerUpdatesCh gracefully.
|
||||
func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, mempoolCh *p2p.Channel) {
|
||||
func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, mempoolCh p2p.Channel) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -277,7 +277,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerU
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, mempoolCh *p2p.Channel) {
|
||||
func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, mempoolCh p2p.Channel) {
|
||||
peerMempoolID := r.ids.GetForPeer(peerID)
|
||||
var nextGossipTx *clist.CElement
|
||||
|
||||
|
||||
@@ -30,7 +30,7 @@ type reactorTestSuite struct {
|
||||
logger log.Logger
|
||||
|
||||
reactors map[types.NodeID]*Reactor
|
||||
mempoolChannels map[types.NodeID]*p2p.Channel
|
||||
mempoolChannels map[types.NodeID]p2p.Channel
|
||||
mempools map[types.NodeID]*TxMempool
|
||||
kvstores map[types.NodeID]*kvstore.Application
|
||||
|
||||
@@ -51,7 +51,7 @@ func setupReactors(ctx context.Context, t *testing.T, logger log.Logger, numNode
|
||||
logger: log.NewNopLogger().With("testCase", t.Name()),
|
||||
network: p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: numNodes}),
|
||||
reactors: make(map[types.NodeID]*Reactor, numNodes),
|
||||
mempoolChannels: make(map[types.NodeID]*p2p.Channel, numNodes),
|
||||
mempoolChannels: make(map[types.NodeID]p2p.Channel, numNodes),
|
||||
mempools: make(map[types.NodeID]*TxMempool, numNodes),
|
||||
kvstores: make(map[types.NodeID]*kvstore.Application, numNodes),
|
||||
peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numNodes),
|
||||
@@ -75,7 +75,7 @@ func setupReactors(ctx context.Context, t *testing.T, logger log.Logger, numNode
|
||||
rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)
|
||||
rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID])
|
||||
|
||||
chCreator := func(ctx context.Context, chDesc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
|
||||
chCreator := func(ctx context.Context, chDesc *p2p.ChannelDescriptor) (p2p.Channel, error) {
|
||||
return rts.mempoolChannels[nodeID], nil
|
||||
}
|
||||
|
||||
|
||||
@@ -33,6 +33,14 @@ type Wrapper interface {
|
||||
Unwrap() (proto.Message, error)
|
||||
}
|
||||
|
||||
type Channel interface {
|
||||
fmt.Stringer
|
||||
|
||||
Send(context.Context, Envelope) error
|
||||
SendError(context.Context, PeerError) error
|
||||
Receive(context.Context) *ChannelIterator
|
||||
}
|
||||
|
||||
// PeerError is a peer error reported via Channel.Error.
|
||||
//
|
||||
// FIXME: This currently just disconnects the peer, which is too simplistic.
|
||||
@@ -51,9 +59,9 @@ type PeerError struct {
|
||||
func (pe PeerError) Error() string { return fmt.Sprintf("peer=%q: %s", pe.NodeID, pe.Err.Error()) }
|
||||
func (pe PeerError) Unwrap() error { return pe.Err }
|
||||
|
||||
// Channel is a bidirectional channel to exchange Protobuf messages with peers.
|
||||
// legacyChannel is a bidirectional channel to exchange Protobuf messages with peers.
|
||||
// Each message is wrapped in an Envelope to specify its sender and receiver.
|
||||
type Channel struct {
|
||||
type legacyChannel struct {
|
||||
ID ChannelID
|
||||
inCh <-chan Envelope // inbound messages (peers to reactors)
|
||||
outCh chan<- Envelope // outbound messages (reactors to peers)
|
||||
@@ -65,15 +73,10 @@ type Channel struct {
|
||||
|
||||
// NewChannel creates a new channel. It is primarily for internal and test
|
||||
// use, reactors should use Router.OpenChannel().
|
||||
func NewChannel(
|
||||
id ChannelID,
|
||||
messageType proto.Message,
|
||||
inCh <-chan Envelope,
|
||||
outCh chan<- Envelope,
|
||||
errCh chan<- PeerError,
|
||||
) *Channel {
|
||||
return &Channel{
|
||||
func NewChannel(id ChannelID, name string, messageType proto.Message, inCh <-chan Envelope, outCh chan<- Envelope, errCh chan<- PeerError) Channel {
|
||||
return &legacyChannel{
|
||||
ID: id,
|
||||
name: name,
|
||||
messageType: messageType,
|
||||
inCh: inCh,
|
||||
outCh: outCh,
|
||||
@@ -83,7 +86,7 @@ func NewChannel(
|
||||
|
||||
// Send blocks until the envelope has been sent, or until ctx ends.
|
||||
// An error only occurs if the context ends before the send completes.
|
||||
func (ch *Channel) Send(ctx context.Context, envelope Envelope) error {
|
||||
func (ch *legacyChannel) Send(ctx context.Context, envelope Envelope) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
@@ -94,7 +97,7 @@ func (ch *Channel) Send(ctx context.Context, envelope Envelope) error {
|
||||
|
||||
// SendError blocks until the given error has been sent, or ctx ends.
|
||||
// An error only occurs if the context ends before the send completes.
|
||||
func (ch *Channel) SendError(ctx context.Context, pe PeerError) error {
|
||||
func (ch *legacyChannel) SendError(ctx context.Context, pe PeerError) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
@@ -103,18 +106,29 @@ func (ch *Channel) SendError(ctx context.Context, pe PeerError) error {
|
||||
}
|
||||
}
|
||||
|
||||
func (ch *Channel) String() string { return fmt.Sprintf("p2p.Channel<%d:%s>", ch.ID, ch.name) }
|
||||
func (ch *legacyChannel) String() string { return fmt.Sprintf("p2p.Channel<%d:%s>", ch.ID, ch.name) }
|
||||
|
||||
// Receive returns a new unbuffered iterator to receive messages from ch.
|
||||
// The iterator runs until ctx ends.
|
||||
func (ch *Channel) Receive(ctx context.Context) *ChannelIterator {
|
||||
func (ch *legacyChannel) Receive(ctx context.Context) *ChannelIterator {
|
||||
iter := &ChannelIterator{
|
||||
pipe: make(chan Envelope), // unbuffered
|
||||
}
|
||||
go func() {
|
||||
go func(pipe chan Envelope) {
|
||||
defer close(iter.pipe)
|
||||
iteratorWorker(ctx, ch, iter.pipe)
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case envelope := <-ch.inCh:
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case pipe <- envelope:
|
||||
}
|
||||
}
|
||||
}
|
||||
}(iter.pipe)
|
||||
return iter
|
||||
}
|
||||
|
||||
@@ -129,21 +143,6 @@ type ChannelIterator struct {
|
||||
current *Envelope
|
||||
}
|
||||
|
||||
func iteratorWorker(ctx context.Context, ch *Channel, pipe chan Envelope) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case envelope := <-ch.inCh:
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case pipe <- envelope:
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Next returns true when the Envelope value has advanced, and false
|
||||
// when the context is canceled or iteration should stop. If an iterator has returned false,
|
||||
// it will never return true again.
|
||||
@@ -182,7 +181,7 @@ func (iter *ChannelIterator) Envelope() *Envelope { return iter.current }
|
||||
//
|
||||
// This allows the caller to consume messages from multiple channels
|
||||
// without needing to manage the concurrency separately.
|
||||
func MergedChannelIterator(ctx context.Context, chs ...*Channel) *ChannelIterator {
|
||||
func MergedChannelIterator(ctx context.Context, chs ...Channel) *ChannelIterator {
|
||||
iter := &ChannelIterator{
|
||||
pipe: make(chan Envelope), // unbuffered
|
||||
}
|
||||
@@ -190,10 +189,17 @@ func MergedChannelIterator(ctx context.Context, chs ...*Channel) *ChannelIterato
|
||||
|
||||
for _, ch := range chs {
|
||||
wg.Add(1)
|
||||
go func(ch *Channel) {
|
||||
go func(ch Channel, pipe chan Envelope) {
|
||||
defer wg.Done()
|
||||
iteratorWorker(ctx, ch, iter.pipe)
|
||||
}(ch)
|
||||
iter := ch.Receive(ctx)
|
||||
for iter.Next(ctx) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case pipe <- *iter.Envelope():
|
||||
}
|
||||
}
|
||||
}(ch, iter.pipe)
|
||||
}
|
||||
|
||||
done := make(chan struct{})
|
||||
|
||||
@@ -16,13 +16,13 @@ type channelInternal struct {
|
||||
Error chan PeerError
|
||||
}
|
||||
|
||||
func testChannel(size int) (*channelInternal, *Channel) {
|
||||
func testChannel(size int) (*channelInternal, *legacyChannel) {
|
||||
in := &channelInternal{
|
||||
In: make(chan Envelope, size),
|
||||
Out: make(chan Envelope, size),
|
||||
Error: make(chan PeerError, size),
|
||||
}
|
||||
ch := &Channel{
|
||||
ch := &legacyChannel{
|
||||
inCh: in.In,
|
||||
outCh: in.Out,
|
||||
errCh: in.Error,
|
||||
|
||||
@@ -146,8 +146,8 @@ func (n *Network) MakeChannels(
|
||||
ctx context.Context,
|
||||
t *testing.T,
|
||||
chDesc *p2p.ChannelDescriptor,
|
||||
) map[types.NodeID]*p2p.Channel {
|
||||
channels := map[types.NodeID]*p2p.Channel{}
|
||||
) map[types.NodeID]p2p.Channel {
|
||||
channels := map[types.NodeID]p2p.Channel{}
|
||||
for _, node := range n.Nodes {
|
||||
channels[node.NodeID] = node.MakeChannel(ctx, t, chDesc)
|
||||
}
|
||||
@@ -161,8 +161,8 @@ func (n *Network) MakeChannelsNoCleanup(
|
||||
ctx context.Context,
|
||||
t *testing.T,
|
||||
chDesc *p2p.ChannelDescriptor,
|
||||
) map[types.NodeID]*p2p.Channel {
|
||||
channels := map[types.NodeID]*p2p.Channel{}
|
||||
) map[types.NodeID]p2p.Channel {
|
||||
channels := map[types.NodeID]p2p.Channel{}
|
||||
for _, node := range n.Nodes {
|
||||
channels[node.NodeID] = node.MakeChannelNoCleanup(ctx, t, chDesc)
|
||||
}
|
||||
@@ -304,7 +304,7 @@ func (n *Node) MakeChannel(
|
||||
ctx context.Context,
|
||||
t *testing.T,
|
||||
chDesc *p2p.ChannelDescriptor,
|
||||
) *p2p.Channel {
|
||||
) p2p.Channel {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
channel, err := n.Router.OpenChannel(ctx, chDesc)
|
||||
require.NoError(t, err)
|
||||
@@ -321,7 +321,7 @@ func (n *Node) MakeChannelNoCleanup(
|
||||
ctx context.Context,
|
||||
t *testing.T,
|
||||
chDesc *p2p.ChannelDescriptor,
|
||||
) *p2p.Channel {
|
||||
) p2p.Channel {
|
||||
channel, err := n.Router.OpenChannel(ctx, chDesc)
|
||||
require.NoError(t, err)
|
||||
return channel
|
||||
|
||||
@@ -15,7 +15,7 @@ import (
|
||||
)
|
||||
|
||||
// RequireEmpty requires that the given channel is empty.
|
||||
func RequireEmpty(ctx context.Context, t *testing.T, channels ...*p2p.Channel) {
|
||||
func RequireEmpty(ctx context.Context, t *testing.T, channels ...p2p.Channel) {
|
||||
t.Helper()
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
|
||||
@@ -32,7 +32,7 @@ func RequireEmpty(ctx context.Context, t *testing.T, channels ...*p2p.Channel) {
|
||||
}
|
||||
|
||||
// RequireReceive requires that the given envelope is received on the channel.
|
||||
func RequireReceive(ctx context.Context, t *testing.T, channel *p2p.Channel, expect p2p.Envelope) {
|
||||
func RequireReceive(ctx context.Context, t *testing.T, channel p2p.Channel, expect p2p.Envelope) {
|
||||
t.Helper()
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, time.Second)
|
||||
@@ -54,7 +54,7 @@ func RequireReceive(ctx context.Context, t *testing.T, channel *p2p.Channel, exp
|
||||
|
||||
// RequireReceiveUnordered requires that the given envelopes are all received on
|
||||
// the channel, ignoring order.
|
||||
func RequireReceiveUnordered(ctx context.Context, t *testing.T, channel *p2p.Channel, expect []*p2p.Envelope) {
|
||||
func RequireReceiveUnordered(ctx context.Context, t *testing.T, channel p2p.Channel, expect []*p2p.Envelope) {
|
||||
ctx, cancel := context.WithTimeout(ctx, time.Second)
|
||||
defer cancel()
|
||||
|
||||
@@ -75,7 +75,7 @@ func RequireReceiveUnordered(ctx context.Context, t *testing.T, channel *p2p.Cha
|
||||
}
|
||||
|
||||
// RequireSend requires that the given envelope is sent on the channel.
|
||||
func RequireSend(ctx context.Context, t *testing.T, channel *p2p.Channel, envelope p2p.Envelope) {
|
||||
func RequireSend(ctx context.Context, t *testing.T, channel p2p.Channel, envelope p2p.Envelope) {
|
||||
tctx, cancel := context.WithTimeout(ctx, time.Second)
|
||||
defer cancel()
|
||||
|
||||
@@ -93,7 +93,7 @@ func RequireSend(ctx context.Context, t *testing.T, channel *p2p.Channel, envelo
|
||||
func RequireSendReceive(
|
||||
ctx context.Context,
|
||||
t *testing.T,
|
||||
channel *p2p.Channel,
|
||||
channel p2p.Channel,
|
||||
peerID types.NodeID,
|
||||
send proto.Message,
|
||||
receive proto.Message,
|
||||
@@ -116,7 +116,7 @@ func RequireNoUpdates(ctx context.Context, t *testing.T, peerUpdates *p2p.PeerUp
|
||||
}
|
||||
|
||||
// RequireError requires that the given peer error is submitted for a peer.
|
||||
func RequireError(ctx context.Context, t *testing.T, channel *p2p.Channel, peerError p2p.PeerError) {
|
||||
func RequireError(ctx context.Context, t *testing.T, channel p2p.Channel, peerError p2p.PeerError) {
|
||||
tctx, tcancel := context.WithTimeout(ctx, time.Second)
|
||||
defer tcancel()
|
||||
|
||||
|
||||
@@ -145,7 +145,7 @@ func (r *Reactor) OnStop() {}
|
||||
|
||||
// processPexCh implements a blocking event loop where we listen for p2p
|
||||
// Envelope messages from the pexCh.
|
||||
func (r *Reactor) processPexCh(ctx context.Context, pexCh *p2p.Channel) {
|
||||
func (r *Reactor) processPexCh(ctx context.Context, pexCh p2p.Channel) {
|
||||
incoming := make(chan *p2p.Envelope)
|
||||
go func() {
|
||||
defer close(incoming)
|
||||
@@ -192,8 +192,7 @@ func (r *Reactor) processPexCh(ctx context.Context, pexCh *p2p.Channel) {
|
||||
// A request from another peer, or a response to one of our requests.
|
||||
dur, err := r.handlePexMessage(ctx, envelope, pexCh)
|
||||
if err != nil {
|
||||
r.logger.Error("failed to process message",
|
||||
"ch_id", envelope.ChannelID, "envelope", envelope, "err", err)
|
||||
r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err)
|
||||
if serr := pexCh.SendError(ctx, p2p.PeerError{
|
||||
NodeID: envelope.From,
|
||||
Err: err,
|
||||
@@ -225,7 +224,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerU
|
||||
// handlePexMessage handles envelopes sent from peers on the PexChannel.
|
||||
// If an update was received, a new polling interval is returned; otherwise the
|
||||
// duration is 0.
|
||||
func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope, pexCh *p2p.Channel) (time.Duration, error) {
|
||||
func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope, pexCh p2p.Channel) (time.Duration, error) {
|
||||
logger := r.logger.With("peer", envelope.From)
|
||||
|
||||
switch msg := envelope.Message.(type) {
|
||||
@@ -308,7 +307,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
|
||||
// that peer a request for more peer addresses. The chosen peer is moved into
|
||||
// the requestsSent bucket so that we will not attempt to contact them again
|
||||
// until they've replied or updated.
|
||||
func (r *Reactor) sendRequestForPeers(ctx context.Context, pexCh *p2p.Channel) error {
|
||||
func (r *Reactor) sendRequestForPeers(ctx context.Context, pexCh p2p.Channel) error {
|
||||
r.mtx.Lock()
|
||||
defer r.mtx.Unlock()
|
||||
if len(r.availablePeers) == 0 {
|
||||
|
||||
@@ -275,7 +275,7 @@ type singleTestReactor struct {
|
||||
pexInCh chan p2p.Envelope
|
||||
pexOutCh chan p2p.Envelope
|
||||
pexErrCh chan p2p.PeerError
|
||||
pexCh *p2p.Channel
|
||||
pexCh p2p.Channel
|
||||
peerCh chan p2p.PeerUpdate
|
||||
manager *p2p.PeerManager
|
||||
}
|
||||
@@ -287,8 +287,11 @@ func setupSingle(ctx context.Context, t *testing.T) *singleTestReactor {
|
||||
pexInCh := make(chan p2p.Envelope, chBuf)
|
||||
pexOutCh := make(chan p2p.Envelope, chBuf)
|
||||
pexErrCh := make(chan p2p.PeerError, chBuf)
|
||||
|
||||
chDesc := pex.ChannelDescriptor()
|
||||
pexCh := p2p.NewChannel(
|
||||
p2p.ChannelID(pex.PexChannel),
|
||||
chDesc.ID,
|
||||
chDesc.Name,
|
||||
new(p2pproto.PexMessage),
|
||||
pexInCh,
|
||||
pexOutCh,
|
||||
@@ -300,7 +303,7 @@ func setupSingle(ctx context.Context, t *testing.T) *singleTestReactor {
|
||||
peerManager, err := p2p.NewPeerManager(nodeID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
|
||||
require.NoError(t, err)
|
||||
|
||||
chCreator := func(context.Context, *p2p.ChannelDescriptor) (*p2p.Channel, error) {
|
||||
chCreator := func(context.Context, *p2p.ChannelDescriptor) (p2p.Channel, error) {
|
||||
return pexCh, nil
|
||||
}
|
||||
|
||||
@@ -325,7 +328,7 @@ type reactorTestSuite struct {
|
||||
logger log.Logger
|
||||
|
||||
reactors map[types.NodeID]*pex.Reactor
|
||||
pexChannels map[types.NodeID]*p2p.Channel
|
||||
pexChannels map[types.NodeID]p2p.Channel
|
||||
|
||||
peerChans map[types.NodeID]chan p2p.PeerUpdate
|
||||
peerUpdates map[types.NodeID]*p2p.PeerUpdates
|
||||
@@ -368,7 +371,7 @@ func setupNetwork(ctx context.Context, t *testing.T, opts testOptions) *reactorT
|
||||
logger: log.NewNopLogger().With("testCase", t.Name()),
|
||||
network: p2ptest.MakeNetwork(ctx, t, networkOpts),
|
||||
reactors: make(map[types.NodeID]*pex.Reactor, realNodes),
|
||||
pexChannels: make(map[types.NodeID]*p2p.Channel, opts.TotalNodes),
|
||||
pexChannels: make(map[types.NodeID]p2p.Channel, opts.TotalNodes),
|
||||
peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, opts.TotalNodes),
|
||||
peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, opts.TotalNodes),
|
||||
total: opts.TotalNodes,
|
||||
@@ -389,7 +392,7 @@ func setupNetwork(ctx context.Context, t *testing.T, opts testOptions) *reactorT
|
||||
rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], chBuf)
|
||||
rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID])
|
||||
|
||||
chCreator := func(context.Context, *p2p.ChannelDescriptor) (*p2p.Channel, error) {
|
||||
chCreator := func(context.Context, *p2p.ChannelDescriptor) (p2p.Channel, error) {
|
||||
return rts.pexChannels[nodeID], nil
|
||||
}
|
||||
|
||||
@@ -449,7 +452,7 @@ func (r *reactorTestSuite) addNodes(ctx context.Context, t *testing.T, nodes int
|
||||
r.peerUpdates[nodeID] = p2p.NewPeerUpdates(r.peerChans[nodeID], r.opts.BufferSize)
|
||||
r.network.Nodes[nodeID].PeerManager.Register(ctx, r.peerUpdates[nodeID])
|
||||
|
||||
chCreator := func(context.Context, *p2p.ChannelDescriptor) (*p2p.Channel, error) {
|
||||
chCreator := func(context.Context, *p2p.ChannelDescriptor) (p2p.Channel, error) {
|
||||
return r.pexChannels[nodeID], nil
|
||||
}
|
||||
|
||||
|
||||
@@ -294,7 +294,7 @@ func (r *Router) createQueueFactory(ctx context.Context) (func(int) queue, error
|
||||
// ChannelCreator allows routers to construct their own channels,
|
||||
// either by receiving a reference to Router.OpenChannel or using some
|
||||
// kind shim for testing purposes.
|
||||
type ChannelCreator func(context.Context, *ChannelDescriptor) (*Channel, error)
|
||||
type ChannelCreator func(context.Context, *ChannelDescriptor) (Channel, error)
|
||||
|
||||
// OpenChannel opens a new channel for the given message type. The caller must
|
||||
// close the channel when done, before stopping the Router. messageType is the
|
||||
@@ -302,7 +302,7 @@ type ChannelCreator func(context.Context, *ChannelDescriptor) (*Channel, error)
|
||||
// implement Wrapper to automatically (un)wrap multiple message types in a
|
||||
// wrapper message. The caller may provide a size to make the channel buffered,
|
||||
// which internally makes the inbound, outbound, and error channel buffered.
|
||||
func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (*Channel, error) {
|
||||
func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (Channel, error) {
|
||||
r.legacy.channelMtx.Lock()
|
||||
defer r.legacy.channelMtx.Unlock()
|
||||
|
||||
@@ -317,11 +317,10 @@ func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (*C
|
||||
queue := r.legacy.queueFactory(chDesc.RecvBufferCapacity)
|
||||
outCh := make(chan Envelope, chDesc.RecvBufferCapacity)
|
||||
errCh := make(chan PeerError, chDesc.RecvBufferCapacity)
|
||||
channel := NewChannel(id, messageType, queue.dequeue(), outCh, errCh)
|
||||
channel.name = chDesc.Name
|
||||
channel := NewChannel(chDesc.ID, chDesc.Name, chDesc.MessageType, queue.dequeue(), outCh, errCh)
|
||||
|
||||
var wrapper Wrapper
|
||||
if w, ok := messageType.(Wrapper); ok {
|
||||
if w, ok := chDesc.MessageType.(Wrapper); ok {
|
||||
wrapper = w
|
||||
}
|
||||
|
||||
@@ -342,7 +341,7 @@ func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (*C
|
||||
queue.close()
|
||||
}()
|
||||
|
||||
r.routeChannel(ctx, id, outCh, errCh, wrapper)
|
||||
r.routeChannel(ctx, chDesc.ID, outCh, errCh, wrapper)
|
||||
}()
|
||||
|
||||
return channel, nil
|
||||
|
||||
@@ -91,7 +91,7 @@ func TestRouterConstruction(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func echoReactor(ctx context.Context, channel *p2p.Channel) {
|
||||
func echoReactor(ctx context.Context, channel p2p.Channel) {
|
||||
iter := channel.Receive(ctx)
|
||||
for iter.Next(ctx) {
|
||||
envelope := iter.Envelope()
|
||||
|
||||
@@ -26,14 +26,14 @@ var (
|
||||
// NOTE: It is not the responsibility of the dispatcher to verify the light blocks.
|
||||
type Dispatcher struct {
|
||||
// the channel with which to send light block requests on
|
||||
requestCh *p2p.Channel
|
||||
requestCh p2p.Channel
|
||||
|
||||
mtx sync.Mutex
|
||||
// all pending calls that have been dispatched and are awaiting an answer
|
||||
calls map[types.NodeID]chan *types.LightBlock
|
||||
}
|
||||
|
||||
func NewDispatcher(requestChannel *p2p.Channel) *Dispatcher {
|
||||
func NewDispatcher(requestChannel p2p.Channel) *Dispatcher {
|
||||
return &Dispatcher{
|
||||
requestCh: requestChannel,
|
||||
calls: make(map[types.NodeID]chan *types.LightBlock),
|
||||
|
||||
@@ -24,13 +24,13 @@ type channelInternal struct {
|
||||
Error chan p2p.PeerError
|
||||
}
|
||||
|
||||
func testChannel(size int) (*channelInternal, *p2p.Channel) {
|
||||
func testChannel(size int) (*channelInternal, p2p.Channel) {
|
||||
in := &channelInternal{
|
||||
In: make(chan p2p.Envelope, size),
|
||||
Out: make(chan p2p.Envelope, size),
|
||||
Error: make(chan p2p.PeerError, size),
|
||||
}
|
||||
return in, p2p.NewChannel(0, nil, in.In, in.Out, in.Error)
|
||||
return in, p2p.NewChannel(0, "test", nil, in.In, in.Out, in.Error)
|
||||
}
|
||||
|
||||
func TestDispatcherBasic(t *testing.T) {
|
||||
|
||||
@@ -305,7 +305,7 @@ func (r *Reactor) OnStart(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
go r.processChannels(ctx, map[p2p.ChannelID]*p2p.Channel{
|
||||
go r.processChannels(ctx, map[p2p.ChannelID]p2p.Channel{
|
||||
SnapshotChannel: snapshotCh,
|
||||
ChunkChannel: chunkCh,
|
||||
LightBlockChannel: blockCh,
|
||||
@@ -611,7 +611,7 @@ func (r *Reactor) backfill(
|
||||
// handleSnapshotMessage handles envelopes sent from peers on the
|
||||
// SnapshotChannel. It returns an error only if the Envelope.Message is unknown
|
||||
// for this channel. This should never be called outside of handleMessage.
|
||||
func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envelope, snapshotCh *p2p.Channel) error {
|
||||
func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envelope, snapshotCh p2p.Channel) error {
|
||||
logger := r.logger.With("peer", envelope.From)
|
||||
|
||||
switch msg := envelope.Message.(type) {
|
||||
@@ -683,7 +683,7 @@ func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envel
|
||||
// handleChunkMessage handles envelopes sent from peers on the ChunkChannel.
|
||||
// It returns an error only if the Envelope.Message is unknown for this channel.
|
||||
// This should never be called outside of handleMessage.
|
||||
func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope, chunkCh *p2p.Channel) error {
|
||||
func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope, chunkCh p2p.Channel) error {
|
||||
switch msg := envelope.Message.(type) {
|
||||
case *ssproto.ChunkRequest:
|
||||
r.logger.Debug(
|
||||
@@ -772,7 +772,7 @@ func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Envelope, blockCh *p2p.Channel) error {
|
||||
func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Envelope, blockCh p2p.Channel) error {
|
||||
switch msg := envelope.Message.(type) {
|
||||
case *ssproto.LightBlockRequest:
|
||||
r.logger.Info("received light block request", "height", msg.Height)
|
||||
@@ -829,7 +829,7 @@ func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Env
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelope, paramsCh *p2p.Channel) error {
|
||||
func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelope, paramsCh p2p.Channel) error {
|
||||
switch msg := envelope.Message.(type) {
|
||||
case *ssproto.ParamsRequest:
|
||||
r.logger.Debug("received consensus params request", "height", msg.Height)
|
||||
@@ -878,7 +878,7 @@ func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelop
|
||||
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
|
||||
// It will handle errors and any possible panics gracefully. A caller can handle
|
||||
// any error returned by sending a PeerError on the respective channel.
|
||||
func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, chans map[p2p.ChannelID]*p2p.Channel) (err error) {
|
||||
func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, chans map[p2p.ChannelID]p2p.Channel) (err error) {
|
||||
defer func() {
|
||||
if e := recover(); e != nil {
|
||||
err = fmt.Errorf("panic in processing message: %v", e)
|
||||
@@ -912,12 +912,12 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, cha
|
||||
// encountered during message execution will result in a PeerError being sent on
|
||||
// the respective channel. When the reactor is stopped, we will catch the signal
|
||||
// and close the p2p Channel gracefully.
|
||||
func (r *Reactor) processChannels(ctx context.Context, chanTable map[p2p.ChannelID]*p2p.Channel) {
|
||||
// make sure that the iterator gets cleaned up in case of error
|
||||
func (r *Reactor) processChannels(ctx context.Context, chanTable map[p2p.ChannelID]p2p.Channel) {
|
||||
// make sure tht the iterator gets cleaned up in case of error
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
chs := make([]*p2p.Channel, 0, len(chanTable))
|
||||
chs := make([]p2p.Channel, 0, len(chanTable))
|
||||
for key := range chanTable {
|
||||
chs = append(chs, chanTable[key])
|
||||
}
|
||||
|
||||
@@ -40,22 +40,22 @@ type reactorTestSuite struct {
|
||||
conn *clientmocks.Client
|
||||
stateProvider *mocks.StateProvider
|
||||
|
||||
snapshotChannel *p2p.Channel
|
||||
snapshotChannel p2p.Channel
|
||||
snapshotInCh chan p2p.Envelope
|
||||
snapshotOutCh chan p2p.Envelope
|
||||
snapshotPeerErrCh chan p2p.PeerError
|
||||
|
||||
chunkChannel *p2p.Channel
|
||||
chunkChannel p2p.Channel
|
||||
chunkInCh chan p2p.Envelope
|
||||
chunkOutCh chan p2p.Envelope
|
||||
chunkPeerErrCh chan p2p.PeerError
|
||||
|
||||
blockChannel *p2p.Channel
|
||||
blockChannel p2p.Channel
|
||||
blockInCh chan p2p.Envelope
|
||||
blockOutCh chan p2p.Envelope
|
||||
blockPeerErrCh chan p2p.PeerError
|
||||
|
||||
paramsChannel *p2p.Channel
|
||||
paramsChannel p2p.Channel
|
||||
paramsInCh chan p2p.Envelope
|
||||
paramsOutCh chan p2p.Envelope
|
||||
paramsPeerErrCh chan p2p.PeerError
|
||||
@@ -102,6 +102,7 @@ func setup(
|
||||
|
||||
rts.snapshotChannel = p2p.NewChannel(
|
||||
SnapshotChannel,
|
||||
"snapshot",
|
||||
new(ssproto.Message),
|
||||
rts.snapshotInCh,
|
||||
rts.snapshotOutCh,
|
||||
@@ -110,6 +111,7 @@ func setup(
|
||||
|
||||
rts.chunkChannel = p2p.NewChannel(
|
||||
ChunkChannel,
|
||||
"chunk",
|
||||
new(ssproto.Message),
|
||||
rts.chunkInCh,
|
||||
rts.chunkOutCh,
|
||||
@@ -118,6 +120,7 @@ func setup(
|
||||
|
||||
rts.blockChannel = p2p.NewChannel(
|
||||
LightBlockChannel,
|
||||
"lightblock",
|
||||
new(ssproto.Message),
|
||||
rts.blockInCh,
|
||||
rts.blockOutCh,
|
||||
@@ -126,6 +129,7 @@ func setup(
|
||||
|
||||
rts.paramsChannel = p2p.NewChannel(
|
||||
ParamsChannel,
|
||||
"params",
|
||||
new(ssproto.Message),
|
||||
rts.paramsInCh,
|
||||
rts.paramsOutCh,
|
||||
@@ -137,7 +141,7 @@ func setup(
|
||||
|
||||
cfg := config.DefaultStateSyncConfig()
|
||||
|
||||
chCreator := func(ctx context.Context, desc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
|
||||
chCreator := func(ctx context.Context, desc *p2p.ChannelDescriptor) (p2p.Channel, error) {
|
||||
switch desc.ID {
|
||||
case SnapshotChannel:
|
||||
return rts.snapshotChannel, nil
|
||||
|
||||
@@ -208,7 +208,7 @@ type stateProviderP2P struct {
|
||||
sync.Mutex // light.Client is not concurrency-safe
|
||||
lc *light.Client
|
||||
initialHeight int64
|
||||
paramsSendCh *p2p.Channel
|
||||
paramsSendCh p2p.Channel
|
||||
paramsRecvCh chan types.ConsensusParams
|
||||
}
|
||||
|
||||
@@ -220,7 +220,7 @@ func NewP2PStateProvider(
|
||||
initialHeight int64,
|
||||
providers []lightprovider.Provider,
|
||||
trustOptions light.TrustOptions,
|
||||
paramsSendCh *p2p.Channel,
|
||||
paramsSendCh p2p.Channel,
|
||||
logger log.Logger,
|
||||
) (StateProvider, error) {
|
||||
if len(providers) < 2 {
|
||||
|
||||
@@ -56,8 +56,8 @@ type syncer struct {
|
||||
stateProvider StateProvider
|
||||
conn abciclient.Client
|
||||
snapshots *snapshotPool
|
||||
snapshotCh *p2p.Channel
|
||||
chunkCh *p2p.Channel
|
||||
snapshotCh p2p.Channel
|
||||
chunkCh p2p.Channel
|
||||
tempDir string
|
||||
fetchers int32
|
||||
retryTimeout time.Duration
|
||||
|
||||
Reference in New Issue
Block a user