Files
tendermint/internal/mempool/v1/mempool_test.go
William Banfield 84c15857e4 mempool: return mempool errors to the abci client (#6740)
This changes adds an `MempoolError` field to the `ResponseCheckTx`. This will allow clients to understand that their transaction was rejected from the mempool despite passing the ABCI check. 

This change also updates the code to make use of early returns to prevent highly nested code blocks. Namely, it returns when the type assertion fails at the beginning of the method, instead of wrapping the entire method in a large if statement. This has a somewhat large effect on the diff as rendered by github.

addresses: #3546
2021-07-22 14:52:29 +00:00

528 lines
14 KiB
Go

package v1
import (
"bytes"
"context"
"errors"
"fmt"
"math/rand"
"os"
"sort"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/abci/example/code"
"github.com/tendermint/tendermint/abci/example/kvstore"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/internal/mempool"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types"
)
// application extends the KV store application by overriding CheckTx to provide
// transaction priority based on the value in the key/value pair.
type application struct {
*kvstore.Application
}
type testTx struct {
tx types.Tx
priority int64
}
func (app *application) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
var (
priority int64
sender string
)
// infer the priority from the raw transaction value (sender=key=value)
parts := bytes.Split(req.Tx, []byte("="))
if len(parts) == 3 {
v, err := strconv.ParseInt(string(parts[2]), 10, 64)
if err != nil {
return abci.ResponseCheckTx{
Priority: priority,
Code: 100,
GasWanted: 1,
}
}
priority = v
sender = string(parts[0])
} else {
return abci.ResponseCheckTx{
Priority: priority,
Code: 101,
GasWanted: 1,
}
}
return abci.ResponseCheckTx{
Priority: priority,
Sender: sender,
Code: code.CodeTypeOK,
GasWanted: 1,
}
}
func setup(t testing.TB, cacheSize int, options ...TxMempoolOption) *TxMempool {
t.Helper()
app := &application{kvstore.NewApplication()}
cc := proxy.NewLocalClientCreator(app)
cfg := config.ResetTestRoot(strings.ReplaceAll(t.Name(), "/", "|"))
cfg.Mempool.CacheSize = cacheSize
appConnMem, err := cc.NewABCIClient()
require.NoError(t, err)
require.NoError(t, appConnMem.Start())
t.Cleanup(func() {
os.RemoveAll(cfg.RootDir)
require.NoError(t, appConnMem.Stop())
})
return NewTxMempool(log.TestingLogger().With("test", t.Name()), cfg.Mempool, appConnMem, 0, options...)
}
func checkTxs(t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx {
txs := make([]testTx, numTxs)
txInfo := mempool.TxInfo{SenderID: peerID}
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := 0; i < numTxs; i++ {
prefix := make([]byte, 20)
_, err := rng.Read(prefix)
require.NoError(t, err)
priority := int64(rng.Intn(9999-1000) + 1000)
txs[i] = testTx{
tx: []byte(fmt.Sprintf("sender-%d-%d=%X=%d", i, peerID, prefix, priority)),
priority: priority,
}
require.NoError(t, txmp.CheckTx(context.Background(), txs[i].tx, nil, txInfo))
}
return txs
}
func TestTxMempool_TxsAvailable(t *testing.T) {
txmp := setup(t, 0)
txmp.EnableTxsAvailable()
ensureNoTxFire := func() {
timer := time.NewTimer(500 * time.Millisecond)
select {
case <-txmp.TxsAvailable():
require.Fail(t, "unexpected transactions event")
case <-timer.C:
}
}
ensureTxFire := func() {
timer := time.NewTimer(500 * time.Millisecond)
select {
case <-txmp.TxsAvailable():
case <-timer.C:
require.Fail(t, "expected transactions event")
}
}
// ensure no event as we have not executed any transactions yet
ensureNoTxFire()
// Execute CheckTx for some transactions and ensure TxsAvailable only fires
// once.
txs := checkTxs(t, txmp, 100, 0)
ensureTxFire()
ensureNoTxFire()
rawTxs := make([]types.Tx, len(txs))
for i, tx := range txs {
rawTxs[i] = tx.tx
}
responses := make([]*abci.ResponseDeliverTx, len(rawTxs[:50]))
for i := 0; i < len(responses); i++ {
responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK}
}
// commit half the transactions and ensure we fire an event
txmp.Lock()
require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil))
txmp.Unlock()
ensureTxFire()
ensureNoTxFire()
// Execute CheckTx for more transactions and ensure we do not fire another
// event as we're still on the same height (1).
_ = checkTxs(t, txmp, 100, 0)
ensureNoTxFire()
}
func TestTxMempool_Size(t *testing.T) {
txmp := setup(t, 0)
txs := checkTxs(t, txmp, 100, 0)
require.Equal(t, len(txs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())
rawTxs := make([]types.Tx, len(txs))
for i, tx := range txs {
rawTxs[i] = tx.tx
}
responses := make([]*abci.ResponseDeliverTx, len(rawTxs[:50]))
for i := 0; i < len(responses); i++ {
responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK}
}
txmp.Lock()
require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil))
txmp.Unlock()
require.Equal(t, len(rawTxs)/2, txmp.Size())
require.Equal(t, int64(2850), txmp.SizeBytes())
}
func TestTxMempool_Flush(t *testing.T) {
txmp := setup(t, 0)
txs := checkTxs(t, txmp, 100, 0)
require.Equal(t, len(txs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())
rawTxs := make([]types.Tx, len(txs))
for i, tx := range txs {
rawTxs[i] = tx.tx
}
responses := make([]*abci.ResponseDeliverTx, len(rawTxs[:50]))
for i := 0; i < len(responses); i++ {
responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK}
}
txmp.Lock()
require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil))
txmp.Unlock()
txmp.Flush()
require.Zero(t, txmp.Size())
require.Equal(t, int64(0), txmp.SizeBytes())
}
func TestTxMempool_ReapMaxBytesMaxGas(t *testing.T) {
txmp := setup(t, 0)
tTxs := checkTxs(t, txmp, 100, 0) // all txs request 1 gas unit
require.Equal(t, len(tTxs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())
txMap := make(map[[mempool.TxKeySize]byte]testTx)
priorities := make([]int64, len(tTxs))
for i, tTx := range tTxs {
txMap[mempool.TxKey(tTx.tx)] = tTx
priorities[i] = tTx.priority
}
sort.Slice(priorities, func(i, j int) bool {
// sort by priority, i.e. decreasing order
return priorities[i] > priorities[j]
})
ensurePrioritized := func(reapedTxs types.Txs) {
reapedPriorities := make([]int64, len(reapedTxs))
for i, rTx := range reapedTxs {
reapedPriorities[i] = txMap[mempool.TxKey(rTx)].priority
}
require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities)
}
// reap by gas capacity only
reapedTxs := txmp.ReapMaxBytesMaxGas(-1, 50)
ensurePrioritized(reapedTxs)
require.Equal(t, len(tTxs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())
require.Len(t, reapedTxs, 50)
// reap by transaction bytes only
reapedTxs = txmp.ReapMaxBytesMaxGas(1000, -1)
ensurePrioritized(reapedTxs)
require.Equal(t, len(tTxs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())
require.GreaterOrEqual(t, len(reapedTxs), 16)
// Reap by both transaction bytes and gas, where the size yields 31 reaped
// transactions and the gas limit reaps 25 transactions.
reapedTxs = txmp.ReapMaxBytesMaxGas(1500, 30)
ensurePrioritized(reapedTxs)
require.Equal(t, len(tTxs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())
require.Len(t, reapedTxs, 25)
}
func TestTxMempool_ReapMaxTxs(t *testing.T) {
txmp := setup(t, 0)
tTxs := checkTxs(t, txmp, 100, 0)
require.Equal(t, len(tTxs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())
txMap := make(map[[mempool.TxKeySize]byte]testTx)
priorities := make([]int64, len(tTxs))
for i, tTx := range tTxs {
txMap[mempool.TxKey(tTx.tx)] = tTx
priorities[i] = tTx.priority
}
sort.Slice(priorities, func(i, j int) bool {
// sort by priority, i.e. decreasing order
return priorities[i] > priorities[j]
})
ensurePrioritized := func(reapedTxs types.Txs) {
reapedPriorities := make([]int64, len(reapedTxs))
for i, rTx := range reapedTxs {
reapedPriorities[i] = txMap[mempool.TxKey(rTx)].priority
}
require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities)
}
// reap all transactions
reapedTxs := txmp.ReapMaxTxs(-1)
ensurePrioritized(reapedTxs)
require.Equal(t, len(tTxs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())
require.Len(t, reapedTxs, len(tTxs))
// reap a single transaction
reapedTxs = txmp.ReapMaxTxs(1)
ensurePrioritized(reapedTxs)
require.Equal(t, len(tTxs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())
require.Len(t, reapedTxs, 1)
// reap half of the transactions
reapedTxs = txmp.ReapMaxTxs(len(tTxs) / 2)
ensurePrioritized(reapedTxs)
require.Equal(t, len(tTxs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())
require.Len(t, reapedTxs, len(tTxs)/2)
}
func TestTxMempool_CheckTxExceedsMaxSize(t *testing.T) {
txmp := setup(t, 0)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
tx := make([]byte, txmp.config.MaxTxBytes+1)
_, err := rng.Read(tx)
require.NoError(t, err)
require.Error(t, txmp.CheckTx(context.Background(), tx, nil, mempool.TxInfo{SenderID: 0}))
tx = make([]byte, txmp.config.MaxTxBytes-1)
_, err = rng.Read(tx)
require.NoError(t, err)
require.NoError(t, txmp.CheckTx(context.Background(), tx, nil, mempool.TxInfo{SenderID: 0}))
}
func TestTxMempool_CheckTxSamePeer(t *testing.T) {
txmp := setup(t, 100)
peerID := uint16(1)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
prefix := make([]byte, 20)
_, err := rng.Read(prefix)
require.NoError(t, err)
tx := []byte(fmt.Sprintf("sender-0=%X=%d", prefix, 50))
require.NoError(t, txmp.CheckTx(context.Background(), tx, nil, mempool.TxInfo{SenderID: peerID}))
require.Error(t, txmp.CheckTx(context.Background(), tx, nil, mempool.TxInfo{SenderID: peerID}))
}
func TestTxMempool_CheckTxSameSender(t *testing.T) {
txmp := setup(t, 100)
peerID := uint16(1)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
prefix1 := make([]byte, 20)
_, err := rng.Read(prefix1)
require.NoError(t, err)
prefix2 := make([]byte, 20)
_, err = rng.Read(prefix2)
require.NoError(t, err)
tx1 := []byte(fmt.Sprintf("sender-0=%X=%d", prefix1, 50))
tx2 := []byte(fmt.Sprintf("sender-0=%X=%d", prefix2, 50))
require.NoError(t, txmp.CheckTx(context.Background(), tx1, nil, mempool.TxInfo{SenderID: peerID}))
require.Equal(t, 1, txmp.Size())
require.NoError(t, txmp.CheckTx(context.Background(), tx2, nil, mempool.TxInfo{SenderID: peerID}))
require.Equal(t, 1, txmp.Size())
}
func TestTxMempool_ConcurrentTxs(t *testing.T) {
txmp := setup(t, 100)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
checkTxDone := make(chan struct{})
var wg sync.WaitGroup
wg.Add(1)
go func() {
for i := 0; i < 20; i++ {
_ = checkTxs(t, txmp, 100, 0)
dur := rng.Intn(1000-500) + 500
time.Sleep(time.Duration(dur) * time.Millisecond)
}
wg.Done()
close(checkTxDone)
}()
wg.Add(1)
go func() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
defer wg.Done()
var height int64 = 1
for range ticker.C {
reapedTxs := txmp.ReapMaxTxs(200)
if len(reapedTxs) > 0 {
responses := make([]*abci.ResponseDeliverTx, len(reapedTxs))
for i := 0; i < len(responses); i++ {
var code uint32
if i%10 == 0 {
code = 100
} else {
code = abci.CodeTypeOK
}
responses[i] = &abci.ResponseDeliverTx{Code: code}
}
txmp.Lock()
require.NoError(t, txmp.Update(height, reapedTxs, responses, nil, nil))
txmp.Unlock()
height++
} else {
// only return once we know we finished the CheckTx loop
select {
case <-checkTxDone:
return
default:
}
}
}
}()
wg.Wait()
require.Zero(t, txmp.Size())
require.Zero(t, txmp.SizeBytes())
}
func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) {
txmp := setup(t, 500)
txmp.height = 100
txmp.config.TTLNumBlocks = 10
tTxs := checkTxs(t, txmp, 100, 0)
require.Equal(t, len(tTxs), txmp.Size())
require.Equal(t, 100, txmp.heightIndex.Size())
// reap 5 txs at the next height -- no txs should expire
reapedTxs := txmp.ReapMaxTxs(5)
responses := make([]*abci.ResponseDeliverTx, len(reapedTxs))
for i := 0; i < len(responses); i++ {
responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK}
}
txmp.Lock()
require.NoError(t, txmp.Update(txmp.height+1, reapedTxs, responses, nil, nil))
txmp.Unlock()
require.Equal(t, 95, txmp.Size())
require.Equal(t, 95, txmp.heightIndex.Size())
// check more txs at height 101
_ = checkTxs(t, txmp, 50, 1)
require.Equal(t, 145, txmp.Size())
require.Equal(t, 145, txmp.heightIndex.Size())
// Reap 5 txs at a height that would expire all the transactions from before
// the previous Update (height 100).
//
// NOTE: When we reap txs below, we do not know if we're picking txs from the
// initial CheckTx calls or from the second round of CheckTx calls. Thus, we
// cannot guarantee that all 95 txs are remaining that should be expired and
// removed. However, we do know that that at most 95 txs can be expired and
// removed.
reapedTxs = txmp.ReapMaxTxs(5)
responses = make([]*abci.ResponseDeliverTx, len(reapedTxs))
for i := 0; i < len(responses); i++ {
responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK}
}
txmp.Lock()
require.NoError(t, txmp.Update(txmp.height+10, reapedTxs, responses, nil, nil))
txmp.Unlock()
require.GreaterOrEqual(t, txmp.Size(), 45)
require.GreaterOrEqual(t, txmp.heightIndex.Size(), 45)
}
func TestTxMempool_CheckTxPostCheckError(t *testing.T) {
cases := []struct {
name string
err error
}{
{
name: "error",
err: errors.New("test error"),
},
{
name: "no error",
err: nil,
},
}
for _, tc := range cases {
testCase := tc
t.Run(testCase.name, func(t *testing.T) {
postCheckFn := func(_ types.Tx, _ *abci.ResponseCheckTx) error {
return testCase.err
}
txmp := setup(t, 0, WithPostCheck(postCheckFn))
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
tx := make([]byte, txmp.config.MaxTxBytes-1)
_, err := rng.Read(tx)
require.NoError(t, err)
callback := func(res *abci.Response) {
checkTxRes, ok := res.Value.(*abci.Response_CheckTx)
require.True(t, ok)
expectedErrString := ""
if testCase.err != nil {
expectedErrString = testCase.err.Error()
}
require.Equal(t, expectedErrString, checkTxRes.CheckTx.MempoolError)
}
require.NoError(t, txmp.CheckTx(context.Background(), tx, callback, mempool.TxInfo{SenderID: 0}))
})
}
}