diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 214b41430..fe4488b38 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -787,7 +787,7 @@ OUTER_LOOP: if prs.Height != 0 && rs.Height == prs.Height+1 { if r.pickSendVote(ps, rs.LastCommit) { logger.Debug("picked rs.LastCommit to send", "height", prs.Height) - continue OUTER_LOOP + continue } } @@ -799,7 +799,7 @@ OUTER_LOOP: if commit := r.state.blockStore.LoadBlockCommit(prs.Height); commit != nil { if r.pickSendVote(ps, commit) { logger.Debug("picked Catchup commit to send", "height", prs.Height) - continue OUTER_LOOP + continue } } } @@ -826,9 +826,11 @@ OUTER_LOOP: // NOTE: `queryMaj23Routine` has a simple crude design since it only comes // into play for liveness when there's a signature DDoS attack happening. func (r *Reactor) queryMaj23Routine(ps *PeerState) { -OUTER_LOOP: + timer := time.NewTimer(0) + defer timer.Stop() + for { - if !r.IsRunning() { + if !ps.IsRunning() { return } @@ -839,16 +841,26 @@ OUTER_LOOP: // The peer is marked for removal via a PeerUpdate as the doneCh was // explicitly closed to signal we should exit. return - - default: + case <-timer.C: } - // maybe send Height/Round/Prevotes - { - rs := r.state.GetRoundState() - prs := ps.GetRoundState() + if !ps.IsRunning() { + return + } - if rs.Height == prs.Height { + rs := r.state.GetRoundState() + prs := ps.GetRoundState() + // TODO create more reliable coppies of these + // structures so the following go routines don't race + + wg := &sync.WaitGroup{} + + if rs.Height == prs.Height { + wg.Add(1) + go func(rs *cstypes.RoundState, prs *cstypes.PeerRoundState) { + defer wg.Done() + + // maybe send Height/Round/Prevotes if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok { select { case <-ps.closer.Done(): @@ -864,102 +876,76 @@ OUTER_LOOP: BlockID: maj23.ToProto(), }, }: - } - - time.Sleep(r.state.config.PeerQueryMaj23SleepDuration) } + }(rs, prs) + + if prs.ProposalPOLRound >= 0 { + wg.Add(1) + go func(rs *cstypes.RoundState, prs *cstypes.PeerRoundState) { + defer wg.Done() + + // maybe send Height/Round/ProposalPOL + if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok { + select { + case <-ps.closer.Done(): + return + case <-r.closeCh: + return + case r.stateCh.Out <- p2p.Envelope{ + To: ps.peerID, + Message: &tmcons.VoteSetMaj23{ + Height: prs.Height, + Round: prs.ProposalPOLRound, + Type: tmproto.PrevoteType, + BlockID: maj23.ToProto(), + }, + }: + } + } + }(rs, prs) + + wg.Add(1) + go func(rs *cstypes.RoundState, prs *cstypes.PeerRoundState) { + defer wg.Done() + + // maybe send Height/Round/Precommits + if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok { + select { + case <-ps.closer.Done(): + return + case <-r.closeCh: + return + case r.stateCh.Out <- p2p.Envelope{ + To: ps.peerID, + Message: &tmcons.VoteSetMaj23{ + Height: prs.Height, + Round: prs.Round, + Type: tmproto.PrecommitType, + BlockID: maj23.ToProto(), + }, + }: + } + + time.Sleep(r.state.config.PeerQueryMaj23SleepDuration) + } + }(rs, prs) + } + + waitSignal := make(chan struct{}) + go func() { defer close(waitSignal); wg.Wait() }() + + select { + case <-r.closeCh: + return + case <-ps.closer.Done(): + // The peer is marked for removal via a PeerUpdate as the doneCh was + // explicitly closed to signal we should exit. + return + case <-waitSignal: + timer.Reset(r.state.config.PeerQueryMaj23SleepDuration) } } - - // maybe send Height/Round/Precommits - { - rs := r.state.GetRoundState() - prs := ps.GetRoundState() - - if rs.Height == prs.Height { - if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok { - select { - case <-ps.closer.Done(): - return - case <-r.closeCh: - return - case r.stateCh.Out <- p2p.Envelope{ - To: ps.peerID, - Message: &tmcons.VoteSetMaj23{ - Height: prs.Height, - Round: prs.Round, - Type: tmproto.PrecommitType, - BlockID: maj23.ToProto(), - }, - }: - } - - time.Sleep(r.state.config.PeerQueryMaj23SleepDuration) - } - } - } - - // maybe send Height/Round/ProposalPOL - { - rs := r.state.GetRoundState() - prs := ps.GetRoundState() - - if rs.Height == prs.Height && prs.ProposalPOLRound >= 0 { - if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok { - select { - case <-ps.closer.Done(): - return - case <-r.closeCh: - return - case r.stateCh.Out <- p2p.Envelope{ - To: ps.peerID, - Message: &tmcons.VoteSetMaj23{ - Height: prs.Height, - Round: prs.ProposalPOLRound, - Type: tmproto.PrevoteType, - BlockID: maj23.ToProto(), - }, - }: - } - time.Sleep(r.state.config.PeerQueryMaj23SleepDuration) - } - } - } - - // Little point sending LastCommitRound/LastCommit, these are fleeting and - // non-blocking. - - // maybe send Height/CatchupCommitRound/CatchupCommit - { - prs := ps.GetRoundState() - - if prs.CatchupCommitRound != -1 && prs.Height > 0 && prs.Height <= r.state.blockStore.Height() && - prs.Height >= r.state.blockStore.Base() { - if commit := r.state.LoadCommit(prs.Height); commit != nil { - select { - case <-ps.closer.Done(): - return - case <-r.closeCh: - return - case r.stateCh.Out <- p2p.Envelope{ - To: ps.peerID, - Message: &tmcons.VoteSetMaj23{ - Height: prs.Height, - Round: commit.Round, - Type: tmproto.PrecommitType, - BlockID: commit.BlockID.ToProto(), - }, - }: - } - - time.Sleep(r.state.config.PeerQueryMaj23SleepDuration) - } - } - } - - time.Sleep(r.state.config.PeerQueryMaj23SleepDuration) - continue OUTER_LOOP } }