mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-08 14:21:14 +00:00
mempool v1 tests pass
This commit is contained in:
@@ -5,6 +5,8 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/libs/clist"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
@@ -155,7 +157,8 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
|
||||
// Receive implements Reactor.
|
||||
// It adds any received transactions to the mempool.
|
||||
func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
msg, err := memR.decodeMsg(msgBytes)
|
||||
return
|
||||
msg, err := decodeMsg(msgBytes)
|
||||
if err != nil {
|
||||
memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
|
||||
memR.Switch.StopPeerForError(src, err)
|
||||
@@ -178,6 +181,32 @@ func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
// broadcasting happens from go routines per peer
|
||||
}
|
||||
|
||||
// Receive implements Reactor.
|
||||
// It adds any received transactions to the mempool.
|
||||
func (memR *Reactor) NewReceive(e p2p.Envelope) {
|
||||
msg, err := protoToMsg(e.Message)
|
||||
if err != nil {
|
||||
memR.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err)
|
||||
memR.Switch.StopPeerForError(e.Src, err)
|
||||
return
|
||||
}
|
||||
memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", msg)
|
||||
|
||||
txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(e.Src)}
|
||||
if e.Src != nil {
|
||||
txInfo.SenderP2PID = e.Src.ID()
|
||||
}
|
||||
for _, tx := range msg.Txs {
|
||||
err = memR.mempool.CheckTx(tx, nil, txInfo)
|
||||
if err == mempool.ErrTxInCache {
|
||||
memR.Logger.Debug("Tx already exists in cache", "tx", tx.String())
|
||||
} else if err != nil {
|
||||
memR.Logger.Info("Could not check tx", "tx", tx.String(), "err", err)
|
||||
}
|
||||
}
|
||||
// broadcasting happens from go routines per peer
|
||||
}
|
||||
|
||||
// PeerState describes the state of a peer.
|
||||
type PeerState interface {
|
||||
GetHeight() int64
|
||||
@@ -268,13 +297,18 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
|
||||
//-----------------------------------------------------------------------------
|
||||
// Messages
|
||||
|
||||
func (memR *Reactor) decodeMsg(bz []byte) (TxsMessage, error) {
|
||||
func decodeMsg(bz []byte) (TxsMessage, error) {
|
||||
msg := protomem.Message{}
|
||||
err := msg.Unmarshal(bz)
|
||||
if err != nil {
|
||||
return TxsMessage{}, err
|
||||
}
|
||||
|
||||
return protoToMsg(&msg)
|
||||
}
|
||||
|
||||
func protoToMsg(m proto.Message) (TxsMessage, error) {
|
||||
msg := m.(*protomem.Message)
|
||||
var message TxsMessage
|
||||
|
||||
if i, ok := msg.Sum.(*protomem.Message_Txs); ok {
|
||||
|
||||
Reference in New Issue
Block a user