From ae266b2da6f0c8adf3bfba3368c26a429dc46feb Mon Sep 17 00:00:00 2001 From: William Banfield Date: Wed, 19 Oct 2022 21:58:48 -0400 Subject: [PATCH] mempool v0 tests pass --- mempool/v0/reactor.go | 37 ++++++++++++++++++++++++++++++++++--- 1 file changed, 34 insertions(+), 3 deletions(-) diff --git a/mempool/v0/reactor.go b/mempool/v0/reactor.go index d9238a7a3..bfce45373 100644 --- a/mempool/v0/reactor.go +++ b/mempool/v0/reactor.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/cosmos/gogoproto/proto" cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/libs/clist" "github.com/tendermint/tendermint/libs/log" @@ -156,7 +157,7 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) { // Receive implements Reactor. // It adds any received transactions to the mempool. func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { - msg, err := memR.decodeMsg(msgBytes) + msg, err := decodeMsg(msgBytes) if err != nil { memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err) memR.Switch.StopPeerForError(src, err) @@ -181,6 +182,32 @@ func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { // broadcasting happens from go routines per peer } +func (memR *Reactor) NewReceive(e p2p.Envelope) { + msg, err := msgFromProto(e.Message) + if err != nil { + memR.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err) + memR.Switch.StopPeerForError(e.Src, err) + return + } + memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", msg) + + txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(e.Src)} + if e.Src != nil { + txInfo.SenderP2PID = e.Src.ID() + } + + for _, tx := range msg.Txs { + err = memR.mempool.CheckTx(tx, nil, txInfo) + if errors.Is(err, mempool.ErrTxInCache) { + memR.Logger.Debug("Tx already exists in cache", "tx", tx.String()) + } else if err != nil { + memR.Logger.Info("Could not check tx", "tx", tx.String(), "err", err) + } + } + + // broadcasting happens from go routines per peer +} + // PeerState describes the state of a peer. type PeerState interface { GetHeight() int64 @@ -262,15 +289,19 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { } } -func (memR *Reactor) decodeMsg(bz []byte) (TxsMessage, error) { +func decodeMsg(bz []byte) (TxsMessage, error) { msg := protomem.Message{} err := msg.Unmarshal(bz) if err != nil { return TxsMessage{}, err } - var message TxsMessage + return msgFromProto(&msg) +} +func msgFromProto(m proto.Message) (TxsMessage, error) { + msg := m.(*protomem.Message) + var message TxsMessage if i, ok := msg.Sum.(*protomem.Message_Txs); ok { txs := i.Txs.GetTxs()