mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-08 14:21:14 +00:00
After a reactor has failed to parse an incoming message, it shouldn't output the "bad" data into the logs, as that data is unfiltered and could have anything in it. (We also don't think this information is helpful to have in the logs anyways.)
342 lines
8.7 KiB
Go
342 lines
8.7 KiB
Go
package mempool
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"math"
|
|
"time"
|
|
|
|
cfg "github.com/tendermint/tendermint/config"
|
|
"github.com/tendermint/tendermint/libs/clist"
|
|
"github.com/tendermint/tendermint/libs/log"
|
|
tmsync "github.com/tendermint/tendermint/libs/sync"
|
|
"github.com/tendermint/tendermint/p2p"
|
|
protomem "github.com/tendermint/tendermint/proto/tendermint/mempool"
|
|
"github.com/tendermint/tendermint/types"
|
|
)
|
|
|
|
const (
|
|
MempoolChannel = byte(0x30)
|
|
|
|
peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount
|
|
|
|
// UnknownPeerID is the peer ID to use when running CheckTx when there is
|
|
// no peer (e.g. RPC)
|
|
UnknownPeerID uint16 = 0
|
|
|
|
maxActiveIDs = math.MaxUint16
|
|
)
|
|
|
|
// Reactor handles mempool tx broadcasting amongst peers.
|
|
// It maintains a map from peer ID to counter, to prevent gossiping txs to the
|
|
// peers you received it from.
|
|
type Reactor struct {
|
|
p2p.BaseReactor
|
|
config *cfg.MempoolConfig
|
|
mempool *CListMempool
|
|
ids *mempoolIDs
|
|
}
|
|
|
|
type mempoolIDs struct {
|
|
mtx tmsync.RWMutex
|
|
peerMap map[p2p.ID]uint16
|
|
nextID uint16 // assumes that a node will never have over 65536 active peers
|
|
activeIDs map[uint16]struct{} // used to check if a given peerID key is used, the value doesn't matter
|
|
}
|
|
|
|
// Reserve searches for the next unused ID and assigns it to the
|
|
// peer.
|
|
func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) {
|
|
ids.mtx.Lock()
|
|
defer ids.mtx.Unlock()
|
|
|
|
curID := ids.nextPeerID()
|
|
ids.peerMap[peer.ID()] = curID
|
|
ids.activeIDs[curID] = struct{}{}
|
|
}
|
|
|
|
// nextPeerID returns the next unused peer ID to use.
|
|
// This assumes that ids's mutex is already locked.
|
|
func (ids *mempoolIDs) nextPeerID() uint16 {
|
|
if len(ids.activeIDs) == maxActiveIDs {
|
|
panic(fmt.Sprintf("node has maximum %d active IDs and wanted to get one more", maxActiveIDs))
|
|
}
|
|
|
|
_, idExists := ids.activeIDs[ids.nextID]
|
|
for idExists {
|
|
ids.nextID++
|
|
_, idExists = ids.activeIDs[ids.nextID]
|
|
}
|
|
curID := ids.nextID
|
|
ids.nextID++
|
|
return curID
|
|
}
|
|
|
|
// Reclaim returns the ID reserved for the peer back to unused pool.
|
|
func (ids *mempoolIDs) Reclaim(peer p2p.Peer) {
|
|
ids.mtx.Lock()
|
|
defer ids.mtx.Unlock()
|
|
|
|
removedID, ok := ids.peerMap[peer.ID()]
|
|
if ok {
|
|
delete(ids.activeIDs, removedID)
|
|
delete(ids.peerMap, peer.ID())
|
|
}
|
|
}
|
|
|
|
// GetForPeer returns an ID reserved for the peer.
|
|
func (ids *mempoolIDs) GetForPeer(peer p2p.Peer) uint16 {
|
|
ids.mtx.RLock()
|
|
defer ids.mtx.RUnlock()
|
|
|
|
return ids.peerMap[peer.ID()]
|
|
}
|
|
|
|
func newMempoolIDs() *mempoolIDs {
|
|
return &mempoolIDs{
|
|
peerMap: make(map[p2p.ID]uint16),
|
|
activeIDs: map[uint16]struct{}{0: {}},
|
|
nextID: 1, // reserve unknownPeerID(0) for mempoolReactor.BroadcastTx
|
|
}
|
|
}
|
|
|
|
// NewReactor returns a new Reactor with the given config and mempool.
|
|
func NewReactor(config *cfg.MempoolConfig, mempool *CListMempool) *Reactor {
|
|
memR := &Reactor{
|
|
config: config,
|
|
mempool: mempool,
|
|
ids: newMempoolIDs(),
|
|
}
|
|
memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR)
|
|
return memR
|
|
}
|
|
|
|
// InitPeer implements Reactor by creating a state for the peer.
|
|
func (memR *Reactor) InitPeer(peer p2p.Peer) p2p.Peer {
|
|
memR.ids.ReserveForPeer(peer)
|
|
return peer
|
|
}
|
|
|
|
// SetLogger sets the Logger on the reactor and the underlying mempool.
|
|
func (memR *Reactor) SetLogger(l log.Logger) {
|
|
memR.Logger = l
|
|
memR.mempool.SetLogger(l)
|
|
}
|
|
|
|
// OnStart implements p2p.BaseReactor.
|
|
func (memR *Reactor) OnStart() error {
|
|
if !memR.config.Broadcast {
|
|
memR.Logger.Info("Tx broadcasting is disabled")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetChannels implements Reactor by returning the list of channels for this
|
|
// reactor.
|
|
func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
|
|
maxMsgSize := memR.config.MaxBatchBytes
|
|
return []*p2p.ChannelDescriptor{
|
|
{
|
|
ID: MempoolChannel,
|
|
Priority: 5,
|
|
RecvMessageCapacity: maxMsgSize,
|
|
},
|
|
}
|
|
}
|
|
|
|
// AddPeer implements Reactor.
|
|
// It starts a broadcast routine ensuring all txs are forwarded to the given peer.
|
|
func (memR *Reactor) AddPeer(peer p2p.Peer) {
|
|
if memR.config.Broadcast {
|
|
go memR.broadcastTxRoutine(peer)
|
|
}
|
|
}
|
|
|
|
// RemovePeer implements Reactor.
|
|
func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
|
|
memR.ids.Reclaim(peer)
|
|
// broadcast routine checks if peer is gone and returns
|
|
}
|
|
|
|
// Receive implements Reactor.
|
|
// It adds any received transactions to the mempool.
|
|
func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
|
msg, err := memR.decodeMsg(msgBytes)
|
|
if err != nil {
|
|
memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
|
|
memR.Switch.StopPeerForError(src, err)
|
|
return
|
|
}
|
|
memR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
|
|
|
|
txInfo := TxInfo{SenderID: memR.ids.GetForPeer(src)}
|
|
if src != nil {
|
|
txInfo.SenderP2PID = src.ID()
|
|
}
|
|
for _, tx := range msg.Txs {
|
|
err = memR.mempool.CheckTx(tx, nil, txInfo)
|
|
if err != nil {
|
|
memR.Logger.Info("Could not check tx", "tx", txID(tx), "err", err)
|
|
}
|
|
}
|
|
// broadcasting happens from go routines per peer
|
|
}
|
|
|
|
// PeerState describes the state of a peer.
|
|
type PeerState interface {
|
|
GetHeight() int64
|
|
}
|
|
|
|
// Send new mempool txs to peer.
|
|
func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
|
|
peerID := memR.ids.GetForPeer(peer)
|
|
var next *clist.CElement
|
|
|
|
for {
|
|
// In case of both next.NextWaitChan() and peer.Quit() are variable at the same time
|
|
if !memR.IsRunning() || !peer.IsRunning() {
|
|
return
|
|
}
|
|
// This happens because the CElement we were looking at got garbage
|
|
// collected (removed). That is, .NextWait() returned nil. Go ahead and
|
|
// start from the beginning.
|
|
if next == nil {
|
|
select {
|
|
case <-memR.mempool.TxsWaitChan(): // Wait until a tx is available
|
|
if next = memR.mempool.TxsFront(); next == nil {
|
|
continue
|
|
}
|
|
case <-peer.Quit():
|
|
return
|
|
case <-memR.Quit():
|
|
return
|
|
}
|
|
}
|
|
|
|
// Make sure the peer is up to date.
|
|
peerState, ok := peer.Get(types.PeerStateKey).(PeerState)
|
|
if !ok {
|
|
// Peer does not have a state yet. We set it in the consensus reactor, but
|
|
// when we add peer in Switch, the order we call reactors#AddPeer is
|
|
// different every time due to us using a map. Sometimes other reactors
|
|
// will be initialized before the consensus reactor. We should wait a few
|
|
// milliseconds and retry.
|
|
time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
|
|
continue
|
|
}
|
|
|
|
// Allow for a lag of 1 block.
|
|
memTx := next.Value.(*mempoolTx)
|
|
if peerState.GetHeight() < memTx.Height()-1 {
|
|
time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
|
|
continue
|
|
}
|
|
|
|
txs := memR.txs(next, peerID, peerState.GetHeight()) // WARNING: mutates next!
|
|
|
|
// send txs
|
|
if len(txs) > 0 {
|
|
msg := protomem.Message{
|
|
Sum: &protomem.Message_Txs{
|
|
Txs: &protomem.Txs{Txs: txs},
|
|
},
|
|
}
|
|
bz, err := msg.Marshal()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
memR.Logger.Debug("Sending N txs to peer", "N", len(txs), "peer", peer)
|
|
success := peer.Send(MempoolChannel, bz)
|
|
if !success {
|
|
time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
|
|
continue
|
|
}
|
|
}
|
|
|
|
select {
|
|
case <-next.NextWaitChan():
|
|
// see the start of the for loop for nil check
|
|
next = next.Next()
|
|
case <-peer.Quit():
|
|
return
|
|
case <-memR.Quit():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// txs iterates over the transaction list and builds a batch of txs. next is
|
|
// included.
|
|
// WARNING: mutates next!
|
|
func (memR *Reactor) txs(next *clist.CElement, peerID uint16, peerHeight int64) [][]byte {
|
|
batch := make([][]byte, 0)
|
|
|
|
for {
|
|
memTx := next.Value.(*mempoolTx)
|
|
|
|
if _, ok := memTx.senders.Load(peerID); !ok {
|
|
// If current batch + this tx size is greater than max => return.
|
|
batchMsg := protomem.Message{
|
|
Sum: &protomem.Message_Txs{
|
|
Txs: &protomem.Txs{Txs: append(batch, memTx.tx)},
|
|
},
|
|
}
|
|
if batchMsg.Size() > memR.config.MaxBatchBytes {
|
|
return batch
|
|
}
|
|
|
|
batch = append(batch, memTx.tx)
|
|
}
|
|
|
|
n := next.Next()
|
|
if n == nil {
|
|
return batch
|
|
}
|
|
next = n
|
|
}
|
|
}
|
|
|
|
//-----------------------------------------------------------------------------
|
|
// Messages
|
|
|
|
func (memR *Reactor) decodeMsg(bz []byte) (TxsMessage, error) {
|
|
msg := protomem.Message{}
|
|
err := msg.Unmarshal(bz)
|
|
if err != nil {
|
|
return TxsMessage{}, err
|
|
}
|
|
|
|
var message TxsMessage
|
|
|
|
if i, ok := msg.Sum.(*protomem.Message_Txs); ok {
|
|
txs := i.Txs.GetTxs()
|
|
|
|
if len(txs) == 0 {
|
|
return message, errors.New("empty TxsMessage")
|
|
}
|
|
|
|
decoded := make([]types.Tx, len(txs))
|
|
for j, tx := range txs {
|
|
decoded[j] = types.Tx(tx)
|
|
}
|
|
|
|
message = TxsMessage{
|
|
Txs: decoded,
|
|
}
|
|
return message, nil
|
|
}
|
|
return message, fmt.Errorf("msg type: %T is not supported", msg)
|
|
}
|
|
|
|
//-------------------------------------
|
|
|
|
// TxsMessage is a Message containing transactions.
|
|
type TxsMessage struct {
|
|
Txs []types.Tx
|
|
}
|
|
|
|
// String returns a string representation of the TxsMessage.
|
|
func (m *TxsMessage) String() string {
|
|
return fmt.Sprintf("[TxsMessage %v]", m.Txs)
|
|
}
|