From b9e143d95651b6d5930c2579e2ab82ebf8fd7dc7 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 14 Dec 2015 00:38:19 -0500 Subject: [PATCH] Fireable -> EventSwitch; rs in EventDataRoundState; fixes from review --- blockchain/reactor.go | 4 +- consensus/common_test.go | 4 +- consensus/reactor.go | 26 ++++--- consensus/state.go | 59 ++++++---------- consensus/state_test.go | 148 ++++++++++++++++++--------------------- events/events.go | 6 +- mempool/reactor.go | 4 +- node/node.go | 6 +- state/state.go | 5 +- types/events.go | 31 ++++---- 10 files changed, 127 insertions(+), 166 deletions(-) diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 0a9778308..bf7a8236c 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -52,7 +52,7 @@ type BlockchainReactor struct { timeoutsCh chan string lastBlock *types.Block - evsw events.Fireable + evsw *events.EventSwitch } func NewBlockchainReactor(state *sm.State, proxyAppCtx proxy.AppContext, store *BlockStore, sync bool) *BlockchainReactor { @@ -261,7 +261,7 @@ func (bcR *BlockchainReactor) BroadcastStatusRequest() error { } // implements events.Eventable -func (bcR *BlockchainReactor) SetFireable(evsw events.Fireable) { +func (bcR *BlockchainReactor) SetEventSwitch(evsw *events.EventSwitch) { bcR.evsw = evsw } diff --git a/consensus/common_test.go b/consensus/common_test.go index 2650b03f3..81100adaf 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -324,7 +324,7 @@ func simpleConsensusState(nValidators int) (*ConsensusState, []*validatorStub) { cs.SetPrivValidator(privVals[0]) evsw := events.NewEventSwitch() - cs.SetFireable(evsw) + cs.SetEventSwitch(evsw) evsw.Start() // start the transition routines @@ -340,7 +340,7 @@ func simpleConsensusState(nValidators int) (*ConsensusState, []*validatorStub) { } func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} { - voteCh0 := cs.evsw.(*events.EventSwitch).SubscribeToEvent(types.EventStringVote(), 0) + voteCh0 := cs.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0) voteCh := make(chan interface{}) go func() { for { diff --git a/consensus/reactor.go b/consensus/reactor.go index ab2fb4913..a68f917c2 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -34,7 +34,7 @@ type ConsensusReactor struct { blockStore *bc.BlockStore conS *ConsensusState fastSync bool - evsw events.Fireable + evsw *events.EventSwitch } func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, fastSync bool) *ConsensusReactor { @@ -221,9 +221,9 @@ func (conR *ConsensusReactor) SetPrivValidator(priv *types.PrivValidator) { } // implements events.Eventable -func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) { +func (conR *ConsensusReactor) SetEventSwitch(evsw *events.EventSwitch) { conR.evsw = evsw - conR.conS.SetFireable(evsw) + conR.conS.SetEventSwitch(evsw) } //-------------------------------------- @@ -231,21 +231,19 @@ func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) { // Listens for new steps and votes, // broadcasting the result to peers func (conR *ConsensusReactor) registerEventCallbacks() { - // XXX: should we change SetFireable to just use EventSwitch so we don't need these assertions? - evsw := conR.evsw.(*events.EventSwitch) - evsw.AddListenerForEvent("conR", types.EventStringNewRoundStep(), func(data types.EventData) { - rs := data.(*types.EventDataRoundState) + conR.evsw.AddListenerForEvent("conR", types.EventStringNewRoundStep(), func(data types.EventData) { + rs := data.(*types.EventDataRoundState).RoundState().(*RoundState) conR.broadcastNewRoundStep(rs) }) - evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data types.EventData) { + conR.evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data types.EventData) { edv := data.(*types.EventDataVote) conR.broadcastHasVoteMessage(edv.Vote, edv.Index) }) } -func (conR *ConsensusReactor) broadcastNewRoundStep(rs *types.EventDataRoundState) { +func (conR *ConsensusReactor) broadcastNewRoundStep(rs *RoundState) { nrsMsg, csMsg := makeRoundStepMessages(rs) if nrsMsg != nil { @@ -282,20 +280,20 @@ func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote, index in */ } -func makeRoundStepMessages(rs *types.EventDataRoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) { +func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) { step := RoundStepType(rs.Step) nrsMsg = &NewRoundStepMessage{ Height: rs.Height, Round: rs.Round, Step: step, SecondsSinceStartTime: int(time.Now().Sub(rs.StartTime).Seconds()), - LastCommitRound: rs.LastCommitRound, + LastCommitRound: rs.LastCommit.Round(), } if step == RoundStepCommit { csMsg = &CommitStepMessage{ Height: rs.Height, - BlockPartsHeader: rs.BlockPartsHeader, - BlockParts: rs.BlockParts, + BlockPartsHeader: rs.ProposalBlockParts.Header(), + BlockParts: rs.ProposalBlockParts.BitArray(), } } return @@ -303,7 +301,7 @@ func makeRoundStepMessages(rs *types.EventDataRoundState) (nrsMsg *NewRoundStepM func (conR *ConsensusReactor) sendNewRoundStepMessage(peer *p2p.Peer) { rs := conR.conS.GetRoundState() - nrsMsg, csMsg := makeRoundStepMessages(rs.RoundStateEvent()) + nrsMsg, csMsg := makeRoundStepMessages(rs) if nrsMsg != nil { peer.Send(StateChannel, nrsMsg) } diff --git a/consensus/state.go b/consensus/state.go index eb9b59030..bb9bf586b 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -98,27 +98,13 @@ type RoundState struct { } func (rs *RoundState) RoundStateEvent() *types.EventDataRoundState { - var header types.PartSetHeader - var parts *BitArray - if rs.ProposalBlockParts != nil { - header = rs.ProposalBlockParts.Header() - parts = rs.ProposalBlockParts.BitArray() - } - return &types.EventDataRoundState{ - CurrentTime: time.Now(), - Height: rs.Height, - Round: rs.Round, - Step: int(rs.Step), - StartTime: rs.StartTime, - CommitTime: rs.CommitTime, - Proposal: rs.Proposal, - ProposalBlock: rs.ProposalBlock, - LockedRound: rs.LockedRound, - LockedBlock: rs.LockedBlock, - POLRound: rs.Votes.POLRound(), - BlockPartsHeader: header, - BlockParts: parts, + edrs := &types.EventDataRoundState{ + Height: rs.Height, + Round: rs.Round, + Step: rs.Step.String(), } + edrs.SetRoundState(rs) + return edrs } func (rs *RoundState) String() string { @@ -204,7 +190,7 @@ type ConsensusState struct { tickChan chan timeoutInfo // start the timeoutTicker in the timeoutRoutine tockChan chan timeoutInfo // timeouts are relayed on tockChan to the receiveRoutine - evsw events.Fireable + evsw *events.EventSwitch evc *events.EventCache // set in stageBlock and passed into state nSteps int // used for testing to limit the number of transitions the state makes @@ -233,7 +219,7 @@ func NewConsensusState(state *sm.State, proxyAppCtx proxy.AppContext, blockStore // Public interface // implements events.Eventable -func (cs *ConsensusState) SetFireable(evsw events.Fireable) { +func (cs *ConsensusState) SetEventSwitch(evsw *events.EventSwitch) { cs.evsw = evsw } @@ -641,9 +627,7 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) { //----------------------------------------------------------------------------- // State functions -// Many of these functions are capitalized but are not really meant to be used -// by external code as it will cause race conditions with running timeout/receiveRoutine. -// Use AddVote, SetProposal, AddProposalBlockPart instead +// Used internally by handleTimeout and handleMsg to make state transitions // Enter: +2/3 precommits for nil at (height,round-1) // Enter: `timeoutPrecommits` after any +2/3 precommits from (height,round-1) @@ -706,6 +690,13 @@ func (cs *ConsensusState) enterPropose(height int, round int) { // Done enterPropose: cs.updateRoundStep(round, RoundStepPropose) cs.newStep() + + // If we have the whole proposal + POL, then goto Prevote now. + // else, we'll enterPrevote when the rest of the proposal is received (in AddProposalBlockPart), + // or else after timeoutPropose + if cs.isProposalComplete() { + cs.enterPrevote(height, cs.Round) + } }() // This step times out after `timeoutPropose` @@ -723,12 +714,6 @@ func (cs *ConsensusState) enterPropose(height int, round int) { cs.decideProposal(height, round) } - // If we have the whole proposal + POL, then goto Prevote now. - // else, we'll enterPrevote when the rest of the proposal is received (in AddProposalBlockPart), - // or else after timeoutPropose - if cs.isProposalComplete() { - cs.enterPrevote(height, cs.Round) - } } func (cs *ConsensusState) decideProposal(height, round int) { @@ -1117,29 +1102,29 @@ func (cs *ConsensusState) tryFinalizeCommit(height int) { return } // go - cs.FinalizeCommit(height) + cs.finalizeCommit(height) } // Increment height and goto RoundStepNewHeight -func (cs *ConsensusState) FinalizeCommit(height int) { +func (cs *ConsensusState) finalizeCommit(height int) { //cs.mtx.Lock() //defer cs.mtx.Unlock() if cs.Height != height || cs.Step != RoundStepCommit { - log.Debug(Fmt("FinalizeCommit(%v): Invalid args. Current step: %v/%v/%v", height, cs.Height, cs.Round, cs.Step)) + log.Debug(Fmt("finalizeCommit(%v): Invalid args. Current step: %v/%v/%v", height, cs.Height, cs.Round, cs.Step)) return } hash, header, ok := cs.Votes.Precommits(cs.CommitRound).TwoThirdsMajority() if !ok { - PanicSanity(Fmt("Cannot FinalizeCommit, commit does not have two thirds majority")) + PanicSanity(Fmt("Cannot finalizeCommit, commit does not have two thirds majority")) } if !cs.ProposalBlockParts.HasHeader(header) { PanicSanity(Fmt("Expected ProposalBlockParts header to be commit header")) } if !cs.ProposalBlock.HashesTo(hash) { - PanicSanity(Fmt("Cannot FinalizeCommit, ProposalBlock does not hash to commit hash")) + PanicSanity(Fmt("Cannot finalizeCommit, ProposalBlock does not hash to commit hash")) } if err := cs.stageBlock(cs.ProposalBlock, cs.ProposalBlockParts); err != nil { PanicConsensus(Fmt("+2/3 committed an invalid block: %v", err)) @@ -1378,7 +1363,7 @@ func (cs *ConsensusState) stageBlock(block *types.Block, blockParts *types.PartS // Create a copy of the state for staging stateCopy := cs.state.Copy() - stateCopy.SetFireable(cs.evc) + stateCopy.SetEventCache(cs.evc) // Run the block on the State: // + update validator sets diff --git a/consensus/state_test.go b/consensus/state_test.go index d4917ac39..d6e8682a4 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -8,7 +8,6 @@ import ( _ "github.com/tendermint/tendermint/config/tendermint_test" //"github.com/tendermint/tendermint/events" - "github.com/tendermint/tendermint/events" "github.com/tendermint/tendermint/types" ) @@ -54,9 +53,8 @@ func TestProposerSelection0(t *testing.T) { cs1, vss := simpleConsensusState(4) height, round := cs1.Height, cs1.Round - evsw := cs1.evsw.(*events.EventSwitch) - newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) - proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) + newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) + proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) startTestRound(cs1, height, round) @@ -88,8 +86,7 @@ func TestProposerSelection0(t *testing.T) { func TestProposerSelection2(t *testing.T) { cs1, vss := simpleConsensusState(4) // test needs more work for more than 3 validators - evsw := cs1.evsw.(*events.EventSwitch) - newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) + newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) // this time we jump in at round 2 incrementRound(vss[1:]...) @@ -120,9 +117,8 @@ func TestEnterProposeNoPrivValidator(t *testing.T) { cs.SetPrivValidator(nil) height, round := cs.Height, cs.Round - evsw := cs.evsw.(*events.EventSwitch) // Listen for propose timeout event - timeoutCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) + timeoutCh := cs.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0) startTestRound(cs, height, round) @@ -146,9 +142,9 @@ func TestEnterProposeYesPrivValidator(t *testing.T) { height, round := cs.Height, cs.Round // Listen for propose timeout event - evsw := cs.evsw.(*events.EventSwitch) - timeoutCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) - proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) + + timeoutCh := cs.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0) + proposalCh := cs.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) cs.enterNewRound(height, round) cs.startRoutines(3) @@ -182,9 +178,8 @@ func TestBadProposal(t *testing.T) { height, round := cs1.Height, cs1.Round cs2 := vss[1] - evsw := cs1.evsw.(*events.EventSwitch) - proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) - voteCh := evsw.SubscribeToEvent(types.EventStringVote(), 0) + proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) + voteCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0) propBlock, _ := cs1.createProposalBlock() //changeProposer(t, cs1, cs2) @@ -238,10 +233,9 @@ func TestFullRound1(t *testing.T) { cs, vss := simpleConsensusState(1) height, round := cs.Height, cs.Round - evsw := cs.evsw.(*events.EventSwitch) - voteCh := evsw.SubscribeToEvent(types.EventStringVote(), 0) - propCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) - newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) + voteCh := cs.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0) + propCh := cs.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) + newRoundCh := cs.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) startTestRound(cs, height, round) @@ -249,7 +243,7 @@ func TestFullRound1(t *testing.T) { // grab proposal re := <-propCh - propBlockHash := re.(*types.EventDataRoundState).ProposalBlock.Hash() + propBlockHash := re.(*types.EventDataRoundState).RoundState().(*RoundState).ProposalBlock.Hash() <-voteCh // wait for prevote validatePrevote(t, cs, round, vss[0], propBlockHash) @@ -267,8 +261,7 @@ func TestFullRoundNil(t *testing.T) { cs, vss := simpleConsensusState(1) height, round := cs.Height, cs.Round - evsw := cs.evsw.(*events.EventSwitch) - voteCh := evsw.SubscribeToEvent(types.EventStringVote(), 0) + voteCh := cs.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0) cs.enterPrevote(height, round) cs.startRoutines(4) @@ -287,9 +280,8 @@ func TestFullRound2(t *testing.T) { cs2 := vss[1] height, round := cs1.Height, cs1.Round - evsw := cs1.evsw.(*events.EventSwitch) - voteCh := evsw.SubscribeToEvent(types.EventStringVote(), 0) - newBlockCh := evsw.SubscribeToEvent(types.EventStringNewBlock(), 0) + voteCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0) + newBlockCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewBlock(), 0) // start round and wait for propose and prevote startTestRound(cs1, height, round) @@ -329,12 +321,11 @@ func TestLockNoPOL(t *testing.T) { cs2 := vss[1] height := cs1.Height - evsw := cs1.evsw.(*events.EventSwitch) - timeoutProposeCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) - timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait(), 0) - voteCh := evsw.SubscribeToEvent(types.EventStringVote(), 0) - proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) - newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) + timeoutProposeCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0) + timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0) + voteCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0) + proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) + newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) /* Round1 (cs1, B) // B B // B B2 @@ -345,7 +336,7 @@ func TestLockNoPOL(t *testing.T) { cs1.startRoutines(0) re := <-proposalCh - rs := re.(*types.EventDataRoundState) + rs := re.(*types.EventDataRoundState).RoundState().(*RoundState) theBlockHash := rs.ProposalBlock.Hash() <-voteCh // prevote @@ -385,7 +376,7 @@ func TestLockNoPOL(t *testing.T) { // now we're on a new round and not the proposer, so wait for timeout re = <-timeoutProposeCh - rs = re.(*types.EventDataRoundState) + rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) if rs.ProposalBlock != nil { t.Fatal("Expected proposal block to be nil") @@ -429,7 +420,7 @@ func TestLockNoPOL(t *testing.T) { incrementRound(cs2) re = <-proposalCh - rs = re.(*types.EventDataRoundState) + rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) // now we're on a new round and are the proposer if !bytes.Equal(rs.ProposalBlock.Hash(), rs.LockedBlock.Hash()) { @@ -492,13 +483,12 @@ func TestLockPOLRelock(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - evsw := cs1.evsw.(*events.EventSwitch) - timeoutProposeCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) - timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait(), 0) - proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) - voteCh := evsw.SubscribeToEvent(types.EventStringVote(), 0) - newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) - newBlockCh := evsw.SubscribeToEvent(types.EventStringNewBlock(), 0) + timeoutProposeCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0) + timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0) + proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) + voteCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0) + newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) + newBlockCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewBlock(), 0) log.Debug("cs2 last round", "lr", cs2.PrivValidator.LastRound) @@ -515,7 +505,7 @@ func TestLockPOLRelock(t *testing.T) { <-newRoundCh re := <-proposalCh - rs := re.(*types.EventDataRoundState) + rs := re.(*types.EventDataRoundState).RoundState().(*RoundState) theBlockHash := rs.ProposalBlock.Hash() <-voteCh // prevote @@ -586,7 +576,7 @@ func TestLockPOLRelock(t *testing.T) { be := <-newBlockCh b := be.(types.EventDataNewBlock) re = <-newRoundCh - rs = re.(*types.EventDataRoundState) + rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) if rs.Height != 2 { t.Fatal("Expected height to increment") } @@ -601,12 +591,11 @@ func TestLockPOLUnlock(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - evsw := cs1.evsw.(*events.EventSwitch) - proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) - timeoutProposeCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) - timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait(), 0) - newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) - unlockCh := evsw.SubscribeToEvent(types.EventStringUnlock(), 0) + proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) + timeoutProposeCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0) + timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0) + newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) + unlockCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringUnlock(), 0) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // everything done from perspective of cs1 @@ -621,7 +610,7 @@ func TestLockPOLUnlock(t *testing.T) { startTestRound(cs1, cs1.Height, 0) <-newRoundCh re := <-proposalCh - rs := re.(*types.EventDataRoundState) + rs := re.(*types.EventDataRoundState).RoundState().(*RoundState) theBlockHash := rs.ProposalBlock.Hash() <-voteCh // prevote @@ -645,7 +634,7 @@ func TestLockPOLUnlock(t *testing.T) { // timeout to new round re = <-timeoutWaitCh - rs = re.(*types.EventDataRoundState) + rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) lockedBlockHash := rs.LockedBlock.Hash() //XXX: this isnt gauranteed to get there before the timeoutPropose ... @@ -693,18 +682,17 @@ func TestLockPOLSafety1(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - evsw := cs1.evsw.(*events.EventSwitch) - proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) - timeoutProposeCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) - timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait(), 0) - newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) + proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) + timeoutProposeCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0) + timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0) + newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // start round and wait for propose and prevote startTestRound(cs1, cs1.Height, 0) <-newRoundCh re := <-proposalCh - rs := re.(*types.EventDataRoundState) + rs := re.(*types.EventDataRoundState).RoundState().(*RoundState) propBlock := rs.ProposalBlock <-voteCh // prevote @@ -752,7 +740,7 @@ func TestLockPOLSafety1(t *testing.T) { re = <-proposalCh } - rs = re.(*types.EventDataRoundState) + rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) if rs.LockedBlock != nil { t.Fatal("we should not be locked!") @@ -792,7 +780,7 @@ func TestLockPOLSafety1(t *testing.T) { // we should prevote what we're locked on validatePrevote(t, cs1, 2, vss[0], propBlockHash) - newStepCh := evsw.SubscribeToEvent(types.EventStringNewRoundStep(), 0) + newStepCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRoundStep(), 0) // add prevotes from the earlier round addVoteToFromMany(cs1, prevotes, cs2, cs3, cs4) @@ -813,12 +801,11 @@ func TestLockPOLSafety2(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - evsw := cs1.evsw.(*events.EventSwitch) - proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) - timeoutProposeCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) - timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait(), 0) - newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) - unlockCh := evsw.SubscribeToEvent(types.EventStringUnlock(), 0) + proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) + timeoutProposeCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0) + timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0) + newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) + unlockCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringUnlock(), 0) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // the block for R0: gets polkad but we miss it @@ -904,10 +891,10 @@ func TestSlashingPrevotes(t *testing.T) { cs1, vss := simpleConsensusState(2) cs2 := vss[1] - evsw := cs1.evsw.(*events.EventSwitch) - proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal() , 0) - timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait() , 0) - newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound() , 1) + + proposalCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringCompleteProposal() , 0) + timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringTimeoutWait() , 0) + newRoundCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringNewRound() , 1) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // start round and wait for propose and prevote @@ -916,7 +903,7 @@ func TestSlashingPrevotes(t *testing.T) { re := <-proposalCh <-voteCh // prevote - rs := re.(*types.EventDataRoundState) + rs := re.(*types.EventDataRoundState).RoundState().(*RoundState) // we should now be stuck in limbo forever, waiting for more prevotes // add one for a different block should cause us to go into prevote wait @@ -939,10 +926,10 @@ func TestSlashingPrecommits(t *testing.T) { cs1, vss := simpleConsensusState(2) cs2 := vss[1] - evsw := cs1.evsw.(*events.EventSwitch) - proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal() , 0) - timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait() , 0) - newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound() , 1) + + proposalCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringCompleteProposal() , 0) + timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringTimeoutWait() , 0) + newRoundCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringNewRound() , 1) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // start round and wait for propose and prevote @@ -984,18 +971,17 @@ func TestHalt1(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - evsw := cs1.evsw.(*events.EventSwitch) - proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) - timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait(), 0) - newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) - newBlockCh := evsw.SubscribeToEvent(types.EventStringNewBlock(), 0) + proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) + timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0) + newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) + newBlockCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewBlock(), 0) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // start round and wait for propose and prevote startTestRound(cs1, cs1.Height, 0) <-newRoundCh re := <-proposalCh - rs := re.(*types.EventDataRoundState) + rs := re.(*types.EventDataRoundState).RoundState().(*RoundState) propBlock := rs.ProposalBlock propBlockParts := propBlock.MakePartSet() @@ -1018,7 +1004,7 @@ func TestHalt1(t *testing.T) { // timeout to new round <-timeoutWaitCh re = <-newRoundCh - rs = re.(*types.EventDataRoundState) + rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) log.Notice("### ONTO ROUND 1") /*Round2 @@ -1036,7 +1022,7 @@ func TestHalt1(t *testing.T) { // receiving that precommit should take us straight to commit <-newBlockCh re = <-newRoundCh - rs = re.(*types.EventDataRoundState) + rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) if rs.Height != 2 { t.Fatal("expected height to increment") diff --git a/events/events.go b/events/events.go index e70d39620..647522d0a 100644 --- a/events/events.go +++ b/events/events.go @@ -10,7 +10,7 @@ import ( // reactors and other modules should export // this interface to become eventable type Eventable interface { - SetFireable(Fireable) + SetEventSwitch(evsw *EventSwitch) } // an event switch or cache implements fireable @@ -123,10 +123,10 @@ func (evsw *EventSwitch) FireEvent(event string, data types.EventData) { eventCell.FireEvent(data) } -func (evsw *EventSwitch) SubscribeToEvent(eventID string, chanCap int) chan interface{} { +func (evsw *EventSwitch) SubscribeToEvent(receiver, eventID string, chanCap int) chan interface{} { // listen for new round ch := make(chan interface{}, chanCap) - evsw.AddListenerForEvent("tester", eventID, func(data types.EventData) { + evsw.AddListenerForEvent(receiver, eventID, func(data types.EventData) { // NOTE: in production, evsw callbacks should be nonblocking. ch <- data }) diff --git a/mempool/reactor.go b/mempool/reactor.go index ed8b8b260..5dd187fdc 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -25,7 +25,7 @@ const ( type MempoolReactor struct { p2p.BaseReactor Mempool *Mempool // TODO: un-expose - evsw events.Fireable + evsw *events.EventSwitch } func NewMempoolReactor(mempool *Mempool) *MempoolReactor { @@ -135,7 +135,7 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer Peer) { } // implements events.Eventable -func (memR *MempoolReactor) SetFireable(evsw events.Fireable) { +func (memR *MempoolReactor) SetEventSwitch(evsw *events.EventSwitch) { memR.evsw = evsw } diff --git a/node/node.go b/node/node.go index b63116355..e01f02a8e 100644 --- a/node/node.go +++ b/node/node.go @@ -101,7 +101,7 @@ func NewNode() *Node { // add the event switch to all services // they should all satisfy events.Eventable - SetFireable(eventSwitch, bcReactor, mempoolReactor, consensusReactor) + SetEventSwitch(eventSwitch, bcReactor, mempoolReactor, consensusReactor) // run the profile server profileHost := config.GetString("prof_laddr") @@ -144,9 +144,9 @@ func (n *Node) Stop() { } // Add the event switch to reactors, mempool, etc. -func SetFireable(evsw *events.EventSwitch, eventables ...events.Eventable) { +func SetEventSwitch(evsw *events.EventSwitch, eventables ...events.Eventable) { for _, e := range eventables { - e.SetFireable(evsw) + e.SetEventSwitch(evsw) } } diff --git a/state/state.go b/state/state.go index 32a6bf179..9523a6497 100644 --- a/state/state.go +++ b/state/state.go @@ -33,7 +33,7 @@ type State struct { LastValidators *types.ValidatorSet LastAppHash []byte - evc events.Fireable // typically an events.EventCache + evc *events.EventCache } func LoadState(db dbm.DB) *State { @@ -81,8 +81,7 @@ func (s *State) Save() { s.db.Set(stateKey, buf.Bytes()) } -// Implements events.Eventable. Typically uses events.EventCache -func (s *State) SetFireable(evc events.Fireable) { +func (s *State) SetEventCache(evc *events.EventCache) { s.mtx.Lock() defer s.mtx.Unlock() diff --git a/types/events.go b/types/events.go index b046e9656..436e02de6 100644 --- a/types/events.go +++ b/types/events.go @@ -1,9 +1,6 @@ package types import ( - "time" - - "github.com/tendermint/go-common" "github.com/tendermint/go-wire" ) @@ -74,25 +71,21 @@ type EventDataApp struct { Data []byte `json:"bytes"` } -// We fire the most recent round state that led to the event -// (ie. NewRound will have the previous rounds state) type EventDataRoundState struct { - CurrentTime time.Time `json:"current_time"` + Height int `json:"height"` + Round int `json:"round"` + Step string `json:"step"` - Height int `json:"height"` - Round int `json:"round"` - Step int `json:"step"` - LastCommitRound int `json:"last_commit_round"` - StartTime time.Time `json:"start_time"` - CommitTime time.Time `json:"commit_time"` - Proposal *Proposal `json:"proposal"` - ProposalBlock *Block `json:"proposal_block"` - LockedRound int `json:"locked_round"` - LockedBlock *Block `json:"locked_block"` - POLRound int `json:"pol_round"` + // private, not exposed to websockets + rs interface{} +} - BlockPartsHeader PartSetHeader `json:"block_parts_header"` - BlockParts *common.BitArray `json:"block_parts"` +func (edrs *EventDataRoundState) RoundState() interface{} { + return edrs.rs +} + +func (edrs *EventDataRoundState) SetRoundState(rs interface{}) { + edrs.rs = rs } type EventDataVote struct {