node+statesync: normalize initialization (#8275)

Sam Kleinman
2022-04-08 11:00:58 -04:00
committed by GitHub
parent 5df277caca
commit 90b951af72
3 changed files with 74 additions and 73 deletions


@@ -143,6 +143,12 @@ type Reactor struct {
peerEvents p2p.PeerEventSubscriber
chCreator p2p.ChannelCreator
sendBlockError func(context.Context, p2p.PeerError) error
postSyncHook func(context.Context, sm.State) error
// when true, the reactor will, during startup, perform a
// statesync for this node, and otherwise just provide
// snapshots to other nodes.
needsStateSync bool
// Dispatcher is used to multiplex light block requests and responses over multiple
// peers used by the p2p state provider and in reverse sync.
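Taken together, the two new fields turn the reactor's startup role into construction-time configuration: either sync this node during startup, or only serve snapshots to peers. A minimal sketch of that shape, using hypothetical names (the trimmed State, the PostSyncHook alias, and the package are illustrative stand-ins, not the real statesync types):

// Sketch only: illustrative names, not the real statesync package.
package statesyncsketch

import "context"

// State stands in for the sm.State that a completed sync produces.
type State struct{ LastBlockHeight int64 }

// PostSyncHook is called once a sync has finished; nil means "no follow-up".
type PostSyncHook func(context.Context, State) error

// Reactor (abridged) carries its startup behaviour as plain configuration:
// with needsStateSync set it syncs this node during OnStart, otherwise it
// only serves snapshots to other nodes.
type Reactor struct {
	postSyncHook   PostSyncHook
	needsStateSync bool
}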
@@ -171,7 +177,6 @@ type Reactor struct {
// and querying, references to p2p Channels and a channel to listen for peer
// updates on. Note, the reactor will close all p2p Channels when stopping.
func NewReactor(
ctx context.Context,
chainID string,
initialHeight int64,
cfg config.StateSyncConfig,
@@ -184,23 +189,26 @@ func NewReactor(
tempDir string,
ssMetrics *Metrics,
eventBus *eventbus.EventBus,
postSyncHook func(context.Context, sm.State) error,
needsStateSync bool,
) *Reactor {
r := &Reactor{
logger: logger,
chainID: chainID,
initialHeight: initialHeight,
cfg: cfg,
conn: conn,
chCreator: channelCreator,
peerEvents: peerEvents,
tempDir: tempDir,
stateStore: stateStore,
blockStore: blockStore,
peers: newPeerList(),
providers: make(map[types.NodeID]*BlockProvider),
metrics: ssMetrics,
eventBus: eventBus,
logger: logger,
chainID: chainID,
initialHeight: initialHeight,
cfg: cfg,
conn: conn,
chCreator: channelCreator,
peerEvents: peerEvents,
tempDir: tempDir,
stateStore: stateStore,
blockStore: blockStore,
peers: newPeerList(),
providers: make(map[types.NodeID]*BlockProvider),
metrics: ssMetrics,
eventBus: eventBus,
postSyncHook: postSyncHook,
needsStateSync: needsStateSync,
}
r.BaseService = *service.NewBaseService(logger, "StateSync", r)
@@ -300,6 +308,14 @@ func (r *Reactor) OnStart(ctx context.Context) error {
go r.processChannels(ctx, snapshotCh, chunkCh, blockCh, paramsCh)
go r.processPeerUpdates(ctx, r.peerEvents(ctx))
if r.needsStateSync {
r.logger.Info("starting state sync")
if _, err := r.Sync(ctx); err != nil {
r.logger.Error("state sync failed; shutting down this node", "err", err)
return err
}
}
return nil
}
@@ -379,6 +395,12 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) {
}
}
if r.postSyncHook != nil {
if err := r.postSyncHook(ctx, state); err != nil {
return sm.State{}, err
}
}
return state, nil
}
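The two hunks above are the whole runtime story: OnStart performs the sync itself when the reactor was constructed with needsStateSync, and Sync finishes by invoking the optional hook. A sketch of that flow, adding methods to the hypothetical Reactor from the earlier sketch (same assumed package; the real Sync does snapshot discovery and restore work that is elided here):

// Sketch only: methods for the hypothetical Reactor sketched above.
package statesyncsketch

import (
	"context"
	"fmt"
)

// OnStart mirrors the new gating: a reactor built with needsStateSync=true
// syncs this node during startup, and any failure is returned so the
// surrounding service framework aborts Start.
func (r *Reactor) OnStart(ctx context.Context) error {
	if r.needsStateSync {
		if _, err := r.Sync(ctx); err != nil {
			return fmt.Errorf("state sync failed: %w", err)
		}
	}
	return nil
}

// Sync restores state (elided) and then hands it to the optional hook so
// the caller decides what happens next; a hook error fails the whole sync.
func (r *Reactor) Sync(ctx context.Context) (State, error) {
	state := State{LastBlockHeight: 1} // placeholder for the restored state
	if r.postSyncHook != nil {
		if err := r.postSyncHook(ctx, state); err != nil {
			return State{}, err
		}
	}
	return state, nil
}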


@@ -155,7 +155,6 @@ func setup(
logger := log.NewNopLogger()
rts.reactor = NewReactor(
ctx,
factory.DefaultTestChainID,
1,
*cfg,
@@ -167,7 +166,9 @@ func setup(
rts.blockStore,
"",
m,
nil, // eventbus can be nil
nil, // post-sync-hook
false, // run Sync during Start()
)
rts.syncer = &syncer{
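The test setup passes nil and false for the new trailing arguments, which is the compatibility case: no post-sync hook and no sync during Start, so the existing tests keep driving the syncer directly. A sketch of tests that pin down both modes, written against the hypothetical Reactor sketched above (same assumed package, so not standalone):

// Sketch only: exercises the hypothetical Reactor from the earlier sketches.
package statesyncsketch

import (
	"context"
	"errors"
	"testing"
)

func TestStartWithoutStateSync(t *testing.T) {
	// nil hook + needsStateSync=false matches what the real test setup passes:
	// Start must not run a sync on its own.
	r := &Reactor{postSyncHook: nil, needsStateSync: false}
	if err := r.OnStart(context.Background()); err != nil {
		t.Fatalf("expected OnStart to be a no-op, got: %v", err)
	}
}

func TestStartPropagatesHookError(t *testing.T) {
	// With needsStateSync=true, a failing post-sync hook must abort startup.
	hookErr := errors.New("boom")
	r := &Reactor{
		postSyncHook:   func(context.Context, State) error { return hookErr },
		needsStateSync: true,
	}
	if err := r.OnStart(context.Background()); err == nil {
		t.Fatal("expected OnStart to fail when the hook fails")
	}
}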


@@ -60,19 +60,17 @@ type nodeImpl struct {
nodeKey types.NodeKey // our node privkey
// services
eventSinks []indexer.EventSink
initialState sm.State
stateStore sm.Store
blockStore *store.BlockStore // store the blockchain to disk
evPool *evidence.Pool
stateSync bool // whether the node should state sync on startup
stateSyncReactor *statesync.Reactor // for hosting and restoring state sync snapshots
indexerService *indexer.Service
services []service.Service
rpcListeners []net.Listener // rpc servers
shutdownOps closer
rpcEnv *rpccore.Environment
prometheusSrv *http.Server
eventSinks []indexer.EventSink
initialState sm.State
stateStore sm.Store
blockStore *store.BlockStore // store the blockchain to disk
evPool *evidence.Pool
indexerService *indexer.Service
services []service.Service
rpcListeners []net.Listener // rpc servers
shutdownOps closer
rpcEnv *rpccore.Environment
prometheusSrv *http.Server
}
// newDefaultNode returns a Tendermint node with default settings for the
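Dropping stateSync and stateSyncReactor from nodeImpl is the node-side half of the normalization: the state sync reactor stops being a special case the node keeps a handle on and becomes one more entry in node.services, with the same lifecycle as the other reactors. A rough sketch of that uniform shape, with a hypothetical Service interface standing in for the real service type:

// Sketch only: a minimal stand-in for the node's uniform service list.
package nodesketch

import "context"

// Service is the lifecycle surface the node relies on.
type Service interface {
	Start(ctx context.Context) error
	Stop()
	Wait()
}

// nodeImpl (abridged) keeps every reactor, state sync included, behind the
// same slice instead of dedicated fields plus a stateSync flag.
type nodeImpl struct {
	services []Service
}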
@@ -331,13 +329,15 @@ func makeNode(
nodeMetrics.consensus.BlockSyncing.Set(1)
}
if cfg.P2P.PexReactor {
node.services = append(node.services, pex.NewReactor(logger, peerManager, node.router.OpenChannel, peerManager.Subscribe))
}
// Set up state sync reactor, and schedule a sync if requested.
// FIXME The way we do phased startups (e.g. replay -> block sync -> consensus) is very messy,
// we should clean this whole thing up. See:
// https://github.com/tendermint/tendermint/issues/4644
node.stateSync = stateSync
node.stateSyncReactor = statesync.NewReactor(
ctx,
node.services = append(node.services, statesync.NewReactor(
genDoc.ChainID,
genDoc.InitialHeight,
*cfg.StateSync,
@@ -350,11 +350,24 @@ func makeNode(
cfg.StateSync.TempDir,
nodeMetrics.statesync,
eventBus,
)
if cfg.P2P.PexReactor {
node.services = append(node.services, pex.NewReactor(logger, peerManager, node.router.OpenChannel, peerManager.Subscribe))
}
// the post-sync operation
func(ctx context.Context, state sm.State) error {
csReactor.SetStateSyncingMetrics(0)
// TODO: Some form of orchestrator is needed here between the state
// advancing reactors to be able to control which one of the three
// is running
// FIXME Very ugly to have these metrics bleed through here.
csReactor.SetBlockSyncingMetrics(1)
if err := bcReactor.SwitchToBlockSync(ctx, state); err != nil {
logger.Error("failed to switch to block sync", "err", err)
return err
}
return nil
},
stateSync,
))
if cfg.Mode == config.ModeValidator {
node.rpcEnv.PubKey = pubKey
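The closure handed to statesync.NewReactor is where the post-sync steps formerly in node.OnStart went: clear the state-syncing metric, mark block sync as running, and hand the restored state to the block sync reactor. A sketch of just that hook, with hypothetical minimal interfaces standing in for the consensus and block sync reactors (the real closure in makeNode calls csReactor and bcReactor directly and logs the switch error before returning it):

// Sketch only: illustrative interfaces, not the real reactor types.
package nodesketch

import "context"

// State stands in for the sm.State handed back by a finished state sync.
type State struct{ LastBlockHeight int64 }

type consensusMetrics interface {
	SetStateSyncingMetrics(float64)
	SetBlockSyncingMetrics(float64)
}

type blockSyncStarter interface {
	SwitchToBlockSync(context.Context, State) error
}

// postSyncHook reproduces the shape of the closure passed to the reactor:
// flip the metrics from "state syncing" to "block syncing", then let the
// block sync reactor continue from the restored state.
func postSyncHook(cs consensusMetrics, bc blockSyncStarter) func(context.Context, State) error {
	return func(ctx context.Context, state State) error {
		cs.SetStateSyncingMetrics(0)
		cs.SetBlockSyncingMetrics(1)
		return bc.SwitchToBlockSync(ctx, state)
	}
}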
@@ -481,40 +494,6 @@ func (n *nodeImpl) OnStart(ctx context.Context) error {
}
}
if err := n.stateSyncReactor.Start(ctx); err != nil {
return err
}
// Run state sync
// TODO: We shouldn't run state sync if we already have state that has a
// LastBlockHeight that is not InitialHeight
if n.stateSync {
// RUN STATE SYNC NOW:
//
// TODO: Eventually this should run as part of some
// separate orchestrator
n.logger.Info("starting state sync")
ssState, err := n.stateSyncReactor.Sync(ctx)
if err != nil {
n.logger.Error("state sync failed; shutting down this node", "err", err)
// stop the node
n.Stop()
return err
}
n.rpcEnv.ConsensusReactor.SetStateSyncingMetrics(0)
// TODO: Some form of orchestrator is needed here between the state
// advancing reactors to be able to control which one of the three
// is running
// FIXME Very ugly to have these metrics bleed through here.
n.rpcEnv.ConsensusReactor.SetBlockSyncingMetrics(1)
if err := n.rpcEnv.BlockSyncReactor.SwitchToBlockSync(ctx, ssState); err != nil {
n.logger.Error("failed to switch to block sync", "err", err)
return err
}
}
return nil
}
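With the sync moved into the reactor's own OnStart and the follow-up work moved into the hook, the node's OnStart no longer needs the block removed above: starting the services slice in order is enough, and a failed sync surfaces as an ordinary Start error. A sketch of that loop, extending the hypothetical nodesketch package from the nodeImpl hunk:

// Sketch only: the uniform startup/shutdown that replaces the removed block;
// reuses the hypothetical Service and nodeImpl defined in the earlier sketch.
package nodesketch

import "context"

// OnStart starts every registered service in order. Because the state sync
// reactor now syncs inside its own Start, a failed sync aborts node startup
// here without a dedicated branch.
func (n *nodeImpl) OnStart(ctx context.Context) error {
	for _, s := range n.services {
		if err := s.Start(ctx); err != nil {
			return err
		}
	}
	return nil
}

// OnStop stops services in reverse order and waits for each; this also
// covers the Wait call that the OnStop hunk below no longer needs for a
// dedicated stateSyncReactor field.
func (n *nodeImpl) OnStop() {
	for i := len(n.services) - 1; i >= 0; i-- {
		n.services[i].Stop()
		n.services[i].Wait()
	}
}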
@@ -531,7 +510,6 @@ func (n *nodeImpl) OnStop() {
reactor.Wait()
}
n.stateSyncReactor.Wait()
n.router.Wait()
n.rpcEnv.IsListening = false