mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-09 22:47:24 +00:00
consensus: more comments
This commit is contained in:
@@ -242,7 +242,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte)
|
||||
ps.EnsureVoteBitArrays(height-1, lastCommitSize)
|
||||
ps.SetHasVote(msg.Vote)
|
||||
|
||||
conR.conS.peerMsgQueue <- msgInfo{msg, src.Key}
|
||||
cs.peerMsgQueue <- msgInfo{msg, src.Key}
|
||||
|
||||
default:
|
||||
// don't punish (leave room for soft upgrades)
|
||||
|
||||
@@ -35,6 +35,7 @@ var (
|
||||
//-----------------------------------------------------------------------------
|
||||
// RoundStepType enum type
|
||||
|
||||
// RoundStepType enumerates the state of the consensus state machine
|
||||
type RoundStepType uint8 // These must be numeric, ordered.
|
||||
|
||||
const (
|
||||
@@ -49,6 +50,7 @@ const (
|
||||
// NOTE: RoundStepNewHeight acts as RoundStepCommitWait.
|
||||
)
|
||||
|
||||
// String returns a string
|
||||
func (rs RoundStepType) String() string {
|
||||
switch rs {
|
||||
case RoundStepNewHeight:
|
||||
@@ -74,7 +76,8 @@ func (rs RoundStepType) String() string {
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
|
||||
// Immutable when returned from ConsensusState.GetRoundState()
|
||||
// RoundState defines the internal consensus state.
|
||||
// It is Immutable when returned from ConsensusState.GetRoundState()
|
||||
// TODO: Actually, only the top pointer is copied,
|
||||
// so access to field pointers is still racey
|
||||
type RoundState struct {
|
||||
@@ -96,6 +99,7 @@ type RoundState struct {
|
||||
LastValidators *types.ValidatorSet
|
||||
}
|
||||
|
||||
// RoundStateEvent returns the H/R/S of the RoundState as an event.
|
||||
func (rs *RoundState) RoundStateEvent() types.EventDataRoundState {
|
||||
edrs := types.EventDataRoundState{
|
||||
Height: rs.Height,
|
||||
@@ -106,10 +110,12 @@ func (rs *RoundState) RoundStateEvent() types.EventDataRoundState {
|
||||
return edrs
|
||||
}
|
||||
|
||||
// String returns a string
|
||||
func (rs *RoundState) String() string {
|
||||
return rs.StringIndented("")
|
||||
}
|
||||
|
||||
// StringIndented returns a string
|
||||
func (rs *RoundState) StringIndented(indent string) string {
|
||||
return fmt.Sprintf(`RoundState{
|
||||
%s H:%v R:%v S:%v
|
||||
@@ -138,6 +144,7 @@ func (rs *RoundState) StringIndented(indent string) string {
|
||||
indent)
|
||||
}
|
||||
|
||||
// StringShort returns a string
|
||||
func (rs *RoundState) StringShort() string {
|
||||
return fmt.Sprintf(`RoundState{H:%v R:%v S:%v ST:%v}`,
|
||||
rs.Height, rs.Round, rs.Step, rs.StartTime)
|
||||
@@ -167,13 +174,17 @@ func (ti *timeoutInfo) String() string {
|
||||
return fmt.Sprintf("%v ; %d/%d %v", ti.Duration, ti.Height, ti.Round, ti.Step)
|
||||
}
|
||||
|
||||
// PrivValidator is a validator that can sign votes and proposals.
|
||||
type PrivValidator interface {
|
||||
GetAddress() []byte
|
||||
SignVote(chainID string, vote *types.Vote) error
|
||||
SignProposal(chainID string, proposal *types.Proposal) error
|
||||
}
|
||||
|
||||
// Tracks consensus state across block heights and rounds.
|
||||
// ConsensusState handles execution of the consensus algorithm.
|
||||
// It processes votes and proposals, and upon reaching agreement,
|
||||
// commits blocks to the chain and executes them against the application.
|
||||
// The internal state machine receives input from peers, the internal validator, and from a timer.
|
||||
type ConsensusState struct {
|
||||
cmn.BaseService
|
||||
|
||||
@@ -218,6 +229,7 @@ type ConsensusState struct {
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
// NewConsensusState returns a new ConsensusState.
|
||||
func NewConsensusState(config *cfg.ConsensusConfig, state *sm.State, proxyAppConn proxy.AppConnConsensus, blockStore types.BlockStore, mempool types.Mempool) *ConsensusState {
|
||||
cs := &ConsensusState{
|
||||
config: config,
|
||||
@@ -256,17 +268,20 @@ func (cs *ConsensusState) SetEventSwitch(evsw types.EventSwitch) {
|
||||
cs.evsw = evsw
|
||||
}
|
||||
|
||||
// String returns a string.
|
||||
func (cs *ConsensusState) String() string {
|
||||
// better not to access shared variables
|
||||
return cmn.Fmt("ConsensusState") //(H:%v R:%v S:%v", cs.Height, cs.Round, cs.Step)
|
||||
}
|
||||
|
||||
// GetState returns a copy of the chain state.
|
||||
func (cs *ConsensusState) GetState() *sm.State {
|
||||
cs.mtx.Lock()
|
||||
defer cs.mtx.Unlock()
|
||||
return cs.state.Copy()
|
||||
}
|
||||
|
||||
// GetRoundState returns a copy of the internal consensus state.
|
||||
func (cs *ConsensusState) GetRoundState() *RoundState {
|
||||
cs.mtx.Lock()
|
||||
defer cs.mtx.Unlock()
|
||||
@@ -278,26 +293,28 @@ func (cs *ConsensusState) getRoundState() *RoundState {
|
||||
return &rs
|
||||
}
|
||||
|
||||
// GetValidators returns a copy of the current validators.
|
||||
func (cs *ConsensusState) GetValidators() (int, []*types.Validator) {
|
||||
cs.mtx.Lock()
|
||||
defer cs.mtx.Unlock()
|
||||
return cs.state.LastBlockHeight, cs.state.Validators.Copy().Validators
|
||||
}
|
||||
|
||||
// Sets our private validator account for signing votes.
|
||||
// SetPrivValidator sets the private validator account for signing votes.
|
||||
func (cs *ConsensusState) SetPrivValidator(priv PrivValidator) {
|
||||
cs.mtx.Lock()
|
||||
defer cs.mtx.Unlock()
|
||||
cs.privValidator = priv
|
||||
}
|
||||
|
||||
// Set the local timer
|
||||
// SetTimeoutTicker sets the local timer. It may be useful to overwrite for testing.
|
||||
func (cs *ConsensusState) SetTimeoutTicker(timeoutTicker TimeoutTicker) {
|
||||
cs.mtx.Lock()
|
||||
defer cs.mtx.Unlock()
|
||||
cs.timeoutTicker = timeoutTicker
|
||||
}
|
||||
|
||||
// LoadCommit loads the commit for a given height.
|
||||
func (cs *ConsensusState) LoadCommit(height int) *types.Commit {
|
||||
cs.mtx.Lock()
|
||||
defer cs.mtx.Unlock()
|
||||
@@ -307,6 +324,8 @@ func (cs *ConsensusState) LoadCommit(height int) *types.Commit {
|
||||
return cs.blockStore.LoadBlockCommit(height)
|
||||
}
|
||||
|
||||
// OnStart implements cmn.Service.
|
||||
// It loads the latest state via the WAL, and starts the timeout and receive routines.
|
||||
func (cs *ConsensusState) OnStart() error {
|
||||
|
||||
walFile := cs.config.WalFile()
|
||||
@@ -347,6 +366,7 @@ func (cs *ConsensusState) startRoutines(maxSteps int) {
|
||||
go cs.receiveRoutine(maxSteps)
|
||||
}
|
||||
|
||||
// OnStop implements cmn.Service. It stops all routines and waits for the WAL to finish.
|
||||
func (cs *ConsensusState) OnStop() {
|
||||
cs.BaseService.OnStop()
|
||||
|
||||
@@ -358,13 +378,14 @@ func (cs *ConsensusState) OnStop() {
|
||||
}
|
||||
}
|
||||
|
||||
// Wait waits for the the main routine to return.
|
||||
// NOTE: be sure to Stop() the event switch and drain
|
||||
// any event channels or this may deadlock
|
||||
func (cs *ConsensusState) Wait() {
|
||||
<-cs.done
|
||||
}
|
||||
|
||||
// Open file to log all consensus messages and timeouts for deterministic accountability
|
||||
// OpenWAL opens a file to log all consensus messages and timeouts for deterministic accountability
|
||||
func (cs *ConsensusState) OpenWAL(walFile string) (err error) {
|
||||
err = cmn.EnsureDir(path.Dir(walFile), 0700)
|
||||
if err != nil {
|
||||
@@ -387,11 +408,13 @@ func (cs *ConsensusState) OpenWAL(walFile string) (err error) {
|
||||
}
|
||||
|
||||
//------------------------------------------------------------
|
||||
// Public interface for passing messages into the consensus state,
|
||||
// possibly causing a state transition
|
||||
// Public interface for passing messages into the consensus state, possibly causing a state transition.
|
||||
// If peerKey == "", the msg is considered internal.
|
||||
// Messages are added to the appropriate queue (peer or internal).
|
||||
// If the queue is full, the function may block.
|
||||
// TODO: should these return anything or let callers just use events?
|
||||
|
||||
// May block on send if queue is full.
|
||||
// AddVote inputs a vote.
|
||||
func (cs *ConsensusState) AddVote(vote *types.Vote, peerKey string) (added bool, err error) {
|
||||
if peerKey == "" {
|
||||
cs.internalMsgQueue <- msgInfo{&VoteMessage{vote}, ""}
|
||||
@@ -403,7 +426,7 @@ func (cs *ConsensusState) AddVote(vote *types.Vote, peerKey string) (added bool,
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// May block on send if queue is full.
|
||||
// SetProposal inputs a proposal.
|
||||
func (cs *ConsensusState) SetProposal(proposal *types.Proposal, peerKey string) error {
|
||||
|
||||
if peerKey == "" {
|
||||
@@ -416,7 +439,7 @@ func (cs *ConsensusState) SetProposal(proposal *types.Proposal, peerKey string)
|
||||
return nil
|
||||
}
|
||||
|
||||
// May block on send if queue is full.
|
||||
// AddProposalBlockPart inputs a part of the proposal block.
|
||||
func (cs *ConsensusState) AddProposalBlockPart(height, round int, part *types.Part, peerKey string) error {
|
||||
|
||||
if peerKey == "" {
|
||||
@@ -429,7 +452,7 @@ func (cs *ConsensusState) AddProposalBlockPart(height, round int, part *types.Pa
|
||||
return nil
|
||||
}
|
||||
|
||||
// May block on send if queue is full.
|
||||
// SetProposalAndBlock inputs the proposal and all block parts.
|
||||
func (cs *ConsensusState) SetProposalAndBlock(proposal *types.Proposal, block *types.Block, parts *types.PartSet, peerKey string) error {
|
||||
cs.SetProposal(proposal, peerKey)
|
||||
for i := 0; i < parts.Total(); i++ {
|
||||
@@ -704,9 +727,11 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) {
|
||||
// State functions
|
||||
// Used internally by handleTimeout and handleMsg to make state transitions
|
||||
|
||||
// Enter: +2/3 precommits for nil at (height,round-1)
|
||||
// Enter: `timeoutNewHeight` by startTime (commitTime+timeoutCommit),
|
||||
// or, if SkipTimeout==true, after receiving all precommits from (height,round-1)
|
||||
// Enter: `timeoutPrecommits` after any +2/3 precommits from (height,round-1)
|
||||
// Enter: `startTime = commitTime+timeoutCommit` from NewHeight(height)
|
||||
// Enter: +2/3 precommits for nil at (height,round-1)
|
||||
// Enter: +2/3 prevotes any or +2/3 precommits for block or any from (height, round)
|
||||
// NOTE: cs.StartTime was already set for height.
|
||||
func (cs *ConsensusState) enterNewRound(height int, round int) {
|
||||
if cs.Height != height || round < cs.Round || (cs.Round == round && cs.Step != RoundStepNewHeight) {
|
||||
@@ -749,7 +774,7 @@ func (cs *ConsensusState) enterNewRound(height int, round int) {
|
||||
cs.enterPropose(height, round)
|
||||
}
|
||||
|
||||
// Enter: from NewRound(height,round).
|
||||
// Enter: from enterNewRound(height,round).
|
||||
func (cs *ConsensusState) enterPropose(height int, round int) {
|
||||
if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPropose <= cs.Step) {
|
||||
cs.Logger.Debug(cmn.Fmt("enterPropose(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step))
|
||||
@@ -927,7 +952,7 @@ func (cs *ConsensusState) defaultDoPrevote(height int, round int) {
|
||||
return
|
||||
}
|
||||
|
||||
// Valdiate proposal block
|
||||
// Validate proposal block
|
||||
err := cs.state.ValidateBlock(cs.ProposalBlock)
|
||||
if err != nil {
|
||||
// ProposalBlock is invalid, prevote nil.
|
||||
@@ -964,8 +989,8 @@ func (cs *ConsensusState) enterPrevoteWait(height int, round int) {
|
||||
cs.scheduleTimeout(cs.config.Prevote(round), height, round, RoundStepPrevoteWait)
|
||||
}
|
||||
|
||||
// Enter: +2/3 precomits for block or nil.
|
||||
// Enter: `timeoutPrevote` after any +2/3 prevotes.
|
||||
// Enter: +2/3 precomits for block or nil.
|
||||
// Enter: any +2/3 precommits for next round.
|
||||
// Lock & precommit the ProposalBlock if we have enough prevotes for it (a POL in this round)
|
||||
// else, unlock an existing lock and precommit nil if +2/3 of prevotes were nil,
|
||||
|
||||
@@ -3,7 +3,7 @@ package consensus
|
||||
import (
|
||||
"time"
|
||||
|
||||
. "github.com/tendermint/tmlibs/common"
|
||||
cmn "github.com/tendermint/tmlibs/common"
|
||||
"github.com/tendermint/tmlibs/log"
|
||||
)
|
||||
|
||||
@@ -29,24 +29,26 @@ type TimeoutTicker interface {
|
||||
// Timeouts are scheduled along the tickChan,
|
||||
// and fired on the tockChan.
|
||||
type timeoutTicker struct {
|
||||
BaseService
|
||||
cmn.BaseService
|
||||
|
||||
timer *time.Timer
|
||||
tickChan chan timeoutInfo
|
||||
tockChan chan timeoutInfo
|
||||
tickChan chan timeoutInfo // for scheduling timeouts
|
||||
tockChan chan timeoutInfo // for notifying about them
|
||||
}
|
||||
|
||||
// NewTimeoutTicker returns a new TimeoutTicker.
|
||||
func NewTimeoutTicker() TimeoutTicker {
|
||||
tt := &timeoutTicker{
|
||||
timer: time.NewTimer(0),
|
||||
tickChan: make(chan timeoutInfo, tickTockBufferSize),
|
||||
tockChan: make(chan timeoutInfo, tickTockBufferSize),
|
||||
}
|
||||
tt.BaseService = *NewBaseService(nil, "TimeoutTicker", tt)
|
||||
tt.BaseService = *cmn.NewBaseService(nil, "TimeoutTicker", tt)
|
||||
tt.stopTimer() // don't want to fire until the first scheduled timeout
|
||||
return tt
|
||||
}
|
||||
|
||||
// OnStart implements cmn.Service. It starts the timeout routine.
|
||||
func (t *timeoutTicker) OnStart() error {
|
||||
|
||||
go t.timeoutRoutine()
|
||||
@@ -54,16 +56,19 @@ func (t *timeoutTicker) OnStart() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// OnStop implements cmn.Service. It stops the timeout routine.
|
||||
func (t *timeoutTicker) OnStop() {
|
||||
t.BaseService.OnStop()
|
||||
t.stopTimer()
|
||||
}
|
||||
|
||||
// Chan returns a channel on which timeouts are sent.
|
||||
func (t *timeoutTicker) Chan() <-chan timeoutInfo {
|
||||
return t.tockChan
|
||||
}
|
||||
|
||||
// The timeoutRoutine is alwaya available to read from tickChan (it won't block).
|
||||
// ScheduleTimeout schedules a new timeout by sending on the internal tickChan.
|
||||
// The timeoutRoutine is alwaya available to read from tickChan, so this won't block.
|
||||
// The scheduling may fail if the timeoutRoutine has already scheduled a timeout for a later height/round/step.
|
||||
func (t *timeoutTicker) ScheduleTimeout(ti timeoutInfo) {
|
||||
t.tickChan <- ti
|
||||
|
||||
Reference in New Issue
Block a user