Compare commits

...

1 Commits

Author SHA1 Message Date
William Banfield
df20641317 change lock handling in consensus reactor 2022-02-24 16:26:28 -05:00

View File

@@ -46,6 +46,7 @@ type Reactor struct {
mtx tmsync.RWMutex
waitSync bool
eventBus *types.EventBus
rs *cstypes.RoundState
Metrics *Metrics
}
@@ -58,6 +59,7 @@ func NewReactor(consensusState *State, waitSync bool, options ...ReactorOption)
conR := &Reactor{
conS: consensusState,
waitSync: waitSync,
rs: consensusState.GetRoundState(),
Metrics: NopMetrics(),
}
conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR)
@@ -399,6 +401,7 @@ func (conR *Reactor) subscribeToBroadcastEvents() {
const subscriber = "consensus-reactor"
if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventNewRoundStep,
func(data tmevents.EventData) {
conR.setRoundState(data.(*cstypes.RoundState))
conR.broadcastNewRoundStepMessage(data.(*cstypes.RoundState))
}); err != nil {
conR.Logger.Error("Error adding listener for events", "err", err)
@@ -482,11 +485,23 @@ func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *NewRoundStepMessage)
}
func (conR *Reactor) sendNewRoundStepMessage(peer p2p.Peer) {
rs := conR.conS.GetRoundState()
rs := conR.getRoundState()
nrsMsg := makeRoundStepMessage(rs)
peer.Send(StateChannel, MustEncode(nrsMsg))
}
func (conR *Reactor) setRoundState(rs *cstypes.RoundState) {
conR.mtx.Lock()
defer conR.mtx.Unlock()
conR.rs = rs
}
func (conR *Reactor) getRoundState() *cstypes.RoundState {
conR.mtx.RLock()
defer conR.mtx.RUnlock()
return conR.rs
}
func (conR *Reactor) gossipDataRoutine(peer p2p.Peer, ps *PeerState) {
logger := conR.Logger.With("peer", peer)
@@ -496,7 +511,7 @@ OUTER_LOOP:
if !peer.IsRunning() || !conR.IsRunning() {
return
}
rs := conR.conS.GetRoundState()
rs := conR.getRoundState()
prs := ps.GetRoundState()
// Send proposal Block parts?
@@ -639,7 +654,7 @@ OUTER_LOOP:
if !peer.IsRunning() || !conR.IsRunning() {
return
}
rs := conR.conS.GetRoundState()
rs := conR.getRoundState()
prs := ps.GetRoundState()
switch sleeping {
@@ -771,7 +786,7 @@ OUTER_LOOP:
// Maybe send Height/Round/Prevotes
{
rs := conR.conS.GetRoundState()
rs := conR.getRoundState()
prs := ps.GetRoundState()
if rs.Height == prs.Height {
if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok {
@@ -788,7 +803,7 @@ OUTER_LOOP:
// Maybe send Height/Round/Precommits
{
rs := conR.conS.GetRoundState()
rs := conR.getRoundState()
prs := ps.GetRoundState()
if rs.Height == prs.Height {
if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok {
@@ -805,7 +820,7 @@ OUTER_LOOP:
// Maybe send Height/Round/ProposalPOL
{
rs := conR.conS.GetRoundState()
rs := conR.getRoundState()
prs := ps.GetRoundState()
if rs.Height == prs.Height && prs.ProposalPOLRound >= 0 {
if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok {