mempool v1: tweak broadcastTxRoutine (#6771)

This commit is contained in:
Aleksandr Bezobchuk
2021-07-27 15:34:06 -04:00
committed by GitHub
parent 9a3861fb82
commit 4f73748bc8
2 changed files with 10 additions and 15 deletions

View File

@@ -188,8 +188,8 @@ func (txmp *TxMempool) WaitForNextTx() <-chan struct{} {
// NextGossipTx returns the next valid transaction to gossip. A caller must wait
// for WaitForNextTx to signal a transaction is available to gossip first. It is
// thread-safe.
func (txmp *TxMempool) NextGossipTx() *WrappedTx {
return txmp.gossipIndex.Front().Value.(*WrappedTx)
func (txmp *TxMempool) NextGossipTx() *clist.CElement {
return txmp.gossipIndex.Front()
}
// EnableTxsAvailable enables the mempool to trigger events when transactions

View File

@@ -9,6 +9,7 @@ import (
"time"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/internal/libs/clist"
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
"github.com/tendermint/tendermint/internal/mempool"
"github.com/tendermint/tendermint/internal/p2p"
@@ -306,7 +307,7 @@ func (r *Reactor) processPeerUpdates() {
func (r *Reactor) broadcastTxRoutine(peerID types.NodeID, closer *tmsync.Closer) {
peerMempoolID := r.ids.GetForPeer(peerID)
var memTx *WrappedTx
var nextGossipTx *clist.CElement
// remove the peer ID from the map of routines and mark the waitgroup as done
defer func() {
@@ -333,10 +334,10 @@ func (r *Reactor) broadcastTxRoutine(peerID types.NodeID, closer *tmsync.Closer)
// This happens because the CElement we were looking at got garbage
// collected (removed). That is, .NextWait() returned nil. Go ahead and
// start from the beginning.
if memTx == nil {
if nextGossipTx == nil {
select {
case <-r.mempool.WaitForNextTx(): // wait until a tx is available
if memTx = r.mempool.NextGossipTx(); memTx == nil {
if nextGossipTx = r.mempool.NextGossipTx(); nextGossipTx == nil {
continue
}
@@ -352,6 +353,8 @@ func (r *Reactor) broadcastTxRoutine(peerID types.NodeID, closer *tmsync.Closer)
}
}
memTx := nextGossipTx.Value.(*WrappedTx)
if r.peerMgr != nil {
height := r.peerMgr.GetHeight(peerID)
if height > 0 && height < memTx.height-1 {
@@ -380,16 +383,8 @@ func (r *Reactor) broadcastTxRoutine(peerID types.NodeID, closer *tmsync.Closer)
}
select {
case <-memTx.gossipEl.NextWaitChan():
// If there is a next element in gossip index, we point memTx to that node's
// value, otherwise we reset memTx to nil which will be checked at the
// parent for loop.
next := memTx.gossipEl.Next()
if next != nil {
memTx = next.Value.(*WrappedTx)
} else {
memTx = nil
}
case <-nextGossipTx.NextWaitChan():
nextGossipTx = nextGossipTx.Next()
case <-closer.Done():
// The peer is marked for removal via a PeerUpdate as the doneCh was