diff --git a/consensus/common_test.go b/consensus/common_test.go
index 5d6a905e2..09512f49d 100644
--- a/consensus/common_test.go
+++ b/consensus/common_test.go
@@ -394,7 +394,7 @@ func newStateWithConfigAndBlockStore(
 
 	mtx := new(tmsync.Mutex)
 	proxyAppConnCon := abcicli.NewLocalClient(mtx, app)
-
+	proxyAppConnConMem := abcicli.NewLocalClient(mtx, app)
 
 	// Make Mempool
 	memplMetrics := mempl.NopMetrics()
@@ -404,7 +404,7 @@ func newStateWithConfigAndBlockStore(
 	switch config.Mempool.Version {
 	case cfg.MempoolV0:
 		mempool = mempoolv0.NewCListMempool(config.Mempool,
-			proxyAppConnCon,
+			proxyAppConnConMem,
 			state.LastBlockHeight,
 			mempoolv0.WithMetrics(memplMetrics),
 			mempoolv0.WithPreCheck(sm.TxPreCheck(state)),
@@ -413,7 +413,7 @@
 		logger := consensusLogger()
 		mempool = mempoolv1.NewTxMempool(logger,
 			config.Mempool,
-			proxyAppConnCon,
+			proxyAppConnConMem,
 			state.LastBlockHeight,
 			mempoolv1.WithMetrics(memplMetrics),
 			mempoolv1.WithPreCheck(sm.TxPreCheck(state)),
diff --git a/mempool_bak/bench_test.go b/mempool_bak/bench_test.go
deleted file mode 100644
index 1c26999d1..000000000
--- a/mempool_bak/bench_test.go
+++ /dev/null
@@ -1,41 +0,0 @@
-package mempool
-
-import (
-	"encoding/binary"
-	"testing"
-)
-
-func BenchmarkCacheInsertTime(b *testing.B) {
-	cache := NewLRUTxCache(b.N)
-
-	txs := make([][]byte, b.N)
-	for i := 0; i < b.N; i++ {
-		txs[i] = make([]byte, 8)
-		binary.BigEndian.PutUint64(txs[i], uint64(i))
-	}
-
-	b.ResetTimer()
-
-	for i := 0; i < b.N; i++ {
-		cache.Push(txs[i])
-	}
-}
-
-// This benchmark is probably skewed, since we actually will be removing
-// txs in parallel, which may cause some overhead due to mutex locking.
-func BenchmarkCacheRemoveTime(b *testing.B) {
-	cache := NewLRUTxCache(b.N)
-
-	txs := make([][]byte, b.N)
-	for i := 0; i < b.N; i++ {
-		txs[i] = make([]byte, 8)
-		binary.BigEndian.PutUint64(txs[i], uint64(i))
-		cache.Push(txs[i])
-	}
-
-	b.ResetTimer()
-
-	for i := 0; i < b.N; i++ {
-		cache.Remove(txs[i])
-	}
-}
diff --git a/mempool_bak/cache.go b/mempool_bak/cache.go
deleted file mode 100644
index cdaefe762..000000000
--- a/mempool_bak/cache.go
+++ /dev/null
@@ -1,107 +0,0 @@
-package mempool
-
-import (
-	"container/list"
-
-	tmsync "github.com/tendermint/tendermint/libs/sync"
-	"github.com/tendermint/tendermint/types"
-)
-
-// TxCache defines an interface for raw transaction caching in a mempool.
-// Currently, a TxCache does not allow direct reading or getting of transaction
-// values. A TxCache is used primarily to push transactions and removing
-// transactions. Pushing via Push returns a boolean telling the caller if the
-// transaction already exists in the cache or not.
-type TxCache interface {
-	// Reset resets the cache to an empty state.
-	Reset()
-
-	// Push adds the given raw transaction to the cache and returns true if it was
-	// newly added. Otherwise, it returns false.
-	Push(tx types.Tx) bool
-
-	// Remove removes the given raw transaction from the cache.
-	Remove(tx types.Tx)
-}
-
-var _ TxCache = (*LRUTxCache)(nil)
-
-// LRUTxCache maintains a thread-safe LRU cache of raw transactions. The cache
-// only stores the hash of the raw transaction.
-type LRUTxCache struct { - mtx tmsync.Mutex - size int - cacheMap map[types.TxKey]*list.Element - list *list.List -} - -func NewLRUTxCache(cacheSize int) *LRUTxCache { - return &LRUTxCache{ - size: cacheSize, - cacheMap: make(map[types.TxKey]*list.Element, cacheSize), - list: list.New(), - } -} - -// GetList returns the underlying linked-list that backs the LRU cache. Note, -// this should be used for testing purposes only! -func (c *LRUTxCache) GetList() *list.List { - return c.list -} - -func (c *LRUTxCache) Reset() { - c.mtx.Lock() - defer c.mtx.Unlock() - - c.cacheMap = make(map[types.TxKey]*list.Element, c.size) - c.list.Init() -} - -func (c *LRUTxCache) Push(tx types.Tx) bool { - c.mtx.Lock() - defer c.mtx.Unlock() - - key := tx.Key() - - moved, ok := c.cacheMap[key] - if ok { - c.list.MoveToBack(moved) - return false - } - - if c.list.Len() >= c.size { - front := c.list.Front() - if front != nil { - frontKey := front.Value.(types.TxKey) - delete(c.cacheMap, frontKey) - c.list.Remove(front) - } - } - - e := c.list.PushBack(key) - c.cacheMap[key] = e - - return true -} - -func (c *LRUTxCache) Remove(tx types.Tx) { - c.mtx.Lock() - defer c.mtx.Unlock() - - key := tx.Key() - e := c.cacheMap[key] - delete(c.cacheMap, key) - - if e != nil { - c.list.Remove(e) - } -} - -// NopTxCache defines a no-op raw transaction cache. -type NopTxCache struct{} - -var _ TxCache = (*NopTxCache)(nil) - -func (NopTxCache) Reset() {} -func (NopTxCache) Push(types.Tx) bool { return true } -func (NopTxCache) Remove(types.Tx) {} diff --git a/mempool_bak/cache_test.go b/mempool_bak/cache_test.go deleted file mode 100644 index 44b2beb01..000000000 --- a/mempool_bak/cache_test.go +++ /dev/null @@ -1,35 +0,0 @@ -package mempool - -import ( - "crypto/rand" - "testing" - - "github.com/stretchr/testify/require" -) - -func TestCacheRemove(t *testing.T) { - cache := NewLRUTxCache(100) - numTxs := 10 - - txs := make([][]byte, numTxs) - for i := 0; i < numTxs; i++ { - // probability of collision is 2**-256 - txBytes := make([]byte, 32) - _, err := rand.Read(txBytes) - require.NoError(t, err) - - txs[i] = txBytes - cache.Push(txBytes) - - // make sure its added to both the linked list and the map - require.Equal(t, i+1, len(cache.cacheMap)) - require.Equal(t, i+1, cache.list.Len()) - } - - for i := 0; i < numTxs; i++ { - cache.Remove(txs[i]) - // make sure its removed from both the map and the linked list - require.Equal(t, numTxs-(i+1), len(cache.cacheMap)) - require.Equal(t, numTxs-(i+1), cache.list.Len()) - } -} diff --git a/mempool_bak/clist_mempool.go b/mempool_bak/clist_mempool.go deleted file mode 100644 index 601052152..000000000 --- a/mempool_bak/clist_mempool.go +++ /dev/null @@ -1,872 +0,0 @@ -package mempool - -import ( - "bytes" - "errors" - "fmt" - "reflect" - "sync/atomic" - "time" - - abci "github.com/tendermint/tendermint/abci/types" - cfg "github.com/tendermint/tendermint/config" - "github.com/tendermint/tendermint/libs/clist" - "github.com/tendermint/tendermint/libs/log" - tmmath "github.com/tendermint/tendermint/libs/math" - tmsync "github.com/tendermint/tendermint/libs/sync" - "github.com/tendermint/tendermint/proxy" - "github.com/tendermint/tendermint/types" -) - -var ( - _ Mempool = (*TxMempool)(nil) - - newline = []byte("\n") -) - -// TxMempoolOption sets an optional parameter on the TxMempool. -type TxMempoolOption func(*TxMempool) - -// TxMempool defines a prioritized mempool data structure used by the v1 mempool -// reactor. 
It keeps a thread-safe priority queue of transactions that is used -// when a block proposer constructs a block and a thread-safe linked-list that -// is used to gossip transactions to peers in a FIFO manner. -type TxMempool struct { - logger log.Logger - metrics *Metrics - config *cfg.MempoolConfig - proxyAppConn proxy.AppConnMempool - - // txsAvailable fires once for each height when the mempool is not empty - txsAvailable chan struct{} - notifiedTxsAvailable bool - - // height defines the last block height process during Update() - height int64 - - // sizeBytes defines the total size of the mempool (sum of all tx bytes) - sizeBytes int64 - - // cache defines a fixed-size cache of already seen transactions as this - // reduces pressure on the proxyApp. - cache TxCache - - // txStore defines the main storage of valid transactions. Indexes are built - // on top of this store. - txStore *TxStore - - // gossipIndex defines the gossiping index of valid transactions via a - // thread-safe linked-list. We also use the gossip index as a cursor for - // rechecking transactions already in the mempool. - gossipIndex *clist.CList - - // recheckCursor and recheckEnd are used as cursors based on the gossip index - // to recheck transactions that are already in the mempool. Iteration is not - // thread-safe and transaction may be mutated in serial order. - // - // XXX/TODO: It might be somewhat of a codesmell to use the gossip index for - // iterator and cursor management when rechecking transactions. If the gossip - // index changes or is removed in a future refactor, this will have to be - // refactored. Instead, we should consider just keeping a slice of a snapshot - // of the mempool's current transactions during Update and an integer cursor - // into that slice. This, however, requires additional O(n) space complexity. - recheckCursor *clist.CElement // next expected response - recheckEnd *clist.CElement // re-checking stops here - - // priorityIndex defines the priority index of valid transactions via a - // thread-safe priority queue. - priorityIndex *TxPriorityQueue - - // heightIndex defines a height-based, in ascending order, transaction index. - // i.e. older transactions are first. - heightIndex *WrappedTxList - - // timestampIndex defines a timestamp-based, in ascending order, transaction - // index. i.e. older transactions are first. - timestampIndex *WrappedTxList - - // A read/write lock is used to safe guard updates, insertions and deletions - // from the mempool. A read-lock is implicitly acquired when executing CheckTx, - // however, a caller must explicitly grab a write-lock via Lock when updating - // the mempool via Update(). 
- mtx tmsync.RWMutex - preCheck PreCheckFunc - postCheck PostCheckFunc -} - -func NewTxMempool( - logger log.Logger, - config *cfg.MempoolConfig, - proxyAppConn proxy.AppConnMempool, - height int64, - options ...TxMempoolOption, -) *TxMempool { - - txmp := &TxMempool{ - logger: logger, - config: config, - proxyAppConn: proxyAppConn, - height: height, - cache: NopTxCache{}, - metrics: NopMetrics(), - txStore: NewTxStore(), - gossipIndex: clist.New(), - priorityIndex: NewTxPriorityQueue(), - heightIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool { - return wtx1.height >= wtx2.height - }), - timestampIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool { - return wtx1.timestamp.After(wtx2.timestamp) || wtx1.timestamp.Equal(wtx2.timestamp) - }), - } - - if config.CacheSize > 0 { - txmp.cache = NewLRUTxCache(config.CacheSize) - } - - proxyAppConn.SetResponseCallback(txmp.defaultTxCallback) - - for _, opt := range options { - opt(txmp) - } - - return txmp -} - -// WithPreCheck sets a filter for the mempool to reject a transaction if f(tx) -// returns an error. This is executed before CheckTx. It only applies to the -// first created block. After that, Update() overwrites the existing value. -func WithPreCheck(f PreCheckFunc) TxMempoolOption { - return func(txmp *TxMempool) { txmp.preCheck = f } -} - -// WithPostCheck sets a filter for the mempool to reject a transaction if -// f(tx, resp) returns an error. This is executed after CheckTx. It only applies -// to the first created block. After that, Update overwrites the existing value. -func WithPostCheck(f PostCheckFunc) TxMempoolOption { - return func(txmp *TxMempool) { txmp.postCheck = f } -} - -// WithMetrics sets the mempool's metrics collector. -func WithMetrics(metrics *Metrics) TxMempoolOption { - return func(txmp *TxMempool) { txmp.metrics = metrics } -} - -func (txmp *TxMempool) SetLogger(l log.Logger) { - txmp.logger = l -} - -// Lock obtains a write-lock on the mempool. A caller must be sure to explicitly -// release the lock when finished. -func (txmp *TxMempool) Lock() { - txmp.mtx.Lock() -} - -// Unlock releases a write-lock on the mempool. -func (txmp *TxMempool) Unlock() { - txmp.mtx.Unlock() -} - -// Size returns the number of valid transactions in the mempool. It is -// thread-safe. -func (txmp *TxMempool) Size() int { - return txmp.txStore.Size() -} - -// SizeBytes return the total sum in bytes of all the valid transactions in the -// mempool. It is thread-safe. -func (txmp *TxMempool) SizeBytes() int64 { - return atomic.LoadInt64(&txmp.sizeBytes) -} - -// FlushAppConn executes FlushSync on the mempool's proxyAppConn. -// -// NOTE: The caller must obtain a write-lock via Lock() prior to execution. -func (txmp *TxMempool) FlushAppConn() error { - return txmp.proxyAppConn.FlushSync() -} - -// WaitForNextTx returns a blocking channel that will be closed when the next -// valid transaction is available to gossip. It is thread-safe. -func (txmp *TxMempool) WaitForNextTx() <-chan struct{} { - return txmp.gossipIndex.WaitChan() -} - -// NextGossipTx returns the next valid transaction to gossip. A caller must wait -// for WaitForNextTx to signal a transaction is available to gossip first. It is -// thread-safe. -func (txmp *TxMempool) NextGossipTx() *clist.CElement { - return txmp.gossipIndex.Front() -} - -// EnableTxsAvailable enables the mempool to trigger events when transactions -// are available on a block by block basis. 
-func (txmp *TxMempool) EnableTxsAvailable() { - txmp.mtx.Lock() - defer txmp.mtx.Unlock() - - txmp.txsAvailable = make(chan struct{}, 1) -} - -// TxsAvailable returns a channel which fires once for every height, and only -// when transactions are available in the mempool. It is thread-safe. -func (txmp *TxMempool) TxsAvailable() <-chan struct{} { - return txmp.txsAvailable -} - -// CheckTx executes the ABCI CheckTx method for a given transaction. It acquires -// a read-lock attempts to execute the application's CheckTx ABCI method via -// CheckTxAsync. We return an error if any of the following happen: -// -// - The CheckTxAsync execution fails. -// - The transaction already exists in the cache and we've already received the -// transaction from the peer. Otherwise, if it solely exists in the cache, we -// return nil. -// - The transaction size exceeds the maximum transaction size as defined by the -// configuration provided to the mempool. -// - The transaction fails Pre-Check (if it is defined). -// - The proxyAppConn fails, e.g. the buffer is full. -// -// If the mempool is full, we still execute CheckTx and attempt to find a lower -// priority transaction to evict. If such a transaction exists, we remove the -// lower priority transaction and add the new one with higher priority. -// -// NOTE: -// - The applications' CheckTx implementation may panic. -// - The caller is not to explicitly require any locks for executing CheckTx. -func (txmp *TxMempool) CheckTx( - tx types.Tx, - cb func(*abci.Response), - txInfo TxInfo, -) error { - - txmp.mtx.RLock() - defer txmp.mtx.RUnlock() - - txSize := len(tx) - if txSize > txmp.config.MaxTxBytes { - return ErrTxTooLarge{ - max: txmp.config.MaxTxBytes, - actual: txSize, - } - } - - if txmp.preCheck != nil { - if err := txmp.preCheck(tx); err != nil { - return ErrPreCheck{ - Reason: err, - } - } - } - - if err := txmp.proxyAppConn.Error(); err != nil { - return err - } - - txHash := tx.Key() - - // We add the transaction to the mempool's cache and if the - // transaction is already present in the cache, i.e. false is returned, then we - // check if we've seen this transaction and error if we have. - if !txmp.cache.Push(tx) { - txmp.txStore.GetOrSetPeerByTxHash(txHash, txInfo.SenderID) - return ErrTxInCache - } - - reqRes := txmp.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx}) - - reqRes.SetCallback(func(res *abci.Response) { - if txmp.recheckCursor != nil { - panic("recheck cursor is non-nil in CheckTx callback") - } - - wtx := &WrappedTx{ - tx: tx, - hash: txHash, - timestamp: time.Now().UTC(), - height: txmp.height, - } - txmp.initTxCallback(wtx, res, txInfo) - - if cb != nil { - cb(res) - } - }) - - return nil -} - -func (txmp *TxMempool) RemoveTxByKey(txKey types.TxKey) error { - txmp.Lock() - defer txmp.Unlock() - - // remove the committed transaction from the transaction store and indexes - if wtx := txmp.txStore.GetTxByHash(txKey); wtx != nil { - txmp.removeTx(wtx, false) - return nil - } - - return errors.New("transaction not found") -} - -// Flush flushes out the mempool. It acquires a read-lock, fetches all the -// transactions currently in the transaction store and removes each transaction -// from the store and all indexes and finally resets the cache. -// -// NOTE: -// - Flushing the mempool may leave the mempool in an inconsistent state. 
-func (txmp *TxMempool) Flush() { - txmp.mtx.RLock() - defer txmp.mtx.RUnlock() - - txmp.heightIndex.Reset() - txmp.timestampIndex.Reset() - - for _, wtx := range txmp.txStore.GetAllTxs() { - txmp.removeTx(wtx, false) - } - - atomic.SwapInt64(&txmp.sizeBytes, 0) - txmp.cache.Reset() -} - -// ReapMaxBytesMaxGas returns a list of transactions within the provided size -// and gas constraints. Transaction are retrieved in priority order. -// -// NOTE: -// - A read-lock is acquired. -// - Transactions returned are not actually removed from the mempool transaction -// store or indexes. -func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { - txmp.mtx.RLock() - defer txmp.mtx.RUnlock() - - var ( - totalGas int64 - totalSize int64 - ) - - // wTxs contains a list of *WrappedTx retrieved from the priority queue that - // need to be re-enqueued prior to returning. - wTxs := make([]*WrappedTx, 0, txmp.priorityIndex.NumTxs()) - defer func() { - for _, wtx := range wTxs { - txmp.priorityIndex.PushTx(wtx) - } - }() - - txs := make([]types.Tx, 0, txmp.priorityIndex.NumTxs()) - for txmp.priorityIndex.NumTxs() > 0 { - wtx := txmp.priorityIndex.PopTx() - txs = append(txs, wtx.tx) - wTxs = append(wTxs, wtx) - size := types.ComputeProtoSizeForTxs([]types.Tx{wtx.tx}) - - // Ensure we have capacity for the transaction with respect to the - // transaction size. - if maxBytes > -1 && totalSize+size > maxBytes { - return txs[:len(txs)-1] - } - - totalSize += size - - // ensure we have capacity for the transaction with respect to total gas - gas := totalGas + wtx.gasWanted - if maxGas > -1 && gas > maxGas { - return txs[:len(txs)-1] - } - - totalGas = gas - } - - return txs -} - -// ReapMaxTxs returns a list of transactions within the provided number of -// transactions bound. Transaction are retrieved in priority order. -// -// NOTE: -// - A read-lock is acquired. -// - Transactions returned are not actually removed from the mempool transaction -// store or indexes. -func (txmp *TxMempool) ReapMaxTxs(max int) types.Txs { - txmp.mtx.RLock() - defer txmp.mtx.RUnlock() - - numTxs := txmp.priorityIndex.NumTxs() - if max < 0 { - max = numTxs - } - - cap := tmmath.MinInt(numTxs, max) - - // wTxs contains a list of *WrappedTx retrieved from the priority queue that - // need to be re-enqueued prior to returning. - wTxs := make([]*WrappedTx, 0, cap) - defer func() { - for _, wtx := range wTxs { - txmp.priorityIndex.PushTx(wtx) - } - }() - - txs := make([]types.Tx, 0, cap) - for txmp.priorityIndex.NumTxs() > 0 && len(txs) < max { - wtx := txmp.priorityIndex.PopTx() - txs = append(txs, wtx.tx) - wTxs = append(wTxs, wtx) - } - - return txs -} - -// Update iterates over all the transactions provided by the caller, i.e. the -// block producer, and removes them from the cache (if applicable) and removes -// the transactions from the main transaction store and associated indexes. -// Finally, if there are trainsactions remaining in the mempool, we initiate a -// re-CheckTx for them (if applicable), otherwise, we notify the caller more -// transactions are available. -// -// NOTE: -// - The caller must explicitly acquire a write-lock via Lock(). 
-func (txmp *TxMempool) Update( - blockHeight int64, - blockTxs types.Txs, - deliverTxResponses []*abci.ResponseDeliverTx, - newPreFn PreCheckFunc, - newPostFn PostCheckFunc, -) error { - - txmp.height = blockHeight - txmp.notifiedTxsAvailable = false - - if newPreFn != nil { - txmp.preCheck = newPreFn - } - if newPostFn != nil { - txmp.postCheck = newPostFn - } - - for i, tx := range blockTxs { - if deliverTxResponses[i].Code == abci.CodeTypeOK { - // add the valid committed transaction to the cache (if missing) - _ = txmp.cache.Push(tx) - } else if !txmp.config.KeepInvalidTxsInCache { - // allow invalid transactions to be re-submitted - txmp.cache.Remove(tx) - } - - // remove the committed transaction from the transaction store and indexes - if wtx := txmp.txStore.GetTxByHash(tx.Key()); wtx != nil { - txmp.removeTx(wtx, false) - } - } - - txmp.purgeExpiredTxs(blockHeight) - - // If there any uncommitted transactions left in the mempool, we either - // initiate re-CheckTx per remaining transaction or notify that remaining - // transactions are left. - if txmp.Size() > 0 { - if txmp.config.Recheck { - txmp.logger.Debug( - "executing re-CheckTx for all remaining transactions", - "num_txs", txmp.Size(), - "height", blockHeight, - ) - txmp.updateReCheckTxs() - } else { - txmp.notifyTxsAvailable() - } - } - - txmp.metrics.Size.Set(float64(txmp.Size())) - return nil -} - -// initTxCallback performs the initial, i.e. the first, callback after CheckTx -// has been executed by the ABCI application. In other words, initTxCallback is -// called after executing CheckTx when we see a unique transaction for the first -// time. CheckTx can be called again for the same transaction at a later point -// in time when re-checking, however, this callback will not be called. -// -// After the ABCI application executes CheckTx, initTxCallback is called with -// the ABCI *Response object and TxInfo. If postCheck is defined on the mempool, -// we execute that first. If there is no error from postCheck (if defined) and -// the ABCI CheckTx response code is OK, we attempt to insert the transaction. -// -// When attempting to insert the transaction, we first check if there is -// sufficient capacity. If there is sufficient capacity, the transaction is -// inserted into the txStore and indexed across all indexes. Otherwise, if the -// mempool is full, we attempt to find a lower priority transaction to evict in -// place of the new incoming transaction. If no such transaction exists, the -// new incoming transaction is rejected. -// -// If the new incoming transaction fails CheckTx or postCheck fails, we reject -// the new incoming transaction. -// -// NOTE: -// - An explicit lock is NOT required. 
-func (txmp *TxMempool) initTxCallback(wtx *WrappedTx, res *abci.Response, txInfo TxInfo) { - checkTxRes, ok := res.Value.(*abci.Response_CheckTx) - if !ok { - return - } - - var err error - if txmp.postCheck != nil { - err = txmp.postCheck(wtx.tx, checkTxRes.CheckTx) - } - - if err != nil || checkTxRes.CheckTx.Code != abci.CodeTypeOK { - // ignore bad transactions - txmp.logger.Info( - "rejected bad transaction", - "priority", wtx.priority, - "tx", fmt.Sprintf("%X", wtx.tx.Hash()), - "peer_id", txInfo.SenderP2PID, - "code", checkTxRes.CheckTx.Code, - "post_check_err", err, - ) - - txmp.metrics.FailedTxs.Add(1) - - if !txmp.config.KeepInvalidTxsInCache { - txmp.cache.Remove(wtx.tx) - } - if err != nil { - checkTxRes.CheckTx.MempoolError = err.Error() - } - return - } - - sender := checkTxRes.CheckTx.Sender - priority := checkTxRes.CheckTx.Priority - - if len(sender) > 0 { - if wtx := txmp.txStore.GetTxBySender(sender); wtx != nil { - txmp.logger.Error( - "rejected incoming good transaction; tx already exists for sender", - "tx", fmt.Sprintf("%X", wtx.tx.Hash()), - "sender", sender, - ) - txmp.metrics.RejectedTxs.Add(1) - return - } - } - - if err := txmp.canAddTx(wtx); err != nil { - evictTxs := txmp.priorityIndex.GetEvictableTxs( - priority, - int64(wtx.Size()), - txmp.SizeBytes(), - txmp.config.MaxTxsBytes, - ) - if len(evictTxs) == 0 { - // No room for the new incoming transaction so we just remove it from - // the cache. - txmp.cache.Remove(wtx.tx) - txmp.logger.Error( - "rejected incoming good transaction; mempool full", - "tx", fmt.Sprintf("%X", wtx.tx.Hash()), - "err", err.Error(), - ) - txmp.metrics.RejectedTxs.Add(1) - return - } - - // evict an existing transaction(s) - // - // NOTE: - // - The transaction, toEvict, can be removed while a concurrent - // reCheckTx callback is being executed for the same transaction. - for _, toEvict := range evictTxs { - txmp.removeTx(toEvict, true) - txmp.logger.Debug( - "evicted existing good transaction; mempool full", - "old_tx", fmt.Sprintf("%X", toEvict.tx.Hash()), - "old_priority", toEvict.priority, - "new_tx", fmt.Sprintf("%X", wtx.tx.Hash()), - "new_priority", wtx.priority, - ) - txmp.metrics.EvictedTxs.Add(1) - } - } - - wtx.gasWanted = checkTxRes.CheckTx.GasWanted - wtx.priority = priority - wtx.sender = sender - wtx.peers = map[uint16]struct{}{ - txInfo.SenderID: {}, - } - - txmp.metrics.TxSizeBytes.Observe(float64(wtx.Size())) - txmp.metrics.Size.Set(float64(txmp.Size())) - - txmp.insertTx(wtx) - txmp.logger.Debug( - "inserted good transaction", - "priority", wtx.priority, - "tx", fmt.Sprintf("%X", wtx.tx.Hash()), - "height", txmp.height, - "num_txs", txmp.Size(), - ) - txmp.notifyTxsAvailable() - -} - -// defaultTxCallback performs the default CheckTx application callback. This is -// NOT executed when a transaction is first seen/received. Instead, this callback -// is executed during re-checking transactions (if enabled). A caller, i.e a -// block proposer, acquires a mempool write-lock via Lock() and when executing -// Update(), if the mempool is non-empty and Recheck is enabled, then all -// remaining transactions will be rechecked via CheckTxAsync. The order in which -// they are rechecked must be the same order in which this callback is called -// per transaction. 
-func (txmp *TxMempool) defaultTxCallback(req *abci.Request, res *abci.Response) { - if txmp.recheckCursor == nil { - return - } - - txmp.metrics.RecheckTimes.Add(1) - - checkTxRes, ok := res.Value.(*abci.Response_CheckTx) - if !ok { - txmp.logger.Error( - "received incorrect type in mempool callback", - "expected", reflect.TypeOf(&abci.Response_CheckTx{}).Name(), - "got", reflect.TypeOf(res.Value).Name(), - ) - return - } - - tx := req.GetCheckTx().Tx - wtx := txmp.recheckCursor.Value.(*WrappedTx) - - // Search through the remaining list of tx to recheck for a transaction that matches - // the one we received from the ABCI application. - for { - if bytes.Equal(tx, wtx.tx) { - // We've found a tx in the recheck list that matches the tx that we - // received from the ABCI application. - // Break, and use this transaction for further checks. - break - } - - txmp.logger.Error( - "re-CheckTx transaction mismatch", - "got", wtx.tx.Hash(), - "expected", types.Tx(tx).Key(), - ) - - if txmp.recheckCursor == txmp.recheckEnd { - // we reached the end of the recheckTx list without finding a tx - // matching the one we received from the ABCI application. - // Return without processing any tx. - txmp.recheckCursor = nil - return - } - - txmp.recheckCursor = txmp.recheckCursor.Next() - wtx = txmp.recheckCursor.Value.(*WrappedTx) - } - - // Only evaluate transactions that have not been removed. This can happen - // if an existing transaction is evicted during CheckTx and while this - // callback is being executed for the same evicted transaction. - if !txmp.txStore.IsTxRemoved(wtx.hash) { - var err error - if txmp.postCheck != nil { - err = txmp.postCheck(tx, checkTxRes.CheckTx) - } - - if checkTxRes.CheckTx.Code == abci.CodeTypeOK && err == nil { - wtx.priority = checkTxRes.CheckTx.Priority - } else { - txmp.logger.Debug( - "existing transaction no longer valid; failed re-CheckTx callback", - "priority", wtx.priority, - "tx", fmt.Sprintf("%X", wtx.tx.Hash()), - "err", err, - "code", checkTxRes.CheckTx.Code, - ) - - if wtx.gossipEl != txmp.recheckCursor { - panic("corrupted reCheckTx cursor") - } - - txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache) - } - } - - // move reCheckTx cursor to next element - if txmp.recheckCursor == txmp.recheckEnd { - txmp.recheckCursor = nil - } else { - txmp.recheckCursor = txmp.recheckCursor.Next() - } - - if txmp.recheckCursor == nil { - txmp.logger.Debug("finished rechecking transactions") - - if txmp.Size() > 0 { - txmp.notifyTxsAvailable() - } - } - - txmp.metrics.Size.Set(float64(txmp.Size())) -} - -// updateReCheckTxs updates the recheck cursors by using the gossipIndex. For -// each transaction, it executes CheckTxAsync. The global callback defined on -// the proxyAppConn will be executed for each transaction after CheckTx is -// executed. -// -// NOTE: -// - The caller must have a write-lock when executing updateReCheckTxs. -func (txmp *TxMempool) updateReCheckTxs() { - if txmp.Size() == 0 { - panic("attempted to update re-CheckTx txs when mempool is empty") - } - - txmp.recheckCursor = txmp.gossipIndex.Front() - txmp.recheckEnd = txmp.gossipIndex.Back() - - for e := txmp.gossipIndex.Front(); e != nil; e = e.Next() { - wtx := e.Value.(*WrappedTx) - - // Only execute CheckTx if the transaction is not marked as removed which - // could happen if the transaction was evicted. 
- if !txmp.txStore.IsTxRemoved(wtx.hash) { - txmp.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{ - Tx: wtx.tx, - Type: abci.CheckTxType_Recheck, - }) - } - } - - txmp.proxyAppConn.FlushAsync() -} - -// canAddTx returns an error if we cannot insert the provided *WrappedTx into -// the mempool due to mempool configured constraints. Otherwise, nil is returned -// and the transaction can be inserted into the mempool. -func (txmp *TxMempool) canAddTx(wtx *WrappedTx) error { - var ( - numTxs = txmp.Size() - sizeBytes = txmp.SizeBytes() - ) - - if numTxs >= txmp.config.Size || int64(wtx.Size())+sizeBytes > txmp.config.MaxTxsBytes { - return ErrMempoolIsFull{ - numTxs: numTxs, - maxTxs: txmp.config.Size, - txsBytes: sizeBytes, - maxTxsBytes: txmp.config.MaxTxsBytes, - } - } - - return nil -} - -func (txmp *TxMempool) insertTx(wtx *WrappedTx) { - txmp.txStore.SetTx(wtx) - txmp.priorityIndex.PushTx(wtx) - txmp.heightIndex.Insert(wtx) - txmp.timestampIndex.Insert(wtx) - - // Insert the transaction into the gossip index and mark the reference to the - // linked-list element, which will be needed at a later point when the - // transaction is removed. - gossipEl := txmp.gossipIndex.PushBack(wtx) - wtx.gossipEl = gossipEl - - atomic.AddInt64(&txmp.sizeBytes, int64(wtx.Size())) -} - -func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) { - if txmp.txStore.IsTxRemoved(wtx.hash) { - return - } - - txmp.txStore.RemoveTx(wtx) - txmp.priorityIndex.RemoveTx(wtx) - txmp.heightIndex.Remove(wtx) - txmp.timestampIndex.Remove(wtx) - - // Remove the transaction from the gossip index and cleanup the linked-list - // element so it can be garbage collected. - txmp.gossipIndex.Remove(wtx.gossipEl) - wtx.gossipEl.DetachPrev() - - atomic.AddInt64(&txmp.sizeBytes, int64(-wtx.Size())) - - if removeFromCache { - txmp.cache.Remove(wtx.tx) - } -} - -// purgeExpiredTxs removes all transactions that have exceeded their respective -// height and/or time based TTLs from their respective indexes. Every expired -// transaction will be removed from the mempool entirely, except for the cache. -// -// NOTE: purgeExpiredTxs must only be called during TxMempool#Update in which -// the caller has a write-lock on the mempool and so we can safely iterate over -// the height and time based indexes. 
-func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) { - now := time.Now() - expiredTxs := make(map[types.TxKey]*WrappedTx) - - if txmp.config.TTLNumBlocks > 0 { - purgeIdx := -1 - for i, wtx := range txmp.heightIndex.txs { - if (blockHeight - wtx.height) > txmp.config.TTLNumBlocks { - expiredTxs[wtx.tx.Key()] = wtx - purgeIdx = i - } else { - // since the index is sorted, we know no other txs can be be purged - break - } - } - - if purgeIdx >= 0 { - txmp.heightIndex.txs = txmp.heightIndex.txs[purgeIdx+1:] - } - } - - if txmp.config.TTLDuration > 0 { - purgeIdx := -1 - for i, wtx := range txmp.timestampIndex.txs { - if now.Sub(wtx.timestamp) > txmp.config.TTLDuration { - expiredTxs[wtx.tx.Key()] = wtx - purgeIdx = i - } else { - // since the index is sorted, we know no other txs can be be purged - break - } - } - - if purgeIdx >= 0 { - txmp.timestampIndex.txs = txmp.timestampIndex.txs[purgeIdx+1:] - } - } - - for _, wtx := range expiredTxs { - txmp.removeTx(wtx, false) - } -} - -func (txmp *TxMempool) notifyTxsAvailable() { - if txmp.Size() == 0 { - panic("attempt to notify txs available but mempool is empty!") - } - - if txmp.txsAvailable != nil && !txmp.notifiedTxsAvailable { - // channel cap is 1, so this will send once - txmp.notifiedTxsAvailable = true - - select { - case txmp.txsAvailable <- struct{}{}: - default: - } - } -} diff --git a/mempool_bak/clist_mempool_test.go b/mempool_bak/clist_mempool_test.go deleted file mode 100644 index 3b9507344..000000000 --- a/mempool_bak/clist_mempool_test.go +++ /dev/null @@ -1,1178 +0,0 @@ -package mempool - -import ( - "bytes" - "errors" - "fmt" - "math/rand" - "os" - "sort" - "strconv" - "strings" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/require" - - "github.com/tendermint/tendermint/abci/example/code" - "github.com/tendermint/tendermint/abci/example/kvstore" - abci "github.com/tendermint/tendermint/abci/types" - "github.com/tendermint/tendermint/config" - "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/proxy" - "github.com/tendermint/tendermint/types" -) - -// application extends the KV store application by overriding CheckTx to provide -// transaction priority based on the value in the key/value pair. 
-type application struct { - *kvstore.Application -} - -type testTx struct { - tx types.Tx - priority int64 -} - -func (app *application) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx { - var ( - priority int64 - sender string - ) - - // infer the priority from the raw transaction value (sender=key=value) - parts := bytes.Split(req.Tx, []byte("=")) - if len(parts) == 3 { - v, err := strconv.ParseInt(string(parts[2]), 10, 64) - if err != nil { - return abci.ResponseCheckTx{ - Priority: priority, - Code: 100, - GasWanted: 1, - } - } - - priority = v - sender = string(parts[0]) - } else { - return abci.ResponseCheckTx{ - Priority: priority, - Code: 101, - GasWanted: 1, - } - } - - return abci.ResponseCheckTx{ - Priority: priority, - Sender: sender, - Code: code.CodeTypeOK, - GasWanted: 1, - } -} - -func setup(t testing.TB, cacheSize int, options ...TxMempoolOption) *TxMempool { - t.Helper() - - app := &application{kvstore.NewApplication()} - cc := proxy.NewLocalClientCreator(app) - - cfg := config.ResetTestRoot(strings.ReplaceAll(t.Name(), "/", "|")) - cfg.Mempool.CacheSize = cacheSize - - appConnMem, err := cc.NewABCIClient() - require.NoError(t, err) - require.NoError(t, appConnMem.Start()) - - t.Cleanup(func() { - os.RemoveAll(cfg.RootDir) - require.NoError(t, appConnMem.Stop()) - }) - - return NewTxMempool(log.TestingLogger().With("test", t.Name()), cfg.Mempool, appConnMem, 0, options...) -} - -func checkTxs(t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx { - txs := make([]testTx, numTxs) - txInfo := TxInfo{SenderID: peerID} - - rng := rand.New(rand.NewSource(time.Now().UnixNano())) - - for i := 0; i < numTxs; i++ { - prefix := make([]byte, 20) - _, err := rng.Read(prefix) - require.NoError(t, err) - - priority := int64(rng.Intn(9999-1000) + 1000) - - txs[i] = testTx{ - tx: []byte(fmt.Sprintf("sender-%d-%d=%X=%d", i, peerID, prefix, priority)), - priority: priority, - } - require.NoError(t, txmp.CheckTx(txs[i].tx, nil, txInfo)) - } - - return txs -} - -func TestTxMempool_TxsAvailable(t *testing.T) { - txmp := setup(t, 0) - txmp.EnableTxsAvailable() - - ensureNoTxFire := func() { - timer := time.NewTimer(500 * time.Millisecond) - select { - case <-txmp.TxsAvailable(): - require.Fail(t, "unexpected transactions event") - case <-timer.C: - } - } - - ensureTxFire := func() { - timer := time.NewTimer(500 * time.Millisecond) - select { - case <-txmp.TxsAvailable(): - case <-timer.C: - require.Fail(t, "expected transactions event") - } - } - - // ensure no event as we have not executed any transactions yet - ensureNoTxFire() - - // Execute CheckTx for some transactions and ensure TxsAvailable only fires - // once. - txs := checkTxs(t, txmp, 100, 0) - ensureTxFire() - ensureNoTxFire() - - rawTxs := make([]types.Tx, len(txs)) - for i, tx := range txs { - rawTxs[i] = tx.tx - } - - responses := make([]*abci.ResponseDeliverTx, len(rawTxs[:50])) - for i := 0; i < len(responses); i++ { - responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK} - } - - // commit half the transactions and ensure we fire an event - txmp.Lock() - require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil)) - txmp.Unlock() - ensureTxFire() - ensureNoTxFire() - - // Execute CheckTx for more transactions and ensure we do not fire another - // event as we're still on the same height (1). 
- _ = checkTxs(t, txmp, 100, 0) - ensureNoTxFire() -} - -func TestTxMempool_Size(t *testing.T) { - txmp := setup(t, 0) - txs := checkTxs(t, txmp, 100, 0) - require.Equal(t, len(txs), txmp.Size()) - require.Equal(t, int64(5690), txmp.SizeBytes()) - - rawTxs := make([]types.Tx, len(txs)) - for i, tx := range txs { - rawTxs[i] = tx.tx - } - - responses := make([]*abci.ResponseDeliverTx, len(rawTxs[:50])) - for i := 0; i < len(responses); i++ { - responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK} - } - - txmp.Lock() - require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil)) - txmp.Unlock() - - require.Equal(t, len(rawTxs)/2, txmp.Size()) - require.Equal(t, int64(2850), txmp.SizeBytes()) -} - -func TestTxMempool_Flush(t *testing.T) { - txmp := setup(t, 0) - txs := checkTxs(t, txmp, 100, 0) - require.Equal(t, len(txs), txmp.Size()) - require.Equal(t, int64(5690), txmp.SizeBytes()) - - rawTxs := make([]types.Tx, len(txs)) - for i, tx := range txs { - rawTxs[i] = tx.tx - } - - responses := make([]*abci.ResponseDeliverTx, len(rawTxs[:50])) - for i := 0; i < len(responses); i++ { - responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK} - } - - txmp.Lock() - require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil)) - txmp.Unlock() - - txmp.Flush() - require.Zero(t, txmp.Size()) - require.Equal(t, int64(0), txmp.SizeBytes()) -} - -func TestTxMempool_ReapMaxBytesMaxGas(t *testing.T) { - txmp := setup(t, 0) - tTxs := checkTxs(t, txmp, 100, 0) // all txs request 1 gas unit - require.Equal(t, len(tTxs), txmp.Size()) - require.Equal(t, int64(5690), txmp.SizeBytes()) - - txMap := make(map[types.TxKey]testTx) - priorities := make([]int64, len(tTxs)) - for i, tTx := range tTxs { - txMap[tTx.tx.Key()] = tTx - priorities[i] = tTx.priority - } - - sort.Slice(priorities, func(i, j int) bool { - // sort by priority, i.e. decreasing order - return priorities[i] > priorities[j] - }) - - ensurePrioritized := func(reapedTxs types.Txs) { - reapedPriorities := make([]int64, len(reapedTxs)) - for i, rTx := range reapedTxs { - reapedPriorities[i] = txMap[rTx.Key()].priority - } - - require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities) - } - - // reap by gas capacity only - reapedTxs := txmp.ReapMaxBytesMaxGas(-1, 50) - ensurePrioritized(reapedTxs) - require.Equal(t, len(tTxs), txmp.Size()) - require.Equal(t, int64(5690), txmp.SizeBytes()) - require.Len(t, reapedTxs, 50) - - // reap by transaction bytes only - reapedTxs = txmp.ReapMaxBytesMaxGas(1000, -1) - ensurePrioritized(reapedTxs) - require.Equal(t, len(tTxs), txmp.Size()) - require.Equal(t, int64(5690), txmp.SizeBytes()) - require.GreaterOrEqual(t, len(reapedTxs), 16) - - // Reap by both transaction bytes and gas, where the size yields 31 reaped - // transactions and the gas limit reaps 25 transactions. - reapedTxs = txmp.ReapMaxBytesMaxGas(1500, 30) - ensurePrioritized(reapedTxs) - require.Equal(t, len(tTxs), txmp.Size()) - require.Equal(t, int64(5690), txmp.SizeBytes()) - require.Len(t, reapedTxs, 25) -} - -func TestTxMempool_ReapMaxTxs(t *testing.T) { - txmp := setup(t, 0) - tTxs := checkTxs(t, txmp, 100, 0) - require.Equal(t, len(tTxs), txmp.Size()) - require.Equal(t, int64(5690), txmp.SizeBytes()) - - txMap := make(map[types.TxKey]testTx) - priorities := make([]int64, len(tTxs)) - for i, tTx := range tTxs { - txMap[tTx.tx.Key()] = tTx - priorities[i] = tTx.priority - } - - sort.Slice(priorities, func(i, j int) bool { - // sort by priority, i.e. 
decreasing order - return priorities[i] > priorities[j] - }) - - ensurePrioritized := func(reapedTxs types.Txs) { - reapedPriorities := make([]int64, len(reapedTxs)) - for i, rTx := range reapedTxs { - reapedPriorities[i] = txMap[rTx.Key()].priority - } - - require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities) - } - - // reap all transactions - reapedTxs := txmp.ReapMaxTxs(-1) - ensurePrioritized(reapedTxs) - require.Equal(t, len(tTxs), txmp.Size()) - require.Equal(t, int64(5690), txmp.SizeBytes()) - require.Len(t, reapedTxs, len(tTxs)) - - // reap a single transaction - reapedTxs = txmp.ReapMaxTxs(1) - ensurePrioritized(reapedTxs) - require.Equal(t, len(tTxs), txmp.Size()) - require.Equal(t, int64(5690), txmp.SizeBytes()) - require.Len(t, reapedTxs, 1) - - // reap half of the transactions - reapedTxs = txmp.ReapMaxTxs(len(tTxs) / 2) - ensurePrioritized(reapedTxs) - require.Equal(t, len(tTxs), txmp.Size()) - require.Equal(t, int64(5690), txmp.SizeBytes()) - require.Len(t, reapedTxs, len(tTxs)/2) -} - -func TestTxMempool_CheckTxExceedsMaxSize(t *testing.T) { - txmp := setup(t, 0) - - rng := rand.New(rand.NewSource(time.Now().UnixNano())) - tx := make([]byte, txmp.config.MaxTxBytes+1) - _, err := rng.Read(tx) - require.NoError(t, err) - - require.Error(t, txmp.CheckTx(tx, nil, TxInfo{SenderID: 0})) - - tx = make([]byte, txmp.config.MaxTxBytes-1) - _, err = rng.Read(tx) - require.NoError(t, err) - - require.NoError(t, txmp.CheckTx(tx, nil, TxInfo{SenderID: 0})) -} - -func TestTxMempool_CheckTxSamePeer(t *testing.T) { - txmp := setup(t, 100) - peerID := uint16(1) - rng := rand.New(rand.NewSource(time.Now().UnixNano())) - - prefix := make([]byte, 20) - _, err := rng.Read(prefix) - require.NoError(t, err) - - tx := []byte(fmt.Sprintf("sender-0=%X=%d", prefix, 50)) - - require.NoError(t, txmp.CheckTx(tx, nil, TxInfo{SenderID: peerID})) - require.Error(t, txmp.CheckTx(tx, nil, TxInfo{SenderID: peerID})) -} - -func TestTxMempool_CheckTxSameSender(t *testing.T) { - txmp := setup(t, 100) - peerID := uint16(1) - rng := rand.New(rand.NewSource(time.Now().UnixNano())) - - prefix1 := make([]byte, 20) - _, err := rng.Read(prefix1) - require.NoError(t, err) - - prefix2 := make([]byte, 20) - _, err = rng.Read(prefix2) - require.NoError(t, err) - - tx1 := []byte(fmt.Sprintf("sender-0=%X=%d", prefix1, 50)) - tx2 := []byte(fmt.Sprintf("sender-0=%X=%d", prefix2, 50)) - - require.NoError(t, txmp.CheckTx(tx1, nil, TxInfo{SenderID: peerID})) - require.Equal(t, 1, txmp.Size()) - require.NoError(t, txmp.CheckTx(tx2, nil, TxInfo{SenderID: peerID})) - require.Equal(t, 1, txmp.Size()) -} - -func TestTxMempool_ConcurrentTxs(t *testing.T) { - txmp := setup(t, 100) - rng := rand.New(rand.NewSource(time.Now().UnixNano())) - checkTxDone := make(chan struct{}) - - var wg sync.WaitGroup - - wg.Add(1) - go func() { - for i := 0; i < 20; i++ { - _ = checkTxs(t, txmp, 100, 0) - dur := rng.Intn(1000-500) + 500 - time.Sleep(time.Duration(dur) * time.Millisecond) - } - - wg.Done() - close(checkTxDone) - }() - - wg.Add(1) - go func() { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - defer wg.Done() - - var height int64 = 1 - - for range ticker.C { - reapedTxs := txmp.ReapMaxTxs(200) - if len(reapedTxs) > 0 { - responses := make([]*abci.ResponseDeliverTx, len(reapedTxs)) - for i := 0; i < len(responses); i++ { - var code uint32 - - if i%10 == 0 { - code = 100 - } else { - code = abci.CodeTypeOK - } - - responses[i] = &abci.ResponseDeliverTx{Code: code} - } - - txmp.Lock() - require.NoError(t, 
txmp.Update(height, reapedTxs, responses, nil, nil)) - txmp.Unlock() - - height++ - } else { - // only return once we know we finished the CheckTx loop - select { - case <-checkTxDone: - return - default: - } - } - } - }() - - wg.Wait() - require.Zero(t, txmp.Size()) - require.Zero(t, txmp.SizeBytes()) -} - -func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) { - txmp := setup(t, 500) - txmp.height = 100 - txmp.config.TTLNumBlocks = 10 - - tTxs := checkTxs(t, txmp, 100, 0) - require.Equal(t, len(tTxs), txmp.Size()) - require.Equal(t, 100, txmp.heightIndex.Size()) - - // reap 5 txs at the next height -- no txs should expire - reapedTxs := txmp.ReapMaxTxs(5) - responses := make([]*abci.ResponseDeliverTx, len(reapedTxs)) - for i := 0; i < len(responses); i++ { - responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK} - } - - txmp.Lock() - require.NoError(t, txmp.Update(txmp.height+1, reapedTxs, responses, nil, nil)) - txmp.Unlock() - - require.Equal(t, 95, txmp.Size()) - require.Equal(t, 95, txmp.heightIndex.Size()) - - // check more txs at height 101 - _ = checkTxs(t, txmp, 50, 1) - require.Equal(t, 145, txmp.Size()) - require.Equal(t, 145, txmp.heightIndex.Size()) - - // Reap 5 txs at a height that would expire all the transactions from before - // the previous Update (height 100). - // - // NOTE: When we reap txs below, we do not know if we're picking txs from the - // initial CheckTx calls or from the second round of CheckTx calls. Thus, we - // cannot guarantee that all 95 txs are remaining that should be expired and - // removed. However, we do know that that at most 95 txs can be expired and - // removed. - reapedTxs = txmp.ReapMaxTxs(5) - responses = make([]*abci.ResponseDeliverTx, len(reapedTxs)) - for i := 0; i < len(responses); i++ { - responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK} - } - - txmp.Lock() - require.NoError(t, txmp.Update(txmp.height+10, reapedTxs, responses, nil, nil)) - txmp.Unlock() - - require.GreaterOrEqual(t, txmp.Size(), 45) - require.GreaterOrEqual(t, txmp.heightIndex.Size(), 45) -} - -func TestTxMempool_CheckTxPostCheckError(t *testing.T) { - cases := []struct { - name string - err error - }{ - { - name: "error", - err: errors.New("test error"), - }, - { - name: "no error", - err: nil, - }, - } - for _, tc := range cases { - testCase := tc - t.Run(testCase.name, func(t *testing.T) { - postCheckFn := func(_ types.Tx, _ *abci.ResponseCheckTx) error { - return testCase.err - } - txmp := setup(t, 0, WithPostCheck(postCheckFn)) - rng := rand.New(rand.NewSource(time.Now().UnixNano())) - tx := make([]byte, txmp.config.MaxTxBytes-1) - _, err := rng.Read(tx) - require.NoError(t, err) - - callback := func(res *abci.Response) { - checkTxRes, ok := res.Value.(*abci.Response_CheckTx) - require.True(t, ok) - expectedErrString := "" - if testCase.err != nil { - expectedErrString = testCase.err.Error() - } - require.Equal(t, expectedErrString, checkTxRes.CheckTx.MempoolError) - } - require.NoError(t, txmp.CheckTx(tx, callback, TxInfo{SenderID: 0})) - }) - } -} - -// A cleanupFunc cleans up any config / test files created for a particular -// test. 
-type cleanupFunc func() - -func newMempoolWithApp(cc proxy.ClientCreator) (*TxMempool, cleanupFunc) { - return newMempoolWithAppAndConfig(cc, config.ResetTestRoot("mempool_test")) -} - -func newMempoolWithAppAndConfig(cc proxy.ClientCreator, config *config.Config) (*TxMempool, cleanupFunc) { - appConnMem, _ := cc.NewABCIClient() - appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool")) - err := appConnMem.Start() - if err != nil { - panic(err) - - } - - mempool := NewTxMempool(log.TestingLogger(), config.Mempool, appConnMem, 0) - return mempool, func() { os.RemoveAll(config.RootDir) } -} - -// func ensureNoFire(t *testing.T, ch <-chan struct{}, timeoutMS int) { -// timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond) -// select { -// case <-ch: -// t.Fatal("Expected not to fire") -// case <-timer.C: -// } -// } - -// func ensureFire(t *testing.T, ch <-chan struct{}, timeoutMS int) { -// timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond) -// select { -// case <-ch: -// case <-timer.C: -// t.Fatal("Expected to fire") -// } -// } - -// func checkTxs(t *testing.T, mempool Mempool, count int, peerID uint16) types.Txs { -// txs := make(types.Txs, count) -// txInfo := TxInfo{SenderID: peerID} -// for i := 0; i < count; i++ { -// txBytes := make([]byte, 20) -// txs[i] = txBytes -// _, err := rand.Read(txBytes) -// if err != nil { -// t.Error(err) -// } -// if err := mempool.CheckTx(txBytes, nil, txInfo); err != nil { -// // Skip invalid txs. -// // TestMempoolFilters will fail otherwise. It asserts a number of txs -// // returned. -// if IsPreCheckError(err) { -// continue -// } -// t.Fatalf("CheckTx failed: %v while checking #%d tx", err, i) -// } -// } -// return txs -// } - -// func TestReapMaxBytesMaxGas(t *testing.T) { -// app := kvstore.NewApplication() -// cc := proxy.NewLocalClientCreator(app) -// mempool, cleanup := newMempoolWithApp(cc) -// defer cleanup() - -// // Ensure gas calculation behaves as expected -// checkTxs(t, mempool, 1, UnknownPeerID) -// tx0 := mempool.TxsFront().Value.(*mempoolTx) -// // assert that kv store has gas wanted = 1. -// require.Equal(t, app.CheckTx(abci.RequestCheckTx{Tx: tx0.tx}).GasWanted, int64(1), "KVStore had a gas value neq to 1") -// require.Equal(t, tx0.gasWanted, int64(1), "transactions gas was set incorrectly") -// // ensure each tx is 20 bytes long -// require.Equal(t, len(tx0.tx), 20, "Tx is longer than 20 bytes") -// mempool.Flush() - -// // each table driven test creates numTxsToCreate txs with checkTx, and at the end clears all remaining txs. 
-// // each tx has 20 bytes -// tests := []struct { -// numTxsToCreate int -// maxBytes int64 -// maxGas int64 -// expectedNumTxs int -// }{ -// {20, -1, -1, 20}, -// {20, -1, 0, 0}, -// {20, -1, 10, 10}, -// {20, -1, 30, 20}, -// {20, 0, -1, 0}, -// {20, 0, 10, 0}, -// {20, 10, 10, 0}, -// {20, 24, 10, 1}, -// {20, 240, 5, 5}, -// {20, 240, -1, 10}, -// {20, 240, 10, 10}, -// {20, 240, 15, 10}, -// {20, 20000, -1, 20}, -// {20, 20000, 5, 5}, -// {20, 20000, 30, 20}, -// } -// for tcIndex, tt := range tests { -// checkTxs(t, mempool, tt.numTxsToCreate, UnknownPeerID) -// got := mempool.ReapMaxBytesMaxGas(tt.maxBytes, tt.maxGas) -// assert.Equal(t, tt.expectedNumTxs, len(got), "Got %d txs, expected %d, tc #%d", -// len(got), tt.expectedNumTxs, tcIndex) -// mempool.Flush() -// } -// } - -// func TestMempoolFilters(t *testing.T) { -// app := kvstore.NewApplication() -// cc := proxy.NewLocalClientCreator(app) -// mempool, cleanup := newMempoolWithApp(cc) -// defer cleanup() -// emptyTxArr := []types.Tx{[]byte{}} - -// nopPreFilter := func(tx types.Tx) error { return nil } -// nopPostFilter := func(tx types.Tx, res *abci.ResponseCheckTx) error { return nil } - -// // each table driven test creates numTxsToCreate txs with checkTx, and at the end clears all remaining txs. -// // each tx has 20 bytes -// tests := []struct { -// numTxsToCreate int -// preFilter PreCheckFunc -// postFilter PostCheckFunc -// expectedNumTxs int -// }{ -// {10, nopPreFilter, nopPostFilter, 10}, -// {10, PreCheckMaxBytes(10), nopPostFilter, 0}, -// {10, PreCheckMaxBytes(22), nopPostFilter, 10}, -// {10, nopPreFilter, PostCheckMaxGas(-1), 10}, -// {10, nopPreFilter, PostCheckMaxGas(0), 0}, -// {10, nopPreFilter, PostCheckMaxGas(1), 10}, -// {10, nopPreFilter, PostCheckMaxGas(3000), 10}, -// {10, PreCheckMaxBytes(10), PostCheckMaxGas(20), 0}, -// {10, PreCheckMaxBytes(30), PostCheckMaxGas(20), 10}, -// {10, PreCheckMaxBytes(22), PostCheckMaxGas(1), 10}, -// {10, PreCheckMaxBytes(22), PostCheckMaxGas(0), 0}, -// } -// for tcIndex, tt := range tests { -// err := mempool.Update(1, emptyTxArr, abciResponses(len(emptyTxArr), abci.CodeTypeOK), tt.preFilter, tt.postFilter) -// require.NoError(t, err) -// checkTxs(t, mempool, tt.numTxsToCreate, UnknownPeerID) -// require.Equal(t, tt.expectedNumTxs, mempool.Size(), "mempool had the incorrect size, on test case %d", tcIndex) -// mempool.Flush() -// } -// } - -// func TestMempoolUpdate(t *testing.T) { -// app := kvstore.NewApplication() -// cc := proxy.NewLocalClientCreator(app) -// mempool, cleanup := newMempoolWithApp(cc) -// defer cleanup() - -// // 1. Adds valid txs to the cache -// { -// err := mempool.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil, nil) -// require.NoError(t, err) -// err = mempool.CheckTx([]byte{0x01}, nil, TxInfo{}) -// if assert.Error(t, err) { -// assert.Equal(t, ErrTxInCache, err) -// } -// } - -// // 2. Removes valid txs from the mempool -// { -// err := mempool.CheckTx([]byte{0x02}, nil, TxInfo{}) -// require.NoError(t, err) -// err = mempool.Update(1, []types.Tx{[]byte{0x02}}, abciResponses(1, abci.CodeTypeOK), nil, nil) -// require.NoError(t, err) -// assert.Zero(t, mempool.Size()) -// } - -// // 3. 
Removes invalid transactions from the cache and the mempool (if present) -// { -// err := mempool.CheckTx([]byte{0x03}, nil, TxInfo{}) -// require.NoError(t, err) -// err = mempool.Update(1, []types.Tx{[]byte{0x03}}, abciResponses(1, 1), nil, nil) -// require.NoError(t, err) -// assert.Zero(t, mempool.Size()) - -// err = mempool.CheckTx([]byte{0x03}, nil, TxInfo{}) -// require.NoError(t, err) -// } -// } - -// func TestMempool_KeepInvalidTxsInCache(t *testing.T) { -// app := counter.NewApplication(true) -// cc := proxy.NewLocalClientCreator(app) -// wcfg := cfg.DefaultConfig() -// wcfg.Mempool.KeepInvalidTxsInCache = true -// mempool, cleanup := newMempoolWithAppAndConfig(cc, wcfg) -// defer cleanup() - -// // 1. An invalid transaction must remain in the cache after Update -// { -// a := make([]byte, 8) -// binary.BigEndian.PutUint64(a, 0) - -// b := make([]byte, 8) -// binary.BigEndian.PutUint64(b, 1) - -// err := mempool.CheckTx(b, nil, TxInfo{}) -// require.NoError(t, err) - -// // simulate new block -// _ = app.DeliverTx(abci.RequestDeliverTx{Tx: a}) -// _ = app.DeliverTx(abci.RequestDeliverTx{Tx: b}) -// err = mempool.Update(1, []types.Tx{a, b}, -// []*abci.ResponseDeliverTx{{Code: abci.CodeTypeOK}, {Code: 2}}, nil, nil) -// require.NoError(t, err) - -// // a must be added to the cache -// err = mempool.CheckTx(a, nil, TxInfo{}) -// if assert.Error(t, err) { -// assert.Equal(t, ErrTxInCache, err) -// } - -// // b must remain in the cache -// err = mempool.CheckTx(b, nil, TxInfo{}) -// if assert.Error(t, err) { -// assert.Equal(t, ErrTxInCache, err) -// } -// } - -// // 2. An invalid transaction must remain in the cache -// { -// a := make([]byte, 8) -// binary.BigEndian.PutUint64(a, 0) - -// // remove a from the cache to test (2) -// mempool.cache.Remove(a) - -// err := mempool.CheckTx(a, nil, TxInfo{}) -// require.NoError(t, err) - -// err = mempool.CheckTx(a, nil, TxInfo{}) -// if assert.Error(t, err) { -// assert.Equal(t, ErrTxInCache, err) -// } -// } -// } - -// func TestTxsAvailable(t *testing.T) { -// app := kvstore.NewApplication() -// cc := proxy.NewLocalClientCreator(app) -// mempool, cleanup := newMempoolWithApp(cc) -// defer cleanup() -// mempool.EnableTxsAvailable() - -// timeoutMS := 500 - -// // with no txs, it shouldnt fire -// ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) - -// // send a bunch of txs, it should only fire once -// txs := checkTxs(t, mempool, 100, UnknownPeerID) -// ensureFire(t, mempool.TxsAvailable(), timeoutMS) -// ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) - -// // call update with half the txs. -// // it should fire once now for the new height -// // since there are still txs left -// committedTxs, txs := txs[:50], txs[50:] -// if err := mempool.Update(1, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil, nil); err != nil { -// t.Error(err) -// } -// ensureFire(t, mempool.TxsAvailable(), timeoutMS) -// ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) - -// // send a bunch more txs. we already fired for this height so it shouldnt fire again -// moreTxs := checkTxs(t, mempool, 50, UnknownPeerID) -// ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) - -// // now call update with all the txs. it should not fire as there are no txs left -// committedTxs = append(txs, moreTxs...) 
//nolint: gocritic -// if err := mempool.Update(2, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil, nil); err != nil { -// t.Error(err) -// } -// ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) - -// // send a bunch more txs, it should only fire once -// checkTxs(t, mempool, 100, UnknownPeerID) -// ensureFire(t, mempool.TxsAvailable(), timeoutMS) -// ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) -// } - -// func TestSerialReap(t *testing.T) { -// app := counter.NewApplication(true) -// app.SetOption(abci.RequestSetOption{Key: "serial", Value: "on"}) -// cc := proxy.NewLocalClientCreator(app) - -// mempool, cleanup := newMempoolWithApp(cc) -// defer cleanup() - -// appConnCon, _ := cc.NewABCIClient() -// appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus")) -// err := appConnCon.Start() -// require.Nil(t, err) - -// cacheMap := make(map[string]struct{}) -// deliverTxsRange := func(start, end int) { -// // Deliver some txs. -// for i := start; i < end; i++ { - -// // This will succeed -// txBytes := make([]byte, 8) -// binary.BigEndian.PutUint64(txBytes, uint64(i)) -// err := mempool.CheckTx(txBytes, nil, TxInfo{}) -// _, cached := cacheMap[string(txBytes)] -// if cached { -// require.NotNil(t, err, "expected error for cached tx") -// } else { -// require.Nil(t, err, "expected no err for uncached tx") -// } -// cacheMap[string(txBytes)] = struct{}{} - -// // Duplicates are cached and should return error -// err = mempool.CheckTx(txBytes, nil, TxInfo{}) -// require.NotNil(t, err, "Expected error after CheckTx on duplicated tx") -// } -// } - -// reapCheck := func(exp int) { -// txs := mempool.ReapMaxBytesMaxGas(-1, -1) -// require.Equal(t, len(txs), exp, fmt.Sprintf("Expected to reap %v txs but got %v", exp, len(txs))) -// } - -// updateRange := func(start, end int) { -// txs := make([]types.Tx, 0) -// for i := start; i < end; i++ { -// txBytes := make([]byte, 8) -// binary.BigEndian.PutUint64(txBytes, uint64(i)) -// txs = append(txs, txBytes) -// } -// if err := mempool.Update(0, txs, abciResponses(len(txs), abci.CodeTypeOK), nil, nil); err != nil { -// t.Error(err) -// } -// } - -// commitRange := func(start, end int) { -// // Deliver some txs. -// for i := start; i < end; i++ { -// txBytes := make([]byte, 8) -// binary.BigEndian.PutUint64(txBytes, uint64(i)) -// res, err := appConnCon.DeliverTxSync(abci.RequestDeliverTx{Tx: txBytes}) -// if err != nil { -// t.Errorf("client error committing tx: %v", err) -// } -// if res.IsErr() { -// t.Errorf("error committing tx. Code:%v result:%X log:%v", -// res.Code, res.Data, res.Log) -// } -// } -// res, err := appConnCon.CommitSync() -// if err != nil { -// t.Errorf("client error committing: %v", err) -// } -// if len(res.Data) != 8 { -// t.Errorf("error committing. Hash:%X", res.Data) -// } -// } - -// //---------------------------------------- - -// // Deliver some txs. -// deliverTxsRange(0, 100) - -// // Reap the txs. -// reapCheck(100) - -// // Reap again. We should get the same amount -// reapCheck(100) - -// // Deliver 0 to 999, we should reap 900 new txs -// // because 100 were already counted. -// deliverTxsRange(0, 1000) - -// // Reap the txs. -// reapCheck(1000) - -// // Reap again. We should get the same amount -// reapCheck(1000) - -// // Commit from the conensus AppConn -// commitRange(0, 500) -// updateRange(0, 500) - -// // We should have 500 left. 
-// reapCheck(500) - -// // Deliver 100 invalid txs and 100 valid txs -// deliverTxsRange(900, 1100) - -// // We should have 600 now. -// reapCheck(600) -// } - -// func TestMempoolCloseWAL(t *testing.T) { -// // 1. Create the temporary directory for mempool and WAL testing. -// rootDir, err := ioutil.TempDir("", "mempool-test") -// require.Nil(t, err, "expecting successful tmpdir creation") - -// // 2. Ensure that it doesn't contain any elements -- Sanity check -// m1, err := filepath.Glob(filepath.Join(rootDir, "*")) -// require.Nil(t, err, "successful globbing expected") -// require.Equal(t, 0, len(m1), "no matches yet") - -// // 3. Create the mempool -// wcfg := cfg.DefaultConfig() -// wcfg.Mempool.RootDir = rootDir -// app := kvstore.NewApplication() -// cc := proxy.NewLocalClientCreator(app) -// mempool, cleanup := newMempoolWithAppAndConfig(cc, wcfg) -// defer cleanup() -// mempool.height = 10 -// err = mempool.InitWAL() -// require.NoError(t, err) - -// // 4. Ensure that the directory contains the WAL file -// m2, err := filepath.Glob(filepath.Join(rootDir, "*")) -// require.Nil(t, err, "successful globbing expected") -// require.Equal(t, 1, len(m2), "expecting the wal match in") - -// // 5. Write some contents to the WAL -// err = mempool.CheckTx(types.Tx([]byte("foo")), nil, TxInfo{}) -// require.NoError(t, err) -// walFilepath := mempool.wal.Path -// sum1 := checksumFile(walFilepath, t) - -// // 6. Sanity check to ensure that the written TX matches the expectation. -// require.Equal(t, sum1, checksumIt([]byte("foo\n")), "foo with a newline should be written") - -// // 7. Invoke CloseWAL() and ensure it discards the -// // WAL thus any other write won't go through. -// mempool.CloseWAL() -// err = mempool.CheckTx(types.Tx([]byte("bar")), nil, TxInfo{}) -// require.NoError(t, err) -// sum2 := checksumFile(walFilepath, t) -// require.Equal(t, sum1, sum2, "expected no change to the WAL after invoking CloseWAL() since it was discarded") - -// // 8. Sanity check to ensure that the WAL file still exists -// m3, err := filepath.Glob(filepath.Join(rootDir, "*")) -// require.Nil(t, err, "successful globbing expected") -// require.Equal(t, 1, len(m3), "expecting the wal match in") -// } - -// func TestMempool_CheckTxChecksTxSize(t *testing.T) { -// app := kvstore.NewApplication() -// cc := proxy.NewLocalClientCreator(app) -// mempl, cleanup := newMempoolWithApp(cc) -// defer cleanup() - -// maxTxSize := mempl.config.MaxTxBytes - -// testCases := []struct { -// len int -// err bool -// }{ -// // check small txs. 
no error -// 0: {10, false}, -// 1: {1000, false}, -// 2: {1000000, false}, - -// // check around maxTxSize -// 3: {maxTxSize - 1, false}, -// 4: {maxTxSize, false}, -// 5: {maxTxSize + 1, true}, -// } - -// for i, testCase := range testCases { -// caseString := fmt.Sprintf("case %d, len %d", i, testCase.len) - -// tx := tmrand.Bytes(testCase.len) - -// err := mempl.CheckTx(tx, nil, TxInfo{}) -// bv := gogotypes.BytesValue{Value: tx} -// bz, err2 := bv.Marshal() -// require.NoError(t, err2) -// require.Equal(t, len(bz), proto.Size(&bv), caseString) - -// if !testCase.err { -// require.NoError(t, err, caseString) -// } else { -// require.Equal(t, err, ErrTxTooLarge{maxTxSize, testCase.len}, caseString) -// } -// } -// } - -// func TestMempoolTxsBytes(t *testing.T) { -// app := kvstore.NewApplication() -// cc := proxy.NewLocalClientCreator(app) -// config := cfg.ResetTestRoot("mempool_test") -// config.Mempool.MaxTxsBytes = 10 -// mempool, cleanup := newMempoolWithAppAndConfig(cc, config) -// defer cleanup() - -// // 1. zero by default -// assert.EqualValues(t, 0, mempool.TxsBytes()) - -// // 2. len(tx) after CheckTx -// err := mempool.CheckTx([]byte{0x01}, nil, TxInfo{}) -// require.NoError(t, err) -// assert.EqualValues(t, 1, mempool.TxsBytes()) - -// // 3. zero again after tx is removed by Update -// err = mempool.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil, nil) -// require.NoError(t, err) -// assert.EqualValues(t, 0, mempool.TxsBytes()) - -// // 4. zero after Flush -// err = mempool.CheckTx([]byte{0x02, 0x03}, nil, TxInfo{}) -// require.NoError(t, err) -// assert.EqualValues(t, 2, mempool.TxsBytes()) - -// mempool.Flush() -// assert.EqualValues(t, 0, mempool.TxsBytes()) - -// // 5. ErrMempoolIsFull is returned when/if MaxTxsBytes limit is reached. -// err = mempool.CheckTx([]byte{0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04}, nil, TxInfo{}) -// require.NoError(t, err) -// err = mempool.CheckTx([]byte{0x05}, nil, TxInfo{}) -// if assert.Error(t, err) { -// assert.IsType(t, ErrMempoolIsFull{}, err) -// } - -// // 6. zero after tx is rechecked and removed due to not being valid anymore -// app2 := counter.NewApplication(true) -// cc = proxy.NewLocalClientCreator(app2) -// mempool, cleanup = newMempoolWithApp(cc) -// defer cleanup() - -// txBytes := make([]byte, 8) -// binary.BigEndian.PutUint64(txBytes, uint64(0)) - -// err = mempool.CheckTx(txBytes, nil, TxInfo{}) -// require.NoError(t, err) -// assert.EqualValues(t, 8, mempool.TxsBytes()) - -// appConnCon, _ := cc.NewABCIClient() -// appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus")) -// err = appConnCon.Start() -// require.Nil(t, err) -// t.Cleanup(func() { -// if err := appConnCon.Stop(); err != nil { -// t.Error(err) -// } -// }) -// res, err := appConnCon.DeliverTxSync(abci.RequestDeliverTx{Tx: txBytes}) -// require.NoError(t, err) -// require.EqualValues(t, 0, res.Code) -// res2, err := appConnCon.CommitSync() -// require.NoError(t, err) -// require.NotEmpty(t, res2.Data) - -// // Pretend like we committed nothing so txBytes gets rechecked and removed. -// err = mempool.Update(1, []types.Tx{}, abciResponses(0, abci.CodeTypeOK), nil, nil) -// require.NoError(t, err) -// assert.EqualValues(t, 0, mempool.TxsBytes()) - -// // 7. 
Test RemoveTxByKey function -// err = mempool.CheckTx([]byte{0x06}, nil, TxInfo{}) -// require.NoError(t, err) -// assert.EqualValues(t, 1, mempool.TxsBytes()) -// mempool.RemoveTxByKey(TxKey([]byte{0x07}), true) -// assert.EqualValues(t, 1, mempool.TxsBytes()) -// mempool.RemoveTxByKey(TxKey([]byte{0x06}), true) -// assert.EqualValues(t, 0, mempool.TxsBytes()) - -// } - -// // This will non-deterministically catch some concurrency failures like -// // https://github.com/tendermint/tendermint/issues/3509 -// // TODO: all of the tests should probably also run using the remote proxy app -// // since otherwise we're not actually testing the concurrency of the mempool here! -// func TestMempoolRemoteAppConcurrency(t *testing.T) { -// sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", tmrand.Str(6)) -// app := kvstore.NewApplication() -// cc, server := newRemoteApp(t, sockPath, app) -// t.Cleanup(func() { -// if err := server.Stop(); err != nil { -// t.Error(err) -// } -// }) -// config := cfg.ResetTestRoot("mempool_test") -// mempool, cleanup := newMempoolWithAppAndConfig(cc, config) -// defer cleanup() - -// // generate small number of txs -// nTxs := 10 -// txLen := 200 -// txs := make([]types.Tx, nTxs) -// for i := 0; i < nTxs; i++ { -// txs[i] = tmrand.Bytes(txLen) -// } - -// // simulate a group of peers sending them over and over -// N := config.Mempool.Size -// maxPeers := 5 -// for i := 0; i < N; i++ { -// peerID := mrand.Intn(maxPeers) -// txNum := mrand.Intn(nTxs) -// tx := txs[txNum] - -// // this will err with ErrTxInCache many times ... -// mempool.CheckTx(tx, nil, TxInfo{SenderID: uint16(peerID)}) //nolint: errcheck // will error -// } -// err := mempool.FlushAppConn() -// require.NoError(t, err) -// } - -// // caller must close server -// func newRemoteApp( -// t *testing.T, -// addr string, -// app abci.Application, -// ) ( -// clientCreator proxy.ClientCreator, -// server service.Service, -// ) { -// clientCreator = proxy.NewRemoteClientCreator(addr, "socket", true) - -// // Start server -// server = abciserver.NewSocketServer(addr, app) -// server.SetLogger(log.TestingLogger().With("module", "abci-server")) -// if err := server.Start(); err != nil { -// t.Fatalf("Error starting socket server: %v", err.Error()) -// } -// return clientCreator, server -// } -// func checksumIt(data []byte) string { -// h := sha256.New() -// h.Write(data) -// return fmt.Sprintf("%x", h.Sum(nil)) -// } - -// func checksumFile(p string, t *testing.T) string { -// data, err := ioutil.ReadFile(p) -// require.Nil(t, err, "expecting successful read of %q", p) -// return checksumIt(data) -// } - -// func abciResponses(n int, code uint32) []*abci.ResponseDeliverTx { -// responses := make([]*abci.ResponseDeliverTx, 0, n) -// for i := 0; i < n; i++ { -// responses = append(responses, &abci.ResponseDeliverTx{Code: code}) -// } -// return responses -// } diff --git a/mempool_bak/doc.go b/mempool_bak/doc.go deleted file mode 100644 index 7e6363e12..000000000 --- a/mempool_bak/doc.go +++ /dev/null @@ -1,23 +0,0 @@ -// The mempool pushes new txs onto the proxyAppConn. -// It gets a stream of (req, res) tuples from the proxy. -// The mempool stores good txs in a concurrent linked-list. - -// Multiple concurrent go-routines can traverse this linked-list -// safely by calling .NextWait() on each element. - -// So we have several go-routines: -// 1. Consensus calling Update() and ReapMaxBytesMaxGas() synchronously -// 2. Many mempool reactor's peer routines calling CheckTx() -// 3. 
Many mempool reactor's peer routines traversing the txs linked list - -// To manage these goroutines, there are three methods of locking. -// 1. Mutations to the linked-list is protected by an internal mtx (CList is goroutine-safe) -// 2. Mutations to the linked-list elements are atomic -// 3. CheckTx() and/or ReapMaxBytesMaxGas() calls can be paused upon Update(), protected by .updateMtx - -// Garbage collection of old elements from mempool.txs is handlde via the -// DetachPrev() call, which makes old elements not reachable by peer -// broadcastTxRoutine(). - -// TODO: Better handle abci client errors. (make it automatically handle connection errors) -package mempool diff --git a/mempool_bak/errors.go b/mempool_bak/errors.go deleted file mode 100644 index e33e14ca3..000000000 --- a/mempool_bak/errors.go +++ /dev/null @@ -1,52 +0,0 @@ -package mempool - -import ( - "errors" - "fmt" -) - -var ( - // ErrTxInCache is returned to the client if we saw tx earlier - ErrTxInCache = errors.New("tx already exists in cache") -) - -// ErrTxTooLarge means the tx is too big to be sent in a message to other peers -type ErrTxTooLarge struct { - max int - actual int -} - -func (e ErrTxTooLarge) Error() string { - return fmt.Sprintf("Tx too large. Max size is %d, but got %d", e.max, e.actual) -} - -// ErrMempoolIsFull means Tendermint & an application can't handle that much load -type ErrMempoolIsFull struct { - numTxs int - maxTxs int - - txsBytes int64 - maxTxsBytes int64 -} - -func (e ErrMempoolIsFull) Error() string { - return fmt.Sprintf( - "mempool is full: number of txs %d (max: %d), total txs bytes %d (max: %d)", - e.numTxs, e.maxTxs, - e.txsBytes, e.maxTxsBytes) -} - -// ErrPreCheck is returned when tx is too big -type ErrPreCheck struct { - Reason error -} - -func (e ErrPreCheck) Error() string { - return e.Reason.Error() -} - -// IsPreCheckError returns true if err is due to pre check failure. -func IsPreCheckError(err error) bool { - _, ok := err.(ErrPreCheck) - return ok -} diff --git a/mempool_bak/mempool.go b/mempool_bak/mempool.go deleted file mode 100644 index eece0cf1b..000000000 --- a/mempool_bak/mempool.go +++ /dev/null @@ -1,127 +0,0 @@ -package mempool - -import ( - "fmt" - - abci "github.com/tendermint/tendermint/abci/types" - "github.com/tendermint/tendermint/p2p" - "github.com/tendermint/tendermint/types" -) - -// Mempool defines the mempool interface. -// -// Updates to the mempool need to be synchronized with committing a block so -// apps can reset their transient state on Commit. -type Mempool interface { - // CheckTx executes a new transaction against the application to determine - // its validity and whether it should be added to the mempool. - CheckTx(tx types.Tx, callback func(*abci.Response), txInfo TxInfo) error - - // ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes - // bytes total with the condition that the total gasWanted must be less than - // maxGas. - // If both maxes are negative, there is no cap on the size of all returned - // transactions (~ all available transactions). - ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs - - // ReapMaxTxs reaps up to max transactions from the mempool. - // If max is negative, there is no cap on the size of all returned - // transactions (~ all available transactions). - ReapMaxTxs(max int) types.Txs - - // Lock locks the mempool. The consensus must be able to hold lock to safely update. - Lock() - - // Unlock unlocks the mempool. 
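// A stdlib-only sketch of the typed-error pattern the deleted errors.go above relies
// on: callers tell "expected" rejections (tx already cached, mempool full, pre-check
// failure) apart from real failures by inspecting the error's concrete type, as
// IsPreCheckError does. The names below are simplified stand-ins, not the tendermint types.
package main

import (
	"errors"
	"fmt"
)

// errTooLarge mirrors the shape of ErrTxTooLarge: it carries the limit and the actual size.
type errTooLarge struct {
	max    int
	actual int
}

func (e errTooLarge) Error() string {
	return fmt.Sprintf("tx too large: max %d, got %d", e.max, e.actual)
}

func checkSize(tx []byte, max int) error {
	if len(tx) > max {
		return errTooLarge{max: max, actual: len(tx)}
	}
	return nil
}

func main() {
	err := checkSize(make([]byte, 12), 8)

	// Plain type assertion, the style IsPreCheckError uses.
	if _, ok := err.(errTooLarge); ok {
		fmt.Println("rejected by size check:", err)
	}

	// errors.As matches the same way and also unwraps wrapped errors.
	var e errTooLarge
	fmt.Println("errors.As match:", errors.As(err, &e), "max:", e.max)
}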
- Unlock() - - // Update informs the mempool that the given txs were committed and can be discarded. - // NOTE: this should be called *after* block is committed by consensus. - // NOTE: Lock/Unlock must be managed by caller - Update( - blockHeight int64, - blockTxs types.Txs, - deliverTxResponses []*abci.ResponseDeliverTx, - newPreFn PreCheckFunc, - newPostFn PostCheckFunc, - ) error - - // FlushAppConn flushes the mempool connection to ensure async reqResCb calls are - // done. E.g. from CheckTx. - // NOTE: Lock/Unlock must be managed by caller - FlushAppConn() error - - // Flush removes all transactions from the mempool and cache - Flush() - - // TxsAvailable returns a channel which fires once for every height, - // and only when transactions are available in the mempool. - // NOTE: the returned channel may be nil if EnableTxsAvailable was not called. - TxsAvailable() <-chan struct{} - - // EnableTxsAvailable initializes the TxsAvailable channel, ensuring it will - // trigger once every height when transactions are available. - EnableTxsAvailable() - - // Size returns the number of transactions in the mempool. - Size() int - - // SizeBytes returns the total size of all txs in the mempool. - SizeBytes() int64 -} - -//-------------------------------------------------------------------------------- - -// PreCheckFunc is an optional filter executed before CheckTx and rejects -// transaction if false is returned. An example would be to ensure that a -// transaction doesn't exceeded the block size. -type PreCheckFunc func(types.Tx) error - -// PostCheckFunc is an optional filter executed after CheckTx and rejects -// transaction if false is returned. An example would be to ensure a -// transaction doesn't require more gas than available for the block. -type PostCheckFunc func(types.Tx, *abci.ResponseCheckTx) error - -// TxInfo are parameters that get passed when attempting to add a tx to the -// mempool. -type TxInfo struct { - // SenderID is the internal peer ID used in the mempool to identify the - // sender, storing 2 bytes with each tx instead of 20 bytes for the p2p.ID. - SenderID uint16 - // SenderP2PID is the actual p2p.ID of the sender, used e.g. for logging. - SenderP2PID p2p.ID -} - -//-------------------------------------------------------------------------------- - -// PreCheckMaxBytes checks that the size of the transaction is smaller or equal to the expected maxBytes. -func PreCheckMaxBytes(maxBytes int64) PreCheckFunc { - return func(tx types.Tx) error { - txSize := types.ComputeProtoSizeForTxs([]types.Tx{tx}) - - if txSize > maxBytes { - return fmt.Errorf("tx size is too big: %d, max: %d", - txSize, maxBytes) - } - return nil - } -} - -// PostCheckMaxGas checks that the wanted gas is smaller or equal to the passed -// maxGas. Returns nil if maxGas is -1. 
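// A rough, stdlib-only sketch of the "fires once for every height" behaviour the
// TxsAvailable/EnableTxsAvailable docs above describe: a 1-buffered channel plus a
// notified flag that is re-armed when a new height begins. This is an assumption-laden
// simplification, not the tendermint implementation.
package main

import "fmt"

type notifier struct {
	ch       chan struct{}
	notified bool // true once we have fired for the current height
}

func newNotifier() *notifier { return &notifier{ch: make(chan struct{}, 1)} }

// maybeNotify fires at most once per height; later calls in the same height are no-ops.
func (n *notifier) maybeNotify() {
	if n.notified {
		return
	}
	n.notified = true
	select {
	case n.ch <- struct{}{}: // non-blocking send on the 1-buffered channel
	default:
	}
}

// newHeight re-arms the notifier after a block has been committed.
func (n *notifier) newHeight() { n.notified = false }

func main() {
	n := newNotifier()
	n.maybeNotify()
	n.maybeNotify() // second call in the same height does not fire again
	<-n.ch          // consumer (e.g. consensus) drains the notification
	n.newHeight()
	n.maybeNotify()
	fmt.Println("pending notifications:", len(n.ch)) // 1: re-armed for the new height
}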
-func PostCheckMaxGas(maxGas int64) PostCheckFunc { - return func(tx types.Tx, res *abci.ResponseCheckTx) error { - if maxGas == -1 { - return nil - } - if res.GasWanted < 0 { - return fmt.Errorf("gas wanted %d is negative", - res.GasWanted) - } - if res.GasWanted > maxGas { - return fmt.Errorf("gas wanted %d is greater than max gas %d", - res.GasWanted, maxGas) - } - return nil - } -} diff --git a/mempool_bak/metrics.go b/mempool_bak/metrics.go deleted file mode 100644 index 5d3022e80..000000000 --- a/mempool_bak/metrics.go +++ /dev/null @@ -1,108 +0,0 @@ -package mempool - -import ( - "github.com/go-kit/kit/metrics" - "github.com/go-kit/kit/metrics/discard" - "github.com/go-kit/kit/metrics/prometheus" - stdprometheus "github.com/prometheus/client_golang/prometheus" -) - -const ( - // MetricsSubsystem is a subsystem shared by all metrics exposed by this - // package. - MetricsSubsystem = "mempool" -) - -// Metrics contains metrics exposed by this package. -// see MetricsProvider for descriptions. -type Metrics struct { - // Size of the mempool. - Size metrics.Gauge - - // Histogram of transaction sizes, in bytes. - TxSizeBytes metrics.Histogram - - // Number of failed transactions. - FailedTxs metrics.Counter - - // RejectedTxs defines the number of rejected transactions. These are - // transactions that passed CheckTx but failed to make it into the mempool - // due to resource limits, e.g. mempool is full and no lower priority - // transactions exist in the mempool. - RejectedTxs metrics.Counter - - // EvictedTxs defines the number of evicted transactions. These are valid - // transactions that passed CheckTx and existed in the mempool but were later - // evicted to make room for higher priority valid transactions that passed - // CheckTx. - EvictedTxs metrics.Counter - - // Number of times transactions are rechecked in the mempool. - RecheckTimes metrics.Counter -} - -// PrometheusMetrics returns Metrics build using Prometheus client library. -// Optionally, labels can be provided along with their values ("foo", -// "fooValue"). 
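// A small sketch of the closure-based filter pattern PreCheckMaxBytes and
// PostCheckMaxGas above use: the limit is captured in a closure that is then run
// against every transaction. The types here are simplified stand-ins (plain byte
// slices and a tiny result struct), not the tendermint ones.
package main

import "fmt"

type preCheckFunc func(tx []byte) error

func preCheckMaxBytes(maxBytes int) preCheckFunc {
	return func(tx []byte) error {
		if len(tx) > maxBytes {
			return fmt.Errorf("tx size is too big: %d, max: %d", len(tx), maxBytes)
		}
		return nil
	}
}

type checkResult struct{ gasWanted int64 }

type postCheckFunc func(tx []byte, res checkResult) error

func postCheckMaxGas(maxGas int64) postCheckFunc {
	return func(tx []byte, res checkResult) error {
		if maxGas == -1 { // -1 means "no gas limit", as in the original
			return nil
		}
		if res.gasWanted > maxGas {
			return fmt.Errorf("gas wanted %d is greater than max gas %d", res.gasWanted, maxGas)
		}
		return nil
	}
}

func main() {
	pre := preCheckMaxBytes(4)
	post := postCheckMaxGas(10)

	fmt.Println(pre([]byte{1, 2, 3}))                        // <nil>
	fmt.Println(pre([]byte{1, 2, 3, 4, 5}))                  // size error
	fmt.Println(post([]byte{1}, checkResult{gasWanted: 42})) // gas error
}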
-func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { - labels := []string{} - for i := 0; i < len(labelsAndValues); i += 2 { - labels = append(labels, labelsAndValues[i]) - } - return &Metrics{ - Size: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "size", - Help: "Size of the mempool (number of uncommitted transactions).", - }, labels).With(labelsAndValues...), - - TxSizeBytes: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "tx_size_bytes", - Help: "Transaction sizes in bytes.", - Buckets: stdprometheus.ExponentialBuckets(1, 3, 17), - }, labels).With(labelsAndValues...), - - FailedTxs: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "failed_txs", - Help: "Number of failed transactions.", - }, labels).With(labelsAndValues...), - - RejectedTxs: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "rejected_txs", - Help: "Number of rejected transactions.", - }, labels).With(labelsAndValues...), - - EvictedTxs: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "evicted_txs", - Help: "Number of evicted transactions.", - }, labels).With(labelsAndValues...), - - RecheckTimes: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "recheck_times", - Help: "Number of times transactions are rechecked in the mempool.", - }, labels).With(labelsAndValues...), - } -} - -// NopMetrics returns no-op Metrics. -func NopMetrics() *Metrics { - return &Metrics{ - Size: discard.NewGauge(), - TxSizeBytes: discard.NewHistogram(), - FailedTxs: discard.NewCounter(), - RejectedTxs: discard.NewCounter(), - EvictedTxs: discard.NewCounter(), - RecheckTimes: discard.NewCounter(), - } -} diff --git a/mempool_bak/mock/mempool.go b/mempool_bak/mock/mempool.go deleted file mode 100644 index 95c33b7ec..000000000 --- a/mempool_bak/mock/mempool.go +++ /dev/null @@ -1,48 +0,0 @@ -package mock - -import ( - abci "github.com/tendermint/tendermint/abci/types" - "github.com/tendermint/tendermint/libs/clist" - mempl "github.com/tendermint/tendermint/mempool" - "github.com/tendermint/tendermint/types" -) - -// Mempool is an empty implementation of a Mempool, useful for testing. 
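// A tiny sketch of the variadic ("key1", "value1", "key2", "value2", ...) label
// convention PrometheusMetrics above accepts: label names are the even-indexed
// entries and values the odd-indexed ones. Stdlib only, illustrative names.
package main

import "fmt"

func splitLabelPairs(labelsAndValues ...string) (names, values []string) {
	for i := 0; i+1 < len(labelsAndValues); i += 2 {
		names = append(names, labelsAndValues[i])
		values = append(values, labelsAndValues[i+1])
	}
	return names, values
}

func main() {
	names, values := splitLabelPairs("chain_id", "test-chain", "node", "n0")
	fmt.Println(names)  // [chain_id node]
	fmt.Println(values) // [test-chain n0]
}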
-type Mempool struct{} - -var _ mempl.Mempool = Mempool{} - -func (Mempool) Lock() {} -func (Mempool) Unlock() {} -func (Mempool) Size() int { return 0 } -func (Mempool) SizeBytes() int64 { return 0 } -func (Mempool) CheckTx(_ types.Tx, _ func(*abci.Response), _ mempl.TxInfo) error { - return nil -} -func (Mempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} } -func (Mempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} } -func (Mempool) Update( - _ int64, - _ types.Txs, - _ []*abci.ResponseDeliverTx, - _ mempl.PreCheckFunc, - _ mempl.PostCheckFunc, -) error { - return nil -} -func (Mempool) Flush() {} -func (Mempool) FlushAppConn() error { return nil } -func (Mempool) TxsAvailable() <-chan struct{} { return make(chan struct{}) } -func (Mempool) EnableTxsAvailable() {} -func (Mempool) TxsBytes() int64 { return 0 } - -func (Mempool) TxsFront() *clist.CElement { return nil } -func (Mempool) TxsWaitChan() <-chan struct{} { return nil } - -func (Mempool) InitWAL() error { return nil } -func (Mempool) CloseWAL() {} - -// RemoveTxByKey implements mempool.Mempool -func (Mempool) RemoveTxByKey(txKey types.TxKey) error { - return nil -} diff --git a/mempool_bak/priority_queue.go b/mempool_bak/priority_queue.go deleted file mode 100644 index f381f23e8..000000000 --- a/mempool_bak/priority_queue.go +++ /dev/null @@ -1,159 +0,0 @@ -package mempool - -import ( - "container/heap" - "sort" - - tmsync "github.com/tendermint/tendermint/libs/sync" -) - -var _ heap.Interface = (*TxPriorityQueue)(nil) - -// TxPriorityQueue defines a thread-safe priority queue for valid transactions. -type TxPriorityQueue struct { - mtx tmsync.RWMutex - txs []*WrappedTx -} - -func NewTxPriorityQueue() *TxPriorityQueue { - pq := &TxPriorityQueue{ - txs: make([]*WrappedTx, 0), - } - - heap.Init(pq) - - return pq -} - -// GetEvictableTxs attempts to find and return a list of *WrappedTx than can be -// evicted to make room for another *WrappedTx with higher priority. If no such -// list of *WrappedTx exists, nil will be returned. The returned list of *WrappedTx -// indicate that these transactions can be removed due to them being of lower -// priority and that their total sum in size allows room for the incoming -// transaction according to the mempool's configured limits. -func (pq *TxPriorityQueue) GetEvictableTxs(priority, txSize, totalSize, cap int64) []*WrappedTx { - pq.mtx.RLock() - defer pq.mtx.RUnlock() - - txs := make([]*WrappedTx, len(pq.txs)) - copy(txs, pq.txs) - - sort.Slice(txs, func(i, j int) bool { - return txs[i].priority < txs[j].priority - }) - - var ( - toEvict []*WrappedTx - i int - ) - - currSize := totalSize - - // Loop over all transactions in ascending priority order evaluating those - // that are only of less priority than the provided argument. We continue - // evaluating transactions until there is sufficient capacity for the new - // transaction (size) as defined by txSize. - for i < len(txs) && txs[i].priority < priority { - toEvict = append(toEvict, txs[i]) - currSize -= int64(txs[i].Size()) - - if currSize+txSize <= cap { - return toEvict - } - - i++ - } - - return nil -} - -// NumTxs returns the number of transactions in the priority queue. It is -// thread safe. -func (pq *TxPriorityQueue) NumTxs() int { - pq.mtx.RLock() - defer pq.mtx.RUnlock() - - return len(pq.txs) -} - -// RemoveTx removes a specific transaction from the priority queue. 
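// A stdlib-only sketch of the eviction strategy GetEvictableTxs above implements:
// walk the existing entries in ascending priority order, collecting strictly
// lower-priority ones, until enough bytes have been freed for the incoming item.
// The item type and names are simplified stand-ins, not the tendermint types.
package main

import (
	"fmt"
	"sort"
)

type item struct {
	priority int64
	size     int64
}

// evictable returns the lower-priority items to drop so an incoming item of the given
// priority and size fits under capBytes, or nil if no such set exists.
func evictable(existing []item, priority, size, totalSize, capBytes int64) []item {
	sorted := append([]item(nil), existing...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].priority < sorted[j].priority })

	var toEvict []item
	curr := totalSize
	for _, it := range sorted {
		if it.priority >= priority {
			break // only strictly lower-priority items may be evicted
		}
		toEvict = append(toEvict, it)
		curr -= it.size
		if curr+size <= capBytes {
			return toEvict
		}
	}
	return nil
}

func main() {
	pool := []item{{priority: 1, size: 40}, {priority: 5, size: 40}, {priority: 9, size: 20}}
	fmt.Println(evictable(pool, 7, 50, 100, 100)) // [{1 40} {5 40}]
}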
-func (pq *TxPriorityQueue) RemoveTx(tx *WrappedTx) { - pq.mtx.Lock() - defer pq.mtx.Unlock() - - if tx.heapIndex < len(pq.txs) { - heap.Remove(pq, tx.heapIndex) - } -} - -// PushTx adds a valid transaction to the priority queue. It is thread safe. -func (pq *TxPriorityQueue) PushTx(tx *WrappedTx) { - pq.mtx.Lock() - defer pq.mtx.Unlock() - - heap.Push(pq, tx) -} - -// PopTx removes the top priority transaction from the queue. It is thread safe. -func (pq *TxPriorityQueue) PopTx() *WrappedTx { - pq.mtx.Lock() - defer pq.mtx.Unlock() - - x := heap.Pop(pq) - if x != nil { - return x.(*WrappedTx) - } - - return nil -} - -// Push implements the Heap interface. -// -// NOTE: A caller should never call Push. Use PushTx instead. -func (pq *TxPriorityQueue) Push(x interface{}) { - n := len(pq.txs) - item := x.(*WrappedTx) - item.heapIndex = n - pq.txs = append(pq.txs, item) -} - -// Pop implements the Heap interface. -// -// NOTE: A caller should never call Pop. Use PopTx instead. -func (pq *TxPriorityQueue) Pop() interface{} { - old := pq.txs - n := len(old) - item := old[n-1] - old[n-1] = nil // avoid memory leak - item.heapIndex = -1 // for safety - pq.txs = old[0 : n-1] - return item -} - -// Len implements the Heap interface. -// -// NOTE: A caller should never call Len. Use NumTxs instead. -func (pq *TxPriorityQueue) Len() int { - return len(pq.txs) -} - -// Less implements the Heap interface. It returns true if the transaction at -// position i in the queue is of less priority than the transaction at position j. -func (pq *TxPriorityQueue) Less(i, j int) bool { - // If there exists two transactions with the same priority, consider the one - // that we saw the earliest as the higher priority transaction. - if pq.txs[i].priority == pq.txs[j].priority { - return pq.txs[i].timestamp.Before(pq.txs[j].timestamp) - } - - // We want Pop to give us the highest, not lowest, priority so we use greater - // than here. - return pq.txs[i].priority > pq.txs[j].priority -} - -// Swap implements the Heap interface. It swaps two transactions in the queue. -func (pq *TxPriorityQueue) Swap(i, j int) { - pq.txs[i], pq.txs[j] = pq.txs[j], pq.txs[i] - pq.txs[i].heapIndex = i - pq.txs[j].heapIndex = j -} diff --git a/mempool_bak/reactor.go b/mempool_bak/reactor.go deleted file mode 100644 index f3ea8dc06..000000000 --- a/mempool_bak/reactor.go +++ /dev/null @@ -1,323 +0,0 @@ -package mempool - -import ( - "errors" - "fmt" - "math" - "time" - - cfg "github.com/tendermint/tendermint/config" - "github.com/tendermint/tendermint/libs/clist" - "github.com/tendermint/tendermint/libs/log" - tmsync "github.com/tendermint/tendermint/libs/sync" - "github.com/tendermint/tendermint/p2p" - protomem "github.com/tendermint/tendermint/proto/tendermint/mempool" - "github.com/tendermint/tendermint/types" -) - -const ( - MempoolChannel = byte(0x30) - - peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount - - // UnknownPeerID is the peer ID to use when running CheckTx when there is - // no peer (e.g. RPC) - UnknownPeerID uint16 = 0 - - maxActiveIDs = math.MaxUint16 -) - -// Reactor handles mempool tx broadcasting amongst peers. -// It maintains a map from peer ID to counter, to prevent gossiping txs to the -// peers you received it from. 
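// A self-contained container/heap sketch of the ordering TxPriorityQueue above
// encodes in Less: the highest priority pops first, and ties are broken in favour of
// the transaction seen earlier. The wrapped type is a simplified stand-in.
package main

import (
	"container/heap"
	"fmt"
	"time"
)

type wrapped struct {
	id        string
	priority  int64
	timestamp time.Time
	heapIndex int
}

type txHeap []*wrapped

func (h txHeap) Len() int { return len(h) }

func (h txHeap) Less(i, j int) bool {
	if h[i].priority == h[j].priority {
		return h[i].timestamp.Before(h[j].timestamp) // earlier arrival wins the tie
	}
	return h[i].priority > h[j].priority // "greater is less" turns the min-heap into a max-heap
}

func (h txHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
	h[i].heapIndex, h[j].heapIndex = i, j
}

func (h *txHeap) Push(x interface{}) {
	item := x.(*wrapped)
	item.heapIndex = len(*h)
	*h = append(*h, item)
}

func (h *txHeap) Pop() interface{} {
	old := *h
	n := len(old)
	item := old[n-1]
	old[n-1] = nil // avoid holding a stale reference
	item.heapIndex = -1
	*h = old[:n-1]
	return item
}

func main() {
	now := time.Now()
	h := &txHeap{}
	heap.Push(h, &wrapped{id: "late-high", priority: 10, timestamp: now.Add(time.Second)})
	heap.Push(h, &wrapped{id: "early-high", priority: 10, timestamp: now})
	heap.Push(h, &wrapped{id: "low", priority: 1, timestamp: now})

	for h.Len() > 0 {
		fmt.Println(heap.Pop(h).(*wrapped).id) // early-high, late-high, low
	}
}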
-type Reactor struct { - p2p.BaseReactor - config *cfg.MempoolConfig - mempool *TxMempool - ids *mempoolIDs -} - -type mempoolIDs struct { - mtx tmsync.RWMutex - peerMap map[p2p.ID]uint16 - nextID uint16 // assumes that a node will never have over 65536 active peers - activeIDs map[uint16]struct{} // used to check if a given peerID key is used, the value doesn't matter -} - -// Reserve searches for the next unused ID and assigns it to the -// peer. -func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) { - ids.mtx.Lock() - defer ids.mtx.Unlock() - - curID := ids.nextPeerID() - ids.peerMap[peer.ID()] = curID - ids.activeIDs[curID] = struct{}{} -} - -// nextPeerID returns the next unused peer ID to use. -// This assumes that ids's mutex is already locked. -func (ids *mempoolIDs) nextPeerID() uint16 { - if len(ids.activeIDs) == maxActiveIDs { - panic(fmt.Sprintf("node has maximum %d active IDs and wanted to get one more", maxActiveIDs)) - } - - _, idExists := ids.activeIDs[ids.nextID] - for idExists { - ids.nextID++ - _, idExists = ids.activeIDs[ids.nextID] - } - curID := ids.nextID - ids.nextID++ - return curID -} - -// Reclaim returns the ID reserved for the peer back to unused pool. -func (ids *mempoolIDs) Reclaim(peer p2p.Peer) { - ids.mtx.Lock() - defer ids.mtx.Unlock() - - removedID, ok := ids.peerMap[peer.ID()] - if ok { - delete(ids.activeIDs, removedID) - delete(ids.peerMap, peer.ID()) - } -} - -// GetForPeer returns an ID reserved for the peer. -func (ids *mempoolIDs) GetForPeer(peer p2p.Peer) uint16 { - ids.mtx.RLock() - defer ids.mtx.RUnlock() - - return ids.peerMap[peer.ID()] -} - -func newMempoolIDs() *mempoolIDs { - return &mempoolIDs{ - peerMap: make(map[p2p.ID]uint16), - activeIDs: map[uint16]struct{}{0: {}}, - nextID: 1, // reserve unknownPeerID(0) for mempoolReactor.BroadcastTx - } -} - -// NewReactor returns a new Reactor with the given config and mempool. -func NewReactor(config *cfg.MempoolConfig, mempool *TxMempool) *Reactor { - memR := &Reactor{ - config: config, - mempool: mempool, - ids: newMempoolIDs(), - } - memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR) - return memR -} - -// InitPeer implements Reactor by creating a state for the peer. -func (memR *Reactor) InitPeer(peer p2p.Peer) p2p.Peer { - memR.ids.ReserveForPeer(peer) - return peer -} - -// SetLogger sets the Logger on the reactor and the underlying mempool. -func (memR *Reactor) SetLogger(l log.Logger) { - memR.Logger = l - memR.mempool.SetLogger(l) -} - -// OnStart implements p2p.BaseReactor. -func (memR *Reactor) OnStart() error { - if !memR.config.Broadcast { - memR.Logger.Info("Tx broadcasting is disabled") - } - return nil -} - -// GetChannels implements Reactor by returning the list of channels for this -// reactor. -func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor { - largestTx := make([]byte, memR.config.MaxTxBytes) - batchMsg := protomem.Message{ - Sum: &protomem.Message_Txs{ - Txs: &protomem.Txs{Txs: [][]byte{largestTx}}, - }, - } - - return []*p2p.ChannelDescriptor{ - { - ID: MempoolChannel, - Priority: 5, - RecvMessageCapacity: batchMsg.Size(), - }, - } -} - -// AddPeer implements Reactor. -// It starts a broadcast routine ensuring all txs are forwarded to the given peer. -func (memR *Reactor) AddPeer(peer p2p.Peer) { - if memR.config.Broadcast { - go memR.broadcastTxRoutine(peer) - } -} - -// RemovePeer implements Reactor. 
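// A compact sketch of the peer-ID reservation scheme mempoolIDs above implements:
// every peer is assigned a small uint16 ID (2 bytes stored per tx instead of a full
// p2p ID), ID 0 stays reserved for the unknown peer, and reclaimed IDs return to the
// unused pool. Stdlib only; the peer key is a plain string here, not a p2p.Peer.
package main

import "fmt"

type ids struct {
	peerMap   map[string]uint16
	activeIDs map[uint16]struct{}
	nextID    uint16
}

func newIDs() *ids {
	return &ids{
		peerMap:   map[string]uint16{},
		activeIDs: map[uint16]struct{}{0: {}}, // 0 is reserved for the unknown peer
		nextID:    1,
	}
}

func (s *ids) reserve(peer string) uint16 {
	for { // skip IDs that are still held by other peers
		if _, used := s.activeIDs[s.nextID]; !used {
			break
		}
		s.nextID++
	}
	id := s.nextID
	s.nextID++
	s.peerMap[peer] = id
	s.activeIDs[id] = struct{}{}
	return id
}

func (s *ids) reclaim(peer string) {
	if id, ok := s.peerMap[peer]; ok {
		delete(s.activeIDs, id)
		delete(s.peerMap, peer)
	}
}

func main() {
	s := newIDs()
	fmt.Println(s.reserve("peerA")) // 1
	fmt.Println(s.reserve("peerB")) // 2
	s.reclaim("peerA")
	fmt.Println(s.reserve("peerC")) // 3: freed IDs are only revisited after nextID wraps around
}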
-func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) { - memR.ids.Reclaim(peer) - // broadcast routine checks if peer is gone and returns -} - -// Receive implements Reactor. -// It adds any received transactions to the mempool. -func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { - msg, err := memR.decodeMsg(msgBytes) - if err != nil { - memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err) - memR.Switch.StopPeerForError(src, err) - return - } - memR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg) - - txInfo := TxInfo{SenderID: memR.ids.GetForPeer(src)} - if src != nil { - txInfo.SenderP2PID = src.ID() - } - for _, tx := range msg.Txs { - err = memR.mempool.CheckTx(tx, nil, txInfo) - if err == ErrTxInCache { - memR.Logger.Debug("Tx already exists in cache", "tx", tx.String()) - } else if err != nil { - memR.Logger.Info("Could not check tx", "tx", tx.String(), "err", err) - } - } - // broadcasting happens from go routines per peer -} - -// PeerState describes the state of a peer. -type PeerState interface { - GetHeight() int64 -} - -// Send new mempool txs to peer. -func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { - peerID := memR.ids.GetForPeer(peer) - var next *clist.CElement - - for { - // In case of both next.NextWaitChan() and peer.Quit() are variable at the same time - if !memR.IsRunning() || !peer.IsRunning() { - return - } - - // This happens because the CElement we were looking at got garbage - // collected (removed). That is, .NextWait() returned nil. Go ahead and - // start from the beginning. - if next == nil { - select { - case <-memR.mempool.WaitForNextTx(): // Wait until a tx is available - if next = memR.mempool.NextGossipTx(); next == nil { - continue - } - - case <-peer.Quit(): - return - - case <-memR.Quit(): - return - } - } - - // Make sure the peer is up to date. - peerState, ok := peer.Get(types.PeerStateKey).(PeerState) - if !ok { - // Peer does not have a state yet. We set it in the consensus reactor, but - // when we add peer in Switch, the order we call reactors#AddPeer is - // different every time due to us using a map. Sometimes other reactors - // will be initialized before the consensus reactor. We should wait a few - // milliseconds and retry. - time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) - continue - } - - // Allow for a lag of 1 block. 
- memTx := next.Value.(*WrappedTx) - if peerState.GetHeight() < memTx.height-1 { - time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) - continue - } - - // NOTE: Transaction batching was disabled due to - // https://github.com/tendermint/tendermint/issues/5796 - if ok := memR.mempool.txStore.TxHasPeer(memTx.hash, peerID); !ok { - msg := protomem.Message{ - Sum: &protomem.Message_Txs{ - Txs: &protomem.Txs{Txs: [][]byte{memTx.tx}}, - }, - } - - bz, err := msg.Marshal() - if err != nil { - panic(err) - } - - success := peer.Send(MempoolChannel, bz) - if !success { - time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) - continue - } - } - - select { - case <-next.NextWaitChan(): - // see the start of the for loop for nil check - next = next.Next() - - case <-peer.Quit(): - return - - case <-memR.Quit(): - return - } - } -} - -//----------------------------------------------------------------------------- -// Messages - -func (memR *Reactor) decodeMsg(bz []byte) (TxsMessage, error) { - msg := protomem.Message{} - err := msg.Unmarshal(bz) - if err != nil { - return TxsMessage{}, err - } - - var message TxsMessage - - if i, ok := msg.Sum.(*protomem.Message_Txs); ok { - txs := i.Txs.GetTxs() - - if len(txs) == 0 { - return message, errors.New("empty TxsMessage") - } - - decoded := make([]types.Tx, len(txs)) - for j, tx := range txs { - decoded[j] = types.Tx(tx) - } - - message = TxsMessage{ - Txs: decoded, - } - return message, nil - } - return message, fmt.Errorf("msg type: %T is not supported", msg) -} - -//------------------------------------- - -// TxsMessage is a Message containing transactions. -type TxsMessage struct { - Txs []types.Tx -} - -// String returns a string representation of the TxsMessage. -func (m *TxsMessage) String() string { - return fmt.Sprintf("[TxsMessage %v]", m.Txs) -} diff --git a/mempool_bak/reactor_test.go b/mempool_bak/reactor_test.go deleted file mode 100644 index de7945f1c..000000000 --- a/mempool_bak/reactor_test.go +++ /dev/null @@ -1,387 +0,0 @@ -package mempool - -import ( - "encoding/hex" - "errors" - "net" - "sync" - "testing" - "time" - - "github.com/fortytw2/leaktest" - "github.com/go-kit/log/term" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/tendermint/tendermint/abci/example/kvstore" - cfg "github.com/tendermint/tendermint/config" - "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/p2p" - "github.com/tendermint/tendermint/p2p/mock" - memproto "github.com/tendermint/tendermint/proto/tendermint/mempool" - "github.com/tendermint/tendermint/proxy" - "github.com/tendermint/tendermint/types" -) - -const ( - numTxs = 1000 - timeout = 120 * time.Second // ridiculously high because CircleCI is slow -) - -type peerState struct { - height int64 -} - -func (ps peerState) GetHeight() int64 { - return ps.height -} - -// Send a bunch of txs to the first reactor's mempool and wait for them all to -// be received in the others. -func TestReactorBroadcastTxsMessage(t *testing.T) { - config := cfg.TestConfig() - // if there were more than two reactors, the order of transactions could not be - // asserted in waitForTxsOnReactors (due to transactions gossiping). If we - // replace Connect2Switches (full mesh) with a func, which connects first - // reactor to others and nothing else, this test should also pass with >2 reactors. 
- const N = 2 - reactors := makeAndConnectReactors(config, N) - defer func() { - for _, r := range reactors { - if err := r.Stop(); err != nil { - assert.NoError(t, err) - } - } - }() - for _, r := range reactors { - for _, peer := range r.Switch.Peers().List() { - peer.Set(types.PeerStateKey, peerState{1}) - } - } - - txs := checkTxs(t, reactors[0].mempool, numTxs, UnknownPeerID) - waitForTxsOnReactors(t, txs, reactors) -} - -// // regression test for https://github.com/tendermint/tendermint/issues/5408 -// func TestReactorConcurrency(t *testing.T) { -// config := cfg.TestConfig() -// const N = 2 -// reactors := makeAndConnectReactors(config, N) -// defer func() { -// for _, r := range reactors { -// if err := r.Stop(); err != nil { -// assert.NoError(t, err) -// } -// } -// }() -// for _, r := range reactors { -// for _, peer := range r.Switch.Peers().List() { -// peer.Set(types.PeerStateKey, peerState{1}) -// } -// } -// var wg sync.WaitGroup - -// const numTxs = 5 - -// for i := 0; i < 1000; i++ { -// wg.Add(2) - -// // 1. submit a bunch of txs -// // 2. update the whole mempool -// txs := checkTxs(t, reactors[0].mempool, numTxs, UnknownPeerID) -// go func() { -// defer wg.Done() - -// reactors[0].mempool.Lock() -// defer reactors[0].mempool.Unlock() - -// deliverTxResponses := make([]*abci.ResponseDeliverTx, len(txs)) -// for i := range txs { -// deliverTxResponses[i] = &abci.ResponseDeliverTx{Code: 0} -// } - -// err := reactors[0].mempool.Update(1, txs, deliverTxResponses, nil, nil) -// assert.NoError(t, err) -// }() - -// // 1. submit a bunch of txs -// // 2. update none -// _ = checkTxs(t, reactors[1].mempool, numTxs, UnknownPeerID) -// go func() { -// defer wg.Done() - -// reactors[1].mempool.Lock() -// defer reactors[1].mempool.Unlock() -// err := reactors[1].mempool.Update(1, []types.Tx{}, make([]*abci.ResponseDeliverTx, 0), nil, nil) -// assert.NoError(t, err) -// }() - -// // 1. flush the mempool -// reactors[1].mempool.Flush() -// } - -// wg.Wait() -// } - -// Send a bunch of txs to the first reactor's mempool, claiming it came from peer -// ensure peer gets no txs. -func TestReactorNoBroadcastToSender(t *testing.T) { - config := cfg.TestConfig() - const N = 2 - reactors := makeAndConnectReactors(config, N) - defer func() { - for _, r := range reactors { - if err := r.Stop(); err != nil { - assert.NoError(t, err) - } - } - }() - for _, r := range reactors { - for _, peer := range r.Switch.Peers().List() { - peer.Set(types.PeerStateKey, peerState{1}) - } - } - - const peerID = 1 - checkTxs(t, reactors[0].mempool, numTxs, peerID) - ensureNoTxs(t, reactors[peerID], 100*time.Millisecond) -} - -// func TestReactor_MaxTxBytes(t *testing.T) { -// config := cfg.TestConfig() - -// const N = 2 -// reactors := makeAndConnectReactors(config, N) -// defer func() { -// for _, r := range reactors { -// if err := r.Stop(); err != nil { -// assert.NoError(t, err) -// } -// } -// }() -// for _, r := range reactors { -// for _, peer := range r.Switch.Peers().List() { -// peer.Set(types.PeerStateKey, peerState{1}) -// } -// } - -// // Broadcast a tx, which has the max size -// // => ensure it's received by the second reactor. 
-// tx1 := tmrand.Bytes(config.Mempool.MaxTxBytes) -// err := reactors[0].mempool.CheckTx(tx1, nil, TxInfo{SenderID: UnknownPeerID}) -// require.NoError(t, err) -// waitForTxsOnReactors(t, []types.Tx{tx1}, reactors) - -// reactors[0].mempool.Flush() -// reactors[1].mempool.Flush() - -// // Broadcast a tx, which is beyond the max size -// // => ensure it's not sent -// tx2 := tmrand.Bytes(config.Mempool.MaxTxBytes + 1) -// err = reactors[0].mempool.CheckTx(tx2, nil, TxInfo{SenderID: UnknownPeerID}) -// require.Error(t, err) -// } - -func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) { - if testing.Short() { - t.Skip("skipping test in short mode.") - } - - config := cfg.TestConfig() - const N = 2 - reactors := makeAndConnectReactors(config, N) - defer func() { - for _, r := range reactors { - if err := r.Stop(); err != nil { - assert.NoError(t, err) - } - } - }() - - // stop peer - sw := reactors[1].Switch - sw.StopPeerForError(sw.Peers().List()[0], errors.New("some reason")) - - // check that we are not leaking any go-routines - // i.e. broadcastTxRoutine finishes when peer is stopped - leaktest.CheckTimeout(t, 10*time.Second)() -} - -func TestBroadcastTxForPeerStopsWhenReactorStops(t *testing.T) { - if testing.Short() { - t.Skip("skipping test in short mode.") - } - - config := cfg.TestConfig() - const N = 2 - reactors := makeAndConnectReactors(config, N) - - // stop reactors - for _, r := range reactors { - if err := r.Stop(); err != nil { - assert.NoError(t, err) - } - } - - // check that we are not leaking any go-routines - // i.e. broadcastTxRoutine finishes when reactor is stopped - leaktest.CheckTimeout(t, 10*time.Second)() -} - -func TestMempoolIDsBasic(t *testing.T) { - ids := newMempoolIDs() - - peer := mock.NewPeer(net.IP{127, 0, 0, 1}) - - ids.ReserveForPeer(peer) - assert.EqualValues(t, 1, ids.GetForPeer(peer)) - ids.Reclaim(peer) - - ids.ReserveForPeer(peer) - assert.EqualValues(t, 2, ids.GetForPeer(peer)) - ids.Reclaim(peer) -} - -func TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs(t *testing.T) { - if testing.Short() { - return - } - - // 0 is already reserved for UnknownPeerID - ids := newMempoolIDs() - - for i := 0; i < maxActiveIDs-1; i++ { - peer := mock.NewPeer(net.IP{127, 0, 0, 1}) - ids.ReserveForPeer(peer) - } - - assert.Panics(t, func() { - peer := mock.NewPeer(net.IP{127, 0, 0, 1}) - ids.ReserveForPeer(peer) - }) -} - -func TestDontExhaustMaxActiveIDs(t *testing.T) { - config := cfg.TestConfig() - const N = 1 - reactors := makeAndConnectReactors(config, N) - defer func() { - for _, r := range reactors { - if err := r.Stop(); err != nil { - assert.NoError(t, err) - } - } - }() - reactor := reactors[0] - - for i := 0; i < maxActiveIDs+1; i++ { - peer := mock.NewPeer(nil) - reactor.Receive(MempoolChannel, peer, []byte{0x1, 0x2, 0x3}) - reactor.AddPeer(peer) - } -} - -// mempoolLogger is a TestingLogger which uses a different -// color for each validator ("validator" key must exist). 
-func mempoolLogger() log.Logger { - return log.TestingLoggerWithColorFn(func(keyvals ...interface{}) term.FgBgColor { - for i := 0; i < len(keyvals)-1; i += 2 { - if keyvals[i] == "validator" { - return term.FgBgColor{Fg: term.Color(uint8(keyvals[i+1].(int) + 1))} - } - } - return term.FgBgColor{} - }) -} - -// connect N mempool reactors through N switches -func makeAndConnectReactors(config *cfg.Config, n int) []*Reactor { - reactors := make([]*Reactor, n) - logger := mempoolLogger() - for i := 0; i < n; i++ { - app := kvstore.NewApplication() - cc := proxy.NewLocalClientCreator(app) - mempool, cleanup := newMempoolWithApp(cc) - defer cleanup() - - reactors[i] = NewReactor(config.Mempool, mempool) // so we dont start the consensus states - reactors[i].SetLogger(logger.With("validator", i)) - } - - p2p.MakeConnectedSwitches(config.P2P, n, func(i int, s *p2p.Switch) *p2p.Switch { - s.AddReactor("MEMPOOL", reactors[i]) - return s - - }, p2p.Connect2Switches) - return reactors -} - -func waitForTxsOnReactors(t *testing.T, txs []testTx, reactors []*Reactor) { - // wait for the txs in all mempools - wg := new(sync.WaitGroup) - for i, reactor := range reactors { - wg.Add(1) - go func(r *Reactor, reactorIndex int) { - defer wg.Done() - waitForTxsOnReactor(t, txs, r, reactorIndex) - }(reactor, i) - } - - done := make(chan struct{}) - go func() { - wg.Wait() - close(done) - }() - - timer := time.After(timeout) - select { - case <-timer: - t.Fatal("Timed out waiting for txs") - case <-done: - } -} - -func waitForTxsOnReactor(t *testing.T, txs []testTx, reactor *Reactor, reactorIndex int) { - mempool := reactor.mempool - for mempool.Size() < len(txs) { - time.Sleep(time.Millisecond * 100) - } - - reapedTxs := mempool.ReapMaxTxs(len(txs)) - for i, tx := range txs { - assert.Equalf(t, tx, reapedTxs[i], - "txs at index %d on reactor %d don't match: %v vs %v", i, reactorIndex, tx, reapedTxs[i]) - } -} - -// ensure no txs on reactor after some timeout -func ensureNoTxs(t *testing.T, reactor *Reactor, timeout time.Duration) { - time.Sleep(timeout) // wait for the txs in all mempools - assert.Zero(t, reactor.mempool.Size()) -} - -func TestMempoolVectors(t *testing.T) { - testCases := []struct { - testName string - tx []byte - expBytes string - }{ - {"tx 1", []byte{123}, "0a030a017b"}, - {"tx 2", []byte("proto encoding in mempool"), "0a1b0a1970726f746f20656e636f64696e6720696e206d656d706f6f6c"}, - } - - for _, tc := range testCases { - tc := tc - - msg := memproto.Message{ - Sum: &memproto.Message_Txs{ - Txs: &memproto.Txs{Txs: [][]byte{tc.tx}}, - }, - } - bz, err := msg.Marshal() - require.NoError(t, err, tc.testName) - - require.Equal(t, tc.expBytes, hex.EncodeToString(bz), tc.testName) - } -} diff --git a/mempool_bak/tx.go b/mempool_bak/tx.go deleted file mode 100644 index f8d02f0e9..000000000 --- a/mempool_bak/tx.go +++ /dev/null @@ -1,281 +0,0 @@ -package mempool - -import ( - "sort" - "time" - - "github.com/tendermint/tendermint/libs/clist" - tmsync "github.com/tendermint/tendermint/libs/sync" - "github.com/tendermint/tendermint/types" -) - -// WrappedTx defines a wrapper around a raw transaction with additional metadata -// that is used for indexing. 
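// A stdlib-only sketch of the "wait for all goroutines or time out" pattern
// waitForTxsOnReactors above uses: wg.Wait runs in its own goroutine that closes a
// done channel, so the caller can select between completion and time.After. Names
// are illustrative.
package main

import (
	"fmt"
	"sync"
	"time"
)

func waitAllOrTimeout(workers []func(), timeout time.Duration) bool {
	wg := new(sync.WaitGroup)
	for _, w := range workers {
		wg.Add(1)
		go func(work func()) {
			defer wg.Done()
			work()
		}(w)
	}

	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}

func main() {
	fast := []func(){func() {}, func() { time.Sleep(10 * time.Millisecond) }}
	fmt.Println(waitAllOrTimeout(fast, time.Second)) // true

	slow := []func(){func() { time.Sleep(time.Second) }}
	fmt.Println(waitAllOrTimeout(slow, 50*time.Millisecond)) // false
}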
-type WrappedTx struct { - // tx represents the raw binary transaction data - tx types.Tx - - // hash defines the transaction hash and the primary key used in the mempool - hash types.TxKey - - // height defines the height at which the transaction was validated at - height int64 - - // gasWanted defines the amount of gas the transaction sender requires - gasWanted int64 - - // priority defines the transaction's priority as specified by the application - // in the ResponseCheckTx response. - priority int64 - - // sender defines the transaction's sender as specified by the application in - // the ResponseCheckTx response. - sender string - - // timestamp is the time at which the node first received the transaction from - // a peer. It is used as a second dimension is prioritizing transactions when - // two transactions have the same priority. - timestamp time.Time - - // peers records a mapping of all peers that sent a given transaction - peers map[uint16]struct{} - - // heapIndex defines the index of the item in the heap - heapIndex int - - // gossipEl references the linked-list element in the gossip index - gossipEl *clist.CElement - - // removed marks the transaction as removed from the mempool. This is set - // during RemoveTx and is needed due to the fact that a given existing - // transaction in the mempool can be evicted when it is simultaneously having - // a reCheckTx callback executed. - removed bool -} - -func (wtx *WrappedTx) Size() int { - return len(wtx.tx) -} - -// TxStore implements a thread-safe mapping of valid transaction(s). -// -// NOTE: -// - Concurrent read-only access to a *WrappedTx object is OK. However, mutative -// access is not allowed. Regardless, it is not expected for the mempool to -// need mutative access. -type TxStore struct { - mtx tmsync.RWMutex - hashTxs map[types.TxKey]*WrappedTx // primary index - senderTxs map[string]*WrappedTx // sender is defined by the ABCI application -} - -func NewTxStore() *TxStore { - return &TxStore{ - senderTxs: make(map[string]*WrappedTx), - hashTxs: make(map[types.TxKey]*WrappedTx), - } -} - -// Size returns the total number of transactions in the store. -func (txs *TxStore) Size() int { - txs.mtx.RLock() - defer txs.mtx.RUnlock() - - return len(txs.hashTxs) -} - -// GetAllTxs returns all the transactions currently in the store. -func (txs *TxStore) GetAllTxs() []*WrappedTx { - txs.mtx.RLock() - defer txs.mtx.RUnlock() - - wTxs := make([]*WrappedTx, len(txs.hashTxs)) - i := 0 - for _, wtx := range txs.hashTxs { - wTxs[i] = wtx - i++ - } - - return wTxs -} - -// GetTxBySender returns a *WrappedTx by the transaction's sender property -// defined by the ABCI application. -func (txs *TxStore) GetTxBySender(sender string) *WrappedTx { - txs.mtx.RLock() - defer txs.mtx.RUnlock() - - return txs.senderTxs[sender] -} - -// GetTxByHash returns a *WrappedTx by the transaction's hash. -func (txs *TxStore) GetTxByHash(hash types.TxKey) *WrappedTx { - txs.mtx.RLock() - defer txs.mtx.RUnlock() - - return txs.hashTxs[hash] -} - -// IsTxRemoved returns true if a transaction by hash is marked as removed and -// false otherwise. -func (txs *TxStore) IsTxRemoved(hash types.TxKey) bool { - txs.mtx.RLock() - defer txs.mtx.RUnlock() - - wtx, ok := txs.hashTxs[hash] - if ok { - return wtx.removed - } - - return false -} - -// SetTx stores a *WrappedTx by it's hash. If the transaction also contains a -// non-empty sender, we additionally store the transaction by the sender as -// defined by the ABCI application. 
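// A simplified sketch of the dual-index layout TxStore above maintains: a primary
// map keyed by hash and a secondary map keyed by sender, both guarded by one
// RWMutex (RLock for reads, Lock for writes), with the sender index populated only
// when the application reported a non-empty sender. Keys are plain strings here
// rather than the tendermint TxKey type.
package main

import (
	"fmt"
	"sync"
)

type entry struct {
	hash   string
	sender string
	tx     []byte
}

type store struct {
	mtx      sync.RWMutex
	byHash   map[string]*entry // primary index
	bySender map[string]*entry // secondary index, non-empty senders only
}

func newStore() *store {
	return &store{byHash: map[string]*entry{}, bySender: map[string]*entry{}}
}

func (s *store) set(e *entry) {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	if e.sender != "" {
		s.bySender[e.sender] = e
	}
	s.byHash[e.hash] = e
}

func (s *store) getByHash(hash string) *entry {
	s.mtx.RLock()
	defer s.mtx.RUnlock()
	return s.byHash[hash]
}

func (s *store) remove(e *entry) {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	if e.sender != "" {
		delete(s.bySender, e.sender)
	}
	delete(s.byHash, e.hash)
}

func main() {
	s := newStore()
	s.set(&entry{hash: "h1", sender: "alice", tx: []byte{0x01}})
	fmt.Println(s.getByHash("h1").sender) // alice
	s.remove(s.getByHash("h1"))
	fmt.Println(s.getByHash("h1")) // <nil>
}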
-func (txs *TxStore) SetTx(wtx *WrappedTx) { - txs.mtx.Lock() - defer txs.mtx.Unlock() - - if len(wtx.sender) > 0 { - txs.senderTxs[wtx.sender] = wtx - } - - txs.hashTxs[wtx.tx.Key()] = wtx -} - -// RemoveTx removes a *WrappedTx from the transaction store. It deletes all -// indexes of the transaction. -func (txs *TxStore) RemoveTx(wtx *WrappedTx) { - txs.mtx.Lock() - defer txs.mtx.Unlock() - - if len(wtx.sender) > 0 { - delete(txs.senderTxs, wtx.sender) - } - - delete(txs.hashTxs, wtx.tx.Key()) - wtx.removed = true -} - -// TxHasPeer returns true if a transaction by hash has a given peer ID and false -// otherwise. If the transaction does not exist, false is returned. -func (txs *TxStore) TxHasPeer(hash types.TxKey, peerID uint16) bool { - txs.mtx.RLock() - defer txs.mtx.RUnlock() - - wtx := txs.hashTxs[hash] - if wtx == nil { - return false - } - - _, ok := wtx.peers[peerID] - return ok -} - -// GetOrSetPeerByTxHash looks up a WrappedTx by transaction hash and adds the -// given peerID to the WrappedTx's set of peers that sent us this transaction. -// We return true if we've already recorded the given peer for this transaction -// and false otherwise. If the transaction does not exist by hash, we return -// (nil, false). -func (txs *TxStore) GetOrSetPeerByTxHash(hash types.TxKey, peerID uint16) (*WrappedTx, bool) { - txs.mtx.Lock() - defer txs.mtx.Unlock() - - wtx := txs.hashTxs[hash] - if wtx == nil { - return nil, false - } - - if wtx.peers == nil { - wtx.peers = make(map[uint16]struct{}) - } - - if _, ok := wtx.peers[peerID]; ok { - return wtx, true - } - - wtx.peers[peerID] = struct{}{} - return wtx, false -} - -// WrappedTxList implements a thread-safe list of *WrappedTx objects that can be -// used to build generic transaction indexes in the mempool. It accepts a -// comparator function, less(a, b *WrappedTx) bool, that compares two WrappedTx -// references which is used during Insert in order to determine sorted order. If -// less returns true, a <= b. -type WrappedTxList struct { - mtx tmsync.RWMutex - txs []*WrappedTx - less func(*WrappedTx, *WrappedTx) bool -} - -func NewWrappedTxList(less func(*WrappedTx, *WrappedTx) bool) *WrappedTxList { - return &WrappedTxList{ - txs: make([]*WrappedTx, 0), - less: less, - } -} - -// Size returns the number of WrappedTx objects in the list. -func (wtl *WrappedTxList) Size() int { - wtl.mtx.RLock() - defer wtl.mtx.RUnlock() - - return len(wtl.txs) -} - -// Reset resets the list of transactions to an empty list. -func (wtl *WrappedTxList) Reset() { - wtl.mtx.Lock() - defer wtl.mtx.Unlock() - - wtl.txs = make([]*WrappedTx, 0) -} - -// Insert inserts a WrappedTx reference into the sorted list based on the list's -// comparator function. -func (wtl *WrappedTxList) Insert(wtx *WrappedTx) { - wtl.mtx.Lock() - defer wtl.mtx.Unlock() - - i := sort.Search(len(wtl.txs), func(i int) bool { - return wtl.less(wtl.txs[i], wtx) - }) - - if i == len(wtl.txs) { - // insert at the end - wtl.txs = append(wtl.txs, wtx) - return - } - - // Make space for the inserted element by shifting values at the insertion - // index up one index. - // - // NOTE: The call to append does not allocate memory when cap(wtl.txs) > len(wtl.txs). - wtl.txs = append(wtl.txs[:i+1], wtl.txs[i:]...) - wtl.txs[i] = wtx -} - -// Remove attempts to remove a WrappedTx from the sorted list. 
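// A small sketch of the sorted-insert idiom WrappedTxList.Insert above uses:
// sort.Search locates the insertion index and a single append shifts the tail up by
// one slot. This version keeps plain int64 values in ascending order instead of
// *WrappedTx references, so the comparator is just >=.
package main

import (
	"fmt"
	"sort"
)

func insertSorted(xs []int64, x int64) []int64 {
	i := sort.Search(len(xs), func(i int) bool { return xs[i] >= x })
	if i == len(xs) {
		return append(xs, x) // largest element so far: append at the end
	}
	// Shift xs[i:] up by one and place x at index i. As the original comment notes,
	// this append does not allocate when cap(xs) > len(xs).
	xs = append(xs[:i+1], xs[i:]...)
	xs[i] = x
	return xs
}

func main() {
	xs := []int64{10, 30, 50}
	xs = insertSorted(xs, 20)
	xs = insertSorted(xs, 60)
	fmt.Println(xs) // [10 20 30 50 60]
}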
-func (wtl *WrappedTxList) Remove(wtx *WrappedTx) { - wtl.mtx.Lock() - defer wtl.mtx.Unlock() - - i := sort.Search(len(wtl.txs), func(i int) bool { - return wtl.less(wtl.txs[i], wtx) - }) - - // Since the list is sorted, we evaluate all elements starting at i. Note, if - // the element does not exist, we may potentially evaluate the entire remainder - // of the list. However, a caller should not be expected to call Remove with a - // non-existing element. - for i < len(wtl.txs) { - if wtl.txs[i] == wtx { - wtl.txs = append(wtl.txs[:i], wtl.txs[i+1:]...) - return - } - - i++ - } -} diff --git a/test/fuzz/mempool/v0/testdata/cases/empty b/test/fuzz/mempool/v0/testdata/cases/empty new file mode 100644 index 000000000..e69de29bb diff --git a/test/fuzz/mempool/v1/testdata/cases/empty b/test/fuzz/mempool/v1/testdata/cases/empty new file mode 100644 index 000000000..e69de29bb