mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-04 04:04:00 +00:00
switch to consensus change startup ordering (#8290)
This commit is contained in:
@@ -211,7 +211,6 @@ func (r *Reactor) OnStart(ctx context.Context) error {
|
||||
go r.peerStatsRoutine(ctx, peerUpdates)
|
||||
|
||||
r.subscribeToBroadcastEvents(chBundle.state)
|
||||
go r.updateRoundStateRoutine()
|
||||
|
||||
if !r.WaitSync() {
|
||||
if err := r.state.Start(ctx); err != nil {
|
||||
@@ -219,6 +218,8 @@ func (r *Reactor) OnStart(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
go r.updateRoundStateRoutine(ctx)
|
||||
|
||||
go r.processStateCh(ctx, chBundle)
|
||||
go r.processDataCh(ctx, chBundle)
|
||||
go r.processVoteCh(ctx, chBundle)
|
||||
@@ -260,6 +261,15 @@ func (r *Reactor) SwitchToConsensus(ctx context.Context, state sm.State, skipWAL
|
||||
// NOTE: The line below causes broadcastNewRoundStepRoutine() to broadcast a
|
||||
// NewRoundStepMessage.
|
||||
r.state.updateToState(ctx, state)
|
||||
if err := r.state.Start(ctx); err != nil {
|
||||
panic(fmt.Sprintf(`failed to start consensus state: %v
|
||||
|
||||
conS:
|
||||
%+v
|
||||
|
||||
conR:
|
||||
%+v`, err, r.state, r))
|
||||
}
|
||||
|
||||
r.mtx.Lock()
|
||||
r.waitSync = false
|
||||
@@ -273,16 +283,6 @@ func (r *Reactor) SwitchToConsensus(ctx context.Context, state sm.State, skipWAL
|
||||
r.state.doWALCatchup = false
|
||||
}
|
||||
|
||||
if err := r.state.Start(ctx); err != nil {
|
||||
panic(fmt.Sprintf(`failed to start consensus state: %v
|
||||
|
||||
conS:
|
||||
%+v
|
||||
|
||||
conR:
|
||||
%+v`, err, r.state, r))
|
||||
}
|
||||
|
||||
d := types.EventDataBlockSyncStatus{Complete: true, Height: state.LastBlockHeight}
|
||||
if err := r.eventBus.PublishEventBlockSyncStatus(ctx, d); err != nil {
|
||||
r.logger.Error("failed to emit the blocksync complete event", "err", err)
|
||||
@@ -408,17 +408,20 @@ func (r *Reactor) sendNewRoundStepMessage(ctx context.Context, peerID types.Node
|
||||
})
|
||||
}
|
||||
|
||||
func (r *Reactor) updateRoundStateRoutine() {
|
||||
func (r *Reactor) updateRoundStateRoutine(ctx context.Context) {
|
||||
t := time.NewTicker(100 * time.Microsecond)
|
||||
defer t.Stop()
|
||||
for range t.C {
|
||||
if !r.IsRunning() {
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
rs := r.state.GetRoundState()
|
||||
r.mtx.Lock()
|
||||
r.rs = rs
|
||||
r.mtx.Unlock()
|
||||
}
|
||||
rs := r.state.GetRoundState()
|
||||
r.mtx.Lock()
|
||||
r.rs = rs
|
||||
r.mtx.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -839,16 +842,6 @@ func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState, stateCh
|
||||
// TODO create more reliable copies of these
|
||||
// structures so the following go routines don't race
|
||||
rs := r.getRoundState()
|
||||
if rs.Votes == nil {
|
||||
// if we have gotten here, we've connected to
|
||||
// a peer before the state of the reactor has
|
||||
// updated to the current round, so we should
|
||||
// sleep for a while before we attempt to
|
||||
// start gossiping the data that doesn't exist
|
||||
// yet. This prevents a panic.
|
||||
timer.Reset(r.state.config.PeerQueryMaj23SleepDuration)
|
||||
continue
|
||||
}
|
||||
prs := ps.GetRoundState()
|
||||
|
||||
wg := &sync.WaitGroup{}
|
||||
|
||||
Reference in New Issue
Block a user