diff --git a/mempool/v1/reactor.go b/mempool/v1/reactor.go index 3a0b6b99e..e992c1240 100644 --- a/mempool/v1/reactor.go +++ b/mempool/v1/reactor.go @@ -5,6 +5,8 @@ import ( "fmt" "time" + "github.com/cosmos/gogoproto/proto" + cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/libs/clist" "github.com/tendermint/tendermint/libs/log" @@ -155,7 +157,8 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) { // Receive implements Reactor. // It adds any received transactions to the mempool. func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { - msg, err := memR.decodeMsg(msgBytes) + return + msg, err := decodeMsg(msgBytes) if err != nil { memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err) memR.Switch.StopPeerForError(src, err) @@ -178,6 +181,32 @@ func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { // broadcasting happens from go routines per peer } +// Receive implements Reactor. +// It adds any received transactions to the mempool. +func (memR *Reactor) NewReceive(e p2p.Envelope) { + msg, err := protoToMsg(e.Message) + if err != nil { + memR.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err) + memR.Switch.StopPeerForError(e.Src, err) + return + } + memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", msg) + + txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(e.Src)} + if e.Src != nil { + txInfo.SenderP2PID = e.Src.ID() + } + for _, tx := range msg.Txs { + err = memR.mempool.CheckTx(tx, nil, txInfo) + if err == mempool.ErrTxInCache { + memR.Logger.Debug("Tx already exists in cache", "tx", tx.String()) + } else if err != nil { + memR.Logger.Info("Could not check tx", "tx", tx.String(), "err", err) + } + } + // broadcasting happens from go routines per peer +} + // PeerState describes the state of a peer. type PeerState interface { GetHeight() int64 @@ -268,13 +297,18 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { //----------------------------------------------------------------------------- // Messages -func (memR *Reactor) decodeMsg(bz []byte) (TxsMessage, error) { +func decodeMsg(bz []byte) (TxsMessage, error) { msg := protomem.Message{} err := msg.Unmarshal(bz) if err != nil { return TxsMessage{}, err } + return protoToMsg(&msg) +} + +func protoToMsg(m proto.Message) (TxsMessage, error) { + msg := m.(*protomem.Message) var message TxsMessage if i, ok := msg.Sum.(*protomem.Message_Txs); ok {