This commit is contained in:
Aleksandr Bezobchuk
2022-06-14 14:02:47 -04:00
parent 726b346b95
commit e4d3edaf87

View File

@@ -1,7 +1,6 @@
package v0
import (
"context"
"crypto/rand"
"encoding/binary"
"fmt"
@@ -35,10 +34,7 @@ import (
type cleanupFunc func()
func newMempoolWithApp(cc abciclient.Creator) (*CListMempool, cleanupFunc, error) {
conf, err := config.ResetTestRoot("mempool_test")
if err != nil {
return nil, func() {}, err
}
conf := config.ResetTestRoot("mempool_test")
mp, cu := newMempoolWithAppAndConfig(cc, conf)
return mp, cu, nil
@@ -86,11 +82,11 @@ func checkTxs(t *testing.T, mp mempool.Mempool, count int, peerID uint16) types.
if err != nil {
t.Error(err)
}
if err := mp.CheckTx(context.Background(), txBytes, nil, txInfo); err != nil {
if err := mp.CheckTx(txBytes, nil, txInfo); err != nil {
// Skip invalid txs.
// TestMempoolFilters will fail otherwise. It asserts a number of txs
// returned.
if types.IsPreCheckError(err) {
if mempool.IsPreCheckError(err) {
continue
}
t.Fatalf("CheckTx failed: %v while checking #%d tx", err, i)
@@ -200,13 +196,13 @@ func TestMempoolUpdate(t *testing.T) {
{
err := mp.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil, nil)
require.NoError(t, err)
err = mp.CheckTx(context.Background(), []byte{0x01}, nil, mempool.TxInfo{})
err = mp.CheckTx([]byte{0x01}, nil, mempool.TxInfo{})
require.NoError(t, err)
}
// 2. Removes valid txs from the mempool
{
err := mp.CheckTx(context.Background(), []byte{0x02}, nil, mempool.TxInfo{})
err := mp.CheckTx([]byte{0x02}, nil, mempool.TxInfo{})
require.NoError(t, err)
err = mp.Update(1, []types.Tx{[]byte{0x02}}, abciResponses(1, abci.CodeTypeOK), nil, nil)
require.NoError(t, err)
@@ -215,13 +211,13 @@ func TestMempoolUpdate(t *testing.T) {
// 3. Removes invalid transactions from the cache and the mempool (if present)
{
err := mp.CheckTx(context.Background(), []byte{0x03}, nil, mempool.TxInfo{})
err := mp.CheckTx([]byte{0x03}, nil, mempool.TxInfo{})
require.NoError(t, err)
err = mp.Update(1, []types.Tx{[]byte{0x03}}, abciResponses(1, 1), nil, nil)
require.NoError(t, err)
assert.Zero(t, mp.Size())
err = mp.CheckTx(context.Background(), []byte{0x03}, nil, mempool.TxInfo{})
err = mp.CheckTx([]byte{0x03}, nil, mempool.TxInfo{})
require.NoError(t, err)
}
}
@@ -251,7 +247,7 @@ func TestMempoolUpdateDoesNotPanicWhenApplicationMissedTx(t *testing.T) {
reqRes.Response = abci.ToResponseCheckTx(abci.ResponseCheckTx{Code: abci.CodeTypeOK})
mockClient.On("CheckTxAsync", mock.Anything, mock.Anything).Return(reqRes, nil)
err := mp.CheckTx(context.Background(), tx, nil, mempool.TxInfo{})
err := mp.CheckTx(tx, nil, mempool.TxInfo{})
require.NoError(t, err)
// ensure that the callback that the mempool sets on the ReqRes is run.
@@ -280,7 +276,7 @@ func TestMempoolUpdateDoesNotPanicWhenApplicationMissedTx(t *testing.T) {
func TestMempool_KeepInvalidTxsInCache(t *testing.T) {
app := kvstore.NewApplication()
cc := abciclient.NewLocalCreator(app)
cc := proxy.NewLocalClientCreator(app)
wcfg := config.DefaultConfig()
wcfg.Mempool.KeepInvalidTxsInCache = true
mp, cleanup := newMempoolWithAppAndConfig(cc, wcfg)
@@ -294,7 +290,7 @@ func TestMempool_KeepInvalidTxsInCache(t *testing.T) {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, 1)
err := mp.CheckTx(context.Background(), b, nil, mempool.TxInfo{})
err := mp.CheckTx(b, nil, mempool.TxInfo{})
require.NoError(t, err)
// simulate new block
@@ -305,11 +301,11 @@ func TestMempool_KeepInvalidTxsInCache(t *testing.T) {
require.NoError(t, err)
// a must be added to the cache
err = mp.CheckTx(context.Background(), a, nil, mempool.TxInfo{})
err = mp.CheckTx(a, nil, mempool.TxInfo{})
require.NoError(t, err)
// b must remain in the cache
err = mp.CheckTx(context.Background(), b, nil, mempool.TxInfo{})
err = mp.CheckTx(b, nil, mempool.TxInfo{})
require.NoError(t, err)
}
@@ -321,14 +317,14 @@ func TestMempool_KeepInvalidTxsInCache(t *testing.T) {
// remove a from the cache to test (2)
mp.cache.Remove(a)
err := mp.CheckTx(context.Background(), a, nil, mempool.TxInfo{})
err := mp.CheckTx(a, nil, mempool.TxInfo{})
require.NoError(t, err)
}
}
func TestTxsAvailable(t *testing.T) {
app := kvstore.NewApplication()
cc := abciclient.NewLocalCreator(app)
cc := proxy.NewLocalClientCreator(app)
mp, cleanup, err := newMempoolWithApp(cc)
require.NoError(t, err)
defer cleanup()
@@ -373,13 +369,13 @@ func TestTxsAvailable(t *testing.T) {
func TestSerialReap(t *testing.T) {
app := kvstore.NewApplication()
cc := abciclient.NewLocalCreator(app)
cc := proxy.NewLocalClientCreator(app)
mp, cleanup, err := newMempoolWithApp(cc)
require.NoError(t, err)
defer cleanup()
appConnCon, _ := cc()
appConnCon, _ := cc.NewABCIClient()
appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus"))
err = appConnCon.Start()
require.Nil(t, err)
@@ -392,7 +388,7 @@ func TestSerialReap(t *testing.T) {
// This will succeed
txBytes := make([]byte, 8)
binary.BigEndian.PutUint64(txBytes, uint64(i))
err := mp.CheckTx(context.Background(), txBytes, nil, mempool.TxInfo{})
err := mp.CheckTx(txBytes, nil, mempool.TxInfo{})
_, cached := cacheMap[string(txBytes)]
if cached {
require.NotNil(t, err, "expected error for cached tx")
@@ -402,7 +398,7 @@ func TestSerialReap(t *testing.T) {
cacheMap[string(txBytes)] = struct{}{}
// Duplicates are cached and should return error
err = mp.CheckTx(context.Background(), txBytes, nil, mempool.TxInfo{})
err = mp.CheckTx(txBytes, nil, mempool.TxInfo{})
require.NotNil(t, err, "Expected error after CheckTx on duplicated tx")
}
}
@@ -425,12 +421,11 @@ func TestSerialReap(t *testing.T) {
}
commitRange := func(start, end int) {
ctx := context.Background()
// Deliver some txs.
for i := start; i < end; i++ {
txBytes := make([]byte, 8)
binary.BigEndian.PutUint64(txBytes, uint64(i))
res, err := appConnCon.DeliverTxSync(ctx, abci.RequestDeliverTx{Tx: txBytes})
res, err := appConnCon.DeliverTxSync(abci.RequestDeliverTx{Tx: txBytes})
if err != nil {
t.Errorf("client error committing tx: %v", err)
}
@@ -439,7 +434,7 @@ func TestSerialReap(t *testing.T) {
res.Code, res.Data, res.Log)
}
}
res, err := appConnCon.CommitSync(ctx)
res, err := appConnCon.CommitSync()
if err != nil {
t.Errorf("client error committing: %v", err)
}
@@ -485,7 +480,8 @@ func TestSerialReap(t *testing.T) {
func TestMempool_CheckTxChecksTxSize(t *testing.T) {
app := kvstore.NewApplication()
cc := abciclient.NewLocalCreator(app)
cc := proxy.NewLocalClientCreator(app)
mempl, cleanup, err := newMempoolWithApp(cc)
require.NoError(t, err)
defer cleanup()
@@ -512,7 +508,7 @@ func TestMempool_CheckTxChecksTxSize(t *testing.T) {
tx := tmrand.Bytes(testCase.len)
err := mempl.CheckTx(context.Background(), tx, nil, mempool.TxInfo{})
err := mempl.CheckTx(tx, nil, mempool.TxInfo{})
bv := gogotypes.BytesValue{Value: tx}
bz, err2 := bv.Marshal()
require.NoError(t, err2)
@@ -521,7 +517,7 @@ func TestMempool_CheckTxChecksTxSize(t *testing.T) {
if !testCase.err {
require.NoError(t, err, caseString)
} else {
require.Equal(t, err, types.ErrTxTooLarge{
require.Equal(t, err, mempool.ErrTxTooLarge{
Max: maxTxSize,
Actual: testCase.len,
}, caseString)
@@ -531,9 +527,9 @@ func TestMempool_CheckTxChecksTxSize(t *testing.T) {
func TestMempoolTxsBytes(t *testing.T) {
app := kvstore.NewApplication()
cc := abciclient.NewLocalCreator(app)
cfg, err := config.ResetTestRoot("mempool_test")
require.NoError(t, err)
cc := proxy.NewLocalClientCreator(app)
cfg := config.ResetTestRoot("mempool_test")
cfg.Mempool.MaxTxsBytes = 10
mp, cleanup := newMempoolWithAppAndConfig(cc, cfg)
@@ -543,7 +539,7 @@ func TestMempoolTxsBytes(t *testing.T) {
assert.EqualValues(t, 0, mp.SizeBytes())
// 2. len(tx) after CheckTx
err = mp.CheckTx(context.Background(), []byte{0x01}, nil, mempool.TxInfo{})
err := mp.CheckTx([]byte{0x01}, nil, mempool.TxInfo{})
require.NoError(t, err)
assert.EqualValues(t, 1, mp.SizeBytes())
@@ -553,7 +549,7 @@ func TestMempoolTxsBytes(t *testing.T) {
assert.EqualValues(t, 0, mp.SizeBytes())
// 4. zero after Flush
err = mp.CheckTx(context.Background(), []byte{0x02, 0x03}, nil, mempool.TxInfo{})
err = mp.CheckTx([]byte{0x02, 0x03}, nil, mempool.TxInfo{})
require.NoError(t, err)
assert.EqualValues(t, 2, mp.SizeBytes())
@@ -562,21 +558,21 @@ func TestMempoolTxsBytes(t *testing.T) {
// 5. ErrMempoolIsFull is returned when/if MaxTxsBytes limit is reached.
err = mp.CheckTx(
context.Background(),
[]byte{0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04},
nil,
mempool.TxInfo{},
)
require.NoError(t, err)
err = mp.CheckTx(context.Background(), []byte{0x05}, nil, mempool.TxInfo{})
err = mp.CheckTx([]byte{0x05}, nil, mempool.TxInfo{})
if assert.Error(t, err) {
assert.IsType(t, types.ErrMempoolIsFull{}, err)
assert.IsType(t, mempool.ErrMempoolIsFull{}, err)
}
// 6. zero after tx is rechecked and removed due to not being valid anymore
app2 := kvstore.NewApplication()
cc = abciclient.NewLocalCreator(app2)
cc = proxy.NewLocalClientCreator(app2)
mp, cleanup, err = newMempoolWithApp(cc)
require.NoError(t, err)
defer cleanup()
@@ -584,11 +580,11 @@ func TestMempoolTxsBytes(t *testing.T) {
txBytes := make([]byte, 8)
binary.BigEndian.PutUint64(txBytes, uint64(0))
err = mp.CheckTx(context.Background(), txBytes, nil, mempool.TxInfo{})
err = mp.CheckTx(txBytes, nil, mempool.TxInfo{})
require.NoError(t, err)
assert.EqualValues(t, 8, mp.SizeBytes())
appConnCon, _ := cc()
appConnCon, _ := cc.NewABCIClient()
appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus"))
err = appConnCon.Start()
require.Nil(t, err)
@@ -597,11 +593,12 @@ func TestMempoolTxsBytes(t *testing.T) {
t.Error(err)
}
})
ctx := context.Background()
res, err := appConnCon.DeliverTxSync(ctx, abci.RequestDeliverTx{Tx: txBytes})
res, err := appConnCon.DeliverTxSync(abci.RequestDeliverTx{Tx: txBytes})
require.NoError(t, err)
require.EqualValues(t, 0, res.Code)
res2, err := appConnCon.CommitSync(ctx)
res2, err := appConnCon.CommitSync()
require.NoError(t, err)
require.NotEmpty(t, res2.Data)
@@ -611,7 +608,7 @@ func TestMempoolTxsBytes(t *testing.T) {
assert.EqualValues(t, 8, mp.SizeBytes())
// 7. Test RemoveTxByKey function
err = mp.CheckTx(context.Background(), []byte{0x06}, nil, mempool.TxInfo{})
err = mp.CheckTx([]byte{0x06}, nil, mempool.TxInfo{})
require.NoError(t, err)
assert.EqualValues(t, 9, mp.SizeBytes())
assert.Error(t, mp.RemoveTxByKey(types.Tx([]byte{0x07}).Key()))
@@ -659,8 +656,8 @@ func TestMempoolRemoteAppConcurrency(t *testing.T) {
// this will err with ErrTxInCache many times ...
mp.CheckTx(tx, nil, mempool.TxInfo{SenderID: uint16(peerID)}) //nolint: errcheck // will error
}
err = mp.FlushAppConn()
require.NoError(t, err)
require.NoError(t, mp.FlushAppConn())
}
// caller must close server