diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go index 6533fa046..6f3743e62 100644 --- a/internal/blocksync/reactor.go +++ b/internal/blocksync/reactor.go @@ -359,6 +359,13 @@ func (r *Reactor) SwitchToBlockSync(ctx context.Context, state sm.State) error { go r.requestRoutine(ctx, bsCh) go r.poolRoutine(ctx, true, bsCh) + if err := r.PublishStatus(ctx, types.EventDataBlockSyncStatus{ + Complete: false, + Height: state.LastBlockHeight, + }); err != nil { + return err + } + return nil } diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 1c6c99cb9..c73f998d8 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -752,9 +752,6 @@ func (r *Reactor) gossipVotesForHeight( func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh *p2p.Channel) { logger := r.logger.With("peer", ps.peerID) - // XXX: simple hack to throttle logs upon sleep - logThrottle := 0 - timer := time.NewTimer(0) defer timer.Stop() @@ -772,13 +769,6 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh rs := r.getRoundState() prs := ps.GetRoundState() - switch logThrottle { - case 1: // first sleep - logThrottle = 2 - case 2: // no more sleep - logThrottle = 0 - } - // if height matches, then send LastCommit, Prevotes, and Precommits if rs.Height == prs.Height { if ok, err := r.gossipVotesForHeight(ctx, rs, prs, ps, voteCh); err != nil { @@ -813,20 +803,6 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh } } - if logThrottle == 0 { - // we sent nothing -- sleep - logThrottle = 1 - logger.Debug( - "no votes to send; sleeping", - "rs.Height", rs.Height, - "prs.Height", prs.Height, - "localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes, - "localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits, - ) - } else if logThrottle == 2 { - logThrottle = 1 - } - timer.Reset(r.state.config.PeerGossipSleepDuration) select { case <-ctx.Done(): diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index 15abf3ef0..5ca1d0798 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -310,20 +310,21 @@ func (r *Reactor) OnStop() { r.dispatcher.Close() } -func (r *Reactor) PublishStatus(ctx context.Context, event types.EventDataStateSyncStatus) error { - if r.eventBus == nil { - return errors.New("event system is not configured") - } - - return r.eventBus.PublishEventStateSyncStatus(ctx, event) -} - // Sync runs a state sync, fetching snapshots and providing chunks to the // application. At the close of the operation, Sync will bootstrap the state // store and persist the commit at that height so that either consensus or // blocksync can commence. It will then proceed to backfill the necessary amount // of historical blocks before participating in consensus func (r *Reactor) Sync(ctx context.Context) (sm.State, error) { + if r.eventBus != nil { + if err := r.eventBus.PublishEventStateSyncStatus(ctx, types.EventDataStateSyncStatus{ + Complete: false, + Height: r.initialHeight, + }); err != nil { + return sm.State{}, err + } + } + // We need at least two peers (for cross-referencing of light blocks) before we can // begin state sync if err := r.waitForEnoughPeers(ctx, 2); err != nil { @@ -357,21 +358,27 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) { return sm.State{}, err } - err = r.stateStore.Bootstrap(state) - if err != nil { + if err := r.stateStore.Bootstrap(state); err != nil { return sm.State{}, fmt.Errorf("failed to bootstrap node with new state: %w", err) } - err = r.blockStore.SaveSeenCommit(state.LastBlockHeight, commit) - if err != nil { + if err := r.blockStore.SaveSeenCommit(state.LastBlockHeight, commit); err != nil { return sm.State{}, fmt.Errorf("failed to store last seen commit: %w", err) } - err = r.Backfill(ctx, state) - if err != nil { + if err := r.Backfill(ctx, state); err != nil { r.logger.Error("backfill failed. Proceeding optimistically...", "err", err) } + if r.eventBus != nil { + if err := r.eventBus.PublishEventStateSyncStatus(ctx, types.EventDataStateSyncStatus{ + Complete: true, + Height: state.LastBlockHeight, + }); err != nil { + return sm.State{}, err + } + } + return state, nil } diff --git a/node/node.go b/node/node.go index 3ee75cfcf..37b66c697 100644 --- a/node/node.go +++ b/node/node.go @@ -489,23 +489,6 @@ func (n *nodeImpl) OnStart(ctx context.Context) error { // TODO: We shouldn't run state sync if we already have state that has a // LastBlockHeight that is not InitialHeight if n.stateSync { - bcR := n.rpcEnv.BlockSyncReactor - - // we need to get the genesis state to get parameters such as - state, err := sm.MakeGenesisState(n.genesisDoc) - if err != nil { - return fmt.Errorf("unable to derive state: %w", err) - } - - // TODO: we may want to move these events within the respective - // reactors. - // At the beginning of the statesync start, we use the initialHeight as the event height - // because of the statesync doesn't have the concreate state height before fetched the snapshot. - d := types.EventDataStateSyncStatus{Complete: false, Height: state.InitialHeight} - if err := n.stateSyncReactor.PublishStatus(ctx, d); err != nil { - n.logger.Error("failed to emit the statesync start event", "err", err) - } - // RUN STATE SYNC NOW: // // TODO: Eventually this should run as part of some @@ -521,33 +504,15 @@ func (n *nodeImpl) OnStart(ctx context.Context) error { n.rpcEnv.ConsensusReactor.SetStateSyncingMetrics(0) - if err := n.stateSyncReactor.PublishStatus(ctx, - types.EventDataStateSyncStatus{ - Complete: true, - Height: ssState.LastBlockHeight, - }); err != nil { - n.logger.Error("failed to emit the statesync start event", "err", err) - return err - } - // TODO: Some form of orchestrator is needed here between the state // advancing reactors to be able to control which one of the three // is running // FIXME Very ugly to have these metrics bleed through here. n.rpcEnv.ConsensusReactor.SetBlockSyncingMetrics(1) - if err := bcR.SwitchToBlockSync(ctx, ssState); err != nil { + if err := n.rpcEnv.BlockSyncReactor.SwitchToBlockSync(ctx, ssState); err != nil { n.logger.Error("failed to switch to block sync", "err", err) return err } - - if err := bcR.PublishStatus(ctx, - types.EventDataBlockSyncStatus{ - Complete: false, - Height: ssState.LastBlockHeight, - }); err != nil { - n.logger.Error("failed to emit the block sync starting event", "err", err) - return err - } } return nil diff --git a/types/validator_set.go b/types/validator_set.go index 1f773d1b5..d825ff590 100644 --- a/types/validator_set.go +++ b/types/validator_set.go @@ -68,7 +68,7 @@ type ValidatorSet struct { // MaxVotesCount - commits by a validator set larger than this will fail // validation. func NewValidatorSet(valz []*Validator) *ValidatorSet { - vals := &ValidatorSet{} + vals := &ValidatorSet{Validators: []*Validator{}} err := vals.updateWithChangeSet(valz, false) if err != nil { panic(fmt.Errorf("cannot create validator set: %w", err)) @@ -235,9 +235,6 @@ func (vals *ValidatorSet) shiftByAvgProposerPriority() { // Makes a copy of the validator list. func validatorListCopy(valsList []*Validator) []*Validator { - if valsList == nil { - return nil - } valsCopy := make([]*Validator, len(valsList)) for i, val := range valsList { valsCopy[i] = val.Copy()