From 2f7844b0b9bbababd29e8b99558f8742abf4645c Mon Sep 17 00:00:00 2001 From: William Banfield Date: Fri, 21 Oct 2022 12:41:33 -0400 Subject: [PATCH] mempool tests pass --- mempool/v0/reactor.go | 44 ++++++++++++++++++++++---------------- mempool/v0/reactor_test.go | 8 ++++++- mempool/v1/reactor.go | 42 +++++++++++++++++++++--------------- 3 files changed, 57 insertions(+), 37 deletions(-) diff --git a/mempool/v0/reactor.go b/mempool/v0/reactor.go index 242705fc3..e4f54b8bb 100644 --- a/mempool/v0/reactor.go +++ b/mempool/v0/reactor.go @@ -157,26 +157,32 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) { // Receive implements Reactor. // It adds any received transactions to the mempool. func (memR *Reactor) Receive(e p2p.Envelope) { - msg, err := msgFromProto(e.Message) - if err != nil { - memR.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err) - memR.Switch.StopPeerForError(e.Src, err) - return - } - memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", msg) - - txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(e.Src)} - if e.Src != nil { - txInfo.SenderP2PID = e.Src.ID() - } - - for _, tx := range msg.Txs { - err = memR.mempool.CheckTx(tx, nil, txInfo) - if errors.Is(err, mempool.ErrTxInCache) { - memR.Logger.Debug("Tx already exists in cache", "tx", tx.String()) - } else if err != nil { - memR.Logger.Info("Could not check tx", "tx", tx.String(), "err", err) + memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message) + switch msg := e.Message.(type) { + case *protomem.Txs: + protoTxs := msg.GetTxs() + if len(protoTxs) == 0 { + memR.Logger.Error("received tmpty txs from peer", "src", e.Src) + return } + txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(e.Src)} + if e.Src != nil { + txInfo.SenderP2PID = e.Src.ID() + } + + var err error + for _, tx := range protoTxs { + ntx := types.Tx(tx) + err = memR.mempool.CheckTx(ntx, nil, txInfo) + if errors.Is(err, mempool.ErrTxInCache) { + memR.Logger.Debug("Tx already exists in cache", "tx", ntx.String()) + } else if err != nil { + memR.Logger.Info("Could not check tx", "tx", ntx.String(), "err", err) + } + } + default: + memR.Logger.Error("unknown message type", "src", e.Src, "chId", e.ChannelID, "msg", e.Message) + return } // broadcasting happens from go routines per peer diff --git a/mempool/v0/reactor_test.go b/mempool/v0/reactor_test.go index 425083654..e93880b4e 100644 --- a/mempool/v0/reactor_test.go +++ b/mempool/v0/reactor_test.go @@ -22,6 +22,7 @@ import ( "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p/mock" memproto "github.com/tendermint/tendermint/proto/tendermint/mempool" + protomem "github.com/tendermint/tendermint/proto/tendermint/mempool" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" ) @@ -279,7 +280,12 @@ func TestDontExhaustMaxActiveIDs(t *testing.T) { for i := 0; i < mempool.MaxActiveIDs+1; i++ { peer := mock.NewPeer(nil) - reactor.Receive(mempool.MempoolChannel, peer, []byte{0x1, 0x2, 0x3}) + reactor.Receive(p2p.Envelope{ + ChannelID: mempool.MempoolChannel, + Src: peer, + Message: &protomem.Txs{ + Txs: [][]byte{[]byte{0x01, 0x02, 0x03}}, + }}) reactor.AddPeer(peer) } } diff --git a/mempool/v1/reactor.go b/mempool/v1/reactor.go index e2f82ab84..b812cf043 100644 --- a/mempool/v1/reactor.go +++ b/mempool/v1/reactor.go @@ -157,26 +157,34 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) { // Receive implements Reactor. // It adds any received transactions to the mempool. func (memR *Reactor) Receive(e p2p.Envelope) { - msg, err := protoToMsg(e.Message) - if err != nil { - memR.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err) - memR.Switch.StopPeerForError(e.Src, err) + memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message) + switch msg := e.Message.(type) { + case *protomem.Txs: + protoTxs := msg.GetTxs() + if len(protoTxs) == 0 { + memR.Logger.Error("received tmpty txs from peer", "src", e.Src) + return + } + txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(e.Src)} + if e.Src != nil { + txInfo.SenderP2PID = e.Src.ID() + } + + var err error + for _, tx := range protoTxs { + ntx := types.Tx(tx) + err = memR.mempool.CheckTx(ntx, nil, txInfo) + if errors.Is(err, mempool.ErrTxInCache) { + memR.Logger.Debug("Tx already exists in cache", "tx", ntx.String()) + } else if err != nil { + memR.Logger.Info("Could not check tx", "tx", ntx.String(), "err", err) + } + } + default: + memR.Logger.Error("unknown message type", "src", e.Src, "chId", e.ChannelID, "msg", e.Message) return } - memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", msg) - txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(e.Src)} - if e.Src != nil { - txInfo.SenderP2PID = e.Src.ID() - } - for _, tx := range msg.Txs { - err = memR.mempool.CheckTx(tx, nil, txInfo) - if err == mempool.ErrTxInCache { - memR.Logger.Debug("Tx already exists in cache", "tx", tx.String()) - } else if err != nil { - memR.Logger.Info("Could not check tx", "tx", tx.String(), "err", err) - } - } // broadcasting happens from go routines per peer }