Merge branch 'master' into wb/issue-7950

This commit is contained in:
William Banfield
2022-04-08 14:59:28 -04:00
committed by GitHub
6 changed files with 96 additions and 80 deletions

View File

@@ -77,14 +77,25 @@ func (env *Environment) Status(ctx context.Context) (*coretypes.ResultStatus, er
EarliestAppHash: earliestAppHash,
EarliestBlockHeight: earliestBlockHeight,
EarliestBlockTime: time.Unix(0, earliestBlockTimeNano),
MaxPeerBlockHeight: env.BlockSyncReactor.GetMaxPeerBlockHeight(),
CatchingUp: env.ConsensusReactor.WaitSync(),
TotalSyncedTime: env.BlockSyncReactor.GetTotalSyncedTime(),
RemainingTime: env.BlockSyncReactor.GetRemainingSyncTime(),
// this should start as true, if consensus
// hasn't started yet, and then flip to false
// (or true,) depending on what's actually
// happening.
CatchingUp: true,
},
ValidatorInfo: validatorInfo,
}
if env.ConsensusReactor != nil {
result.SyncInfo.CatchingUp = env.ConsensusReactor.WaitSync()
}
if env.BlockSyncReactor != nil {
result.SyncInfo.MaxPeerBlockHeight = env.BlockSyncReactor.GetMaxPeerBlockHeight()
result.SyncInfo.TotalSyncedTime = env.BlockSyncReactor.GetTotalSyncedTime()
result.SyncInfo.RemainingTime = env.BlockSyncReactor.GetRemainingSyncTime()
}
if env.StateSyncMetricer != nil {
result.SyncInfo.TotalSnapshots = env.StateSyncMetricer.TotalSnapshots()
result.SyncInfo.ChunkProcessAvgTime = env.StateSyncMetricer.ChunkProcessAvgTime()
@@ -103,6 +114,9 @@ func (env *Environment) validatorAtHeight(h int64) *types.Validator {
if err != nil {
return nil
}
if env.ConsensusState == nil {
return nil
}
if env.PubKey == nil {
return nil
}

View File

@@ -143,6 +143,12 @@ type Reactor struct {
peerEvents p2p.PeerEventSubscriber
chCreator p2p.ChannelCreator
sendBlockError func(context.Context, p2p.PeerError) error
postSyncHook func(context.Context, sm.State) error
// when true, the reactor will, during startup perform a
// statesync for this node, and otherwise just provide
// snapshots to other nodes.
needsStateSync bool
// Dispatcher is used to multiplex light block requests and responses over multiple
// peers used by the p2p state provider and in reverse sync.
@@ -171,7 +177,6 @@ type Reactor struct {
// and querying, references to p2p Channels and a channel to listen for peer
// updates on. Note, the reactor will close all p2p Channels when stopping.
func NewReactor(
ctx context.Context,
chainID string,
initialHeight int64,
cfg config.StateSyncConfig,
@@ -184,23 +189,26 @@ func NewReactor(
tempDir string,
ssMetrics *Metrics,
eventBus *eventbus.EventBus,
postSyncHook func(context.Context, sm.State) error,
needsStateSync bool,
) *Reactor {
r := &Reactor{
logger: logger,
chainID: chainID,
initialHeight: initialHeight,
cfg: cfg,
conn: conn,
chCreator: channelCreator,
peerEvents: peerEvents,
tempDir: tempDir,
stateStore: stateStore,
blockStore: blockStore,
peers: newPeerList(),
providers: make(map[types.NodeID]*BlockProvider),
metrics: ssMetrics,
eventBus: eventBus,
logger: logger,
chainID: chainID,
initialHeight: initialHeight,
cfg: cfg,
conn: conn,
chCreator: channelCreator,
peerEvents: peerEvents,
tempDir: tempDir,
stateStore: stateStore,
blockStore: blockStore,
peers: newPeerList(),
providers: make(map[types.NodeID]*BlockProvider),
metrics: ssMetrics,
eventBus: eventBus,
postSyncHook: postSyncHook,
needsStateSync: needsStateSync,
}
r.BaseService = *service.NewBaseService(logger, "StateSync", r)
@@ -300,6 +308,14 @@ func (r *Reactor) OnStart(ctx context.Context) error {
go r.processChannels(ctx, snapshotCh, chunkCh, blockCh, paramsCh)
go r.processPeerUpdates(ctx, r.peerEvents(ctx))
if r.needsStateSync {
r.logger.Info("starting state sync")
if _, err := r.Sync(ctx); err != nil {
r.logger.Error("state sync failed; shutting down this node", "err", err)
return err
}
}
return nil
}
@@ -379,6 +395,12 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) {
}
}
if r.postSyncHook != nil {
if err := r.postSyncHook(ctx, state); err != nil {
return sm.State{}, err
}
}
return state, nil
}

View File

@@ -155,7 +155,6 @@ func setup(
logger := log.NewNopLogger()
rts.reactor = NewReactor(
ctx,
factory.DefaultTestChainID,
1,
*cfg,
@@ -167,7 +166,9 @@ func setup(
rts.blockStore,
"",
m,
nil, // eventbus can be nil
nil, // eventbus can be nil
nil, // post-sync-hook
false, // run Sync during Start()
)
rts.syncer = &syncer{