This commit is contained in:
Aleksandr Bezobchuk
2022-06-14 10:30:57 -04:00
parent d33b440098
commit be56f1dd7c
11 changed files with 41 additions and 57 deletions

View File

@@ -64,8 +64,8 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
proxyAppConnCon := abcicli.NewLocalClient(mtx, app)
// Make Mempool
mempool := mempl.NewCListMempool(thisConfig.Mempool, proxyAppConnMem, 0)
mempool.SetLogger(log.TestingLogger().With("module", "mempool"))
mempool := mempl.NewTxMempool(log.TestingLogger().With("module", "mempool"), thisConfig.Mempool, proxyAppConnMem, 0)
if thisConfig.Consensus.WaitForTxs() {
mempool.EnableTxsAvailable()
}

View File

@@ -394,8 +394,8 @@ func newStateWithConfigAndBlockStore(
proxyAppConnCon := abcicli.NewLocalClient(mtx, app)
// Make Mempool
mempool := mempl.NewCListMempool(thisConfig.Mempool, proxyAppConnMem, 0)
mempool.SetLogger(log.TestingLogger().With("module", "mempool"))
mempool := mempl.NewTxMempool(log.TestingLogger().With("module", "mempool"), thisConfig.Mempool, proxyAppConnMem, 0)
if thisConfig.Consensus.WaitForTxs() {
mempool.EnableTxsAvailable()
}

View File

@@ -158,8 +158,8 @@ func TestReactorWithEvidence(t *testing.T) {
proxyAppConnCon := abcicli.NewLocalClient(mtx, app)
// Make Mempool
mempool := mempl.NewCListMempool(thisConfig.Mempool, proxyAppConnMem, 0)
mempool.SetLogger(log.TestingLogger().With("module", "mempool"))
mempool := mempl.NewTxMempool(log.TestingLogger().With("module", "mempool"), thisConfig.Mempool, proxyAppConnMem, 0)
if thisConfig.Consensus.WaitForTxs() {
mempool.EnableTxsAvailable()
}

View File

@@ -15,9 +15,10 @@ type emptyMempool struct{}
var _ mempl.Mempool = emptyMempool{}
func (emptyMempool) Lock() {}
func (emptyMempool) Unlock() {}
func (emptyMempool) Size() int { return 0 }
func (emptyMempool) Lock() {}
func (emptyMempool) Unlock() {}
func (emptyMempool) Size() int { return 0 }
func (emptyMempool) SizeBytes() int64 { return 0 }
func (emptyMempool) CheckTx(_ types.Tx, _ func(*abci.Response), _ mempl.TxInfo) error {
return nil
}

View File

@@ -361,10 +361,15 @@ func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool {
return bytes.Equal(pubKey.Address(), addr)
}
func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns,
state sm.State, memplMetrics *mempl.Metrics, logger log.Logger) (*mempl.Reactor, *mempl.CListMempool) {
mempool := mempl.NewCListMempool(
func createMempoolAndMempoolReactor(
config *cfg.Config,
proxyApp proxy.AppConns,
state sm.State,
memplMetrics *mempl.Metrics,
logger log.Logger,
) (*mempl.Reactor, *mempl.TxMempool) {
mempool := mempl.NewTxMempool(
logger.With("module", "mempool"),
config.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
@@ -372,13 +377,13 @@ func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns,
mempl.WithPreCheck(sm.TxPreCheck(state)),
mempl.WithPostCheck(sm.TxPostCheck(state)),
)
mempoolLogger := logger.With("module", "mempool")
mempoolReactor := mempl.NewReactor(config.Mempool, mempool)
mempoolReactor.SetLogger(mempoolLogger)
if config.Consensus.WaitForTxs() {
mempool.EnableTxsAvailable()
}
return mempoolReactor, mempool
}
@@ -425,7 +430,7 @@ func createConsensusReactor(config *cfg.Config,
state sm.State,
blockExec *sm.BlockExecutor,
blockStore sm.BlockStore,
mempool *mempl.CListMempool,
mempool *mempl.TxMempool,
evidencePool *evidence.Pool,
privValidator types.PrivValidator,
csMetrics *cs.Metrics,
@@ -930,13 +935,6 @@ func (n *Node) OnStart() error {
n.isListening = true
if n.config.Mempool.WalEnabled() {
err = n.mempool.InitWAL()
if err != nil {
return fmt.Errorf("init mempool WAL: %w", err)
}
}
// Start the switch (the P2P server).
err = n.sw.Start()
if err != nil {
@@ -984,11 +982,6 @@ func (n *Node) OnStop() {
n.Logger.Error("Error closing switch", "err", err)
}
// stop mempool WAL
if n.config.Mempool.WalEnabled() {
n.mempool.CloseWAL()
}
if err := n.transport.Close(); err != nil {
n.Logger.Error("Error closing transport", "err", err)
}

View File

@@ -243,7 +243,8 @@ func TestCreateProposalBlock(t *testing.T) {
// Make Mempool
memplMetrics := mempl.PrometheusMetrics("node_test_1")
mempool := mempl.NewCListMempool(
mempool := mempl.NewTxMempool(
logger,
config.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
@@ -251,7 +252,6 @@ func TestCreateProposalBlock(t *testing.T) {
mempl.WithPreCheck(sm.TxPreCheck(state)),
mempl.WithPostCheck(sm.TxPostCheck(state)),
)
mempool.SetLogger(logger)
// Make EvidencePool
evidenceDB := dbm.NewMemDB()
@@ -335,7 +335,8 @@ func TestMaxProposalBlockSize(t *testing.T) {
// Make Mempool
memplMetrics := mempl.PrometheusMetrics("node_test_2")
mempool := mempl.NewCListMempool(
mempool := mempl.NewTxMempool(
logger,
config.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
@@ -343,7 +344,6 @@ func TestMaxProposalBlockSize(t *testing.T) {
mempl.WithPreCheck(sm.TxPreCheck(state)),
mempl.WithPostCheck(sm.TxPostCheck(state)),
)
mempool.SetLogger(logger)
// fill the mempool with one txs just below the maximum size
txLength := int(types.MaxDataBytesNoEvidence(maxBytes, 1))

View File

@@ -388,7 +388,7 @@ func TestUnconfirmedTxs(t *testing.T) {
assert.Equal(t, 1, res.Count)
assert.Equal(t, 1, res.Total)
assert.Equal(t, mempool.TxsBytes(), res.TotalBytes)
assert.Equal(t, mempool.SizeBytes(), res.TotalBytes)
assert.Exactly(t, types.Txs{tx}, types.Txs(res.Txs))
}
@@ -419,7 +419,7 @@ func TestNumUnconfirmedTxs(t *testing.T) {
assert.Equal(t, mempoolSize, res.Count)
assert.Equal(t, mempoolSize, res.Total)
assert.Equal(t, mempool.TxsBytes(), res.TotalBytes)
assert.Equal(t, mempool.SizeBytes(), res.TotalBytes)
}
mempool.Flush()

View File

@@ -158,7 +158,7 @@ func UnconfirmedTxs(ctx *rpctypes.Context, limitPtr *int) (*ctypes.ResultUnconfi
return &ctypes.ResultUnconfirmedTxs{
Count: len(txs),
Total: env.Mempool.Size(),
TotalBytes: env.Mempool.TxsBytes(),
TotalBytes: env.Mempool.SizeBytes(),
Txs: txs}, nil
}
@@ -168,7 +168,7 @@ func NumUnconfirmedTxs(ctx *rpctypes.Context) (*ctypes.ResultUnconfirmedTxs, err
return &ctypes.ResultUnconfirmedTxs{
Count: env.Mempool.Size(),
Total: env.Mempool.Size(),
TotalBytes: env.Mempool.TxsBytes()}, nil
TotalBytes: env.Mempool.SizeBytes()}, nil
}
// CheckTx checks the transaction without executing it. The transaction won't

View File

@@ -3,6 +3,7 @@ package checktx
import (
"github.com/tendermint/tendermint/abci/example/kvstore"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/log"
mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/proxy"
)
@@ -21,7 +22,7 @@ func init() {
cfg := config.DefaultMempoolConfig()
cfg.Broadcast = false
mempool = mempl.NewCListMempool(cfg, appConnMem, 0)
mempool = mempl.NewTxMempool(log.NewNopLogger(), cfg, appConnMem, 0)
}
func Fuzz(data []byte) int {

View File

@@ -15,9 +15,10 @@ type emptyMempool struct{}
var _ mempl.Mempool = emptyMempool{}
func (emptyMempool) Lock() {}
func (emptyMempool) Unlock() {}
func (emptyMempool) Size() int { return 0 }
func (emptyMempool) Lock() {}
func (emptyMempool) Unlock() {}
func (emptyMempool) Size() int { return 0 }
func (emptyMempool) SizeBytes() int64 { return 0 }
func (emptyMempool) CheckTx(_ types.Tx, _ func(*abci.Response), _ mempl.TxInfo) error {
return nil
}

View File

@@ -378,9 +378,10 @@ func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool {
}
func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns,
state sm.State, memplMetrics *mempl.Metrics, logger log.Logger) (*mempl.Reactor, *mempl.CListMempool) {
state sm.State, memplMetrics *mempl.Metrics, logger log.Logger) (*mempl.Reactor, *mempl.TxMempool) {
mempool := mempl.NewCListMempool(
mempool := mempl.NewTxMempool(
logger.With("module", "mempool"),
config.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
@@ -388,9 +389,8 @@ func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns,
mempl.WithPreCheck(sm.TxPreCheck(state)),
mempl.WithPostCheck(sm.TxPostCheck(state)),
)
mempoolLogger := logger.With("module", "mempool")
mempoolReactor := mempl.NewReactor(config.Mempool, mempool)
mempoolReactor.SetLogger(mempoolLogger)
if config.Consensus.WaitForTxs() {
mempool.EnableTxsAvailable()
@@ -441,7 +441,7 @@ func createConsensusReactor(config *cfg.Config,
state sm.State,
blockExec *sm.BlockExecutor,
blockStore sm.BlockStore,
mempool *mempl.CListMempool,
mempool *mempl.TxMempool,
evidencePool *evidence.Pool,
privValidator types.PrivValidator,
csMetrics *consensus.Metrics,
@@ -947,13 +947,6 @@ func (n *Node) OnStart() error {
n.isListening = true
if n.config.Mempool.WalEnabled() {
err = n.mempool.InitWAL()
if err != nil {
return fmt.Errorf("init mempool WAL: %w", err)
}
}
// Start the switch (the P2P server).
err = n.sw.Start()
if err != nil {
@@ -1001,11 +994,6 @@ func (n *Node) OnStop() {
n.Logger.Error("Error closing switch", "err", err)
}
// stop mempool WAL
if n.config.Mempool.WalEnabled() {
n.mempool.CloseWAL()
}
if err := n.transport.Close(); err != nil {
n.Logger.Error("Error closing transport", "err", err)
}