# Patch to internal/consensus/reactor.go. Summary of the hunks below:
#
#   * OnStart: the updateRoundStateRoutine goroutine is no longer launched
#     right after subscribeToBroadcastEvents; it is launched after the
#     `if !r.WaitSync()` block that starts r.state, and it now receives ctx.
#   * SwitchToConsensus: the r.state.Start(ctx) call (which panics on error)
#     moves up to directly after r.state.updateToState, i.e. before
#     r.waitSync is cleared and before `r.state.doWALCatchup = false`.
#   * updateRoundStateRoutine: takes a context and returns when ctx.Done()
#     fires, instead of checking r.IsRunning() on every tick. On each tick it
#     still copies r.state.GetRoundState() into r.rs under r.mtx.
#   * queryMaj23Routine: the `rs.Votes == nil` guard (reset timer + continue)
#     is removed.
#
# NOTE(review): Start now runs before doWALCatchup is set to false; Start's
# body is not part of this diff -- confirm it does not read that flag.
# NOTE(review): the removed guard was commented as preventing a panic; the
# code that uses rs.Votes is outside this diff -- confirm the new start-up
# order guarantees it is non-nil there.
diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go
index 3020e6e16..18589c93a 100644
--- a/internal/consensus/reactor.go
+++ b/internal/consensus/reactor.go
@@ -211,7 +211,6 @@ func (r *Reactor) OnStart(ctx context.Context) error {
 	go r.peerStatsRoutine(ctx, peerUpdates)
 
 	r.subscribeToBroadcastEvents(chBundle.state)
-	go r.updateRoundStateRoutine()
 
 	if !r.WaitSync() {
 		if err := r.state.Start(ctx); err != nil {
@@ -219,6 +218,8 @@ func (r *Reactor) OnStart(ctx context.Context) error {
 		}
 	}
 
+	go r.updateRoundStateRoutine(ctx)
+
 	go r.processStateCh(ctx, chBundle)
 	go r.processDataCh(ctx, chBundle)
 	go r.processVoteCh(ctx, chBundle)
@@ -260,6 +261,15 @@ func (r *Reactor) SwitchToConsensus(ctx context.Context, state sm.State, skipWAL
 	// NOTE: The line below causes broadcastNewRoundStepRoutine() to broadcast a
 	// NewRoundStepMessage.
 	r.state.updateToState(ctx, state)
+	if err := r.state.Start(ctx); err != nil {
+		panic(fmt.Sprintf(`failed to start consensus state: %v
+
+conS:
+%+v
+
+conR:
+%+v`, err, r.state, r))
+	}
 
 	r.mtx.Lock()
 	r.waitSync = false
@@ -273,16 +283,6 @@ func (r *Reactor) SwitchToConsensus(ctx context.Context, state sm.State, skipWAL
 		r.state.doWALCatchup = false
 	}
 
-	if err := r.state.Start(ctx); err != nil {
-		panic(fmt.Sprintf(`failed to start consensus state: %v
-
-conS:
-%+v
-
-conR:
-%+v`, err, r.state, r))
-	}
-
 	d := types.EventDataBlockSyncStatus{Complete: true, Height: state.LastBlockHeight}
 	if err := r.eventBus.PublishEventBlockSyncStatus(ctx, d); err != nil {
 		r.logger.Error("failed to emit the blocksync complete event", "err", err)
@@ -408,17 +408,20 @@ func (r *Reactor) sendNewRoundStepMessage(ctx context.Context, peerID types.Node
 	})
 }
 
-func (r *Reactor) updateRoundStateRoutine() {
+func (r *Reactor) updateRoundStateRoutine(ctx context.Context) {
 	t := time.NewTicker(100 * time.Microsecond)
 	defer t.Stop()
-	for range t.C {
-		if !r.IsRunning() {
+
+	for {
+		select {
+		case <-ctx.Done():
 			return
+		case <-t.C:
+			rs := r.state.GetRoundState()
+			r.mtx.Lock()
+			r.rs = rs
+			r.mtx.Unlock()
 		}
-		rs := r.state.GetRoundState()
-		r.mtx.Lock()
-		r.rs = rs
-		r.mtx.Unlock()
 	}
 }
 
@@ -839,16 +842,6 @@ func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState, stateCh
 		// TODO create more reliable copies of these
 		// structures so the following go routines don't race
 		rs := r.getRoundState()
-		if rs.Votes == nil {
-			// if we have gotten here, we've connected to
-			// a peer before the state of the reactor has
-			// updated to the current round, so we should
-			// sleep for a while before we attempt to
-			// start gossiping the data that doesn't exist
-			// yet. This prevents a panic.
-			timer.Reset(r.state.config.PeerQueryMaj23SleepDuration)
-			continue
-		}
 		prs := ps.GetRoundState()
 
 		wg := &sync.WaitGroup{}