mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-07 05:46:32 +00:00
This is a manual forward-port of #8944 and related fixes from v0.35.x. One notable difference is that the CheckTx response messages no longer have a field to record an error from the ABCI application. The code is set up so that such errors could be reported directly to the CheckTx caller, but that would be an API change, and right now a bunch of test plumbing depends on the existing semantics. Also fix up tests that rely on implementation-specific mempool behavior: (1) Commit was setting the expected mempool size incorrectly; (2) fix the sequence test so that it does not depend on the incorrect size.
359 lines
12 KiB
Go
359 lines
12 KiB
Go
package consensus
|
|
|
|
import (
|
|
"context"
|
|
"encoding/binary"
|
|
"fmt"
|
|
"os"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
dbm "github.com/tendermint/tm-db"
|
|
|
|
"github.com/tendermint/tendermint/abci/example/code"
|
|
abci "github.com/tendermint/tendermint/abci/types"
|
|
"github.com/tendermint/tendermint/internal/mempool"
|
|
sm "github.com/tendermint/tendermint/internal/state"
|
|
"github.com/tendermint/tendermint/internal/store"
|
|
"github.com/tendermint/tendermint/internal/test/factory"
|
|
"github.com/tendermint/tendermint/libs/log"
|
|
"github.com/tendermint/tendermint/types"
|
|
)
|
|
|
|
// for testing
|
|
func assertMempool(t *testing.T, txn txNotifier) mempool.Mempool {
|
|
t.Helper()
|
|
mp, ok := txn.(mempool.Mempool)
|
|
require.True(t, ok)
|
|
return mp
|
|
}
|
|
|
|
func TestMempoolNoProgressUntilTxsAvailable(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
baseConfig := configSetup(t)
|
|
|
|
config, err := ResetConfig(t.TempDir(), "consensus_mempool_txs_available_test")
|
|
require.NoError(t, err)
|
|
t.Cleanup(func() { _ = os.RemoveAll(config.RootDir) })
|
|
|
|
config.Consensus.CreateEmptyBlocks = false
|
|
state, privVals := makeGenesisState(ctx, t, baseConfig, genesisStateArgs{
|
|
Validators: 1,
|
|
Power: 10,
|
|
Params: factory.ConsensusParams()})
|
|
cs := newStateWithConfig(ctx, t, log.NewNopLogger(), config, state, privVals[0], NewCounterApplication())
|
|
assertMempool(t, cs.txNotifier).EnableTxsAvailable()
|
|
height, round := cs.Height, cs.Round
|
|
newBlockCh := subscribe(ctx, t, cs.eventBus, types.EventQueryNewBlock)
|
|
startTestRound(ctx, cs, height, round)
|
|
|
|
ensureNewEventOnChannel(t, newBlockCh) // first block gets committed
|
|
ensureNoNewEventOnChannel(t, newBlockCh)
|
|
checkTxsRange(ctx, t, cs, 0, 1)
|
|
ensureNewEventOnChannel(t, newBlockCh) // commit txs
|
|
ensureNewEventOnChannel(t, newBlockCh) // commit updated app hash
|
|
ensureNoNewEventOnChannel(t, newBlockCh)
|
|
}
|
|
|
|
func TestMempoolProgressAfterCreateEmptyBlocksInterval(t *testing.T) {
|
|
baseConfig := configSetup(t)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
config, err := ResetConfig(t.TempDir(), "consensus_mempool_txs_available_test")
|
|
require.NoError(t, err)
|
|
t.Cleanup(func() { _ = os.RemoveAll(config.RootDir) })
|
|
|
|
config.Consensus.CreateEmptyBlocksInterval = ensureTimeout
|
|
state, privVals := makeGenesisState(ctx, t, baseConfig, genesisStateArgs{
|
|
Validators: 1,
|
|
Power: 10,
|
|
Params: factory.ConsensusParams()})
|
|
cs := newStateWithConfig(ctx, t, log.NewNopLogger(), config, state, privVals[0], NewCounterApplication())
|
|
|
|
assertMempool(t, cs.txNotifier).EnableTxsAvailable()
|
|
|
|
newBlockCh := subscribe(ctx, t, cs.eventBus, types.EventQueryNewBlock)
|
|
startTestRound(ctx, cs, cs.Height, cs.Round)
|
|
|
|
ensureNewEventOnChannel(t, newBlockCh) // first block gets committed
|
|
ensureNoNewEventOnChannel(t, newBlockCh) // then we dont make a block ...
|
|
ensureNewEventOnChannel(t, newBlockCh) // until the CreateEmptyBlocksInterval has passed
|
|
}
|
|
|
|
func TestMempoolProgressInHigherRound(t *testing.T) {
|
|
baseConfig := configSetup(t)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
config, err := ResetConfig(t.TempDir(), "consensus_mempool_txs_available_test")
|
|
require.NoError(t, err)
|
|
t.Cleanup(func() { _ = os.RemoveAll(config.RootDir) })
|
|
|
|
config.Consensus.CreateEmptyBlocks = false
|
|
state, privVals := makeGenesisState(ctx, t, baseConfig, genesisStateArgs{
|
|
Validators: 1,
|
|
Power: 10,
|
|
Params: factory.ConsensusParams()})
|
|
cs := newStateWithConfig(ctx, t, log.NewNopLogger(), config, state, privVals[0], NewCounterApplication())
|
|
assertMempool(t, cs.txNotifier).EnableTxsAvailable()
|
|
height, round := cs.Height, cs.Round
|
|
newBlockCh := subscribe(ctx, t, cs.eventBus, types.EventQueryNewBlock)
|
|
newRoundCh := subscribe(ctx, t, cs.eventBus, types.EventQueryNewRound)
|
|
timeoutCh := subscribe(ctx, t, cs.eventBus, types.EventQueryTimeoutPropose)
|
|
cs.setProposal = func(proposal *types.Proposal, recvTime time.Time) error {
|
|
if cs.Height == 2 && cs.Round == 0 {
|
|
// dont set the proposal in round 0 so we timeout and
|
|
// go to next round
|
|
return nil
|
|
}
|
|
return cs.defaultSetProposal(proposal, recvTime)
|
|
}
|
|
startTestRound(ctx, cs, height, round)
|
|
|
|
ensureNewRound(t, newRoundCh, height, round) // first round at first height
|
|
ensureNewEventOnChannel(t, newBlockCh) // first block gets committed
|
|
|
|
height++ // moving to the next height
|
|
round = 0
|
|
|
|
ensureNewRound(t, newRoundCh, height, round) // first round at next height
|
|
checkTxsRange(ctx, t, cs, 0, 1) // we deliver txs, but don't set a proposal so we get the next round
|
|
ensureNewTimeout(t, timeoutCh, height, round, cs.state.ConsensusParams.Timeout.ProposeTimeout(round).Nanoseconds())
|
|
|
|
round++ // moving to the next round
|
|
ensureNewRound(t, newRoundCh, height, round) // wait for the next round
|
|
ensureNewEventOnChannel(t, newBlockCh) // now we can commit the block
|
|
}
|
|
|
|
func checkTxsRange(ctx context.Context, t *testing.T, cs *State, start, end int) {
|
|
t.Helper()
|
|
// Deliver some txs.
|
|
for i := start; i < end; i++ {
|
|
txBytes := make([]byte, 8)
|
|
binary.BigEndian.PutUint64(txBytes, uint64(i))
|
|
var rCode uint32
|
|
err := assertMempool(t, cs.txNotifier).CheckTx(ctx, txBytes, func(r *abci.ResponseCheckTx) { rCode = r.Code }, mempool.TxInfo{})
|
|
require.NoError(t, err, "error after checkTx")
|
|
require.Equal(t, code.CodeTypeOK, rCode, "checkTx code is error, txBytes %X", txBytes)
|
|
}
|
|
}
|
|
|
|
func TestMempoolTxConcurrentWithCommit(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
config := configSetup(t)
|
|
logger := log.NewNopLogger()
|
|
state, privVals := makeGenesisState(ctx, t, config, genesisStateArgs{
|
|
Validators: 1,
|
|
Power: 10,
|
|
Params: factory.ConsensusParams(),
|
|
})
|
|
stateStore := sm.NewStore(dbm.NewMemDB())
|
|
blockStore := store.NewBlockStore(dbm.NewMemDB())
|
|
|
|
cs := newStateWithConfigAndBlockStore(
|
|
ctx,
|
|
t,
|
|
logger, config, state, privVals[0], NewCounterApplication(), blockStore)
|
|
|
|
err := stateStore.Save(state)
|
|
require.NoError(t, err)
|
|
newBlockHeaderCh := subscribe(ctx, t, cs.eventBus, types.EventQueryNewBlockHeader)
|
|
|
|
const numTxs int64 = 3000
|
|
go checkTxsRange(ctx, t, cs, 0, int(numTxs))
|
|
|
|
startTestRound(ctx, cs, cs.Height, cs.Round)
|
|
for n := int64(0); n < numTxs; {
|
|
select {
|
|
case msg := <-newBlockHeaderCh:
|
|
headerEvent := msg.Data().(types.EventDataNewBlockHeader)
|
|
n += headerEvent.NumTxs
|
|
logger.Info("new transactions", "nTxs", headerEvent.NumTxs, "total", n)
|
|
case <-time.After(30 * time.Second):
|
|
t.Fatal("Timed out waiting 30s to commit blocks with transactions")
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestMempoolRmBadTx(t *testing.T) {
|
|
config := configSetup(t)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
state, privVals := makeGenesisState(ctx, t, config, genesisStateArgs{
|
|
Validators: 1,
|
|
Power: 10,
|
|
Params: factory.ConsensusParams()})
|
|
app := NewCounterApplication()
|
|
stateStore := sm.NewStore(dbm.NewMemDB())
|
|
blockStore := store.NewBlockStore(dbm.NewMemDB())
|
|
cs := newStateWithConfigAndBlockStore(ctx, t, log.NewNopLogger(), config, state, privVals[0], app, blockStore)
|
|
err := stateStore.Save(state)
|
|
require.NoError(t, err)
|
|
|
|
// increment the counter by 1
|
|
txBytes := make([]byte, 8)
|
|
binary.BigEndian.PutUint64(txBytes, uint64(0))
|
|
|
|
resFinalize, err := app.FinalizeBlock(ctx, &abci.RequestFinalizeBlock{Txs: [][]byte{txBytes}})
|
|
require.NoError(t, err)
|
|
assert.False(t, resFinalize.TxResults[0].IsErr(), fmt.Sprintf("expected no error. got %v", resFinalize))
|
|
assert.True(t, len(resFinalize.AppHash) > 0)
|
|
|
|
_, err = app.Commit(ctx)
|
|
require.NoError(t, err)
|
|
|
|
emptyMempoolCh := make(chan struct{})
|
|
checkTxRespCh := make(chan struct{})
|
|
go func() {
|
|
// Try to send an out-of-sequence transaction through the mempool.
|
|
// CheckTx should not err, but the app should return a bad abci code
|
|
// and the tx should get removed from the pool
|
|
binary.BigEndian.PutUint64(txBytes, uint64(5))
|
|
err := assertMempool(t, cs.txNotifier).CheckTx(ctx, txBytes, func(r *abci.ResponseCheckTx) {
|
|
if r.Code != code.CodeTypeBadNonce {
|
|
t.Errorf("expected checktx to return bad nonce, got %#v", r)
|
|
return
|
|
}
|
|
checkTxRespCh <- struct{}{}
|
|
}, mempool.TxInfo{})
|
|
if err != nil {
|
|
t.Errorf("error after CheckTx: %v", err)
|
|
return
|
|
}
|
|
|
|
// check for the tx
|
|
for {
|
|
txs := assertMempool(t, cs.txNotifier).ReapMaxBytesMaxGas(int64(len(txBytes)), -1)
|
|
if len(txs) == 0 {
|
|
emptyMempoolCh <- struct{}{}
|
|
return
|
|
}
|
|
time.Sleep(10 * time.Millisecond)
|
|
}
|
|
}()
|
|
|
|
// Wait until the tx returns
|
|
ticker := time.After(time.Second * 5)
|
|
select {
|
|
case <-checkTxRespCh:
|
|
// success
|
|
case <-ticker:
|
|
t.Errorf("timed out waiting for tx to return")
|
|
return
|
|
}
|
|
|
|
// Wait until the tx is removed
|
|
ticker = time.After(time.Second * 5)
|
|
select {
|
|
case <-emptyMempoolCh:
|
|
// success
|
|
case <-ticker:
|
|
t.Errorf("timed out waiting for tx to be removed")
|
|
return
|
|
}
|
|
}
|
|
|
|
// CounterApplication that maintains a mempool state and resets it upon commit
|
|
type CounterApplication struct {
|
|
abci.BaseApplication
|
|
|
|
txCount int
|
|
mempoolTxCount int
|
|
mu sync.Mutex
|
|
}
|
|
|
|
func NewCounterApplication() *CounterApplication {
|
|
return &CounterApplication{}
|
|
}
|
|
|
|
func (app *CounterApplication) Info(_ context.Context, req *abci.RequestInfo) (*abci.ResponseInfo, error) {
|
|
app.mu.Lock()
|
|
defer app.mu.Unlock()
|
|
|
|
return &abci.ResponseInfo{Data: fmt.Sprintf("txs:%v", app.txCount)}, nil
|
|
}
|
|
|
|
func (app *CounterApplication) FinalizeBlock(_ context.Context, req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) {
|
|
app.mu.Lock()
|
|
defer app.mu.Unlock()
|
|
|
|
respTxs := make([]*abci.ExecTxResult, len(req.Txs))
|
|
for i, tx := range req.Txs {
|
|
txValue := txAsUint64(tx)
|
|
if txValue != uint64(app.txCount) {
|
|
respTxs[i] = &abci.ExecTxResult{
|
|
Code: code.CodeTypeBadNonce,
|
|
Log: fmt.Sprintf("Invalid nonce. Expected %d, got %d", app.txCount, txValue),
|
|
}
|
|
continue
|
|
}
|
|
app.txCount++
|
|
respTxs[i] = &abci.ExecTxResult{Code: code.CodeTypeOK}
|
|
}
|
|
|
|
res := &abci.ResponseFinalizeBlock{TxResults: respTxs}
|
|
|
|
if app.txCount > 0 {
|
|
res.AppHash = make([]byte, 8)
|
|
binary.BigEndian.PutUint64(res.AppHash, uint64(app.txCount))
|
|
}
|
|
|
|
return res, nil
|
|
}
|
|
|
|
func (app *CounterApplication) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) {
|
|
app.mu.Lock()
|
|
defer app.mu.Unlock()
|
|
if req.Type == abci.CheckTxType_New {
|
|
txValue := txAsUint64(req.Tx)
|
|
if txValue != uint64(app.mempoolTxCount) {
|
|
return &abci.ResponseCheckTx{
|
|
Code: code.CodeTypeBadNonce,
|
|
}, nil
|
|
}
|
|
app.mempoolTxCount++
|
|
}
|
|
return &abci.ResponseCheckTx{Code: code.CodeTypeOK}, nil
|
|
}
|
|
|
|
// txAsUint64 interprets tx as a big-endian unsigned integer and returns it as
// a uint64. Inputs shorter than 8 bytes are treated as if left-padded with
// zeros, so nil and empty input yield 0. Inputs longer than 8 bytes do not
// fit in a uint64; only their trailing (least significant) 8 bytes are used,
// rather than panicking on a negative slice index.
func txAsUint64(tx []byte) uint64 {
	const width = 8

	// Keep only the low-order bytes of oversize input.
	if len(tx) > width {
		tx = tx[len(tx)-width:]
	}

	var buf [width]byte
	// Right-align the bytes so that short inputs keep their numeric value.
	copy(buf[width-len(tx):], tx)
	return binary.BigEndian.Uint64(buf[:])
}
|
|
|
|
func (app *CounterApplication) Commit(context.Context) (*abci.ResponseCommit, error) {
|
|
app.mu.Lock()
|
|
defer app.mu.Unlock()
|
|
return &abci.ResponseCommit{}, nil
|
|
}
|
|
|
|
func (app *CounterApplication) PrepareProposal(_ context.Context, req *abci.RequestPrepareProposal) (*abci.ResponsePrepareProposal, error) {
|
|
trs := make([]*abci.TxRecord, 0, len(req.Txs))
|
|
var totalBytes int64
|
|
for _, tx := range req.Txs {
|
|
totalBytes += int64(len(tx))
|
|
if totalBytes > req.MaxTxBytes {
|
|
break
|
|
}
|
|
trs = append(trs, &abci.TxRecord{
|
|
Action: abci.TxRecord_UNMODIFIED,
|
|
Tx: tx,
|
|
})
|
|
}
|
|
return &abci.ResponsePrepareProposal{TxRecords: trs}, nil
|
|
}
|
|
|
|
func (app *CounterApplication) ProcessProposal(_ context.Context, req *abci.RequestProcessProposal) (*abci.ResponseProcessProposal, error) {
|
|
return &abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT}, nil
|
|
}
|