diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go index f4d69b8b0..cf1a10623 100644 --- a/internal/blocksync/reactor.go +++ b/internal/blocksync/reactor.go @@ -70,6 +70,8 @@ type Reactor struct { // immutable initialState sm.State + // store + stateStore sm.Store blockExec *sm.BlockExecutor store *store.BlockStore @@ -101,7 +103,7 @@ type Reactor struct { func NewReactor( ctx context.Context, logger log.Logger, - state sm.State, + stateStore sm.Store, blockExec *sm.BlockExecutor, store *store.BlockStore, consReactor consensusReactor, @@ -111,19 +113,6 @@ func NewReactor( metrics *consensus.Metrics, eventBus *eventbus.EventBus, ) (*Reactor, error) { - - if state.LastBlockHeight != store.Height() { - return nil, fmt.Errorf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height()) - } - - startHeight := store.Height() + 1 - if startHeight == 1 { - startHeight = state.InitialHeight - } - - requestsCh := make(chan BlockRequest, maxTotalRequesters) - errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count. - blockSyncCh, err := channelCreator(ctx, GetChannelDescriptor()) if err != nil { return nil, err @@ -131,20 +120,16 @@ func NewReactor( r := &Reactor{ logger: logger, - initialState: state, + stateStore: stateStore, blockExec: blockExec, store: store, - pool: NewBlockPool(logger, startHeight, requestsCh, errorsCh), consReactor: consReactor, blockSync: newAtomicBool(blockSync), - requestsCh: requestsCh, - errorsCh: errorsCh, blockSyncCh: blockSyncCh, blockSyncOutBridgeCh: make(chan p2p.Envelope), peerUpdates: peerUpdates, metrics: metrics, eventBus: eventBus, - syncStartTime: time.Time{}, } r.BaseService = *service.NewBaseService(logger, "BlockSync", r) @@ -159,6 +144,27 @@ func NewReactor( // If blockSync is enabled, we also start the pool and the pool processing // goroutine. If the pool fails to start, an error is returned. func (r *Reactor) OnStart(ctx context.Context) error { + state, err := r.stateStore.Load() + if err != nil { + return err + } + r.initialState = state + + if state.LastBlockHeight != r.store.Height() { + return fmt.Errorf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, r.store.Height()) + } + + startHeight := r.store.Height() + 1 + if startHeight == 1 { + startHeight = state.InitialHeight + } + + requestsCh := make(chan BlockRequest, maxTotalRequesters) + errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count. + r.pool = NewBlockPool(r.logger, startHeight, requestsCh, errorsCh) + r.requestsCh = requestsCh + r.errorsCh = errorsCh + if r.blockSync.IsSet() { if err := r.pool.Start(ctx); err != nil { return err diff --git a/internal/blocksync/reactor_test.go b/internal/blocksync/reactor_test.go index 68656fbc3..00ab14a86 100644 --- a/internal/blocksync/reactor_test.go +++ b/internal/blocksync/reactor_test.go @@ -176,7 +176,7 @@ func (rts *reactorTestSuite) addNode( rts.reactors[nodeID], err = NewReactor( ctx, rts.logger.With("nodeID", nodeID), - state.Copy(), + stateStore, blockExec, blockStore, nil, diff --git a/internal/consensus/byzantine_test.go b/internal/consensus/byzantine_test.go index 33e1dbf63..f0df502f9 100644 --- a/internal/consensus/byzantine_test.go +++ b/internal/consensus/byzantine_test.go @@ -82,7 +82,6 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { log.TestingLogger().With("module", "mempool"), thisConfig.Mempool, proxyAppConnMem, - 0, ) if thisConfig.Consensus.WaitForTxs() { mempool.EnableTxsAvailable() @@ -95,7 +94,8 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { // Make State blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore) - cs := NewState(ctx, logger, thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool) + cs, err := NewState(ctx, logger, thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool) + require.NoError(t, err) // set private validator pv := privVals[i] cs.SetPrivValidator(ctx, pv) @@ -105,14 +105,13 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { require.NoError(t, err) cs.SetEventBus(eventBus) evpool.SetEventBus(eventBus) - cs.SetTimeoutTicker(tickerFunc()) states[i] = cs }() } - rts := setup(ctx, t, nValidators, states, 100) // buffer must be large enough to not deadlock + rts := setup(ctx, t, nValidators, states, 512) // buffer must be large enough to not deadlock var bzNodeID types.NodeID @@ -238,8 +237,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { } for _, reactor := range rts.reactors { - state := reactor.state.GetState() - reactor.SwitchToConsensus(ctx, state, false) + reactor.SwitchToConsensus(ctx, reactor.state.GetState(), false) } // Evidence should be submitted and committed at the third height but @@ -248,20 +246,26 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { var wg sync.WaitGroup i := 0 + subctx, subcancel := context.WithCancel(ctx) + defer subcancel() for _, sub := range rts.subs { wg.Add(1) go func(j int, s eventbus.Subscription) { defer wg.Done() for { - if ctx.Err() != nil { + if subctx.Err() != nil { + return + } + + msg, err := s.Next(subctx) + if subctx.Err() != nil { return } - msg, err := s.Next(ctx) - assert.NoError(t, err) if err != nil { - cancel() + t.Errorf("waiting for subscription: %v", err) + subcancel() return } @@ -273,12 +277,18 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { } } }(i, sub) - i++ } wg.Wait() + // don't run more assertions if we've encountered a timeout + select { + case <-subctx.Done(): + t.Fatal("encountered timeout") + default: + } + pubkey, err := bzNodeState.privValidator.GetPubKey(ctx) require.NoError(t, err) diff --git a/internal/consensus/common_test.go b/internal/consensus/common_test.go index bd381ff7e..4a6b96b2d 100644 --- a/internal/consensus/common_test.go +++ b/internal/consensus/common_test.go @@ -469,7 +469,6 @@ func newStateWithConfigAndBlockStore( logger.With("module", "mempool"), thisConfig.Mempool, proxyAppConnMem, - 0, ) if thisConfig.Consensus.WaitForTxs() { @@ -484,15 +483,19 @@ func newStateWithConfigAndBlockStore( require.NoError(t, stateStore.Save(state)) blockExec := sm.NewBlockExecutor(stateStore, logger, proxyAppConnCon, mempool, evpool, blockStore) - cs := NewState(ctx, + cs, err := NewState(ctx, logger.With("module", "consensus"), thisConfig.Consensus, - state, + stateStore, blockExec, blockStore, mempool, evpool, ) + if err != nil { + t.Fatal(err) + } + cs.SetPrivValidator(ctx, pv) eventBus := eventbus.NewDefault(logger.With("module", "events")) diff --git a/internal/consensus/reactor_test.go b/internal/consensus/reactor_test.go index f01d013b3..ea9238a22 100644 --- a/internal/consensus/reactor_test.go +++ b/internal/consensus/reactor_test.go @@ -461,6 +461,7 @@ func TestReactorWithEvidence(t *testing.T) { stateStore := sm.NewStore(stateDB) state, err := sm.MakeGenesisState(genDoc) require.NoError(t, err) + require.NoError(t, stateStore.Save(state)) thisConfig, err := ResetConfig(t.TempDir(), fmt.Sprintf("%s_%d", testName, i)) require.NoError(t, err) @@ -483,7 +484,6 @@ func TestReactorWithEvidence(t *testing.T) { log.TestingLogger().With("module", "mempool"), thisConfig.Mempool, proxyAppConnMem, - 0, ) if thisConfig.Consensus.WaitForTxs() { @@ -506,8 +506,9 @@ func TestReactorWithEvidence(t *testing.T) { blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore) - cs := NewState(ctx, logger.With("validator", i, "module", "consensus"), - thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool2) + cs, err := NewState(ctx, logger.With("validator", i, "module", "consensus"), + thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool2) + require.NoError(t, err) cs.SetPrivValidator(ctx, pv) eventBus := eventbus.NewDefault(log.TestingLogger().With("module", "events")) diff --git a/internal/consensus/replay_file.go b/internal/consensus/replay_file.go index 310eb0ab6..96de5ef28 100644 --- a/internal/consensus/replay_file.go +++ b/internal/consensus/replay_file.go @@ -84,7 +84,7 @@ func (cs *State) ReplayFile(ctx context.Context, file string, console bool) erro return err } - pb := newPlayback(file, fp, cs, cs.state.Copy()) + pb := newPlayback(file, fp, cs, cs.stateStore) defer pb.fp.Close() var nextN int // apply N msgs in a row @@ -126,17 +126,17 @@ type playback struct { count int // how many lines/msgs into the file are we // replays can be reset to beginning - fileName string // so we can close/reopen the file - genesisState sm.State // so the replay session knows where to restart from + fileName string // so we can close/reopen the file + stateStore sm.Store } -func newPlayback(fileName string, fp *os.File, cs *State, genState sm.State) *playback { +func newPlayback(fileName string, fp *os.File, cs *State, store sm.Store) *playback { return &playback{ - cs: cs, - fp: fp, - fileName: fileName, - genesisState: genState, - dec: NewWALDecoder(fp), + cs: cs, + fp: fp, + fileName: fileName, + stateStore: store, + dec: NewWALDecoder(fp), } } @@ -145,8 +145,11 @@ func (pb *playback) replayReset(ctx context.Context, count int, newStepSub event pb.cs.Stop() pb.cs.Wait() - newCS := NewState(ctx, pb.cs.logger, pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec, + newCS, err := NewState(ctx, pb.cs.logger, pb.cs.config, pb.stateStore, pb.cs.blockExec, pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool) + if err != nil { + return err + } newCS.SetEventBus(pb.cs.eventBus) newCS.startForReplay() @@ -345,9 +348,11 @@ func newConsensusStateForReplay( mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{} blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp.Consensus(), mempool, evpool, blockStore) - consensusState := NewState(ctx, logger, csConfig, state.Copy(), blockExec, + consensusState, err := NewState(ctx, logger, csConfig, stateStore, blockExec, blockStore, mempool, evpool) - + if err != nil { + return nil, err + } consensusState.SetEventBus(eventBus) return consensusState, nil } diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 7f2045dcd..220cc0741 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -121,6 +121,9 @@ type State struct { // store blocks and commits blockStore sm.BlockStore + stateStore sm.Store + initialStatePopulated bool + // create and execute blocks blockExec *sm.BlockExecutor @@ -189,18 +192,19 @@ func NewState( ctx context.Context, logger log.Logger, cfg *config.ConsensusConfig, - state sm.State, + store sm.Store, blockExec *sm.BlockExecutor, blockStore sm.BlockStore, txNotifier txNotifier, evpool evidencePool, options ...StateOption, -) *State { +) (*State, error) { cs := &State{ logger: logger, config: cfg, blockExec: blockExec, blockStore: blockStore, + stateStore: store, txNotifier: txNotifier, peerMsgQueue: make(chan msgInfo, msgQueueSize), internalMsgQueue: make(chan msgInfo, msgQueueSize), @@ -220,6 +224,31 @@ func NewState( cs.doPrevote = cs.defaultDoPrevote cs.setProposal = cs.defaultSetProposal + if err := cs.updateStateFromStore(ctx); err != nil { + return nil, err + } + + // NOTE: we do not call scheduleRound0 yet, we do that upon Start() + cs.BaseService = *service.NewBaseService(logger, "State", cs) + for _, option := range options { + option(cs) + } + + return cs, nil +} + +func (cs *State) updateStateFromStore(ctx context.Context) error { + if cs.initialStatePopulated { + return nil + } + state, err := cs.stateStore.Load() + if err != nil { + return fmt.Errorf("loading state: %w", err) + } + if state.IsEmpty() { + return nil + } + // We have no votes, so reconstruct LastCommit from SeenCommit. if state.LastBlockHeight > 0 { cs.reconstructLastCommit(state) @@ -227,14 +256,8 @@ func NewState( cs.updateToState(ctx, state) - // NOTE: we do not call scheduleRound0 yet, we do that upon Start() - - cs.BaseService = *service.NewBaseService(logger, "State", cs) - for _, option := range options { - option(cs) - } - - return cs + cs.initialStatePopulated = true + return nil } // SetEventBus sets event bus. @@ -365,6 +388,10 @@ func (cs *State) LoadCommit(height int64) *types.Commit { // OnStart loads the latest state via the WAL, and starts the timeout and // receive routines. func (cs *State) OnStart(ctx context.Context) error { + if err := cs.updateStateFromStore(ctx); err != nil { + return err + } + // We may set the WAL in testing before calling Start, so only OpenWAL if its // still the nilWAL. if _, ok := cs.wal.(nilWAL); ok { diff --git a/internal/consensus/wal_generator.go b/internal/consensus/wal_generator.go index b10feb828..493ec1840 100644 --- a/internal/consensus/wal_generator.go +++ b/internal/consensus/wal_generator.go @@ -83,7 +83,11 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr mempool := emptyMempool{} evpool := sm.EmptyEvidencePool{} blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore) - consensusState := NewState(ctx, logger, cfg.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool) + consensusState, err := NewState(ctx, logger, cfg.Consensus, stateStore, blockExec, blockStore, mempool, evpool) + if err != nil { + t.Fatal(err) + } + consensusState.SetEventBus(eventBus) if privValidator != nil && privValidator != (*privval.FilePV)(nil) { consensusState.SetPrivValidator(ctx, privValidator) diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index 21429721d..26f039de4 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -94,7 +94,6 @@ func NewTxMempool( logger log.Logger, cfg *config.MempoolConfig, proxyAppConn proxy.AppConnMempool, - height int64, options ...TxMempoolOption, ) *TxMempool { @@ -102,7 +101,7 @@ func NewTxMempool( logger: logger, config: cfg, proxyAppConn: proxyAppConn, - height: height, + height: -1, cache: NopTxCache{}, metrics: NopMetrics(), txStore: NewTxStore(), diff --git a/internal/mempool/mempool_test.go b/internal/mempool/mempool_test.go index e2cf12e07..68eb5731b 100644 --- a/internal/mempool/mempool_test.go +++ b/internal/mempool/mempool_test.go @@ -95,7 +95,7 @@ func setup(ctx context.Context, t testing.TB, cacheSize int, options ...TxMempoo appConnMem.Wait() }) - return NewTxMempool(logger.With("test", t.Name()), cfg.Mempool, appConnMem, 0, options...) + return NewTxMempool(logger.With("test", t.Name()), cfg.Mempool, appConnMem, options...) } func checkTxs(ctx context.Context, t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx { diff --git a/internal/state/execution.go b/internal/state/execution.go index cdd6e009b..c67d9795a 100644 --- a/internal/state/execution.go +++ b/internal/state/execution.go @@ -360,8 +360,8 @@ func (blockExec *BlockExecutor) Commit( block.Height, block.Txs, deliverTxResponses, - TxPreCheck(state), - TxPostCheck(state), + TxPreCheckForState(state), + TxPostCheckForState(state), ) return res.Data, res.RetainHeight, err diff --git a/internal/state/tx_filter.go b/internal/state/tx_filter.go index 871e08ae6..11dd9ce67 100644 --- a/internal/state/tx_filter.go +++ b/internal/state/tx_filter.go @@ -1,22 +1,85 @@ package state import ( + "sync" + "time" + + abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/internal/mempool" "github.com/tendermint/tendermint/types" ) -// TxPreCheck returns a function to filter transactions before processing. -// The function limits the size of a transaction to the block's maximum data size. -func TxPreCheck(state State) mempool.PreCheckFunc { - maxDataBytes := types.MaxDataBytesNoEvidence( - state.ConsensusParams.Block.MaxBytes, - state.Validators.Size(), +func cachingStateFetcher(store Store) func() (State, error) { + const ttl = time.Second + + var ( + last time.Time + mutex = &sync.Mutex{} + cache State + err error ) - return mempool.PreCheckMaxBytes(maxDataBytes) + + return func() (State, error) { + mutex.Lock() + defer mutex.Unlock() + + if time.Since(last) < ttl && cache.ChainID != "" { + return cache, nil + } + + cache, err = store.Load() + if err != nil { + return State{}, err + } + last = time.Now() + + return cache, nil + } + } -// TxPostCheck returns a function to filter transactions after processing. -// The function limits the gas wanted by a transaction to the block's maximum total gas. -func TxPostCheck(state State) mempool.PostCheckFunc { - return mempool.PostCheckMaxGas(state.ConsensusParams.Block.MaxGas) +// TxPreCheckFromStore returns a function to filter transactions before processing. +// The function limits the size of a transaction to the block's maximum data size. +func TxPreCheckFromStore(store Store) mempool.PreCheckFunc { + fetch := cachingStateFetcher(store) + + return func(tx types.Tx) error { + state, err := fetch() + if err != nil { + return err + } + + return TxPreCheckForState(state)(tx) + } +} + +func TxPreCheckForState(state State) mempool.PreCheckFunc { + return func(tx types.Tx) error { + maxDataBytes := types.MaxDataBytesNoEvidence( + state.ConsensusParams.Block.MaxBytes, + state.Validators.Size(), + ) + return mempool.PreCheckMaxBytes(maxDataBytes)(tx) + } + +} + +// TxPostCheckFromStore returns a function to filter transactions after processing. +// The function limits the gas wanted by a transaction to the block's maximum total gas. +func TxPostCheckFromStore(store Store) mempool.PostCheckFunc { + fetch := cachingStateFetcher(store) + + return func(tx types.Tx, resp *abci.ResponseCheckTx) error { + state, err := fetch() + if err != nil { + return err + } + return mempool.PostCheckMaxGas(state.ConsensusParams.Block.MaxGas)(tx, resp) + } +} + +func TxPostCheckForState(state State) mempool.PostCheckFunc { + return func(tx types.Tx, resp *abci.ResponseCheckTx) error { + return mempool.PostCheckMaxGas(state.ConsensusParams.Block.MaxGas)(tx, resp) + } } diff --git a/internal/state/tx_filter_test.go b/internal/state/tx_filter_test.go index 27af28a40..ac85543b2 100644 --- a/internal/state/tx_filter_test.go +++ b/internal/state/tx_filter_test.go @@ -31,7 +31,7 @@ func TestTxFilter(t *testing.T) { state, err := sm.MakeGenesisState(genDoc) require.NoError(t, err) - f := sm.TxPreCheck(state) + f := sm.TxPreCheckForState(state) if tc.isErr { assert.NotNil(t, f(tc.tx), "#%v", i) } else { diff --git a/node/node.go b/node/node.go index 8970a29d7..3b30a2853 100644 --- a/node/node.go +++ b/node/node.go @@ -143,11 +143,8 @@ func makeNode( return nil, combineCloseError(err, makeCloser(closers)) } - err = genDoc.ValidateAndComplete() - if err != nil { - return nil, combineCloseError( - fmt.Errorf("error in genesis doc: %w", err), - makeCloser(closers)) + if err = genDoc.ValidateAndComplete(); err != nil { + return nil, combineCloseError(fmt.Errorf("error in genesis doc: %w", err), makeCloser(closers)) } state, err := loadStateFromDBOrGenesisDocProvider(stateStore, genDoc) @@ -241,10 +238,6 @@ func makeNode( } } - // Determine whether we should do block sync. This must happen after the handshake, since the - // app may modify the validator set, specifying ourself as the only validator. - blockSync := !onlyValidatorIsUs(state, pubKey) - logNodeStartupInfo(state, pubKey, logger, cfg.Mode) // TODO: Fetch and provide real options and do proper p2p bootstrapping. @@ -271,14 +264,14 @@ func makeNode( } mpReactor, mp, err := createMempoolReactor(ctx, - cfg, proxyApp, state, nodeMetrics.mempool, peerManager, router, logger, + cfg, proxyApp, stateStore, nodeMetrics.mempool, peerManager, router, logger, ) if err != nil { return nil, combineCloseError(err, makeCloser(closers)) } evReactor, evPool, err := createEvidenceReactor(ctx, - cfg, dbProvider, stateDB, blockStore, peerManager, router, logger, nodeMetrics.evidence, eventBus, + cfg, dbProvider, stateStore, blockStore, peerManager, router, logger, nodeMetrics.evidence, eventBus, ) if err != nil { return nil, combineCloseError(err, makeCloser(closers)) @@ -295,8 +288,12 @@ func makeNode( sm.BlockExecutorWithMetrics(nodeMetrics.state), ) + // Determine whether we should do block sync. This must happen after the handshake, since the + // app may modify the validator set, specifying ourself as the only validator. + blockSync := !onlyValidatorIsUs(state, pubKey) + csReactor, csState, err := createConsensusReactor(ctx, - cfg, state, blockExec, blockStore, mp, evPool, + cfg, stateStore, blockExec, blockStore, mp, evPool, privValidator, nodeMetrics.consensus, stateSync || blockSync, eventBus, peerManager, router, logger, ) @@ -308,7 +305,7 @@ func makeNode( // doing a state sync first. bcReactor, err := blocksync.NewReactor(ctx, logger.With("module", "blockchain"), - state.Copy(), + stateStore, blockExec, blockStore, csReactor, @@ -730,10 +727,7 @@ func defaultMetricsProvider(cfg *config.InstrumentationConfig) metricsProvider { // loadStateFromDBOrGenesisDocProvider attempts to load the state from the // database, or creates one using the given genesisDocProvider. On success this also // returns the genesis doc loaded through the given provider. -func loadStateFromDBOrGenesisDocProvider( - stateStore sm.Store, - genDoc *types.GenesisDoc, -) (sm.State, error) { +func loadStateFromDBOrGenesisDocProvider(stateStore sm.Store, genDoc *types.GenesisDoc) (sm.State, error) { // 1. Attempt to load state form the database state, err := stateStore.Load() @@ -747,6 +741,12 @@ func loadStateFromDBOrGenesisDocProvider( if err != nil { return sm.State{}, err } + + // 3. save the gensis document to the state store so + // its fetchable by other callers. + if err := stateStore.Save(state); err != nil { + return sm.State{}, err + } } return state, nil diff --git a/node/node_test.go b/node/node_test.go index 41cb1b6a9..5fbf80e00 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -292,7 +292,6 @@ func TestCreateProposalBlock(t *testing.T) { logger.With("module", "mempool"), cfg.Mempool, proxyApp.Mempool(), - state.LastBlockHeight, ) // Make EvidencePool @@ -392,7 +391,6 @@ func TestMaxTxsProposalBlockSize(t *testing.T) { logger.With("module", "mempool"), cfg.Mempool, proxyApp.Mempool(), - state.LastBlockHeight, ) // fill the mempool with one txs just below the maximum size @@ -457,7 +455,6 @@ func TestMaxProposalBlockSize(t *testing.T) { logger.With("module", "mempool"), cfg.Mempool, proxyApp.Mempool(), - state.LastBlockHeight, ) // fill the mempool with one txs just below the maximum size diff --git a/node/setup.go b/node/setup.go index e880cd5c4..bc071f3cf 100644 --- a/node/setup.go +++ b/node/setup.go @@ -172,7 +172,7 @@ func createMempoolReactor( ctx context.Context, cfg *config.Config, proxyApp proxy.AppConns, - state sm.State, + store sm.Store, memplMetrics *mempool.Metrics, peerManager *p2p.PeerManager, router *p2p.Router, @@ -184,10 +184,9 @@ func createMempoolReactor( logger, cfg.Mempool, proxyApp.Mempool(), - state.LastBlockHeight, mempool.WithMetrics(memplMetrics), - mempool.WithPreCheck(sm.TxPreCheck(state)), - mempool.WithPostCheck(sm.TxPostCheck(state)), + mempool.WithPreCheck(sm.TxPreCheckFromStore(store)), + mempool.WithPostCheck(sm.TxPostCheckFromStore(store)), ) reactor, err := mempool.NewReactor( @@ -214,7 +213,7 @@ func createEvidenceReactor( ctx context.Context, cfg *config.Config, dbProvider config.DBProvider, - stateDB dbm.DB, + store sm.Store, blockStore *store.BlockStore, peerManager *p2p.PeerManager, router *p2p.Router, @@ -229,7 +228,7 @@ func createEvidenceReactor( logger = logger.With("module", "evidence") - evidencePool, err := evidence.NewPool(logger, evidenceDB, sm.NewStore(stateDB), blockStore, metrics) + evidencePool, err := evidence.NewPool(logger, evidenceDB, store, blockStore, metrics) if err != nil { return nil, nil, fmt.Errorf("creating evidence pool: %w", err) } @@ -253,7 +252,7 @@ func createEvidenceReactor( func createConsensusReactor( ctx context.Context, cfg *config.Config, - state sm.State, + store sm.Store, blockExec *sm.BlockExecutor, blockStore sm.BlockStore, mp mempool.Mempool, @@ -268,16 +267,19 @@ func createConsensusReactor( ) (*consensus.Reactor, *consensus.State, error) { logger = logger.With("module", "consensus") - consensusState := consensus.NewState(ctx, + consensusState, err := consensus.NewState(ctx, logger, cfg.Consensus, - state.Copy(), + store, blockExec, blockStore, mp, evidencePool, consensus.StateMetrics(csMetrics), ) + if err != nil { + return nil, nil, err + } if privValidator != nil && cfg.Mode == config.ModeValidator { consensusState.SetPrivValidator(ctx, privValidator) diff --git a/test/fuzz/mempool/checktx.go b/test/fuzz/mempool/checktx.go index ba60d72cc..a6e7006d0 100644 --- a/test/fuzz/mempool/checktx.go +++ b/test/fuzz/mempool/checktx.go @@ -31,7 +31,6 @@ func init() { log.NewNopLogger(), cfg, appConnMem, - 0, ) }