statesync+blocksync: move event publications into the sync operations (#8274)

This commit is contained in:
Sam Kleinman
2022-04-07 16:23:36 -04:00
committed by GitHub
parent 6ed3f2d98d
commit 9d20e06900
5 changed files with 30 additions and 78 deletions

View File

@@ -359,6 +359,13 @@ func (r *Reactor) SwitchToBlockSync(ctx context.Context, state sm.State) error {
go r.requestRoutine(ctx, bsCh)
go r.poolRoutine(ctx, true, bsCh)
if err := r.PublishStatus(ctx, types.EventDataBlockSyncStatus{
Complete: false,
Height: state.LastBlockHeight,
}); err != nil {
return err
}
return nil
}

View File

@@ -752,9 +752,6 @@ func (r *Reactor) gossipVotesForHeight(
func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh *p2p.Channel) {
logger := r.logger.With("peer", ps.peerID)
// XXX: simple hack to throttle logs upon sleep
logThrottle := 0
timer := time.NewTimer(0)
defer timer.Stop()
@@ -772,13 +769,6 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh
rs := r.getRoundState()
prs := ps.GetRoundState()
switch logThrottle {
case 1: // first sleep
logThrottle = 2
case 2: // no more sleep
logThrottle = 0
}
// if height matches, then send LastCommit, Prevotes, and Precommits
if rs.Height == prs.Height {
if ok, err := r.gossipVotesForHeight(ctx, rs, prs, ps, voteCh); err != nil {
@@ -813,20 +803,6 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh
}
}
if logThrottle == 0 {
// we sent nothing -- sleep
logThrottle = 1
logger.Debug(
"no votes to send; sleeping",
"rs.Height", rs.Height,
"prs.Height", prs.Height,
"localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes,
"localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits,
)
} else if logThrottle == 2 {
logThrottle = 1
}
timer.Reset(r.state.config.PeerGossipSleepDuration)
select {
case <-ctx.Done():

View File

@@ -310,20 +310,21 @@ func (r *Reactor) OnStop() {
r.dispatcher.Close()
}
func (r *Reactor) PublishStatus(ctx context.Context, event types.EventDataStateSyncStatus) error {
if r.eventBus == nil {
return errors.New("event system is not configured")
}
return r.eventBus.PublishEventStateSyncStatus(ctx, event)
}
// Sync runs a state sync, fetching snapshots and providing chunks to the
// application. At the close of the operation, Sync will bootstrap the state
// store and persist the commit at that height so that either consensus or
// blocksync can commence. It will then proceed to backfill the necessary amount
// of historical blocks before participating in consensus
func (r *Reactor) Sync(ctx context.Context) (sm.State, error) {
if r.eventBus != nil {
if err := r.eventBus.PublishEventStateSyncStatus(ctx, types.EventDataStateSyncStatus{
Complete: false,
Height: r.initialHeight,
}); err != nil {
return sm.State{}, err
}
}
// We need at least two peers (for cross-referencing of light blocks) before we can
// begin state sync
if err := r.waitForEnoughPeers(ctx, 2); err != nil {
@@ -357,21 +358,27 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) {
return sm.State{}, err
}
err = r.stateStore.Bootstrap(state)
if err != nil {
if err := r.stateStore.Bootstrap(state); err != nil {
return sm.State{}, fmt.Errorf("failed to bootstrap node with new state: %w", err)
}
err = r.blockStore.SaveSeenCommit(state.LastBlockHeight, commit)
if err != nil {
if err := r.blockStore.SaveSeenCommit(state.LastBlockHeight, commit); err != nil {
return sm.State{}, fmt.Errorf("failed to store last seen commit: %w", err)
}
err = r.Backfill(ctx, state)
if err != nil {
if err := r.Backfill(ctx, state); err != nil {
r.logger.Error("backfill failed. Proceeding optimistically...", "err", err)
}
if r.eventBus != nil {
if err := r.eventBus.PublishEventStateSyncStatus(ctx, types.EventDataStateSyncStatus{
Complete: true,
Height: state.LastBlockHeight,
}); err != nil {
return sm.State{}, err
}
}
return state, nil
}

View File

@@ -489,23 +489,6 @@ func (n *nodeImpl) OnStart(ctx context.Context) error {
// TODO: We shouldn't run state sync if we already have state that has a
// LastBlockHeight that is not InitialHeight
if n.stateSync {
bcR := n.rpcEnv.BlockSyncReactor
// we need to get the genesis state to get parameters such as
state, err := sm.MakeGenesisState(n.genesisDoc)
if err != nil {
return fmt.Errorf("unable to derive state: %w", err)
}
// TODO: we may want to move these events within the respective
// reactors.
// At the beginning of the statesync start, we use the initialHeight as the event height
// because of the statesync doesn't have the concreate state height before fetched the snapshot.
d := types.EventDataStateSyncStatus{Complete: false, Height: state.InitialHeight}
if err := n.stateSyncReactor.PublishStatus(ctx, d); err != nil {
n.logger.Error("failed to emit the statesync start event", "err", err)
}
// RUN STATE SYNC NOW:
//
// TODO: Eventually this should run as part of some
@@ -521,33 +504,15 @@ func (n *nodeImpl) OnStart(ctx context.Context) error {
n.rpcEnv.ConsensusReactor.SetStateSyncingMetrics(0)
if err := n.stateSyncReactor.PublishStatus(ctx,
types.EventDataStateSyncStatus{
Complete: true,
Height: ssState.LastBlockHeight,
}); err != nil {
n.logger.Error("failed to emit the statesync start event", "err", err)
return err
}
// TODO: Some form of orchestrator is needed here between the state
// advancing reactors to be able to control which one of the three
// is running
// FIXME Very ugly to have these metrics bleed through here.
n.rpcEnv.ConsensusReactor.SetBlockSyncingMetrics(1)
if err := bcR.SwitchToBlockSync(ctx, ssState); err != nil {
if err := n.rpcEnv.BlockSyncReactor.SwitchToBlockSync(ctx, ssState); err != nil {
n.logger.Error("failed to switch to block sync", "err", err)
return err
}
if err := bcR.PublishStatus(ctx,
types.EventDataBlockSyncStatus{
Complete: false,
Height: ssState.LastBlockHeight,
}); err != nil {
n.logger.Error("failed to emit the block sync starting event", "err", err)
return err
}
}
return nil

View File

@@ -68,7 +68,7 @@ type ValidatorSet struct {
// MaxVotesCount - commits by a validator set larger than this will fail
// validation.
func NewValidatorSet(valz []*Validator) *ValidatorSet {
vals := &ValidatorSet{}
vals := &ValidatorSet{Validators: []*Validator{}}
err := vals.updateWithChangeSet(valz, false)
if err != nil {
panic(fmt.Errorf("cannot create validator set: %w", err))
@@ -235,9 +235,6 @@ func (vals *ValidatorSet) shiftByAvgProposerPriority() {
// Makes a copy of the validator list.
func validatorListCopy(valsList []*Validator) []*Validator {
if valsList == nil {
return nil
}
valsCopy := make([]*Validator, len(valsList))
for i, val := range valsList {
valsCopy[i] = val.Copy()