remove old TrySends

This commit is contained in:
William Banfield
2022-10-19 13:10:49 -04:00
parent 64c18a4406
commit 7e107c45fb
11 changed files with 34 additions and 34 deletions

View File

@@ -156,7 +156,7 @@ func (bcR *Reactor) AddPeer(peer p2p.Peer) {
Height: bcR.store.Height(),
},
}
peer.NewSend(e)
peer.Send(e)
// it's OK if send fails. will try later in poolRoutine
// peer is added to the pool once we receive the first
@@ -191,7 +191,7 @@ func (bcR *Reactor) respondToPeer(msg *bcproto.BlockRequest,
Message: wm,
}
return src.NewTrySend(e)
return src.TrySend(e)
}
bcR.Logger.Info("Peer asking for a block we don't have", "src", src, "height", msg.Height)
@@ -206,7 +206,7 @@ func (bcR *Reactor) respondToPeer(msg *bcproto.BlockRequest,
Message: wm,
}
return src.NewTrySend(e)
return src.TrySend(e)
}
// Receive implements Reactor by handling 4 types of messages (look below).
@@ -250,7 +250,7 @@ func (bcR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
ChannelID: BlocksyncChannel,
Message: wm,
}
src.NewTrySend(e)
src.TrySend(e)
case *bcproto.StatusResponse:
// Got a peer status. Unverified.
bcR.pool.SetPeerRange(src.ID(), msg.Base, msg.Height)
@@ -305,7 +305,7 @@ func (bcR *Reactor) poolRoutine(stateSynced bool) {
ChannelID: BlocksyncChannel,
Message: wm,
}
queued := peer.NewTrySend(e)
queued := peer.TrySend(e)
if !queued {
bcR.Logger.Debug("Send queue is full, drop block request", "peer", peer.ID(), "height", request.Height)
}

View File

@@ -171,7 +171,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
Message: p,
ChannelID: VoteChannel,
}
peer.NewSend(e)
peer.Send(e)
} else {
bcs.Logger.Info("Signed and pushed vote", "vote", prevote2, "peer", peer)
p, err := MsgToProto(&VoteMessage{prevote2})
@@ -180,7 +180,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
Message: p,
ChannelID: VoteChannel,
}
peer.NewSend(e)
peer.Send(e)
}
}
} else {
@@ -541,7 +541,7 @@ func sendProposalAndParts(
ChannelID: DataChannel,
Message: p,
}
peer.NewSend(e)
peer.Send(e)
// parts
for i := 0; i < int(parts.Total()); i++ {
@@ -559,7 +559,7 @@ func sendProposalAndParts(
ChannelID: DataChannel,
Message: p,
}
peer.NewSend(e)
peer.Send(e)
}
// votes
@@ -575,7 +575,7 @@ func sendProposalAndParts(
ChannelID: VoteChannel,
Message: p,
}
peer.NewSend(e)
peer.Send(e)
p, err = MsgToProto(&VoteMessage{precommit})
if err != nil {
panic(err)
@@ -584,7 +584,7 @@ func sendProposalAndParts(
ChannelID: VoteChannel,
Message: p,
}
peer.NewSend(e)
peer.Send(e)
}
//----------------------------------------

View File

@@ -101,7 +101,7 @@ func invalidDoPrevoteFunc(t *testing.T, height int64, round int32, cs *State, sw
Message: p,
ChannelID: VoteChannel,
}
peer.NewSend(e)
peer.Send(e)
}
}()
}

View File

