Compare commits

...

11 Commits

Author SHA1 Message Date
William Banfield
66f32ba9fc Merge branch 'master' into wb/epoch-fixes-forward-port-master 2022-03-18 12:27:36 -04:00
William Banfield
352fa60814 Merge branch 'master' into wb/epoch-fixes-forward-port-master 2022-03-18 12:19:13 -04:00
William Banfield
bb943f00f5 Merge branch 'master' into wb/epoch-fixes-forward-port-master 2022-03-17 16:53:47 -04:00
William Banfield
52ba4dfb36 handle panics 2022-03-17 16:51:47 -04:00
William Banfield
9bfc890e19 Merge branch 'master' into wb/epoch-fixes-forward-port-master 2022-03-17 16:02:52 -04:00
William Banfield
555c4b3247 update GetRoundState 2022-03-17 15:35:43 -04:00
William Banfield
aaed0f681e Merge branch 'master' into wb/epoch-fixes-forward-port-master 2022-03-17 15:07:42 -04:00
William Banfield
9fef5a3aa0 lint++ 2022-03-17 14:46:05 -04:00
William Banfield
14873ac5d5 Merge branch 'master' into wb/epoch-fixes-forward-port-master 2022-03-17 14:39:15 -04:00
William Banfield
f7df45257b Merge branch 'master' into wb/epoch-fixes-forward-port-master 2022-03-17 10:47:31 -04:00
William Banfield
f553f4d747 consensus: change lock handling in and handleMsg reactor for RoundState 2022-03-16 14:25:02 -04:00
2 changed files with 79 additions and 40 deletions

View File

