diff --git a/mempool/reactor.go b/mempool/reactor.go index 99711a373..74c44eead 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -182,9 +182,9 @@ func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { for _, tx := range msg.Txs { err = memR.mempool.CheckTx(tx, nil, txInfo) if err == ErrTxInCache { - memR.Logger.Debug("Tx already exists in cache", "tx", txID(tx)) + memR.Logger.Debug("Tx already exists in cache", "tx", types.Tx(tx).String()) } else if err != nil { - memR.Logger.Info("Could not check tx", "tx", txID(tx), "err", err) + memR.Logger.Info("Could not check tx", "tx", types.Tx(tx).String(), "err", err) } } // broadcasting happens from go routines per peer @@ -234,25 +234,26 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { } // Allow for a lag of 1 block. - memTx := next.Value.(*mempoolTx) - if peerState.GetHeight() < memTx.Height()-1 { + memTx := next.Value.(*WrappedTx) + if peerState.GetHeight() < memTx.height-1 { time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) continue } // NOTE: Transaction batching was disabled due to // https://github.com/tendermint/tendermint/issues/5796 - if _, ok := memTx.senders.Load(peerID); !ok { msg := protomem.Message{ Sum: &protomem.Message_Txs{ Txs: &protomem.Txs{Txs: [][]byte{memTx.tx}}, }, } + bz, err := msg.Marshal() if err != nil { panic(err) } + success := peer.Send(MempoolChannel, bz) if !success { time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) @@ -264,8 +265,10 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { case <-next.NextWaitChan(): // see the start of the for loop for nil check next = next.Next() + case <-peer.Quit(): return + case <-memR.Quit(): return }