mirror of
https://github.com/tendermint/tendermint.git
synced 2026-04-19 23:30:38 +00:00
p2p: make p2p.Channel an interface (#8967)
This is (#8446) pulled from the `main/libp2p` branch but without any of the libp2p content, and is perhaps the easiest first step to enable pluggability at the peer layer, and makes it possible hoist shims (including for, say 0.34) into tendermint without touching the reactors.
This commit is contained in:
@@ -135,7 +135,7 @@ func (r *Reactor) OnStart(ctx context.Context) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.chCreator = func(context.Context, *conn.ChannelDescriptor) (*p2p.Channel, error) { return blockSyncCh, nil }
|
||||
r.chCreator = func(context.Context, *conn.ChannelDescriptor) (p2p.Channel, error) { return blockSyncCh, nil }
|
||||
|
||||
state, err := r.stateStore.Load()
|
||||
if err != nil {
|
||||
@@ -183,7 +183,7 @@ func (r *Reactor) OnStop() {
|
||||
|
||||
// respondToPeer loads a block and sends it to the requesting peer, if we have it.
|
||||
// Otherwise, we'll respond saying we do not have it.
|
||||
func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest, peerID types.NodeID, blockSyncCh *p2p.Channel) error {
|
||||
func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest, peerID types.NodeID, blockSyncCh p2p.Channel) error {
|
||||
block := r.store.LoadBlock(msg.Height)
|
||||
if block == nil {
|
||||
r.logger.Info("peer requesting a block we do not have", "peer", peerID, "height", msg.Height)
|
||||
@@ -223,7 +223,7 @@ func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest,
|
||||
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
|
||||
// It will handle errors and any possible panics gracefully. A caller can handle
|
||||
// any error returned by sending a PeerError on the respective channel.
|
||||
func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, blockSyncCh *p2p.Channel) (err error) {
|
||||
func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, blockSyncCh p2p.Channel) (err error) {
|
||||
defer func() {
|
||||
if e := recover(); e != nil {
|
||||
err = fmt.Errorf("panic in processing message: %v", e)
|
||||
@@ -298,7 +298,7 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, blo
|
||||
// message execution will result in a PeerError being sent on the BlockSyncChannel.
|
||||
// When the reactor is stopped, we will catch the signal and close the p2p Channel
|
||||
// gracefully.
|
||||
func (r *Reactor) processBlockSyncCh(ctx context.Context, blockSyncCh *p2p.Channel) {
|
||||
func (r *Reactor) processBlockSyncCh(ctx context.Context, blockSyncCh p2p.Channel) {
|
||||
iter := blockSyncCh.Receive(ctx)
|
||||
for iter.Next(ctx) {
|
||||
envelope := iter.Envelope()
|
||||
@@ -319,7 +319,7 @@ func (r *Reactor) processBlockSyncCh(ctx context.Context, blockSyncCh *p2p.Chann
|
||||
}
|
||||
|
||||
// processPeerUpdate processes a PeerUpdate.
|
||||
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, blockSyncCh *p2p.Channel) {
|
||||
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, blockSyncCh p2p.Channel) {
|
||||
r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
|
||||
|
||||
// XXX: Pool#RedoRequest can sometimes give us an empty peer.
|
||||
@@ -354,7 +354,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
|
||||
// processPeerUpdates initiates a blocking process where we listen for and handle
|
||||
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
|
||||
// close the p2p PeerUpdatesCh gracefully.
|
||||
func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, blockSyncCh *p2p.Channel) {
|
||||
func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, blockSyncCh p2p.Channel) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -396,7 +396,7 @@ func (r *Reactor) SwitchToBlockSync(ctx context.Context, state sm.State) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Reactor) requestRoutine(ctx context.Context, blockSyncCh *p2p.Channel) {
|
||||
func (r *Reactor) requestRoutine(ctx context.Context, blockSyncCh p2p.Channel) {
|
||||
statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
|
||||
defer statusUpdateTicker.Stop()
|
||||
|
||||
@@ -438,7 +438,7 @@ func (r *Reactor) requestRoutine(ctx context.Context, blockSyncCh *p2p.Channel)
|
||||
// do.
|
||||
//
|
||||
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
|
||||
func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh *p2p.Channel) {
|
||||
func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh p2p.Channel) {
|
||||
var (
|
||||
trySyncTicker = time.NewTicker(trySyncIntervalMS * time.Millisecond)
|
||||
switchToConsensusTicker = time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
|
||||
|
||||
@@ -37,7 +37,7 @@ type reactorTestSuite struct {
|
||||
reactors map[types.NodeID]*Reactor
|
||||
app map[types.NodeID]abciclient.Client
|
||||
|
||||
blockSyncChannels map[types.NodeID]*p2p.Channel
|
||||
blockSyncChannels map[types.NodeID]p2p.Channel
|
||||
peerChans map[types.NodeID]chan p2p.PeerUpdate
|
||||
peerUpdates map[types.NodeID]*p2p.PeerUpdates
|
||||
}
|
||||
@@ -64,7 +64,7 @@ func setup(
|
||||
nodes: make([]types.NodeID, 0, numNodes),
|
||||
reactors: make(map[types.NodeID]*Reactor, numNodes),
|
||||
app: make(map[types.NodeID]abciclient.Client, numNodes),
|
||||
blockSyncChannels: make(map[types.NodeID]*p2p.Channel, numNodes),
|
||||
blockSyncChannels: make(map[types.NodeID]p2p.Channel, numNodes),
|
||||
peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numNodes),
|
||||
peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes),
|
||||
}
|
||||
@@ -177,7 +177,7 @@ func (rts *reactorTestSuite) addNode(
|
||||
rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)
|
||||
rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID])
|
||||
|
||||
chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
|
||||
chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (p2p.Channel, error) {
|
||||
return rts.blockSyncChannels[nodeID], nil
|
||||
}
|
||||
|
||||
|
||||
@@ -107,7 +107,7 @@ func invalidDoPrevoteFunc(
|
||||
round int32,
|
||||
cs *State,
|
||||
r *Reactor,
|
||||
voteCh *p2p.Channel,
|
||||
voteCh p2p.Channel,
|
||||
pv types.PrivValidator,
|
||||
) {
|
||||
// routine to:
|
||||
|
||||
@@ -165,10 +165,10 @@ func NewReactor(
|
||||
}
|
||||
|
||||
type channelBundle struct {
|
||||
state *p2p.Channel
|
||||
data *p2p.Channel
|
||||
vote *p2p.Channel
|
||||
votSet *p2p.Channel
|
||||
state p2p.Channel
|
||||
data p2p.Channel
|
||||
vote p2p.Channel
|
||||
votSet p2p.Channel
|
||||
}
|
||||
|
||||
// OnStart starts separate go routines for each p2p Channel and listens for
|
||||
@@ -310,14 +310,14 @@ func (r *Reactor) GetPeerState(peerID types.NodeID) (*PeerState, bool) {
|
||||
return ps, ok
|
||||
}
|
||||
|
||||
func (r *Reactor) broadcastNewRoundStepMessage(ctx context.Context, rs *cstypes.RoundState, stateCh *p2p.Channel) error {
|
||||
func (r *Reactor) broadcastNewRoundStepMessage(ctx context.Context, rs *cstypes.RoundState, stateCh p2p.Channel) error {
|
||||
return stateCh.Send(ctx, p2p.Envelope{
|
||||
Broadcast: true,
|
||||
Message: makeRoundStepMessage(rs),
|
||||
})
|
||||
}
|
||||
|
||||
func (r *Reactor) broadcastNewValidBlockMessage(ctx context.Context, rs *cstypes.RoundState, stateCh *p2p.Channel) error {
|
||||
func (r *Reactor) broadcastNewValidBlockMessage(ctx context.Context, rs *cstypes.RoundState, stateCh p2p.Channel) error {
|
||||
psHeader := rs.ProposalBlockParts.Header()
|
||||
return stateCh.Send(ctx, p2p.Envelope{
|
||||
Broadcast: true,
|
||||
@@ -331,7 +331,7 @@ func (r *Reactor) broadcastNewValidBlockMessage(ctx context.Context, rs *cstypes
|
||||
})
|
||||
}
|
||||
|
||||
func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote, stateCh *p2p.Channel) error {
|
||||
func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote, stateCh p2p.Channel) error {
|
||||
return stateCh.Send(ctx, p2p.Envelope{
|
||||
Broadcast: true,
|
||||
Message: &tmcons.HasVote{
|
||||
@@ -346,7 +346,7 @@ func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote,
|
||||
// subscribeToBroadcastEvents subscribes for new round steps and votes using the
|
||||
// internal pubsub defined in the consensus state to broadcast them to peers
|
||||
// upon receiving.
|
||||
func (r *Reactor) subscribeToBroadcastEvents(ctx context.Context, stateCh *p2p.Channel) {
|
||||
func (r *Reactor) subscribeToBroadcastEvents(ctx context.Context, stateCh p2p.Channel) {
|
||||
onStopCh := r.state.getOnStopCh()
|
||||
|
||||
err := r.state.evsw.AddListenerForEvent(
|
||||
@@ -403,7 +403,7 @@ func makeRoundStepMessage(rs *cstypes.RoundState) *tmcons.NewRoundStep {
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Reactor) sendNewRoundStepMessage(ctx context.Context, peerID types.NodeID, stateCh *p2p.Channel) error {
|
||||
func (r *Reactor) sendNewRoundStepMessage(ctx context.Context, peerID types.NodeID, stateCh p2p.Channel) error {
|
||||
return stateCh.Send(ctx, p2p.Envelope{
|
||||
To: peerID,
|
||||
Message: makeRoundStepMessage(r.getRoundState()),
|
||||
@@ -433,7 +433,7 @@ func (r *Reactor) getRoundState() *cstypes.RoundState {
|
||||
return r.rs
|
||||
}
|
||||
|
||||
func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState, dataCh *p2p.Channel) {
|
||||
func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState, dataCh p2p.Channel) {
|
||||
logger := r.logger.With("height", prs.Height).With("peer", ps.peerID)
|
||||
|
||||
if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
|
||||
@@ -497,7 +497,7 @@ func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundSta
|
||||
time.Sleep(r.state.config.PeerGossipSleepDuration)
|
||||
}
|
||||
|
||||
func (r *Reactor) gossipDataRoutine(ctx context.Context, ps *PeerState, dataCh *p2p.Channel) {
|
||||
func (r *Reactor) gossipDataRoutine(ctx context.Context, ps *PeerState, dataCh p2p.Channel) {
|
||||
logger := r.logger.With("peer", ps.peerID)
|
||||
|
||||
timer := time.NewTimer(0)
|
||||
@@ -632,7 +632,7 @@ OUTER_LOOP:
|
||||
|
||||
// pickSendVote picks a vote and sends it to the peer. It will return true if
|
||||
// there is a vote to send and false otherwise.
|
||||
func (r *Reactor) pickSendVote(ctx context.Context, ps *PeerState, votes types.VoteSetReader, voteCh *p2p.Channel) (bool, error) {
|
||||
func (r *Reactor) pickSendVote(ctx context.Context, ps *PeerState, votes types.VoteSetReader, voteCh p2p.Channel) (bool, error) {
|
||||
vote, ok := ps.PickVoteToSend(votes)
|
||||
if !ok {
|
||||
return false, nil
|
||||
@@ -660,7 +660,7 @@ func (r *Reactor) gossipVotesForHeight(
|
||||
rs *cstypes.RoundState,
|
||||
prs *cstypes.PeerRoundState,
|
||||
ps *PeerState,
|
||||
voteCh *p2p.Channel,
|
||||
voteCh p2p.Channel,
|
||||
) (bool, error) {
|
||||
logger := r.logger.With("height", prs.Height).With("peer", ps.peerID)
|
||||
|
||||
@@ -732,7 +732,7 @@ func (r *Reactor) gossipVotesForHeight(
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh *p2p.Channel) {
|
||||
func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh p2p.Channel) {
|
||||
logger := r.logger.With("peer", ps.peerID)
|
||||
|
||||
timer := time.NewTimer(0)
|
||||
@@ -804,7 +804,7 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh
|
||||
|
||||
// NOTE: `queryMaj23Routine` has a simple crude design since it only comes
|
||||
// into play for liveness when there's a signature DDoS attack happening.
|
||||
func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState, stateCh *p2p.Channel) {
|
||||
func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState, stateCh p2p.Channel) {
|
||||
timer := time.NewTimer(0)
|
||||
defer timer.Stop()
|
||||
|
||||
@@ -1015,7 +1015,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
|
||||
// If we fail to find the peer state for the envelope sender, we perform a no-op
|
||||
// and return. This can happen when we process the envelope after the peer is
|
||||
// removed.
|
||||
func (r *Reactor) handleStateMessage(ctx context.Context, envelope *p2p.Envelope, msgI Message, voteSetCh *p2p.Channel) error {
|
||||
func (r *Reactor) handleStateMessage(ctx context.Context, envelope *p2p.Envelope, msgI Message, voteSetCh p2p.Channel) error {
|
||||
ps, ok := r.GetPeerState(envelope.From)
|
||||
if !ok || ps == nil {
|
||||
r.logger.Debug("failed to find peer state", "peer", envelope.From, "ch_id", "StateChannel")
|
||||
|
||||
@@ -46,10 +46,10 @@ type reactorTestSuite struct {
|
||||
reactors map[types.NodeID]*Reactor
|
||||
subs map[types.NodeID]eventbus.Subscription
|
||||
blocksyncSubs map[types.NodeID]eventbus.Subscription
|
||||
stateChannels map[types.NodeID]*p2p.Channel
|
||||
dataChannels map[types.NodeID]*p2p.Channel
|
||||
voteChannels map[types.NodeID]*p2p.Channel
|
||||
voteSetBitsChannels map[types.NodeID]*p2p.Channel
|
||||
stateChannels map[types.NodeID]p2p.Channel
|
||||
dataChannels map[types.NodeID]p2p.Channel
|
||||
voteChannels map[types.NodeID]p2p.Channel
|
||||
voteSetBitsChannels map[types.NodeID]p2p.Channel
|
||||
}
|
||||
|
||||
func chDesc(chID p2p.ChannelID, size int) *p2p.ChannelDescriptor {
|
||||
@@ -86,7 +86,7 @@ func setup(
|
||||
t.Cleanup(cancel)
|
||||
|
||||
chCreator := func(nodeID types.NodeID) p2p.ChannelCreator {
|
||||
return func(ctx context.Context, desc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
|
||||
return func(ctx context.Context, desc *p2p.ChannelDescriptor) (p2p.Channel, error) {
|
||||
switch desc.ID {
|
||||
case StateChannel:
|
||||
return rts.stateChannels[nodeID], nil
|
||||
|
||||
@@ -159,7 +159,7 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope) (er
|
||||
|
||||
// processEvidenceCh implements a blocking event loop where we listen for p2p
|
||||
// Envelope messages from the evidenceCh.
|
||||
func (r *Reactor) processEvidenceCh(ctx context.Context, evidenceCh *p2p.Channel) {
|
||||
func (r *Reactor) processEvidenceCh(ctx context.Context, evidenceCh p2p.Channel) {
|
||||
iter := evidenceCh.Receive(ctx)
|
||||
for iter.Next(ctx) {
|
||||
envelope := iter.Envelope()
|
||||
@@ -186,7 +186,7 @@ func (r *Reactor) processEvidenceCh(ctx context.Context, evidenceCh *p2p.Channel
|
||||
// connects/disconnects frequently from the broadcasting peer(s).
|
||||
//
|
||||
// REF: https://github.com/tendermint/tendermint/issues/4727
|
||||
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, evidenceCh *p2p.Channel) {
|
||||
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, evidenceCh p2p.Channel) {
|
||||
r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
|
||||
|
||||
r.mtx.Lock()
|
||||
@@ -227,7 +227,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
|
||||
// processPeerUpdates initiates a blocking process where we listen for and handle
|
||||
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
|
||||
// close the p2p PeerUpdatesCh gracefully.
|
||||
func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, evidenceCh *p2p.Channel) {
|
||||
func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, evidenceCh p2p.Channel) {
|
||||
for {
|
||||
select {
|
||||
case peerUpdate := <-peerUpdates.Updates():
|
||||
@@ -249,7 +249,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerU
|
||||
// that the peer has already received or may not be ready for.
|
||||
//
|
||||
// REF: https://github.com/tendermint/tendermint/issues/4727
|
||||
func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID, evidenceCh *p2p.Channel) {
|
||||
func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID, evidenceCh p2p.Channel) {
|
||||
var next *clist.CElement
|
||||
|
||||
defer func() {
|
||||
|
||||
@@ -38,7 +38,7 @@ type reactorTestSuite struct {
|
||||
logger log.Logger
|
||||
reactors map[types.NodeID]*evidence.Reactor
|
||||
pools map[types.NodeID]*evidence.Pool
|
||||
evidenceChannels map[types.NodeID]*p2p.Channel
|
||||
evidenceChannels map[types.NodeID]p2p.Channel
|
||||
peerUpdates map[types.NodeID]*p2p.PeerUpdates
|
||||
peerChans map[types.NodeID]chan p2p.PeerUpdate
|
||||
nodes []*p2ptest.Node
|
||||
@@ -96,7 +96,7 @@ func setup(ctx context.Context, t *testing.T, stateStores []sm.Store) *reactorTe
|
||||
rts.network.Nodes[nodeID].PeerManager.Register(ctx, pu)
|
||||
rts.nodes = append(rts.nodes, rts.network.Nodes[nodeID])
|
||||
|
||||
chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
|
||||
chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (p2p.Channel, error) {
|
||||
return rts.evidenceChannels[nodeID], nil
|
||||
}
|
||||
|
||||
|
||||
@@ -194,7 +194,7 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope) (er
|
||||
|
||||
// processMempoolCh implements a blocking event loop where we listen for p2p
|
||||
// Envelope messages from the mempoolCh.
|
||||
func (r *Reactor) processMempoolCh(ctx context.Context, mempoolCh *p2p.Channel) {
|
||||
func (r *Reactor) processMempoolCh(ctx context.Context, mempoolCh p2p.Channel) {
|
||||
iter := mempoolCh.Receive(ctx)
|
||||
for iter.Next(ctx) {
|
||||
envelope := iter.Envelope()
|
||||
@@ -215,7 +215,7 @@ func (r *Reactor) processMempoolCh(ctx context.Context, mempoolCh *p2p.Channel)
|
||||
// goroutine or not. If not, we start one for the newly added peer. For down or
|
||||
// removed peers, we remove the peer from the mempool peer ID set and signal to
|
||||
// stop the tx broadcasting goroutine.
|
||||
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, mempoolCh *p2p.Channel) {
|
||||
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, mempoolCh p2p.Channel) {
|
||||
r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
|
||||
|
||||
r.mtx.Lock()
|
||||
@@ -264,7 +264,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
|
||||
// processPeerUpdates initiates a blocking process where we listen for and handle
|
||||
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
|
||||
// close the p2p PeerUpdatesCh gracefully.
|
||||
func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, mempoolCh *p2p.Channel) {
|
||||
func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, mempoolCh p2p.Channel) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -275,7 +275,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerU
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, mempoolCh *p2p.Channel) {
|
||||
func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, mempoolCh p2p.Channel) {
|
||||
peerMempoolID := r.ids.GetForPeer(peerID)
|
||||
var nextGossipTx *clist.CElement
|
||||
|
||||
|
||||
@@ -30,7 +30,7 @@ type reactorTestSuite struct {
|
||||
logger log.Logger
|
||||
|
||||
reactors map[types.NodeID]*Reactor
|
||||
mempoolChannels map[types.NodeID]*p2p.Channel
|
||||
mempoolChannels map[types.NodeID]p2p.Channel
|
||||
mempools map[types.NodeID]*TxMempool
|
||||
kvstores map[types.NodeID]*kvstore.Application
|
||||
|
||||
@@ -51,7 +51,7 @@ func setupReactors(ctx context.Context, t *testing.T, logger log.Logger, numNode
|
||||
logger: log.NewNopLogger().With("testCase", t.Name()),
|
||||
network: p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: numNodes}),
|
||||
reactors: make(map[types.NodeID]*Reactor, numNodes),
|
||||
mempoolChannels: make(map[types.NodeID]*p2p.Channel, numNodes),
|
||||
mempoolChannels: make(map[types.NodeID]p2p.Channel, numNodes),
|
||||
mempools: make(map[types.NodeID]*TxMempool, numNodes),
|
||||
kvstores: make(map[types.NodeID]*kvstore.Application, numNodes),
|
||||
peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numNodes),
|
||||
@@ -75,7 +75,7 @@ func setupReactors(ctx context.Context, t *testing.T, logger log.Logger, numNode
|
||||
rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)
|
||||
rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID])
|
||||
|
||||
chCreator := func(ctx context.Context, chDesc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
|
||||
chCreator := func(ctx context.Context, chDesc *p2p.ChannelDescriptor) (p2p.Channel, error) {
|
||||
return rts.mempoolChannels[nodeID], nil
|
||||
}
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ package p2p
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
@@ -37,6 +38,16 @@ type Wrapper interface {
|
||||
Unwrap() (proto.Message, error)
|
||||
}
|
||||
|
||||
type Channel interface {
|
||||
fmt.Stringer
|
||||
|
||||
Err() error
|
||||
|
||||
Send(context.Context, Envelope) error
|
||||
SendError(context.Context, PeerError) error
|
||||
Receive(context.Context) *ChannelIterator
|
||||
}
|
||||
|
||||
// PeerError is a peer error reported via Channel.Error.
|
||||
//
|
||||
// FIXME: This currently just disconnects the peer, which is too simplistic.
|
||||
@@ -56,9 +67,9 @@ type PeerError struct {
|
||||
func (pe PeerError) Error() string { return fmt.Sprintf("peer=%q: %s", pe.NodeID, pe.Err.Error()) }
|
||||
func (pe PeerError) Unwrap() error { return pe.Err }
|
||||
|
||||
// Channel is a bidirectional channel to exchange Protobuf messages with peers.
|
||||
// legacyChannel is a bidirectional channel to exchange Protobuf messages with peers.
|
||||
// Each message is wrapped in an Envelope to specify its sender and receiver.
|
||||
type Channel struct {
|
||||
type legacyChannel struct {
|
||||
ID ChannelID
|
||||
inCh <-chan Envelope // inbound messages (peers to reactors)
|
||||
outCh chan<- Envelope // outbound messages (reactors to peers)
|
||||
@@ -69,9 +80,10 @@ type Channel struct {
|
||||
|
||||
// NewChannel creates a new channel. It is primarily for internal and test
|
||||
// use, reactors should use Router.OpenChannel().
|
||||
func NewChannel(id ChannelID, inCh <-chan Envelope, outCh chan<- Envelope, errCh chan<- PeerError) *Channel {
|
||||
return &Channel{
|
||||
func NewChannel(id ChannelID, name string, inCh <-chan Envelope, outCh chan<- Envelope, errCh chan<- PeerError) Channel {
|
||||
return &legacyChannel{
|
||||
ID: id,
|
||||
name: name,
|
||||
inCh: inCh,
|
||||
outCh: outCh,
|
||||
errCh: errCh,
|
||||
@@ -80,7 +92,7 @@ func NewChannel(id ChannelID, inCh <-chan Envelope, outCh chan<- Envelope, errCh
|
||||
|
||||
// Send blocks until the envelope has been sent, or until ctx ends.
|
||||
// An error only occurs if the context ends before the send completes.
|
||||
func (ch *Channel) Send(ctx context.Context, envelope Envelope) error {
|
||||
func (ch *legacyChannel) Send(ctx context.Context, envelope Envelope) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
@@ -89,9 +101,15 @@ func (ch *Channel) Send(ctx context.Context, envelope Envelope) error {
|
||||
}
|
||||
}
|
||||
|
||||
func (ch *legacyChannel) Err() error { return nil }
|
||||
|
||||
// SendError blocks until the given error has been sent, or ctx ends.
|
||||
// An error only occurs if the context ends before the send completes.
|
||||
func (ch *Channel) SendError(ctx context.Context, pe PeerError) error {
|
||||
func (ch *legacyChannel) SendError(ctx context.Context, pe PeerError) error {
|
||||
if errors.Is(pe.Err, context.Canceled) || errors.Is(pe.Err, context.DeadlineExceeded) {
|
||||
return nil
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
@@ -100,18 +118,29 @@ func (ch *Channel) SendError(ctx context.Context, pe PeerError) error {
|
||||
}
|
||||
}
|
||||
|
||||
func (ch *Channel) String() string { return fmt.Sprintf("p2p.Channel<%d:%s>", ch.ID, ch.name) }
|
||||
func (ch *legacyChannel) String() string { return fmt.Sprintf("p2p.Channel<%d:%s>", ch.ID, ch.name) }
|
||||
|
||||
// Receive returns a new unbuffered iterator to receive messages from ch.
|
||||
// The iterator runs until ctx ends.
|
||||
func (ch *Channel) Receive(ctx context.Context) *ChannelIterator {
|
||||
func (ch *legacyChannel) Receive(ctx context.Context) *ChannelIterator {
|
||||
iter := &ChannelIterator{
|
||||
pipe: make(chan Envelope), // unbuffered
|
||||
}
|
||||
go func() {
|
||||
go func(pipe chan<- Envelope) {
|
||||
defer close(iter.pipe)
|
||||
iteratorWorker(ctx, ch, iter.pipe)
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case envelope := <-ch.inCh:
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case pipe <- envelope:
|
||||
}
|
||||
}
|
||||
}
|
||||
}(iter.pipe)
|
||||
return iter
|
||||
}
|
||||
|
||||
@@ -126,21 +155,6 @@ type ChannelIterator struct {
|
||||
current *Envelope
|
||||
}
|
||||
|
||||
func iteratorWorker(ctx context.Context, ch *Channel, pipe chan Envelope) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case envelope := <-ch.inCh:
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case pipe <- envelope:
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Next returns true when the Envelope value has advanced, and false
|
||||
// when the context is canceled or iteration should stop. If an iterator has returned false,
|
||||
// it will never return true again.
|
||||
@@ -179,7 +193,7 @@ func (iter *ChannelIterator) Envelope() *Envelope { return iter.current }
|
||||
//
|
||||
// This allows the caller to consume messages from multiple channels
|
||||
// without needing to manage the concurrency separately.
|
||||
func MergedChannelIterator(ctx context.Context, chs ...*Channel) *ChannelIterator {
|
||||
func MergedChannelIterator(ctx context.Context, chs ...Channel) *ChannelIterator {
|
||||
iter := &ChannelIterator{
|
||||
pipe: make(chan Envelope), // unbuffered
|
||||
}
|
||||
@@ -187,10 +201,17 @@ func MergedChannelIterator(ctx context.Context, chs ...*Channel) *ChannelIterato
|
||||
|
||||
for _, ch := range chs {
|
||||
wg.Add(1)
|
||||
go func(ch *Channel) {
|
||||
go func(ch Channel, pipe chan<- Envelope) {
|
||||
defer wg.Done()
|
||||
iteratorWorker(ctx, ch, iter.pipe)
|
||||
}(ch)
|
||||
iter := ch.Receive(ctx)
|
||||
for iter.Next(ctx) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case pipe <- *iter.Envelope():
|
||||
}
|
||||
}
|
||||
}(ch, iter.pipe)
|
||||
}
|
||||
|
||||
done := make(chan struct{})
|
||||
|
||||
@@ -16,13 +16,13 @@ type channelInternal struct {
|
||||
Error chan PeerError
|
||||
}
|
||||
|
||||
func testChannel(size int) (*channelInternal, *Channel) {
|
||||
func testChannel(size int) (*channelInternal, *legacyChannel) {
|
||||
in := &channelInternal{
|
||||
In: make(chan Envelope, size),
|
||||
Out: make(chan Envelope, size),
|
||||
Error: make(chan PeerError, size),
|
||||
}
|
||||
ch := &Channel{
|
||||
ch := &legacyChannel{
|
||||
inCh: in.In,
|
||||
outCh: in.Out,
|
||||
errCh: in.Error,
|
||||
|
||||
@@ -146,8 +146,8 @@ func (n *Network) MakeChannels(
|
||||
ctx context.Context,
|
||||
t *testing.T,
|
||||
chDesc *p2p.ChannelDescriptor,
|
||||
) map[types.NodeID]*p2p.Channel {
|
||||
channels := map[types.NodeID]*p2p.Channel{}
|
||||
) map[types.NodeID]p2p.Channel {
|
||||
channels := map[types.NodeID]p2p.Channel{}
|
||||
for _, node := range n.Nodes {
|
||||
channels[node.NodeID] = node.MakeChannel(ctx, t, chDesc)
|
||||
}
|
||||
@@ -161,8 +161,8 @@ func (n *Network) MakeChannelsNoCleanup(
|
||||
ctx context.Context,
|
||||
t *testing.T,
|
||||
chDesc *p2p.ChannelDescriptor,
|
||||
) map[types.NodeID]*p2p.Channel {
|
||||
channels := map[types.NodeID]*p2p.Channel{}
|
||||
) map[types.NodeID]p2p.Channel {
|
||||
channels := map[types.NodeID]p2p.Channel{}
|
||||
for _, node := range n.Nodes {
|
||||
channels[node.NodeID] = node.MakeChannelNoCleanup(ctx, t, chDesc)
|
||||
}
|
||||
@@ -304,7 +304,7 @@ func (n *Node) MakeChannel(
|
||||
ctx context.Context,
|
||||
t *testing.T,
|
||||
chDesc *p2p.ChannelDescriptor,
|
||||
) *p2p.Channel {
|
||||
) p2p.Channel {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
channel, err := n.Router.OpenChannel(ctx, chDesc)
|
||||
require.NoError(t, err)
|
||||
@@ -321,7 +321,7 @@ func (n *Node) MakeChannelNoCleanup(
|
||||
ctx context.Context,
|
||||
t *testing.T,
|
||||
chDesc *p2p.ChannelDescriptor,
|
||||
) *p2p.Channel {
|
||||
) p2p.Channel {
|
||||
channel, err := n.Router.OpenChannel(ctx, chDesc)
|
||||
require.NoError(t, err)
|
||||
return channel
|
||||
|
||||
@@ -15,7 +15,7 @@ import (
|
||||
)
|
||||
|
||||
// RequireEmpty requires that the given channel is empty.
|
||||
func RequireEmpty(ctx context.Context, t *testing.T, channels ...*p2p.Channel) {
|
||||
func RequireEmpty(ctx context.Context, t *testing.T, channels ...p2p.Channel) {
|
||||
t.Helper()
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
|
||||
@@ -32,7 +32,7 @@ func RequireEmpty(ctx context.Context, t *testing.T, channels ...*p2p.Channel) {
|
||||
}
|
||||
|
||||
// RequireReceive requires that the given envelope is received on the channel.
|
||||
func RequireReceive(ctx context.Context, t *testing.T, channel *p2p.Channel, expect p2p.Envelope) {
|
||||
func RequireReceive(ctx context.Context, t *testing.T, channel p2p.Channel, expect p2p.Envelope) {
|
||||
t.Helper()
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, time.Second)
|
||||
@@ -54,7 +54,7 @@ func RequireReceive(ctx context.Context, t *testing.T, channel *p2p.Channel, exp
|
||||
|
||||
// RequireReceiveUnordered requires that the given envelopes are all received on
|
||||
// the channel, ignoring order.
|
||||
func RequireReceiveUnordered(ctx context.Context, t *testing.T, channel *p2p.Channel, expect []*p2p.Envelope) {
|
||||
func RequireReceiveUnordered(ctx context.Context, t *testing.T, channel p2p.Channel, expect []*p2p.Envelope) {
|
||||
ctx, cancel := context.WithTimeout(ctx, time.Second)
|
||||
defer cancel()
|
||||
|
||||
@@ -75,7 +75,7 @@ func RequireReceiveUnordered(ctx context.Context, t *testing.T, channel *p2p.Cha
|
||||
}
|
||||
|
||||
// RequireSend requires that the given envelope is sent on the channel.
|
||||
func RequireSend(ctx context.Context, t *testing.T, channel *p2p.Channel, envelope p2p.Envelope) {
|
||||
func RequireSend(ctx context.Context, t *testing.T, channel p2p.Channel, envelope p2p.Envelope) {
|
||||
tctx, cancel := context.WithTimeout(ctx, time.Second)
|
||||
defer cancel()
|
||||
|
||||
@@ -93,7 +93,7 @@ func RequireSend(ctx context.Context, t *testing.T, channel *p2p.Channel, envelo
|
||||
func RequireSendReceive(
|
||||
ctx context.Context,
|
||||
t *testing.T,
|
||||
channel *p2p.Channel,
|
||||
channel p2p.Channel,
|
||||
peerID types.NodeID,
|
||||
send proto.Message,
|
||||
receive proto.Message,
|
||||
@@ -116,7 +116,7 @@ func RequireNoUpdates(ctx context.Context, t *testing.T, peerUpdates *p2p.PeerUp
|
||||
}
|
||||
|
||||
// RequireError requires that the given peer error is submitted for a peer.
|
||||
func RequireError(ctx context.Context, t *testing.T, channel *p2p.Channel, peerError p2p.PeerError) {
|
||||
func RequireError(ctx context.Context, t *testing.T, channel p2p.Channel, peerError p2p.PeerError) {
|
||||
tctx, tcancel := context.WithTimeout(ctx, time.Second)
|
||||
defer tcancel()
|
||||
|
||||
|
||||
@@ -145,7 +145,7 @@ func (r *Reactor) OnStop() {}
|
||||
|
||||
// processPexCh implements a blocking event loop where we listen for p2p
|
||||
// Envelope messages from the pexCh.
|
||||
func (r *Reactor) processPexCh(ctx context.Context, pexCh *p2p.Channel) {
|
||||
func (r *Reactor) processPexCh(ctx context.Context, pexCh p2p.Channel) {
|
||||
incoming := make(chan *p2p.Envelope)
|
||||
go func() {
|
||||
defer close(incoming)
|
||||
@@ -192,8 +192,7 @@ func (r *Reactor) processPexCh(ctx context.Context, pexCh *p2p.Channel) {
|
||||
// A request from another peer, or a response to one of our requests.
|
||||
dur, err := r.handlePexMessage(ctx, envelope, pexCh)
|
||||
if err != nil {
|
||||
r.logger.Error("failed to process message",
|
||||
"ch_id", envelope.ChannelID, "envelope", envelope, "err", err)
|
||||
r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err)
|
||||
if serr := pexCh.SendError(ctx, p2p.PeerError{
|
||||
NodeID: envelope.From,
|
||||
Err: err,
|
||||
@@ -225,7 +224,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerU
|
||||
// handlePexMessage handles envelopes sent from peers on the PexChannel.
|
||||
// If an update was received, a new polling interval is returned; otherwise the
|
||||
// duration is 0.
|
||||
func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope, pexCh *p2p.Channel) (time.Duration, error) {
|
||||
func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope, pexCh p2p.Channel) (time.Duration, error) {
|
||||
logger := r.logger.With("peer", envelope.From)
|
||||
|
||||
switch msg := envelope.Message.(type) {
|
||||
@@ -308,7 +307,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
|
||||
// that peer a request for more peer addresses. The chosen peer is moved into
|
||||
// the requestsSent bucket so that we will not attempt to contact them again
|
||||
// until they've replied or updated.
|
||||
func (r *Reactor) sendRequestForPeers(ctx context.Context, pexCh *p2p.Channel) error {
|
||||
func (r *Reactor) sendRequestForPeers(ctx context.Context, pexCh p2p.Channel) error {
|
||||
r.mtx.Lock()
|
||||
defer r.mtx.Unlock()
|
||||
if len(r.availablePeers) == 0 {
|
||||
|
||||
@@ -275,7 +275,7 @@ type singleTestReactor struct {
|
||||
pexInCh chan p2p.Envelope
|
||||
pexOutCh chan p2p.Envelope
|
||||
pexErrCh chan p2p.PeerError
|
||||
pexCh *p2p.Channel
|
||||
pexCh p2p.Channel
|
||||
peerCh chan p2p.PeerUpdate
|
||||
manager *p2p.PeerManager
|
||||
}
|
||||
@@ -287,8 +287,11 @@ func setupSingle(ctx context.Context, t *testing.T) *singleTestReactor {
|
||||
pexInCh := make(chan p2p.Envelope, chBuf)
|
||||
pexOutCh := make(chan p2p.Envelope, chBuf)
|
||||
pexErrCh := make(chan p2p.PeerError, chBuf)
|
||||
|
||||
chDesc := pex.ChannelDescriptor()
|
||||
pexCh := p2p.NewChannel(
|
||||
p2p.ChannelID(pex.PexChannel),
|
||||
chDesc.ID,
|
||||
chDesc.Name,
|
||||
pexInCh,
|
||||
pexOutCh,
|
||||
pexErrCh,
|
||||
@@ -299,7 +302,7 @@ func setupSingle(ctx context.Context, t *testing.T) *singleTestReactor {
|
||||
peerManager, err := p2p.NewPeerManager(nodeID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
|
||||
require.NoError(t, err)
|
||||
|
||||
chCreator := func(context.Context, *p2p.ChannelDescriptor) (*p2p.Channel, error) {
|
||||
chCreator := func(context.Context, *p2p.ChannelDescriptor) (p2p.Channel, error) {
|
||||
return pexCh, nil
|
||||
}
|
||||
|
||||
@@ -324,7 +327,7 @@ type reactorTestSuite struct {
|
||||
logger log.Logger
|
||||
|
||||
reactors map[types.NodeID]*pex.Reactor
|
||||
pexChannels map[types.NodeID]*p2p.Channel
|
||||
pexChannels map[types.NodeID]p2p.Channel
|
||||
|
||||
peerChans map[types.NodeID]chan p2p.PeerUpdate
|
||||
peerUpdates map[types.NodeID]*p2p.PeerUpdates
|
||||
@@ -367,7 +370,7 @@ func setupNetwork(ctx context.Context, t *testing.T, opts testOptions) *reactorT
|
||||
logger: log.NewNopLogger().With("testCase", t.Name()),
|
||||
network: p2ptest.MakeNetwork(ctx, t, networkOpts),
|
||||
reactors: make(map[types.NodeID]*pex.Reactor, realNodes),
|
||||
pexChannels: make(map[types.NodeID]*p2p.Channel, opts.TotalNodes),
|
||||
pexChannels: make(map[types.NodeID]p2p.Channel, opts.TotalNodes),
|
||||
peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, opts.TotalNodes),
|
||||
peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, opts.TotalNodes),
|
||||
total: opts.TotalNodes,
|
||||
@@ -388,7 +391,7 @@ func setupNetwork(ctx context.Context, t *testing.T, opts testOptions) *reactorT
|
||||
rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], chBuf)
|
||||
rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID])
|
||||
|
||||
chCreator := func(context.Context, *p2p.ChannelDescriptor) (*p2p.Channel, error) {
|
||||
chCreator := func(context.Context, *p2p.ChannelDescriptor) (p2p.Channel, error) {
|
||||
return rts.pexChannels[nodeID], nil
|
||||
}
|
||||
|
||||
@@ -448,7 +451,7 @@ func (r *reactorTestSuite) addNodes(ctx context.Context, t *testing.T, nodes int
|
||||
r.peerUpdates[nodeID] = p2p.NewPeerUpdates(r.peerChans[nodeID], r.opts.BufferSize)
|
||||
r.network.Nodes[nodeID].PeerManager.Register(ctx, r.peerUpdates[nodeID])
|
||||
|
||||
chCreator := func(context.Context, *p2p.ChannelDescriptor) (*p2p.Channel, error) {
|
||||
chCreator := func(context.Context, *p2p.ChannelDescriptor) (p2p.Channel, error) {
|
||||
return r.pexChannels[nodeID], nil
|
||||
}
|
||||
|
||||
|
||||
@@ -239,7 +239,7 @@ func (r *Router) createQueueFactory(ctx context.Context) (func(int) queue, error
|
||||
// ChannelCreator allows routers to construct their own channels,
|
||||
// either by receiving a reference to Router.OpenChannel or using some
|
||||
// kind shim for testing purposes.
|
||||
type ChannelCreator func(context.Context, *ChannelDescriptor) (*Channel, error)
|
||||
type ChannelCreator func(context.Context, *ChannelDescriptor) (Channel, error)
|
||||
|
||||
// OpenChannel opens a new channel for the given message type. The caller must
|
||||
// close the channel when done, before stopping the Router. messageType is the
|
||||
@@ -247,7 +247,7 @@ type ChannelCreator func(context.Context, *ChannelDescriptor) (*Channel, error)
|
||||
// implement Wrapper to automatically (un)wrap multiple message types in a
|
||||
// wrapper message. The caller may provide a size to make the channel buffered,
|
||||
// which internally makes the inbound, outbound, and error channel buffered.
|
||||
func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (*Channel, error) {
|
||||
func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (Channel, error) {
|
||||
r.channelMtx.Lock()
|
||||
defer r.channelMtx.Unlock()
|
||||
|
||||
@@ -262,11 +262,10 @@ func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (*C
|
||||
queue := r.queueFactory(chDesc.RecvBufferCapacity)
|
||||
outCh := make(chan Envelope, chDesc.RecvBufferCapacity)
|
||||
errCh := make(chan PeerError, chDesc.RecvBufferCapacity)
|
||||
channel := NewChannel(id, queue.dequeue(), outCh, errCh)
|
||||
channel.name = chDesc.Name
|
||||
channel := NewChannel(chDesc.ID, chDesc.Name, queue.dequeue(), outCh, errCh)
|
||||
|
||||
var wrapper Wrapper
|
||||
if w, ok := messageType.(Wrapper); ok {
|
||||
if w, ok := chDesc.MessageType.(Wrapper); ok {
|
||||
wrapper = w
|
||||
}
|
||||
|
||||
@@ -287,7 +286,7 @@ func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (*C
|
||||
queue.close()
|
||||
}()
|
||||
|
||||
r.routeChannel(ctx, id, outCh, errCh, wrapper)
|
||||
r.routeChannel(ctx, chDesc.ID, outCh, errCh, wrapper)
|
||||
}()
|
||||
|
||||
return channel, nil
|
||||
|
||||
@@ -26,7 +26,7 @@ import (
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
func echoReactor(ctx context.Context, channel *p2p.Channel) {
|
||||
func echoReactor(ctx context.Context, channel p2p.Channel) {
|
||||
iter := channel.Receive(ctx)
|
||||
for iter.Next(ctx) {
|
||||
envelope := iter.Envelope()
|
||||
|
||||
@@ -26,14 +26,14 @@ var (
|
||||
// NOTE: It is not the responsibility of the dispatcher to verify the light blocks.
|
||||
type Dispatcher struct {
|
||||
// the channel with which to send light block requests on
|
||||
requestCh *p2p.Channel
|
||||
requestCh p2p.Channel
|
||||
|
||||
mtx sync.Mutex
|
||||
// all pending calls that have been dispatched and are awaiting an answer
|
||||
calls map[types.NodeID]chan *types.LightBlock
|
||||
}
|
||||
|
||||
func NewDispatcher(requestChannel *p2p.Channel) *Dispatcher {
|
||||
func NewDispatcher(requestChannel p2p.Channel) *Dispatcher {
|
||||
return &Dispatcher{
|
||||
requestCh: requestChannel,
|
||||
calls: make(map[types.NodeID]chan *types.LightBlock),
|
||||
|
||||
@@ -24,13 +24,13 @@ type channelInternal struct {
|
||||
Error chan p2p.PeerError
|
||||
}
|
||||
|
||||
func testChannel(size int) (*channelInternal, *p2p.Channel) {
|
||||
func testChannel(size int) (*channelInternal, p2p.Channel) {
|
||||
in := &channelInternal{
|
||||
In: make(chan p2p.Envelope, size),
|
||||
Out: make(chan p2p.Envelope, size),
|
||||
Error: make(chan p2p.PeerError, size),
|
||||
}
|
||||
return in, p2p.NewChannel(0, in.In, in.Out, in.Error)
|
||||
return in, p2p.NewChannel(0, "test", in.In, in.Out, in.Error)
|
||||
}
|
||||
|
||||
func TestDispatcherBasic(t *testing.T) {
|
||||
|
||||
@@ -305,7 +305,7 @@ func (r *Reactor) OnStart(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
go r.processChannels(ctx, map[p2p.ChannelID]*p2p.Channel{
|
||||
go r.processChannels(ctx, map[p2p.ChannelID]p2p.Channel{
|
||||
SnapshotChannel: snapshotCh,
|
||||
ChunkChannel: chunkCh,
|
||||
LightBlockChannel: blockCh,
|
||||
@@ -611,7 +611,7 @@ func (r *Reactor) backfill(
|
||||
// handleSnapshotMessage handles envelopes sent from peers on the
|
||||
// SnapshotChannel. It returns an error only if the Envelope.Message is unknown
|
||||
// for this channel. This should never be called outside of handleMessage.
|
||||
func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envelope, snapshotCh *p2p.Channel) error {
|
||||
func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envelope, snapshotCh p2p.Channel) error {
|
||||
logger := r.logger.With("peer", envelope.From)
|
||||
|
||||
switch msg := envelope.Message.(type) {
|
||||
@@ -683,7 +683,7 @@ func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envel
|
||||
// handleChunkMessage handles envelopes sent from peers on the ChunkChannel.
|
||||
// It returns an error only if the Envelope.Message is unknown for this channel.
|
||||
// This should never be called outside of handleMessage.
|
||||
func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope, chunkCh *p2p.Channel) error {
|
||||
func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope, chunkCh p2p.Channel) error {
|
||||
switch msg := envelope.Message.(type) {
|
||||
case *ssproto.ChunkRequest:
|
||||
r.logger.Debug(
|
||||
@@ -772,7 +772,7 @@ func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Envelope, blockCh *p2p.Channel) error {
|
||||
func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Envelope, blockCh p2p.Channel) error {
|
||||
switch msg := envelope.Message.(type) {
|
||||
case *ssproto.LightBlockRequest:
|
||||
r.logger.Info("received light block request", "height", msg.Height)
|
||||
@@ -829,7 +829,7 @@ func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Env
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelope, paramsCh *p2p.Channel) error {
|
||||
func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelope, paramsCh p2p.Channel) error {
|
||||
switch msg := envelope.Message.(type) {
|
||||
case *ssproto.ParamsRequest:
|
||||
r.logger.Debug("received consensus params request", "height", msg.Height)
|
||||
@@ -878,7 +878,7 @@ func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelop
|
||||
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
|
||||
// It will handle errors and any possible panics gracefully. A caller can handle
|
||||
// any error returned by sending a PeerError on the respective channel.
|
||||
func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, chans map[p2p.ChannelID]*p2p.Channel) (err error) {
|
||||
func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, chans map[p2p.ChannelID]p2p.Channel) (err error) {
|
||||
defer func() {
|
||||
if e := recover(); e != nil {
|
||||
err = fmt.Errorf("panic in processing message: %v", e)
|
||||
@@ -912,12 +912,12 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, cha
|
||||
// encountered during message execution will result in a PeerError being sent on
|
||||
// the respective channel. When the reactor is stopped, we will catch the signal
|
||||
// and close the p2p Channel gracefully.
|
||||
func (r *Reactor) processChannels(ctx context.Context, chanTable map[p2p.ChannelID]*p2p.Channel) {
|
||||
// make sure that the iterator gets cleaned up in case of error
|
||||
func (r *Reactor) processChannels(ctx context.Context, chanTable map[p2p.ChannelID]p2p.Channel) {
|
||||
// make sure tht the iterator gets cleaned up in case of error
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
chs := make([]*p2p.Channel, 0, len(chanTable))
|
||||
chs := make([]p2p.Channel, 0, len(chanTable))
|
||||
for key := range chanTable {
|
||||
chs = append(chs, chanTable[key])
|
||||
}
|
||||
|
||||
@@ -40,22 +40,22 @@ type reactorTestSuite struct {
|
||||
conn *clientmocks.Client
|
||||
stateProvider *mocks.StateProvider
|
||||
|
||||
snapshotChannel *p2p.Channel
|
||||
snapshotChannel p2p.Channel
|
||||
snapshotInCh chan p2p.Envelope
|
||||
snapshotOutCh chan p2p.Envelope
|
||||
snapshotPeerErrCh chan p2p.PeerError
|
||||
|
||||
chunkChannel *p2p.Channel
|
||||
chunkChannel p2p.Channel
|
||||
chunkInCh chan p2p.Envelope
|
||||
chunkOutCh chan p2p.Envelope
|
||||
chunkPeerErrCh chan p2p.PeerError
|
||||
|
||||
blockChannel *p2p.Channel
|
||||
blockChannel p2p.Channel
|
||||
blockInCh chan p2p.Envelope
|
||||
blockOutCh chan p2p.Envelope
|
||||
blockPeerErrCh chan p2p.PeerError
|
||||
|
||||
paramsChannel *p2p.Channel
|
||||
paramsChannel p2p.Channel
|
||||
paramsInCh chan p2p.Envelope
|
||||
paramsOutCh chan p2p.Envelope
|
||||
paramsPeerErrCh chan p2p.PeerError
|
||||
@@ -102,6 +102,7 @@ func setup(
|
||||
|
||||
rts.snapshotChannel = p2p.NewChannel(
|
||||
SnapshotChannel,
|
||||
"snapshot",
|
||||
rts.snapshotInCh,
|
||||
rts.snapshotOutCh,
|
||||
rts.snapshotPeerErrCh,
|
||||
@@ -109,6 +110,7 @@ func setup(
|
||||
|
||||
rts.chunkChannel = p2p.NewChannel(
|
||||
ChunkChannel,
|
||||
"chunk",
|
||||
rts.chunkInCh,
|
||||
rts.chunkOutCh,
|
||||
rts.chunkPeerErrCh,
|
||||
@@ -116,6 +118,7 @@ func setup(
|
||||
|
||||
rts.blockChannel = p2p.NewChannel(
|
||||
LightBlockChannel,
|
||||
"lightblock",
|
||||
rts.blockInCh,
|
||||
rts.blockOutCh,
|
||||
rts.blockPeerErrCh,
|
||||
@@ -123,6 +126,7 @@ func setup(
|
||||
|
||||
rts.paramsChannel = p2p.NewChannel(
|
||||
ParamsChannel,
|
||||
"params",
|
||||
rts.paramsInCh,
|
||||
rts.paramsOutCh,
|
||||
rts.paramsPeerErrCh,
|
||||
@@ -133,7 +137,7 @@ func setup(
|
||||
|
||||
cfg := config.DefaultStateSyncConfig()
|
||||
|
||||
chCreator := func(ctx context.Context, desc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
|
||||
chCreator := func(ctx context.Context, desc *p2p.ChannelDescriptor) (p2p.Channel, error) {
|
||||
switch desc.ID {
|
||||
case SnapshotChannel:
|
||||
return rts.snapshotChannel, nil
|
||||
|
||||
@@ -208,7 +208,7 @@ type stateProviderP2P struct {
|
||||
sync.Mutex // light.Client is not concurrency-safe
|
||||
lc *light.Client
|
||||
initialHeight int64
|
||||
paramsSendCh *p2p.Channel
|
||||
paramsSendCh p2p.Channel
|
||||
paramsRecvCh chan types.ConsensusParams
|
||||
}
|
||||
|
||||
@@ -220,7 +220,7 @@ func NewP2PStateProvider(
|
||||
initialHeight int64,
|
||||
providers []lightprovider.Provider,
|
||||
trustOptions light.TrustOptions,
|
||||
paramsSendCh *p2p.Channel,
|
||||
paramsSendCh p2p.Channel,
|
||||
logger log.Logger,
|
||||
) (StateProvider, error) {
|
||||
if len(providers) < 2 {
|
||||
|
||||
@@ -56,8 +56,8 @@ type syncer struct {
|
||||
stateProvider StateProvider
|
||||
conn abciclient.Client
|
||||
snapshots *snapshotPool
|
||||
snapshotCh *p2p.Channel
|
||||
chunkCh *p2p.Channel
|
||||
snapshotCh p2p.Channel
|
||||
chunkCh p2p.Channel
|
||||
tempDir string
|
||||
fetchers int32
|
||||
retryTimeout time.Duration
|
||||
|
||||
Reference in New Issue
Block a user