diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 670d0c42b..6ddc968ff 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -124,6 +124,7 @@ type State struct { stateStore sm.Store initialStatePopulated bool + skipBootstrapping bool // create and execute blocks blockExec *sm.BlockExecutor @@ -185,6 +186,12 @@ type State struct { // StateOption sets an optional parameter on the State. type StateOption func(*State) +// SkipStateStoreBootstrap is a state option that forces the constructor to +// skip state bootstrapping during construction. +func SkipStateStoreBootstrap(sm *State) { + sm.skipBootstrapping = true +} + // NewState returns a new State. func NewState( ctx context.Context, @@ -223,16 +230,21 @@ func NewState( cs.doPrevote = cs.defaultDoPrevote cs.setProposal = cs.defaultSetProposal - if err := cs.updateStateFromStore(ctx); err != nil { - return nil, err - } - // NOTE: we do not call scheduleRound0 yet, we do that upon Start() cs.BaseService = *service.NewBaseService(logger, "State", cs) for _, option := range options { option(cs) } + // this is not ideal, but it lets the consensus tests start + // node-fragments gracefully while letting the nodes + // themselves avoid this. 
+ if !cs.skipBootstrapping { + if err := cs.updateStateFromStore(ctx); err != nil { + return nil, err + } + } + return cs, nil } diff --git a/node/node.go b/node/node.go index 9b608d6f0..3ee75cfcf 100644 --- a/node/node.go +++ b/node/node.go @@ -61,17 +61,18 @@ type nodeImpl struct { // services eventSinks []indexer.EventSink + initialState sm.State stateStore sm.Store blockStore *store.BlockStore // store the blockchain to disk evPool *evidence.Pool stateSync bool // whether the node should state sync on startup stateSyncReactor *statesync.Reactor // for hosting and restoring state sync snapshots - - services []service.Service - rpcListeners []net.Listener // rpc servers - shutdownOps closer - rpcEnv *rpccore.Environment - prometheusSrv *http.Server + indexerService *indexer.Service + services []service.Service + rpcListeners []net.Listener // rpc servers + shutdownOps closer + rpcEnv *rpccore.Environment + prometheusSrv *http.Server } // newDefaultNode returns a Tendermint node with default settings for the @@ -157,20 +158,8 @@ func makeNode( nodeMetrics := defaultMetricsProvider(cfg.Instrumentation)(genDoc.ChainID) - // Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query). proxyApp := proxy.New(client, logger.With("module", "proxy"), nodeMetrics.proxy) - if err := proxyApp.Start(ctx); err != nil { - return nil, fmt.Errorf("error starting proxy app connections: %w", err) - } - - // EventBus and IndexerService must be started before the handshake because - // we might need to index the txs of the replayed block as this might not have happened - // when the node stopped last time (i.e. 
the node stopped or crashed after it saved the block - // but before it indexed the txs) eventBus := eventbus.NewDefault(logger.With("module", "events")) - if err := eventBus.Start(ctx); err != nil { - return nil, combineCloseError(err, makeCloser(closers)) - } var eventLog *eventlog.Log if w := cfg.RPC.EventLogWindowSize; w > 0 { @@ -185,13 +174,11 @@ func makeNode( } } - indexerService, eventSinks, err := createAndStartIndexerService( - ctx, cfg, dbProvider, eventBus, - logger, genDoc.ChainID, nodeMetrics.indexer) + indexerService, eventSinks, err := createIndexerService( + cfg, dbProvider, eventBus, logger, genDoc.ChainID, nodeMetrics.indexer) if err != nil { return nil, combineCloseError(err, makeCloser(closers)) } - closers = append(closers, func() error { indexerService.Stop(); return nil }) privValidator, err := createPrivval(ctx, logger, cfg, genDoc, filePrivval) if err != nil { @@ -213,34 +200,6 @@ func makeNode( } } - // Create the handshaker, which calls RequestInfo, sets the AppVersion on the state, - // and replays any blocks as necessary to sync tendermint with the app. - if err := consensus.NewHandshaker( - logger.With("module", "handshaker"), - stateStore, state, blockStore, eventBus, genDoc, - ).Handshake(ctx, proxyApp); err != nil { - return nil, combineCloseError(err, makeCloser(closers)) - } - - // Reload the state. It will have the Version.Consensus.App set by the - // Handshake, and may have other modifications as well (ie. depending on - // what happened during block replay). - state, err = stateStore.Load() - if err != nil { - return nil, combineCloseError( - fmt.Errorf("cannot load state: %w", err), - makeCloser(closers)) - } - - logNodeStartupInfo(state, pubKey, logger, cfg.Mode) - - // TODO: Fetch and provide real options and do proper p2p bootstrapping. - // TODO: Use a persistent peer database. 
- nodeInfo, err := makeNodeInfo(cfg, nodeKey, eventSinks, genDoc, state.Version.Consensus) - if err != nil { - return nil, combineCloseError(err, makeCloser(closers)) - } - peerManager, peerCloser, err := createPeerManager(cfg, dbProvider, nodeKey.ID) closers = append(closers, peerCloser) if err != nil { @@ -257,15 +216,15 @@ func makeNode( privValidator: privValidator, peerManager: peerManager, - nodeInfo: nodeInfo, nodeKey: nodeKey, - eventSinks: eventSinks, + eventSinks: eventSinks, + indexerService: indexerService, + services: []service.Service{eventBus}, - services: []service.Service{eventBus}, - - stateStore: stateStore, - blockStore: blockStore, + initialState: state, + stateStore: stateStore, + blockStore: blockStore, shutdownOps: makeCloser(closers), @@ -408,6 +367,48 @@ func makeNode( // OnStart starts the Node. It implements service.Service. func (n *nodeImpl) OnStart(ctx context.Context) error { + if err := n.rpcEnv.ProxyApp.Start(ctx); err != nil { + return fmt.Errorf("error starting proxy app connections: %w", err) + } + + // EventBus and IndexerService must be started before the handshake because + // we might need to index the txs of the replayed block as this might not have happened + // when the node stopped last time (i.e. the node stopped or crashed after it saved the block + // but before it indexed the txs) + if err := n.rpcEnv.EventBus.Start(ctx); err != nil { + return err + } + + if err := n.indexerService.Start(ctx); err != nil { + return err + } + + // Create the handshaker, which calls RequestInfo, sets the AppVersion on the state, + // and replays any blocks as necessary to sync tendermint with the app. + if err := consensus.NewHandshaker(n.logger.With("module", "handshaker"), + n.stateStore, n.initialState, n.blockStore, n.rpcEnv.EventBus, n.genesisDoc, + ).Handshake(ctx, n.rpcEnv.ProxyApp); err != nil { + return err + } + + // Reload the state. 
It will have the Version.Consensus.App set by the + // Handshake, and may have other modifications as well (ie. depending on + // what happened during block replay). + state, err := n.stateStore.Load() + if err != nil { + return fmt.Errorf("cannot load state: %w", err) + } + + logNodeStartupInfo(state, n.rpcEnv.PubKey, n.logger, n.config.Mode) + + // TODO: Fetch and provide real options and do proper p2p bootstrapping. + // TODO: Use a persistent peer database. + n.nodeInfo, err = makeNodeInfo(n.config, n.nodeKey, n.eventSinks, n.genesisDoc, state.Version.Consensus) + if err != nil { + return err + } + // Start Internal Services + if n.config.RPC.PprofListenAddress != "" { rpcCtx, rpcCancel := context.WithCancel(ctx) srv := &http.Server{Addr: n.config.RPC.PprofListenAddress, Handler: nil} @@ -445,7 +446,7 @@ func (n *nodeImpl) OnStart(ctx context.Context) error { } } - state, err := n.stateStore.Load() + state, err = n.stateStore.Load() if err != nil { return err } diff --git a/node/node_test.go b/node/node_test.go index 1a1fa6f81..2736ca818 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -62,12 +62,13 @@ func TestNodeStartStop(t *testing.T) { require.NoError(t, n.Start(ctx)) // wait for the node to produce a block - tctx, cancel := context.WithTimeout(ctx, time.Second) + tctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() blocksSub, err := n.EventBus().SubscribeWithArgs(tctx, pubsub.SubscribeArgs{ ClientID: "node_test", Query: types.EventQueryNewBlock, + Limit: 1000, }) require.NoError(t, err) _, err = blocksSub.Next(tctx) @@ -138,6 +139,8 @@ func TestNodeSetAppVersion(t *testing.T) { // create node n := getTestNode(ctx, t, cfg, logger) + require.NoError(t, n.Start(ctx)) + // default config uses the kvstore app appVersion := kvstore.ProtocolVersion @@ -624,7 +627,7 @@ func TestNodeSetEventSink(t *testing.T) { genDoc, err := types.GenesisDocFromFile(cfg.GenesisFile()) require.NoError(t, err) - indexService, eventSinks, err := 
createAndStartIndexerService(ctx, cfg, + indexService, eventSinks, err := createIndexerService(cfg, config.DefaultDBProvider, eventBus, logger, genDoc.ChainID, indexer.NopMetrics()) require.NoError(t, err) diff --git a/node/setup.go b/node/setup.go index e87fac79c..1057fb6fa 100644 --- a/node/setup.go +++ b/node/setup.go @@ -95,8 +95,7 @@ func initDBs( return blockStore, stateDB, makeCloser(closers), nil } -func createAndStartIndexerService( - ctx context.Context, +func createIndexerService( cfg *config.Config, dbProvider config.DBProvider, eventBus *eventbus.EventBus, @@ -116,10 +115,6 @@ func createAndStartIndexerService( Metrics: metrics, }) - if err := indexerService.Start(ctx); err != nil { - return nil, nil, err - } - return indexerService, eventSinks, nil } @@ -264,6 +259,7 @@ func createConsensusReactor( evidencePool, eventBus, consensus.StateMetrics(csMetrics), + consensus.SkipStateStoreBootstrap, ) if err != nil { return nil, nil, err