diff --git a/blocksync/reactor.go b/blocksync/reactor.go index 003ea9bca..cb45e4b65 100644 --- a/blocksync/reactor.go +++ b/blocksync/reactor.go @@ -152,7 +152,7 @@ func (bcR *Reactor) GetChannels() []*p2p.ChannelDescriptor { // AddPeer implements Reactor by sending our state to peer. func (bcR *Reactor) AddPeer(peer p2p.Peer) { - peer.NewSend(p2p.Envelope{ + peer.SendEnvelope(p2p.Envelope{ ChannelID: BlocksyncChannel, Message: &bcproto.StatusResponse{ Base: bcR.store.Base(), @@ -183,21 +183,21 @@ func (bcR *Reactor) respondToPeer(msg *bcproto.BlockRequest, return false } - return src.NewTrySend(p2p.Envelope{ + return src.TrySendEnvelope(p2p.Envelope{ ChannelID: BlocksyncChannel, Message: &bcproto.BlockResponse{Block: bl}, }) } bcR.Logger.Info("Peer asking for a block we don't have", "src", src, "height", msg.Height) - return src.NewTrySend(p2p.Envelope{ + return src.TrySendEnvelope(p2p.Envelope{ ChannelID: BlocksyncChannel, Message: &bcproto.NoBlockResponse{Height: msg.Height}, }) } // Receive implements Reactor by handling 4 types of messages (look below). -func (bcR *Reactor) NewReceive(e p2p.Envelope) { +func (bcR *Reactor) ReceiveEnvelope(e p2p.Envelope) { if err := ValidateMsg(e.Message); err != nil { bcR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", e.Message, "err", err) bcR.Switch.StopPeerForError(e.Src, err) @@ -218,7 +218,7 @@ func (bcR *Reactor) NewReceive(e p2p.Envelope) { bcR.pool.AddBlock(e.Src.ID(), bi, msg.Block.Size()) case *bcproto.StatusRequest: // Send peer our state. - e.Src.NewTrySend(p2p.Envelope{ + e.Src.TrySendEnvelope(p2p.Envelope{ ChannelID: BlocksyncChannel, Message: &bcproto.StatusResponse{ Height: bcR.store.Height(), @@ -245,7 +245,7 @@ func (bcR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { if err != nil { panic(err) } - bcR.NewReceive(p2p.Envelope{ + bcR.ReceiveEnvelope(p2p.Envelope{ ChannelID: chID, Src: peer, Message: uw, @@ -287,7 +287,7 @@ func (bcR *Reactor) poolRoutine(stateSynced bool) { if peer == nil { continue } - queued := peer.NewTrySend(p2p.Envelope{ + queued := peer.TrySendEnvelope(p2p.Envelope{ ChannelID: BlocksyncChannel, Message: &bcproto.BlockRequest{Height: request.Height}, }) @@ -429,7 +429,7 @@ FOR_LOOP: // BroadcastStatusRequest broadcasts `BlockStore` base and height. func (bcR *Reactor) BroadcastStatusRequest() { - bcR.Switch.NewBroadcast(p2p.Envelope{ + bcR.Switch.BroadcastEnvelope(p2p.Envelope{ ChannelID: BlocksyncChannel, Message: &bcproto.StatusRequest{}, }) diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index 9a4ff01d4..cd405914b 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -166,13 +166,13 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { for i, peer := range peerList { if i < len(peerList)/2 { bcs.Logger.Info("Signed and pushed vote", "vote", prevote1, "peer", peer) - peer.NewSend(p2p.Envelope{ + peer.SendEnvelope(p2p.Envelope{ Message: &tmcons.Vote{Vote: prevote1.ToProto()}, ChannelID: VoteChannel, }) } else { bcs.Logger.Info("Signed and pushed vote", "vote", prevote2, "peer", peer) - peer.NewSend(p2p.Envelope{ + peer.SendEnvelope(p2p.Envelope{ Message: &tmcons.Vote{Vote: prevote2.ToProto()}, ChannelID: VoteChannel, }) @@ -527,7 +527,7 @@ func sendProposalAndParts( parts *types.PartSet, ) { // proposal - peer.NewSend(p2p.Envelope{ + peer.SendEnvelope(p2p.Envelope{ ChannelID: DataChannel, Message: &tmcons.Proposal{Proposal: *proposal.ToProto()}, }) @@ -539,7 +539,7 @@ func sendProposalAndParts( if err != nil { panic(err) // TODO: wbanfield better error handling } - peer.NewSend(p2p.Envelope{ + peer.SendEnvelope(p2p.Envelope{ ChannelID: DataChannel, Message: &tmcons.BlockPart{ Height: height, // This tells peer that this part applies to us. @@ -554,11 +554,11 @@ func sendProposalAndParts( prevote, _ := cs.signVote(tmproto.PrevoteType, blockHash, parts.Header()) precommit, _ := cs.signVote(tmproto.PrecommitType, blockHash, parts.Header()) cs.mtx.Unlock() - peer.NewSend(p2p.Envelope{ + peer.SendEnvelope(p2p.Envelope{ ChannelID: VoteChannel, Message: &tmcons.Vote{Vote: prevote.ToProto()}, }) - peer.NewSend(p2p.Envelope{ + peer.SendEnvelope(p2p.Envelope{ ChannelID: VoteChannel, Message: &tmcons.Vote{Vote: precommit.ToProto()}, }) @@ -599,8 +599,8 @@ func (br *ByzantineReactor) AddPeer(peer p2p.Peer) { func (br *ByzantineReactor) RemovePeer(peer p2p.Peer, reason interface{}) { br.reactor.RemovePeer(peer, reason) } -func (br *ByzantineReactor) NewReceive(e p2p.Envelope) { - br.reactor.NewReceive(e) +func (br *ByzantineReactor) ReceiveEnvelope(e p2p.Envelope) { + br.reactor.ReceiveEnvelope(e) } func (br *ByzantineReactor) Receive(chID byte, p p2p.Peer, m []byte) { br.reactor.Receive(chID, p, m) diff --git a/consensus/invalid_test.go b/consensus/invalid_test.go index 873d88e60..0059de435 100644 --- a/consensus/invalid_test.go +++ b/consensus/invalid_test.go @@ -95,7 +95,7 @@ func invalidDoPrevoteFunc(t *testing.T, height int64, round int32, cs *State, sw peers := sw.Peers().List() for _, peer := range peers { cs.Logger.Info("Sending bad vote", "block", blockHash, "peer", peer) - peer.NewSend(p2p.Envelope{ + peer.SendEnvelope(p2p.Envelope{ Message: &tmcons.Vote{Vote: precommit.ToProto()}, ChannelID: VoteChannel, }) diff --git a/consensus/reactor.go b/consensus/reactor.go index dd0e54324..90344f121 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -226,7 +226,7 @@ func (conR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) { // Peer state updates can happen in parallel, but processing of // proposals, block parts, and votes are ordered by the receiveRoutine // NOTE: blocks on consensus state for proposals, block parts, and votes -func (conR *Reactor) NewReceive(e p2p.Envelope) { +func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) { if !conR.IsRunning() { conR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID) return @@ -303,7 +303,7 @@ func (conR *Reactor) NewReceive(e p2p.Envelope) { if votes := ourVotes.ToProto(); votes != nil { eMsg.Votes = *votes } - e.Src.NewTrySend(p2p.Envelope{ + e.Src.TrySendEnvelope(p2p.Envelope{ ChannelID: VoteSetBitsChannel, Message: eMsg, }) @@ -398,7 +398,7 @@ func (conR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { if err != nil { panic(err) } - conR.NewReceive(p2p.Envelope{ + conR.ReceiveEnvelope(p2p.Envelope{ ChannelID: chID, Src: peer, Message: uw, @@ -455,7 +455,7 @@ func (conR *Reactor) unsubscribeFromBroadcastEvents() { func (conR *Reactor) broadcastNewRoundStepMessage(rs *cstypes.RoundState) { nrsMsg := makeRoundStepMessage(rs) - conR.Switch.NewBroadcast(p2p.Envelope{ + conR.Switch.BroadcastEnvelope(p2p.Envelope{ ChannelID: StateChannel, Message: nrsMsg, }) @@ -470,7 +470,7 @@ func (conR *Reactor) broadcastNewValidBlockMessage(rs *cstypes.RoundState) { BlockParts: rs.ProposalBlockParts.BitArray().ToProto(), IsCommit: rs.Step == cstypes.RoundStepCommit, } - conR.Switch.NewBroadcast(p2p.Envelope{ + conR.Switch.BroadcastEnvelope(p2p.Envelope{ ChannelID: StateChannel, Message: csMsg, }) @@ -484,7 +484,7 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) { Type: vote.Type, Index: vote.ValidatorIndex, } - conR.Switch.NewBroadcast(p2p.Envelope{ + conR.Switch.BroadcastEnvelope(p2p.Envelope{ ChannelID: StateChannel, Message: msg, }) @@ -502,7 +502,7 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) { ChannelID: StateChannel, struct{ ConsensusMessage }{msg}, Message: p, } - peer.NewTrySend(e) + peer.TrySendEnvelope(e) } else { // Height doesn't match // TODO: check a field, maybe CatchupCommitRound? @@ -526,7 +526,7 @@ func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *tmcons.NewRoundStep) func (conR *Reactor) sendNewRoundStepMessage(peer p2p.Peer) { rs := conR.getRoundState() nrsMsg := makeRoundStepMessage(rs) - peer.NewSend(p2p.Envelope{ + peer.SendEnvelope(p2p.Envelope{ ChannelID: StateChannel, Message: nrsMsg, }) @@ -573,7 +573,7 @@ OUTER_LOOP: panic(err) } logger.Debug("Sending block part", "height", prs.Height, "round", prs.Round) - if peer.NewSend(p2p.Envelope{ + if peer.SendEnvelope(p2p.Envelope{ ChannelID: DataChannel, Message: &tmcons.BlockPart{ Height: rs.Height, // This tells peer that this part applies to us. @@ -627,7 +627,7 @@ OUTER_LOOP: // Proposal: share the proposal metadata with peer. { logger.Debug("Sending proposal", "height", prs.Height, "round", prs.Round) - if peer.NewSend(p2p.Envelope{ + if peer.SendEnvelope(p2p.Envelope{ ChannelID: DataChannel, Message: &tmcons.Proposal{Proposal: *rs.Proposal.ToProto()}, }) { @@ -641,7 +641,7 @@ OUTER_LOOP: // so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound). if 0 <= rs.Proposal.POLRound { logger.Debug("Sending POL", "height", prs.Height, "round", prs.Round) - peer.NewSend(p2p.Envelope{ + peer.SendEnvelope(p2p.Envelope{ ChannelID: DataChannel, Message: &tmcons.ProposalPOL{ Height: rs.Height, @@ -691,7 +691,7 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt logger.Error("Could not convert part to proto", "index", index, "error", err) return } - if peer.NewSend(p2p.Envelope{ + if peer.SendEnvelope(p2p.Envelope{ ChannelID: DataChannel, Message: &tmcons.BlockPart{ Height: prs.Height, // Not our height, so it doesn't matter. @@ -858,7 +858,7 @@ OUTER_LOOP: if rs.Height == prs.Height { if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok { - peer.NewTrySend(p2p.Envelope{ + peer.TrySendEnvelope(p2p.Envelope{ ChannelID: StateChannel, Message: &tmcons.VoteSetMaj23{ Height: prs.Height, @@ -878,7 +878,7 @@ OUTER_LOOP: prs := ps.GetRoundState() if rs.Height == prs.Height { if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok { - peer.NewTrySend(p2p.Envelope{ + peer.TrySendEnvelope(p2p.Envelope{ ChannelID: StateChannel, Message: &tmcons.VoteSetMaj23{ Height: prs.Height, @@ -899,7 +899,7 @@ OUTER_LOOP: if rs.Height == prs.Height && prs.ProposalPOLRound >= 0 { if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok { - peer.NewTrySend(p2p.Envelope{ + peer.TrySendEnvelope(p2p.Envelope{ ChannelID: StateChannel, Message: &tmcons.VoteSetMaj23{ Height: prs.Height, @@ -922,7 +922,7 @@ OUTER_LOOP: if prs.CatchupCommitRound != -1 && prs.Height > 0 && prs.Height <= conR.conS.blockStore.Height() && prs.Height >= conR.conS.blockStore.Base() { if commit := conR.conS.LoadCommit(prs.Height); commit != nil { - peer.NewTrySend(p2p.Envelope{ + peer.TrySendEnvelope(p2p.Envelope{ ChannelID: StateChannel, Message: &tmcons.VoteSetMaj23{ Height: prs.Height, @@ -1145,7 +1145,7 @@ func (ps *PeerState) SetHasProposalBlockPart(height int64, round int32, index in func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool { if vote, ok := ps.PickVoteToSend(votes); ok { ps.logger.Debug("Sending vote message", "ps", ps, "vote", vote) - if ps.peer.NewSend(p2p.Envelope{ + if ps.peer.SendEnvelope(p2p.Envelope{ ChannelID: VoteChannel, Message: &tmcons.Vote{ Vote: vote.ToProto(), diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index e983b7862..a5f84c2d1 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -272,7 +272,7 @@ func TestReactorReceiveDoesNotPanicIfAddPeerHasntBeenCalledYet(t *testing.T) { // simulate switch calling Receive before AddPeer assert.NotPanics(t, func() { - reactor.NewReceive(p2p.Envelope{ + reactor.ReceiveEnvelope(p2p.Envelope{ ChannelID: StateChannel, Src: peer, Message: &tmcons.HasVote{Height: 1, @@ -298,7 +298,7 @@ func TestReactorReceivePanicsIfInitPeerHasntBeenCalledYet(t *testing.T) { // simulate switch calling Receive before AddPeer assert.Panics(t, func() { - reactor.NewReceive(p2p.Envelope{ + reactor.ReceiveEnvelope(p2p.Envelope{ ChannelID: StateChannel, Src: peer, Message: &tmcons.HasVote{Height: 1, diff --git a/evidence/reactor.go b/evidence/reactor.go index 45a8d4f27..9ff6e785a 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -68,7 +68,7 @@ func (evR *Reactor) AddPeer(peer p2p.Peer) { // Receive implements Reactor. // It adds any received evidence to the evpool. -func (evR *Reactor) NewReceive(e p2p.Envelope) { +func (evR *Reactor) ReceiveEnvelope(e p2p.Envelope) { evis, err := evidenceListFromProto(e.Message) if err != nil { evR.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err) @@ -98,7 +98,7 @@ func (evR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { if err != nil { panic(err) } - evR.NewReceive(p2p.Envelope{ + evR.ReceiveEnvelope(p2p.Envelope{ ChannelID: chID, Src: peer, Message: msg, @@ -146,7 +146,7 @@ func (evR *Reactor) broadcastEvidenceRoutine(peer p2p.Peer) { panic(err) } - success := peer.NewSend(p2p.Envelope{ + success := peer.SendEnvelope(p2p.Envelope{ ChannelID: EvidenceChannel, Message: evp, }) diff --git a/evidence/reactor_test.go b/evidence/reactor_test.go index 67e026a55..652cb0fd0 100644 --- a/evidence/reactor_test.go +++ b/evidence/reactor_test.go @@ -208,7 +208,7 @@ func TestReactorBroadcastEvidenceMemoryLeak(t *testing.T) { // i.e. broadcastEvidenceRoutine finishes when peer is stopped defer leaktest.CheckTimeout(t, 10*time.Second)() - p.On("NewSend", mock.MatchedBy(func(i interface{}) bool { + p.On("SendEnvelope", mock.MatchedBy(func(i interface{}) bool { e, ok := i.(p2p.Envelope) return ok && e.ChannelID == evidence.EvidenceChannel })).Return(false) diff --git a/mempool/v0/reactor.go b/mempool/v0/reactor.go index 208fd1709..21a25fd81 100644 --- a/mempool/v0/reactor.go +++ b/mempool/v0/reactor.go @@ -157,7 +157,7 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) { // Receive implements Reactor. // It adds any received transactions to the mempool. -func (memR *Reactor) NewReceive(e p2p.Envelope) { +func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) { memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message) switch msg := e.Message.(type) { case *protomem.Txs: @@ -200,7 +200,7 @@ func (memR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { if err != nil { panic(err) } - memR.NewReceive(p2p.Envelope{ + memR.ReceiveEnvelope(p2p.Envelope{ ChannelID: chID, Src: peer, Message: uw, @@ -261,7 +261,7 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { // https://github.com/tendermint/tendermint/issues/5796 if _, ok := memTx.senders.Load(peerID); !ok { - success := peer.NewSend(p2p.Envelope{ + success := peer.SendEnvelope(p2p.Envelope{ ChannelID: mempool.MempoolChannel, Message: &protomem.Txs{Txs: [][]byte{memTx.tx}}, }) diff --git a/mempool/v0/reactor_test.go b/mempool/v0/reactor_test.go index f188a1152..bd4ee3f65 100644 --- a/mempool/v0/reactor_test.go +++ b/mempool/v0/reactor_test.go @@ -283,7 +283,7 @@ func TestDontExhaustMaxActiveIDs(t *testing.T) { for i := 0; i < mempool.MaxActiveIDs+1; i++ { peer := mock.NewPeer(nil) - reactor.NewReceive(p2p.Envelope{ + reactor.ReceiveEnvelope(p2p.Envelope{ ChannelID: mempool.MempoolChannel, Src: peer, Message: &memproto.Message{}, // This uses the wrong message type on purpose to stop the peer as in an error state in the reactor. diff --git a/mempool/v1/reactor.go b/mempool/v1/reactor.go index 203b76495..533584fa3 100644 --- a/mempool/v1/reactor.go +++ b/mempool/v1/reactor.go @@ -156,7 +156,7 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) { // Receive implements Reactor. // It adds any received transactions to the mempool. -func (memR *Reactor) NewReceive(e p2p.Envelope) { +func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) { memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message) switch msg := e.Message.(type) { case *protomem.Txs: @@ -199,7 +199,7 @@ func (memR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { if err != nil { panic(err) } - memR.NewReceive(p2p.Envelope{ + memR.ReceiveEnvelope(p2p.Envelope{ ChannelID: chID, Src: peer, Message: uw, @@ -262,7 +262,7 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { // NOTE: Transaction batching was disabled due to // https://github.com/tendermint/tendermint/issues/5796 if !memTx.HasPeer(peerID) { - success := peer.NewSend(p2p.Envelope{ + success := peer.SendEnvelope(p2p.Envelope{ ChannelID: mempool.MempoolChannel, Message: &protomem.Txs{Txs: [][]byte{memTx.tx}}, }) diff --git a/p2p/base_reactor.go b/p2p/base_reactor.go index cbcf57404..266880536 100644 --- a/p2p/base_reactor.go +++ b/p2p/base_reactor.go @@ -45,20 +45,20 @@ type Reactor interface { // // CONTRACT: msgBytes are not nil. // - // Only one of Receive or NewReceive are called per message. If NewReceive + // Only one of Receive or ReceiveEnvelope are called per message. If ReceiveEnvelope // is implemented, it will be used, otherwise the switch will fallback to - // using Receive. Receive will be replaced by NewReceive in a future version + // using Receive. Receive will be replaced by ReceiveEnvelope in a future version Receive(chID byte, peer Peer, msgBytes []byte) } -type NewReceiver interface { - // NewReceive is called by the switch when an envelope is received from any connected +type ReceiveEnveloper interface { + // ReceiveEnvelope is called by the switch when an envelope is received from any connected // peer on any of the channels registered by the reactor. // - // Only one of Receive or NewReceive are called per message. If NewReceive + // Only one of Receive or ReceiveEnvelope are called per message. If ReceiveEnvelope // is implemented, it will be used, otherwise the switch will fallback to - // using Receive. Receive will be replaced by NewReceive in a future version - NewReceive(Envelope) + // using Receive. Receive will be replaced by ReceiveEnvelope in a future version + ReceiveEnvelope(Envelope) } //-------------------------------------- @@ -81,6 +81,6 @@ func (br *BaseReactor) SetSwitch(sw *Switch) { func (*BaseReactor) GetChannels() []*conn.ChannelDescriptor { return nil } func (*BaseReactor) AddPeer(peer Peer) {} func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {} -func (*BaseReactor) NewReceive(e Envelope) {} +func (*BaseReactor) ReceiveEnvelope(e Envelope) {} func (*BaseReactor) Receive(chID byte, peer Peer, msgBytes []byte) {} func (*BaseReactor) InitPeer(peer Peer) Peer { return peer } diff --git a/p2p/mock/peer.go b/p2p/mock/peer.go index 9b15c95e4..b4834469f 100644 --- a/p2p/mock/peer.go +++ b/p2p/mock/peer.go @@ -43,8 +43,8 @@ func NewPeer(ip net.IP) *Peer { } func (mp *Peer) FlushStop() { mp.Stop() } //nolint:errcheck //ignore error -func (mp *Peer) NewTrySend(e p2p.Envelope) bool { return true } -func (mp *Peer) NewSend(e p2p.Envelope) bool { return true } +func (mp *Peer) TrySendEnvelope(e p2p.Envelope) bool { return true } +func (mp *Peer) SendEnvelope(e p2p.Envelope) bool { return true } func (mp *Peer) TrySend(_ byte, _ []byte) bool { return true } func (mp *Peer) Send(_ byte, _ []byte) bool { return true } func (mp *Peer) NodeInfo() p2p.NodeInfo { diff --git a/p2p/mock/reactor.go b/p2p/mock/reactor.go index 893ed2032..414247db8 100644 --- a/p2p/mock/reactor.go +++ b/p2p/mock/reactor.go @@ -22,5 +22,5 @@ func NewReactor() *Reactor { func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return r.Channels } func (r *Reactor) AddPeer(peer p2p.Peer) {} func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {} -func (r *Reactor) NewReceive(e p2p.Envelope) {} +func (r *Reactor) ReceiveEnvelope(e p2p.Envelope) {} func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {} diff --git a/p2p/mocks/peer.go b/p2p/mocks/peer.go index 0d5693dbd..9f3aa098b 100644 --- a/p2p/mocks/peer.go +++ b/p2p/mocks/peer.go @@ -109,8 +109,8 @@ func (_m *Peer) IsRunning() bool { return r0 } -// NewSend provides a mock function with given fields: _a0 -func (_m *Peer) NewSend(_a0 p2p.Envelope) bool { +// SendEnvelope provides a mock function with given fields: _a0 +func (_m *Peer) SendEnvelope(_a0 p2p.Envelope) bool { ret := _m.Called(_a0) var r0 bool @@ -123,8 +123,8 @@ func (_m *Peer) NewSend(_a0 p2p.Envelope) bool { return r0 } -// NewTrySend provides a mock function with given fields: _a0 -func (_m *Peer) NewTrySend(_a0 p2p.Envelope) bool { +// TrySendEnvelope provides a mock function with given fields: _a0 +func (_m *Peer) TrySendEnvelope(_a0 p2p.Envelope) bool { ret := _m.Called(_a0) var r0 bool diff --git a/p2p/peer.go b/p2p/peer.go index 3aa680d80..55a6edf64 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -37,8 +37,8 @@ type Peer interface { Status() tmconn.ConnectionStatus SocketAddr() *NetAddress // actual address of the socket - NewSend(Envelope) bool - NewTrySend(Envelope) bool + SendEnvelope(Envelope) bool + TrySendEnvelope(Envelope) bool Send(byte, []byte) bool TrySend(byte, []byte) bool @@ -199,7 +199,7 @@ func (p *peer) OnStart() error { } // FlushStop mimics OnStop but additionally ensures that all successful -// NewSend() calls will get flushed before closing the connection. +// SendEnvelope() calls will get flushed before closing the connection. // NOTE: it is not safe to call this method more than once. func (p *peer) FlushStop() { p.metricsTicker.Stop() @@ -252,10 +252,10 @@ func (p *peer) Status() tmconn.ConnectionStatus { return p.mconn.Status() } -// NewSend sends the message in the envelope on the channel specified by the +// SendEnvelope sends the message in the envelope on the channel specified by the // envelope. Returns false if the connection times out trying to place the message // onto its internal queue. -func (p *peer) NewSend(e Envelope) bool { +func (p *peer) SendEnvelope(e Envelope) bool { if !p.IsRunning() { return false } else if !p.hasChannel(e.ChannelID) { @@ -280,7 +280,7 @@ func (p *peer) NewSend(e Envelope) bool { // Send msg bytes to the channel identified by chID byte. Returns false if the // send queue is full after timeout, specified by MConnection. -// NewSend replaces TrySend which will be deprecated in a future release. +// SendEnvelope replaces TrySend which will be deprecated in a future release. func (p *peer) Send(chID byte, msgBytes []byte) bool { if !p.IsRunning() { return false @@ -298,10 +298,10 @@ func (p *peer) Send(chID byte, msgBytes []byte) bool { return res } -// NewTrySend attempts to sends the message in the envelope on the channel specified by the +// TrySendEnvelope attempts to sends the message in the envelope on the channel specified by the // envelope. Returns false immediately if the connection's internal queue is full -// NewTrySend replaces TrySend which will be deprecated in a future release. -func (p *peer) NewTrySend(e Envelope) bool { +// TrySendEnvelope replaces TrySend which will be deprecated in a future release. +func (p *peer) TrySendEnvelope(e Envelope) bool { if !p.IsRunning() { // see Switch#Broadcast, where we fetch the list of peers and loop over // them - while we're looping, one peer may be removed and stopped. @@ -465,8 +465,8 @@ func createMConnection( } p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes))) p.metrics.MessageReceiveBytesTotal.With("message_type", p.mlc.ValueToMetricLabel(msg)).Add(float64(len(msgBytes))) - if nr, ok := reactor.(NewReceiver); ok { - nr.NewReceive(Envelope{ + if nr, ok := reactor.(ReceiveEnveloper); ok { + nr.ReceiveEnvelope(Envelope{ ChannelID: chID, Src: p, Message: msg, diff --git a/p2p/peer_set_test.go b/p2p/peer_set_test.go index 482becd3d..0908d91c3 100644 --- a/p2p/peer_set_test.go +++ b/p2p/peer_set_test.go @@ -19,8 +19,8 @@ type mockPeer struct { } func (mp *mockPeer) FlushStop() { mp.Stop() } //nolint:errcheck // ignore error -func (mp *mockPeer) NewTrySend(e Envelope) bool { return true } -func (mp *mockPeer) NewSend(e Envelope) bool { return true } +func (mp *mockPeer) TrySendEnvelope(e Envelope) bool { return true } +func (mp *mockPeer) SendEnvelope(e Envelope) bool { return true } func (mp *mockPeer) TrySend(_ byte, _ []byte) bool { return true } func (mp *mockPeer) Send(_ byte, _ []byte) bool { return true } func (mp *mockPeer) NodeInfo() NodeInfo { return DefaultNodeInfo{} } diff --git a/p2p/peer_test.go b/p2p/peer_test.go index 0636258e0..d6428252f 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -72,7 +72,7 @@ func TestPeerSend(t *testing.T) { }) assert.True(p.CanSend(testCh)) - assert.True(p.NewSend(Envelope{ChannelID: testCh, Message: &p2p.Message{}})) + assert.True(p.SendEnvelope(Envelope{ChannelID: testCh, Message: &p2p.Message{}})) } func createOutboundPeerAndPerformHandshake( diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index f7ffad721..e0763e29c 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -237,7 +237,7 @@ func (r *Reactor) logErrAddrBook(err error) { } // Receive implements Reactor by handling incoming PEX messages. -func (r *Reactor) NewReceive(e p2p.Envelope) { +func (r *Reactor) ReceiveEnvelope(e p2p.Envelope) { r.Logger.Debug("Received message", "src", e.Src, "chId", e.ChannelID, "msg", e.Message) switch msg := e.Message.(type) { @@ -310,7 +310,7 @@ func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { if err != nil { panic(err) } - r.NewReceive(p2p.Envelope{ + r.ReceiveEnvelope(p2p.Envelope{ ChannelID: chID, Src: peer, Message: um, @@ -360,7 +360,7 @@ func (r *Reactor) RequestAddrs(p Peer) { } r.Logger.Debug("Request addrs", "from", p) r.requestsSent.Set(id, struct{}{}) - p.NewSend(p2p.Envelope{ + p.SendEnvelope(p2p.Envelope{ ChannelID: PexChannel, Message: &tmp2p.PexRequest{}, }) @@ -425,7 +425,7 @@ func (r *Reactor) SendAddrs(p Peer, netAddrs []*p2p.NetAddress) { ChannelID: PexChannel, Message: &tmp2p.PexAddrs{Addrs: p2p.NetAddressesToProto(netAddrs)}, } - p.NewSend(e) + p.SendEnvelope(e) } // SetEnsurePeersPeriod sets period to ensure peers connected. diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index e2d0170c4..75cbb51ca 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -132,10 +132,10 @@ func TestPEXReactorReceive(t *testing.T) { size := book.Size() msg := &tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}} - r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg}) + r.ReceiveEnvelope(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg}) assert.Equal(t, size+1, book.Size()) - r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}}) + r.ReceiveEnvelope(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}}) } func TestPEXReactorRequestMessageAbuse(t *testing.T) { @@ -156,17 +156,17 @@ func TestPEXReactorRequestMessageAbuse(t *testing.T) { id := string(peer.ID()) // first time creates the entry - r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}}) + r.ReceiveEnvelope(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}}) assert.True(t, r.lastReceivedRequests.Has(id)) assert.True(t, sw.Peers().Has(peer.ID())) // next time sets the last time value - r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}}) + r.ReceiveEnvelope(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}}) assert.True(t, r.lastReceivedRequests.Has(id)) assert.True(t, sw.Peers().Has(peer.ID())) // third time is too many too soon - peer is removed - r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}}) + r.ReceiveEnvelope(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}}) assert.False(t, r.lastReceivedRequests.Has(id)) assert.False(t, sw.Peers().Has(peer.ID())) assert.True(t, book.IsBanned(peerAddr)) @@ -193,12 +193,12 @@ func TestPEXReactorAddrsMessageAbuse(t *testing.T) { msg := &tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}} // receive some addrs. should clear the request - r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg}) + r.ReceiveEnvelope(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg}) assert.False(t, r.requestsSent.Has(id)) assert.True(t, sw.Peers().Has(peer.ID())) // receiving more unsolicited addrs causes a disconnect and ban - r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg}) + r.ReceiveEnvelope(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg}) assert.False(t, sw.Peers().Has(peer.ID())) assert.True(t, book.IsBanned(peer.SocketAddr())) } @@ -485,7 +485,7 @@ func TestPEXReactorDoesNotAddPrivatePeersToAddrBook(t *testing.T) { size := book.Size() msg := &tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}} - pexR.NewReceive(p2p.Envelope{ + pexR.ReceiveEnvelope(p2p.Envelope{ ChannelID: PexChannel, Src: peer, Message: msg, diff --git a/p2p/switch.go b/p2p/switch.go index 6f6af8cc2..27b235868 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -262,14 +262,14 @@ func (sw *Switch) OnStop() { //--------------------------------------------------------------------- // Peers -// NewBroadcast runs a go routine for each attempted send, which will block trying +// BroadcastEnvelope runs a go routine for each attempted send, which will block trying // to send for defaultSendTimeoutSeconds. Returns a channel which receives // success values for each attempted send (false if times out). Channel will be // closed once msg bytes are sent to all peers (or time out). -// NewBroadcasts sends to the peers using the NewSend method. +// BroadcastEnvelopes sends to the peers using the SendEnvelope method. // -// NOTE: NewBroadcast uses goroutines, so order of broadcast may not be preserved. -func (sw *Switch) NewBroadcast(e Envelope) chan bool { +// NOTE: BroadcastEnvelope uses goroutines, so order of broadcast may not be preserved. +func (sw *Switch) BroadcastEnvelope(e Envelope) chan bool { sw.Logger.Debug("Broadcast", "channel", e.ChannelID) peers := sw.peers.List() @@ -280,7 +280,7 @@ func (sw *Switch) NewBroadcast(e Envelope) chan bool { for _, peer := range peers { go func(p Peer) { defer wg.Done() - success := p.NewSend(e) + success := p.SendEnvelope(e) successChan <- success }(peer) } diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 24c2317cc..9943437da 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -71,7 +71,7 @@ func (tr *TestReactor) AddPeer(peer Peer) {} func (tr *TestReactor) RemovePeer(peer Peer, reason interface{}) {} -func (tr *TestReactor) NewReceive(e Envelope) { +func (tr *TestReactor) ReceiveEnvelope(e Envelope) { if tr.logMessages { tr.mtx.Lock() defer tr.mtx.Unlock() @@ -92,7 +92,7 @@ func (tr *TestReactor) Receive(chID byte, peer Peer, msgBytes []byte) { panic(err) } - tr.NewReceive(Envelope{ + tr.ReceiveEnvelope(Envelope{ ChannelID: chID, Src: peer, Message: um, @@ -175,9 +175,9 @@ func TestSwitches(t *testing.T) { }, }, } - s1.NewBroadcast(Envelope{ChannelID: byte(0x00), Message: ch0Msg}) - s1.NewBroadcast(Envelope{ChannelID: byte(0x01), Message: ch1Msg}) - s1.NewBroadcast(Envelope{ChannelID: byte(0x02), Message: ch2Msg}) + s1.BroadcastEnvelope(Envelope{ChannelID: byte(0x00), Message: ch0Msg}) + s1.BroadcastEnvelope(Envelope{ChannelID: byte(0x01), Message: ch1Msg}) + s1.BroadcastEnvelope(Envelope{ChannelID: byte(0x02), Message: ch2Msg}) assertMsgReceivedWithTimeout(t, ch0Msg, byte(0x00), @@ -468,7 +468,7 @@ func TestSwitchStopPeerForError(t *testing.T) { // send messages to the peer from sw1 p := sw1.Peers().List()[0] - p.NewSend(Envelope{ + p.SendEnvelope(Envelope{ ChannelID: 0x1, Message: &p2pproto.Message{}, }) @@ -866,7 +866,7 @@ func BenchmarkSwitchBroadcast(b *testing.B) { // Send random message from foo channel to another for i := 0; i < b.N; i++ { chID := byte(i % 4) - successChan := s1.NewBroadcast(Envelope{ChannelID: chID}) + successChan := s1.BroadcastEnvelope(Envelope{ChannelID: chID}) for s := range successChan { if s { numSuccess++ diff --git a/proto/tendermint/rpc/grpc/types.pb.go b/proto/tendermint/rpc/grpc/types.pb.go index 6d0fecf0b..c8d17b539 100644 --- a/proto/tendermint/rpc/grpc/types.pb.go +++ b/proto/tendermint/rpc/grpc/types.pb.go @@ -249,7 +249,7 @@ type broadcastAPIClient struct { cc grpc1.ClientConn } -func NewBroadcastAPIClient(cc grpc1.ClientConn) BroadcastAPIClient { +func BroadcastEnvelopeAPIClient(cc grpc1.ClientConn) BroadcastAPIClient { return &broadcastAPIClient{cc} } diff --git a/rpc/grpc/client_server.go b/rpc/grpc/client_server.go index 387a66213..dcd60279e 100644 --- a/rpc/grpc/client_server.go +++ b/rpc/grpc/client_server.go @@ -31,7 +31,7 @@ func StartGRPCClient(protoAddr string) BroadcastAPIClient { if err != nil { panic(err) } - return NewBroadcastAPIClient(conn) + return BroadcastEnvelopeAPIClient(conn) } func dialerFunc(ctx context.Context, addr string) (net.Conn, error) { diff --git a/rpc/grpc/types.pb.go b/rpc/grpc/types.pb.go index 6d0fecf0b..c8d17b539 100644 --- a/rpc/grpc/types.pb.go +++ b/rpc/grpc/types.pb.go @@ -249,7 +249,7 @@ type broadcastAPIClient struct { cc grpc1.ClientConn } -func NewBroadcastAPIClient(cc grpc1.ClientConn) BroadcastAPIClient { +func BroadcastEnvelopeAPIClient(cc grpc1.ClientConn) BroadcastAPIClient { return &broadcastAPIClient{cc} } diff --git a/statesync/reactor.go b/statesync/reactor.go index f1c195531..467ec795d 100644 --- a/statesync/reactor.go +++ b/statesync/reactor.go @@ -104,7 +104,7 @@ func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) { } // Receive implements p2p.Reactor. -func (r *Reactor) NewReceive(e p2p.Envelope) { +func (r *Reactor) ReceiveEnvelope(e p2p.Envelope) { if !r.IsRunning() { return } @@ -128,7 +128,7 @@ func (r *Reactor) NewReceive(e p2p.Envelope) { for _, snapshot := range snapshots { r.Logger.Debug("Advertising snapshot", "height", snapshot.Height, "format", snapshot.Format, "peer", e.Src.ID()) - e.Src.NewSend(p2p.Envelope{ + e.Src.SendEnvelope(p2p.Envelope{ ChannelID: e.ChannelID, Message: &ssproto.SnapshotsResponse{ Height: snapshot.Height, @@ -183,7 +183,7 @@ func (r *Reactor) NewReceive(e p2p.Envelope) { } r.Logger.Debug("Sending chunk", "height", msg.Height, "format", msg.Format, "chunk", msg.Index, "peer", e.Src.ID()) - e.Src.NewSend(p2p.Envelope{ + e.Src.SendEnvelope(p2p.Envelope{ ChannelID: ChunkChannel, Message: &ssproto.ChunkResponse{ Height: msg.Height, @@ -236,7 +236,7 @@ func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { panic(err) } - r.NewReceive(p2p.Envelope{ + r.ReceiveEnvelope(p2p.Envelope{ ChannelID: chID, Src: peer, Message: um, @@ -292,7 +292,7 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration) r.Logger.Debug("Requesting snapshots from known peers") // Request snapshots from all currently connected peers - r.Switch.NewBroadcast(p2p.Envelope{ + r.Switch.BroadcastEnvelope(p2p.Envelope{ ChannelID: SnapshotChannel, Message: &ssproto.SnapshotsRequest{}, }) diff --git a/statesync/reactor_test.go b/statesync/reactor_test.go index 50fe2760b..2fe2d71f3 100644 --- a/statesync/reactor_test.go +++ b/statesync/reactor_test.go @@ -54,7 +54,7 @@ func TestReactor_Receive_ChunkRequest(t *testing.T) { peer.On("ID").Return(p2p.ID("id")) var response *ssproto.ChunkResponse if tc.expectResponse != nil { - peer.On("NewSend", mock.MatchedBy(func(i interface{}) bool { + peer.On("SendEnvelope", mock.MatchedBy(func(i interface{}) bool { e, ok := i.(p2p.Envelope) return ok && e.ChannelID == ChunkChannel })).Run(func(args mock.Arguments) { @@ -80,7 +80,7 @@ func TestReactor_Receive_ChunkRequest(t *testing.T) { } }) - r.NewReceive(p2p.Envelope{ + r.ReceiveEnvelope(p2p.Envelope{ ChannelID: ChunkChannel, Src: peer, Message: tc.request, @@ -144,7 +144,7 @@ func TestReactor_Receive_SnapshotsRequest(t *testing.T) { peer := &p2pmocks.Peer{} if len(tc.expectResponses) > 0 { peer.On("ID").Return(p2p.ID("id")) - peer.On("NewSend", mock.MatchedBy(func(i interface{}) bool { + peer.On("SendEnvelope", mock.MatchedBy(func(i interface{}) bool { e, ok := i.(p2p.Envelope) return ok && e.ChannelID == SnapshotChannel })).Run(func(args mock.Arguments) { @@ -170,7 +170,7 @@ func TestReactor_Receive_SnapshotsRequest(t *testing.T) { } }) - r.NewReceive(p2p.Envelope{ + r.ReceiveEnvelope(p2p.Envelope{ ChannelID: SnapshotChannel, Src: peer, Message: &ssproto.SnapshotsRequest{}, diff --git a/statesync/syncer.go b/statesync/syncer.go index 09e2b4f2d..f96d55012 100644 --- a/statesync/syncer.go +++ b/statesync/syncer.go @@ -130,7 +130,7 @@ func (s *syncer) AddPeer(peer p2p.Peer) { ChannelID: SnapshotChannel, Message: &ssproto.SnapshotsRequest{}, } - peer.NewSend(e) + peer.SendEnvelope(e) } // RemovePeer removes a peer from the pool. @@ -471,7 +471,7 @@ func (s *syncer) requestChunk(snapshot *snapshot, chunk uint32) { } s.logger.Debug("Requesting snapshot chunk", "height", snapshot.Height, "format", snapshot.Format, "chunk", chunk, "peer", peer.ID()) - peer.NewSend(p2p.Envelope{ + peer.SendEnvelope(p2p.Envelope{ ChannelID: ChunkChannel, Message: &ssproto.ChunkRequest{ Height: snapshot.Height, diff --git a/statesync/syncer_test.go b/statesync/syncer_test.go index a5b216162..838508120 100644 --- a/statesync/syncer_test.go +++ b/statesync/syncer_test.go @@ -98,7 +98,7 @@ func TestSyncer_SyncAny(t *testing.T) { // Adding a couple of peers should trigger snapshot discovery messages peerA := &p2pmocks.Peer{} peerA.On("ID").Return(p2p.ID("a")) - peerA.On("NewSend", mock.MatchedBy(func(i interface{}) bool { + peerA.On("SendEnvelope", mock.MatchedBy(func(i interface{}) bool { e, ok := i.(p2p.Envelope) if !ok { return false @@ -111,7 +111,7 @@ func TestSyncer_SyncAny(t *testing.T) { peerB := &p2pmocks.Peer{} peerB.On("ID").Return(p2p.ID("b")) - peerB.On("NewSend", mock.MatchedBy(func(i interface{}) bool { + peerB.On("SendEnvelope", mock.MatchedBy(func(i interface{}) bool { e, ok := i.(p2p.Envelope) if !ok { return false @@ -176,11 +176,11 @@ func TestSyncer_SyncAny(t *testing.T) { chunkRequests[msg.Index]++ chunkRequestsMtx.Unlock() } - peerA.On("NewSend", mock.MatchedBy(func(i interface{}) bool { + peerA.On("SendEnvelope", mock.MatchedBy(func(i interface{}) bool { e, ok := i.(p2p.Envelope) return ok && e.ChannelID == ChunkChannel })).Maybe().Run(onChunkRequest).Return(true) - peerB.On("NewSend", mock.MatchedBy(func(i interface{}) bool { + peerB.On("SendEnvelope", mock.MatchedBy(func(i interface{}) bool { e, ok := i.(p2p.Envelope) return ok && e.ChannelID == ChunkChannel })).Maybe().Run(onChunkRequest).Return(true) diff --git a/test/fuzz/p2p/pex/reactor_receive.go b/test/fuzz/p2p/pex/reactor_receive.go index 82ac1d5b4..efa13ea0d 100644 --- a/test/fuzz/p2p/pex/reactor_receive.go +++ b/test/fuzz/p2p/pex/reactor_receive.go @@ -80,8 +80,8 @@ func (fp *fuzzPeer) CloseConn() error { return nil } func (fp *fuzzPeer) NodeInfo() p2p.NodeInfo { return defaultNodeInfo } func (fp *fuzzPeer) Status() p2p.ConnectionStatus { var cs p2p.ConnectionStatus; return cs } func (fp *fuzzPeer) SocketAddr() *p2p.NetAddress { return p2p.NewNetAddress(fp.ID(), fp.RemoteAddr()) } -func (fp *fuzzPeer) NewSend(e p2p.Envelope) bool { return true } -func (fp *fuzzPeer) NewTrySend(e p2p.Envelope) bool { return true } +func (fp *fuzzPeer) SendEnvelope(e p2p.Envelope) bool { return true } +func (fp *fuzzPeer) TrySendEnvelope(e p2p.Envelope) bool { return true } func (fp *fuzzPeer) Send(_ byte, _ []byte) bool { return true } func (fp *fuzzPeer) TrySend(_ byte, _ []byte) bool { return true } func (fp *fuzzPeer) Set(key string, value interface{}) { fp.m[key] = value }