node: move handshake out of constructor (#8264)

This commit is contained in:
Sam Kleinman
2022-04-07 11:21:10 -04:00
committed by GitHub
parent 681cdf8347
commit 6ed3f2d98d
4 changed files with 81 additions and 69 deletions

View File

@@ -124,6 +124,7 @@ type State struct {
stateStore sm.Store
initialStatePopulated bool
skipBootstrapping bool
// create and execute blocks
blockExec *sm.BlockExecutor
@@ -185,6 +186,12 @@ type State struct {
// StateOption sets an optional parameter on the State.
type StateOption func(*State)
// SkipStateStoreBootstrap is a StateOption that forces the constructor to
// skip state bootstrapping during construction.
func SkipStateStoreBootstrap(cs *State) {
	cs.skipBootstrapping = true
}
// NewState returns a new State.
func NewState(
ctx context.Context,
@@ -223,16 +230,21 @@ func NewState(
cs.doPrevote = cs.defaultDoPrevote
cs.setProposal = cs.defaultSetProposal
if err := cs.updateStateFromStore(ctx); err != nil {
return nil, err
}
// NOTE: we do not call scheduleRound0 yet, we do that upon Start()
cs.BaseService = *service.NewBaseService(logger, "State", cs)
for _, option := range options {
option(cs)
}
// this is not ideal, but it lets the consensus tests start
// node-fragments gracefully while letting the nodes
// themselves avoid this.
if !cs.skipBootstrapping {
if err := cs.updateStateFromStore(ctx); err != nil {
return nil, err
}
}
return cs, nil
}

View File

@@ -61,17 +61,18 @@ type nodeImpl struct {
// services
eventSinks []indexer.EventSink
initialState sm.State
stateStore sm.Store
blockStore *store.BlockStore // store the blockchain to disk
evPool *evidence.Pool
stateSync bool // whether the node should state sync on startup
stateSyncReactor *statesync.Reactor // for hosting and restoring state sync snapshots
services []service.Service
rpcListeners []net.Listener // rpc servers
shutdownOps closer
rpcEnv *rpccore.Environment
prometheusSrv *http.Server
indexerService *indexer.Service
services []service.Service
rpcListeners []net.Listener // rpc servers
shutdownOps closer
rpcEnv *rpccore.Environment
prometheusSrv *http.Server
}
// newDefaultNode returns a Tendermint node with default settings for the
@@ -157,20 +158,8 @@ func makeNode(
nodeMetrics := defaultMetricsProvider(cfg.Instrumentation)(genDoc.ChainID)
// Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query).
proxyApp := proxy.New(client, logger.With("module", "proxy"), nodeMetrics.proxy)
if err := proxyApp.Start(ctx); err != nil {
return nil, fmt.Errorf("error starting proxy app connections: %w", err)
}
// EventBus and IndexerService must be started before the handshake because
// we might need to index the txs of the replayed block as this might not have happened
// when the node stopped last time (i.e. the node stopped or crashed after it saved the block
// but before it indexed the txs)
eventBus := eventbus.NewDefault(logger.With("module", "events"))
if err := eventBus.Start(ctx); err != nil {
return nil, combineCloseError(err, makeCloser(closers))
}
var eventLog *eventlog.Log
if w := cfg.RPC.EventLogWindowSize; w > 0 {
@@ -185,13 +174,11 @@ func makeNode(
}
}
indexerService, eventSinks, err := createAndStartIndexerService(
ctx, cfg, dbProvider, eventBus,
logger, genDoc.ChainID, nodeMetrics.indexer)
indexerService, eventSinks, err := createIndexerService(
cfg, dbProvider, eventBus, logger, genDoc.ChainID, nodeMetrics.indexer)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
}
closers = append(closers, func() error { indexerService.Stop(); return nil })
privValidator, err := createPrivval(ctx, logger, cfg, genDoc, filePrivval)
if err != nil {
@@ -213,34 +200,6 @@ func makeNode(
}
}
// Create the handshaker, which calls RequestInfo, sets the AppVersion on the state,
// and replays any blocks as necessary to sync tendermint with the app.
if err := consensus.NewHandshaker(
logger.With("module", "handshaker"),
stateStore, state, blockStore, eventBus, genDoc,
).Handshake(ctx, proxyApp); err != nil {
return nil, combineCloseError(err, makeCloser(closers))
}
// Reload the state. It will have the Version.Consensus.App set by the
// Handshake, and may have other modifications as well (ie. depending on
// what happened during block replay).
state, err = stateStore.Load()
if err != nil {
return nil, combineCloseError(
fmt.Errorf("cannot load state: %w", err),
makeCloser(closers))
}
logNodeStartupInfo(state, pubKey, logger, cfg.Mode)
// TODO: Fetch and provide real options and do proper p2p bootstrapping.
// TODO: Use a persistent peer database.
nodeInfo, err := makeNodeInfo(cfg, nodeKey, eventSinks, genDoc, state.Version.Consensus)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
}
peerManager, peerCloser, err := createPeerManager(cfg, dbProvider, nodeKey.ID)
closers = append(closers, peerCloser)
if err != nil {
@@ -257,15 +216,15 @@ func makeNode(
privValidator: privValidator,
peerManager: peerManager,
nodeInfo: nodeInfo,
nodeKey: nodeKey,
eventSinks: eventSinks,
eventSinks: eventSinks,
indexerService: indexerService,
services: []service.Service{eventBus},
services: []service.Service{eventBus},
stateStore: stateStore,
blockStore: blockStore,
initialState: state,
stateStore: stateStore,
blockStore: blockStore,
shutdownOps: makeCloser(closers),
@@ -408,6 +367,48 @@ func makeNode(
// OnStart starts the Node. It implements service.Service.
func (n *nodeImpl) OnStart(ctx context.Context) error {
if err := n.rpcEnv.ProxyApp.Start(ctx); err != nil {
return fmt.Errorf("error starting proxy app connections: %w", err)
}
// EventBus and IndexerService must be started before the handshake because
// we might need to index the txs of the replayed block as this might not have happened
// when the node stopped last time (i.e. the node stopped or crashed after it saved the block
// but before it indexed the txs)
if err := n.rpcEnv.EventBus.Start(ctx); err != nil {
return err
}
if err := n.indexerService.Start(ctx); err != nil {
return err
}
// Create the handshaker, which calls RequestInfo, sets the AppVersion on the state,
// and replays any blocks as necessary to sync tendermint with the app.
if err := consensus.NewHandshaker(n.logger.With("module", "handshaker"),
n.stateStore, n.initialState, n.blockStore, n.rpcEnv.EventBus, n.genesisDoc,
).Handshake(ctx, n.rpcEnv.ProxyApp); err != nil {
return err
}
// Reload the state. It will have the Version.Consensus.App set by the
// Handshake, and may have other modifications as well (ie. depending on
// what happened during block replay).
state, err := n.stateStore.Load()
if err != nil {
return fmt.Errorf("cannot load state: %w", err)
}
logNodeStartupInfo(state, n.rpcEnv.PubKey, n.logger, n.config.Mode)
// TODO: Fetch and provide real options and do proper p2p bootstrapping.
// TODO: Use a persistent peer database.
n.nodeInfo, err = makeNodeInfo(n.config, n.nodeKey, n.eventSinks, n.genesisDoc, state.Version.Consensus)
if err != nil {
return err
}
// Start Internal Services
if n.config.RPC.PprofListenAddress != "" {
rpcCtx, rpcCancel := context.WithCancel(ctx)
srv := &http.Server{Addr: n.config.RPC.PprofListenAddress, Handler: nil}
@@ -445,7 +446,7 @@ func (n *nodeImpl) OnStart(ctx context.Context) error {
}
}
state, err := n.stateStore.Load()
state, err = n.stateStore.Load()
if err != nil {
return err
}

View File

@@ -62,12 +62,13 @@ func TestNodeStartStop(t *testing.T) {
require.NoError(t, n.Start(ctx))
// wait for the node to produce a block
tctx, cancel := context.WithTimeout(ctx, time.Second)
tctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
blocksSub, err := n.EventBus().SubscribeWithArgs(tctx, pubsub.SubscribeArgs{
ClientID: "node_test",
Query: types.EventQueryNewBlock,
Limit: 1000,
})
require.NoError(t, err)
_, err = blocksSub.Next(tctx)
@@ -138,6 +139,8 @@ func TestNodeSetAppVersion(t *testing.T) {
// create node
n := getTestNode(ctx, t, cfg, logger)
require.NoError(t, n.Start(ctx))
// default config uses the kvstore app
appVersion := kvstore.ProtocolVersion
@@ -624,7 +627,7 @@ func TestNodeSetEventSink(t *testing.T) {
genDoc, err := types.GenesisDocFromFile(cfg.GenesisFile())
require.NoError(t, err)
indexService, eventSinks, err := createAndStartIndexerService(ctx, cfg,
indexService, eventSinks, err := createIndexerService(cfg,
config.DefaultDBProvider, eventBus, logger, genDoc.ChainID,
indexer.NopMetrics())
require.NoError(t, err)

View File

@@ -95,8 +95,7 @@ func initDBs(
return blockStore, stateDB, makeCloser(closers), nil
}
func createAndStartIndexerService(
ctx context.Context,
func createIndexerService(
cfg *config.Config,
dbProvider config.DBProvider,
eventBus *eventbus.EventBus,
@@ -116,10 +115,6 @@ func createAndStartIndexerService(
Metrics: metrics,
})
if err := indexerService.Start(ctx); err != nil {
return nil, nil, err
}
return indexerService, eventSinks, nil
}
@@ -264,6 +259,7 @@ func createConsensusReactor(
evidencePool,
eventBus,
consensus.StateMetrics(csMetrics),
consensus.SkipStateStoreBootstrap,
)
if err != nil {
return nil, nil, err