mirror of
https://github.com/tendermint/tendermint.git
synced 2026-05-28 18:10:20 +00:00
Rename new methods to *Envelope instead of New*
This commit is contained in:
@@ -152,7 +152,7 @@ func (bcR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
|
||||
|
||||
// AddPeer implements Reactor by sending our state to peer.
|
||||
func (bcR *Reactor) AddPeer(peer p2p.Peer) {
|
||||
peer.NewSend(p2p.Envelope{
|
||||
peer.SendEnvelope(p2p.Envelope{
|
||||
ChannelID: BlocksyncChannel,
|
||||
Message: &bcproto.StatusResponse{
|
||||
Base: bcR.store.Base(),
|
||||
@@ -183,21 +183,21 @@ func (bcR *Reactor) respondToPeer(msg *bcproto.BlockRequest,
|
||||
return false
|
||||
}
|
||||
|
||||
return src.NewTrySend(p2p.Envelope{
|
||||
return src.TrySendEnvelope(p2p.Envelope{
|
||||
ChannelID: BlocksyncChannel,
|
||||
Message: &bcproto.BlockResponse{Block: bl},
|
||||
})
|
||||
}
|
||||
|
||||
bcR.Logger.Info("Peer asking for a block we don't have", "src", src, "height", msg.Height)
|
||||
return src.NewTrySend(p2p.Envelope{
|
||||
return src.TrySendEnvelope(p2p.Envelope{
|
||||
ChannelID: BlocksyncChannel,
|
||||
Message: &bcproto.NoBlockResponse{Height: msg.Height},
|
||||
})
|
||||
}
|
||||
|
||||
// Receive implements Reactor by handling 4 types of messages (look below).
|
||||
func (bcR *Reactor) NewReceive(e p2p.Envelope) {
|
||||
func (bcR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
|
||||
if err := ValidateMsg(e.Message); err != nil {
|
||||
bcR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", e.Message, "err", err)
|
||||
bcR.Switch.StopPeerForError(e.Src, err)
|
||||
@@ -218,7 +218,7 @@ func (bcR *Reactor) NewReceive(e p2p.Envelope) {
|
||||
bcR.pool.AddBlock(e.Src.ID(), bi, msg.Block.Size())
|
||||
case *bcproto.StatusRequest:
|
||||
// Send peer our state.
|
||||
e.Src.NewTrySend(p2p.Envelope{
|
||||
e.Src.TrySendEnvelope(p2p.Envelope{
|
||||
ChannelID: BlocksyncChannel,
|
||||
Message: &bcproto.StatusResponse{
|
||||
Height: bcR.store.Height(),
|
||||
@@ -245,7 +245,7 @@ func (bcR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
bcR.NewReceive(p2p.Envelope{
|
||||
bcR.ReceiveEnvelope(p2p.Envelope{
|
||||
ChannelID: chID,
|
||||
Src: peer,
|
||||
Message: uw,
|
||||
@@ -287,7 +287,7 @@ func (bcR *Reactor) poolRoutine(stateSynced bool) {
|
||||
if peer == nil {
|
||||
continue
|
||||
}
|
||||
queued := peer.NewTrySend(p2p.Envelope{
|
||||
queued := peer.TrySendEnvelope(p2p.Envelope{
|
||||
ChannelID: BlocksyncChannel,
|
||||
Message: &bcproto.BlockRequest{Height: request.Height},
|
||||
})
|
||||
@@ -429,7 +429,7 @@ FOR_LOOP:
|
||||
|
||||
// BroadcastStatusRequest broadcasts `BlockStore` base and height.
|
||||
func (bcR *Reactor) BroadcastStatusRequest() {
|
||||
bcR.Switch.NewBroadcast(p2p.Envelope{
|
||||
bcR.Switch.BroadcastEnvelope(p2p.Envelope{
|
||||
ChannelID: BlocksyncChannel,
|
||||
Message: &bcproto.StatusRequest{},
|
||||
})
|
||||
|
||||
@@ -166,13 +166,13 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
|
||||
for i, peer := range peerList {
|
||||
if i < len(peerList)/2 {
|
||||
bcs.Logger.Info("Signed and pushed vote", "vote", prevote1, "peer", peer)
|
||||
peer.NewSend(p2p.Envelope{
|
||||
peer.SendEnvelope(p2p.Envelope{
|
||||
Message: &tmcons.Vote{Vote: prevote1.ToProto()},
|
||||
ChannelID: VoteChannel,
|
||||
})
|
||||
} else {
|
||||
bcs.Logger.Info("Signed and pushed vote", "vote", prevote2, "peer", peer)
|
||||
peer.NewSend(p2p.Envelope{
|
||||
peer.SendEnvelope(p2p.Envelope{
|
||||
Message: &tmcons.Vote{Vote: prevote2.ToProto()},
|
||||
ChannelID: VoteChannel,
|
||||
})
|
||||
@@ -527,7 +527,7 @@ func sendProposalAndParts(
|
||||
parts *types.PartSet,
|
||||
) {
|
||||
// proposal
|
||||
peer.NewSend(p2p.Envelope{
|
||||
peer.SendEnvelope(p2p.Envelope{
|
||||
ChannelID: DataChannel,
|
||||
Message: &tmcons.Proposal{Proposal: *proposal.ToProto()},
|
||||
})
|
||||
@@ -539,7 +539,7 @@ func sendProposalAndParts(
|
||||
if err != nil {
|
||||
panic(err) // TODO: wbanfield better error handling
|
||||
}
|
||||
peer.NewSend(p2p.Envelope{
|
||||
peer.SendEnvelope(p2p.Envelope{
|
||||
ChannelID: DataChannel,
|
||||
Message: &tmcons.BlockPart{
|
||||
Height: height, // This tells peer that this part applies to us.
|
||||
@@ -554,11 +554,11 @@ func sendProposalAndParts(
|
||||
prevote, _ := cs.signVote(tmproto.PrevoteType, blockHash, parts.Header())
|
||||
precommit, _ := cs.signVote(tmproto.PrecommitType, blockHash, parts.Header())
|
||||
cs.mtx.Unlock()
|
||||
peer.NewSend(p2p.Envelope{
|
||||
peer.SendEnvelope(p2p.Envelope{
|
||||
ChannelID: VoteChannel,
|
||||
Message: &tmcons.Vote{Vote: prevote.ToProto()},
|
||||
})
|
||||
peer.NewSend(p2p.Envelope{
|
||||
peer.SendEnvelope(p2p.Envelope{
|
||||
ChannelID: VoteChannel,
|
||||
Message: &tmcons.Vote{Vote: precommit.ToProto()},
|
||||
})
|
||||
@@ -599,8 +599,8 @@ func (br *ByzantineReactor) AddPeer(peer p2p.Peer) {
|
||||
func (br *ByzantineReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
|
||||
br.reactor.RemovePeer(peer, reason)
|
||||
}
|
||||
func (br *ByzantineReactor) NewReceive(e p2p.Envelope) {
|
||||
br.reactor.NewReceive(e)
|
||||
func (br *ByzantineReactor) ReceiveEnvelope(e p2p.Envelope) {
|
||||
br.reactor.ReceiveEnvelope(e)
|
||||
}
|
||||
func (br *ByzantineReactor) Receive(chID byte, p p2p.Peer, m []byte) {
|
||||
br.reactor.Receive(chID, p, m)
|
||||
|
||||
@@ -95,7 +95,7 @@ func invalidDoPrevoteFunc(t *testing.T, height int64, round int32, cs *State, sw
|
||||
peers := sw.Peers().List()
|
||||
for _, peer := range peers {
|
||||
cs.Logger.Info("Sending bad vote", "block", blockHash, "peer", peer)
|
||||
peer.NewSend(p2p.Envelope{
|
||||
peer.SendEnvelope(p2p.Envelope{
|
||||
Message: &tmcons.Vote{Vote: precommit.ToProto()},
|
||||
ChannelID: VoteChannel,
|
||||
})
|
||||
|
||||
@@ -226,7 +226,7 @@ func (conR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
|
||||
// Peer state updates can happen in parallel, but processing of
|
||||
// proposals, block parts, and votes are ordered by the receiveRoutine
|
||||
// NOTE: blocks on consensus state for proposals, block parts, and votes
|
||||
func (conR *Reactor) NewReceive(e p2p.Envelope) {
|
||||
func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
|
||||
if !conR.IsRunning() {
|
||||
conR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID)
|
||||
return
|
||||
@@ -303,7 +303,7 @@ func (conR *Reactor) NewReceive(e p2p.Envelope) {
|
||||
if votes := ourVotes.ToProto(); votes != nil {
|
||||
eMsg.Votes = *votes
|
||||
}
|
||||
e.Src.NewTrySend(p2p.Envelope{
|
||||
e.Src.TrySendEnvelope(p2p.Envelope{
|
||||
ChannelID: VoteSetBitsChannel,
|
||||
Message: eMsg,
|
||||
})
|
||||
@@ -398,7 +398,7 @@ func (conR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
conR.NewReceive(p2p.Envelope{
|
||||
conR.ReceiveEnvelope(p2p.Envelope{
|
||||
ChannelID: chID,
|
||||
Src: peer,
|
||||
Message: uw,
|
||||
@@ -455,7 +455,7 @@ func (conR *Reactor) unsubscribeFromBroadcastEvents() {
|
||||
|
||||
func (conR *Reactor) broadcastNewRoundStepMessage(rs *cstypes.RoundState) {
|
||||
nrsMsg := makeRoundStepMessage(rs)
|
||||
conR.Switch.NewBroadcast(p2p.Envelope{
|
||||
conR.Switch.BroadcastEnvelope(p2p.Envelope{
|
||||
ChannelID: StateChannel,
|
||||
Message: nrsMsg,
|
||||
})
|
||||
@@ -470,7 +470,7 @@ func (conR *Reactor) broadcastNewValidBlockMessage(rs *cstypes.RoundState) {
|
||||
BlockParts: rs.ProposalBlockParts.BitArray().ToProto(),
|
||||
IsCommit: rs.Step == cstypes.RoundStepCommit,
|
||||
}
|
||||
conR.Switch.NewBroadcast(p2p.Envelope{
|
||||
conR.Switch.BroadcastEnvelope(p2p.Envelope{
|
||||
ChannelID: StateChannel,
|
||||
Message: csMsg,
|
||||
})
|
||||
@@ -484,7 +484,7 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
|
||||
Type: vote.Type,
|
||||
Index: vote.ValidatorIndex,
|
||||
}
|
||||
conR.Switch.NewBroadcast(p2p.Envelope{
|
||||
conR.Switch.BroadcastEnvelope(p2p.Envelope{
|
||||
ChannelID: StateChannel,
|
||||
Message: msg,
|
||||
})
|
||||
@@ -502,7 +502,7 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
|
||||
ChannelID: StateChannel, struct{ ConsensusMessage }{msg},
|
||||
Message: p,
|
||||
}
|
||||
peer.NewTrySend(e)
|
||||
peer.TrySendEnvelope(e)
|
||||
} else {
|
||||
// Height doesn't match
|
||||
// TODO: check a field, maybe CatchupCommitRound?
|
||||
@@ -526,7 +526,7 @@ func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *tmcons.NewRoundStep)
|
||||
func (conR *Reactor) sendNewRoundStepMessage(peer p2p.Peer) {
|
||||
rs := conR.getRoundState()
|
||||
nrsMsg := makeRoundStepMessage(rs)
|
||||
peer.NewSend(p2p.Envelope{
|
||||
peer.SendEnvelope(p2p.Envelope{
|
||||
ChannelID: StateChannel,
|
||||
Message: nrsMsg,
|
||||
})
|
||||
@@ -573,7 +573,7 @@ OUTER_LOOP:
|
||||
panic(err)
|
||||
}
|
||||
logger.Debug("Sending block part", "height", prs.Height, "round", prs.Round)
|
||||
if peer.NewSend(p2p.Envelope{
|
||||
if peer.SendEnvelope(p2p.Envelope{
|
||||
ChannelID: DataChannel,
|
||||
Message: &tmcons.BlockPart{
|
||||
Height: rs.Height, // This tells peer that this part applies to us.
|
||||
@@ -627,7 +627,7 @@ OUTER_LOOP:
|
||||
// Proposal: share the proposal metadata with peer.
|
||||
{
|
||||
logger.Debug("Sending proposal", "height", prs.Height, "round", prs.Round)
|
||||
if peer.NewSend(p2p.Envelope{
|
||||
if peer.SendEnvelope(p2p.Envelope{
|
||||
ChannelID: DataChannel,
|
||||
Message: &tmcons.Proposal{Proposal: *rs.Proposal.ToProto()},
|
||||
}) {
|
||||
@@ -641,7 +641,7 @@ OUTER_LOOP:
|
||||
// so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
|
||||
if 0 <= rs.Proposal.POLRound {
|
||||
logger.Debug("Sending POL", "height", prs.Height, "round", prs.Round)
|
||||
peer.NewSend(p2p.Envelope{
|
||||
peer.SendEnvelope(p2p.Envelope{
|
||||
ChannelID: DataChannel,
|
||||
Message: &tmcons.ProposalPOL{
|
||||
Height: rs.Height,
|
||||
@@ -691,7 +691,7 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt
|
||||
logger.Error("Could not convert part to proto", "index", index, "error", err)
|
||||
return
|
||||
}
|
||||
if peer.NewSend(p2p.Envelope{
|
||||
if peer.SendEnvelope(p2p.Envelope{
|
||||
ChannelID: DataChannel,
|
||||
Message: &tmcons.BlockPart{
|
||||
Height: prs.Height, // Not our height, so it doesn't matter.
|
||||
@@ -858,7 +858,7 @@ OUTER_LOOP:
|
||||
if rs.Height == prs.Height {
|
||||
if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok {
|
||||
|
||||
peer.NewTrySend(p2p.Envelope{
|
||||
peer.TrySendEnvelope(p2p.Envelope{
|
||||
ChannelID: StateChannel,
|
||||
Message: &tmcons.VoteSetMaj23{
|
||||
Height: prs.Height,
|
||||
@@ -878,7 +878,7 @@ OUTER_LOOP:
|
||||
prs := ps.GetRoundState()
|
||||
if rs.Height == prs.Height {
|
||||
if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok {
|
||||
peer.NewTrySend(p2p.Envelope{
|
||||
peer.TrySendEnvelope(p2p.Envelope{
|
||||
ChannelID: StateChannel,
|
||||
Message: &tmcons.VoteSetMaj23{
|
||||
Height: prs.Height,
|
||||
@@ -899,7 +899,7 @@ OUTER_LOOP:
|
||||
if rs.Height == prs.Height && prs.ProposalPOLRound >= 0 {
|
||||
if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok {
|
||||
|
||||
peer.NewTrySend(p2p.Envelope{
|
||||
peer.TrySendEnvelope(p2p.Envelope{
|
||||
ChannelID: StateChannel,
|
||||
Message: &tmcons.VoteSetMaj23{
|
||||
Height: prs.Height,
|
||||
@@ -922,7 +922,7 @@ OUTER_LOOP:
|
||||
if prs.CatchupCommitRound != -1 && prs.Height > 0 && prs.Height <= conR.conS.blockStore.Height() &&
|
||||
prs.Height >= conR.conS.blockStore.Base() {
|
||||
if commit := conR.conS.LoadCommit(prs.Height); commit != nil {
|
||||
peer.NewTrySend(p2p.Envelope{
|
||||
peer.TrySendEnvelope(p2p.Envelope{
|
||||
ChannelID: StateChannel,
|
||||
Message: &tmcons.VoteSetMaj23{
|
||||
Height: prs.Height,
|
||||
@@ -1145,7 +1145,7 @@ func (ps *PeerState) SetHasProposalBlockPart(height int64, round int32, index in
|
||||
func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool {
|
||||
if vote, ok := ps.PickVoteToSend(votes); ok {
|
||||
ps.logger.Debug("Sending vote message", "ps", ps, "vote", vote)
|
||||
if ps.peer.NewSend(p2p.Envelope{
|
||||
if ps.peer.SendEnvelope(p2p.Envelope{
|
||||
ChannelID: VoteChannel,
|
||||
Message: &tmcons.Vote{
|
||||
Vote: vote.ToProto(),
|
||||
|
||||
@@ -272,7 +272,7 @@ func TestReactorReceiveDoesNotPanicIfAddPeerHasntBeenCalledYet(t *testing.T) {
|
||||
|
||||
// simulate switch calling Receive before AddPeer
|
||||
assert.NotPanics(t, func() {
|
||||
reactor.NewReceive(p2p.Envelope{
|
||||
reactor.ReceiveEnvelope(p2p.Envelope{
|
||||
ChannelID: StateChannel,
|
||||
Src: peer,
|
||||
Message: &tmcons.HasVote{Height: 1,
|
||||
@@ -298,7 +298,7 @@ func TestReactorReceivePanicsIfInitPeerHasntBeenCalledYet(t *testing.T) {
|
||||
|
||||
// simulate switch calling Receive before AddPeer
|
||||
assert.Panics(t, func() {
|
||||
reactor.NewReceive(p2p.Envelope{
|
||||
reactor.ReceiveEnvelope(p2p.Envelope{
|
||||
ChannelID: StateChannel,
|
||||
Src: peer,
|
||||
Message: &tmcons.HasVote{Height: 1,
|
||||
|
||||
@@ -68,7 +68,7 @@ func (evR *Reactor) AddPeer(peer p2p.Peer) {
|
||||
|
||||
// Receive implements Reactor.
|
||||
// It adds any received evidence to the evpool.
|
||||
func (evR *Reactor) NewReceive(e p2p.Envelope) {
|
||||
func (evR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
|
||||
evis, err := evidenceListFromProto(e.Message)
|
||||
if err != nil {
|
||||
evR.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err)
|
||||
@@ -98,7 +98,7 @@ func (evR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
evR.NewReceive(p2p.Envelope{
|
||||
evR.ReceiveEnvelope(p2p.Envelope{
|
||||
ChannelID: chID,
|
||||
Src: peer,
|
||||
Message: msg,
|
||||
@@ -146,7 +146,7 @@ func (evR *Reactor) broadcastEvidenceRoutine(peer p2p.Peer) {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
success := peer.NewSend(p2p.Envelope{
|
||||
success := peer.SendEnvelope(p2p.Envelope{
|
||||
ChannelID: EvidenceChannel,
|
||||
Message: evp,
|
||||
})
|
||||
|
||||
@@ -208,7 +208,7 @@ func TestReactorBroadcastEvidenceMemoryLeak(t *testing.T) {
|
||||
// i.e. broadcastEvidenceRoutine finishes when peer is stopped
|
||||
defer leaktest.CheckTimeout(t, 10*time.Second)()
|
||||
|
||||
p.On("NewSend", mock.MatchedBy(func(i interface{}) bool {
|
||||
p.On("SendEnvelope", mock.MatchedBy(func(i interface{}) bool {
|
||||
e, ok := i.(p2p.Envelope)
|
||||
return ok && e.ChannelID == evidence.EvidenceChannel
|
||||
})).Return(false)
|
||||
|
||||
@@ -157,7 +157,7 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
|
||||
|
||||
// Receive implements Reactor.
|
||||
// It adds any received transactions to the mempool.
|
||||
func (memR *Reactor) NewReceive(e p2p.Envelope) {
|
||||
func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
|
||||
memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
|
||||
switch msg := e.Message.(type) {
|
||||
case *protomem.Txs:
|
||||
@@ -200,7 +200,7 @@ func (memR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
memR.NewReceive(p2p.Envelope{
|
||||
memR.ReceiveEnvelope(p2p.Envelope{
|
||||
ChannelID: chID,
|
||||
Src: peer,
|
||||
Message: uw,
|
||||
@@ -261,7 +261,7 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
|
||||
// https://github.com/tendermint/tendermint/issues/5796
|
||||
|
||||
if _, ok := memTx.senders.Load(peerID); !ok {
|
||||
success := peer.NewSend(p2p.Envelope{
|
||||
success := peer.SendEnvelope(p2p.Envelope{
|
||||
ChannelID: mempool.MempoolChannel,
|
||||
Message: &protomem.Txs{Txs: [][]byte{memTx.tx}},
|
||||
})
|
||||
|
||||
@@ -283,7 +283,7 @@ func TestDontExhaustMaxActiveIDs(t *testing.T) {
|
||||
|
||||
for i := 0; i < mempool.MaxActiveIDs+1; i++ {
|
||||
peer := mock.NewPeer(nil)
|
||||
reactor.NewReceive(p2p.Envelope{
|
||||
reactor.ReceiveEnvelope(p2p.Envelope{
|
||||
ChannelID: mempool.MempoolChannel,
|
||||
Src: peer,
|
||||
Message: &memproto.Message{}, // This uses the wrong message type on purpose to stop the peer as in an error state in the reactor.
|
||||
|
||||
@@ -156,7 +156,7 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
|
||||
|
||||
// Receive implements Reactor.
|
||||
// It adds any received transactions to the mempool.
|
||||
func (memR *Reactor) NewReceive(e p2p.Envelope) {
|
||||
func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
|
||||
memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
|
||||
switch msg := e.Message.(type) {
|
||||
case *protomem.Txs:
|
||||
@@ -199,7 +199,7 @@ func (memR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
memR.NewReceive(p2p.Envelope{
|
||||
memR.ReceiveEnvelope(p2p.Envelope{
|
||||
ChannelID: chID,
|
||||
Src: peer,
|
||||
Message: uw,
|
||||
@@ -262,7 +262,7 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
|
||||
// NOTE: Transaction batching was disabled due to
|
||||
// https://github.com/tendermint/tendermint/issues/5796
|
||||
if !memTx.HasPeer(peerID) {
|
||||
success := peer.NewSend(p2p.Envelope{
|
||||
success := peer.SendEnvelope(p2p.Envelope{
|
||||
ChannelID: mempool.MempoolChannel,
|
||||
Message: &protomem.Txs{Txs: [][]byte{memTx.tx}},
|
||||
})
|
||||
|
||||
@@ -45,20 +45,20 @@ type Reactor interface {
|
||||
//
|
||||
// CONTRACT: msgBytes are not nil.
|
||||
//
|
||||
// Only one of Receive or NewReceive are called per message. If NewReceive
|
||||
// Only one of Receive or ReceiveEnvelope are called per message. If ReceiveEnvelope
|
||||
// is implemented, it will be used, otherwise the switch will fallback to
|
||||
// using Receive. Receive will be replaced by NewReceive in a future version
|
||||
// using Receive. Receive will be replaced by ReceiveEnvelope in a future version
|
||||
Receive(chID byte, peer Peer, msgBytes []byte)
|
||||
}
|
||||
|
||||
type NewReceiver interface {
|
||||
// NewReceive is called by the switch when an envelope is received from any connected
|
||||
type ReceiveEnveloper interface {
|
||||
// ReceiveEnvelope is called by the switch when an envelope is received from any connected
|
||||
// peer on any of the channels registered by the reactor.
|
||||
//
|
||||
// Only one of Receive or NewReceive are called per message. If NewReceive
|
||||
// Only one of Receive or ReceiveEnvelope are called per message. If ReceiveEnvelope
|
||||
// is implemented, it will be used, otherwise the switch will fallback to
|
||||
// using Receive. Receive will be replaced by NewReceive in a future version
|
||||
NewReceive(Envelope)
|
||||
// using Receive. Receive will be replaced by ReceiveEnvelope in a future version
|
||||
ReceiveEnvelope(Envelope)
|
||||
}
|
||||
|
||||
//--------------------------------------
|
||||
@@ -81,6 +81,6 @@ func (br *BaseReactor) SetSwitch(sw *Switch) {
|
||||
func (*BaseReactor) GetChannels() []*conn.ChannelDescriptor { return nil }
|
||||
func (*BaseReactor) AddPeer(peer Peer) {}
|
||||
func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {}
|
||||
func (*BaseReactor) NewReceive(e Envelope) {}
|
||||
func (*BaseReactor) ReceiveEnvelope(e Envelope) {}
|
||||
func (*BaseReactor) Receive(chID byte, peer Peer, msgBytes []byte) {}
|
||||
func (*BaseReactor) InitPeer(peer Peer) Peer { return peer }
|
||||
|
||||
@@ -43,8 +43,8 @@ func NewPeer(ip net.IP) *Peer {
|
||||
}
|
||||
|
||||
func (mp *Peer) FlushStop() { mp.Stop() } //nolint:errcheck //ignore error
|
||||
func (mp *Peer) NewTrySend(e p2p.Envelope) bool { return true }
|
||||
func (mp *Peer) NewSend(e p2p.Envelope) bool { return true }
|
||||
func (mp *Peer) TrySendEnvelope(e p2p.Envelope) bool { return true }
|
||||
func (mp *Peer) SendEnvelope(e p2p.Envelope) bool { return true }
|
||||
func (mp *Peer) TrySend(_ byte, _ []byte) bool { return true }
|
||||
func (mp *Peer) Send(_ byte, _ []byte) bool { return true }
|
||||
func (mp *Peer) NodeInfo() p2p.NodeInfo {
|
||||
|
||||
@@ -22,5 +22,5 @@ func NewReactor() *Reactor {
|
||||
func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return r.Channels }
|
||||
func (r *Reactor) AddPeer(peer p2p.Peer) {}
|
||||
func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {}
|
||||
func (r *Reactor) NewReceive(e p2p.Envelope) {}
|
||||
func (r *Reactor) ReceiveEnvelope(e p2p.Envelope) {}
|
||||
func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {}
|
||||
|
||||
@@ -109,8 +109,8 @@ func (_m *Peer) IsRunning() bool {
|
||||
return r0
|
||||
}
|
||||
|
||||
// NewSend provides a mock function with given fields: _a0
|
||||
func (_m *Peer) NewSend(_a0 p2p.Envelope) bool {
|
||||
// SendEnvelope provides a mock function with given fields: _a0
|
||||
func (_m *Peer) SendEnvelope(_a0 p2p.Envelope) bool {
|
||||
ret := _m.Called(_a0)
|
||||
|
||||
var r0 bool
|
||||
@@ -123,8 +123,8 @@ func (_m *Peer) NewSend(_a0 p2p.Envelope) bool {
|
||||
return r0
|
||||
}
|
||||
|
||||
// NewTrySend provides a mock function with given fields: _a0
|
||||
func (_m *Peer) NewTrySend(_a0 p2p.Envelope) bool {
|
||||
// TrySendEnvelope provides a mock function with given fields: _a0
|
||||
func (_m *Peer) TrySendEnvelope(_a0 p2p.Envelope) bool {
|
||||
ret := _m.Called(_a0)
|
||||
|
||||
var r0 bool
|
||||
|
||||
22
p2p/peer.go
22
p2p/peer.go
@@ -37,8 +37,8 @@ type Peer interface {
|
||||
Status() tmconn.ConnectionStatus
|
||||
SocketAddr() *NetAddress // actual address of the socket
|
||||
|
||||
NewSend(Envelope) bool
|
||||
NewTrySend(Envelope) bool
|
||||
SendEnvelope(Envelope) bool
|
||||
TrySendEnvelope(Envelope) bool
|
||||
|
||||
Send(byte, []byte) bool
|
||||
TrySend(byte, []byte) bool
|
||||
@@ -199,7 +199,7 @@ func (p *peer) OnStart() error {
|
||||
}
|
||||
|
||||
// FlushStop mimics OnStop but additionally ensures that all successful
|
||||
// NewSend() calls will get flushed before closing the connection.
|
||||
// SendEnvelope() calls will get flushed before closing the connection.
|
||||
// NOTE: it is not safe to call this method more than once.
|
||||
func (p *peer) FlushStop() {
|
||||
p.metricsTicker.Stop()
|
||||
@@ -252,10 +252,10 @@ func (p *peer) Status() tmconn.ConnectionStatus {
|
||||
return p.mconn.Status()
|
||||
}
|
||||
|
||||
// NewSend sends the message in the envelope on the channel specified by the
|
||||
// SendEnvelope sends the message in the envelope on the channel specified by the
|
||||
// envelope. Returns false if the connection times out trying to place the message
|
||||
// onto its internal queue.
|
||||
func (p *peer) NewSend(e Envelope) bool {
|
||||
func (p *peer) SendEnvelope(e Envelope) bool {
|
||||
if !p.IsRunning() {
|
||||
return false
|
||||
} else if !p.hasChannel(e.ChannelID) {
|
||||
@@ -280,7 +280,7 @@ func (p *peer) NewSend(e Envelope) bool {
|
||||
|
||||
// Send msg bytes to the channel identified by chID byte. Returns false if the
|
||||
// send queue is full after timeout, specified by MConnection.
|
||||
// NewSend replaces TrySend which will be deprecated in a future release.
|
||||
// SendEnvelope replaces TrySend which will be deprecated in a future release.
|
||||
func (p *peer) Send(chID byte, msgBytes []byte) bool {
|
||||
if !p.IsRunning() {
|
||||
return false
|
||||
@@ -298,10 +298,10 @@ func (p *peer) Send(chID byte, msgBytes []byte) bool {
|
||||
return res
|
||||
}
|
||||
|
||||
// NewTrySend attempts to sends the message in the envelope on the channel specified by the
|
||||
// TrySendEnvelope attempts to sends the message in the envelope on the channel specified by the
|
||||
// envelope. Returns false immediately if the connection's internal queue is full
|
||||
// NewTrySend replaces TrySend which will be deprecated in a future release.
|
||||
func (p *peer) NewTrySend(e Envelope) bool {
|
||||
// TrySendEnvelope replaces TrySend which will be deprecated in a future release.
|
||||
func (p *peer) TrySendEnvelope(e Envelope) bool {
|
||||
if !p.IsRunning() {
|
||||
// see Switch#Broadcast, where we fetch the list of peers and loop over
|
||||
// them - while we're looping, one peer may be removed and stopped.
|
||||
@@ -465,8 +465,8 @@ func createMConnection(
|
||||
}
|
||||
p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes)))
|
||||
p.metrics.MessageReceiveBytesTotal.With("message_type", p.mlc.ValueToMetricLabel(msg)).Add(float64(len(msgBytes)))
|
||||
if nr, ok := reactor.(NewReceiver); ok {
|
||||
nr.NewReceive(Envelope{
|
||||
if nr, ok := reactor.(ReceiveEnveloper); ok {
|
||||
nr.ReceiveEnvelope(Envelope{
|
||||
ChannelID: chID,
|
||||
Src: p,
|
||||
Message: msg,
|
||||
|
||||
@@ -19,8 +19,8 @@ type mockPeer struct {
|
||||
}
|
||||
|
||||
func (mp *mockPeer) FlushStop() { mp.Stop() } //nolint:errcheck // ignore error
|
||||
func (mp *mockPeer) NewTrySend(e Envelope) bool { return true }
|
||||
func (mp *mockPeer) NewSend(e Envelope) bool { return true }
|
||||
func (mp *mockPeer) TrySendEnvelope(e Envelope) bool { return true }
|
||||
func (mp *mockPeer) SendEnvelope(e Envelope) bool { return true }
|
||||
func (mp *mockPeer) TrySend(_ byte, _ []byte) bool { return true }
|
||||
func (mp *mockPeer) Send(_ byte, _ []byte) bool { return true }
|
||||
func (mp *mockPeer) NodeInfo() NodeInfo { return DefaultNodeInfo{} }
|
||||
|
||||
@@ -72,7 +72,7 @@ func TestPeerSend(t *testing.T) {
|
||||
})
|
||||
|
||||
assert.True(p.CanSend(testCh))
|
||||
assert.True(p.NewSend(Envelope{ChannelID: testCh, Message: &p2p.Message{}}))
|
||||
assert.True(p.SendEnvelope(Envelope{ChannelID: testCh, Message: &p2p.Message{}}))
|
||||
}
|
||||
|
||||
func createOutboundPeerAndPerformHandshake(
|
||||
|
||||
@@ -237,7 +237,7 @@ func (r *Reactor) logErrAddrBook(err error) {
|
||||
}
|
||||
|
||||
// Receive implements Reactor by handling incoming PEX messages.
|
||||
func (r *Reactor) NewReceive(e p2p.Envelope) {
|
||||
func (r *Reactor) ReceiveEnvelope(e p2p.Envelope) {
|
||||
r.Logger.Debug("Received message", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
|
||||
|
||||
switch msg := e.Message.(type) {
|
||||
@@ -310,7 +310,7 @@ func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
r.NewReceive(p2p.Envelope{
|
||||
r.ReceiveEnvelope(p2p.Envelope{
|
||||
ChannelID: chID,
|
||||
Src: peer,
|
||||
Message: um,
|
||||
@@ -360,7 +360,7 @@ func (r *Reactor) RequestAddrs(p Peer) {
|
||||
}
|
||||
r.Logger.Debug("Request addrs", "from", p)
|
||||
r.requestsSent.Set(id, struct{}{})
|
||||
p.NewSend(p2p.Envelope{
|
||||
p.SendEnvelope(p2p.Envelope{
|
||||
ChannelID: PexChannel,
|
||||
Message: &tmp2p.PexRequest{},
|
||||
})
|
||||
@@ -425,7 +425,7 @@ func (r *Reactor) SendAddrs(p Peer, netAddrs []*p2p.NetAddress) {
|
||||
ChannelID: PexChannel,
|
||||
Message: &tmp2p.PexAddrs{Addrs: p2p.NetAddressesToProto(netAddrs)},
|
||||
}
|
||||
p.NewSend(e)
|
||||
p.SendEnvelope(e)
|
||||
}
|
||||
|
||||
// SetEnsurePeersPeriod sets period to ensure peers connected.
|
||||
|
||||
@@ -132,10 +132,10 @@ func TestPEXReactorReceive(t *testing.T) {
|
||||
|
||||
size := book.Size()
|
||||
msg := &tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}
|
||||
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg})
|
||||
r.ReceiveEnvelope(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg})
|
||||
assert.Equal(t, size+1, book.Size())
|
||||
|
||||
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
|
||||
r.ReceiveEnvelope(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
|
||||
}
|
||||
|
||||
func TestPEXReactorRequestMessageAbuse(t *testing.T) {
|
||||
@@ -156,17 +156,17 @@ func TestPEXReactorRequestMessageAbuse(t *testing.T) {
|
||||
id := string(peer.ID())
|
||||
|
||||
// first time creates the entry
|
||||
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
|
||||
r.ReceiveEnvelope(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
|
||||
assert.True(t, r.lastReceivedRequests.Has(id))
|
||||
assert.True(t, sw.Peers().Has(peer.ID()))
|
||||
|
||||
// next time sets the last time value
|
||||
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
|
||||
r.ReceiveEnvelope(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
|
||||
assert.True(t, r.lastReceivedRequests.Has(id))
|
||||
assert.True(t, sw.Peers().Has(peer.ID()))
|
||||
|
||||
// third time is too many too soon - peer is removed
|
||||
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
|
||||
r.ReceiveEnvelope(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
|
||||
assert.False(t, r.lastReceivedRequests.Has(id))
|
||||
assert.False(t, sw.Peers().Has(peer.ID()))
|
||||
assert.True(t, book.IsBanned(peerAddr))
|
||||
@@ -193,12 +193,12 @@ func TestPEXReactorAddrsMessageAbuse(t *testing.T) {
|
||||
msg := &tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}
|
||||
|
||||
// receive some addrs. should clear the request
|
||||
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg})
|
||||
r.ReceiveEnvelope(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg})
|
||||
assert.False(t, r.requestsSent.Has(id))
|
||||
assert.True(t, sw.Peers().Has(peer.ID()))
|
||||
|
||||
// receiving more unsolicited addrs causes a disconnect and ban
|
||||
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg})
|
||||
r.ReceiveEnvelope(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg})
|
||||
assert.False(t, sw.Peers().Has(peer.ID()))
|
||||
assert.True(t, book.IsBanned(peer.SocketAddr()))
|
||||
}
|
||||
@@ -485,7 +485,7 @@ func TestPEXReactorDoesNotAddPrivatePeersToAddrBook(t *testing.T) {
|
||||
|
||||
size := book.Size()
|
||||
msg := &tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}
|
||||
pexR.NewReceive(p2p.Envelope{
|
||||
pexR.ReceiveEnvelope(p2p.Envelope{
|
||||
ChannelID: PexChannel,
|
||||
Src: peer,
|
||||
Message: msg,
|
||||
|
||||
@@ -262,14 +262,14 @@ func (sw *Switch) OnStop() {
|
||||
//---------------------------------------------------------------------
|
||||
// Peers
|
||||
|
||||
// NewBroadcast runs a go routine for each attempted send, which will block trying
|
||||
// BroadcastEnvelope runs a go routine for each attempted send, which will block trying
|
||||
// to send for defaultSendTimeoutSeconds. Returns a channel which receives
|
||||
// success values for each attempted send (false if times out). Channel will be
|
||||
// closed once msg bytes are sent to all peers (or time out).
|
||||
// NewBroadcasts sends to the peers using the NewSend method.
|
||||
// BroadcastEnvelopes sends to the peers using the SendEnvelope method.
|
||||
//
|
||||
// NOTE: NewBroadcast uses goroutines, so order of broadcast may not be preserved.
|
||||
func (sw *Switch) NewBroadcast(e Envelope) chan bool {
|
||||
// NOTE: BroadcastEnvelope uses goroutines, so order of broadcast may not be preserved.
|
||||
func (sw *Switch) BroadcastEnvelope(e Envelope) chan bool {
|
||||
sw.Logger.Debug("Broadcast", "channel", e.ChannelID)
|
||||
|
||||
peers := sw.peers.List()
|
||||
@@ -280,7 +280,7 @@ func (sw *Switch) NewBroadcast(e Envelope) chan bool {
|
||||
for _, peer := range peers {
|
||||
go func(p Peer) {
|
||||
defer wg.Done()
|
||||
success := p.NewSend(e)
|
||||
success := p.SendEnvelope(e)
|
||||
successChan <- success
|
||||
}(peer)
|
||||
}
|
||||
|
||||
@@ -71,7 +71,7 @@ func (tr *TestReactor) AddPeer(peer Peer) {}
|
||||
|
||||
func (tr *TestReactor) RemovePeer(peer Peer, reason interface{}) {}
|
||||
|
||||
func (tr *TestReactor) NewReceive(e Envelope) {
|
||||
func (tr *TestReactor) ReceiveEnvelope(e Envelope) {
|
||||
if tr.logMessages {
|
||||
tr.mtx.Lock()
|
||||
defer tr.mtx.Unlock()
|
||||
@@ -92,7 +92,7 @@ func (tr *TestReactor) Receive(chID byte, peer Peer, msgBytes []byte) {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
tr.NewReceive(Envelope{
|
||||
tr.ReceiveEnvelope(Envelope{
|
||||
ChannelID: chID,
|
||||
Src: peer,
|
||||
Message: um,
|
||||
@@ -175,9 +175,9 @@ func TestSwitches(t *testing.T) {
|
||||
},
|
||||
},
|
||||
}
|
||||
s1.NewBroadcast(Envelope{ChannelID: byte(0x00), Message: ch0Msg})
|
||||
s1.NewBroadcast(Envelope{ChannelID: byte(0x01), Message: ch1Msg})
|
||||
s1.NewBroadcast(Envelope{ChannelID: byte(0x02), Message: ch2Msg})
|
||||
s1.BroadcastEnvelope(Envelope{ChannelID: byte(0x00), Message: ch0Msg})
|
||||
s1.BroadcastEnvelope(Envelope{ChannelID: byte(0x01), Message: ch1Msg})
|
||||
s1.BroadcastEnvelope(Envelope{ChannelID: byte(0x02), Message: ch2Msg})
|
||||
assertMsgReceivedWithTimeout(t,
|
||||
ch0Msg,
|
||||
byte(0x00),
|
||||
@@ -468,7 +468,7 @@ func TestSwitchStopPeerForError(t *testing.T) {
|
||||
|
||||
// send messages to the peer from sw1
|
||||
p := sw1.Peers().List()[0]
|
||||
p.NewSend(Envelope{
|
||||
p.SendEnvelope(Envelope{
|
||||
ChannelID: 0x1,
|
||||
Message: &p2pproto.Message{},
|
||||
})
|
||||
@@ -866,7 +866,7 @@ func BenchmarkSwitchBroadcast(b *testing.B) {
|
||||
// Send random message from foo channel to another
|
||||
for i := 0; i < b.N; i++ {
|
||||
chID := byte(i % 4)
|
||||
successChan := s1.NewBroadcast(Envelope{ChannelID: chID})
|
||||
successChan := s1.BroadcastEnvelope(Envelope{ChannelID: chID})
|
||||
for s := range successChan {
|
||||
if s {
|
||||
numSuccess++
|
||||
|
||||
@@ -249,7 +249,7 @@ type broadcastAPIClient struct {
|
||||
cc grpc1.ClientConn
|
||||
}
|
||||
|
||||
func NewBroadcastAPIClient(cc grpc1.ClientConn) BroadcastAPIClient {
|
||||
func BroadcastEnvelopeAPIClient(cc grpc1.ClientConn) BroadcastAPIClient {
|
||||
return &broadcastAPIClient{cc}
|
||||
}
|
||||
|
||||
|
||||
@@ -31,7 +31,7 @@ func StartGRPCClient(protoAddr string) BroadcastAPIClient {
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return NewBroadcastAPIClient(conn)
|
||||
return BroadcastEnvelopeAPIClient(conn)
|
||||
}
|
||||
|
||||
func dialerFunc(ctx context.Context, addr string) (net.Conn, error) {
|
||||
|
||||
@@ -249,7 +249,7 @@ type broadcastAPIClient struct {
|
||||
cc grpc1.ClientConn
|
||||
}
|
||||
|
||||
func NewBroadcastAPIClient(cc grpc1.ClientConn) BroadcastAPIClient {
|
||||
func BroadcastEnvelopeAPIClient(cc grpc1.ClientConn) BroadcastAPIClient {
|
||||
return &broadcastAPIClient{cc}
|
||||
}
|
||||
|
||||
|
||||
@@ -104,7 +104,7 @@ func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
|
||||
}
|
||||
|
||||
// Receive implements p2p.Reactor.
|
||||
func (r *Reactor) NewReceive(e p2p.Envelope) {
|
||||
func (r *Reactor) ReceiveEnvelope(e p2p.Envelope) {
|
||||
if !r.IsRunning() {
|
||||
return
|
||||
}
|
||||
@@ -128,7 +128,7 @@ func (r *Reactor) NewReceive(e p2p.Envelope) {
|
||||
for _, snapshot := range snapshots {
|
||||
r.Logger.Debug("Advertising snapshot", "height", snapshot.Height,
|
||||
"format", snapshot.Format, "peer", e.Src.ID())
|
||||
e.Src.NewSend(p2p.Envelope{
|
||||
e.Src.SendEnvelope(p2p.Envelope{
|
||||
ChannelID: e.ChannelID,
|
||||
Message: &ssproto.SnapshotsResponse{
|
||||
Height: snapshot.Height,
|
||||
@@ -183,7 +183,7 @@ func (r *Reactor) NewReceive(e p2p.Envelope) {
|
||||
}
|
||||
r.Logger.Debug("Sending chunk", "height", msg.Height, "format", msg.Format,
|
||||
"chunk", msg.Index, "peer", e.Src.ID())
|
||||
e.Src.NewSend(p2p.Envelope{
|
||||
e.Src.SendEnvelope(p2p.Envelope{
|
||||
ChannelID: ChunkChannel,
|
||||
Message: &ssproto.ChunkResponse{
|
||||
Height: msg.Height,
|
||||
@@ -236,7 +236,7 @@ func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
r.NewReceive(p2p.Envelope{
|
||||
r.ReceiveEnvelope(p2p.Envelope{
|
||||
ChannelID: chID,
|
||||
Src: peer,
|
||||
Message: um,
|
||||
@@ -292,7 +292,7 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration)
|
||||
r.Logger.Debug("Requesting snapshots from known peers")
|
||||
// Request snapshots from all currently connected peers
|
||||
|
||||
r.Switch.NewBroadcast(p2p.Envelope{
|
||||
r.Switch.BroadcastEnvelope(p2p.Envelope{
|
||||
ChannelID: SnapshotChannel,
|
||||
Message: &ssproto.SnapshotsRequest{},
|
||||
})
|
||||
|
||||
@@ -54,7 +54,7 @@ func TestReactor_Receive_ChunkRequest(t *testing.T) {
|
||||
peer.On("ID").Return(p2p.ID("id"))
|
||||
var response *ssproto.ChunkResponse
|
||||
if tc.expectResponse != nil {
|
||||
peer.On("NewSend", mock.MatchedBy(func(i interface{}) bool {
|
||||
peer.On("SendEnvelope", mock.MatchedBy(func(i interface{}) bool {
|
||||
e, ok := i.(p2p.Envelope)
|
||||
return ok && e.ChannelID == ChunkChannel
|
||||
})).Run(func(args mock.Arguments) {
|
||||
@@ -80,7 +80,7 @@ func TestReactor_Receive_ChunkRequest(t *testing.T) {
|
||||
}
|
||||
})
|
||||
|
||||
r.NewReceive(p2p.Envelope{
|
||||
r.ReceiveEnvelope(p2p.Envelope{
|
||||
ChannelID: ChunkChannel,
|
||||
Src: peer,
|
||||
Message: tc.request,
|
||||
@@ -144,7 +144,7 @@ func TestReactor_Receive_SnapshotsRequest(t *testing.T) {
|
||||
peer := &p2pmocks.Peer{}
|
||||
if len(tc.expectResponses) > 0 {
|
||||
peer.On("ID").Return(p2p.ID("id"))
|
||||
peer.On("NewSend", mock.MatchedBy(func(i interface{}) bool {
|
||||
peer.On("SendEnvelope", mock.MatchedBy(func(i interface{}) bool {
|
||||
e, ok := i.(p2p.Envelope)
|
||||
return ok && e.ChannelID == SnapshotChannel
|
||||
})).Run(func(args mock.Arguments) {
|
||||
@@ -170,7 +170,7 @@ func TestReactor_Receive_SnapshotsRequest(t *testing.T) {
|
||||
}
|
||||
})
|
||||
|
||||
r.NewReceive(p2p.Envelope{
|
||||
r.ReceiveEnvelope(p2p.Envelope{
|
||||
ChannelID: SnapshotChannel,
|
||||
Src: peer,
|
||||
Message: &ssproto.SnapshotsRequest{},
|
||||
|
||||
@@ -130,7 +130,7 @@ func (s *syncer) AddPeer(peer p2p.Peer) {
|
||||
ChannelID: SnapshotChannel,
|
||||
Message: &ssproto.SnapshotsRequest{},
|
||||
}
|
||||
peer.NewSend(e)
|
||||
peer.SendEnvelope(e)
|
||||
}
|
||||
|
||||
// RemovePeer removes a peer from the pool.
|
||||
@@ -471,7 +471,7 @@ func (s *syncer) requestChunk(snapshot *snapshot, chunk uint32) {
|
||||
}
|
||||
s.logger.Debug("Requesting snapshot chunk", "height", snapshot.Height,
|
||||
"format", snapshot.Format, "chunk", chunk, "peer", peer.ID())
|
||||
peer.NewSend(p2p.Envelope{
|
||||
peer.SendEnvelope(p2p.Envelope{
|
||||
ChannelID: ChunkChannel,
|
||||
Message: &ssproto.ChunkRequest{
|
||||
Height: snapshot.Height,
|
||||
|
||||
@@ -98,7 +98,7 @@ func TestSyncer_SyncAny(t *testing.T) {
|
||||
// Adding a couple of peers should trigger snapshot discovery messages
|
||||
peerA := &p2pmocks.Peer{}
|
||||
peerA.On("ID").Return(p2p.ID("a"))
|
||||
peerA.On("NewSend", mock.MatchedBy(func(i interface{}) bool {
|
||||
peerA.On("SendEnvelope", mock.MatchedBy(func(i interface{}) bool {
|
||||
e, ok := i.(p2p.Envelope)
|
||||
if !ok {
|
||||
return false
|
||||
@@ -111,7 +111,7 @@ func TestSyncer_SyncAny(t *testing.T) {
|
||||
|
||||
peerB := &p2pmocks.Peer{}
|
||||
peerB.On("ID").Return(p2p.ID("b"))
|
||||
peerB.On("NewSend", mock.MatchedBy(func(i interface{}) bool {
|
||||
peerB.On("SendEnvelope", mock.MatchedBy(func(i interface{}) bool {
|
||||
e, ok := i.(p2p.Envelope)
|
||||
if !ok {
|
||||
return false
|
||||
@@ -176,11 +176,11 @@ func TestSyncer_SyncAny(t *testing.T) {
|
||||
chunkRequests[msg.Index]++
|
||||
chunkRequestsMtx.Unlock()
|
||||
}
|
||||
peerA.On("NewSend", mock.MatchedBy(func(i interface{}) bool {
|
||||
peerA.On("SendEnvelope", mock.MatchedBy(func(i interface{}) bool {
|
||||
e, ok := i.(p2p.Envelope)
|
||||
return ok && e.ChannelID == ChunkChannel
|
||||
})).Maybe().Run(onChunkRequest).Return(true)
|
||||
peerB.On("NewSend", mock.MatchedBy(func(i interface{}) bool {
|
||||
peerB.On("SendEnvelope", mock.MatchedBy(func(i interface{}) bool {
|
||||
e, ok := i.(p2p.Envelope)
|
||||
return ok && e.ChannelID == ChunkChannel
|
||||
})).Maybe().Run(onChunkRequest).Return(true)
|
||||
|
||||
@@ -80,8 +80,8 @@ func (fp *fuzzPeer) CloseConn() error { return nil }
|
||||
func (fp *fuzzPeer) NodeInfo() p2p.NodeInfo { return defaultNodeInfo }
|
||||
func (fp *fuzzPeer) Status() p2p.ConnectionStatus { var cs p2p.ConnectionStatus; return cs }
|
||||
func (fp *fuzzPeer) SocketAddr() *p2p.NetAddress { return p2p.NewNetAddress(fp.ID(), fp.RemoteAddr()) }
|
||||
func (fp *fuzzPeer) NewSend(e p2p.Envelope) bool { return true }
|
||||
func (fp *fuzzPeer) NewTrySend(e p2p.Envelope) bool { return true }
|
||||
func (fp *fuzzPeer) SendEnvelope(e p2p.Envelope) bool { return true }
|
||||
func (fp *fuzzPeer) TrySendEnvelope(e p2p.Envelope) bool { return true }
|
||||
func (fp *fuzzPeer) Send(_ byte, _ []byte) bool { return true }
|
||||
func (fp *fuzzPeer) TrySend(_ byte, _ []byte) bool { return true }
|
||||
func (fp *fuzzPeer) Set(key string, value interface{}) { fp.m[key] = value }
|
||||
|
||||
Reference in New Issue
Block a user