public interface; max steps; move updateToState

This commit is contained in:
Ethan Buchman
2015-12-12 01:28:33 -05:00
parent c3f880e758
commit 736bc1f02f

View File

@@ -153,16 +153,17 @@ func (rs *RoundState) StringShort() string {
//-----------------------------------------------------------------------------
var (
msgQueueSize = 1000
msgQueueSize = 1000
tickTockBufferSize = 10
)
// msgs from the reactor which update the state
// msgs from the reactor which may update the state
type msgInfo struct {
msg ConsensusMessage
peerKey string
}
// internally generated messages which update the state
// internally generated messages which may update the state
type timeoutInfo struct {
duration time.Duration
height int
@@ -198,6 +199,8 @@ type ConsensusState struct {
evsw events.Fireable
evc *events.EventCache // set in stageBlock and passed into state
nSteps int // used for testing to limit the number of transitions the state makes
}
func NewConsensusState(state *sm.State, proxyAppCtx proxy.AppContext, blockStore *bc.BlockStore, mempool *mempl.Mempool) *ConsensusState {
@@ -209,8 +212,8 @@ func NewConsensusState(state *sm.State, proxyAppCtx proxy.AppContext, blockStore
peerMsgQueue: make(chan msgInfo, msgQueueSize),
internalMsgQueue: make(chan msgInfo, msgQueueSize),
timeoutTicker: new(time.Ticker),
tickChan: make(chan timeoutInfo),
tockChan: make(chan timeoutInfo),
tickChan: make(chan timeoutInfo, tickTockBufferSize),
tockChan: make(chan timeoutInfo, tickTockBufferSize),
}
cs.updateToState(state)
// Don't call scheduleRound0 yet.
@@ -220,6 +223,153 @@ func NewConsensusState(state *sm.State, proxyAppCtx proxy.AppContext, blockStore
return cs
}
//----------------------------------------
// Public interface
// implements events.Eventable
func (cs *ConsensusState) SetFireable(evsw events.Fireable) {
cs.evsw = evsw
}
func (cs *ConsensusState) String() string {
return Fmt("ConsensusState(H:%v R:%v S:%v", cs.Height, cs.Round, cs.Step)
}
func (cs *ConsensusState) GetState() *sm.State {
cs.mtx.Lock()
defer cs.mtx.Unlock()
return cs.state.Copy()
}
func (cs *ConsensusState) GetRoundState() *RoundState {
cs.mtx.Lock()
defer cs.mtx.Unlock()
return cs.getRoundState()
}
func (cs *ConsensusState) getRoundState() *RoundState {
rs := cs.RoundState // copy
return &rs
}
func (cs *ConsensusState) SetPrivValidator(priv *types.PrivValidator) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
cs.privValidator = priv
}
func (cs *ConsensusState) NewStepCh() chan *RoundState {
return cs.newStepCh
}
func (cs *ConsensusState) OnStart() error {
cs.BaseService.OnStart()
// first we start the round (no go routines)
// then we start the timeout and receive routines.
// buffered channels means scheduleRound0 will finish. Once it does,
// all further access to the RoundState is through the receiveRoutine
cs.scheduleRound0(cs.Height)
cs.startRoutines(0) // start timeout and receive
return nil
}
func (cs *ConsensusState) startRoutines(maxSteps int) {
go cs.timeoutRoutine() // receive requests for timeouts on tickChan and fire timeouts on tockChan
go cs.receiveRoutine(maxSteps) // serializes processing of proposoals, block parts, votes, and coordinates state transitions
}
func (cs *ConsensusState) OnStop() {
cs.QuitService.OnStop()
}
/*
The following three functions can be used to send messages into the consensus state
which may cause a state transition
*/
// May block on send if queue is full.
func (cs *ConsensusState) AddVote(valIndex int, vote *types.Vote, peerKey string) (added bool, address []byte, err error) {
if peerKey == "" {
cs.internalMsgQueue <- msgInfo{&VoteMessage{valIndex, vote}, ""}
} else {
cs.peerMsgQueue <- msgInfo{&VoteMessage{valIndex, vote}, peerKey}
}
// TODO: wait for event?!
return false, nil, nil
}
func (cs *ConsensusState) SetProposal(proposal *types.Proposal, peerKey string) error {
if peerKey == "" {
cs.internalMsgQueue <- msgInfo{&ProposalMessage{proposal}, ""}
} else {
cs.peerMsgQueue <- msgInfo{&ProposalMessage{proposal}, peerKey}
}
// TODO: wait for event?!
return nil
}
func (cs *ConsensusState) AddProposalBlockPart(height, round int, part *types.Part, peerKey string) error {
if peerKey == "" {
cs.internalMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, ""}
} else {
cs.peerMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, peerKey}
}
// TODO: wait for event?!
return nil
}
func (cs *ConsensusState) SetProposalAndBlock(proposal *types.Proposal, block *types.Block, parts *types.PartSet, peerKey string) error {
cs.SetProposal(proposal, peerKey)
for i := 0; i < parts.Total(); i++ {
part := parts.GetPart(i)
cs.AddProposalBlockPart(cs.Height, cs.Round, part, peerKey)
}
return nil // TODO errors
}
//----------------------------------------------
// internal functions for managing the state
func (cs *ConsensusState) updateHeight(height int) {
cs.Height = height
}
func (cs *ConsensusState) updateRoundStep(round int, step RoundStepType) {
cs.Round = round
cs.Step = step
}
// EnterNewRound(height, 0) at cs.StartTime.
func (cs *ConsensusState) scheduleRound0(height int) {
//log.Info("scheduleRound0", "now", time.Now(), "startTime", cs.StartTime)
sleepDuration := cs.StartTime.Sub(time.Now())
cs.scheduleTimeout(sleepDuration, height, 0, 1)
}
// Attempt to schedule a timeout by sending timeoutInfo on the tickChan.
// The timeoutRoutine is alwaya available to read from tickChan (it won't block).
// The scheduling may fail if the timeoutRoutine has already scheduled a timeout for a later height/round/step.
func (cs *ConsensusState) scheduleTimeout(duration time.Duration, height, round int, step RoundStepType) {
cs.tickChan <- timeoutInfo{duration, height, round, step}
}
// send a msg into the receiveRoutine regarding our own proposal, block part, or vote
func (cs *ConsensusState) sendInternalMessage(mi msgInfo) {
timeout := time.After(10 * time.Millisecond)
select {
case cs.internalMsgQueue <- mi:
case <-timeout:
log.Debug("Timed out trying to send an internal messge. Launching go-routine")
go func() { cs.internalMsgQueue <- mi }()
}
}
// Reconstruct LastCommit from SeenValidation, which we saved along with the block,
// (which happens even before saving the state)
func (cs *ConsensusState) reconstructLastCommit(state *sm.State) {
@@ -243,224 +393,6 @@ func (cs *ConsensusState) reconstructLastCommit(state *sm.State) {
cs.LastCommit = lastPrecommits
}
func (cs *ConsensusState) GetState() *sm.State {
cs.mtx.Lock()
defer cs.mtx.Unlock()
return cs.state.Copy()
}
func (cs *ConsensusState) GetRoundState() *RoundState {
cs.mtx.Lock()
defer cs.mtx.Unlock()
return cs.getRoundState()
}
func (cs *ConsensusState) getRoundState() *RoundState {
rs := cs.RoundState // copy
return &rs
}
func (cs *ConsensusState) NewStepCh() chan *RoundState {
return cs.newStepCh
}
func (cs *ConsensusState) OnStart() error {
cs.BaseService.OnStart()
cs.startRoutines()
go cs.scheduleRound0(cs.Height)
return nil
}
func (cs *ConsensusState) startRoutines() {
go cs.timeoutRoutine() // receive requests for timeouts on tickChan and fire timeouts on tockChan
go cs.receiveRoutine() // serializes processing of proposoals, block parts, votes and coordinates state transitions
}
func (cs *ConsensusState) OnStop() {
// It's asynchronous so, there's not much to stop.
cs.BaseService.OnStop()
}
func (cs *ConsensusState) updateHeight(height int) {
cs.Height = height
}
func (cs *ConsensusState) updateRoundStep(round int, step RoundStepType) {
cs.Round = round
cs.Step = step
}
// EnterNewRound(height, 0) at cs.StartTime.
func (cs *ConsensusState) scheduleRound0(height int) {
//log.Info("scheduleRound0", "now", time.Now(), "startTime", cs.StartTime)
sleepDuration := cs.StartTime.Sub(time.Now())
// TODO: this go-routine ...
go func() {
// should we use the timeoutRoutine?
// we don't really need an event because we get one in NewRound
if 0 < sleepDuration {
time.Sleep(sleepDuration)
}
cs.EnterNewRound(height, 0)
}()
}
// Attempt to schedule a timeout by sending timeoutInfo on the tickChan.
// The timeoutRoutine is alwaya available to read from tickChan (it won't block).
// The scheduling may fail if the timeoutRoutine has already scheduled a timeout for a later height/round/step.
func (cs *ConsensusState) scheduleTimeout(duration time.Duration, height, round int, step RoundStepType) {
cs.tickChan <- timeoutInfo{duration, height, round, step}
}
// the state machine sends on tickChan to start a new timer.
// timers are interupted and replaced by new ticks from later steps
// timeouts of 0 on the tickChan will be immediately relayed to the tockChan
func (cs *ConsensusState) timeoutRoutine() {
log.Debug("Starting timeout routine")
var ti timeoutInfo
for {
select {
case newti := <-cs.tickChan:
log.Debug("Received tick", "old_ti", ti, "new_ti", newti)
// ignore tickers for old height/round/step
if newti.height < ti.height {
continue
} else if newti.height == ti.height {
if newti.round < ti.round {
continue
} else if newti.round == ti.round {
if ti.step > 0 && newti.step <= ti.step {
continue
}
}
}
ti = newti
log.Info("Scheduling timeout", "dur", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step)
cs.timeoutTicker.Stop()
cs.timeoutTicker = time.NewTicker(ti.duration)
case <-cs.timeoutTicker.C:
log.Info("Timed out", "dur", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step)
cs.timeoutTicker.Stop()
// go routine here gaurantees timeoutRoutine doesn't block.
// Determinism comes from playback in the receiveRoutine.
// We can eliminate it by merging the timeoutRoutine into receiveRoutine
// and managing the timeouts ourselves with a millisecond ticker
go func() { cs.tockChan <- ti }()
case <-cs.Quit:
return
}
}
}
func (cs *ConsensusState) stopTimer() {
cs.timeoutTicker.Stop()
}
func (cs *ConsensusState) sendInternalMessage(mi msgInfo) {
timeout := time.After(10 * time.Millisecond)
select {
case cs.internalMsgQueue <- mi:
case <-timeout:
log.Debug("Timed out trying to send an internal messge. Launching go-routine")
go func() { cs.internalMsgQueue <- mi }()
}
}
// receiveRoutine handles messages which may cause state transitions.
// It keeps the RoundState and is the only thing that updates it.
// Updates happen on timeouts, complete proposals, and 2/3 majorities
func (cs *ConsensusState) receiveRoutine() {
for {
rs := cs.RoundState
var mi msgInfo
select {
case mi = <-cs.peerMsgQueue:
// handles proposals, block parts, votes
// may generate internal events (votes, complete proposals, 2/3 majorities)
cs.handleMsg(mi, rs)
case mi = <-cs.internalMsgQueue:
// handles proposals, block parts, votes
cs.handleMsg(mi, rs)
case ti := <-cs.tockChan:
// if the timeout is relevant to the rs
// go to the next step
cs.handleTimeout(ti, rs)
case <-cs.Quit:
return
}
}
}
// state transitions on complete-proposal, 2/3-any, 2/3-one
func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) {
var err error
msg, peerKey := mi.msg, mi.peerKey
switch msg := msg.(type) {
case *ProposalMessage:
// will not cause transition.
// once proposal is set, we can receive block parts
err = cs.SetProposal(msg.Proposal)
case *BlockPartMessage:
// if the proposal is complete, we'll EnterPrevote or tryFinalizeCommit
// if we're the only validator, the EnterPrevote may take us through to the next round
_, err = cs.AddProposalBlockPart(msg.Height, msg.Part)
case *VoteMessage:
// attempt to add the vote and dupeout the validator if its a duplicate signature
// if the vote gives us a 2/3-any or 2/3-one, we transition
added, err := cs.tryAddVote(msg.ValidatorIndex, msg.Vote, peerKey)
if err == ErrAddingVote {
// TODO: punish peer
}
if added {
// If rs.Height == vote.Height && rs.Round < vote.Round,
// the peer is sending us CatchupCommit precommits.
// We could make note of this and help filter in broadcastHasVoteMessage().
// XXX TODO: do this
// conR.broadcastHasVoteMessage(msg.Vote, msg.ValidatorIndex)
}
default:
log.Warn("Unknown msg type", reflect.TypeOf(msg))
}
if err != nil {
log.Error("error with msg", "error", err)
}
}
func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) {
log.Debug("Received tock", "timeout", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step)
// timeouts must be for current height, round, step
if ti.height != rs.Height || ti.round < rs.Round || (ti.round == rs.Round && ti.step < rs.Step) {
log.Debug("Ignoring tock because we're ahead", "height", rs.Height, "round", rs.Round, "step", rs.Step)
return
}
// the timeout will now cause a state transition
switch ti.step {
case RoundStepPropose:
cs.evsw.FireEvent(types.EventStringTimeoutPropose(), cs.RoundStateEvent())
cs.EnterPrevote(ti.height, ti.round)
case RoundStepPrevoteWait:
cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent())
cs.EnterPrecommit(ti.height, ti.round)
case RoundStepPrecommitWait:
cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent())
cs.EnterNewRound(ti.height, ti.round+1)
default:
panic(Fmt("Invalid timeout step: %v", ti.step))
}
}
// Updates ConsensusState and increments height to match that of state.
// The round becomes 0 and cs.Step becomes RoundStepNewHeight.
func (cs *ConsensusState) updateToState(state *sm.State) {
@@ -525,16 +457,190 @@ func (cs *ConsensusState) updateToState(state *sm.State) {
cs.stagedState = nil
// Finally, broadcast RoundState
cs.newStep()
}
func (cs *ConsensusState) newStep() {
cs.nSteps += 1
cs.newStepCh <- cs.getRoundState()
}
func (cs *ConsensusState) SetPrivValidator(priv *types.PrivValidator) {
//-----------------------------------------
// the main go routines
// the state machine sends on tickChan to start a new timer.
// timers are interupted and replaced by new ticks from later steps
// timeouts of 0 on the tickChan will be immediately relayed to the tockChan
func (cs *ConsensusState) timeoutRoutine() {
log.Debug("Starting timeout routine")
var ti timeoutInfo
for {
select {
case newti := <-cs.tickChan:
log.Debug("Received tick", "old_ti", ti, "new_ti", newti)
// ignore tickers for old height/round/step
if newti.height < ti.height {
continue
} else if newti.height == ti.height {
if newti.round < ti.round {
continue
} else if newti.round == ti.round {
if ti.step > 0 && newti.step <= ti.step {
continue
}
}
}
ti = newti
// if the newti has duration == 0, we relay to the tockChan immediately (no timeout)
if ti.duration == time.Duration(0) {
go func(t timeoutInfo) { cs.tockChan <- t }(ti)
continue
}
log.Info("Scheduling timeout", "dur", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step)
cs.timeoutTicker.Stop()
cs.timeoutTicker = time.NewTicker(ti.duration)
case <-cs.timeoutTicker.C:
log.Info("Timed out", "dur", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step)
cs.timeoutTicker.Stop()
// go routine here gaurantees timeoutRoutine doesn't block.
// Determinism comes from playback in the receiveRoutine.
// We can eliminate it by merging the timeoutRoutine into receiveRoutine
// and managing the timeouts ourselves with a millisecond ticker
go func(t timeoutInfo) { cs.tockChan <- t }(ti)
case <-cs.Quit:
return
}
}
}
// a nice idea but probably more trouble than its worth
func (cs *ConsensusState) stopTimer() {
cs.timeoutTicker.Stop()
}
// receiveRoutine handles messages which may cause state transitions.
// it's argument (n) is the number of messages to process before exiting - use 0 to run forever
// It keeps the RoundState and is the only thing that updates it.
// Updates happen on timeouts, complete proposals, and 2/3 majorities
func (cs *ConsensusState) receiveRoutine(maxSteps int) {
for {
if maxSteps > 0 {
if cs.nSteps >= maxSteps {
log.Warn("reached max steps. exiting receive routine")
cs.nSteps = 0
return
}
}
rs := cs.RoundState
var mi msgInfo
select {
case mi = <-cs.peerMsgQueue:
// handles proposals, block parts, votes
// may generate internal events (votes, complete proposals, 2/3 majorities)
cs.handleMsg(mi, rs)
case mi = <-cs.internalMsgQueue:
// handles proposals, block parts, votes
cs.handleMsg(mi, rs)
case ti := <-cs.tockChan:
// if the timeout is relevant to the rs
// go to the next step
cs.handleTimeout(ti, rs)
case <-cs.Quit:
return
}
}
}
// state transitions on complete-proposal, 2/3-any, 2/3-one
func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
cs.privValidator = priv
var err error
msg, peerKey := mi.msg, mi.peerKey
switch msg := msg.(type) {
case *ProposalMessage:
// will not cause transition.
// once proposal is set, we can receive block parts
err = cs.setProposal(msg.Proposal)
case *BlockPartMessage:
// if the proposal is complete, we'll EnterPrevote or tryFinalizeCommit
// if we're the only validator, the EnterPrevote may take us through to the next round
_, err = cs.addProposalBlockPart(msg.Height, msg.Part)
case *VoteMessage:
// attempt to add the vote and dupeout the validator if its a duplicate signature
// if the vote gives us a 2/3-any or 2/3-one, we transition
added, err := cs.tryAddVote(msg.ValidatorIndex, msg.Vote, peerKey)
if err == ErrAddingVote {
// TODO: punish peer
}
if added {
// If rs.Height == vote.Height && rs.Round < vote.Round,
// the peer is sending us CatchupCommit precommits.
// We could make note of this and help filter in broadcastHasVoteMessage().
// XXX TODO: do this
// conR.broadcastHasVoteMessage(msg.Vote, msg.ValidatorIndex)
}
default:
log.Warn("Unknown msg type", reflect.TypeOf(msg))
}
if err != nil {
log.Error("error with msg", "error", err)
}
}
func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) {
log.Debug("Received tock", "timeout", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step)
// if this is a timeout for the new height
if ti.height == rs.Height+1 && ti.round == 0 && ti.step == 1 {
cs.mtx.Lock()
// Increment height.
cs.updateToState(cs.stagedState)
// event fired from EnterNewRound after some updates
cs.EnterNewRound(ti.height, 0)
cs.mtx.Unlock()
return
}
// timeouts must be for current height, round, step
if ti.height != rs.Height || ti.round < rs.Round || (ti.round == rs.Round && ti.step < rs.Step) {
log.Debug("Ignoring tock because we're ahead", "height", rs.Height, "round", rs.Round, "step", rs.Step)
return
}
// the timeout will now cause a state transition
cs.mtx.Lock()
defer cs.mtx.Unlock()
switch ti.step {
case RoundStepPropose:
cs.evsw.FireEvent(types.EventStringTimeoutPropose(), cs.RoundStateEvent())
cs.EnterPrevote(ti.height, ti.round)
case RoundStepPrevoteWait:
cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent())
cs.EnterPrecommit(ti.height, ti.round)
case RoundStepPrecommitWait:
cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent())
cs.EnterNewRound(ti.height, ti.round+1)
default:
panic(Fmt("Invalid timeout step: %v", ti.step))
}
}
//-----------------------------------------------------------------------------
// State functions
// Many of these functions are capitalized but are not really meant to be used
// by external code as it will cause race conditions with running timeout/receiveRoutine.
// Use AddVote, SetProposal, AddProposalBlockPart instead
// Enter: +2/3 precommits for nil at (height,round-1)
// Enter: `timeoutPrecommits` after any +2/3 precommits from (height,round-1)
@@ -549,7 +655,7 @@ func (cs *ConsensusState) EnterNewRound(height int, round int) {
if now := time.Now(); cs.StartTime.After(now) {
log.Warn("Need to set a buffer and log.Warn() here for sanity.", "startTime", cs.StartTime, "now", now)
}
cs.stopTimer()
// cs.stopTimer()
log.Notice(Fmt("EnterNewRound(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step))
@@ -608,7 +714,7 @@ func (cs *ConsensusState) EnterPropose(height int, round int) {
// Done EnterPropose:
cs.updateRoundStep(round, RoundStepPropose)
cs.newStepCh <- cs.getRoundState()
cs.newStep()
// If we have the whole proposal + POL, then goto Prevote now.
// else, we'll EnterPrevote when the rest of the proposal is received (in AddProposalBlockPart),
@@ -638,18 +744,21 @@ func (cs *ConsensusState) decideProposal(height, round int) {
proposal := types.NewProposal(height, round, blockParts.Header(), cs.Votes.POLRound())
err := cs.privValidator.SignProposal(cs.state.ChainID, proposal)
if err == nil {
log.Notice("Signed and set proposal", "height", height, "round", round, "proposal", proposal)
log.Debug(Fmt("Signed and set proposal block: %v", block))
// Set fields
/* fields set by setProposal and addBlockPart
cs.Proposal = proposal
cs.ProposalBlock = block
cs.ProposalBlockParts = blockParts
*/
// send proposal and block parts on internal msg queue
cs.sendInternalMessage(msgInfo{&ProposalMessage{proposal}, ""})
for i := 0; i < blockParts.Total(); i++ {
part := blockParts.GetPart(i)
cs.sendInternalMessage(msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, ""})
}
log.Notice("Signed and sent proposal", "height", height, "round", round, "proposal", proposal)
log.Debug(Fmt("Signed and sent proposal block: %v", block))
} else {
log.Warn("EnterPropose: Error signing proposal", "height", height, "round", round, "error", err)
}
@@ -741,7 +850,7 @@ func (cs *ConsensusState) EnterPrevote(height int, round int) {
// TODO: catchup event?
}
cs.stopTimer()
// cs.stopTimer()
log.Info(Fmt("EnterPrevote(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step))
@@ -754,7 +863,7 @@ func (cs *ConsensusState) EnterPrevote(height int, round int) {
cs.doPrevote(height, round)
// Done EnterPrevote:
cs.newStepCh <- cs.getRoundState()
cs.newStep()
// Once `addVote` hits any +2/3 prevotes, we will go to PrevoteWait
// (so we have more time to try and collect +2/3 prevotes for a single block)
@@ -806,7 +915,7 @@ func (cs *ConsensusState) EnterPrevoteWait(height int, round int) {
// Done EnterPrevoteWait:
cs.updateRoundStep(round, RoundStepPrevoteWait)
cs.newStepCh <- cs.getRoundState()
cs.newStep()
// After `timeoutPrevote0+timeoutPrevoteDelta*round`, EnterPrecommit()
cs.scheduleTimeout(timeoutPrevote0+timeoutPrevoteDelta*time.Duration(round), height, round, RoundStepPrevoteWait)
@@ -826,14 +935,14 @@ func (cs *ConsensusState) EnterPrecommit(height int, round int) {
return
}
cs.stopTimer()
// cs.stopTimer()
log.Info(Fmt("EnterPrecommit(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step))
cs.updateRoundStep(round, RoundStepPrecommit)
defer func() {
// Done EnterPrecommit:
cs.newStepCh <- cs.getRoundState()
cs.newStep()
}()
hash, partsHeader, ok := cs.Votes.Prevotes(round).TwoThirdsMajority()
@@ -929,7 +1038,7 @@ func (cs *ConsensusState) EnterPrecommitWait(height int, round int) {
// Done EnterPrecommitWait:
cs.updateRoundStep(round, RoundStepPrecommitWait)
cs.newStepCh <- cs.getRoundState()
cs.newStep()
// After `timeoutPrecommit0+timeoutPrecommitDelta*round`, EnterNewRound()
cs.scheduleTimeout(timeoutPrecommit0+timeoutPrecommitDelta*time.Duration(round), height, round, RoundStepPrecommitWait)
@@ -951,7 +1060,7 @@ func (cs *ConsensusState) EnterCommit(height int, commitRound int) {
// keep ca.Round the same, it points to the right Precommits set.
cs.Step = RoundStepCommit
cs.CommitRound = commitRound
cs.newStepCh <- cs.getRoundState()
cs.newStep()
// Maybe finalize immediately.
cs.tryFinalizeCommit(height)
@@ -1030,11 +1139,11 @@ func (cs *ConsensusState) FinalizeCommit(height int) {
log.Info(Fmt("Finalizing commit of block: %v", cs.ProposalBlock))
// We have the block, so stage/save/commit-vote.
cs.saveBlock(cs.ProposalBlock, cs.ProposalBlockParts, cs.Votes.Precommits(cs.CommitRound))
// Increment height.
cs.updateToState(cs.stagedState)
// call updateToState from handleTimeout
// cs.StartTime is already set.
// Schedule Round0 to start soon.
// go
cs.scheduleRound0(height + 1)
// By here,
@@ -1046,7 +1155,7 @@ func (cs *ConsensusState) FinalizeCommit(height int) {
//-----------------------------------------------------------------------------
func (cs *ConsensusState) SetProposal(proposal *types.Proposal) error {
func (cs *ConsensusState) setProposal(proposal *types.Proposal) error {
//cs.mtx.Lock()
//defer cs.mtx.Unlock()
@@ -1083,7 +1192,7 @@ func (cs *ConsensusState) SetProposal(proposal *types.Proposal) error {
// NOTE: block is not necessarily valid.
// This can trigger us to go into EnterPrevote asynchronously (before we timeout of propose) or to attempt to commit
func (cs *ConsensusState) AddProposalBlockPart(height int, part *types.Part) (added bool, err error) {
func (cs *ConsensusState) addProposalBlockPart(height int, part *types.Part) (added bool, err error) {
//cs.mtx.Lock()
//defer cs.mtx.Unlock()
@@ -1148,34 +1257,11 @@ func (cs *ConsensusState) tryAddVote(valIndex int, vote *types.Vote, peerKey str
return added, nil
}
/*
// Interface to the state machine from external go routines.
// May block on send if queue is full.
// How do we get added/address/error back?
func (cs *ConsensusState) AddVote(valIndex int, vote *types.Vote, peerKey string) (added bool, address []byte, err error) {
if peerKey == "" {
cs.internalMsgQueue <- msgInfo{&VoteMessage{valIndex, vote}, ""}
} else {
cs.peerMsgQueue <- msgInfo{&VoteMessage{valIndex, vote}, peerKey}
}
// TODO: wait for event?!
}
*/
//-----------------------------------------------------------------------------
func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string) (added bool, address []byte, err error) {
log.Debug("addVote", "voteHeight", vote.Height, "voteType", vote.Type, "csHeight", cs.Height)
defer func() {
if added {
cs.evsw.FireEvent(types.EventStringVote(), &types.EventDataVote{valIndex, address, vote})
}
}()
// A precommit for the previous height?
if vote.Height+1 == cs.Height {
if !(cs.Step == RoundStepNewHeight && vote.Type == types.VoteTypePrecommit) {
@@ -1186,6 +1272,8 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string
added, address, err = cs.LastCommit.AddByIndex(valIndex, vote)
if added {
log.Info(Fmt("Added to lastPrecommits: %v", cs.LastCommit.StringShort()))
cs.evsw.FireEvent(types.EventStringVote(), &types.EventDataVote{valIndex, address, vote})
}
return
}
@@ -1195,6 +1283,8 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string
height := cs.Height
added, address, err = cs.Votes.AddByIndex(valIndex, vote, peerKey)
if added {
cs.evsw.FireEvent(types.EventStringVote(), &types.EventDataVote{valIndex, address, vote})
switch vote.Type {
case types.VoteTypePrevote:
prevotes := cs.Votes.Prevotes(vote.Round)
@@ -1307,6 +1397,7 @@ func (cs *ConsensusState) signVote(type_ byte, hash []byte, header types.PartSet
return vote, err
}
// signs the vote, publishes on internalMsgQueue
func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.PartSetHeader) *types.Vote {
if cs.privValidator == nil || !cs.Validators.HasAddress(cs.privValidator.Address) {
return nil
@@ -1360,14 +1451,7 @@ func (cs *ConsensusState) saveBlock(block *types.Block, blockParts *types.PartSe
}
// implements events.Eventable
func (cs *ConsensusState) SetFireable(evsw events.Fireable) {
cs.evsw = evsw
}
func (cs *ConsensusState) String() string {
return Fmt("ConsensusState(H:%v R:%v S:%v", cs.Height, cs.Round, cs.Step)
}
//---------------------------------------------------------
func CompareHRS(h1, r1 int, s1 RoundStepType, h2, r2 int, s2 RoundStepType) int {
if h1 < h2 {