@@ -119,6 +119,7 @@ type Reactor struct {
mtx sync.RWMutex
peers map[types.NodeID]*PeerState
waitSync bool
rs *cstypes.RoundState
readySignal chan struct{} // closed when the node is ready to start consensus
stateCh *p2p.Channel
@@ -166,6 +167,7 @@ func NewReactor(
logger: logger,
state: cs,
waitSync: waitSync,
rs: cs.GetRoundState(),
peers: make(map[types.NodeID]*PeerState),
eventBus: eventBus,
Metrics: metrics,
@@ -199,6 +201,7 @@ func (r *Reactor) OnStart(ctx context.Context) error {
go r.peerStatsRoutine(ctx)
r.subscribeToBroadcastEvents()
go r.updateRoundStateRoutine()
if !r.WaitSync() {
if err := r.state.Start(ctx); err != nil {
@@ -407,10 +410,30 @@ func makeRoundStepMessage(rs *cstypes.RoundState) *tmcons.NewRoundStep {
func (r *Reactor) sendNewRoundStepMessage(ctx context.Context, peerID types.NodeID) error {
return r.stateCh.Send(ctx, p2p.Envelope{
To: peerID,
Message: makeRoundStepMessage(r.state.GetRoundState()),
Message: makeRoundStepMessage(r.getRoundState()),
})
}
func (r *Reactor) updateRoundStateRoutine() {
t := time.NewTicker(100 * time.Microsecond)
defer t.Stop()
for range t.C {
if !r.IsRunning() {
return
}
rs := r.state.GetRoundState()
r.mtx.Lock()
r.rs = rs
r.mtx.Unlock()
}
}
func (r *Reactor) getRoundState() *cstypes.RoundState {
r.mtx.RLock()
defer r.mtx.RUnlock()
return r.rs
}
func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState) {
logger := r.logger.With("height", prs.Height).With("peer", ps.peerID)
@@ -493,7 +516,7 @@ OUTER_LOOP:
default:
}
rs := r.state.GetRoundState()
rs := r.getRoundState()
prs := ps.GetRoundState()
// Send proposal Block parts?
@@ -751,7 +774,7 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState) {
default:
}
rs := r.state.GetRoundState()
rs := r.getRoundState()
prs := ps.GetRoundState()
switch logThrottle {
@@ -842,7 +865,7 @@ func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState) {
return
}
rs := r.state.GetRoundState()
rs := r.getRoundState()
prs := ps.GetRoundState()
// TODO create more reliable coppies of these
// structures so the following go routines don't race

View File

@@ -936,7 +936,6 @@ func (cs *State) receiveRoutine(ctx context.Context, maxSteps int) {
func (cs *State) handleMsg(ctx context.Context, mi msgInfo) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
var (
added bool
err error
@@ -953,6 +952,24 @@ func (cs *State) handleMsg(ctx context.Context, mi msgInfo) {
case *BlockPartMessage:
// if the proposal is complete, we'll enterPrevote or tryFinalizeCommit
added, err = cs.addProposalBlockPart(ctx, msg, peerID)
// We unlock here to yield to any routines that need to read the the RoundState.
// Previously, this code held the lock from the point at which the final block
// part was received until the block executed against the application.
// This prevented the reactor from being able to retrieve the most updated
// version of the RoundState. The reactor needs the updated RoundState to
// gossip the now completed block.
//
// This code can be further improved by either always operating on a copy
// of RoundState and only locking when switching out State's copy of
// RoundState with the updated copy or by emitting RoundState events in
// more places for routines depending on it to listen for.
cs.mtx.Unlock()
cs.mtx.Lock()
if added && cs.ProposalBlockParts.IsComplete() {
cs.handleCompleteProposal(ctx, msg.Height)
}
if added {
select {
case cs.statsMsgQueue <- mi:
@@ -2155,45 +2172,44 @@ func (cs *State) addProposalBlockPart(
if err := cs.eventBus.PublishEventCompleteProposal(ctx, cs.CompleteProposalEvent()); err != nil {
cs.logger.Error("failed publishing event complete proposal", "err", err)
}
// Update Valid* if we can.
prevotes := cs.Votes.Prevotes(cs.Round)
blockID, hasTwoThirds := prevotes.TwoThirdsMajority()
if hasTwoThirds && !blockID.IsNil() && (cs.ValidRound < cs.Round) {
if cs.ProposalBlock.HashesTo(blockID.Hash) {
cs.logger.Debug(
"updating valid block to new proposal block",
"valid_round", cs.Round,
"valid_block_hash", cs.ProposalBlock.Hash(),
)
cs.ValidRound = cs.Round
cs.ValidBlock = cs.ProposalBlock
cs.ValidBlockParts = cs.ProposalBlockParts
}
// TODO: In case there is +2/3 majority in Prevotes set for some
// block and cs.ProposalBlock contains different block, either
// proposer is faulty or voting power of faulty processes is more
// than 1/3. We should trigger in the future accountability
// procedure at this point.
}
if cs.Step <= cstypes.RoundStepPropose && cs.isProposalComplete() {
// Move onto the next step
cs.enterPrevote(ctx, height, cs.Round)
if hasTwoThirds { // this is optimisation as this will be triggered when prevote is added
cs.enterPrecommit(ctx, height, cs.Round)
}
} else if cs.Step == cstypes.RoundStepCommit {
// If we're waiting on the proposal block...
cs.tryFinalizeCommit(ctx, height)
}
return added, nil
}
return added, nil
}
func (cs *State) handleCompleteProposal(ctx context.Context, height int64) {
// Update Valid* if we can.
prevotes := cs.Votes.Prevotes(cs.Round)
blockID, hasTwoThirds := prevotes.TwoThirdsMajority()
if hasTwoThirds && !blockID.IsNil() && (cs.ValidRound < cs.Round) {
if cs.ProposalBlock.HashesTo(blockID.Hash) {
cs.logger.Debug(
"updating valid block to new proposal block",
"valid_round", cs.Round,
"valid_block_hash", cs.ProposalBlock.Hash(),
)
cs.ValidRound = cs.Round
cs.ValidBlock = cs.ProposalBlock
cs.ValidBlockParts = cs.ProposalBlockParts
}
// TODO: In case there is +2/3 majority in Prevotes set for some
// block and cs.ProposalBlock contains different block, either
// proposer is faulty or voting power of faulty processes is more
// than 1/3. We should trigger in the future accountability
// procedure at this point.
}
if cs.Step <= cstypes.RoundStepPropose && cs.isProposalComplete() {
// Move onto the next step
cs.enterPrevote(ctx, height, cs.Round)
if hasTwoThirds { // this is optimisation as this will be triggered when prevote is added
cs.enterPrecommit(ctx, height, cs.Round)
}
} else if cs.Step == cstypes.RoundStepCommit {
// If we're waiting on the proposal block...
cs.tryFinalizeCommit(ctx, height)
}
}
// Attempt to add the vote. if its a duplicate signature, dupeout the validator
func (cs *State) tryAddVote(ctx context.Context, vote *types.Vote, peerID types.NodeID) (bool, error) {