feat: v0.34.x Prioritized Mempool (#8695)

* Updated mocks

* add reactor tests

* add v1 reactor tests

* Fix fuzz test for priority mempool

* e2e adapted to mempool v1; prio pool is default now

* Reverted default mempool to be fifo

* Changed buf version

* Added priority mempool to ci testnet

* Fixed linter

* Updated makefile

* Aligned makefile changes to v0.34.x

* Added go install for proto

* Add log message to warn about prioritized mempool bug

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Changelog message

Co-authored-by: Jasmina Malicevic <jasmina.dustinac@gmail.com>
Co-authored-by: Callum Waters <cmwaters19@gmail.com>
Co-authored-by: Sam Kleinman <garen@tychoish.com>
Co-authored-by: Thane Thomson <connect@thanethomson.com>
This commit is contained in:
Aleksandr Bezobchuk
2022-06-27 05:34:28 -04:00
committed by GitHub
parent 25101d1116
commit 6b7d30cf37
55 changed files with 4433 additions and 1002 deletions

View File

@@ -23,12 +23,15 @@ import (
cs "github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/evidence"
tmjson "github.com/tendermint/tendermint/libs/json"
"github.com/tendermint/tendermint/libs/log"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/light"
mempl "github.com/tendermint/tendermint/mempool"
mempoolv0 "github.com/tendermint/tendermint/mempool/v0"
mempoolv1 "github.com/tendermint/tendermint/mempool/v1"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/p2p/pex"
"github.com/tendermint/tendermint/privval"
@@ -209,7 +212,7 @@ type Node struct {
stateStore sm.Store
blockStore *store.BlockStore // store the blockchain to disk
bcReactor p2p.Reactor // for fast-syncing
mempoolReactor *mempl.Reactor // for gossipping transactions
mempoolReactor p2p.Reactor // for gossipping transactions
mempool mempl.Mempool
stateSync bool // whether the node should state sync on startup
stateSyncReactor *statesync.Reactor // for hosting and restoring state sync snapshots
@@ -361,25 +364,64 @@ func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool {
return bytes.Equal(pubKey.Address(), addr)
}
func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns,
state sm.State, memplMetrics *mempl.Metrics, logger log.Logger) (*mempl.Reactor, *mempl.CListMempool) {
func createMempoolAndMempoolReactor(
config *cfg.Config,
proxyApp proxy.AppConns,
state sm.State,
memplMetrics *mempl.Metrics,
logger log.Logger,
) (mempl.Mempool, p2p.Reactor) {
mempool := mempl.NewCListMempool(
config.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
mempl.WithMetrics(memplMetrics),
mempl.WithPreCheck(sm.TxPreCheck(state)),
mempl.WithPostCheck(sm.TxPostCheck(state)),
)
mempoolLogger := logger.With("module", "mempool")
mempoolReactor := mempl.NewReactor(config.Mempool, mempool)
mempoolReactor.SetLogger(mempoolLogger)
switch config.Mempool.Version {
case cfg.MempoolV1:
// TODO(thane): Remove log once https://github.com/tendermint/tendermint/issues/8775 is resolved.
logger.Error("While the prioritized mempool API is stable, there is a critical bug in it that is currently under investigation. See https://github.com/tendermint/tendermint/issues/8775 for details")
mp := mempoolv1.NewTxMempool(
logger,
config.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
mempoolv1.WithMetrics(memplMetrics),
mempoolv1.WithPreCheck(sm.TxPreCheck(state)),
mempoolv1.WithPostCheck(sm.TxPostCheck(state)),
)
if config.Consensus.WaitForTxs() {
mempool.EnableTxsAvailable()
reactor := mempoolv1.NewReactor(
config.Mempool,
mp,
)
if config.Consensus.WaitForTxs() {
mp.EnableTxsAvailable()
}
return mp, reactor
case cfg.MempoolV0:
mp := mempoolv0.NewCListMempool(
config.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
mempoolv0.WithMetrics(memplMetrics),
mempoolv0.WithPreCheck(sm.TxPreCheck(state)),
mempoolv0.WithPostCheck(sm.TxPostCheck(state)),
)
mp.SetLogger(logger)
mp.SetLogger(logger)
reactor := mempoolv0.NewReactor(
config.Mempool,
mp,
)
if config.Consensus.WaitForTxs() {
mp.EnableTxsAvailable()
}
return mp, reactor
default:
return nil, nil
}
return mempoolReactor, mempool
}
func createEvidenceReactor(config *cfg.Config, dbProvider DBProvider,
@@ -425,7 +467,7 @@ func createConsensusReactor(config *cfg.Config,
state sm.State,
blockExec *sm.BlockExecutor,
blockStore sm.BlockStore,
mempool *mempl.CListMempool,
mempool mempl.Mempool,
evidencePool *evidence.Pool,
privValidator types.PrivValidator,
csMetrics *cs.Metrics,
@@ -527,7 +569,7 @@ func createSwitch(config *cfg.Config,
transport p2p.Transport,
p2pMetrics *p2p.Metrics,
peerFilters []p2p.PeerFilterFunc,
mempoolReactor *mempl.Reactor,
mempoolReactor p2p.Reactor,
bcReactor p2p.Reactor,
stateSyncReactor *statesync.Reactor,
consensusReactor *cs.Reactor,
@@ -752,7 +794,7 @@ func NewNode(config *cfg.Config,
csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID)
// Make MempoolReactor
mempoolReactor, mempool := createMempoolAndMempoolReactor(config, proxyApp, state, memplMetrics, logger)
mempool, mempoolReactor := createMempoolAndMempoolReactor(config, proxyApp, state, memplMetrics, logger)
// Make Evidence Reactor
evidenceReactor, evidencePool, err := createEvidenceReactor(config, dbProvider, stateDB, blockStore, logger)
@@ -930,13 +972,6 @@ func (n *Node) OnStart() error {
n.isListening = true
if n.config.Mempool.WalEnabled() {
err = n.mempool.InitWAL()
if err != nil {
return fmt.Errorf("init mempool WAL: %w", err)
}
}
// Start the switch (the P2P server).
err = n.sw.Start()
if err != nil {
@@ -984,11 +1019,6 @@ func (n *Node) OnStop() {
n.Logger.Error("Error closing switch", "err", err)
}
// stop mempool WAL
if n.config.Mempool.WalEnabled() {
n.mempool.CloseWAL()
}
if err := n.transport.Close(); err != nil {
n.Logger.Error("Error closing transport", "err", err)
}
@@ -1224,7 +1254,7 @@ func (n *Node) ConsensusReactor() *cs.Reactor {
}
// MempoolReactor returns the Node's mempool reactor.
func (n *Node) MempoolReactor() *mempl.Reactor {
func (n *Node) MempoolReactor() p2p.Reactor {
return n.mempoolReactor
}

View File

@@ -21,6 +21,8 @@ import (
"github.com/tendermint/tendermint/libs/log"
tmrand "github.com/tendermint/tendermint/libs/rand"
mempl "github.com/tendermint/tendermint/mempool"
mempoolv0 "github.com/tendermint/tendermint/mempool/v0"
mempoolv1 "github.com/tendermint/tendermint/mempool/v1"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/p2p/conn"
p2pmock "github.com/tendermint/tendermint/p2p/mock"
@@ -242,16 +244,27 @@ func TestCreateProposalBlock(t *testing.T) {
proposerAddr, _ := state.Validators.GetByIndex(0)
// Make Mempool
memplMetrics := mempl.PrometheusMetrics("node_test_1")
mempool := mempl.NewCListMempool(
config.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
mempl.WithMetrics(memplMetrics),
mempl.WithPreCheck(sm.TxPreCheck(state)),
mempl.WithPostCheck(sm.TxPostCheck(state)),
)
mempool.SetLogger(logger)
memplMetrics := mempl.NopMetrics()
var mempool mempl.Mempool
switch config.Mempool.Version {
case cfg.MempoolV0:
mempool = mempoolv0.NewCListMempool(config.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
mempoolv0.WithMetrics(memplMetrics),
mempoolv0.WithPreCheck(sm.TxPreCheck(state)),
mempoolv0.WithPostCheck(sm.TxPostCheck(state)))
case cfg.MempoolV1:
mempool = mempoolv1.NewTxMempool(logger,
config.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
mempoolv1.WithMetrics(memplMetrics),
mempoolv1.WithPreCheck(sm.TxPreCheck(state)),
mempoolv1.WithPostCheck(sm.TxPostCheck(state)),
)
}
// Make EvidencePool
evidenceDB := dbm.NewMemDB()
@@ -334,16 +347,26 @@ func TestMaxProposalBlockSize(t *testing.T) {
proposerAddr, _ := state.Validators.GetByIndex(0)
// Make Mempool
memplMetrics := mempl.PrometheusMetrics("node_test_2")
mempool := mempl.NewCListMempool(
config.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
mempl.WithMetrics(memplMetrics),
mempl.WithPreCheck(sm.TxPreCheck(state)),
mempl.WithPostCheck(sm.TxPostCheck(state)),
)
mempool.SetLogger(logger)
memplMetrics := mempl.NopMetrics()
var mempool mempl.Mempool
switch config.Mempool.Version {
case cfg.MempoolV0:
mempool = mempoolv0.NewCListMempool(config.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
mempoolv0.WithMetrics(memplMetrics),
mempoolv0.WithPreCheck(sm.TxPreCheck(state)),
mempoolv0.WithPostCheck(sm.TxPostCheck(state)))
case cfg.MempoolV1:
mempool = mempoolv1.NewTxMempool(logger,
config.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
mempoolv1.WithMetrics(memplMetrics),
mempoolv1.WithPreCheck(sm.TxPreCheck(state)),
mempoolv1.WithPostCheck(sm.TxPostCheck(state)),
)
}
// fill the mempool with one txs just below the maximum size
txLength := int(types.MaxDataBytesNoEvidence(maxBytes, 1))