From fd3bfb38e7802f8916c4f3aa9d35b4b7a7e0249f Mon Sep 17 00:00:00 2001 From: William Banfield <4561443+williambanfield@users.noreply.github.com> Date: Wed, 9 Mar 2022 12:57:55 -0500 Subject: [PATCH] consensus: change lock handling in 'handleMsg' method (#7992) * change lock handling in consensus state file * add comment explaining the unlock * comment fix * Update consensus/state.go Co-authored-by: M. J. Fromberger * spelling fix Co-authored-by: M. J. Fromberger --- consensus/state.go | 92 ++++++++++++++++++++++++++++------------------ 1 file changed, 56 insertions(+), 36 deletions(-) diff --git a/consensus/state.go b/consensus/state.go index 7289a8375..26b19cf76 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -800,9 +800,6 @@ func (cs *State) receiveRoutine(maxSteps int) { // state transitions on complete-proposal, 2/3-any, 2/3-one func (cs *State) handleMsg(mi msgInfo) { - cs.mtx.Lock() - defer cs.mtx.Unlock() - var ( added bool err error @@ -814,11 +811,32 @@ func (cs *State) handleMsg(mi msgInfo) { case *ProposalMessage: // will not cause transition. // once proposal is set, we can receive block parts + cs.mtx.Lock() err = cs.setProposal(msg.Proposal) + cs.mtx.Unlock() case *BlockPartMessage: // if the proposal is complete, we'll enterPrevote or tryFinalizeCommit + cs.mtx.Lock() added, err = cs.addProposalBlockPart(msg, peerID) + + // We unlock here to yield to any routines that need to read the the RoundState. + // Previously, this code held the lock from the point at which the final block + // part was recieved until the block executed against the application. + // This prevented the reactor from being able to retrieve the most updated + // version of the RoundState. The reactor needs the updated RoundState to + // gossip the now completed block. + // + // This code can be further improved by either always operating on a copy + // of RoundState and only locking when switching out State's copy of + // RoundState with the updated copy or by emitting RoundState events in + // more places for routines depending on it to listen for. + cs.mtx.Unlock() + + cs.mtx.Lock() + if added && cs.ProposalBlockParts.IsComplete() { + cs.handleCompleteProposal(msg.Height) + } if added { cs.statsMsgQueue <- mi } @@ -832,14 +850,17 @@ func (cs *State) handleMsg(mi msgInfo) { ) err = nil } + cs.mtx.Unlock() case *VoteMessage: // attempt to add the vote and dupeout the validator if its a duplicate signature // if the vote gives us a 2/3-any or 2/3-one, we transition + cs.mtx.Lock() added, err = cs.tryAddVote(msg.Vote, peerID) if added { cs.statsMsgQueue <- mi } + cs.mtx.Unlock() // if err == ErrAddingVote { // TODO: punish peer @@ -1888,44 +1909,43 @@ func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID p2p.ID) (add if err := cs.eventBus.PublishEventCompleteProposal(cs.CompleteProposalEvent()); err != nil { cs.Logger.Error("failed publishing event complete proposal", "err", err) } + } + return added, nil +} - // Update Valid* if we can. - prevotes := cs.Votes.Prevotes(cs.Round) - blockID, hasTwoThirds := prevotes.TwoThirdsMajority() - if hasTwoThirds && !blockID.IsZero() && (cs.ValidRound < cs.Round) { - if cs.ProposalBlock.HashesTo(blockID.Hash) { - cs.Logger.Debug( - "updating valid block to new proposal block", - "valid_round", cs.Round, - "valid_block_hash", cs.ProposalBlock.Hash(), - ) +func (cs *State) handleCompleteProposal(blockHeight int64) { + // Update Valid* if we can. + prevotes := cs.Votes.Prevotes(cs.Round) + blockID, hasTwoThirds := prevotes.TwoThirdsMajority() + if hasTwoThirds && !blockID.IsZero() && (cs.ValidRound < cs.Round) { + if cs.ProposalBlock.HashesTo(blockID.Hash) { + cs.Logger.Debug( + "updating valid block to new proposal block", + "valid_round", cs.Round, + "valid_block_hash", cs.ProposalBlock.Hash(), + ) - cs.ValidRound = cs.Round - cs.ValidBlock = cs.ProposalBlock - cs.ValidBlockParts = cs.ProposalBlockParts - } - // TODO: In case there is +2/3 majority in Prevotes set for some - // block and cs.ProposalBlock contains different block, either - // proposer is faulty or voting power of faulty processes is more - // than 1/3. We should trigger in the future accountability - // procedure at this point. + cs.ValidRound = cs.Round + cs.ValidBlock = cs.ProposalBlock + cs.ValidBlockParts = cs.ProposalBlockParts } - - if cs.Step <= cstypes.RoundStepPropose && cs.isProposalComplete() { - // Move onto the next step - cs.enterPrevote(height, cs.Round) - if hasTwoThirds { // this is optimisation as this will be triggered when prevote is added - cs.enterPrecommit(height, cs.Round) - } - } else if cs.Step == cstypes.RoundStepCommit { - // If we're waiting on the proposal block... - cs.tryFinalizeCommit(height) - } - - return added, nil + // TODO: In case there is +2/3 majority in Prevotes set for some + // block and cs.ProposalBlock contains different block, either + // proposer is faulty or voting power of faulty processes is more + // than 1/3. We should trigger in the future accountability + // procedure at this point. } - return added, nil + if cs.Step <= cstypes.RoundStepPropose && cs.isProposalComplete() { + // Move onto the next step + cs.enterPrevote(blockHeight, cs.Round) + if hasTwoThirds { // this is optimisation as this will be triggered when prevote is added + cs.enterPrecommit(blockHeight, cs.Round) + } + } else if cs.Step == cstypes.RoundStepCommit { + // If we're waiting on the proposal block... + cs.tryFinalizeCommit(blockHeight) + } } // Attempt to add the vote. if its a duplicate signature, dupeout the validator