From 736bc1f02f862fa18747012c4c4e0c4127966217 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sat, 12 Dec 2015 01:28:33 -0500 Subject: [PATCH] public interface; max steps; move updateToState --- consensus/state.go | 628 +++++++++++++++++++++++++-------------------- 1 file changed, 356 insertions(+), 272 deletions(-) diff --git a/consensus/state.go b/consensus/state.go index 05e6920bb..21f7f346f 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -153,16 +153,17 @@ func (rs *RoundState) StringShort() string { //----------------------------------------------------------------------------- var ( - msgQueueSize = 1000 + msgQueueSize = 1000 + tickTockBufferSize = 10 ) -// msgs from the reactor which update the state +// msgs from the reactor which may update the state type msgInfo struct { msg ConsensusMessage peerKey string } -// internally generated messages which update the state +// internally generated messages which may update the state type timeoutInfo struct { duration time.Duration height int @@ -198,6 +199,8 @@ type ConsensusState struct { evsw events.Fireable evc *events.EventCache // set in stageBlock and passed into state + + nSteps int // used for testing to limit the number of transitions the state makes } func NewConsensusState(state *sm.State, proxyAppCtx proxy.AppContext, blockStore *bc.BlockStore, mempool *mempl.Mempool) *ConsensusState { @@ -209,8 +212,8 @@ func NewConsensusState(state *sm.State, proxyAppCtx proxy.AppContext, blockStore peerMsgQueue: make(chan msgInfo, msgQueueSize), internalMsgQueue: make(chan msgInfo, msgQueueSize), timeoutTicker: new(time.Ticker), - tickChan: make(chan timeoutInfo), - tockChan: make(chan timeoutInfo), + tickChan: make(chan timeoutInfo, tickTockBufferSize), + tockChan: make(chan timeoutInfo, tickTockBufferSize), } cs.updateToState(state) // Don't call scheduleRound0 yet. @@ -220,6 +223,153 @@ func NewConsensusState(state *sm.State, proxyAppCtx proxy.AppContext, blockStore return cs } +//---------------------------------------- +// Public interface + +// implements events.Eventable +func (cs *ConsensusState) SetFireable(evsw events.Fireable) { + cs.evsw = evsw +} + +func (cs *ConsensusState) String() string { + return Fmt("ConsensusState(H:%v R:%v S:%v", cs.Height, cs.Round, cs.Step) +} + +func (cs *ConsensusState) GetState() *sm.State { + cs.mtx.Lock() + defer cs.mtx.Unlock() + return cs.state.Copy() +} + +func (cs *ConsensusState) GetRoundState() *RoundState { + cs.mtx.Lock() + defer cs.mtx.Unlock() + return cs.getRoundState() +} + +func (cs *ConsensusState) getRoundState() *RoundState { + rs := cs.RoundState // copy + return &rs +} + +func (cs *ConsensusState) SetPrivValidator(priv *types.PrivValidator) { + cs.mtx.Lock() + defer cs.mtx.Unlock() + cs.privValidator = priv +} + +func (cs *ConsensusState) NewStepCh() chan *RoundState { + return cs.newStepCh +} + +func (cs *ConsensusState) OnStart() error { + cs.BaseService.OnStart() + + // first we start the round (no go routines) + // then we start the timeout and receive routines. + // buffered channels means scheduleRound0 will finish. Once it does, + // all further access to the RoundState is through the receiveRoutine + cs.scheduleRound0(cs.Height) + cs.startRoutines(0) // start timeout and receive + return nil +} + +func (cs *ConsensusState) startRoutines(maxSteps int) { + go cs.timeoutRoutine() // receive requests for timeouts on tickChan and fire timeouts on tockChan + go cs.receiveRoutine(maxSteps) // serializes processing of proposoals, block parts, votes, and coordinates state transitions +} + +func (cs *ConsensusState) OnStop() { + cs.QuitService.OnStop() +} + +/* + The following three functions can be used to send messages into the consensus state + which may cause a state transition +*/ + +// May block on send if queue is full. +func (cs *ConsensusState) AddVote(valIndex int, vote *types.Vote, peerKey string) (added bool, address []byte, err error) { + if peerKey == "" { + cs.internalMsgQueue <- msgInfo{&VoteMessage{valIndex, vote}, ""} + } else { + cs.peerMsgQueue <- msgInfo{&VoteMessage{valIndex, vote}, peerKey} + } + + // TODO: wait for event?! + return false, nil, nil +} + +func (cs *ConsensusState) SetProposal(proposal *types.Proposal, peerKey string) error { + + if peerKey == "" { + cs.internalMsgQueue <- msgInfo{&ProposalMessage{proposal}, ""} + } else { + cs.peerMsgQueue <- msgInfo{&ProposalMessage{proposal}, peerKey} + } + + // TODO: wait for event?! + return nil +} + +func (cs *ConsensusState) AddProposalBlockPart(height, round int, part *types.Part, peerKey string) error { + + if peerKey == "" { + cs.internalMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, ""} + } else { + cs.peerMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, peerKey} + } + + // TODO: wait for event?! + return nil +} + +func (cs *ConsensusState) SetProposalAndBlock(proposal *types.Proposal, block *types.Block, parts *types.PartSet, peerKey string) error { + cs.SetProposal(proposal, peerKey) + for i := 0; i < parts.Total(); i++ { + part := parts.GetPart(i) + cs.AddProposalBlockPart(cs.Height, cs.Round, part, peerKey) + } + return nil // TODO errors +} + +//---------------------------------------------- +// internal functions for managing the state + +func (cs *ConsensusState) updateHeight(height int) { + cs.Height = height +} + +func (cs *ConsensusState) updateRoundStep(round int, step RoundStepType) { + cs.Round = round + cs.Step = step +} + +// EnterNewRound(height, 0) at cs.StartTime. +func (cs *ConsensusState) scheduleRound0(height int) { + //log.Info("scheduleRound0", "now", time.Now(), "startTime", cs.StartTime) + sleepDuration := cs.StartTime.Sub(time.Now()) + cs.scheduleTimeout(sleepDuration, height, 0, 1) +} + +// Attempt to schedule a timeout by sending timeoutInfo on the tickChan. +// The timeoutRoutine is alwaya available to read from tickChan (it won't block). +// The scheduling may fail if the timeoutRoutine has already scheduled a timeout for a later height/round/step. +func (cs *ConsensusState) scheduleTimeout(duration time.Duration, height, round int, step RoundStepType) { + cs.tickChan <- timeoutInfo{duration, height, round, step} +} + +// send a msg into the receiveRoutine regarding our own proposal, block part, or vote +func (cs *ConsensusState) sendInternalMessage(mi msgInfo) { + timeout := time.After(10 * time.Millisecond) + select { + case cs.internalMsgQueue <- mi: + case <-timeout: + log.Debug("Timed out trying to send an internal messge. Launching go-routine") + go func() { cs.internalMsgQueue <- mi }() + } +} + // Reconstruct LastCommit from SeenValidation, which we saved along with the block, // (which happens even before saving the state) func (cs *ConsensusState) reconstructLastCommit(state *sm.State) { @@ -243,224 +393,6 @@ func (cs *ConsensusState) reconstructLastCommit(state *sm.State) { cs.LastCommit = lastPrecommits } -func (cs *ConsensusState) GetState() *sm.State { - cs.mtx.Lock() - defer cs.mtx.Unlock() - return cs.state.Copy() -} - -func (cs *ConsensusState) GetRoundState() *RoundState { - cs.mtx.Lock() - defer cs.mtx.Unlock() - return cs.getRoundState() -} - -func (cs *ConsensusState) getRoundState() *RoundState { - rs := cs.RoundState // copy - return &rs -} - -func (cs *ConsensusState) NewStepCh() chan *RoundState { - return cs.newStepCh -} - -func (cs *ConsensusState) OnStart() error { - cs.BaseService.OnStart() - - cs.startRoutines() - go cs.scheduleRound0(cs.Height) - return nil -} - -func (cs *ConsensusState) startRoutines() { - go cs.timeoutRoutine() // receive requests for timeouts on tickChan and fire timeouts on tockChan - go cs.receiveRoutine() // serializes processing of proposoals, block parts, votes and coordinates state transitions -} - -func (cs *ConsensusState) OnStop() { - // It's asynchronous so, there's not much to stop. - cs.BaseService.OnStop() -} - -func (cs *ConsensusState) updateHeight(height int) { - cs.Height = height -} - -func (cs *ConsensusState) updateRoundStep(round int, step RoundStepType) { - cs.Round = round - cs.Step = step -} - -// EnterNewRound(height, 0) at cs.StartTime. -func (cs *ConsensusState) scheduleRound0(height int) { - //log.Info("scheduleRound0", "now", time.Now(), "startTime", cs.StartTime) - sleepDuration := cs.StartTime.Sub(time.Now()) - - // TODO: this go-routine ... - go func() { - // should we use the timeoutRoutine? - // we don't really need an event because we get one in NewRound - if 0 < sleepDuration { - time.Sleep(sleepDuration) - } - cs.EnterNewRound(height, 0) - }() - -} - -// Attempt to schedule a timeout by sending timeoutInfo on the tickChan. -// The timeoutRoutine is alwaya available to read from tickChan (it won't block). -// The scheduling may fail if the timeoutRoutine has already scheduled a timeout for a later height/round/step. -func (cs *ConsensusState) scheduleTimeout(duration time.Duration, height, round int, step RoundStepType) { - cs.tickChan <- timeoutInfo{duration, height, round, step} -} - -// the state machine sends on tickChan to start a new timer. -// timers are interupted and replaced by new ticks from later steps -// timeouts of 0 on the tickChan will be immediately relayed to the tockChan -func (cs *ConsensusState) timeoutRoutine() { - log.Debug("Starting timeout routine") - var ti timeoutInfo - for { - select { - case newti := <-cs.tickChan: - log.Debug("Received tick", "old_ti", ti, "new_ti", newti) - - // ignore tickers for old height/round/step - if newti.height < ti.height { - continue - } else if newti.height == ti.height { - if newti.round < ti.round { - continue - } else if newti.round == ti.round { - if ti.step > 0 && newti.step <= ti.step { - continue - } - } - } - - ti = newti - - log.Info("Scheduling timeout", "dur", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step) - cs.timeoutTicker.Stop() - cs.timeoutTicker = time.NewTicker(ti.duration) - case <-cs.timeoutTicker.C: - log.Info("Timed out", "dur", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step) - cs.timeoutTicker.Stop() - // go routine here gaurantees timeoutRoutine doesn't block. - // Determinism comes from playback in the receiveRoutine. - // We can eliminate it by merging the timeoutRoutine into receiveRoutine - // and managing the timeouts ourselves with a millisecond ticker - go func() { cs.tockChan <- ti }() - case <-cs.Quit: - return - } - } -} - -func (cs *ConsensusState) stopTimer() { - cs.timeoutTicker.Stop() -} - -func (cs *ConsensusState) sendInternalMessage(mi msgInfo) { - timeout := time.After(10 * time.Millisecond) - select { - case cs.internalMsgQueue <- mi: - case <-timeout: - log.Debug("Timed out trying to send an internal messge. Launching go-routine") - go func() { cs.internalMsgQueue <- mi }() - } -} - -// receiveRoutine handles messages which may cause state transitions. -// It keeps the RoundState and is the only thing that updates it. -// Updates happen on timeouts, complete proposals, and 2/3 majorities -func (cs *ConsensusState) receiveRoutine() { - for { - rs := cs.RoundState - var mi msgInfo - - select { - case mi = <-cs.peerMsgQueue: - // handles proposals, block parts, votes - // may generate internal events (votes, complete proposals, 2/3 majorities) - cs.handleMsg(mi, rs) - case mi = <-cs.internalMsgQueue: - // handles proposals, block parts, votes - cs.handleMsg(mi, rs) - case ti := <-cs.tockChan: - // if the timeout is relevant to the rs - // go to the next step - cs.handleTimeout(ti, rs) - case <-cs.Quit: - return - } - } -} - -// state transitions on complete-proposal, 2/3-any, 2/3-one -func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) { - var err error - msg, peerKey := mi.msg, mi.peerKey - switch msg := msg.(type) { - case *ProposalMessage: - // will not cause transition. - // once proposal is set, we can receive block parts - err = cs.SetProposal(msg.Proposal) - case *BlockPartMessage: - // if the proposal is complete, we'll EnterPrevote or tryFinalizeCommit - // if we're the only validator, the EnterPrevote may take us through to the next round - _, err = cs.AddProposalBlockPart(msg.Height, msg.Part) - case *VoteMessage: - // attempt to add the vote and dupeout the validator if its a duplicate signature - // if the vote gives us a 2/3-any or 2/3-one, we transition - added, err := cs.tryAddVote(msg.ValidatorIndex, msg.Vote, peerKey) - if err == ErrAddingVote { - // TODO: punish peer - } - - if added { - // If rs.Height == vote.Height && rs.Round < vote.Round, - // the peer is sending us CatchupCommit precommits. - // We could make note of this and help filter in broadcastHasVoteMessage(). - - // XXX TODO: do this - // conR.broadcastHasVoteMessage(msg.Vote, msg.ValidatorIndex) - } - default: - log.Warn("Unknown msg type", reflect.TypeOf(msg)) - } - if err != nil { - log.Error("error with msg", "error", err) - } -} - -func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) { - log.Debug("Received tock", "timeout", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step) - // timeouts must be for current height, round, step - if ti.height != rs.Height || ti.round < rs.Round || (ti.round == rs.Round && ti.step < rs.Step) { - log.Debug("Ignoring tock because we're ahead", "height", rs.Height, "round", rs.Round, "step", rs.Step) - return - } - - // the timeout will now cause a state transition - - switch ti.step { - case RoundStepPropose: - cs.evsw.FireEvent(types.EventStringTimeoutPropose(), cs.RoundStateEvent()) - cs.EnterPrevote(ti.height, ti.round) - case RoundStepPrevoteWait: - cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent()) - cs.EnterPrecommit(ti.height, ti.round) - case RoundStepPrecommitWait: - cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent()) - cs.EnterNewRound(ti.height, ti.round+1) - default: - panic(Fmt("Invalid timeout step: %v", ti.step)) - } - -} - // Updates ConsensusState and increments height to match that of state. // The round becomes 0 and cs.Step becomes RoundStepNewHeight. func (cs *ConsensusState) updateToState(state *sm.State) { @@ -525,16 +457,190 @@ func (cs *ConsensusState) updateToState(state *sm.State) { cs.stagedState = nil // Finally, broadcast RoundState + cs.newStep() +} + +func (cs *ConsensusState) newStep() { + cs.nSteps += 1 cs.newStepCh <- cs.getRoundState() } -func (cs *ConsensusState) SetPrivValidator(priv *types.PrivValidator) { +//----------------------------------------- +// the main go routines + +// the state machine sends on tickChan to start a new timer. +// timers are interupted and replaced by new ticks from later steps +// timeouts of 0 on the tickChan will be immediately relayed to the tockChan +func (cs *ConsensusState) timeoutRoutine() { + log.Debug("Starting timeout routine") + var ti timeoutInfo + for { + select { + case newti := <-cs.tickChan: + log.Debug("Received tick", "old_ti", ti, "new_ti", newti) + + // ignore tickers for old height/round/step + if newti.height < ti.height { + continue + } else if newti.height == ti.height { + if newti.round < ti.round { + continue + } else if newti.round == ti.round { + if ti.step > 0 && newti.step <= ti.step { + continue + } + } + } + + ti = newti + + // if the newti has duration == 0, we relay to the tockChan immediately (no timeout) + if ti.duration == time.Duration(0) { + go func(t timeoutInfo) { cs.tockChan <- t }(ti) + continue + } + + log.Info("Scheduling timeout", "dur", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step) + cs.timeoutTicker.Stop() + cs.timeoutTicker = time.NewTicker(ti.duration) + case <-cs.timeoutTicker.C: + log.Info("Timed out", "dur", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step) + cs.timeoutTicker.Stop() + // go routine here gaurantees timeoutRoutine doesn't block. + // Determinism comes from playback in the receiveRoutine. + // We can eliminate it by merging the timeoutRoutine into receiveRoutine + // and managing the timeouts ourselves with a millisecond ticker + go func(t timeoutInfo) { cs.tockChan <- t }(ti) + case <-cs.Quit: + return + } + } +} + +// a nice idea but probably more trouble than its worth +func (cs *ConsensusState) stopTimer() { + cs.timeoutTicker.Stop() +} + +// receiveRoutine handles messages which may cause state transitions. +// it's argument (n) is the number of messages to process before exiting - use 0 to run forever +// It keeps the RoundState and is the only thing that updates it. +// Updates happen on timeouts, complete proposals, and 2/3 majorities +func (cs *ConsensusState) receiveRoutine(maxSteps int) { + for { + if maxSteps > 0 { + if cs.nSteps >= maxSteps { + log.Warn("reached max steps. exiting receive routine") + cs.nSteps = 0 + return + } + } + rs := cs.RoundState + var mi msgInfo + + select { + case mi = <-cs.peerMsgQueue: + // handles proposals, block parts, votes + // may generate internal events (votes, complete proposals, 2/3 majorities) + cs.handleMsg(mi, rs) + case mi = <-cs.internalMsgQueue: + // handles proposals, block parts, votes + cs.handleMsg(mi, rs) + case ti := <-cs.tockChan: + // if the timeout is relevant to the rs + // go to the next step + cs.handleTimeout(ti, rs) + case <-cs.Quit: + return + } + } +} + +// state transitions on complete-proposal, 2/3-any, 2/3-one +func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) { cs.mtx.Lock() defer cs.mtx.Unlock() - cs.privValidator = priv + + var err error + msg, peerKey := mi.msg, mi.peerKey + switch msg := msg.(type) { + case *ProposalMessage: + // will not cause transition. + // once proposal is set, we can receive block parts + err = cs.setProposal(msg.Proposal) + case *BlockPartMessage: + // if the proposal is complete, we'll EnterPrevote or tryFinalizeCommit + // if we're the only validator, the EnterPrevote may take us through to the next round + _, err = cs.addProposalBlockPart(msg.Height, msg.Part) + case *VoteMessage: + // attempt to add the vote and dupeout the validator if its a duplicate signature + // if the vote gives us a 2/3-any or 2/3-one, we transition + added, err := cs.tryAddVote(msg.ValidatorIndex, msg.Vote, peerKey) + if err == ErrAddingVote { + // TODO: punish peer + } + + if added { + // If rs.Height == vote.Height && rs.Round < vote.Round, + // the peer is sending us CatchupCommit precommits. + // We could make note of this and help filter in broadcastHasVoteMessage(). + + // XXX TODO: do this + // conR.broadcastHasVoteMessage(msg.Vote, msg.ValidatorIndex) + } + default: + log.Warn("Unknown msg type", reflect.TypeOf(msg)) + } + if err != nil { + log.Error("error with msg", "error", err) + } +} + +func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) { + log.Debug("Received tock", "timeout", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step) + + // if this is a timeout for the new height + if ti.height == rs.Height+1 && ti.round == 0 && ti.step == 1 { + cs.mtx.Lock() + // Increment height. + cs.updateToState(cs.stagedState) + // event fired from EnterNewRound after some updates + cs.EnterNewRound(ti.height, 0) + cs.mtx.Unlock() + return + } + + // timeouts must be for current height, round, step + if ti.height != rs.Height || ti.round < rs.Round || (ti.round == rs.Round && ti.step < rs.Step) { + log.Debug("Ignoring tock because we're ahead", "height", rs.Height, "round", rs.Round, "step", rs.Step) + return + } + + // the timeout will now cause a state transition + cs.mtx.Lock() + defer cs.mtx.Unlock() + + switch ti.step { + case RoundStepPropose: + cs.evsw.FireEvent(types.EventStringTimeoutPropose(), cs.RoundStateEvent()) + cs.EnterPrevote(ti.height, ti.round) + case RoundStepPrevoteWait: + cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent()) + cs.EnterPrecommit(ti.height, ti.round) + case RoundStepPrecommitWait: + cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent()) + cs.EnterNewRound(ti.height, ti.round+1) + default: + panic(Fmt("Invalid timeout step: %v", ti.step)) + } + } //----------------------------------------------------------------------------- +// State functions +// Many of these functions are capitalized but are not really meant to be used +// by external code as it will cause race conditions with running timeout/receiveRoutine. +// Use AddVote, SetProposal, AddProposalBlockPart instead // Enter: +2/3 precommits for nil at (height,round-1) // Enter: `timeoutPrecommits` after any +2/3 precommits from (height,round-1) @@ -549,7 +655,7 @@ func (cs *ConsensusState) EnterNewRound(height int, round int) { if now := time.Now(); cs.StartTime.After(now) { log.Warn("Need to set a buffer and log.Warn() here for sanity.", "startTime", cs.StartTime, "now", now) } - cs.stopTimer() + // cs.stopTimer() log.Notice(Fmt("EnterNewRound(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) @@ -608,7 +714,7 @@ func (cs *ConsensusState) EnterPropose(height int, round int) { // Done EnterPropose: cs.updateRoundStep(round, RoundStepPropose) - cs.newStepCh <- cs.getRoundState() + cs.newStep() // If we have the whole proposal + POL, then goto Prevote now. // else, we'll EnterPrevote when the rest of the proposal is received (in AddProposalBlockPart), @@ -638,18 +744,21 @@ func (cs *ConsensusState) decideProposal(height, round int) { proposal := types.NewProposal(height, round, blockParts.Header(), cs.Votes.POLRound()) err := cs.privValidator.SignProposal(cs.state.ChainID, proposal) if err == nil { - log.Notice("Signed and set proposal", "height", height, "round", round, "proposal", proposal) - log.Debug(Fmt("Signed and set proposal block: %v", block)) // Set fields + /* fields set by setProposal and addBlockPart cs.Proposal = proposal cs.ProposalBlock = block cs.ProposalBlockParts = blockParts + */ + // send proposal and block parts on internal msg queue cs.sendInternalMessage(msgInfo{&ProposalMessage{proposal}, ""}) for i := 0; i < blockParts.Total(); i++ { part := blockParts.GetPart(i) cs.sendInternalMessage(msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, ""}) } + log.Notice("Signed and sent proposal", "height", height, "round", round, "proposal", proposal) + log.Debug(Fmt("Signed and sent proposal block: %v", block)) } else { log.Warn("EnterPropose: Error signing proposal", "height", height, "round", round, "error", err) } @@ -741,7 +850,7 @@ func (cs *ConsensusState) EnterPrevote(height int, round int) { // TODO: catchup event? } - cs.stopTimer() + // cs.stopTimer() log.Info(Fmt("EnterPrevote(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) @@ -754,7 +863,7 @@ func (cs *ConsensusState) EnterPrevote(height int, round int) { cs.doPrevote(height, round) // Done EnterPrevote: - cs.newStepCh <- cs.getRoundState() + cs.newStep() // Once `addVote` hits any +2/3 prevotes, we will go to PrevoteWait // (so we have more time to try and collect +2/3 prevotes for a single block) @@ -806,7 +915,7 @@ func (cs *ConsensusState) EnterPrevoteWait(height int, round int) { // Done EnterPrevoteWait: cs.updateRoundStep(round, RoundStepPrevoteWait) - cs.newStepCh <- cs.getRoundState() + cs.newStep() // After `timeoutPrevote0+timeoutPrevoteDelta*round`, EnterPrecommit() cs.scheduleTimeout(timeoutPrevote0+timeoutPrevoteDelta*time.Duration(round), height, round, RoundStepPrevoteWait) @@ -826,14 +935,14 @@ func (cs *ConsensusState) EnterPrecommit(height int, round int) { return } - cs.stopTimer() + // cs.stopTimer() log.Info(Fmt("EnterPrecommit(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) cs.updateRoundStep(round, RoundStepPrecommit) defer func() { // Done EnterPrecommit: - cs.newStepCh <- cs.getRoundState() + cs.newStep() }() hash, partsHeader, ok := cs.Votes.Prevotes(round).TwoThirdsMajority() @@ -929,7 +1038,7 @@ func (cs *ConsensusState) EnterPrecommitWait(height int, round int) { // Done EnterPrecommitWait: cs.updateRoundStep(round, RoundStepPrecommitWait) - cs.newStepCh <- cs.getRoundState() + cs.newStep() // After `timeoutPrecommit0+timeoutPrecommitDelta*round`, EnterNewRound() cs.scheduleTimeout(timeoutPrecommit0+timeoutPrecommitDelta*time.Duration(round), height, round, RoundStepPrecommitWait) @@ -951,7 +1060,7 @@ func (cs *ConsensusState) EnterCommit(height int, commitRound int) { // keep ca.Round the same, it points to the right Precommits set. cs.Step = RoundStepCommit cs.CommitRound = commitRound - cs.newStepCh <- cs.getRoundState() + cs.newStep() // Maybe finalize immediately. cs.tryFinalizeCommit(height) @@ -1030,11 +1139,11 @@ func (cs *ConsensusState) FinalizeCommit(height int) { log.Info(Fmt("Finalizing commit of block: %v", cs.ProposalBlock)) // We have the block, so stage/save/commit-vote. cs.saveBlock(cs.ProposalBlock, cs.ProposalBlockParts, cs.Votes.Precommits(cs.CommitRound)) - // Increment height. - cs.updateToState(cs.stagedState) + + // call updateToState from handleTimeout + // cs.StartTime is already set. // Schedule Round0 to start soon. - // go cs.scheduleRound0(height + 1) // By here, @@ -1046,7 +1155,7 @@ func (cs *ConsensusState) FinalizeCommit(height int) { //----------------------------------------------------------------------------- -func (cs *ConsensusState) SetProposal(proposal *types.Proposal) error { +func (cs *ConsensusState) setProposal(proposal *types.Proposal) error { //cs.mtx.Lock() //defer cs.mtx.Unlock() @@ -1083,7 +1192,7 @@ func (cs *ConsensusState) SetProposal(proposal *types.Proposal) error { // NOTE: block is not necessarily valid. // This can trigger us to go into EnterPrevote asynchronously (before we timeout of propose) or to attempt to commit -func (cs *ConsensusState) AddProposalBlockPart(height int, part *types.Part) (added bool, err error) { +func (cs *ConsensusState) addProposalBlockPart(height int, part *types.Part) (added bool, err error) { //cs.mtx.Lock() //defer cs.mtx.Unlock() @@ -1148,34 +1257,11 @@ func (cs *ConsensusState) tryAddVote(valIndex int, vote *types.Vote, peerKey str return added, nil } -/* - -// Interface to the state machine from external go routines. -// May block on send if queue is full. -// How do we get added/address/error back? -func (cs *ConsensusState) AddVote(valIndex int, vote *types.Vote, peerKey string) (added bool, address []byte, err error) { - if peerKey == "" { - cs.internalMsgQueue <- msgInfo{&VoteMessage{valIndex, vote}, ""} - } else { - cs.peerMsgQueue <- msgInfo{&VoteMessage{valIndex, vote}, peerKey} - } - - // TODO: wait for event?! -} - -*/ - //----------------------------------------------------------------------------- func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string) (added bool, address []byte, err error) { log.Debug("addVote", "voteHeight", vote.Height, "voteType", vote.Type, "csHeight", cs.Height) - defer func() { - if added { - cs.evsw.FireEvent(types.EventStringVote(), &types.EventDataVote{valIndex, address, vote}) - } - }() - // A precommit for the previous height? if vote.Height+1 == cs.Height { if !(cs.Step == RoundStepNewHeight && vote.Type == types.VoteTypePrecommit) { @@ -1186,6 +1272,8 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string added, address, err = cs.LastCommit.AddByIndex(valIndex, vote) if added { log.Info(Fmt("Added to lastPrecommits: %v", cs.LastCommit.StringShort())) + cs.evsw.FireEvent(types.EventStringVote(), &types.EventDataVote{valIndex, address, vote}) + } return } @@ -1195,6 +1283,8 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string height := cs.Height added, address, err = cs.Votes.AddByIndex(valIndex, vote, peerKey) if added { + cs.evsw.FireEvent(types.EventStringVote(), &types.EventDataVote{valIndex, address, vote}) + switch vote.Type { case types.VoteTypePrevote: prevotes := cs.Votes.Prevotes(vote.Round) @@ -1307,6 +1397,7 @@ func (cs *ConsensusState) signVote(type_ byte, hash []byte, header types.PartSet return vote, err } +// signs the vote, publishes on internalMsgQueue func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.PartSetHeader) *types.Vote { if cs.privValidator == nil || !cs.Validators.HasAddress(cs.privValidator.Address) { return nil @@ -1360,14 +1451,7 @@ func (cs *ConsensusState) saveBlock(block *types.Block, blockParts *types.PartSe } -// implements events.Eventable -func (cs *ConsensusState) SetFireable(evsw events.Fireable) { - cs.evsw = evsw -} - -func (cs *ConsensusState) String() string { - return Fmt("ConsensusState(H:%v R:%v S:%v", cs.Height, cs.Round, cs.Step) -} +//--------------------------------------------------------- func CompareHRS(h1, r1 int, s1 RoundStepType, h2, r2 int, s2 RoundStepType) int { if h1 < h2 {