@@ -302,7 +302,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
if err != nil {
panic(err)
}
src.NewTrySend(p2p.Envelope{
src.TrySend(p2p.Envelope{
ChannelID: VoteSetBitsChannel,
Message: p,
})
@@ -498,7 +498,7 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
ChannelID: StateChannel, struct{ ConsensusMessage }{msg},
Message: p,
}
peer.NewTrySend(e)
peer.TrySend(e)
} else {
// Height doesn't match
// TODO: check a field, maybe CatchupCommitRound?
@@ -530,7 +530,7 @@ func (conR *Reactor) sendNewRoundStepMessage(peer p2p.Peer) {
ChannelID: StateChannel,
Message: p,
}
peer.NewSend(e)
peer.Send(e)
}
func (conR *Reactor) updateRoundStateRoutine() {
@@ -584,7 +584,7 @@ OUTER_LOOP:
ChannelID: DataChannel,
Message: p,
}
if peer.NewSend(e) {
if peer.Send(e) {
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
}
continue OUTER_LOOP
@@ -641,7 +641,7 @@ OUTER_LOOP:
ChannelID: DataChannel,
Message: p,
}
if peer.NewSend(e) {
if peer.Send(e) {
// NOTE[ZM]: A peer might have received different proposal msg so this Proposal msg will be rejected!
ps.SetHasProposal(rs.Proposal)
}
@@ -666,7 +666,7 @@ OUTER_LOOP:
ChannelID: DataChannel,
Message: p,
}
peer.NewSend(e)
peer.Send(e)
}
continue OUTER_LOOP
}
@@ -718,7 +718,7 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt
ChannelID: DataChannel,
Message: p,
}
if peer.NewSend(e) {
if peer.Send(e) {
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
} else {
logger.Debug("Sending block part for catchup failed")
@@ -890,7 +890,7 @@ OUTER_LOOP:
ChannelID: StateChannel,
Message: p,
}
peer.NewTrySend(e)
peer.TrySend(e)
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
}
@@ -915,7 +915,7 @@ OUTER_LOOP:
ChannelID: StateChannel,
Message: p,
}
peer.NewTrySend(e)
peer.TrySend(e)
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
}
@@ -941,7 +941,7 @@ OUTER_LOOP:
ChannelID: StateChannel,
Message: p,
}
peer.NewTrySend(e)
peer.TrySend(e)
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
}
@@ -969,7 +969,7 @@ OUTER_LOOP:
ChannelID: StateChannel,
Message: p,
}
peer.NewTrySend(e)
peer.TrySend(e)
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
}
@@ -1194,7 +1194,7 @@ func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool {
ChannelID: VoteChannel,
Message: p,
}
if ps.peer.NewSend(e) {
if ps.peer.Send(e) {
ps.SetHasVote(vote)
return true
}

View File

@@ -134,7 +134,7 @@ func (evR *Reactor) broadcastEvidenceRoutine(peer p2p.Peer) {
ChannelID: EvidenceChannel,
Message: evp,
}
success := peer.NewSend(e)
success := peer.Send(e)
if !success {
time.Sleep(peerRetryMessageIntervalMS * time.Millisecond)
continue

View File

@@ -242,7 +242,7 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
},
},
}
success := peer.NewSend(e)
success := peer.Send(e)
if !success {
time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond)
continue

View File

@@ -243,7 +243,7 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
},
}
success := peer.NewSend(e)
success := peer.Send(e)
if !success {
time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond)
continue

View File

@@ -352,7 +352,7 @@ func (r *Reactor) RequestAddrs(p Peer) {
ChannelID: PexChannel,
Message: &tmp2p.PexRequest{},
}
p.NewSend(e)
p.Send(e)
}
// ReceiveAddrs adds the given addrs to the addrbook if theres an open
@@ -414,7 +414,7 @@ func (r *Reactor) SendAddrs(p Peer, netAddrs []*p2p.NetAddress) {
ChannelID: PexChannel,
Message: &tmp2p.PexAddrs{Addrs: p2p.NetAddressesToProto(netAddrs)},
}
p.NewSend(e)
p.Send(e)
}
// SetEnsurePeersPeriod sets period to ensure peers connected.

View File

@@ -470,7 +470,7 @@ func TestSwitchStopPeerForError(t *testing.T) {
e := Envelope{
ChannelID: 0x1,
}
p.NewSend(e)
p.Send(e)
// stop sw2. this should cause the p to fail,
// which results in calling StopPeerForError internally

View File

@@ -140,7 +140,7 @@ func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
Metadata: snapshot.Metadata,
},
}
src.NewSend(e)
src.Send(e)
}
case *ssproto.SnapshotsResponse:
@@ -196,7 +196,7 @@ func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
Missing: resp.Chunk == nil,
}),
}
src.NewSend(e)
src.Send(e)
case *ssproto.ChunkResponse:
r.mtx.RLock()

View File

@@ -130,7 +130,7 @@ func (s *syncer) AddPeer(peer p2p.Peer) {
ChannelID: SnapshotChannel,
Message: toWrappedProto(&ssproto.SnapshotsRequest{}),
}
peer.NewSend(e)
peer.Send(e)
}
// RemovePeer removes a peer from the pool.
@@ -479,7 +479,7 @@ func (s *syncer) requestChunk(snapshot *snapshot, chunk uint32) {
Index: chunk,
},
}
peer.NewSend(e)
peer.Send(e)
}
// verifyApp verifies the sync, checking the app hash, last block height and app version