mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-10 23:10:59 +00:00
Fixed consensus mempool test, fuzz testdata, del backup
This commit is contained in:
@@ -394,7 +394,7 @@ func newStateWithConfigAndBlockStore(
|
||||
mtx := new(tmsync.Mutex)
|
||||
|
||||
proxyAppConnCon := abcicli.NewLocalClient(mtx, app)
|
||||
|
||||
proxyAppConnConMem := abcicli.NewLocalClient(mtx, app)
|
||||
// Make Mempool
|
||||
memplMetrics := mempl.NopMetrics()
|
||||
|
||||
@@ -404,7 +404,7 @@ func newStateWithConfigAndBlockStore(
|
||||
switch config.Mempool.Version {
|
||||
case cfg.MempoolV0:
|
||||
mempool = mempoolv0.NewCListMempool(config.Mempool,
|
||||
proxyAppConnCon,
|
||||
proxyAppConnConMem,
|
||||
state.LastBlockHeight,
|
||||
mempoolv0.WithMetrics(memplMetrics),
|
||||
mempoolv0.WithPreCheck(sm.TxPreCheck(state)),
|
||||
@@ -413,7 +413,7 @@ func newStateWithConfigAndBlockStore(
|
||||
logger := consensusLogger()
|
||||
mempool = mempoolv1.NewTxMempool(logger,
|
||||
config.Mempool,
|
||||
proxyAppConnCon,
|
||||
proxyAppConnConMem,
|
||||
state.LastBlockHeight,
|
||||
mempoolv1.WithMetrics(memplMetrics),
|
||||
mempoolv1.WithPreCheck(sm.TxPreCheck(state)),
|
||||
|
||||
@@ -1,41 +0,0 @@
|
||||
package mempool
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func BenchmarkCacheInsertTime(b *testing.B) {
|
||||
cache := NewLRUTxCache(b.N)
|
||||
|
||||
txs := make([][]byte, b.N)
|
||||
for i := 0; i < b.N; i++ {
|
||||
txs[i] = make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(txs[i], uint64(i))
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
cache.Push(txs[i])
|
||||
}
|
||||
}
|
||||
|
||||
// This benchmark is probably skewed, since we actually will be removing
|
||||
// txs in parallel, which may cause some overhead due to mutex locking.
|
||||
func BenchmarkCacheRemoveTime(b *testing.B) {
|
||||
cache := NewLRUTxCache(b.N)
|
||||
|
||||
txs := make([][]byte, b.N)
|
||||
for i := 0; i < b.N; i++ {
|
||||
txs[i] = make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(txs[i], uint64(i))
|
||||
cache.Push(txs[i])
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
cache.Remove(txs[i])
|
||||
}
|
||||
}
|
||||
@@ -1,107 +0,0 @@
|
||||
package mempool
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
|
||||
tmsync "github.com/tendermint/tendermint/libs/sync"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// TxCache defines an interface for raw transaction caching in a mempool.
|
||||
// Currently, a TxCache does not allow direct reading or getting of transaction
|
||||
// values. A TxCache is used primarily to push transactions and removing
|
||||
// transactions. Pushing via Push returns a boolean telling the caller if the
|
||||
// transaction already exists in the cache or not.
|
||||
type TxCache interface {
|
||||
// Reset resets the cache to an empty state.
|
||||
Reset()
|
||||
|
||||
// Push adds the given raw transaction to the cache and returns true if it was
|
||||
// newly added. Otherwise, it returns false.
|
||||
Push(tx types.Tx) bool
|
||||
|
||||
// Remove removes the given raw transaction from the cache.
|
||||
Remove(tx types.Tx)
|
||||
}
|
||||
|
||||
var _ TxCache = (*LRUTxCache)(nil)
|
||||
|
||||
// LRUTxCache maintains a thread-safe LRU cache of raw transactions. The cache
|
||||
// only stores the hash of the raw transaction.
|
||||
type LRUTxCache struct {
|
||||
mtx tmsync.Mutex
|
||||
size int
|
||||
cacheMap map[types.TxKey]*list.Element
|
||||
list *list.List
|
||||
}
|
||||
|
||||
func NewLRUTxCache(cacheSize int) *LRUTxCache {
|
||||
return &LRUTxCache{
|
||||
size: cacheSize,
|
||||
cacheMap: make(map[types.TxKey]*list.Element, cacheSize),
|
||||
list: list.New(),
|
||||
}
|
||||
}
|
||||
|
||||
// GetList returns the underlying linked-list that backs the LRU cache. Note,
|
||||
// this should be used for testing purposes only!
|
||||
func (c *LRUTxCache) GetList() *list.List {
|
||||
return c.list
|
||||
}
|
||||
|
||||
func (c *LRUTxCache) Reset() {
|
||||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
|
||||
c.cacheMap = make(map[types.TxKey]*list.Element, c.size)
|
||||
c.list.Init()
|
||||
}
|
||||
|
||||
func (c *LRUTxCache) Push(tx types.Tx) bool {
|
||||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
|
||||
key := tx.Key()
|
||||
|
||||
moved, ok := c.cacheMap[key]
|
||||
if ok {
|
||||
c.list.MoveToBack(moved)
|
||||
return false
|
||||
}
|
||||
|
||||
if c.list.Len() >= c.size {
|
||||
front := c.list.Front()
|
||||
if front != nil {
|
||||
frontKey := front.Value.(types.TxKey)
|
||||
delete(c.cacheMap, frontKey)
|
||||
c.list.Remove(front)
|
||||
}
|
||||
}
|
||||
|
||||
e := c.list.PushBack(key)
|
||||
c.cacheMap[key] = e
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *LRUTxCache) Remove(tx types.Tx) {
|
||||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
|
||||
key := tx.Key()
|
||||
e := c.cacheMap[key]
|
||||
delete(c.cacheMap, key)
|
||||
|
||||
if e != nil {
|
||||
c.list.Remove(e)
|
||||
}
|
||||
}
|
||||
|
||||
// NopTxCache defines a no-op raw transaction cache.
|
||||
type NopTxCache struct{}
|
||||
|
||||
var _ TxCache = (*NopTxCache)(nil)
|
||||
|
||||
func (NopTxCache) Reset() {}
|
||||
func (NopTxCache) Push(types.Tx) bool { return true }
|
||||
func (NopTxCache) Remove(types.Tx) {}
|
||||
@@ -1,35 +0,0 @@
|
||||
package mempool
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestCacheRemove(t *testing.T) {
|
||||
cache := NewLRUTxCache(100)
|
||||
numTxs := 10
|
||||
|
||||
txs := make([][]byte, numTxs)
|
||||
for i := 0; i < numTxs; i++ {
|
||||
// probability of collision is 2**-256
|
||||
txBytes := make([]byte, 32)
|
||||
_, err := rand.Read(txBytes)
|
||||
require.NoError(t, err)
|
||||
|
||||
txs[i] = txBytes
|
||||
cache.Push(txBytes)
|
||||
|
||||
// make sure its added to both the linked list and the map
|
||||
require.Equal(t, i+1, len(cache.cacheMap))
|
||||
require.Equal(t, i+1, cache.list.Len())
|
||||
}
|
||||
|
||||
for i := 0; i < numTxs; i++ {
|
||||
cache.Remove(txs[i])
|
||||
// make sure its removed from both the map and the linked list
|
||||
require.Equal(t, numTxs-(i+1), len(cache.cacheMap))
|
||||
require.Equal(t, numTxs-(i+1), cache.list.Len())
|
||||
}
|
||||
}
|
||||
@@ -1,872 +0,0 @@
|
||||
package mempool
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/libs/clist"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
tmmath "github.com/tendermint/tendermint/libs/math"
|
||||
tmsync "github.com/tendermint/tendermint/libs/sync"
|
||||
"github.com/tendermint/tendermint/proxy"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
var (
|
||||
_ Mempool = (*TxMempool)(nil)
|
||||
|
||||
newline = []byte("\n")
|
||||
)
|
||||
|
||||
// TxMempoolOption sets an optional parameter on the TxMempool.
|
||||
type TxMempoolOption func(*TxMempool)
|
||||
|
||||
// TxMempool defines a prioritized mempool data structure used by the v1 mempool
|
||||
// reactor. It keeps a thread-safe priority queue of transactions that is used
|
||||
// when a block proposer constructs a block and a thread-safe linked-list that
|
||||
// is used to gossip transactions to peers in a FIFO manner.
|
||||
type TxMempool struct {
|
||||
logger log.Logger
|
||||
metrics *Metrics
|
||||
config *cfg.MempoolConfig
|
||||
proxyAppConn proxy.AppConnMempool
|
||||
|
||||
// txsAvailable fires once for each height when the mempool is not empty
|
||||
txsAvailable chan struct{}
|
||||
notifiedTxsAvailable bool
|
||||
|
||||
// height defines the last block height process during Update()
|
||||
height int64
|
||||
|
||||
// sizeBytes defines the total size of the mempool (sum of all tx bytes)
|
||||
sizeBytes int64
|
||||
|
||||
// cache defines a fixed-size cache of already seen transactions as this
|
||||
// reduces pressure on the proxyApp.
|
||||
cache TxCache
|
||||
|
||||
// txStore defines the main storage of valid transactions. Indexes are built
|
||||
// on top of this store.
|
||||
txStore *TxStore
|
||||
|
||||
// gossipIndex defines the gossiping index of valid transactions via a
|
||||
// thread-safe linked-list. We also use the gossip index as a cursor for
|
||||
// rechecking transactions already in the mempool.
|
||||
gossipIndex *clist.CList
|
||||
|
||||
// recheckCursor and recheckEnd are used as cursors based on the gossip index
|
||||
// to recheck transactions that are already in the mempool. Iteration is not
|
||||
// thread-safe and transaction may be mutated in serial order.
|
||||
//
|
||||
// XXX/TODO: It might be somewhat of a codesmell to use the gossip index for
|
||||
// iterator and cursor management when rechecking transactions. If the gossip
|
||||
// index changes or is removed in a future refactor, this will have to be
|
||||
// refactored. Instead, we should consider just keeping a slice of a snapshot
|
||||
// of the mempool's current transactions during Update and an integer cursor
|
||||
// into that slice. This, however, requires additional O(n) space complexity.
|
||||
recheckCursor *clist.CElement // next expected response
|
||||
recheckEnd *clist.CElement // re-checking stops here
|
||||
|
||||
// priorityIndex defines the priority index of valid transactions via a
|
||||
// thread-safe priority queue.
|
||||
priorityIndex *TxPriorityQueue
|
||||
|
||||
// heightIndex defines a height-based, in ascending order, transaction index.
|
||||
// i.e. older transactions are first.
|
||||
heightIndex *WrappedTxList
|
||||
|
||||
// timestampIndex defines a timestamp-based, in ascending order, transaction
|
||||
// index. i.e. older transactions are first.
|
||||
timestampIndex *WrappedTxList
|
||||
|
||||
// A read/write lock is used to safe guard updates, insertions and deletions
|
||||
// from the mempool. A read-lock is implicitly acquired when executing CheckTx,
|
||||
// however, a caller must explicitly grab a write-lock via Lock when updating
|
||||
// the mempool via Update().
|
||||
mtx tmsync.RWMutex
|
||||
preCheck PreCheckFunc
|
||||
postCheck PostCheckFunc
|
||||
}
|
||||
|
||||
func NewTxMempool(
|
||||
logger log.Logger,
|
||||
config *cfg.MempoolConfig,
|
||||
proxyAppConn proxy.AppConnMempool,
|
||||
height int64,
|
||||
options ...TxMempoolOption,
|
||||
) *TxMempool {
|
||||
|
||||
txmp := &TxMempool{
|
||||
logger: logger,
|
||||
config: config,
|
||||
proxyAppConn: proxyAppConn,
|
||||
height: height,
|
||||
cache: NopTxCache{},
|
||||
metrics: NopMetrics(),
|
||||
txStore: NewTxStore(),
|
||||
gossipIndex: clist.New(),
|
||||
priorityIndex: NewTxPriorityQueue(),
|
||||
heightIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool {
|
||||
return wtx1.height >= wtx2.height
|
||||
}),
|
||||
timestampIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool {
|
||||
return wtx1.timestamp.After(wtx2.timestamp) || wtx1.timestamp.Equal(wtx2.timestamp)
|
||||
}),
|
||||
}
|
||||
|
||||
if config.CacheSize > 0 {
|
||||
txmp.cache = NewLRUTxCache(config.CacheSize)
|
||||
}
|
||||
|
||||
proxyAppConn.SetResponseCallback(txmp.defaultTxCallback)
|
||||
|
||||
for _, opt := range options {
|
||||
opt(txmp)
|
||||
}
|
||||
|
||||
return txmp
|
||||
}
|
||||
|
||||
// WithPreCheck sets a filter for the mempool to reject a transaction if f(tx)
|
||||
// returns an error. This is executed before CheckTx. It only applies to the
|
||||
// first created block. After that, Update() overwrites the existing value.
|
||||
func WithPreCheck(f PreCheckFunc) TxMempoolOption {
|
||||
return func(txmp *TxMempool) { txmp.preCheck = f }
|
||||
}
|
||||
|
||||
// WithPostCheck sets a filter for the mempool to reject a transaction if
|
||||
// f(tx, resp) returns an error. This is executed after CheckTx. It only applies
|
||||
// to the first created block. After that, Update overwrites the existing value.
|
||||
func WithPostCheck(f PostCheckFunc) TxMempoolOption {
|
||||
return func(txmp *TxMempool) { txmp.postCheck = f }
|
||||
}
|
||||
|
||||
// WithMetrics sets the mempool's metrics collector.
|
||||
func WithMetrics(metrics *Metrics) TxMempoolOption {
|
||||
return func(txmp *TxMempool) { txmp.metrics = metrics }
|
||||
}
|
||||
|
||||
func (txmp *TxMempool) SetLogger(l log.Logger) {
|
||||
txmp.logger = l
|
||||
}
|
||||
|
||||
// Lock obtains a write-lock on the mempool. A caller must be sure to explicitly
|
||||
// release the lock when finished.
|
||||
func (txmp *TxMempool) Lock() {
|
||||
txmp.mtx.Lock()
|
||||
}
|
||||
|
||||
// Unlock releases a write-lock on the mempool.
|
||||
func (txmp *TxMempool) Unlock() {
|
||||
txmp.mtx.Unlock()
|
||||
}
|
||||
|
||||
// Size returns the number of valid transactions in the mempool. It is
|
||||
// thread-safe.
|
||||
func (txmp *TxMempool) Size() int {
|
||||
return txmp.txStore.Size()
|
||||
}
|
||||
|
||||
// SizeBytes return the total sum in bytes of all the valid transactions in the
|
||||
// mempool. It is thread-safe.
|
||||
func (txmp *TxMempool) SizeBytes() int64 {
|
||||
return atomic.LoadInt64(&txmp.sizeBytes)
|
||||
}
|
||||
|
||||
// FlushAppConn executes FlushSync on the mempool's proxyAppConn.
|
||||
//
|
||||
// NOTE: The caller must obtain a write-lock via Lock() prior to execution.
|
||||
func (txmp *TxMempool) FlushAppConn() error {
|
||||
return txmp.proxyAppConn.FlushSync()
|
||||
}
|
||||
|
||||
// WaitForNextTx returns a blocking channel that will be closed when the next
|
||||
// valid transaction is available to gossip. It is thread-safe.
|
||||
func (txmp *TxMempool) WaitForNextTx() <-chan struct{} {
|
||||
return txmp.gossipIndex.WaitChan()
|
||||
}
|
||||
|
||||
// NextGossipTx returns the next valid transaction to gossip. A caller must wait
|
||||
// for WaitForNextTx to signal a transaction is available to gossip first. It is
|
||||
// thread-safe.
|
||||
func (txmp *TxMempool) NextGossipTx() *clist.CElement {
|
||||
return txmp.gossipIndex.Front()
|
||||
}
|
||||
|
||||
// EnableTxsAvailable enables the mempool to trigger events when transactions
|
||||
// are available on a block by block basis.
|
||||
func (txmp *TxMempool) EnableTxsAvailable() {
|
||||
txmp.mtx.Lock()
|
||||
defer txmp.mtx.Unlock()
|
||||
|
||||
txmp.txsAvailable = make(chan struct{}, 1)
|
||||
}
|
||||
|
||||
// TxsAvailable returns a channel which fires once for every height, and only
|
||||
// when transactions are available in the mempool. It is thread-safe.
|
||||
func (txmp *TxMempool) TxsAvailable() <-chan struct{} {
|
||||
return txmp.txsAvailable
|
||||
}
|
||||
|
||||
// CheckTx executes the ABCI CheckTx method for a given transaction. It acquires
|
||||
// a read-lock attempts to execute the application's CheckTx ABCI method via
|
||||
// CheckTxAsync. We return an error if any of the following happen:
|
||||
//
|
||||
// - The CheckTxAsync execution fails.
|
||||
// - The transaction already exists in the cache and we've already received the
|
||||
// transaction from the peer. Otherwise, if it solely exists in the cache, we
|
||||
// return nil.
|
||||
// - The transaction size exceeds the maximum transaction size as defined by the
|
||||
// configuration provided to the mempool.
|
||||
// - The transaction fails Pre-Check (if it is defined).
|
||||
// - The proxyAppConn fails, e.g. the buffer is full.
|
||||
//
|
||||
// If the mempool is full, we still execute CheckTx and attempt to find a lower
|
||||
// priority transaction to evict. If such a transaction exists, we remove the
|
||||
// lower priority transaction and add the new one with higher priority.
|
||||
//
|
||||
// NOTE:
|
||||
// - The applications' CheckTx implementation may panic.
|
||||
// - The caller is not to explicitly require any locks for executing CheckTx.
|
||||
func (txmp *TxMempool) CheckTx(
|
||||
tx types.Tx,
|
||||
cb func(*abci.Response),
|
||||
txInfo TxInfo,
|
||||
) error {
|
||||
|
||||
txmp.mtx.RLock()
|
||||
defer txmp.mtx.RUnlock()
|
||||
|
||||
txSize := len(tx)
|
||||
if txSize > txmp.config.MaxTxBytes {
|
||||
return ErrTxTooLarge{
|
||||
max: txmp.config.MaxTxBytes,
|
||||
actual: txSize,
|
||||
}
|
||||
}
|
||||
|
||||
if txmp.preCheck != nil {
|
||||
if err := txmp.preCheck(tx); err != nil {
|
||||
return ErrPreCheck{
|
||||
Reason: err,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := txmp.proxyAppConn.Error(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
txHash := tx.Key()
|
||||
|
||||
// We add the transaction to the mempool's cache and if the
|
||||
// transaction is already present in the cache, i.e. false is returned, then we
|
||||
// check if we've seen this transaction and error if we have.
|
||||
if !txmp.cache.Push(tx) {
|
||||
txmp.txStore.GetOrSetPeerByTxHash(txHash, txInfo.SenderID)
|
||||
return ErrTxInCache
|
||||
}
|
||||
|
||||
reqRes := txmp.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx})
|
||||
|
||||
reqRes.SetCallback(func(res *abci.Response) {
|
||||
if txmp.recheckCursor != nil {
|
||||
panic("recheck cursor is non-nil in CheckTx callback")
|
||||
}
|
||||
|
||||
wtx := &WrappedTx{
|
||||
tx: tx,
|
||||
hash: txHash,
|
||||
timestamp: time.Now().UTC(),
|
||||
height: txmp.height,
|
||||
}
|
||||
txmp.initTxCallback(wtx, res, txInfo)
|
||||
|
||||
if cb != nil {
|
||||
cb(res)
|
||||
}
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (txmp *TxMempool) RemoveTxByKey(txKey types.TxKey) error {
|
||||
txmp.Lock()
|
||||
defer txmp.Unlock()
|
||||
|
||||
// remove the committed transaction from the transaction store and indexes
|
||||
if wtx := txmp.txStore.GetTxByHash(txKey); wtx != nil {
|
||||
txmp.removeTx(wtx, false)
|
||||
return nil
|
||||
}
|
||||
|
||||
return errors.New("transaction not found")
|
||||
}
|
||||
|
||||
// Flush flushes out the mempool. It acquires a read-lock, fetches all the
|
||||
// transactions currently in the transaction store and removes each transaction
|
||||
// from the store and all indexes and finally resets the cache.
|
||||
//
|
||||
// NOTE:
|
||||
// - Flushing the mempool may leave the mempool in an inconsistent state.
|
||||
func (txmp *TxMempool) Flush() {
|
||||
txmp.mtx.RLock()
|
||||
defer txmp.mtx.RUnlock()
|
||||
|
||||
txmp.heightIndex.Reset()
|
||||
txmp.timestampIndex.Reset()
|
||||
|
||||
for _, wtx := range txmp.txStore.GetAllTxs() {
|
||||
txmp.removeTx(wtx, false)
|
||||
}
|
||||
|
||||
atomic.SwapInt64(&txmp.sizeBytes, 0)
|
||||
txmp.cache.Reset()
|
||||
}
|
||||
|
||||
// ReapMaxBytesMaxGas returns a list of transactions within the provided size
|
||||
// and gas constraints. Transaction are retrieved in priority order.
|
||||
//
|
||||
// NOTE:
|
||||
// - A read-lock is acquired.
|
||||
// - Transactions returned are not actually removed from the mempool transaction
|
||||
// store or indexes.
|
||||
func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
|
||||
txmp.mtx.RLock()
|
||||
defer txmp.mtx.RUnlock()
|
||||
|
||||
var (
|
||||
totalGas int64
|
||||
totalSize int64
|
||||
)
|
||||
|
||||
// wTxs contains a list of *WrappedTx retrieved from the priority queue that
|
||||
// need to be re-enqueued prior to returning.
|
||||
wTxs := make([]*WrappedTx, 0, txmp.priorityIndex.NumTxs())
|
||||
defer func() {
|
||||
for _, wtx := range wTxs {
|
||||
txmp.priorityIndex.PushTx(wtx)
|
||||
}
|
||||
}()
|
||||
|
||||
txs := make([]types.Tx, 0, txmp.priorityIndex.NumTxs())
|
||||
for txmp.priorityIndex.NumTxs() > 0 {
|
||||
wtx := txmp.priorityIndex.PopTx()
|
||||
txs = append(txs, wtx.tx)
|
||||
wTxs = append(wTxs, wtx)
|
||||
size := types.ComputeProtoSizeForTxs([]types.Tx{wtx.tx})
|
||||
|
||||
// Ensure we have capacity for the transaction with respect to the
|
||||
// transaction size.
|
||||
if maxBytes > -1 && totalSize+size > maxBytes {
|
||||
return txs[:len(txs)-1]
|
||||
}
|
||||
|
||||
totalSize += size
|
||||
|
||||
// ensure we have capacity for the transaction with respect to total gas
|
||||
gas := totalGas + wtx.gasWanted
|
||||
if maxGas > -1 && gas > maxGas {
|
||||
return txs[:len(txs)-1]
|
||||
}
|
||||
|
||||
totalGas = gas
|
||||
}
|
||||
|
||||
return txs
|
||||
}
|
||||
|
||||
// ReapMaxTxs returns a list of transactions within the provided number of
|
||||
// transactions bound. Transaction are retrieved in priority order.
|
||||
//
|
||||
// NOTE:
|
||||
// - A read-lock is acquired.
|
||||
// - Transactions returned are not actually removed from the mempool transaction
|
||||
// store or indexes.
|
||||
func (txmp *TxMempool) ReapMaxTxs(max int) types.Txs {
|
||||
txmp.mtx.RLock()
|
||||
defer txmp.mtx.RUnlock()
|
||||
|
||||
numTxs := txmp.priorityIndex.NumTxs()
|
||||
if max < 0 {
|
||||
max = numTxs
|
||||
}
|
||||
|
||||
cap := tmmath.MinInt(numTxs, max)
|
||||
|
||||
// wTxs contains a list of *WrappedTx retrieved from the priority queue that
|
||||
// need to be re-enqueued prior to returning.
|
||||
wTxs := make([]*WrappedTx, 0, cap)
|
||||
defer func() {
|
||||
for _, wtx := range wTxs {
|
||||
txmp.priorityIndex.PushTx(wtx)
|
||||
}
|
||||
}()
|
||||
|
||||
txs := make([]types.Tx, 0, cap)
|
||||
for txmp.priorityIndex.NumTxs() > 0 && len(txs) < max {
|
||||
wtx := txmp.priorityIndex.PopTx()
|
||||
txs = append(txs, wtx.tx)
|
||||
wTxs = append(wTxs, wtx)
|
||||
}
|
||||
|
||||
return txs
|
||||
}
|
||||
|
||||
// Update iterates over all the transactions provided by the caller, i.e. the
|
||||
// block producer, and removes them from the cache (if applicable) and removes
|
||||
// the transactions from the main transaction store and associated indexes.
|
||||
// Finally, if there are trainsactions remaining in the mempool, we initiate a
|
||||
// re-CheckTx for them (if applicable), otherwise, we notify the caller more
|
||||
// transactions are available.
|
||||
//
|
||||
// NOTE:
|
||||
// - The caller must explicitly acquire a write-lock via Lock().
|
||||
func (txmp *TxMempool) Update(
|
||||
blockHeight int64,
|
||||
blockTxs types.Txs,
|
||||
deliverTxResponses []*abci.ResponseDeliverTx,
|
||||
newPreFn PreCheckFunc,
|
||||
newPostFn PostCheckFunc,
|
||||
) error {
|
||||
|
||||
txmp.height = blockHeight
|
||||
txmp.notifiedTxsAvailable = false
|
||||
|
||||
if newPreFn != nil {
|
||||
txmp.preCheck = newPreFn
|
||||
}
|
||||
if newPostFn != nil {
|
||||
txmp.postCheck = newPostFn
|
||||
}
|
||||
|
||||
for i, tx := range blockTxs {
|
||||
if deliverTxResponses[i].Code == abci.CodeTypeOK {
|
||||
// add the valid committed transaction to the cache (if missing)
|
||||
_ = txmp.cache.Push(tx)
|
||||
} else if !txmp.config.KeepInvalidTxsInCache {
|
||||
// allow invalid transactions to be re-submitted
|
||||
txmp.cache.Remove(tx)
|
||||
}
|
||||
|
||||
// remove the committed transaction from the transaction store and indexes
|
||||
if wtx := txmp.txStore.GetTxByHash(tx.Key()); wtx != nil {
|
||||
txmp.removeTx(wtx, false)
|
||||
}
|
||||
}
|
||||
|
||||
txmp.purgeExpiredTxs(blockHeight)
|
||||
|
||||
// If there any uncommitted transactions left in the mempool, we either
|
||||
// initiate re-CheckTx per remaining transaction or notify that remaining
|
||||
// transactions are left.
|
||||
if txmp.Size() > 0 {
|
||||
if txmp.config.Recheck {
|
||||
txmp.logger.Debug(
|
||||
"executing re-CheckTx for all remaining transactions",
|
||||
"num_txs", txmp.Size(),
|
||||
"height", blockHeight,
|
||||
)
|
||||
txmp.updateReCheckTxs()
|
||||
} else {
|
||||
txmp.notifyTxsAvailable()
|
||||
}
|
||||
}
|
||||
|
||||
txmp.metrics.Size.Set(float64(txmp.Size()))
|
||||
return nil
|
||||
}
|
||||
|
||||
// initTxCallback performs the initial, i.e. the first, callback after CheckTx
|
||||
// has been executed by the ABCI application. In other words, initTxCallback is
|
||||
// called after executing CheckTx when we see a unique transaction for the first
|
||||
// time. CheckTx can be called again for the same transaction at a later point
|
||||
// in time when re-checking, however, this callback will not be called.
|
||||
//
|
||||
// After the ABCI application executes CheckTx, initTxCallback is called with
|
||||
// the ABCI *Response object and TxInfo. If postCheck is defined on the mempool,
|
||||
// we execute that first. If there is no error from postCheck (if defined) and
|
||||
// the ABCI CheckTx response code is OK, we attempt to insert the transaction.
|
||||
//
|
||||
// When attempting to insert the transaction, we first check if there is
|
||||
// sufficient capacity. If there is sufficient capacity, the transaction is
|
||||
// inserted into the txStore and indexed across all indexes. Otherwise, if the
|
||||
// mempool is full, we attempt to find a lower priority transaction to evict in
|
||||
// place of the new incoming transaction. If no such transaction exists, the
|
||||
// new incoming transaction is rejected.
|
||||
//
|
||||
// If the new incoming transaction fails CheckTx or postCheck fails, we reject
|
||||
// the new incoming transaction.
|
||||
//
|
||||
// NOTE:
|
||||
// - An explicit lock is NOT required.
|
||||
func (txmp *TxMempool) initTxCallback(wtx *WrappedTx, res *abci.Response, txInfo TxInfo) {
|
||||
checkTxRes, ok := res.Value.(*abci.Response_CheckTx)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
var err error
|
||||
if txmp.postCheck != nil {
|
||||
err = txmp.postCheck(wtx.tx, checkTxRes.CheckTx)
|
||||
}
|
||||
|
||||
if err != nil || checkTxRes.CheckTx.Code != abci.CodeTypeOK {
|
||||
// ignore bad transactions
|
||||
txmp.logger.Info(
|
||||
"rejected bad transaction",
|
||||
"priority", wtx.priority,
|
||||
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
|
||||
"peer_id", txInfo.SenderP2PID,
|
||||
"code", checkTxRes.CheckTx.Code,
|
||||
"post_check_err", err,
|
||||
)
|
||||
|
||||
txmp.metrics.FailedTxs.Add(1)
|
||||
|
||||
if !txmp.config.KeepInvalidTxsInCache {
|
||||
txmp.cache.Remove(wtx.tx)
|
||||
}
|
||||
if err != nil {
|
||||
checkTxRes.CheckTx.MempoolError = err.Error()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
sender := checkTxRes.CheckTx.Sender
|
||||
priority := checkTxRes.CheckTx.Priority
|
||||
|
||||
if len(sender) > 0 {
|
||||
if wtx := txmp.txStore.GetTxBySender(sender); wtx != nil {
|
||||
txmp.logger.Error(
|
||||
"rejected incoming good transaction; tx already exists for sender",
|
||||
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
|
||||
"sender", sender,
|
||||
)
|
||||
txmp.metrics.RejectedTxs.Add(1)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if err := txmp.canAddTx(wtx); err != nil {
|
||||
evictTxs := txmp.priorityIndex.GetEvictableTxs(
|
||||
priority,
|
||||
int64(wtx.Size()),
|
||||
txmp.SizeBytes(),
|
||||
txmp.config.MaxTxsBytes,
|
||||
)
|
||||
if len(evictTxs) == 0 {
|
||||
// No room for the new incoming transaction so we just remove it from
|
||||
// the cache.
|
||||
txmp.cache.Remove(wtx.tx)
|
||||
txmp.logger.Error(
|
||||
"rejected incoming good transaction; mempool full",
|
||||
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
|
||||
"err", err.Error(),
|
||||
)
|
||||
txmp.metrics.RejectedTxs.Add(1)
|
||||
return
|
||||
}
|
||||
|
||||
// evict an existing transaction(s)
|
||||
//
|
||||
// NOTE:
|
||||
// - The transaction, toEvict, can be removed while a concurrent
|
||||
// reCheckTx callback is being executed for the same transaction.
|
||||
for _, toEvict := range evictTxs {
|
||||
txmp.removeTx(toEvict, true)
|
||||
txmp.logger.Debug(
|
||||
"evicted existing good transaction; mempool full",
|
||||
"old_tx", fmt.Sprintf("%X", toEvict.tx.Hash()),
|
||||
"old_priority", toEvict.priority,
|
||||
"new_tx", fmt.Sprintf("%X", wtx.tx.Hash()),
|
||||
"new_priority", wtx.priority,
|
||||
)
|
||||
txmp.metrics.EvictedTxs.Add(1)
|
||||
}
|
||||
}
|
||||
|
||||
wtx.gasWanted = checkTxRes.CheckTx.GasWanted
|
||||
wtx.priority = priority
|
||||
wtx.sender = sender
|
||||
wtx.peers = map[uint16]struct{}{
|
||||
txInfo.SenderID: {},
|
||||
}
|
||||
|
||||
txmp.metrics.TxSizeBytes.Observe(float64(wtx.Size()))
|
||||
txmp.metrics.Size.Set(float64(txmp.Size()))
|
||||
|
||||
txmp.insertTx(wtx)
|
||||
txmp.logger.Debug(
|
||||
"inserted good transaction",
|
||||
"priority", wtx.priority,
|
||||
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
|
||||
"height", txmp.height,
|
||||
"num_txs", txmp.Size(),
|
||||
)
|
||||
txmp.notifyTxsAvailable()
|
||||
|
||||
}
|
||||
|
||||
// defaultTxCallback performs the default CheckTx application callback. This is
|
||||
// NOT executed when a transaction is first seen/received. Instead, this callback
|
||||
// is executed during re-checking transactions (if enabled). A caller, i.e a
|
||||
// block proposer, acquires a mempool write-lock via Lock() and when executing
|
||||
// Update(), if the mempool is non-empty and Recheck is enabled, then all
|
||||
// remaining transactions will be rechecked via CheckTxAsync. The order in which
|
||||
// they are rechecked must be the same order in which this callback is called
|
||||
// per transaction.
|
||||
func (txmp *TxMempool) defaultTxCallback(req *abci.Request, res *abci.Response) {
|
||||
if txmp.recheckCursor == nil {
|
||||
return
|
||||
}
|
||||
|
||||
txmp.metrics.RecheckTimes.Add(1)
|
||||
|
||||
checkTxRes, ok := res.Value.(*abci.Response_CheckTx)
|
||||
if !ok {
|
||||
txmp.logger.Error(
|
||||
"received incorrect type in mempool callback",
|
||||
"expected", reflect.TypeOf(&abci.Response_CheckTx{}).Name(),
|
||||
"got", reflect.TypeOf(res.Value).Name(),
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
tx := req.GetCheckTx().Tx
|
||||
wtx := txmp.recheckCursor.Value.(*WrappedTx)
|
||||
|
||||
// Search through the remaining list of tx to recheck for a transaction that matches
|
||||
// the one we received from the ABCI application.
|
||||
for {
|
||||
if bytes.Equal(tx, wtx.tx) {
|
||||
// We've found a tx in the recheck list that matches the tx that we
|
||||
// received from the ABCI application.
|
||||
// Break, and use this transaction for further checks.
|
||||
break
|
||||
}
|
||||
|
||||
txmp.logger.Error(
|
||||
"re-CheckTx transaction mismatch",
|
||||
"got", wtx.tx.Hash(),
|
||||
"expected", types.Tx(tx).Key(),
|
||||
)
|
||||
|
||||
if txmp.recheckCursor == txmp.recheckEnd {
|
||||
// we reached the end of the recheckTx list without finding a tx
|
||||
// matching the one we received from the ABCI application.
|
||||
// Return without processing any tx.
|
||||
txmp.recheckCursor = nil
|
||||
return
|
||||
}
|
||||
|
||||
txmp.recheckCursor = txmp.recheckCursor.Next()
|
||||
wtx = txmp.recheckCursor.Value.(*WrappedTx)
|
||||
}
|
||||
|
||||
// Only evaluate transactions that have not been removed. This can happen
|
||||
// if an existing transaction is evicted during CheckTx and while this
|
||||
// callback is being executed for the same evicted transaction.
|
||||
if !txmp.txStore.IsTxRemoved(wtx.hash) {
|
||||
var err error
|
||||
if txmp.postCheck != nil {
|
||||
err = txmp.postCheck(tx, checkTxRes.CheckTx)
|
||||
}
|
||||
|
||||
if checkTxRes.CheckTx.Code == abci.CodeTypeOK && err == nil {
|
||||
wtx.priority = checkTxRes.CheckTx.Priority
|
||||
} else {
|
||||
txmp.logger.Debug(
|
||||
"existing transaction no longer valid; failed re-CheckTx callback",
|
||||
"priority", wtx.priority,
|
||||
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
|
||||
"err", err,
|
||||
"code", checkTxRes.CheckTx.Code,
|
||||
)
|
||||
|
||||
if wtx.gossipEl != txmp.recheckCursor {
|
||||
panic("corrupted reCheckTx cursor")
|
||||
}
|
||||
|
||||
txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache)
|
||||
}
|
||||
}
|
||||
|
||||
// move reCheckTx cursor to next element
|
||||
if txmp.recheckCursor == txmp.recheckEnd {
|
||||
txmp.recheckCursor = nil
|
||||
} else {
|
||||
txmp.recheckCursor = txmp.recheckCursor.Next()
|
||||
}
|
||||
|
||||
if txmp.recheckCursor == nil {
|
||||
txmp.logger.Debug("finished rechecking transactions")
|
||||
|
||||
if txmp.Size() > 0 {
|
||||
txmp.notifyTxsAvailable()
|
||||
}
|
||||
}
|
||||
|
||||
txmp.metrics.Size.Set(float64(txmp.Size()))
|
||||
}
|
||||
|
||||
// updateReCheckTxs updates the recheck cursors by using the gossipIndex. For
|
||||
// each transaction, it executes CheckTxAsync. The global callback defined on
|
||||
// the proxyAppConn will be executed for each transaction after CheckTx is
|
||||
// executed.
|
||||
//
|
||||
// NOTE:
|
||||
// - The caller must have a write-lock when executing updateReCheckTxs.
|
||||
func (txmp *TxMempool) updateReCheckTxs() {
|
||||
if txmp.Size() == 0 {
|
||||
panic("attempted to update re-CheckTx txs when mempool is empty")
|
||||
}
|
||||
|
||||
txmp.recheckCursor = txmp.gossipIndex.Front()
|
||||
txmp.recheckEnd = txmp.gossipIndex.Back()
|
||||
|
||||
for e := txmp.gossipIndex.Front(); e != nil; e = e.Next() {
|
||||
wtx := e.Value.(*WrappedTx)
|
||||
|
||||
// Only execute CheckTx if the transaction is not marked as removed which
|
||||
// could happen if the transaction was evicted.
|
||||
if !txmp.txStore.IsTxRemoved(wtx.hash) {
|
||||
txmp.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{
|
||||
Tx: wtx.tx,
|
||||
Type: abci.CheckTxType_Recheck,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
txmp.proxyAppConn.FlushAsync()
|
||||
}
|
||||
|
||||
// canAddTx returns an error if we cannot insert the provided *WrappedTx into
|
||||
// the mempool due to mempool configured constraints. Otherwise, nil is returned
|
||||
// and the transaction can be inserted into the mempool.
|
||||
func (txmp *TxMempool) canAddTx(wtx *WrappedTx) error {
|
||||
var (
|
||||
numTxs = txmp.Size()
|
||||
sizeBytes = txmp.SizeBytes()
|
||||
)
|
||||
|
||||
if numTxs >= txmp.config.Size || int64(wtx.Size())+sizeBytes > txmp.config.MaxTxsBytes {
|
||||
return ErrMempoolIsFull{
|
||||
numTxs: numTxs,
|
||||
maxTxs: txmp.config.Size,
|
||||
txsBytes: sizeBytes,
|
||||
maxTxsBytes: txmp.config.MaxTxsBytes,
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (txmp *TxMempool) insertTx(wtx *WrappedTx) {
|
||||
txmp.txStore.SetTx(wtx)
|
||||
txmp.priorityIndex.PushTx(wtx)
|
||||
txmp.heightIndex.Insert(wtx)
|
||||
txmp.timestampIndex.Insert(wtx)
|
||||
|
||||
// Insert the transaction into the gossip index and mark the reference to the
|
||||
// linked-list element, which will be needed at a later point when the
|
||||
// transaction is removed.
|
||||
gossipEl := txmp.gossipIndex.PushBack(wtx)
|
||||
wtx.gossipEl = gossipEl
|
||||
|
||||
atomic.AddInt64(&txmp.sizeBytes, int64(wtx.Size()))
|
||||
}
|
||||
|
||||
func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) {
|
||||
if txmp.txStore.IsTxRemoved(wtx.hash) {
|
||||
return
|
||||
}
|
||||
|
||||
txmp.txStore.RemoveTx(wtx)
|
||||
txmp.priorityIndex.RemoveTx(wtx)
|
||||
txmp.heightIndex.Remove(wtx)
|
||||
txmp.timestampIndex.Remove(wtx)
|
||||
|
||||
// Remove the transaction from the gossip index and cleanup the linked-list
|
||||
// element so it can be garbage collected.
|
||||
txmp.gossipIndex.Remove(wtx.gossipEl)
|
||||
wtx.gossipEl.DetachPrev()
|
||||
|
||||
atomic.AddInt64(&txmp.sizeBytes, int64(-wtx.Size()))
|
||||
|
||||
if removeFromCache {
|
||||
txmp.cache.Remove(wtx.tx)
|
||||
}
|
||||
}
|
||||
|
||||
// purgeExpiredTxs removes all transactions that have exceeded their respective
|
||||
// height and/or time based TTLs from their respective indexes. Every expired
|
||||
// transaction will be removed from the mempool entirely, except for the cache.
|
||||
//
|
||||
// NOTE: purgeExpiredTxs must only be called during TxMempool#Update in which
|
||||
// the caller has a write-lock on the mempool and so we can safely iterate over
|
||||
// the height and time based indexes.
|
||||
func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) {
|
||||
now := time.Now()
|
||||
expiredTxs := make(map[types.TxKey]*WrappedTx)
|
||||
|
||||
if txmp.config.TTLNumBlocks > 0 {
|
||||
purgeIdx := -1
|
||||
for i, wtx := range txmp.heightIndex.txs {
|
||||
if (blockHeight - wtx.height) > txmp.config.TTLNumBlocks {
|
||||
expiredTxs[wtx.tx.Key()] = wtx
|
||||
purgeIdx = i
|
||||
} else {
|
||||
// since the index is sorted, we know no other txs can be be purged
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if purgeIdx >= 0 {
|
||||
txmp.heightIndex.txs = txmp.heightIndex.txs[purgeIdx+1:]
|
||||
}
|
||||
}
|
||||
|
||||
if txmp.config.TTLDuration > 0 {
|
||||
purgeIdx := -1
|
||||
for i, wtx := range txmp.timestampIndex.txs {
|
||||
if now.Sub(wtx.timestamp) > txmp.config.TTLDuration {
|
||||
expiredTxs[wtx.tx.Key()] = wtx
|
||||
purgeIdx = i
|
||||
} else {
|
||||
// since the index is sorted, we know no other txs can be be purged
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if purgeIdx >= 0 {
|
||||
txmp.timestampIndex.txs = txmp.timestampIndex.txs[purgeIdx+1:]
|
||||
}
|
||||
}
|
||||
|
||||
for _, wtx := range expiredTxs {
|
||||
txmp.removeTx(wtx, false)
|
||||
}
|
||||
}
|
||||
|
||||
func (txmp *TxMempool) notifyTxsAvailable() {
|
||||
if txmp.Size() == 0 {
|
||||
panic("attempt to notify txs available but mempool is empty!")
|
||||
}
|
||||
|
||||
if txmp.txsAvailable != nil && !txmp.notifiedTxsAvailable {
|
||||
// channel cap is 1, so this will send once
|
||||
txmp.notifiedTxsAvailable = true
|
||||
|
||||
select {
|
||||
case txmp.txsAvailable <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,23 +0,0 @@
|
||||
// The mempool pushes new txs onto the proxyAppConn.
|
||||
// It gets a stream of (req, res) tuples from the proxy.
|
||||
// The mempool stores good txs in a concurrent linked-list.
|
||||
|
||||
// Multiple concurrent go-routines can traverse this linked-list
|
||||
// safely by calling .NextWait() on each element.
|
||||
|
||||
// So we have several go-routines:
|
||||
// 1. Consensus calling Update() and ReapMaxBytesMaxGas() synchronously
|
||||
// 2. Many mempool reactor's peer routines calling CheckTx()
|
||||
// 3. Many mempool reactor's peer routines traversing the txs linked list
|
||||
|
||||
// To manage these goroutines, there are three methods of locking.
|
||||
// 1. Mutations to the linked-list is protected by an internal mtx (CList is goroutine-safe)
|
||||
// 2. Mutations to the linked-list elements are atomic
|
||||
// 3. CheckTx() and/or ReapMaxBytesMaxGas() calls can be paused upon Update(), protected by .updateMtx
|
||||
|
||||
// Garbage collection of old elements from mempool.txs is handlde via the
|
||||
// DetachPrev() call, which makes old elements not reachable by peer
|
||||
// broadcastTxRoutine().
|
||||
|
||||
// TODO: Better handle abci client errors. (make it automatically handle connection errors)
|
||||
package mempool
|
||||
@@ -1,52 +0,0 @@
|
||||
package mempool
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrTxInCache is returned to the client if we saw tx earlier
|
||||
ErrTxInCache = errors.New("tx already exists in cache")
|
||||
)
|
||||
|
||||
// ErrTxTooLarge means the tx is too big to be sent in a message to other peers
|
||||
type ErrTxTooLarge struct {
|
||||
max int
|
||||
actual int
|
||||
}
|
||||
|
||||
func (e ErrTxTooLarge) Error() string {
|
||||
return fmt.Sprintf("Tx too large. Max size is %d, but got %d", e.max, e.actual)
|
||||
}
|
||||
|
||||
// ErrMempoolIsFull means Tendermint & an application can't handle that much load
|
||||
type ErrMempoolIsFull struct {
|
||||
numTxs int
|
||||
maxTxs int
|
||||
|
||||
txsBytes int64
|
||||
maxTxsBytes int64
|
||||
}
|
||||
|
||||
func (e ErrMempoolIsFull) Error() string {
|
||||
return fmt.Sprintf(
|
||||
"mempool is full: number of txs %d (max: %d), total txs bytes %d (max: %d)",
|
||||
e.numTxs, e.maxTxs,
|
||||
e.txsBytes, e.maxTxsBytes)
|
||||
}
|
||||
|
||||
// ErrPreCheck is returned when tx is too big
|
||||
type ErrPreCheck struct {
|
||||
Reason error
|
||||
}
|
||||
|
||||
func (e ErrPreCheck) Error() string {
|
||||
return e.Reason.Error()
|
||||
}
|
||||
|
||||
// IsPreCheckError returns true if err is due to pre check failure.
|
||||
func IsPreCheckError(err error) bool {
|
||||
_, ok := err.(ErrPreCheck)
|
||||
return ok
|
||||
}
|
||||
@@ -1,127 +0,0 @@
|
||||
package mempool
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// Mempool defines the mempool interface.
|
||||
//
|
||||
// Updates to the mempool need to be synchronized with committing a block so
|
||||
// apps can reset their transient state on Commit.
|
||||
type Mempool interface {
|
||||
// CheckTx executes a new transaction against the application to determine
|
||||
// its validity and whether it should be added to the mempool.
|
||||
CheckTx(tx types.Tx, callback func(*abci.Response), txInfo TxInfo) error
|
||||
|
||||
// ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes
|
||||
// bytes total with the condition that the total gasWanted must be less than
|
||||
// maxGas.
|
||||
// If both maxes are negative, there is no cap on the size of all returned
|
||||
// transactions (~ all available transactions).
|
||||
ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs
|
||||
|
||||
// ReapMaxTxs reaps up to max transactions from the mempool.
|
||||
// If max is negative, there is no cap on the size of all returned
|
||||
// transactions (~ all available transactions).
|
||||
ReapMaxTxs(max int) types.Txs
|
||||
|
||||
// Lock locks the mempool. The consensus must be able to hold lock to safely update.
|
||||
Lock()
|
||||
|
||||
// Unlock unlocks the mempool.
|
||||
Unlock()
|
||||
|
||||
// Update informs the mempool that the given txs were committed and can be discarded.
|
||||
// NOTE: this should be called *after* block is committed by consensus.
|
||||
// NOTE: Lock/Unlock must be managed by caller
|
||||
Update(
|
||||
blockHeight int64,
|
||||
blockTxs types.Txs,
|
||||
deliverTxResponses []*abci.ResponseDeliverTx,
|
||||
newPreFn PreCheckFunc,
|
||||
newPostFn PostCheckFunc,
|
||||
) error
|
||||
|
||||
// FlushAppConn flushes the mempool connection to ensure async reqResCb calls are
|
||||
// done. E.g. from CheckTx.
|
||||
// NOTE: Lock/Unlock must be managed by caller
|
||||
FlushAppConn() error
|
||||
|
||||
// Flush removes all transactions from the mempool and cache
|
||||
Flush()
|
||||
|
||||
// TxsAvailable returns a channel which fires once for every height,
|
||||
// and only when transactions are available in the mempool.
|
||||
// NOTE: the returned channel may be nil if EnableTxsAvailable was not called.
|
||||
TxsAvailable() <-chan struct{}
|
||||
|
||||
// EnableTxsAvailable initializes the TxsAvailable channel, ensuring it will
|
||||
// trigger once every height when transactions are available.
|
||||
EnableTxsAvailable()
|
||||
|
||||
// Size returns the number of transactions in the mempool.
|
||||
Size() int
|
||||
|
||||
// SizeBytes returns the total size of all txs in the mempool.
|
||||
SizeBytes() int64
|
||||
}
|
||||
|
||||
//--------------------------------------------------------------------------------
|
||||
|
||||
// PreCheckFunc is an optional filter executed before CheckTx and rejects
|
||||
// transaction if false is returned. An example would be to ensure that a
|
||||
// transaction doesn't exceeded the block size.
|
||||
type PreCheckFunc func(types.Tx) error
|
||||
|
||||
// PostCheckFunc is an optional filter executed after CheckTx and rejects
|
||||
// transaction if false is returned. An example would be to ensure a
|
||||
// transaction doesn't require more gas than available for the block.
|
||||
type PostCheckFunc func(types.Tx, *abci.ResponseCheckTx) error
|
||||
|
||||
// TxInfo are parameters that get passed when attempting to add a tx to the
|
||||
// mempool.
|
||||
type TxInfo struct {
|
||||
// SenderID is the internal peer ID used in the mempool to identify the
|
||||
// sender, storing 2 bytes with each tx instead of 20 bytes for the p2p.ID.
|
||||
SenderID uint16
|
||||
// SenderP2PID is the actual p2p.ID of the sender, used e.g. for logging.
|
||||
SenderP2PID p2p.ID
|
||||
}
|
||||
|
||||
//--------------------------------------------------------------------------------
|
||||
|
||||
// PreCheckMaxBytes checks that the size of the transaction is smaller or equal to the expected maxBytes.
|
||||
func PreCheckMaxBytes(maxBytes int64) PreCheckFunc {
|
||||
return func(tx types.Tx) error {
|
||||
txSize := types.ComputeProtoSizeForTxs([]types.Tx{tx})
|
||||
|
||||
if txSize > maxBytes {
|
||||
return fmt.Errorf("tx size is too big: %d, max: %d",
|
||||
txSize, maxBytes)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// PostCheckMaxGas checks that the wanted gas is smaller or equal to the passed
|
||||
// maxGas. Returns nil if maxGas is -1.
|
||||
func PostCheckMaxGas(maxGas int64) PostCheckFunc {
|
||||
return func(tx types.Tx, res *abci.ResponseCheckTx) error {
|
||||
if maxGas == -1 {
|
||||
return nil
|
||||
}
|
||||
if res.GasWanted < 0 {
|
||||
return fmt.Errorf("gas wanted %d is negative",
|
||||
res.GasWanted)
|
||||
}
|
||||
if res.GasWanted > maxGas {
|
||||
return fmt.Errorf("gas wanted %d is greater than max gas %d",
|
||||
res.GasWanted, maxGas)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@@ -1,108 +0,0 @@
|
||||
package mempool
|
||||
|
||||
import (
|
||||
"github.com/go-kit/kit/metrics"
|
||||
"github.com/go-kit/kit/metrics/discard"
|
||||
"github.com/go-kit/kit/metrics/prometheus"
|
||||
stdprometheus "github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
const (
|
||||
// MetricsSubsystem is a subsystem shared by all metrics exposed by this
|
||||
// package.
|
||||
MetricsSubsystem = "mempool"
|
||||
)
|
||||
|
||||
// Metrics contains metrics exposed by this package.
|
||||
// see MetricsProvider for descriptions.
|
||||
type Metrics struct {
|
||||
// Size of the mempool.
|
||||
Size metrics.Gauge
|
||||
|
||||
// Histogram of transaction sizes, in bytes.
|
||||
TxSizeBytes metrics.Histogram
|
||||
|
||||
// Number of failed transactions.
|
||||
FailedTxs metrics.Counter
|
||||
|
||||
// RejectedTxs defines the number of rejected transactions. These are
|
||||
// transactions that passed CheckTx but failed to make it into the mempool
|
||||
// due to resource limits, e.g. mempool is full and no lower priority
|
||||
// transactions exist in the mempool.
|
||||
RejectedTxs metrics.Counter
|
||||
|
||||
// EvictedTxs defines the number of evicted transactions. These are valid
|
||||
// transactions that passed CheckTx and existed in the mempool but were later
|
||||
// evicted to make room for higher priority valid transactions that passed
|
||||
// CheckTx.
|
||||
EvictedTxs metrics.Counter
|
||||
|
||||
// Number of times transactions are rechecked in the mempool.
|
||||
RecheckTimes metrics.Counter
|
||||
}
|
||||
|
||||
// PrometheusMetrics returns Metrics build using Prometheus client library.
|
||||
// Optionally, labels can be provided along with their values ("foo",
|
||||
// "fooValue").
|
||||
func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
|
||||
labels := []string{}
|
||||
for i := 0; i < len(labelsAndValues); i += 2 {
|
||||
labels = append(labels, labelsAndValues[i])
|
||||
}
|
||||
return &Metrics{
|
||||
Size: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "size",
|
||||
Help: "Size of the mempool (number of uncommitted transactions).",
|
||||
}, labels).With(labelsAndValues...),
|
||||
|
||||
TxSizeBytes: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "tx_size_bytes",
|
||||
Help: "Transaction sizes in bytes.",
|
||||
Buckets: stdprometheus.ExponentialBuckets(1, 3, 17),
|
||||
}, labels).With(labelsAndValues...),
|
||||
|
||||
FailedTxs: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "failed_txs",
|
||||
Help: "Number of failed transactions.",
|
||||
}, labels).With(labelsAndValues...),
|
||||
|
||||
RejectedTxs: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "rejected_txs",
|
||||
Help: "Number of rejected transactions.",
|
||||
}, labels).With(labelsAndValues...),
|
||||
|
||||
EvictedTxs: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "evicted_txs",
|
||||
Help: "Number of evicted transactions.",
|
||||
}, labels).With(labelsAndValues...),
|
||||
|
||||
RecheckTimes: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "recheck_times",
|
||||
Help: "Number of times transactions are rechecked in the mempool.",
|
||||
}, labels).With(labelsAndValues...),
|
||||
}
|
||||
}
|
||||
|
||||
// NopMetrics returns no-op Metrics.
|
||||
func NopMetrics() *Metrics {
|
||||
return &Metrics{
|
||||
Size: discard.NewGauge(),
|
||||
TxSizeBytes: discard.NewHistogram(),
|
||||
FailedTxs: discard.NewCounter(),
|
||||
RejectedTxs: discard.NewCounter(),
|
||||
EvictedTxs: discard.NewCounter(),
|
||||
RecheckTimes: discard.NewCounter(),
|
||||
}
|
||||
}
|
||||
@@ -1,48 +0,0 @@
|
||||
package mock
|
||||
|
||||
import (
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
"github.com/tendermint/tendermint/libs/clist"
|
||||
mempl "github.com/tendermint/tendermint/mempool"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// Mempool is an empty implementation of a Mempool, useful for testing.
|
||||
type Mempool struct{}
|
||||
|
||||
var _ mempl.Mempool = Mempool{}
|
||||
|
||||
func (Mempool) Lock() {}
|
||||
func (Mempool) Unlock() {}
|
||||
func (Mempool) Size() int { return 0 }
|
||||
func (Mempool) SizeBytes() int64 { return 0 }
|
||||
func (Mempool) CheckTx(_ types.Tx, _ func(*abci.Response), _ mempl.TxInfo) error {
|
||||
return nil
|
||||
}
|
||||
func (Mempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} }
|
||||
func (Mempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} }
|
||||
func (Mempool) Update(
|
||||
_ int64,
|
||||
_ types.Txs,
|
||||
_ []*abci.ResponseDeliverTx,
|
||||
_ mempl.PreCheckFunc,
|
||||
_ mempl.PostCheckFunc,
|
||||
) error {
|
||||
return nil
|
||||
}
|
||||
func (Mempool) Flush() {}
|
||||
func (Mempool) FlushAppConn() error { return nil }
|
||||
func (Mempool) TxsAvailable() <-chan struct{} { return make(chan struct{}) }
|
||||
func (Mempool) EnableTxsAvailable() {}
|
||||
func (Mempool) TxsBytes() int64 { return 0 }
|
||||
|
||||
func (Mempool) TxsFront() *clist.CElement { return nil }
|
||||
func (Mempool) TxsWaitChan() <-chan struct{} { return nil }
|
||||
|
||||
func (Mempool) InitWAL() error { return nil }
|
||||
func (Mempool) CloseWAL() {}
|
||||
|
||||
// RemoveTxByKey implements mempool.Mempool
|
||||
func (Mempool) RemoveTxByKey(txKey types.TxKey) error {
|
||||
return nil
|
||||
}
|
||||
@@ -1,159 +0,0 @@
|
||||
package mempool
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"sort"
|
||||
|
||||
tmsync "github.com/tendermint/tendermint/libs/sync"
|
||||
)
|
||||
|
||||
var _ heap.Interface = (*TxPriorityQueue)(nil)
|
||||
|
||||
// TxPriorityQueue defines a thread-safe priority queue for valid transactions.
|
||||
type TxPriorityQueue struct {
|
||||
mtx tmsync.RWMutex
|
||||
txs []*WrappedTx
|
||||
}
|
||||
|
||||
func NewTxPriorityQueue() *TxPriorityQueue {
|
||||
pq := &TxPriorityQueue{
|
||||
txs: make([]*WrappedTx, 0),
|
||||
}
|
||||
|
||||
heap.Init(pq)
|
||||
|
||||
return pq
|
||||
}
|
||||
|
||||
// GetEvictableTxs attempts to find and return a list of *WrappedTx than can be
|
||||
// evicted to make room for another *WrappedTx with higher priority. If no such
|
||||
// list of *WrappedTx exists, nil will be returned. The returned list of *WrappedTx
|
||||
// indicate that these transactions can be removed due to them being of lower
|
||||
// priority and that their total sum in size allows room for the incoming
|
||||
// transaction according to the mempool's configured limits.
|
||||
func (pq *TxPriorityQueue) GetEvictableTxs(priority, txSize, totalSize, cap int64) []*WrappedTx {
|
||||
pq.mtx.RLock()
|
||||
defer pq.mtx.RUnlock()
|
||||
|
||||
txs := make([]*WrappedTx, len(pq.txs))
|
||||
copy(txs, pq.txs)
|
||||
|
||||
sort.Slice(txs, func(i, j int) bool {
|
||||
return txs[i].priority < txs[j].priority
|
||||
})
|
||||
|
||||
var (
|
||||
toEvict []*WrappedTx
|
||||
i int
|
||||
)
|
||||
|
||||
currSize := totalSize
|
||||
|
||||
// Loop over all transactions in ascending priority order evaluating those
|
||||
// that are only of less priority than the provided argument. We continue
|
||||
// evaluating transactions until there is sufficient capacity for the new
|
||||
// transaction (size) as defined by txSize.
|
||||
for i < len(txs) && txs[i].priority < priority {
|
||||
toEvict = append(toEvict, txs[i])
|
||||
currSize -= int64(txs[i].Size())
|
||||
|
||||
if currSize+txSize <= cap {
|
||||
return toEvict
|
||||
}
|
||||
|
||||
i++
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// NumTxs returns the number of transactions in the priority queue. It is
|
||||
// thread safe.
|
||||
func (pq *TxPriorityQueue) NumTxs() int {
|
||||
pq.mtx.RLock()
|
||||
defer pq.mtx.RUnlock()
|
||||
|
||||
return len(pq.txs)
|
||||
}
|
||||
|
||||
// RemoveTx removes a specific transaction from the priority queue.
|
||||
func (pq *TxPriorityQueue) RemoveTx(tx *WrappedTx) {
|
||||
pq.mtx.Lock()
|
||||
defer pq.mtx.Unlock()
|
||||
|
||||
if tx.heapIndex < len(pq.txs) {
|
||||
heap.Remove(pq, tx.heapIndex)
|
||||
}
|
||||
}
|
||||
|
||||
// PushTx adds a valid transaction to the priority queue. It is thread safe.
|
||||
func (pq *TxPriorityQueue) PushTx(tx *WrappedTx) {
|
||||
pq.mtx.Lock()
|
||||
defer pq.mtx.Unlock()
|
||||
|
||||
heap.Push(pq, tx)
|
||||
}
|
||||
|
||||
// PopTx removes the top priority transaction from the queue. It is thread safe.
|
||||
func (pq *TxPriorityQueue) PopTx() *WrappedTx {
|
||||
pq.mtx.Lock()
|
||||
defer pq.mtx.Unlock()
|
||||
|
||||
x := heap.Pop(pq)
|
||||
if x != nil {
|
||||
return x.(*WrappedTx)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Push implements the Heap interface.
|
||||
//
|
||||
// NOTE: A caller should never call Push. Use PushTx instead.
|
||||
func (pq *TxPriorityQueue) Push(x interface{}) {
|
||||
n := len(pq.txs)
|
||||
item := x.(*WrappedTx)
|
||||
item.heapIndex = n
|
||||
pq.txs = append(pq.txs, item)
|
||||
}
|
||||
|
||||
// Pop implements the Heap interface.
|
||||
//
|
||||
// NOTE: A caller should never call Pop. Use PopTx instead.
|
||||
func (pq *TxPriorityQueue) Pop() interface{} {
|
||||
old := pq.txs
|
||||
n := len(old)
|
||||
item := old[n-1]
|
||||
old[n-1] = nil // avoid memory leak
|
||||
item.heapIndex = -1 // for safety
|
||||
pq.txs = old[0 : n-1]
|
||||
return item
|
||||
}
|
||||
|
||||
// Len implements the Heap interface.
|
||||
//
|
||||
// NOTE: A caller should never call Len. Use NumTxs instead.
|
||||
func (pq *TxPriorityQueue) Len() int {
|
||||
return len(pq.txs)
|
||||
}
|
||||
|
||||
// Less implements the Heap interface. It returns true if the transaction at
|
||||
// position i in the queue is of less priority than the transaction at position j.
|
||||
func (pq *TxPriorityQueue) Less(i, j int) bool {
|
||||
// If there exists two transactions with the same priority, consider the one
|
||||
// that we saw the earliest as the higher priority transaction.
|
||||
if pq.txs[i].priority == pq.txs[j].priority {
|
||||
return pq.txs[i].timestamp.Before(pq.txs[j].timestamp)
|
||||
}
|
||||
|
||||
// We want Pop to give us the highest, not lowest, priority so we use greater
|
||||
// than here.
|
||||
return pq.txs[i].priority > pq.txs[j].priority
|
||||
}
|
||||
|
||||
// Swap implements the Heap interface. It swaps two transactions in the queue.
|
||||
func (pq *TxPriorityQueue) Swap(i, j int) {
|
||||
pq.txs[i], pq.txs[j] = pq.txs[j], pq.txs[i]
|
||||
pq.txs[i].heapIndex = i
|
||||
pq.txs[j].heapIndex = j
|
||||
}
|
||||
@@ -1,323 +0,0 @@
|
||||
package mempool
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/libs/clist"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
tmsync "github.com/tendermint/tendermint/libs/sync"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
protomem "github.com/tendermint/tendermint/proto/tendermint/mempool"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
const (
|
||||
MempoolChannel = byte(0x30)
|
||||
|
||||
peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount
|
||||
|
||||
// UnknownPeerID is the peer ID to use when running CheckTx when there is
|
||||
// no peer (e.g. RPC)
|
||||
UnknownPeerID uint16 = 0
|
||||
|
||||
maxActiveIDs = math.MaxUint16
|
||||
)
|
||||
|
||||
// Reactor handles mempool tx broadcasting amongst peers.
|
||||
// It maintains a map from peer ID to counter, to prevent gossiping txs to the
|
||||
// peers you received it from.
|
||||
type Reactor struct {
|
||||
p2p.BaseReactor
|
||||
config *cfg.MempoolConfig
|
||||
mempool *TxMempool
|
||||
ids *mempoolIDs
|
||||
}
|
||||
|
||||
type mempoolIDs struct {
|
||||
mtx tmsync.RWMutex
|
||||
peerMap map[p2p.ID]uint16
|
||||
nextID uint16 // assumes that a node will never have over 65536 active peers
|
||||
activeIDs map[uint16]struct{} // used to check if a given peerID key is used, the value doesn't matter
|
||||
}
|
||||
|
||||
// Reserve searches for the next unused ID and assigns it to the
|
||||
// peer.
|
||||
func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) {
|
||||
ids.mtx.Lock()
|
||||
defer ids.mtx.Unlock()
|
||||
|
||||
curID := ids.nextPeerID()
|
||||
ids.peerMap[peer.ID()] = curID
|
||||
ids.activeIDs[curID] = struct{}{}
|
||||
}
|
||||
|
||||
// nextPeerID returns the next unused peer ID to use.
|
||||
// This assumes that ids's mutex is already locked.
|
||||
func (ids *mempoolIDs) nextPeerID() uint16 {
|
||||
if len(ids.activeIDs) == maxActiveIDs {
|
||||
panic(fmt.Sprintf("node has maximum %d active IDs and wanted to get one more", maxActiveIDs))
|
||||
}
|
||||
|
||||
_, idExists := ids.activeIDs[ids.nextID]
|
||||
for idExists {
|
||||
ids.nextID++
|
||||
_, idExists = ids.activeIDs[ids.nextID]
|
||||
}
|
||||
curID := ids.nextID
|
||||
ids.nextID++
|
||||
return curID
|
||||
}
|
||||
|
||||
// Reclaim returns the ID reserved for the peer back to unused pool.
|
||||
func (ids *mempoolIDs) Reclaim(peer p2p.Peer) {
|
||||
ids.mtx.Lock()
|
||||
defer ids.mtx.Unlock()
|
||||
|
||||
removedID, ok := ids.peerMap[peer.ID()]
|
||||
if ok {
|
||||
delete(ids.activeIDs, removedID)
|
||||
delete(ids.peerMap, peer.ID())
|
||||
}
|
||||
}
|
||||
|
||||
// GetForPeer returns an ID reserved for the peer.
|
||||
func (ids *mempoolIDs) GetForPeer(peer p2p.Peer) uint16 {
|
||||
ids.mtx.RLock()
|
||||
defer ids.mtx.RUnlock()
|
||||
|
||||
return ids.peerMap[peer.ID()]
|
||||
}
|
||||
|
||||
func newMempoolIDs() *mempoolIDs {
|
||||
return &mempoolIDs{
|
||||
peerMap: make(map[p2p.ID]uint16),
|
||||
activeIDs: map[uint16]struct{}{0: {}},
|
||||
nextID: 1, // reserve unknownPeerID(0) for mempoolReactor.BroadcastTx
|
||||
}
|
||||
}
|
||||
|
||||
// NewReactor returns a new Reactor with the given config and mempool.
|
||||
func NewReactor(config *cfg.MempoolConfig, mempool *TxMempool) *Reactor {
|
||||
memR := &Reactor{
|
||||
config: config,
|
||||
mempool: mempool,
|
||||
ids: newMempoolIDs(),
|
||||
}
|
||||
memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR)
|
||||
return memR
|
||||
}
|
||||
|
||||
// InitPeer implements Reactor by creating a state for the peer.
|
||||
func (memR *Reactor) InitPeer(peer p2p.Peer) p2p.Peer {
|
||||
memR.ids.ReserveForPeer(peer)
|
||||
return peer
|
||||
}
|
||||
|
||||
// SetLogger sets the Logger on the reactor and the underlying mempool.
|
||||
func (memR *Reactor) SetLogger(l log.Logger) {
|
||||
memR.Logger = l
|
||||
memR.mempool.SetLogger(l)
|
||||
}
|
||||
|
||||
// OnStart implements p2p.BaseReactor.
|
||||
func (memR *Reactor) OnStart() error {
|
||||
if !memR.config.Broadcast {
|
||||
memR.Logger.Info("Tx broadcasting is disabled")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetChannels implements Reactor by returning the list of channels for this
|
||||
// reactor.
|
||||
func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
|
||||
largestTx := make([]byte, memR.config.MaxTxBytes)
|
||||
batchMsg := protomem.Message{
|
||||
Sum: &protomem.Message_Txs{
|
||||
Txs: &protomem.Txs{Txs: [][]byte{largestTx}},
|
||||
},
|
||||
}
|
||||
|
||||
return []*p2p.ChannelDescriptor{
|
||||
{
|
||||
ID: MempoolChannel,
|
||||
Priority: 5,
|
||||
RecvMessageCapacity: batchMsg.Size(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// AddPeer implements Reactor.
|
||||
// It starts a broadcast routine ensuring all txs are forwarded to the given peer.
|
||||
func (memR *Reactor) AddPeer(peer p2p.Peer) {
|
||||
if memR.config.Broadcast {
|
||||
go memR.broadcastTxRoutine(peer)
|
||||
}
|
||||
}
|
||||
|
||||
// RemovePeer implements Reactor.
|
||||
func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
|
||||
memR.ids.Reclaim(peer)
|
||||
// broadcast routine checks if peer is gone and returns
|
||||
}
|
||||
|
||||
// Receive implements Reactor.
|
||||
// It adds any received transactions to the mempool.
|
||||
func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
msg, err := memR.decodeMsg(msgBytes)
|
||||
if err != nil {
|
||||
memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
|
||||
memR.Switch.StopPeerForError(src, err)
|
||||
return
|
||||
}
|
||||
memR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
|
||||
|
||||
txInfo := TxInfo{SenderID: memR.ids.GetForPeer(src)}
|
||||
if src != nil {
|
||||
txInfo.SenderP2PID = src.ID()
|
||||
}
|
||||
for _, tx := range msg.Txs {
|
||||
err = memR.mempool.CheckTx(tx, nil, txInfo)
|
||||
if err == ErrTxInCache {
|
||||
memR.Logger.Debug("Tx already exists in cache", "tx", tx.String())
|
||||
} else if err != nil {
|
||||
memR.Logger.Info("Could not check tx", "tx", tx.String(), "err", err)
|
||||
}
|
||||
}
|
||||
// broadcasting happens from go routines per peer
|
||||
}
|
||||
|
||||
// PeerState describes the state of a peer.
|
||||
type PeerState interface {
|
||||
GetHeight() int64
|
||||
}
|
||||
|
||||
// Send new mempool txs to peer.
|
||||
func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
|
||||
peerID := memR.ids.GetForPeer(peer)
|
||||
var next *clist.CElement
|
||||
|
||||
for {
|
||||
// In case of both next.NextWaitChan() and peer.Quit() are variable at the same time
|
||||
if !memR.IsRunning() || !peer.IsRunning() {
|
||||
return
|
||||
}
|
||||
|
||||
// This happens because the CElement we were looking at got garbage
|
||||
// collected (removed). That is, .NextWait() returned nil. Go ahead and
|
||||
// start from the beginning.
|
||||
if next == nil {
|
||||
select {
|
||||
case <-memR.mempool.WaitForNextTx(): // Wait until a tx is available
|
||||
if next = memR.mempool.NextGossipTx(); next == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
case <-peer.Quit():
|
||||
return
|
||||
|
||||
case <-memR.Quit():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure the peer is up to date.
|
||||
peerState, ok := peer.Get(types.PeerStateKey).(PeerState)
|
||||
if !ok {
|
||||
// Peer does not have a state yet. We set it in the consensus reactor, but
|
||||
// when we add peer in Switch, the order we call reactors#AddPeer is
|
||||
// different every time due to us using a map. Sometimes other reactors
|
||||
// will be initialized before the consensus reactor. We should wait a few
|
||||
// milliseconds and retry.
|
||||
time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
|
||||
// Allow for a lag of 1 block.
|
||||
memTx := next.Value.(*WrappedTx)
|
||||
if peerState.GetHeight() < memTx.height-1 {
|
||||
time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
|
||||
// NOTE: Transaction batching was disabled due to
|
||||
// https://github.com/tendermint/tendermint/issues/5796
|
||||
if ok := memR.mempool.txStore.TxHasPeer(memTx.hash, peerID); !ok {
|
||||
msg := protomem.Message{
|
||||
Sum: &protomem.Message_Txs{
|
||||
Txs: &protomem.Txs{Txs: [][]byte{memTx.tx}},
|
||||
},
|
||||
}
|
||||
|
||||
bz, err := msg.Marshal()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
success := peer.Send(MempoolChannel, bz)
|
||||
if !success {
|
||||
time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-next.NextWaitChan():
|
||||
// see the start of the for loop for nil check
|
||||
next = next.Next()
|
||||
|
||||
case <-peer.Quit():
|
||||
return
|
||||
|
||||
case <-memR.Quit():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
// Messages
|
||||
|
||||
func (memR *Reactor) decodeMsg(bz []byte) (TxsMessage, error) {
|
||||
msg := protomem.Message{}
|
||||
err := msg.Unmarshal(bz)
|
||||
if err != nil {
|
||||
return TxsMessage{}, err
|
||||
}
|
||||
|
||||
var message TxsMessage
|
||||
|
||||
if i, ok := msg.Sum.(*protomem.Message_Txs); ok {
|
||||
txs := i.Txs.GetTxs()
|
||||
|
||||
if len(txs) == 0 {
|
||||
return message, errors.New("empty TxsMessage")
|
||||
}
|
||||
|
||||
decoded := make([]types.Tx, len(txs))
|
||||
for j, tx := range txs {
|
||||
decoded[j] = types.Tx(tx)
|
||||
}
|
||||
|
||||
message = TxsMessage{
|
||||
Txs: decoded,
|
||||
}
|
||||
return message, nil
|
||||
}
|
||||
return message, fmt.Errorf("msg type: %T is not supported", msg)
|
||||
}
|
||||
|
||||
//-------------------------------------
|
||||
|
||||
// TxsMessage is a Message containing transactions.
|
||||
type TxsMessage struct {
|
||||
Txs []types.Tx
|
||||
}
|
||||
|
||||
// String returns a string representation of the TxsMessage.
|
||||
func (m *TxsMessage) String() string {
|
||||
return fmt.Sprintf("[TxsMessage %v]", m.Txs)
|
||||
}
|
||||
@@ -1,387 +0,0 @@
|
||||
package mempool
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"net"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/fortytw2/leaktest"
|
||||
"github.com/go-kit/log/term"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/tendermint/tendermint/abci/example/kvstore"
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
"github.com/tendermint/tendermint/p2p/mock"
|
||||
memproto "github.com/tendermint/tendermint/proto/tendermint/mempool"
|
||||
"github.com/tendermint/tendermint/proxy"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
const (
|
||||
numTxs = 1000
|
||||
timeout = 120 * time.Second // ridiculously high because CircleCI is slow
|
||||
)
|
||||
|
||||
type peerState struct {
|
||||
height int64
|
||||
}
|
||||
|
||||
func (ps peerState) GetHeight() int64 {
|
||||
return ps.height
|
||||
}
|
||||
|
||||
// Send a bunch of txs to the first reactor's mempool and wait for them all to
|
||||
// be received in the others.
|
||||
func TestReactorBroadcastTxsMessage(t *testing.T) {
|
||||
config := cfg.TestConfig()
|
||||
// if there were more than two reactors, the order of transactions could not be
|
||||
// asserted in waitForTxsOnReactors (due to transactions gossiping). If we
|
||||
// replace Connect2Switches (full mesh) with a func, which connects first
|
||||
// reactor to others and nothing else, this test should also pass with >2 reactors.
|
||||
const N = 2
|
||||
reactors := makeAndConnectReactors(config, N)
|
||||
defer func() {
|
||||
for _, r := range reactors {
|
||||
if err := r.Stop(); err != nil {
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
for _, r := range reactors {
|
||||
for _, peer := range r.Switch.Peers().List() {
|
||||
peer.Set(types.PeerStateKey, peerState{1})
|
||||
}
|
||||
}
|
||||
|
||||
txs := checkTxs(t, reactors[0].mempool, numTxs, UnknownPeerID)
|
||||
waitForTxsOnReactors(t, txs, reactors)
|
||||
}
|
||||
|
||||
// // regression test for https://github.com/tendermint/tendermint/issues/5408
|
||||
// func TestReactorConcurrency(t *testing.T) {
|
||||
// config := cfg.TestConfig()
|
||||
// const N = 2
|
||||
// reactors := makeAndConnectReactors(config, N)
|
||||
// defer func() {
|
||||
// for _, r := range reactors {
|
||||
// if err := r.Stop(); err != nil {
|
||||
// assert.NoError(t, err)
|
||||
// }
|
||||
// }
|
||||
// }()
|
||||
// for _, r := range reactors {
|
||||
// for _, peer := range r.Switch.Peers().List() {
|
||||
// peer.Set(types.PeerStateKey, peerState{1})
|
||||
// }
|
||||
// }
|
||||
// var wg sync.WaitGroup
|
||||
|
||||
// const numTxs = 5
|
||||
|
||||
// for i := 0; i < 1000; i++ {
|
||||
// wg.Add(2)
|
||||
|
||||
// // 1. submit a bunch of txs
|
||||
// // 2. update the whole mempool
|
||||
// txs := checkTxs(t, reactors[0].mempool, numTxs, UnknownPeerID)
|
||||
// go func() {
|
||||
// defer wg.Done()
|
||||
|
||||
// reactors[0].mempool.Lock()
|
||||
// defer reactors[0].mempool.Unlock()
|
||||
|
||||
// deliverTxResponses := make([]*abci.ResponseDeliverTx, len(txs))
|
||||
// for i := range txs {
|
||||
// deliverTxResponses[i] = &abci.ResponseDeliverTx{Code: 0}
|
||||
// }
|
||||
|
||||
// err := reactors[0].mempool.Update(1, txs, deliverTxResponses, nil, nil)
|
||||
// assert.NoError(t, err)
|
||||
// }()
|
||||
|
||||
// // 1. submit a bunch of txs
|
||||
// // 2. update none
|
||||
// _ = checkTxs(t, reactors[1].mempool, numTxs, UnknownPeerID)
|
||||
// go func() {
|
||||
// defer wg.Done()
|
||||
|
||||
// reactors[1].mempool.Lock()
|
||||
// defer reactors[1].mempool.Unlock()
|
||||
// err := reactors[1].mempool.Update(1, []types.Tx{}, make([]*abci.ResponseDeliverTx, 0), nil, nil)
|
||||
// assert.NoError(t, err)
|
||||
// }()
|
||||
|
||||
// // 1. flush the mempool
|
||||
// reactors[1].mempool.Flush()
|
||||
// }
|
||||
|
||||
// wg.Wait()
|
||||
// }
|
||||
|
||||
// Send a bunch of txs to the first reactor's mempool, claiming it came from peer
|
||||
// ensure peer gets no txs.
|
||||
func TestReactorNoBroadcastToSender(t *testing.T) {
|
||||
config := cfg.TestConfig()
|
||||
const N = 2
|
||||
reactors := makeAndConnectReactors(config, N)
|
||||
defer func() {
|
||||
for _, r := range reactors {
|
||||
if err := r.Stop(); err != nil {
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
for _, r := range reactors {
|
||||
for _, peer := range r.Switch.Peers().List() {
|
||||
peer.Set(types.PeerStateKey, peerState{1})
|
||||
}
|
||||
}
|
||||
|
||||
const peerID = 1
|
||||
checkTxs(t, reactors[0].mempool, numTxs, peerID)
|
||||
ensureNoTxs(t, reactors[peerID], 100*time.Millisecond)
|
||||
}
|
||||
|
||||
// func TestReactor_MaxTxBytes(t *testing.T) {
|
||||
// config := cfg.TestConfig()
|
||||
|
||||
// const N = 2
|
||||
// reactors := makeAndConnectReactors(config, N)
|
||||
// defer func() {
|
||||
// for _, r := range reactors {
|
||||
// if err := r.Stop(); err != nil {
|
||||
// assert.NoError(t, err)
|
||||
// }
|
||||
// }
|
||||
// }()
|
||||
// for _, r := range reactors {
|
||||
// for _, peer := range r.Switch.Peers().List() {
|
||||
// peer.Set(types.PeerStateKey, peerState{1})
|
||||
// }
|
||||
// }
|
||||
|
||||
// // Broadcast a tx, which has the max size
|
||||
// // => ensure it's received by the second reactor.
|
||||
// tx1 := tmrand.Bytes(config.Mempool.MaxTxBytes)
|
||||
// err := reactors[0].mempool.CheckTx(tx1, nil, TxInfo{SenderID: UnknownPeerID})
|
||||
// require.NoError(t, err)
|
||||
// waitForTxsOnReactors(t, []types.Tx{tx1}, reactors)
|
||||
|
||||
// reactors[0].mempool.Flush()
|
||||
// reactors[1].mempool.Flush()
|
||||
|
||||
// // Broadcast a tx, which is beyond the max size
|
||||
// // => ensure it's not sent
|
||||
// tx2 := tmrand.Bytes(config.Mempool.MaxTxBytes + 1)
|
||||
// err = reactors[0].mempool.CheckTx(tx2, nil, TxInfo{SenderID: UnknownPeerID})
|
||||
// require.Error(t, err)
|
||||
// }
|
||||
|
||||
func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping test in short mode.")
|
||||
}
|
||||
|
||||
config := cfg.TestConfig()
|
||||
const N = 2
|
||||
reactors := makeAndConnectReactors(config, N)
|
||||
defer func() {
|
||||
for _, r := range reactors {
|
||||
if err := r.Stop(); err != nil {
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// stop peer
|
||||
sw := reactors[1].Switch
|
||||
sw.StopPeerForError(sw.Peers().List()[0], errors.New("some reason"))
|
||||
|
||||
// check that we are not leaking any go-routines
|
||||
// i.e. broadcastTxRoutine finishes when peer is stopped
|
||||
leaktest.CheckTimeout(t, 10*time.Second)()
|
||||
}
|
||||
|
||||
func TestBroadcastTxForPeerStopsWhenReactorStops(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping test in short mode.")
|
||||
}
|
||||
|
||||
config := cfg.TestConfig()
|
||||
const N = 2
|
||||
reactors := makeAndConnectReactors(config, N)
|
||||
|
||||
// stop reactors
|
||||
for _, r := range reactors {
|
||||
if err := r.Stop(); err != nil {
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
// check that we are not leaking any go-routines
|
||||
// i.e. broadcastTxRoutine finishes when reactor is stopped
|
||||
leaktest.CheckTimeout(t, 10*time.Second)()
|
||||
}
|
||||
|
||||
func TestMempoolIDsBasic(t *testing.T) {
|
||||
ids := newMempoolIDs()
|
||||
|
||||
peer := mock.NewPeer(net.IP{127, 0, 0, 1})
|
||||
|
||||
ids.ReserveForPeer(peer)
|
||||
assert.EqualValues(t, 1, ids.GetForPeer(peer))
|
||||
ids.Reclaim(peer)
|
||||
|
||||
ids.ReserveForPeer(peer)
|
||||
assert.EqualValues(t, 2, ids.GetForPeer(peer))
|
||||
ids.Reclaim(peer)
|
||||
}
|
||||
|
||||
func TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs(t *testing.T) {
|
||||
if testing.Short() {
|
||||
return
|
||||
}
|
||||
|
||||
// 0 is already reserved for UnknownPeerID
|
||||
ids := newMempoolIDs()
|
||||
|
||||
for i := 0; i < maxActiveIDs-1; i++ {
|
||||
peer := mock.NewPeer(net.IP{127, 0, 0, 1})
|
||||
ids.ReserveForPeer(peer)
|
||||
}
|
||||
|
||||
assert.Panics(t, func() {
|
||||
peer := mock.NewPeer(net.IP{127, 0, 0, 1})
|
||||
ids.ReserveForPeer(peer)
|
||||
})
|
||||
}
|
||||
|
||||
func TestDontExhaustMaxActiveIDs(t *testing.T) {
|
||||
config := cfg.TestConfig()
|
||||
const N = 1
|
||||
reactors := makeAndConnectReactors(config, N)
|
||||
defer func() {
|
||||
for _, r := range reactors {
|
||||
if err := r.Stop(); err != nil {
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
reactor := reactors[0]
|
||||
|
||||
for i := 0; i < maxActiveIDs+1; i++ {
|
||||
peer := mock.NewPeer(nil)
|
||||
reactor.Receive(MempoolChannel, peer, []byte{0x1, 0x2, 0x3})
|
||||
reactor.AddPeer(peer)
|
||||
}
|
||||
}
|
||||
|
||||
// mempoolLogger is a TestingLogger which uses a different
|
||||
// color for each validator ("validator" key must exist).
|
||||
func mempoolLogger() log.Logger {
|
||||
return log.TestingLoggerWithColorFn(func(keyvals ...interface{}) term.FgBgColor {
|
||||
for i := 0; i < len(keyvals)-1; i += 2 {
|
||||
if keyvals[i] == "validator" {
|
||||
return term.FgBgColor{Fg: term.Color(uint8(keyvals[i+1].(int) + 1))}
|
||||
}
|
||||
}
|
||||
return term.FgBgColor{}
|
||||
})
|
||||
}
|
||||
|
||||
// connect N mempool reactors through N switches
|
||||
func makeAndConnectReactors(config *cfg.Config, n int) []*Reactor {
|
||||
reactors := make([]*Reactor, n)
|
||||
logger := mempoolLogger()
|
||||
for i := 0; i < n; i++ {
|
||||
app := kvstore.NewApplication()
|
||||
cc := proxy.NewLocalClientCreator(app)
|
||||
mempool, cleanup := newMempoolWithApp(cc)
|
||||
defer cleanup()
|
||||
|
||||
reactors[i] = NewReactor(config.Mempool, mempool) // so we dont start the consensus states
|
||||
reactors[i].SetLogger(logger.With("validator", i))
|
||||
}
|
||||
|
||||
p2p.MakeConnectedSwitches(config.P2P, n, func(i int, s *p2p.Switch) *p2p.Switch {
|
||||
s.AddReactor("MEMPOOL", reactors[i])
|
||||
return s
|
||||
|
||||
}, p2p.Connect2Switches)
|
||||
return reactors
|
||||
}
|
||||
|
||||
func waitForTxsOnReactors(t *testing.T, txs []testTx, reactors []*Reactor) {
|
||||
// wait for the txs in all mempools
|
||||
wg := new(sync.WaitGroup)
|
||||
for i, reactor := range reactors {
|
||||
wg.Add(1)
|
||||
go func(r *Reactor, reactorIndex int) {
|
||||
defer wg.Done()
|
||||
waitForTxsOnReactor(t, txs, r, reactorIndex)
|
||||
}(reactor, i)
|
||||
}
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
timer := time.After(timeout)
|
||||
select {
|
||||
case <-timer:
|
||||
t.Fatal("Timed out waiting for txs")
|
||||
case <-done:
|
||||
}
|
||||
}
|
||||
|
||||
func waitForTxsOnReactor(t *testing.T, txs []testTx, reactor *Reactor, reactorIndex int) {
|
||||
mempool := reactor.mempool
|
||||
for mempool.Size() < len(txs) {
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
}
|
||||
|
||||
reapedTxs := mempool.ReapMaxTxs(len(txs))
|
||||
for i, tx := range txs {
|
||||
assert.Equalf(t, tx, reapedTxs[i],
|
||||
"txs at index %d on reactor %d don't match: %v vs %v", i, reactorIndex, tx, reapedTxs[i])
|
||||
}
|
||||
}
|
||||
|
||||
// ensure no txs on reactor after some timeout
|
||||
func ensureNoTxs(t *testing.T, reactor *Reactor, timeout time.Duration) {
|
||||
time.Sleep(timeout) // wait for the txs in all mempools
|
||||
assert.Zero(t, reactor.mempool.Size())
|
||||
}
|
||||
|
||||
func TestMempoolVectors(t *testing.T) {
|
||||
testCases := []struct {
|
||||
testName string
|
||||
tx []byte
|
||||
expBytes string
|
||||
}{
|
||||
{"tx 1", []byte{123}, "0a030a017b"},
|
||||
{"tx 2", []byte("proto encoding in mempool"), "0a1b0a1970726f746f20656e636f64696e6720696e206d656d706f6f6c"},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
tc := tc
|
||||
|
||||
msg := memproto.Message{
|
||||
Sum: &memproto.Message_Txs{
|
||||
Txs: &memproto.Txs{Txs: [][]byte{tc.tx}},
|
||||
},
|
||||
}
|
||||
bz, err := msg.Marshal()
|
||||
require.NoError(t, err, tc.testName)
|
||||
|
||||
require.Equal(t, tc.expBytes, hex.EncodeToString(bz), tc.testName)
|
||||
}
|
||||
}
|
||||
@@ -1,281 +0,0 @@
|
||||
package mempool
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/libs/clist"
|
||||
tmsync "github.com/tendermint/tendermint/libs/sync"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// WrappedTx defines a wrapper around a raw transaction with additional metadata
|
||||
// that is used for indexing.
|
||||
type WrappedTx struct {
|
||||
// tx represents the raw binary transaction data
|
||||
tx types.Tx
|
||||
|
||||
// hash defines the transaction hash and the primary key used in the mempool
|
||||
hash types.TxKey
|
||||
|
||||
// height defines the height at which the transaction was validated at
|
||||
height int64
|
||||
|
||||
// gasWanted defines the amount of gas the transaction sender requires
|
||||
gasWanted int64
|
||||
|
||||
// priority defines the transaction's priority as specified by the application
|
||||
// in the ResponseCheckTx response.
|
||||
priority int64
|
||||
|
||||
// sender defines the transaction's sender as specified by the application in
|
||||
// the ResponseCheckTx response.
|
||||
sender string
|
||||
|
||||
// timestamp is the time at which the node first received the transaction from
|
||||
// a peer. It is used as a second dimension is prioritizing transactions when
|
||||
// two transactions have the same priority.
|
||||
timestamp time.Time
|
||||
|
||||
// peers records a mapping of all peers that sent a given transaction
|
||||
peers map[uint16]struct{}
|
||||
|
||||
// heapIndex defines the index of the item in the heap
|
||||
heapIndex int
|
||||
|
||||
// gossipEl references the linked-list element in the gossip index
|
||||
gossipEl *clist.CElement
|
||||
|
||||
// removed marks the transaction as removed from the mempool. This is set
|
||||
// during RemoveTx and is needed due to the fact that a given existing
|
||||
// transaction in the mempool can be evicted when it is simultaneously having
|
||||
// a reCheckTx callback executed.
|
||||
removed bool
|
||||
}
|
||||
|
||||
func (wtx *WrappedTx) Size() int {
|
||||
return len(wtx.tx)
|
||||
}
|
||||
|
||||
// TxStore implements a thread-safe mapping of valid transaction(s).
|
||||
//
|
||||
// NOTE:
|
||||
// - Concurrent read-only access to a *WrappedTx object is OK. However, mutative
|
||||
// access is not allowed. Regardless, it is not expected for the mempool to
|
||||
// need mutative access.
|
||||
type TxStore struct {
|
||||
mtx tmsync.RWMutex
|
||||
hashTxs map[types.TxKey]*WrappedTx // primary index
|
||||
senderTxs map[string]*WrappedTx // sender is defined by the ABCI application
|
||||
}
|
||||
|
||||
func NewTxStore() *TxStore {
|
||||
return &TxStore{
|
||||
senderTxs: make(map[string]*WrappedTx),
|
||||
hashTxs: make(map[types.TxKey]*WrappedTx),
|
||||
}
|
||||
}
|
||||
|
||||
// Size returns the total number of transactions in the store.
|
||||
func (txs *TxStore) Size() int {
|
||||
txs.mtx.RLock()
|
||||
defer txs.mtx.RUnlock()
|
||||
|
||||
return len(txs.hashTxs)
|
||||
}
|
||||
|
||||
// GetAllTxs returns all the transactions currently in the store.
|
||||
func (txs *TxStore) GetAllTxs() []*WrappedTx {
|
||||
txs.mtx.RLock()
|
||||
defer txs.mtx.RUnlock()
|
||||
|
||||
wTxs := make([]*WrappedTx, len(txs.hashTxs))
|
||||
i := 0
|
||||
for _, wtx := range txs.hashTxs {
|
||||
wTxs[i] = wtx
|
||||
i++
|
||||
}
|
||||
|
||||
return wTxs
|
||||
}
|
||||
|
||||
// GetTxBySender returns a *WrappedTx by the transaction's sender property
|
||||
// defined by the ABCI application.
|
||||
func (txs *TxStore) GetTxBySender(sender string) *WrappedTx {
|
||||
txs.mtx.RLock()
|
||||
defer txs.mtx.RUnlock()
|
||||
|
||||
return txs.senderTxs[sender]
|
||||
}
|
||||
|
||||
// GetTxByHash returns a *WrappedTx by the transaction's hash.
|
||||
func (txs *TxStore) GetTxByHash(hash types.TxKey) *WrappedTx {
|
||||
txs.mtx.RLock()
|
||||
defer txs.mtx.RUnlock()
|
||||
|
||||
return txs.hashTxs[hash]
|
||||
}
|
||||
|
||||
// IsTxRemoved returns true if a transaction by hash is marked as removed and
|
||||
// false otherwise.
|
||||
func (txs *TxStore) IsTxRemoved(hash types.TxKey) bool {
|
||||
txs.mtx.RLock()
|
||||
defer txs.mtx.RUnlock()
|
||||
|
||||
wtx, ok := txs.hashTxs[hash]
|
||||
if ok {
|
||||
return wtx.removed
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// SetTx stores a *WrappedTx by it's hash. If the transaction also contains a
|
||||
// non-empty sender, we additionally store the transaction by the sender as
|
||||
// defined by the ABCI application.
|
||||
func (txs *TxStore) SetTx(wtx *WrappedTx) {
|
||||
txs.mtx.Lock()
|
||||
defer txs.mtx.Unlock()
|
||||
|
||||
if len(wtx.sender) > 0 {
|
||||
txs.senderTxs[wtx.sender] = wtx
|
||||
}
|
||||
|
||||
txs.hashTxs[wtx.tx.Key()] = wtx
|
||||
}
|
||||
|
||||
// RemoveTx removes a *WrappedTx from the transaction store. It deletes all
|
||||
// indexes of the transaction.
|
||||
func (txs *TxStore) RemoveTx(wtx *WrappedTx) {
|
||||
txs.mtx.Lock()
|
||||
defer txs.mtx.Unlock()
|
||||
|
||||
if len(wtx.sender) > 0 {
|
||||
delete(txs.senderTxs, wtx.sender)
|
||||
}
|
||||
|
||||
delete(txs.hashTxs, wtx.tx.Key())
|
||||
wtx.removed = true
|
||||
}
|
||||
|
||||
// TxHasPeer returns true if a transaction by hash has a given peer ID and false
|
||||
// otherwise. If the transaction does not exist, false is returned.
|
||||
func (txs *TxStore) TxHasPeer(hash types.TxKey, peerID uint16) bool {
|
||||
txs.mtx.RLock()
|
||||
defer txs.mtx.RUnlock()
|
||||
|
||||
wtx := txs.hashTxs[hash]
|
||||
if wtx == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
_, ok := wtx.peers[peerID]
|
||||
return ok
|
||||
}
|
||||
|
||||
// GetOrSetPeerByTxHash looks up a WrappedTx by transaction hash and adds the
|
||||
// given peerID to the WrappedTx's set of peers that sent us this transaction.
|
||||
// We return true if we've already recorded the given peer for this transaction
|
||||
// and false otherwise. If the transaction does not exist by hash, we return
|
||||
// (nil, false).
|
||||
func (txs *TxStore) GetOrSetPeerByTxHash(hash types.TxKey, peerID uint16) (*WrappedTx, bool) {
|
||||
txs.mtx.Lock()
|
||||
defer txs.mtx.Unlock()
|
||||
|
||||
wtx := txs.hashTxs[hash]
|
||||
if wtx == nil {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
if wtx.peers == nil {
|
||||
wtx.peers = make(map[uint16]struct{})
|
||||
}
|
||||
|
||||
if _, ok := wtx.peers[peerID]; ok {
|
||||
return wtx, true
|
||||
}
|
||||
|
||||
wtx.peers[peerID] = struct{}{}
|
||||
return wtx, false
|
||||
}
|
||||
|
||||
// WrappedTxList implements a thread-safe list of *WrappedTx objects that can be
|
||||
// used to build generic transaction indexes in the mempool. It accepts a
|
||||
// comparator function, less(a, b *WrappedTx) bool, that compares two WrappedTx
|
||||
// references which is used during Insert in order to determine sorted order. If
|
||||
// less returns true, a <= b.
|
||||
type WrappedTxList struct {
|
||||
mtx tmsync.RWMutex
|
||||
txs []*WrappedTx
|
||||
less func(*WrappedTx, *WrappedTx) bool
|
||||
}
|
||||
|
||||
func NewWrappedTxList(less func(*WrappedTx, *WrappedTx) bool) *WrappedTxList {
|
||||
return &WrappedTxList{
|
||||
txs: make([]*WrappedTx, 0),
|
||||
less: less,
|
||||
}
|
||||
}
|
||||
|
||||
// Size returns the number of WrappedTx objects in the list.
|
||||
func (wtl *WrappedTxList) Size() int {
|
||||
wtl.mtx.RLock()
|
||||
defer wtl.mtx.RUnlock()
|
||||
|
||||
return len(wtl.txs)
|
||||
}
|
||||
|
||||
// Reset resets the list of transactions to an empty list.
|
||||
func (wtl *WrappedTxList) Reset() {
|
||||
wtl.mtx.Lock()
|
||||
defer wtl.mtx.Unlock()
|
||||
|
||||
wtl.txs = make([]*WrappedTx, 0)
|
||||
}
|
||||
|
||||
// Insert inserts a WrappedTx reference into the sorted list based on the list's
|
||||
// comparator function.
|
||||
func (wtl *WrappedTxList) Insert(wtx *WrappedTx) {
|
||||
wtl.mtx.Lock()
|
||||
defer wtl.mtx.Unlock()
|
||||
|
||||
i := sort.Search(len(wtl.txs), func(i int) bool {
|
||||
return wtl.less(wtl.txs[i], wtx)
|
||||
})
|
||||
|
||||
if i == len(wtl.txs) {
|
||||
// insert at the end
|
||||
wtl.txs = append(wtl.txs, wtx)
|
||||
return
|
||||
}
|
||||
|
||||
// Make space for the inserted element by shifting values at the insertion
|
||||
// index up one index.
|
||||
//
|
||||
// NOTE: The call to append does not allocate memory when cap(wtl.txs) > len(wtl.txs).
|
||||
wtl.txs = append(wtl.txs[:i+1], wtl.txs[i:]...)
|
||||
wtl.txs[i] = wtx
|
||||
}
|
||||
|
||||
// Remove attempts to remove a WrappedTx from the sorted list.
|
||||
func (wtl *WrappedTxList) Remove(wtx *WrappedTx) {
|
||||
wtl.mtx.Lock()
|
||||
defer wtl.mtx.Unlock()
|
||||
|
||||
i := sort.Search(len(wtl.txs), func(i int) bool {
|
||||
return wtl.less(wtl.txs[i], wtx)
|
||||
})
|
||||
|
||||
// Since the list is sorted, we evaluate all elements starting at i. Note, if
|
||||
// the element does not exist, we may potentially evaluate the entire remainder
|
||||
// of the list. However, a caller should not be expected to call Remove with a
|
||||
// non-existing element.
|
||||
for i < len(wtl.txs) {
|
||||
if wtl.txs[i] == wtx {
|
||||
wtl.txs = append(wtl.txs[:i], wtl.txs[i+1:]...)
|
||||
return
|
||||
}
|
||||
|
||||
i++
|
||||
}
|
||||
}
|
||||
0
test/fuzz/mempool/v0/testdata/cases/empty
vendored
Normal file
0
test/fuzz/mempool/v0/testdata/cases/empty
vendored
Normal file
0
test/fuzz/mempool/v1/testdata/cases/empty
vendored
Normal file
0
test/fuzz/mempool/v1/testdata/cases/empty
vendored
Normal file
Reference in New Issue
Block a user