mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-19 19:22:52 +00:00
Compare commits
5 Commits
master
...
wb/remove-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8750c7c03b | ||
|
|
1ee3b70698 | ||
|
|
543929a715 | ||
|
|
1af9e4f618 | ||
|
|
3b197a79c2 |
@@ -4,7 +4,6 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
@@ -450,12 +449,35 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) {
|
||||
case *abci.Response_CheckTx:
|
||||
tx := req.GetCheckTx().Tx
|
||||
memTx := mem.recheckCursor.Value.(*mempoolTx)
|
||||
if !bytes.Equal(tx, memTx.tx) {
|
||||
panic(fmt.Sprintf(
|
||||
"Unexpected tx response from proxy during recheck\nExpected %X, got %X",
|
||||
memTx.tx,
|
||||
tx))
|
||||
|
||||
// Search through the remaining list of tx to recheck for a transaction that matches
|
||||
// the one we received from the ABCI application.
|
||||
for {
|
||||
if bytes.Equal(tx, memTx.tx) {
|
||||
// We've found a tx in the recheck list that matches the tx that we
|
||||
// received from the ABCI application.
|
||||
// Break, and use this transaction for further checks.
|
||||
break
|
||||
}
|
||||
|
||||
mem.logger.Error(
|
||||
"re-CheckTx transaction mismatch",
|
||||
"got", types.Tx(tx),
|
||||
"expected", memTx.tx,
|
||||
)
|
||||
|
||||
if mem.recheckCursor == mem.recheckEnd {
|
||||
// we reached the end of the recheckTx list without finding a tx
|
||||
// matching the one we received from the ABCI application.
|
||||
// Return without processing any tx.
|
||||
mem.recheckCursor = nil
|
||||
return
|
||||
}
|
||||
|
||||
mem.recheckCursor = mem.recheckCursor.Next()
|
||||
memTx = mem.recheckCursor.Value.(*mempoolTx)
|
||||
}
|
||||
|
||||
var postCheckErr error
|
||||
if mem.postCheck != nil {
|
||||
postCheckErr = mem.postCheck(tx, r.CheckTx)
|
||||
|
||||
@@ -13,9 +13,11 @@ import (
|
||||
"github.com/gogo/protobuf/proto"
|
||||
gogotypes "github.com/gogo/protobuf/types"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
abciclient "github.com/tendermint/tendermint/abci/client"
|
||||
abciclimocks "github.com/tendermint/tendermint/abci/client/mocks"
|
||||
"github.com/tendermint/tendermint/abci/example/kvstore"
|
||||
abciserver "github.com/tendermint/tendermint/abci/server"
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
@@ -214,6 +216,57 @@ func TestMempoolUpdate(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestMempoolUpdateDoesNotPanicWhenApplicationMissedTx(t *testing.T) {
|
||||
var callback abciclient.Callback
|
||||
mockClient := new(abciclimocks.Client)
|
||||
mockClient.On("Start").Return(nil)
|
||||
mockClient.On("SetLogger", mock.Anything)
|
||||
|
||||
mockClient.On("Error").Return(nil).Times(4)
|
||||
mockClient.On("FlushAsync", mock.Anything).Return(abciclient.NewReqRes(abci.ToRequestFlush()), nil)
|
||||
mockClient.On("SetResponseCallback", mock.MatchedBy(func(cb abciclient.Callback) bool { callback = cb; return true }))
|
||||
|
||||
cc := func() (abciclient.Client, error) {
|
||||
return mockClient, nil
|
||||
}
|
||||
|
||||
mp, cleanup := newMempoolWithApp(cc)
|
||||
defer cleanup()
|
||||
|
||||
// Add 4 transactions to the mempool by calling the mempool's `CheckTx` on each of them.
|
||||
txs := []types.Tx{[]byte{0x01}, []byte{0x02}, []byte{0x03}, []byte{0x04}}
|
||||
for _, tx := range txs {
|
||||
reqRes := abciclient.NewReqRes(abci.ToRequestCheckTx(abci.RequestCheckTx{Tx: tx}))
|
||||
reqRes.Response = abci.ToResponseCheckTx(abci.ResponseCheckTx{Code: abci.CodeTypeOK})
|
||||
// SetDone allows the ReqRes to process its callback synchronously.
|
||||
// This simulates the Response being ready for the client immediately.
|
||||
reqRes.SetDone()
|
||||
|
||||
mockClient.On("CheckTxAsync", mock.Anything, mock.Anything).Return(reqRes, nil)
|
||||
err := mp.CheckTx(context.Background(), tx, nil, mempool.TxInfo{})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// Calling update to remove the first transaction from the mempool.
|
||||
// This call also triggers the mempool to recheck its remaining transactions.
|
||||
err := mp.Update(0, []types.Tx{txs[0]}, abciResponses(1, abci.CodeTypeOK), nil, nil)
|
||||
require.Nil(t, err)
|
||||
|
||||
// The mempool has now sent its requests off to the client to be rechecked
|
||||
// and is waiting for the corresponding callbacks to be called.
|
||||
// We now call the mempool-supplied callback on the first and third transaction.
|
||||
// This simulates the client dropping the second request.
|
||||
// Previous versions of this code panicked when the ABCI application missed
|
||||
// a recheck-tx request.
|
||||
resp := abci.ResponseCheckTx{Code: abci.CodeTypeOK}
|
||||
req := abci.RequestCheckTx{Tx: txs[1]}
|
||||
callback(abci.ToRequestCheckTx(req), abci.ToResponseCheckTx(resp))
|
||||
|
||||
req = abci.RequestCheckTx{Tx: txs[3]}
|
||||
callback(abci.ToRequestCheckTx(req), abci.ToResponseCheckTx(resp))
|
||||
mockClient.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestMempool_KeepInvalidTxsInCache(t *testing.T) {
|
||||
app := kvstore.NewApplication()
|
||||
cc := abciclient.NewLocalCreator(app)
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
@@ -639,58 +640,88 @@ func (txmp *TxMempool) defaultTxCallback(req *abci.Request, res *abci.Response)
|
||||
txmp.metrics.RecheckTimes.Add(1)
|
||||
|
||||
checkTxRes, ok := res.Value.(*abci.Response_CheckTx)
|
||||
if ok {
|
||||
tx := req.GetCheckTx().Tx
|
||||
wtx := txmp.recheckCursor.Value.(*WrappedTx)
|
||||
if !bytes.Equal(tx, wtx.tx) {
|
||||
panic(fmt.Sprintf("re-CheckTx transaction mismatch; got: %X, expected: %X", wtx.tx.Hash(), types.Tx(tx).Key()))
|
||||
}
|
||||
|
||||
// Only evaluate transactions that have not been removed. This can happen
|
||||
// if an existing transaction is evicted during CheckTx and while this
|
||||
// callback is being executed for the same evicted transaction.
|
||||
if !txmp.txStore.IsTxRemoved(wtx.hash) {
|
||||
var err error
|
||||
if txmp.postCheck != nil {
|
||||
err = txmp.postCheck(tx, checkTxRes.CheckTx)
|
||||
}
|
||||
|
||||
if checkTxRes.CheckTx.Code == abci.CodeTypeOK && err == nil {
|
||||
wtx.priority = checkTxRes.CheckTx.Priority
|
||||
} else {
|
||||
txmp.logger.Debug(
|
||||
"existing transaction no longer valid; failed re-CheckTx callback",
|
||||
"priority", wtx.priority,
|
||||
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
|
||||
"err", err,
|
||||
"code", checkTxRes.CheckTx.Code,
|
||||
)
|
||||
|
||||
if wtx.gossipEl != txmp.recheckCursor {
|
||||
panic("corrupted reCheckTx cursor")
|
||||
}
|
||||
|
||||
txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache)
|
||||
}
|
||||
}
|
||||
|
||||
// move reCheckTx cursor to next element
|
||||
if txmp.recheckCursor == txmp.recheckEnd {
|
||||
txmp.recheckCursor = nil
|
||||
} else {
|
||||
txmp.recheckCursor = txmp.recheckCursor.Next()
|
||||
}
|
||||
|
||||
if txmp.recheckCursor == nil {
|
||||
txmp.logger.Debug("finished rechecking transactions")
|
||||
|
||||
if txmp.Size() > 0 {
|
||||
txmp.notifyTxsAvailable()
|
||||
}
|
||||
}
|
||||
|
||||
txmp.metrics.Size.Set(float64(txmp.Size()))
|
||||
if !ok {
|
||||
txmp.logger.Error("received incorrect type in mempool callback",
|
||||
"expected", reflect.TypeOf(&abci.Response_CheckTx{}).Name(),
|
||||
"got", reflect.TypeOf(res.Value).Name(),
|
||||
)
|
||||
return
|
||||
}
|
||||
tx := req.GetCheckTx().Tx
|
||||
wtx := txmp.recheckCursor.Value.(*WrappedTx)
|
||||
|
||||
// Search through the remaining list of tx to recheck for a transaction that matches
|
||||
// the one we received from the ABCI application.
|
||||
for {
|
||||
if bytes.Equal(tx, wtx.tx) {
|
||||
// We've found a tx in the recheck list that matches the tx that we
|
||||
// received from the ABCI application.
|
||||
// Break, and use this transaction for further checks.
|
||||
break
|
||||
}
|
||||
|
||||
txmp.logger.Error(
|
||||
"re-CheckTx transaction mismatch",
|
||||
"got", wtx.tx.Hash(),
|
||||
"expected", types.Tx(tx).Key(),
|
||||
)
|
||||
|
||||
if txmp.recheckCursor == txmp.recheckEnd {
|
||||
// we reached the end of the recheckTx list without finding a tx
|
||||
// matching the one we received from the ABCI application.
|
||||
// Return without processing any tx.
|
||||
txmp.recheckCursor = nil
|
||||
return
|
||||
}
|
||||
|
||||
txmp.recheckCursor = txmp.recheckCursor.Next()
|
||||
wtx = txmp.recheckCursor.Value.(*WrappedTx)
|
||||
}
|
||||
|
||||
// Only evaluate transactions that have not been removed. This can happen
|
||||
// if an existing transaction is evicted during CheckTx and while this
|
||||
// callback is being executed for the same evicted transaction.
|
||||
if !txmp.txStore.IsTxRemoved(wtx.hash) {
|
||||
var err error
|
||||
if txmp.postCheck != nil {
|
||||
err = txmp.postCheck(tx, checkTxRes.CheckTx)
|
||||
}
|
||||
|
||||
if checkTxRes.CheckTx.Code == abci.CodeTypeOK && err == nil {
|
||||
wtx.priority = checkTxRes.CheckTx.Priority
|
||||
} else {
|
||||
txmp.logger.Debug(
|
||||
"existing transaction no longer valid; failed re-CheckTx callback",
|
||||
"priority", wtx.priority,
|
||||
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
|
||||
"err", err,
|
||||
"code", checkTxRes.CheckTx.Code,
|
||||
)
|
||||
|
||||
if wtx.gossipEl != txmp.recheckCursor {
|
||||
panic("corrupted reCheckTx cursor")
|
||||
}
|
||||
|
||||
txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache)
|
||||
}
|
||||
}
|
||||
|
||||
// move reCheckTx cursor to next element
|
||||
if txmp.recheckCursor == txmp.recheckEnd {
|
||||
txmp.recheckCursor = nil
|
||||
} else {
|
||||
txmp.recheckCursor = txmp.recheckCursor.Next()
|
||||
}
|
||||
|
||||
if txmp.recheckCursor == nil {
|
||||
txmp.logger.Debug("finished rechecking transactions")
|
||||
|
||||
if txmp.Size() > 0 {
|
||||
txmp.notifyTxsAvailable()
|
||||
}
|
||||
}
|
||||
|
||||
txmp.metrics.Size.Set(float64(txmp.Size()))
|
||||
}
|
||||
|
||||
// updateReCheckTxs updates the recheck cursors by using the gossipIndex. For
|
||||
|
||||
Reference in New Issue
Block a user