diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index 862e24ded..b8e447367 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -49,7 +49,7 @@ func TestByzantine(t *testing.T) { switches[i].SetLogger(p2pLogger.With("validator", i)) } - eventSubs := make([]types.Subscription, N) + blocksSubs := make([]types.Subscription, N) reactors := make([]p2p.Reactor, N) for i := 0; i < N; i++ { // make first val byzantine @@ -69,7 +69,7 @@ func TestByzantine(t *testing.T) { eventBus.SetLogger(logger.With("module", "events", "validator", i)) var err error - eventSubs[i], err = eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock) + blocksSubs[i], err = eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock) require.NoError(t, err) conR := NewConsensusReactor(css[i], true) // so we dont start the consensus states @@ -134,7 +134,7 @@ func TestByzantine(t *testing.T) { p2p.Connect2Switches(switches, ind1, ind2) // wait for someone in the big partition (B) to make a block - <-eventSubs[ind2].Out() + <-blocksSubs[ind2].Out() t.Log("A block has been committed. Healing partition") p2p.Connect2Switches(switches, ind0, ind1) @@ -146,7 +146,7 @@ func TestByzantine(t *testing.T) { wg.Add(2) for i := 1; i < N-1; i++ { go func(j int) { - <-eventSubs[j].Out() + <-blocksSubs[j].Out() wg.Done() }(i) } diff --git a/consensus/common_test.go b/consensus/common_test.go index 282d09b45..5b7c9772c 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -218,15 +218,14 @@ func validatePrevoteAndPrecommit(t *testing.T, cs *ConsensusState, thisRound, lo cs.mtx.Unlock() } -// genesis func subscribeToVoter(cs *ConsensusState, addr []byte) <-chan tmpubsub.Message { - voteCh0Sub, err := cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryVote) + votesSub, err := cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryVote) if err != nil { panic(fmt.Sprintf("failed to subscribe %s to %v", testSubscriber, types.EventQueryVote)) } ch := make(chan tmpubsub.Message) go func() { - for msg := range voteCh0Sub.Out() { + for msg := range votesSub.Out() { vote := msg.Data().(types.EventDataVote) // we only fire for our own votes if bytes.Equal(addr, vote.Vote.ValidatorAddress) { diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index 4b9e53d83..db2c35240 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -91,11 +91,11 @@ func stopConsensusNet(logger log.Logger, reactors []*ConsensusReactor, eventBuse func TestReactorBasic(t *testing.T) { N := 4 css := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter) - reactors, eventSubs, eventBuses := startConsensusNet(t, css, N) + reactors, blocksSubs, eventBuses := startConsensusNet(t, css, N) defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses) // wait till everyone makes the first new block timeoutWaitGroup(t, N, func(j int) { - <-eventSubs[j].Out() + <-blocksSubs[j].Out() }, css) } @@ -167,19 +167,19 @@ func TestReactorWithEvidence(t *testing.T) { css[i] = cs } - reactors, eventSubs, eventBuses := startConsensusNet(t, css, nValidators) + reactors, blocksSubs, eventBuses := startConsensusNet(t, css, nValidators) defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses) // wait till everyone makes the first new block with no evidence timeoutWaitGroup(t, nValidators, func(j int) { - msg := <-eventSubs[j].Out() + msg := <-blocksSubs[j].Out() block := msg.Data().(types.EventDataNewBlock).Block assert.True(t, len(block.Evidence.Evidence) == 0) }, css) // second block should have evidence timeoutWaitGroup(t, nValidators, func(j int) { - msg := <-eventSubs[j].Out() + msg := <-blocksSubs[j].Out() block := msg.Data().(types.EventDataNewBlock).Block assert.True(t, len(block.Evidence.Evidence) > 0) }, css) @@ -225,7 +225,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) { func(c *cfg.Config) { c.Consensus.CreateEmptyBlocks = false }) - reactors, eventSubs, eventBuses := startConsensusNet(t, css, N) + reactors, blocksSubs, eventBuses := startConsensusNet(t, css, N) defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses) // send a tx @@ -235,7 +235,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) { // wait till everyone makes the first new block timeoutWaitGroup(t, N, func(j int) { - <-eventSubs[j].Out() + <-blocksSubs[j].Out() }, css) } @@ -243,12 +243,12 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) { func TestReactorRecordsVotesAndBlockParts(t *testing.T) { N := 4 css := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter) - reactors, eventSubs, eventBuses := startConsensusNet(t, css, N) + reactors, blocksSubs, eventBuses := startConsensusNet(t, css, N) defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses) // wait till everyone makes the first new block timeoutWaitGroup(t, N, func(j int) { - <-eventSubs[j].Out() + <-blocksSubs[j].Out() }, css) // Get peer @@ -267,7 +267,7 @@ func TestReactorVotingPowerChange(t *testing.T) { nVals := 4 logger := log.TestingLogger() css := randConsensusNet(nVals, "consensus_voting_power_changes_test", newMockTickerFunc(true), newPersistentKVStore) - reactors, eventSubs, eventBuses := startConsensusNet(t, css, nVals) + reactors, blocksSubs, eventBuses := startConsensusNet(t, css, nVals) defer stopConsensusNet(logger, reactors, eventBuses) // map of active validators @@ -279,7 +279,7 @@ func TestReactorVotingPowerChange(t *testing.T) { // wait till everyone makes block 1 timeoutWaitGroup(t, nVals, func(j int) { - <-eventSubs[j].Out() + <-blocksSubs[j].Out() }, css) //--------------------------------------------------------------------------- @@ -290,10 +290,10 @@ func TestReactorVotingPowerChange(t *testing.T) { updateValidatorTx := kvstore.MakeValSetChangeTx(val1PubKeyABCI, 25) previousTotalVotingPower := css[0].GetRoundState().LastValidators.TotalVotingPower() - waitForAndValidateBlock(t, nVals, activeVals, eventSubs, css, updateValidatorTx) - waitForAndValidateBlockWithTx(t, nVals, activeVals, eventSubs, css, updateValidatorTx) - waitForAndValidateBlock(t, nVals, activeVals, eventSubs, css) - waitForAndValidateBlock(t, nVals, activeVals, eventSubs, css) + waitForAndValidateBlock(t, nVals, activeVals, blocksSubs, css, updateValidatorTx) + waitForAndValidateBlockWithTx(t, nVals, activeVals, blocksSubs, css, updateValidatorTx) + waitForAndValidateBlock(t, nVals, activeVals, blocksSubs, css) + waitForAndValidateBlock(t, nVals, activeVals, blocksSubs, css) if css[0].GetRoundState().LastValidators.TotalVotingPower() == previousTotalVotingPower { t.Fatalf("expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, css[0].GetRoundState().LastValidators.TotalVotingPower()) @@ -302,10 +302,10 @@ func TestReactorVotingPowerChange(t *testing.T) { updateValidatorTx = kvstore.MakeValSetChangeTx(val1PubKeyABCI, 2) previousTotalVotingPower = css[0].GetRoundState().LastValidators.TotalVotingPower() - waitForAndValidateBlock(t, nVals, activeVals, eventSubs, css, updateValidatorTx) - waitForAndValidateBlockWithTx(t, nVals, activeVals, eventSubs, css, updateValidatorTx) - waitForAndValidateBlock(t, nVals, activeVals, eventSubs, css) - waitForAndValidateBlock(t, nVals, activeVals, eventSubs, css) + waitForAndValidateBlock(t, nVals, activeVals, blocksSubs, css, updateValidatorTx) + waitForAndValidateBlockWithTx(t, nVals, activeVals, blocksSubs, css, updateValidatorTx) + waitForAndValidateBlock(t, nVals, activeVals, blocksSubs, css) + waitForAndValidateBlock(t, nVals, activeVals, blocksSubs, css) if css[0].GetRoundState().LastValidators.TotalVotingPower() == previousTotalVotingPower { t.Fatalf("expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, css[0].GetRoundState().LastValidators.TotalVotingPower()) @@ -314,10 +314,10 @@ func TestReactorVotingPowerChange(t *testing.T) { updateValidatorTx = kvstore.MakeValSetChangeTx(val1PubKeyABCI, 26) previousTotalVotingPower = css[0].GetRoundState().LastValidators.TotalVotingPower() - waitForAndValidateBlock(t, nVals, activeVals, eventSubs, css, updateValidatorTx) - waitForAndValidateBlockWithTx(t, nVals, activeVals, eventSubs, css, updateValidatorTx) - waitForAndValidateBlock(t, nVals, activeVals, eventSubs, css) - waitForAndValidateBlock(t, nVals, activeVals, eventSubs, css) + waitForAndValidateBlock(t, nVals, activeVals, blocksSubs, css, updateValidatorTx) + waitForAndValidateBlockWithTx(t, nVals, activeVals, blocksSubs, css, updateValidatorTx) + waitForAndValidateBlock(t, nVals, activeVals, blocksSubs, css) + waitForAndValidateBlock(t, nVals, activeVals, blocksSubs, css) if css[0].GetRoundState().LastValidators.TotalVotingPower() == previousTotalVotingPower { t.Fatalf("expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, css[0].GetRoundState().LastValidators.TotalVotingPower()) @@ -331,7 +331,7 @@ func TestReactorValidatorSetChanges(t *testing.T) { logger := log.TestingLogger() - reactors, eventSubs, eventBuses := startConsensusNet(t, css, nPeers) + reactors, blocksSubs, eventBuses := startConsensusNet(t, css, nPeers) defer stopConsensusNet(logger, reactors, eventBuses) // map of active validators @@ -343,7 +343,7 @@ func TestReactorValidatorSetChanges(t *testing.T) { // wait till everyone makes block 1 timeoutWaitGroup(t, nPeers, func(j int) { - <-eventSubs[j].Out() + <-blocksSubs[j].Out() }, css) //--------------------------------------------------------------------------- @@ -356,22 +356,22 @@ func TestReactorValidatorSetChanges(t *testing.T) { // wait till everyone makes block 2 // ensure the commit includes all validators // send newValTx to change vals in block 3 - waitForAndValidateBlock(t, nPeers, activeVals, eventSubs, css, newValidatorTx1) + waitForAndValidateBlock(t, nPeers, activeVals, blocksSubs, css, newValidatorTx1) // wait till everyone makes block 3. // it includes the commit for block 2, which is by the original validator set - waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventSubs, css, newValidatorTx1) + waitForAndValidateBlockWithTx(t, nPeers, activeVals, blocksSubs, css, newValidatorTx1) // wait till everyone makes block 4. // it includes the commit for block 3, which is by the original validator set - waitForAndValidateBlock(t, nPeers, activeVals, eventSubs, css) + waitForAndValidateBlock(t, nPeers, activeVals, blocksSubs, css) // the commits for block 4 should be with the updated validator set activeVals[string(newValidatorPubKey1.Address())] = struct{}{} // wait till everyone makes block 5 // it includes the commit for block 4, which should have the updated validator set - waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventSubs, css) + waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, blocksSubs, css) //--------------------------------------------------------------------------- logger.Info("---------------------------- Testing changing the voting power of one validator") @@ -381,10 +381,10 @@ func TestReactorValidatorSetChanges(t *testing.T) { updateValidatorTx1 := kvstore.MakeValSetChangeTx(updatePubKey1ABCI, 25) previousTotalVotingPower := css[nVals].GetRoundState().LastValidators.TotalVotingPower() - waitForAndValidateBlock(t, nPeers, activeVals, eventSubs, css, updateValidatorTx1) - waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventSubs, css, updateValidatorTx1) - waitForAndValidateBlock(t, nPeers, activeVals, eventSubs, css) - waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventSubs, css) + waitForAndValidateBlock(t, nPeers, activeVals, blocksSubs, css, updateValidatorTx1) + waitForAndValidateBlockWithTx(t, nPeers, activeVals, blocksSubs, css, updateValidatorTx1) + waitForAndValidateBlock(t, nPeers, activeVals, blocksSubs, css) + waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, blocksSubs, css) if css[nVals].GetRoundState().LastValidators.TotalVotingPower() == previousTotalVotingPower { t.Errorf("expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, css[nVals].GetRoundState().LastValidators.TotalVotingPower()) @@ -401,12 +401,12 @@ func TestReactorValidatorSetChanges(t *testing.T) { newVal3ABCI := types.TM2PB.PubKey(newValidatorPubKey3) newValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, testMinPower) - waitForAndValidateBlock(t, nPeers, activeVals, eventSubs, css, newValidatorTx2, newValidatorTx3) - waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventSubs, css, newValidatorTx2, newValidatorTx3) - waitForAndValidateBlock(t, nPeers, activeVals, eventSubs, css) + waitForAndValidateBlock(t, nPeers, activeVals, blocksSubs, css, newValidatorTx2, newValidatorTx3) + waitForAndValidateBlockWithTx(t, nPeers, activeVals, blocksSubs, css, newValidatorTx2, newValidatorTx3) + waitForAndValidateBlock(t, nPeers, activeVals, blocksSubs, css) activeVals[string(newValidatorPubKey2.Address())] = struct{}{} activeVals[string(newValidatorPubKey3.Address())] = struct{}{} - waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventSubs, css) + waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, blocksSubs, css) //--------------------------------------------------------------------------- logger.Info("---------------------------- Testing removing two validators at once") @@ -414,12 +414,12 @@ func TestReactorValidatorSetChanges(t *testing.T) { removeValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, 0) removeValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, 0) - waitForAndValidateBlock(t, nPeers, activeVals, eventSubs, css, removeValidatorTx2, removeValidatorTx3) - waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventSubs, css, removeValidatorTx2, removeValidatorTx3) - waitForAndValidateBlock(t, nPeers, activeVals, eventSubs, css) + waitForAndValidateBlock(t, nPeers, activeVals, blocksSubs, css, removeValidatorTx2, removeValidatorTx3) + waitForAndValidateBlockWithTx(t, nPeers, activeVals, blocksSubs, css, removeValidatorTx2, removeValidatorTx3) + waitForAndValidateBlock(t, nPeers, activeVals, blocksSubs, css) delete(activeVals, string(newValidatorPubKey2.Address())) delete(activeVals, string(newValidatorPubKey3.Address())) - waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventSubs, css) + waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, blocksSubs, css) } // Check we can make blocks with skip_timeout_commit=false @@ -431,12 +431,12 @@ func TestReactorWithTimeoutCommit(t *testing.T) { css[i].config.SkipTimeoutCommit = false } - reactors, eventSubs, eventBuses := startConsensusNet(t, css, N-1) + reactors, blocksSubs, eventBuses := startConsensusNet(t, css, N-1) defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses) // wait till everyone makes the first new block timeoutWaitGroup(t, N-1, func(j int) { - <-eventSubs[j].Out() + <-blocksSubs[j].Out() }, css) } @@ -444,13 +444,13 @@ func waitForAndValidateBlock( t *testing.T, n int, activeVals map[string]struct{}, - eventSubs []types.Subscription, + blocksSubs []types.Subscription, css []*ConsensusState, txs ...[]byte, ) { timeoutWaitGroup(t, n, func(j int) { css[j].Logger.Debug("waitForAndValidateBlock") - msg := <-eventSubs[j].Out() + msg := <-blocksSubs[j].Out() newBlock := msg.Data().(types.EventDataNewBlock).Block css[j].Logger.Debug("waitForAndValidateBlock: Got block", "height", newBlock.Height) err := validateBlock(newBlock, activeVals) @@ -466,7 +466,7 @@ func waitForAndValidateBlockWithTx( t *testing.T, n int, activeVals map[string]struct{}, - eventSubs []types.Subscription, + blocksSubs []types.Subscription, css []*ConsensusState, txs ...[]byte, ) { @@ -475,7 +475,7 @@ func waitForAndValidateBlockWithTx( BLOCK_TX_LOOP: for { css[j].Logger.Debug("waitForAndValidateBlockWithTx", "ntxs", ntxs) - msg := <-eventSubs[j].Out() + msg := <-blocksSubs[j].Out() newBlock := msg.Data().(types.EventDataNewBlock).Block css[j].Logger.Debug("waitForAndValidateBlockWithTx: Got block", "height", newBlock.Height) err := validateBlock(newBlock, activeVals) @@ -501,7 +501,7 @@ func waitForBlockWithUpdatedValsAndValidateIt( t *testing.T, n int, updatedVals map[string]struct{}, - eventSubs []types.Subscription, + blocksSubs []types.Subscription, css []*ConsensusState, ) { timeoutWaitGroup(t, n, func(j int) { @@ -510,7 +510,7 @@ func waitForBlockWithUpdatedValsAndValidateIt( LOOP: for { css[j].Logger.Debug("waitForBlockWithUpdatedValsAndValidateIt") - msg := <-eventSubs[j].Out() + msg := <-blocksSubs[j].Out() newBlock = msg.Data().(types.EventDataNewBlock).Block if newBlock.LastCommit.Size() == len(updatedVals) { css[j].Logger.Debug("waitForBlockWithUpdatedValsAndValidateIt: Got block", "height", newBlock.Height) diff --git a/consensus/replay_file.go b/consensus/replay_file.go index cd1230485..effe214d1 100644 --- a/consensus/replay_file.go +++ b/consensus/replay_file.go @@ -82,7 +82,6 @@ func (cs *ConsensusState) ReplayFile(file string, console bool) error { return err } - // TODO: pass a subscription if err := pb.cs.readReplayMessage(msg, newStepSub); err != nil { return err } @@ -223,7 +222,6 @@ func (pb *playback) replayConsoleLoop() int { defer pb.cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep) if len(tokens) == 1 { - // TODO: pass a subscription if err := pb.replayReset(1, newStepSub); err != nil { pb.cs.Logger.Error("Replay reset error", "err", err) } @@ -234,7 +232,6 @@ func (pb *playback) replayConsoleLoop() int { } else if i > pb.count { fmt.Printf("argument to back must not be larger than the current count (%d)\n", pb.count) } else { - // TODO: pass a subscription if err := pb.replayReset(i, newStepSub); err != nil { pb.cs.Logger.Error("Replay reset error", "err", err) } diff --git a/consensus/state_test.go b/consensus/state_test.go index 1e4e245e4..3ce791669 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -1290,7 +1290,6 @@ func (n *fakeTxNotifier) Notify() { func TestStartNextHeightCorrectly(t *testing.T) { config.Consensus.SkipTimeoutCommit = false cs1, vss := randConsensusState(4) - // DATA RACE cs1.txNotifier = &fakeTxNotifier{ch: make(chan struct{})} vs2, vs3, vs4 := vss[1], vss[2], vss[3]