Files
tendermint/internal/mempool/v1/mempool_test.go
Sam Kleinman 851d2e3bde mempool,rpc: add removetx rpc method (#7047)
Addresses one of the concerns with #7041.

Provides a mechanism (via the RPC interface) to delete a single transaction, described by its hash, from the mempool. The method returns an error if the transaction cannot be found. Once the transaction is removed it remains in the cache and cannot be resubmitted until the cache is cleared or it expires from the cache.
2021-10-05 20:23:15 +00:00

529 lines
14 KiB
Go

package v1
import (
"bytes"
"context"
"errors"
"fmt"
"math/rand"
"os"
"sort"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
abciclient "github.com/tendermint/tendermint/abci/client"
"github.com/tendermint/tendermint/abci/example/code"
"github.com/tendermint/tendermint/abci/example/kvstore"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/internal/mempool"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/types"
)
// application extends the KV store application by overriding CheckTx to provide
// transaction priority based on the value in the key/value pair. The override
// also reports the leading "sender" field of the transaction as the Sender of
// the CheckTx response.
type application struct {
*kvstore.Application
}
// testTx pairs a raw transaction submitted by a test with the priority that
// was encoded into it, so tests can compare reap order against the expected
// priorities.
type testTx struct {
// tx is the raw transaction in sender=key=value form, as built by checkTxs.
tx types.Tx
// priority is the value checkTxs encoded as the last field of tx.
priority int64
}
// CheckTx validates a raw transaction of the form sender=key=value and derives
// the response from it:
//
//   - anything that does not split into exactly three "="-separated fields is
//     rejected with code 101;
//   - a value that is not a base-10 int64 is rejected with code 100;
//   - otherwise the value becomes the priority, the first field becomes the
//     sender, and the response code is code.CodeTypeOK.
//
// Every response, accepted or rejected, requests exactly one unit of gas.
// Rejections carry a zero priority and no sender.
func (app *application) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
	fields := bytes.Split(req.Tx, []byte("="))
	if len(fields) != 3 {
		return abci.ResponseCheckTx{
			Priority:  0,
			Code:      101,
			GasWanted: 1,
		}
	}

	// the priority is carried by the value, i.e. the last field
	value, err := strconv.ParseInt(string(fields[2]), 10, 64)
	if err != nil {
		return abci.ResponseCheckTx{
			Priority:  0,
			Code:      100,
			GasWanted: 1,
		}
	}

	return abci.ResponseCheckTx{
		Priority:  value,
		Sender:    string(fields[0]),
		Code:      code.CodeTypeOK,
		GasWanted: 1,
	}
}
// setup builds a TxMempool for a test. The mempool is backed by a locally
// created ABCI client wrapping the priority-aware test application, and uses a
// fresh test configuration whose mempool cache size is set to cacheSize. Any
// options are forwarded to NewTxMempool unchanged.
//
// A cleanup hook is registered on t that removes the configuration's root
// directory and then stops the ABCI client, failing the test if the client
// cannot be stopped.
func setup(t testing.TB, cacheSize int, options ...TxMempoolOption) *TxMempool {
	t.Helper()

	newClient := abciclient.NewLocalCreator(&application{kvstore.NewApplication()})

	// sub-test names contain "/", which is swapped for "|" in the root name
	rootName := strings.ReplaceAll(t.Name(), "/", "|")
	cfg := config.ResetTestRoot(rootName)
	cfg.Mempool.CacheSize = cacheSize

	appConnMem, err := newClient()
	require.NoError(t, err)
	require.NoError(t, appConnMem.Start())

	t.Cleanup(func() {
		os.RemoveAll(cfg.RootDir)
		require.NoError(t, appConnMem.Stop())
	})

	logger := log.TestingLogger().With("test", t.Name())
	return NewTxMempool(logger, cfg.Mempool, appConnMem, 0, options...)
}
// checkTxs submits numTxs randomly generated transactions to txmp through
// CheckTx on behalf of the peer identified by peerID. It returns them in
// submission order, each paired with the priority encoded in it.
//
// Each transaction has the form
//
//	sender-<i>-<peerID>=<40 upper-case hex chars>=<priority>
//
// which is the sender=key=value layout parsed by the test application's
// CheckTx. The priority is drawn from [1000, 9999). The test is failed
// immediately if the random source or any CheckTx call returns an error.
func checkTxs(t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx {
	// Mark this function as a helper, as setup does, so that a failed
	// assertion is attributed to the calling test's line.
	t.Helper()

	txs := make([]testTx, numTxs)
	txInfo := mempool.TxInfo{SenderID: peerID}
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))

	for i := range txs {
		// random 20 byte key, rendered as hex inside the transaction
		prefix := make([]byte, 20)
		_, err := rng.Read(prefix)
		require.NoError(t, err)

		priority := int64(rng.Intn(9999-1000) + 1000)

		txs[i] = testTx{
			tx:       []byte(fmt.Sprintf("sender-%d-%d=%X=%d", i, peerID, prefix, priority)),
			priority: priority,
		}

		require.NoError(t, txmp.CheckTx(context.Background(), txs[i].tx, nil, txInfo))
	}

	return txs
}
// TestTxMempool_TxsAvailable verifies the TxsAvailable notification channel:
// it must stay silent before any transaction is checked, fire exactly once
// after a batch of CheckTx calls, fire once more after an Update that leaves
// transactions in the mempool, and stay silent for further CheckTx calls made
// at the same height.
func TestTxMempool_TxsAvailable(t *testing.T) {
	txmp := setup(t, 0)
	txmp.EnableTxsAvailable()

	// how long to wait before deciding whether a notification arrived
	const wait = 500 * time.Millisecond

	ensureNoTxFire := func() {
		timer := time.NewTimer(wait)
		// release the timer even when we return via the failure branch
		defer timer.Stop()

		select {
		case <-txmp.TxsAvailable():
			require.Fail(t, "unexpected transactions event")
		case <-timer.C:
		}
	}

	ensureTxFire := func() {
		timer := time.NewTimer(wait)
		// the notification normally arrives well before the timer fires, so
		// stop the timer instead of leaving it pending
		defer timer.Stop()

		select {
		case <-txmp.TxsAvailable():
		case <-timer.C:
			require.Fail(t, "expected transactions event")
		}
	}

	// ensure no event as we have not executed any transactions yet
	ensureNoTxFire()

	// Execute CheckTx for some transactions and ensure TxsAvailable only fires
	// once.
	txs := checkTxs(t, txmp, 100, 0)
	ensureTxFire()
	ensureNoTxFire()

	rawTxs := make([]types.Tx, len(txs))
	for i, tx := range txs {
		rawTxs[i] = tx.tx
	}

	responses := make([]*abci.ResponseDeliverTx, len(rawTxs[:50]))
	for i := range responses {
		responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK}
	}

	// commit half the transactions and ensure we fire an event
	txmp.Lock()
	require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil))
	txmp.Unlock()
	ensureTxFire()
	ensureNoTxFire()

	// Execute CheckTx for more transactions and ensure we do not fire another
	// event as we're still on the same height (1).
	_ = checkTxs(t, txmp, 100, 0)
	ensureNoTxFire()
}
// TestTxMempool_Size verifies that Size and SizeBytes reflect the mempool
// contents after CheckTx admits 100 transactions and again after Update
// commits the first 50 of them.
func TestTxMempool_Size(t *testing.T) {
	txmp := setup(t, 0)

	submitted := checkTxs(t, txmp, 100, 0)
	require.Equal(t, len(submitted), txmp.Size())
	// NOTE(review): 5690 presumably is the summed length of the generated txs
	// (10 of 56 bytes plus 90 of 57 bytes given checkTxs' format) — confirm.
	require.Equal(t, int64(5690), txmp.SizeBytes())

	rawTxs := make([]types.Tx, 0, len(submitted))
	for _, s := range submitted {
		rawTxs = append(rawTxs, s.tx)
	}

	// one successful DeliverTx response per committed transaction
	committed := rawTxs[:50]
	responses := make([]*abci.ResponseDeliverTx, len(committed))
	for i := range responses {
		responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK}
	}

	txmp.Lock()
	require.NoError(t, txmp.Update(1, committed, responses, nil, nil))
	txmp.Unlock()

	require.Equal(t, len(rawTxs)/2, txmp.Size())
	require.Equal(t, int64(2850), txmp.SizeBytes())
}
// TestTxMempool_Flush verifies that Flush empties the mempool: after 100
// transactions are admitted and the first 50 committed through Update, a call
// to Flush must leave both Size and SizeBytes at zero.
func TestTxMempool_Flush(t *testing.T) {
	txmp := setup(t, 0)

	submitted := checkTxs(t, txmp, 100, 0)
	require.Equal(t, len(submitted), txmp.Size())
	require.Equal(t, int64(5690), txmp.SizeBytes())

	rawTxs := make([]types.Tx, 0, len(submitted))
	for _, s := range submitted {
		rawTxs = append(rawTxs, s.tx)
	}

	// one successful DeliverTx response per committed transaction
	committed := rawTxs[:50]
	responses := make([]*abci.ResponseDeliverTx, len(committed))
	for i := range responses {
		responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK}
	}

	txmp.Lock()
	require.NoError(t, txmp.Update(1, committed, responses, nil, nil))
	txmp.Unlock()

	// the remaining half must be dropped by the flush
	txmp.Flush()
	require.Zero(t, txmp.Size())
	require.Equal(t, int64(0), txmp.SizeBytes())
}
// TestTxMempool_ReapMaxBytesMaxGas verifies that ReapMaxBytesMaxGas returns
// transactions in decreasing priority order, respects the byte and gas limits
// it is given, and does not remove anything from the mempool.
func TestTxMempool_ReapMaxBytesMaxGas(t *testing.T) {
	txmp := setup(t, 0)

	// every tx requests 1 gas unit (see application.CheckTx)
	tTxs := checkTxs(t, txmp, 100, 0)

	// requireUntouched asserts that the mempool still holds every tx.
	requireUntouched := func() {
		require.Equal(t, len(tTxs), txmp.Size())
		require.Equal(t, int64(5690), txmp.SizeBytes())
	}
	requireUntouched()

	// index the submitted txs by key and collect their priorities, highest
	// first, as the order reaping is expected to follow
	byKey := make(map[types.TxKey]testTx)
	want := make([]int64, 0, len(tTxs))
	for _, tTx := range tTxs {
		byKey[tTx.tx.Key()] = tTx
		want = append(want, tTx.priority)
	}
	sort.Slice(want, func(a, b int) bool {
		return want[b] < want[a]
	})

	// ensurePrioritized asserts that the reaped txs are exactly the highest
	// priority ones, in decreasing priority order.
	ensurePrioritized := func(reapedTxs types.Txs) {
		got := make([]int64, len(reapedTxs))
		for i := range reapedTxs {
			got[i] = byKey[reapedTxs[i].Key()].priority
		}
		require.Equal(t, want[:len(got)], got)
	}

	// gas limit only: 50 gas units admit 50 txs
	reapedTxs := txmp.ReapMaxBytesMaxGas(-1, 50)
	ensurePrioritized(reapedTxs)
	requireUntouched()
	require.Len(t, reapedTxs, 50)

	// byte limit only
	reapedTxs = txmp.ReapMaxBytesMaxGas(1000, -1)
	ensurePrioritized(reapedTxs)
	requireUntouched()
	require.GreaterOrEqual(t, len(reapedTxs), 16)

	// Both limits at once. With 1 gas unit per tx the gas limit of 30 would on
	// its own admit 30 txs, so the expected 25 must come from the 1500 byte
	// limit being the tighter of the two.
	reapedTxs = txmp.ReapMaxBytesMaxGas(1500, 30)
	ensurePrioritized(reapedTxs)
	requireUntouched()
	require.Len(t, reapedTxs, 25)
}
// TestTxMempool_ReapMaxTxs verifies that ReapMaxTxs returns at most the
// requested number of transactions (all of them for -1), in decreasing
// priority order, without removing anything from the mempool.
func TestTxMempool_ReapMaxTxs(t *testing.T) {
	txmp := setup(t, 0)
	tTxs := checkTxs(t, txmp, 100, 0)

	// requireUntouched asserts that the mempool still holds every tx.
	requireUntouched := func() {
		require.Equal(t, len(tTxs), txmp.Size())
		require.Equal(t, int64(5690), txmp.SizeBytes())
	}
	requireUntouched()

	// index the submitted txs by key and collect their priorities, highest
	// first, as the order reaping is expected to follow
	byKey := make(map[types.TxKey]testTx)
	want := make([]int64, 0, len(tTxs))
	for _, tTx := range tTxs {
		byKey[tTx.tx.Key()] = tTx
		want = append(want, tTx.priority)
	}
	sort.Slice(want, func(a, b int) bool {
		return want[b] < want[a]
	})

	// ensurePrioritized asserts that the reaped txs are exactly the highest
	// priority ones, in decreasing priority order.
	ensurePrioritized := func(reapedTxs types.Txs) {
		got := make([]int64, len(reapedTxs))
		for i := range reapedTxs {
			got[i] = byKey[reapedTxs[i].Key()].priority
		}
		require.Equal(t, want[:len(got)], got)
	}

	// a negative limit reaps every transaction
	reapedTxs := txmp.ReapMaxTxs(-1)
	ensurePrioritized(reapedTxs)
	requireUntouched()
	require.Len(t, reapedTxs, len(tTxs))

	// a limit of one reaps a single transaction
	reapedTxs = txmp.ReapMaxTxs(1)
	ensurePrioritized(reapedTxs)
	requireUntouched()
	require.Len(t, reapedTxs, 1)

	// a limit of half the mempool reaps half of the transactions
	half := len(tTxs) / 2
	reapedTxs = txmp.ReapMaxTxs(half)
	ensurePrioritized(reapedTxs)
	requireUntouched()
	require.Len(t, reapedTxs, half)
}
// TestTxMempool_CheckTxExceedsMaxSize verifies the size guard in CheckTx: a
// transaction one byte larger than the configured MaxTxBytes must be rejected
// with an error, while one that is a byte smaller must be accepted.
func TestTxMempool_CheckTxExceedsMaxSize(t *testing.T) {
	txmp := setup(t, 0)
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	txInfo := mempool.TxInfo{SenderID: 0}

	// one byte over the limit
	oversized := make([]byte, txmp.config.MaxTxBytes+1)
	_, err := rng.Read(oversized)
	require.NoError(t, err)
	require.Error(t, txmp.CheckTx(context.Background(), oversized, nil, txInfo))

	// one byte under the limit
	undersized := make([]byte, txmp.config.MaxTxBytes-1)
	_, err = rng.Read(undersized)
	require.NoError(t, err)
	require.NoError(t, txmp.CheckTx(context.Background(), undersized, nil, txInfo))
}
// TestTxMempool_CheckTxSamePeer verifies that, with a cache of 100 entries
// enabled, submitting the very same transaction twice from the same peer
// succeeds the first time and returns an error the second time.
func TestTxMempool_CheckTxSamePeer(t *testing.T) {
	txmp := setup(t, 100)
	txInfo := mempool.TxInfo{SenderID: uint16(1)}

	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	key := make([]byte, 20)
	_, err := rng.Read(key)
	require.NoError(t, err)

	// sender=key=value transaction with a random key and priority 50
	tx := []byte(fmt.Sprintf("sender-0=%X=%d", key, 50))

	ctx := context.Background()
	require.NoError(t, txmp.CheckTx(ctx, tx, nil, txInfo))
	// the duplicate from the same peer must be refused
	require.Error(t, txmp.CheckTx(ctx, tx, nil, txInfo))
}
// TestTxMempool_CheckTxSameSender verifies that two distinct transactions
// naming the same sender do not both end up in the mempool: the second CheckTx
// returns no error, yet the mempool size stays at one.
func TestTxMempool_CheckTxSameSender(t *testing.T) {
	txmp := setup(t, 100)
	txInfo := mempool.TxInfo{SenderID: uint16(1)}
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))

	// two independent random keys so the transactions differ
	firstKey := make([]byte, 20)
	_, err := rng.Read(firstKey)
	require.NoError(t, err)

	secondKey := make([]byte, 20)
	_, err = rng.Read(secondKey)
	require.NoError(t, err)

	// both transactions share the sender "sender-0" and the priority 50
	first := []byte(fmt.Sprintf("sender-0=%X=%d", firstKey, 50))
	second := []byte(fmt.Sprintf("sender-0=%X=%d", secondKey, 50))

	ctx := context.Background()

	require.NoError(t, txmp.CheckTx(ctx, first, nil, txInfo))
	require.Equal(t, 1, txmp.Size())

	require.NoError(t, txmp.CheckTx(ctx, second, nil, txInfo))
	require.Equal(t, 1, txmp.Size())
}
// TestTxMempool_ConcurrentTxs runs a CheckTx producer concurrently with a
// consumer that periodically reaps transactions and commits them through
// Update, and verifies that the mempool is empty once both have finished.
//
// The producer submits 20 batches of 100 transactions, pausing a random
// 500-999ms after each batch. The consumer wakes up once per second, reaps up
// to 200 transactions and commits them at increasing heights, marking every
// tenth response as failed (code 100). It only exits once a reap comes back
// empty after the producer has finished.
func TestTxMempool_ConcurrentTxs(t *testing.T) {
	txmp := setup(t, 100)
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	checkTxDone := make(chan struct{})

	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		// Deferred so that the consumer and wg.Wait are released even if this
		// goroutine is aborted by a failed assertion inside checkTxs. Defers
		// run in reverse order: wg.Done first, then close, as before.
		//
		// NOTE(review): checkTxs still calls require.* (FailNow) from this
		// goroutine, which the testing package only supports on the test
		// goroutine; fixing that needs a change to checkTxs itself.
		defer close(checkTxDone)
		defer wg.Done()

		for i := 0; i < 20; i++ {
			_ = checkTxs(t, txmp, 100, 0)
			dur := rng.Intn(1000-500) + 500
			time.Sleep(time.Duration(dur) * time.Millisecond)
		}
	}()

	wg.Add(1)
	go func() {
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		defer wg.Done()

		var height int64 = 1

		for range ticker.C {
			reapedTxs := txmp.ReapMaxTxs(200)
			if len(reapedTxs) == 0 {
				// only return once we know we finished the CheckTx loop
				select {
				case <-checkTxDone:
					return
				default:
				}
				continue
			}

			responses := make([]*abci.ResponseDeliverTx, len(reapedTxs))
			for i := range responses {
				// every tenth transaction is reported as failed
				respCode := uint32(abci.CodeTypeOK)
				if i%10 == 0 {
					respCode = 100
				}
				responses[i] = &abci.ResponseDeliverTx{Code: respCode}
			}

			// Release the lock before reporting an error, and report it with
			// t.Errorf rather than require: FailNow must not be called from a
			// goroutine other than the one running the test, and aborting
			// between Lock and Unlock would leave the mempool locked.
			txmp.Lock()
			err := txmp.Update(height, reapedTxs, responses, nil, nil)
			txmp.Unlock()
			if err != nil {
				t.Errorf("unexpected error updating mempool at height %d: %v", height, err)
				return
			}

			height++
		}
	}()

	wg.Wait()
	require.Zero(t, txmp.Size())
	require.Zero(t, txmp.SizeBytes())
}
// TestTxMempool_ExpiredTxs_NumBlocks exercises height based expiry. With the
// mempool at height 100 and TTLNumBlocks set to 10, it checks that an Update
// one block later removes only the committed transactions, and that an Update
// ten blocks after that leaves at least the 45 newer, uncommitted ones.
func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) {
	txmp := setup(t, 500)
	txmp.height = 100
	txmp.config.TTLNumBlocks = 10

	// okResponses builds n successful DeliverTx responses.
	okResponses := func(n int) []*abci.ResponseDeliverTx {
		out := make([]*abci.ResponseDeliverTx, n)
		for i := range out {
			out[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK}
		}
		return out
	}

	tTxs := checkTxs(t, txmp, 100, 0)
	require.Equal(t, len(tTxs), txmp.Size())
	require.Equal(t, 100, txmp.heightIndex.Size())

	// commit 5 txs at the next height -- nothing should expire yet, so only
	// the committed txs disappear
	reapedTxs := txmp.ReapMaxTxs(5)
	responses := okResponses(len(reapedTxs))

	txmp.Lock()
	require.NoError(t, txmp.Update(txmp.height+1, reapedTxs, responses, nil, nil))
	txmp.Unlock()

	require.Equal(t, 95, txmp.Size())
	require.Equal(t, 95, txmp.heightIndex.Size())

	// submit 50 more txs from a second peer (the mempool should now be at
	// height 101)
	_ = checkTxs(t, txmp, 50, 1)
	require.Equal(t, 145, txmp.Size())
	require.Equal(t, 145, txmp.heightIndex.Size())

	// Commit 5 txs at a height that would expire all the transactions from
	// before the previous Update (height 100).
	//
	// NOTE: When we reap txs below, we do not know if we're picking txs from
	// the initial CheckTx calls or from the second round of CheckTx calls.
	// Thus, we cannot guarantee that all 95 txs that should be expired and
	// removed are still there. However, we do know that at most 95 txs can be
	// expired and removed.
	reapedTxs = txmp.ReapMaxTxs(5)
	responses = okResponses(len(reapedTxs))

	txmp.Lock()
	require.NoError(t, txmp.Update(txmp.height+10, reapedTxs, responses, nil, nil))
	txmp.Unlock()

	require.GreaterOrEqual(t, txmp.Size(), 45)
	require.GreaterOrEqual(t, txmp.heightIndex.Size(), 45)
}
func TestTxMempool_CheckTxPostCheckError(t *testing.T) {
cases := []struct {
name string
err error
}{
{
name: "error",
err: errors.New("test error"),
},
{
name: "no error",
err: nil,
},
}
for _, tc := range cases {
testCase := tc
t.Run(testCase.name, func(t *testing.T) {
postCheckFn := func(_ types.Tx, _ *abci.ResponseCheckTx) error {
return testCase.err
}
txmp := setup(t, 0, WithPostCheck(postCheckFn))
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
tx := make([]byte, txmp.config.MaxTxBytes-1)
_, err := rng.Read(tx)
require.NoError(t, err)
callback := func(res *abci.Response) {
checkTxRes, ok := res.Value.(*abci.Response_CheckTx)
require.True(t, ok)
expectedErrString := ""
if testCase.err != nil {
expectedErrString = testCase.err.Error()
}
require.Equal(t, expectedErrString, checkTxRes.CheckTx.MempoolError)
}
require.NoError(t, txmp.CheckTx(context.Background(), tx, callback, mempool.TxInfo{SenderID: 0}))
})
}
}