From 9c21d4140bb563e412f1676db28781d1a725731b Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Wed, 8 Dec 2021 09:29:13 -0500 Subject: [PATCH] mempool: avoid arbitrary background contexts (#7409) Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- internal/consensus/replay_stubs.go | 11 ++++++----- internal/mempool/mempool.go | 10 +++++----- internal/mempool/mempool_test.go | 12 ++++++------ internal/mempool/mock/mempool.go | 11 ++++++----- internal/mempool/reactor_test.go | 4 ++-- internal/mempool/types.go | 3 ++- internal/state/execution.go | 6 ++++-- rpc/client/mocks/client.go | 14 -------------- 8 files changed, 31 insertions(+), 40 deletions(-) diff --git a/internal/consensus/replay_stubs.go b/internal/consensus/replay_stubs.go index 8672f8e1e..649e4387b 100644 --- a/internal/consensus/replay_stubs.go +++ b/internal/consensus/replay_stubs.go @@ -29,6 +29,7 @@ func (emptyMempool) RemoveTxByKey(txKey types.TxKey) error { return nil } func (emptyMempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} } func (emptyMempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} } func (emptyMempool) Update( + _ context.Context, _ int64, _ types.Txs, _ []*abci.ResponseDeliverTx, @@ -37,11 +38,11 @@ func (emptyMempool) Update( ) error { return nil } -func (emptyMempool) Flush() {} -func (emptyMempool) FlushAppConn() error { return nil } -func (emptyMempool) TxsAvailable() <-chan struct{} { return make(chan struct{}) } -func (emptyMempool) EnableTxsAvailable() {} -func (emptyMempool) SizeBytes() int64 { return 0 } +func (emptyMempool) Flush() {} +func (emptyMempool) FlushAppConn(ctx context.Context) error { return nil } +func (emptyMempool) TxsAvailable() <-chan struct{} { return make(chan struct{}) } +func (emptyMempool) EnableTxsAvailable() {} +func (emptyMempool) SizeBytes() int64 { return 0 } func (emptyMempool) TxsFront() *clist.CElement { return nil } func (emptyMempool) TxsWaitChan() <-chan struct{} { return nil } diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index ec7ef2e15..f5d1c926d 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -175,8 +175,8 @@ func (txmp *TxMempool) SizeBytes() int64 { // FlushAppConn executes FlushSync on the mempool's proxyAppConn. // // NOTE: The caller must obtain a write-lock prior to execution. -func (txmp *TxMempool) FlushAppConn() error { - return txmp.proxyAppConn.FlushSync(context.Background()) +func (txmp *TxMempool) FlushAppConn(ctx context.Context) error { + return txmp.proxyAppConn.FlushSync(ctx) } // WaitForNextTx returns a blocking channel that will be closed when the next @@ -428,6 +428,7 @@ func (txmp *TxMempool) ReapMaxTxs(max int) types.Txs { // NOTE: // - The caller must explicitly acquire a write-lock. func (txmp *TxMempool) Update( + ctx context.Context, blockHeight int64, blockTxs types.Txs, deliverTxResponses []*abci.ResponseDeliverTx, @@ -472,7 +473,7 @@ func (txmp *TxMempool) Update( "num_txs", txmp.Size(), "height", blockHeight, ) - txmp.updateReCheckTxs() + txmp.updateReCheckTxs(ctx) } else { txmp.notifyTxsAvailable() } @@ -713,14 +714,13 @@ func (txmp *TxMempool) defaultTxCallback(req *abci.Request, res *abci.Response) // // NOTE: // - The caller must have a write-lock when executing updateReCheckTxs. -func (txmp *TxMempool) updateReCheckTxs() { +func (txmp *TxMempool) updateReCheckTxs(ctx context.Context) { if txmp.Size() == 0 { panic("attempted to update re-CheckTx txs when mempool is empty") } txmp.recheckCursor = txmp.gossipIndex.Front() txmp.recheckEnd = txmp.gossipIndex.Back() - ctx := context.Background() for e := txmp.gossipIndex.Front(); e != nil; e = e.Next() { wtx := e.Value.(*WrappedTx) diff --git a/internal/mempool/mempool_test.go b/internal/mempool/mempool_test.go index 1613dce98..a0dc658a4 100644 --- a/internal/mempool/mempool_test.go +++ b/internal/mempool/mempool_test.go @@ -179,7 +179,7 @@ func TestTxMempool_TxsAvailable(t *testing.T) { // commit half the transactions and ensure we fire an event txmp.Lock() - require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil)) + require.NoError(t, txmp.Update(ctx, 1, rawTxs[:50], responses, nil, nil)) txmp.Unlock() ensureTxFire() ensureNoTxFire() @@ -210,7 +210,7 @@ func TestTxMempool_Size(t *testing.T) { } txmp.Lock() - require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil)) + require.NoError(t, txmp.Update(ctx, 1, rawTxs[:50], responses, nil, nil)) txmp.Unlock() require.Equal(t, len(rawTxs)/2, txmp.Size()) @@ -237,7 +237,7 @@ func TestTxMempool_Flush(t *testing.T) { } txmp.Lock() - require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil)) + require.NoError(t, txmp.Update(ctx, 1, rawTxs[:50], responses, nil, nil)) txmp.Unlock() txmp.Flush() @@ -460,7 +460,7 @@ func TestTxMempool_ConcurrentTxs(t *testing.T) { } txmp.Lock() - require.NoError(t, txmp.Update(height, reapedTxs, responses, nil, nil)) + require.NoError(t, txmp.Update(ctx, height, reapedTxs, responses, nil, nil)) txmp.Unlock() height++ @@ -500,7 +500,7 @@ func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) { } txmp.Lock() - require.NoError(t, txmp.Update(txmp.height+1, reapedTxs, responses, nil, nil)) + require.NoError(t, txmp.Update(ctx, txmp.height+1, reapedTxs, responses, nil, nil)) txmp.Unlock() require.Equal(t, 95, txmp.Size()) @@ -526,7 +526,7 @@ func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) { } txmp.Lock() - require.NoError(t, txmp.Update(txmp.height+10, reapedTxs, responses, nil, nil)) + require.NoError(t, txmp.Update(ctx, txmp.height+10, reapedTxs, responses, nil, nil)) txmp.Unlock() require.GreaterOrEqual(t, txmp.Size(), 45) diff --git a/internal/mempool/mock/mempool.go b/internal/mempool/mock/mempool.go index 8344220a0..2b32a7ce6 100644 --- a/internal/mempool/mock/mempool.go +++ b/internal/mempool/mock/mempool.go @@ -24,6 +24,7 @@ func (Mempool) RemoveTxByKey(txKey types.TxKey) error { return nil } func (Mempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} } func (Mempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} } func (Mempool) Update( + _ context.Context, _ int64, _ types.Txs, _ []*abci.ResponseDeliverTx, @@ -32,11 +33,11 @@ func (Mempool) Update( ) error { return nil } -func (Mempool) Flush() {} -func (Mempool) FlushAppConn() error { return nil } -func (Mempool) TxsAvailable() <-chan struct{} { return make(chan struct{}) } -func (Mempool) EnableTxsAvailable() {} -func (Mempool) SizeBytes() int64 { return 0 } +func (Mempool) Flush() {} +func (Mempool) FlushAppConn(ctx context.Context) error { return nil } +func (Mempool) TxsAvailable() <-chan struct{} { return make(chan struct{}) } +func (Mempool) EnableTxsAvailable() {} +func (Mempool) SizeBytes() int64 { return 0 } func (Mempool) TxsFront() *clist.CElement { return nil } func (Mempool) TxsWaitChan() <-chan struct{} { return nil } diff --git a/internal/mempool/reactor_test.go b/internal/mempool/reactor_test.go index 096544910..f75809744 100644 --- a/internal/mempool/reactor_test.go +++ b/internal/mempool/reactor_test.go @@ -249,7 +249,7 @@ func TestReactorConcurrency(t *testing.T) { deliverTxResponses[i] = &abci.ResponseDeliverTx{Code: 0} } - require.NoError(t, mempool.Update(1, convertTex(txs), deliverTxResponses, nil, nil)) + require.NoError(t, mempool.Update(ctx, 1, convertTex(txs), deliverTxResponses, nil, nil)) }() // 1. submit a bunch of txs @@ -263,7 +263,7 @@ func TestReactorConcurrency(t *testing.T) { mempool.Lock() defer mempool.Unlock() - err := mempool.Update(1, []types.Tx{}, make([]*abci.ResponseDeliverTx, 0), nil, nil) + err := mempool.Update(ctx, 1, []types.Tx{}, make([]*abci.ResponseDeliverTx, 0), nil, nil) require.NoError(t, err) }() diff --git a/internal/mempool/types.go b/internal/mempool/types.go index 6e3955dc3..05d4ba3e3 100644 --- a/internal/mempool/types.go +++ b/internal/mempool/types.go @@ -63,6 +63,7 @@ type Mempool interface { // 1. This should be called *after* block is committed by consensus. // 2. Lock/Unlock must be managed by the caller. Update( + ctx context.Context, blockHeight int64, blockTxs types.Txs, deliverTxResponses []*abci.ResponseDeliverTx, @@ -75,7 +76,7 @@ type Mempool interface { // // NOTE: // 1. Lock/Unlock must be managed by caller. - FlushAppConn() error + FlushAppConn(context.Context) error // Flush removes all transactions from the mempool and caches. Flush() diff --git a/internal/state/execution.go b/internal/state/execution.go index 85e96b017..9d385c956 100644 --- a/internal/state/execution.go +++ b/internal/state/execution.go @@ -202,7 +202,7 @@ func (blockExec *BlockExecutor) ApplyBlock( } // Lock mempool, commit app state, update mempoool. - appHash, retainHeight, err := blockExec.Commit(state, block, abciResponses.DeliverTxs) + appHash, retainHeight, err := blockExec.Commit(ctx, state, block, abciResponses.DeliverTxs) if err != nil { return state, fmt.Errorf("commit failed for application: %v", err) } @@ -247,6 +247,7 @@ func (blockExec *BlockExecutor) ApplyBlock( // typically reset on Commit and old txs must be replayed against committed // state before new txs are run in the mempool, lest they be invalid. func (blockExec *BlockExecutor) Commit( + ctx context.Context, state State, block *types.Block, deliverTxResponses []*abci.ResponseDeliverTx, @@ -256,7 +257,7 @@ func (blockExec *BlockExecutor) Commit( // while mempool is Locked, flush to ensure all async requests have completed // in the ABCI app before Commit. - err := blockExec.mempool.FlushAppConn() + err := blockExec.mempool.FlushAppConn(ctx) if err != nil { blockExec.logger.Error("client error during mempool.FlushAppConn", "err", err) return nil, 0, err @@ -279,6 +280,7 @@ func (blockExec *BlockExecutor) Commit( // Update mempool. err = blockExec.mempool.Update( + ctx, block.Height, block.Txs, deliverTxResponses, diff --git a/rpc/client/mocks/client.go b/rpc/client/mocks/client.go index 3c3ebd443..e638980a8 100644 --- a/rpc/client/mocks/client.go +++ b/rpc/client/mocks/client.go @@ -637,20 +637,6 @@ func (_m *Client) Status(_a0 context.Context) (*coretypes.ResultStatus, error) { return r0, r1 } -// Stop provides a mock function with given fields: -func (_m *Client) Stop() error { - ret := _m.Called() - - var r0 error - if rf, ok := ret.Get(0).(func() error); ok { - r0 = rf() - } else { - r0 = ret.Error(0) - } - - return r0 -} - // Subscribe provides a mock function with given fields: ctx, subscriber, query, outCapacity func (_m *Client) Subscribe(ctx context.Context, subscriber string, query string, outCapacity ...int) (<-chan coretypes.ResultEvent, error) { _va := make([]interface{}, len(outCapacity))