consensus: refactor operations in consensus queryMaj23Routine (backport #7791) (#7793)

This commit is contained in:
mergify[bot]
2022-02-10 15:20:51 -05:00
committed by GitHub
parent caddddcb3a
commit 3f3b9e93b1

View File

@@ -787,7 +787,7 @@ OUTER_LOOP:
if prs.Height != 0 && rs.Height == prs.Height+1 {
if r.pickSendVote(ps, rs.LastCommit) {
logger.Debug("picked rs.LastCommit to send", "height", prs.Height)
continue OUTER_LOOP
continue
}
}
@@ -799,7 +799,7 @@ OUTER_LOOP:
if commit := r.state.blockStore.LoadBlockCommit(prs.Height); commit != nil {
if r.pickSendVote(ps, commit) {
logger.Debug("picked Catchup commit to send", "height", prs.Height)
continue OUTER_LOOP
continue
}
}
}
@@ -826,9 +826,11 @@ OUTER_LOOP:
// NOTE: `queryMaj23Routine` has a simple crude design since it only comes
// into play for liveness when there's a signature DDoS attack happening.
func (r *Reactor) queryMaj23Routine(ps *PeerState) {
OUTER_LOOP:
timer := time.NewTimer(0)
defer timer.Stop()
for {
if !r.IsRunning() {
if !ps.IsRunning() {
return
}
@@ -839,16 +841,26 @@ OUTER_LOOP:
// The peer is marked for removal via a PeerUpdate as the doneCh was
// explicitly closed to signal we should exit.
return
default:
case <-timer.C:
}
// maybe send Height/Round/Prevotes
{
rs := r.state.GetRoundState()
prs := ps.GetRoundState()
if !ps.IsRunning() {
return
}
if rs.Height == prs.Height {
rs := r.state.GetRoundState()
prs := ps.GetRoundState()
// TODO create more reliable coppies of these
// structures so the following go routines don't race
wg := &sync.WaitGroup{}
if rs.Height == prs.Height {
wg.Add(1)
go func(rs *cstypes.RoundState, prs *cstypes.PeerRoundState) {
defer wg.Done()
// maybe send Height/Round/Prevotes
if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok {
select {
case <-ps.closer.Done():
@@ -864,102 +876,76 @@ OUTER_LOOP:
BlockID: maj23.ToProto(),
},
}:
}
time.Sleep(r.state.config.PeerQueryMaj23SleepDuration)
}
}(rs, prs)
if prs.ProposalPOLRound >= 0 {
wg.Add(1)
go func(rs *cstypes.RoundState, prs *cstypes.PeerRoundState) {
defer wg.Done()
// maybe send Height/Round/ProposalPOL
if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok {
select {
case <-ps.closer.Done():
return
case <-r.closeCh:
return
case r.stateCh.Out <- p2p.Envelope{
To: ps.peerID,
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
Round: prs.ProposalPOLRound,
Type: tmproto.PrevoteType,
BlockID: maj23.ToProto(),
},
}:
}
}
}(rs, prs)
wg.Add(1)
go func(rs *cstypes.RoundState, prs *cstypes.PeerRoundState) {
defer wg.Done()
// maybe send Height/Round/Precommits
if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok {
select {
case <-ps.closer.Done():
return
case <-r.closeCh:
return
case r.stateCh.Out <- p2p.Envelope{
To: ps.peerID,
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
Round: prs.Round,
Type: tmproto.PrecommitType,
BlockID: maj23.ToProto(),
},
}:
}
time.Sleep(r.state.config.PeerQueryMaj23SleepDuration)
}
}(rs, prs)
}
waitSignal := make(chan struct{})
go func() { defer close(waitSignal); wg.Wait() }()
select {
case <-r.closeCh:
return
case <-ps.closer.Done():
// The peer is marked for removal via a PeerUpdate as the doneCh was
// explicitly closed to signal we should exit.
return
case <-waitSignal:
timer.Reset(r.state.config.PeerQueryMaj23SleepDuration)
}
}
// maybe send Height/Round/Precommits
{
rs := r.state.GetRoundState()
prs := ps.GetRoundState()
if rs.Height == prs.Height {
if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok {
select {
case <-ps.closer.Done():
return
case <-r.closeCh:
return
case r.stateCh.Out <- p2p.Envelope{
To: ps.peerID,
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
Round: prs.Round,
Type: tmproto.PrecommitType,
BlockID: maj23.ToProto(),
},
}:
}
time.Sleep(r.state.config.PeerQueryMaj23SleepDuration)
}
}
}
// maybe send Height/Round/ProposalPOL
{
rs := r.state.GetRoundState()
prs := ps.GetRoundState()
if rs.Height == prs.Height && prs.ProposalPOLRound >= 0 {
if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok {
select {
case <-ps.closer.Done():
return
case <-r.closeCh:
return
case r.stateCh.Out <- p2p.Envelope{
To: ps.peerID,
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
Round: prs.ProposalPOLRound,
Type: tmproto.PrevoteType,
BlockID: maj23.ToProto(),
},
}:
}
time.Sleep(r.state.config.PeerQueryMaj23SleepDuration)
}
}
}
// Little point sending LastCommitRound/LastCommit, these are fleeting and
// non-blocking.
// maybe send Height/CatchupCommitRound/CatchupCommit
{
prs := ps.GetRoundState()
if prs.CatchupCommitRound != -1 && prs.Height > 0 && prs.Height <= r.state.blockStore.Height() &&
prs.Height >= r.state.blockStore.Base() {
if commit := r.state.LoadCommit(prs.Height); commit != nil {
select {
case <-ps.closer.Done():
return
case <-r.closeCh:
return
case r.stateCh.Out <- p2p.Envelope{
To: ps.peerID,
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
Round: commit.Round,
Type: tmproto.PrecommitType,
BlockID: commit.BlockID.ToProto(),
},
}:
}
time.Sleep(r.state.config.PeerQueryMaj23SleepDuration)
}
}
}
time.Sleep(r.state.config.PeerQueryMaj23SleepDuration)
continue OUTER_LOOP
}
}