From 7e107c45fbe8d0c5b5b54d744dc74034bfe886f0 Mon Sep 17 00:00:00 2001 From: William Banfield Date: Wed, 19 Oct 2022 13:10:49 -0400 Subject: [PATCH] remove old TrySends --- blocksync/reactor.go | 10 +++++----- consensus/byzantine_test.go | 12 ++++++------ consensus/invalid_test.go | 2 +- consensus/reactor.go | 24 ++++++++++++------------ evidence/reactor.go | 2 +- mempool/v0/reactor.go | 2 +- mempool/v1/reactor.go | 2 +- p2p/pex/pex_reactor.go | 4 ++-- p2p/switch_test.go | 2 +- statesync/reactor.go | 4 ++-- statesync/syncer.go | 4 ++-- 11 files changed, 34 insertions(+), 34 deletions(-) diff --git a/blocksync/reactor.go b/blocksync/reactor.go index bad048663..f3ca9a140 100644 --- a/blocksync/reactor.go +++ b/blocksync/reactor.go @@ -156,7 +156,7 @@ func (bcR *Reactor) AddPeer(peer p2p.Peer) { Height: bcR.store.Height(), }, } - peer.NewSend(e) + peer.Send(e) // it's OK if send fails. will try later in poolRoutine // peer is added to the pool once we receive the first @@ -191,7 +191,7 @@ func (bcR *Reactor) respondToPeer(msg *bcproto.BlockRequest, Message: wm, } - return src.NewTrySend(e) + return src.TrySend(e) } bcR.Logger.Info("Peer asking for a block we don't have", "src", src, "height", msg.Height) @@ -206,7 +206,7 @@ func (bcR *Reactor) respondToPeer(msg *bcproto.BlockRequest, Message: wm, } - return src.NewTrySend(e) + return src.TrySend(e) } // Receive implements Reactor by handling 4 types of messages (look below). @@ -250,7 +250,7 @@ func (bcR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { ChannelID: BlocksyncChannel, Message: wm, } - src.NewTrySend(e) + src.TrySend(e) case *bcproto.StatusResponse: // Got a peer status. Unverified. bcR.pool.SetPeerRange(src.ID(), msg.Base, msg.Height) @@ -305,7 +305,7 @@ func (bcR *Reactor) poolRoutine(stateSynced bool) { ChannelID: BlocksyncChannel, Message: wm, } - queued := peer.NewTrySend(e) + queued := peer.TrySend(e) if !queued { bcR.Logger.Debug("Send queue is full, drop block request", "peer", peer.ID(), "height", request.Height) } diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index d3085ce17..9508b02eb 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -171,7 +171,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { Message: p, ChannelID: VoteChannel, } - peer.NewSend(e) + peer.Send(e) } else { bcs.Logger.Info("Signed and pushed vote", "vote", prevote2, "peer", peer) p, err := MsgToProto(&VoteMessage{prevote2}) @@ -180,7 +180,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { Message: p, ChannelID: VoteChannel, } - peer.NewSend(e) + peer.Send(e) } } } else { @@ -541,7 +541,7 @@ func sendProposalAndParts( ChannelID: DataChannel, Message: p, } - peer.NewSend(e) + peer.Send(e) // parts for i := 0; i < int(parts.Total()); i++ { @@ -559,7 +559,7 @@ func sendProposalAndParts( ChannelID: DataChannel, Message: p, } - peer.NewSend(e) + peer.Send(e) } // votes @@ -575,7 +575,7 @@ func sendProposalAndParts( ChannelID: VoteChannel, Message: p, } - peer.NewSend(e) + peer.Send(e) p, err = MsgToProto(&VoteMessage{precommit}) if err != nil { panic(err) @@ -584,7 +584,7 @@ func sendProposalAndParts( ChannelID: VoteChannel, Message: p, } - peer.NewSend(e) + peer.Send(e) } //---------------------------------------- diff --git a/consensus/invalid_test.go b/consensus/invalid_test.go index 2f0209b2a..01a507ac1 100644 --- a/consensus/invalid_test.go +++ b/consensus/invalid_test.go @@ -101,7 +101,7 @@ func invalidDoPrevoteFunc(t *testing.T, height int64, round int32, cs *State, sw Message: p, ChannelID: VoteChannel, } - peer.NewSend(e) + peer.Send(e) } }() } diff --git a/consensus/reactor.go b/consensus/reactor.go index 8b6171d39..b91915206 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -302,7 +302,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { if err != nil { panic(err) } - src.NewTrySend(p2p.Envelope{ + src.TrySend(p2p.Envelope{ ChannelID: VoteSetBitsChannel, Message: p, }) @@ -498,7 +498,7 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) { ChannelID: StateChannel, struct{ ConsensusMessage }{msg}, Message: p, } - peer.NewTrySend(e) + peer.TrySend(e) } else { // Height doesn't match // TODO: check a field, maybe CatchupCommitRound? @@ -530,7 +530,7 @@ func (conR *Reactor) sendNewRoundStepMessage(peer p2p.Peer) { ChannelID: StateChannel, Message: p, } - peer.NewSend(e) + peer.Send(e) } func (conR *Reactor) updateRoundStateRoutine() { @@ -584,7 +584,7 @@ OUTER_LOOP: ChannelID: DataChannel, Message: p, } - if peer.NewSend(e) { + if peer.Send(e) { ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) } continue OUTER_LOOP @@ -641,7 +641,7 @@ OUTER_LOOP: ChannelID: DataChannel, Message: p, } - if peer.NewSend(e) { + if peer.Send(e) { // NOTE[ZM]: A peer might have received different proposal msg so this Proposal msg will be rejected! ps.SetHasProposal(rs.Proposal) } @@ -666,7 +666,7 @@ OUTER_LOOP: ChannelID: DataChannel, Message: p, } - peer.NewSend(e) + peer.Send(e) } continue OUTER_LOOP } @@ -718,7 +718,7 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt ChannelID: DataChannel, Message: p, } - if peer.NewSend(e) { + if peer.Send(e) { ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) } else { logger.Debug("Sending block part for catchup failed") @@ -890,7 +890,7 @@ OUTER_LOOP: ChannelID: StateChannel, Message: p, } - peer.NewTrySend(e) + peer.TrySend(e) time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration) } } @@ -915,7 +915,7 @@ OUTER_LOOP: ChannelID: StateChannel, Message: p, } - peer.NewTrySend(e) + peer.TrySend(e) time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration) } } @@ -941,7 +941,7 @@ OUTER_LOOP: ChannelID: StateChannel, Message: p, } - peer.NewTrySend(e) + peer.TrySend(e) time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration) } } @@ -969,7 +969,7 @@ OUTER_LOOP: ChannelID: StateChannel, Message: p, } - peer.NewTrySend(e) + peer.TrySend(e) time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration) } } @@ -1194,7 +1194,7 @@ func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool { ChannelID: VoteChannel, Message: p, } - if ps.peer.NewSend(e) { + if ps.peer.Send(e) { ps.SetHasVote(vote) return true } diff --git a/evidence/reactor.go b/evidence/reactor.go index 10d3d9817..c4621c4ce 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -134,7 +134,7 @@ func (evR *Reactor) broadcastEvidenceRoutine(peer p2p.Peer) { ChannelID: EvidenceChannel, Message: evp, } - success := peer.NewSend(e) + success := peer.Send(e) if !success { time.Sleep(peerRetryMessageIntervalMS * time.Millisecond) continue diff --git a/mempool/v0/reactor.go b/mempool/v0/reactor.go index 58bd5ffb5..8e08535cc 100644 --- a/mempool/v0/reactor.go +++ b/mempool/v0/reactor.go @@ -242,7 +242,7 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { }, }, } - success := peer.NewSend(e) + success := peer.Send(e) if !success { time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond) continue diff --git a/mempool/v1/reactor.go b/mempool/v1/reactor.go index 0480f71b3..f45ea0de8 100644 --- a/mempool/v1/reactor.go +++ b/mempool/v1/reactor.go @@ -243,7 +243,7 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { }, } - success := peer.NewSend(e) + success := peer.Send(e) if !success { time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond) continue diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index 034a005b3..96c248a68 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -352,7 +352,7 @@ func (r *Reactor) RequestAddrs(p Peer) { ChannelID: PexChannel, Message: &tmp2p.PexRequest{}, } - p.NewSend(e) + p.Send(e) } // ReceiveAddrs adds the given addrs to the addrbook if theres an open @@ -414,7 +414,7 @@ func (r *Reactor) SendAddrs(p Peer, netAddrs []*p2p.NetAddress) { ChannelID: PexChannel, Message: &tmp2p.PexAddrs{Addrs: p2p.NetAddressesToProto(netAddrs)}, } - p.NewSend(e) + p.Send(e) } // SetEnsurePeersPeriod sets period to ensure peers connected. diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 436b801e9..83b983f5a 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -470,7 +470,7 @@ func TestSwitchStopPeerForError(t *testing.T) { e := Envelope{ ChannelID: 0x1, } - p.NewSend(e) + p.Send(e) // stop sw2. this should cause the p to fail, // which results in calling StopPeerForError internally diff --git a/statesync/reactor.go b/statesync/reactor.go index 7223d4063..d6398cafd 100644 --- a/statesync/reactor.go +++ b/statesync/reactor.go @@ -140,7 +140,7 @@ func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { Metadata: snapshot.Metadata, }, } - src.NewSend(e) + src.Send(e) } case *ssproto.SnapshotsResponse: @@ -196,7 +196,7 @@ func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { Missing: resp.Chunk == nil, }), } - src.NewSend(e) + src.Send(e) case *ssproto.ChunkResponse: r.mtx.RLock() diff --git a/statesync/syncer.go b/statesync/syncer.go index 9fbda7ad5..dd12da248 100644 --- a/statesync/syncer.go +++ b/statesync/syncer.go @@ -130,7 +130,7 @@ func (s *syncer) AddPeer(peer p2p.Peer) { ChannelID: SnapshotChannel, Message: toWrappedProto(&ssproto.SnapshotsRequest{}), } - peer.NewSend(e) + peer.Send(e) } // RemovePeer removes a peer from the pool. @@ -479,7 +479,7 @@ func (s *syncer) requestChunk(snapshot *snapshot, chunk uint32) { Index: chunk, }, } - peer.NewSend(e) + peer.Send(e) } // verifyApp verifies the sync, checking the app hash, last block height and app version