consensus: change lock handling in 'handleMsg' method (#7992)

* change lock handling in consensus state file

* add comment explaining the unlock

* comment fix

* Update consensus/state.go

Co-authored-by: M. J. Fromberger <fromberger@interchain.io>

* spelling fix

Co-authored-by: M. J. Fromberger <fromberger@interchain.io>
This commit is contained in:
William Banfield
2022-03-09 12:57:55 -05:00
committed by GitHub
parent 186e0e4df2
commit fd3bfb38e7

View File

@@ -800,9 +800,6 @@ func (cs *State) receiveRoutine(maxSteps int) {
// state transitions on complete-proposal, 2/3-any, 2/3-one // state transitions on complete-proposal, 2/3-any, 2/3-one
func (cs *State) handleMsg(mi msgInfo) { func (cs *State) handleMsg(mi msgInfo) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
var ( var (
added bool added bool
err error err error
@@ -814,11 +811,32 @@ func (cs *State) handleMsg(mi msgInfo) {
case *ProposalMessage: case *ProposalMessage:
// will not cause transition. // will not cause transition.
// once proposal is set, we can receive block parts // once proposal is set, we can receive block parts
cs.mtx.Lock()
err = cs.setProposal(msg.Proposal) err = cs.setProposal(msg.Proposal)
cs.mtx.Unlock()
case *BlockPartMessage: case *BlockPartMessage:
// if the proposal is complete, we'll enterPrevote or tryFinalizeCommit // if the proposal is complete, we'll enterPrevote or tryFinalizeCommit
cs.mtx.Lock()
added, err = cs.addProposalBlockPart(msg, peerID) added, err = cs.addProposalBlockPart(msg, peerID)
// We unlock here to yield to any routines that need to read the the RoundState.
// Previously, this code held the lock from the point at which the final block
// part was recieved until the block executed against the application.
// This prevented the reactor from being able to retrieve the most updated
// version of the RoundState. The reactor needs the updated RoundState to
// gossip the now completed block.
//
// This code can be further improved by either always operating on a copy
// of RoundState and only locking when switching out State's copy of
// RoundState with the updated copy or by emitting RoundState events in
// more places for routines depending on it to listen for.
cs.mtx.Unlock()
cs.mtx.Lock()
if added && cs.ProposalBlockParts.IsComplete() {
cs.handleCompleteProposal(msg.Height)
}
if added { if added {
cs.statsMsgQueue <- mi cs.statsMsgQueue <- mi
} }
@@ -832,14 +850,17 @@ func (cs *State) handleMsg(mi msgInfo) {
) )
err = nil err = nil
} }
cs.mtx.Unlock()
case *VoteMessage: case *VoteMessage:
// attempt to add the vote and dupeout the validator if its a duplicate signature // attempt to add the vote and dupeout the validator if its a duplicate signature
// if the vote gives us a 2/3-any or 2/3-one, we transition // if the vote gives us a 2/3-any or 2/3-one, we transition
cs.mtx.Lock()
added, err = cs.tryAddVote(msg.Vote, peerID) added, err = cs.tryAddVote(msg.Vote, peerID)
if added { if added {
cs.statsMsgQueue <- mi cs.statsMsgQueue <- mi
} }
cs.mtx.Unlock()
// if err == ErrAddingVote { // if err == ErrAddingVote {
// TODO: punish peer // TODO: punish peer
@@ -1888,44 +1909,43 @@ func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID p2p.ID) (add
if err := cs.eventBus.PublishEventCompleteProposal(cs.CompleteProposalEvent()); err != nil { if err := cs.eventBus.PublishEventCompleteProposal(cs.CompleteProposalEvent()); err != nil {
cs.Logger.Error("failed publishing event complete proposal", "err", err) cs.Logger.Error("failed publishing event complete proposal", "err", err)
} }
}
return added, nil
}
// Update Valid* if we can. func (cs *State) handleCompleteProposal(blockHeight int64) {
prevotes := cs.Votes.Prevotes(cs.Round) // Update Valid* if we can.
blockID, hasTwoThirds := prevotes.TwoThirdsMajority() prevotes := cs.Votes.Prevotes(cs.Round)
if hasTwoThirds && !blockID.IsZero() && (cs.ValidRound < cs.Round) { blockID, hasTwoThirds := prevotes.TwoThirdsMajority()
if cs.ProposalBlock.HashesTo(blockID.Hash) { if hasTwoThirds && !blockID.IsZero() && (cs.ValidRound < cs.Round) {
cs.Logger.Debug( if cs.ProposalBlock.HashesTo(blockID.Hash) {
"updating valid block to new proposal block", cs.Logger.Debug(
"valid_round", cs.Round, "updating valid block to new proposal block",
"valid_block_hash", cs.ProposalBlock.Hash(), "valid_round", cs.Round,
) "valid_block_hash", cs.ProposalBlock.Hash(),
)
cs.ValidRound = cs.Round cs.ValidRound = cs.Round
cs.ValidBlock = cs.ProposalBlock cs.ValidBlock = cs.ProposalBlock
cs.ValidBlockParts = cs.ProposalBlockParts cs.ValidBlockParts = cs.ProposalBlockParts
}
// TODO: In case there is +2/3 majority in Prevotes set for some
// block and cs.ProposalBlock contains different block, either
// proposer is faulty or voting power of faulty processes is more
// than 1/3. We should trigger in the future accountability
// procedure at this point.
} }
// TODO: In case there is +2/3 majority in Prevotes set for some
if cs.Step <= cstypes.RoundStepPropose && cs.isProposalComplete() { // block and cs.ProposalBlock contains different block, either
// Move onto the next step // proposer is faulty or voting power of faulty processes is more
cs.enterPrevote(height, cs.Round) // than 1/3. We should trigger in the future accountability
if hasTwoThirds { // this is optimisation as this will be triggered when prevote is added // procedure at this point.
cs.enterPrecommit(height, cs.Round)
}
} else if cs.Step == cstypes.RoundStepCommit {
// If we're waiting on the proposal block...
cs.tryFinalizeCommit(height)
}
return added, nil
} }
return added, nil if cs.Step <= cstypes.RoundStepPropose && cs.isProposalComplete() {
// Move onto the next step
cs.enterPrevote(blockHeight, cs.Round)
if hasTwoThirds { // this is optimisation as this will be triggered when prevote is added
cs.enterPrecommit(blockHeight, cs.Round)
}
} else if cs.Step == cstypes.RoundStepCommit {
// If we're waiting on the proposal block...
cs.tryFinalizeCommit(blockHeight)
}
} }
// Attempt to add the vote. if its a duplicate signature, dupeout the validator // Attempt to add the vote. if its a duplicate signature, dupeout the validator