diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index f5e372ae2..d71f9003e 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -64,8 +64,8 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { proxyAppConnCon := abcicli.NewLocalClient(mtx, app) // Make Mempool - mempool := mempl.NewCListMempool(thisConfig.Mempool, proxyAppConnMem, 0) - mempool.SetLogger(log.TestingLogger().With("module", "mempool")) + mempool := mempl.NewTxMempool(log.TestingLogger().With("module", "mempool"), thisConfig.Mempool, proxyAppConnMem, 0) + if thisConfig.Consensus.WaitForTxs() { mempool.EnableTxsAvailable() } diff --git a/consensus/common_test.go b/consensus/common_test.go index 71d2c0783..1c3660e80 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -394,8 +394,8 @@ func newStateWithConfigAndBlockStore( proxyAppConnCon := abcicli.NewLocalClient(mtx, app) // Make Mempool - mempool := mempl.NewCListMempool(thisConfig.Mempool, proxyAppConnMem, 0) - mempool.SetLogger(log.TestingLogger().With("module", "mempool")) + mempool := mempl.NewTxMempool(log.TestingLogger().With("module", "mempool"), thisConfig.Mempool, proxyAppConnMem, 0) + if thisConfig.Consensus.WaitForTxs() { mempool.EnableTxsAvailable() } diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index f23ec727d..0e2741021 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -158,8 +158,8 @@ func TestReactorWithEvidence(t *testing.T) { proxyAppConnCon := abcicli.NewLocalClient(mtx, app) // Make Mempool - mempool := mempl.NewCListMempool(thisConfig.Mempool, proxyAppConnMem, 0) - mempool.SetLogger(log.TestingLogger().With("module", "mempool")) + mempool := mempl.NewTxMempool(log.TestingLogger().With("module", "mempool"), thisConfig.Mempool, proxyAppConnMem, 0) + if thisConfig.Consensus.WaitForTxs() { mempool.EnableTxsAvailable() } diff --git a/consensus/replay_stubs.go b/consensus/replay_stubs.go index 08974a67e..ed5aa5626 100644 --- a/consensus/replay_stubs.go +++ b/consensus/replay_stubs.go @@ -15,9 +15,10 @@ type emptyMempool struct{} var _ mempl.Mempool = emptyMempool{} -func (emptyMempool) Lock() {} -func (emptyMempool) Unlock() {} -func (emptyMempool) Size() int { return 0 } +func (emptyMempool) Lock() {} +func (emptyMempool) Unlock() {} +func (emptyMempool) Size() int { return 0 } +func (emptyMempool) SizeBytes() int64 { return 0 } func (emptyMempool) CheckTx(_ types.Tx, _ func(*abci.Response), _ mempl.TxInfo) error { return nil } diff --git a/node/node.go b/node/node.go index 18fde0062..52dad73aa 100644 --- a/node/node.go +++ b/node/node.go @@ -361,10 +361,15 @@ func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool { return bytes.Equal(pubKey.Address(), addr) } -func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns, - state sm.State, memplMetrics *mempl.Metrics, logger log.Logger) (*mempl.Reactor, *mempl.CListMempool) { - - mempool := mempl.NewCListMempool( +func createMempoolAndMempoolReactor( + config *cfg.Config, + proxyApp proxy.AppConns, + state sm.State, + memplMetrics *mempl.Metrics, + logger log.Logger, +) (*mempl.Reactor, *mempl.TxMempool) { + mempool := mempl.NewTxMempool( + logger.With("module", "mempool"), config.Mempool, proxyApp.Mempool(), state.LastBlockHeight, @@ -372,13 +377,13 @@ func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns, mempl.WithPreCheck(sm.TxPreCheck(state)), mempl.WithPostCheck(sm.TxPostCheck(state)), ) - mempoolLogger := logger.With("module", "mempool") + mempoolReactor := mempl.NewReactor(config.Mempool, mempool) - mempoolReactor.SetLogger(mempoolLogger) if config.Consensus.WaitForTxs() { mempool.EnableTxsAvailable() } + return mempoolReactor, mempool } @@ -425,7 +430,7 @@ func createConsensusReactor(config *cfg.Config, state sm.State, blockExec *sm.BlockExecutor, blockStore sm.BlockStore, - mempool *mempl.CListMempool, + mempool *mempl.TxMempool, evidencePool *evidence.Pool, privValidator types.PrivValidator, csMetrics *cs.Metrics, @@ -930,13 +935,6 @@ func (n *Node) OnStart() error { n.isListening = true - if n.config.Mempool.WalEnabled() { - err = n.mempool.InitWAL() - if err != nil { - return fmt.Errorf("init mempool WAL: %w", err) - } - } - // Start the switch (the P2P server). err = n.sw.Start() if err != nil { @@ -984,11 +982,6 @@ func (n *Node) OnStop() { n.Logger.Error("Error closing switch", "err", err) } - // stop mempool WAL - if n.config.Mempool.WalEnabled() { - n.mempool.CloseWAL() - } - if err := n.transport.Close(); err != nil { n.Logger.Error("Error closing transport", "err", err) } diff --git a/node/node_test.go b/node/node_test.go index 55e555341..3eb707a73 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -243,7 +243,8 @@ func TestCreateProposalBlock(t *testing.T) { // Make Mempool memplMetrics := mempl.PrometheusMetrics("node_test_1") - mempool := mempl.NewCListMempool( + mempool := mempl.NewTxMempool( + logger, config.Mempool, proxyApp.Mempool(), state.LastBlockHeight, @@ -251,7 +252,6 @@ func TestCreateProposalBlock(t *testing.T) { mempl.WithPreCheck(sm.TxPreCheck(state)), mempl.WithPostCheck(sm.TxPostCheck(state)), ) - mempool.SetLogger(logger) // Make EvidencePool evidenceDB := dbm.NewMemDB() @@ -335,7 +335,8 @@ func TestMaxProposalBlockSize(t *testing.T) { // Make Mempool memplMetrics := mempl.PrometheusMetrics("node_test_2") - mempool := mempl.NewCListMempool( + mempool := mempl.NewTxMempool( + logger, config.Mempool, proxyApp.Mempool(), state.LastBlockHeight, @@ -343,7 +344,6 @@ func TestMaxProposalBlockSize(t *testing.T) { mempl.WithPreCheck(sm.TxPreCheck(state)), mempl.WithPostCheck(sm.TxPostCheck(state)), ) - mempool.SetLogger(logger) // fill the mempool with one txs just below the maximum size txLength := int(types.MaxDataBytesNoEvidence(maxBytes, 1)) diff --git a/rpc/client/rpc_test.go b/rpc/client/rpc_test.go index fb2d441b3..79102a364 100644 --- a/rpc/client/rpc_test.go +++ b/rpc/client/rpc_test.go @@ -388,7 +388,7 @@ func TestUnconfirmedTxs(t *testing.T) { assert.Equal(t, 1, res.Count) assert.Equal(t, 1, res.Total) - assert.Equal(t, mempool.TxsBytes(), res.TotalBytes) + assert.Equal(t, mempool.SizeBytes(), res.TotalBytes) assert.Exactly(t, types.Txs{tx}, types.Txs(res.Txs)) } @@ -419,7 +419,7 @@ func TestNumUnconfirmedTxs(t *testing.T) { assert.Equal(t, mempoolSize, res.Count) assert.Equal(t, mempoolSize, res.Total) - assert.Equal(t, mempool.TxsBytes(), res.TotalBytes) + assert.Equal(t, mempool.SizeBytes(), res.TotalBytes) } mempool.Flush() diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index 74212e9d1..f9fa4b37e 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -158,7 +158,7 @@ func UnconfirmedTxs(ctx *rpctypes.Context, limitPtr *int) (*ctypes.ResultUnconfi return &ctypes.ResultUnconfirmedTxs{ Count: len(txs), Total: env.Mempool.Size(), - TotalBytes: env.Mempool.TxsBytes(), + TotalBytes: env.Mempool.SizeBytes(), Txs: txs}, nil } @@ -168,7 +168,7 @@ func NumUnconfirmedTxs(ctx *rpctypes.Context) (*ctypes.ResultUnconfirmedTxs, err return &ctypes.ResultUnconfirmedTxs{ Count: env.Mempool.Size(), Total: env.Mempool.Size(), - TotalBytes: env.Mempool.TxsBytes()}, nil + TotalBytes: env.Mempool.SizeBytes()}, nil } // CheckTx checks the transaction without executing it. The transaction won't diff --git a/test/fuzz/mempool/checktx.go b/test/fuzz/mempool/checktx.go index 3193b169d..30c93d007 100644 --- a/test/fuzz/mempool/checktx.go +++ b/test/fuzz/mempool/checktx.go @@ -3,6 +3,7 @@ package checktx import ( "github.com/tendermint/tendermint/abci/example/kvstore" "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/libs/log" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/proxy" ) @@ -21,7 +22,7 @@ func init() { cfg := config.DefaultMempoolConfig() cfg.Broadcast = false - mempool = mempl.NewCListMempool(cfg, appConnMem, 0) + mempool = mempl.NewTxMempool(log.NewNopLogger(), cfg, appConnMem, 0) } func Fuzz(data []byte) int { diff --git a/test/maverick/consensus/replay_stubs.go b/test/maverick/consensus/replay_stubs.go index 08974a67e..ed5aa5626 100644 --- a/test/maverick/consensus/replay_stubs.go +++ b/test/maverick/consensus/replay_stubs.go @@ -15,9 +15,10 @@ type emptyMempool struct{} var _ mempl.Mempool = emptyMempool{} -func (emptyMempool) Lock() {} -func (emptyMempool) Unlock() {} -func (emptyMempool) Size() int { return 0 } +func (emptyMempool) Lock() {} +func (emptyMempool) Unlock() {} +func (emptyMempool) Size() int { return 0 } +func (emptyMempool) SizeBytes() int64 { return 0 } func (emptyMempool) CheckTx(_ types.Tx, _ func(*abci.Response), _ mempl.TxInfo) error { return nil } diff --git a/test/maverick/node/node.go b/test/maverick/node/node.go index 7383169dc..33ce51946 100644 --- a/test/maverick/node/node.go +++ b/test/maverick/node/node.go @@ -378,9 +378,10 @@ func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool { } func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns, - state sm.State, memplMetrics *mempl.Metrics, logger log.Logger) (*mempl.Reactor, *mempl.CListMempool) { + state sm.State, memplMetrics *mempl.Metrics, logger log.Logger) (*mempl.Reactor, *mempl.TxMempool) { - mempool := mempl.NewCListMempool( + mempool := mempl.NewTxMempool( + logger.With("module", "mempool"), config.Mempool, proxyApp.Mempool(), state.LastBlockHeight, @@ -388,9 +389,8 @@ func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns, mempl.WithPreCheck(sm.TxPreCheck(state)), mempl.WithPostCheck(sm.TxPostCheck(state)), ) - mempoolLogger := logger.With("module", "mempool") + mempoolReactor := mempl.NewReactor(config.Mempool, mempool) - mempoolReactor.SetLogger(mempoolLogger) if config.Consensus.WaitForTxs() { mempool.EnableTxsAvailable() @@ -441,7 +441,7 @@ func createConsensusReactor(config *cfg.Config, state sm.State, blockExec *sm.BlockExecutor, blockStore sm.BlockStore, - mempool *mempl.CListMempool, + mempool *mempl.TxMempool, evidencePool *evidence.Pool, privValidator types.PrivValidator, csMetrics *consensus.Metrics, @@ -947,13 +947,6 @@ func (n *Node) OnStart() error { n.isListening = true - if n.config.Mempool.WalEnabled() { - err = n.mempool.InitWAL() - if err != nil { - return fmt.Errorf("init mempool WAL: %w", err) - } - } - // Start the switch (the P2P server). err = n.sw.Start() if err != nil { @@ -1001,11 +994,6 @@ func (n *Node) OnStop() { n.Logger.Error("Error closing switch", "err", err) } - // stop mempool WAL - if n.config.Mempool.WalEnabled() { - n.mempool.CloseWAL() - } - if err := n.transport.Close(); err != nil { n.Logger.Error("Error closing transport", "err", err) }