This commit is contained in:
Aleksandr Bezobchuk
2022-06-03 12:40:52 -04:00
parent 593c025f16
commit 6707088e99

View File

@@ -182,9 +182,9 @@ func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
for _, tx := range msg.Txs {
err = memR.mempool.CheckTx(tx, nil, txInfo)
if err == ErrTxInCache {
memR.Logger.Debug("Tx already exists in cache", "tx", txID(tx))
memR.Logger.Debug("Tx already exists in cache", "tx", types.Tx(tx).String())
} else if err != nil {
memR.Logger.Info("Could not check tx", "tx", txID(tx), "err", err)
memR.Logger.Info("Could not check tx", "tx", types.Tx(tx).String(), "err", err)
}
}
// broadcasting happens from go routines per peer
@@ -234,25 +234,26 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
}
// Allow for a lag of 1 block.
memTx := next.Value.(*mempoolTx)
if peerState.GetHeight() < memTx.Height()-1 {
memTx := next.Value.(*WrappedTx)
if peerState.GetHeight() < memTx.height-1 {
time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
continue
}
// NOTE: Transaction batching was disabled due to
// https://github.com/tendermint/tendermint/issues/5796
if _, ok := memTx.senders.Load(peerID); !ok {
msg := protomem.Message{
Sum: &protomem.Message_Txs{
Txs: &protomem.Txs{Txs: [][]byte{memTx.tx}},
},
}
bz, err := msg.Marshal()
if err != nil {
panic(err)
}
success := peer.Send(MempoolChannel, bz)
if !success {
time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
@@ -264,8 +265,10 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
case <-next.NextWaitChan():
// see the start of the for loop for nil check
next = next.Next()
case <-peer.Quit():
return
case <-memR.Quit():
return
}