From 9e4cf5bd07ca53e74f5a441575e22925761dedef Mon Sep 17 00:00:00 2001 From: William Banfield Date: Mon, 29 Nov 2021 18:53:45 -0500 Subject: [PATCH] register result chan separately --- internal/consensus/common_test.go | 4 +-- internal/consensus/pbts_test.go | 60 ++++++++++++++----------------- internal/consensus/state_test.go | 1 + 3 files changed, 30 insertions(+), 35 deletions(-) diff --git a/internal/consensus/common_test.go b/internal/consensus/common_test.go index 8031a66cc..e9fce8917 100644 --- a/internal/consensus/common_test.go +++ b/internal/consensus/common_test.go @@ -405,13 +405,13 @@ func subscribeToVoterBuffered(ctx context.Context, t *testing.T, cs *State, addr if err != nil { t.Fatalf("failed to subscribe %s to %v", testSubscriber, types.EventQueryVote) } - ch := make(chan tmpubsub.Message) + ch := make(chan tmpubsub.Message, 10) go func() { for { msg, err := votesSub.Next(ctx) if err != nil { if !errors.Is(err, pubsub.ErrTerminated) && !errors.Is(err, context.Canceled) { - t.Fatalf("error retrieving value of subscription %s", err) + t.Logf("error terminating pubsub %s", err) } return } diff --git a/internal/consensus/pbts_test.go b/internal/consensus/pbts_test.go index 308b312eb..93b8798c0 100644 --- a/internal/consensus/pbts_test.go +++ b/internal/consensus/pbts_test.go @@ -47,6 +47,8 @@ type pbtsTestHarness struct { // channels for verifying that the observed validator completes certain actions. ensureProposalCh, roundCh, blockCh, ensureVoteCh <-chan tmpubsub.Message + resultCh <-chan heightResult + currentHeight int64 currentRound int32 @@ -99,6 +101,8 @@ func newPBTSTestHarness(ctx context.Context, t *testing.T, tc pbtsTestConfigurat pubKey, err := vss[0].PrivValidator.GetPubKey(ctx) require.NoError(t, err) + resultCh := registerResultCollector(ctx, t, cs.eventBus, pubKey.Address()) + return pbtsTestHarness{ pbtsTestConfiguration: tc, observedValidator: vss[0], @@ -111,12 +115,13 @@ func newPBTSTestHarness(ctx context.Context, t *testing.T, tc pbtsTestConfigurat ensureProposalCh: subscribe(ctx, t, cs.eventBus, types.EventQueryCompleteProposal), blockCh: subscribe(ctx, t, cs.eventBus, types.EventQueryNewBlock), ensureVoteCh: subscribeToVoterBuffered(ctx, t, cs, pubKey.Address()), + resultCh: resultCh, t: t, ctx: ctx, } } -func (p *pbtsTestHarness) genesisHeight() { +func (p *pbtsTestHarness) genesisHeight() heightResult { p.validatorClock.On("Now").Return(p.height2ProposedBlockTime).Times(8) startTestRound(p.ctx, p.observedState, p.currentHeight, p.currentRound) @@ -133,6 +138,7 @@ func (p *pbtsTestHarness) genesisHeight() { ensureNewBlock(p.t, p.blockCh, p.currentHeight) p.currentHeight++ incrementHeight(p.otherValidators...) + return <-p.resultCh } func (p *pbtsTestHarness) height2() heightResult { @@ -143,9 +149,6 @@ func (p *pbtsTestHarness) height2() heightResult { // nolint: lll func (p *pbtsTestHarness) nextHeight(proposer types.PrivValidator, deliverTime, proposedTime, nextProposedTime time.Time) heightResult { p.validatorClock.On("Now").Return(nextProposedTime).Times(8) - pubKey, err := p.observedValidator.PrivValidator.GetPubKey(p.ctx) - require.NoError(p.t, err) - resultCh := collectResults(p.ctx, p.t, p.observedState.eventBus, pubKey.Address()) ensureNewRound(p.t, p.roundCh, p.currentHeight, p.currentRound) @@ -181,39 +184,30 @@ func (p *pbtsTestHarness) nextHeight(proposer types.PrivValidator, deliverTime, p.currentHeight++ incrementHeight(p.otherValidators...) - return <-resultCh + return <-p.resultCh } -func collectResults(ctx context.Context, t *testing.T, eb *eventbus.EventBus, address []byte) <-chan heightResult { +func registerResultCollector(ctx context.Context, t *testing.T, eb *eventbus.EventBus, address []byte) <-chan heightResult { t.Helper() - resultCh := make(chan heightResult) - voteSub, err := eb.SubscribeWithArgs(ctx, tmpubsub.SubscribeArgs{ - ClientID: "voteSubscriber", - Query: types.EventQueryVote}) - require.NoError(t, err) - go func() { - var res heightResult - for { - voteMsg, err := voteSub.Next(ctx) - require.NoError(t, err) - ts := time.Now() - vote := voteMsg.Data().(types.EventDataVote) - if !bytes.Equal(address, vote.Vote.ValidatorAddress) { - continue - } - voteEvent, _ := voteMsg.Data().(types.EventDataVote) - if voteEvent.Vote.Type != tmproto.PrevoteType { - continue - } - res.prevoteIssuedAt = ts - res.prevote = voteEvent.Vote - break + resultCh := make(chan heightResult, 2) + var res heightResult + if err := eb.Observe(ctx, func(msg tmpubsub.Message) error { + ts := time.Now() + vote := msg.Data().(types.EventDataVote) + // we only fire for our own votes + if !bytes.Equal(address, vote.Vote.ValidatorAddress) { + return nil } - err := eb.UnsubscribeAll(context.Background(), "voteSubscriber") - require.NoError(t, err) + if vote.Vote.Type != tmproto.PrevoteType { + return nil + } + res.prevoteIssuedAt = ts + res.prevote = vote.Vote resultCh <- res - close(resultCh) - }() + return nil + }, types.EventQueryVote); err != nil { + t.Fatalf("Failed to observe query %v: %v", types.EventQueryVote, err) + } return resultCh } @@ -259,7 +253,7 @@ func TestReceiveProposalWaitsForPreviousBlockTime(t *testing.T) { results := pbtsTest.run() // Check that the validator waited until after the proposer-based timestamp - // waitinTime bound. + // waitingTime bound. assert.True(t, results.height2.prevoteIssuedAt.After(cfg.height2ProposalDeliverTime)) maxWaitingTime := cfg.genesisTime.Add(cfg.timestampParams.Precision).Add(cfg.timestampParams.MsgDelay) assert.True(t, results.height2.prevoteIssuedAt.Before(maxWaitingTime)) diff --git a/internal/consensus/state_test.go b/internal/consensus/state_test.go index 0f3663fd5..f88e40814 100644 --- a/internal/consensus/state_test.go +++ b/internal/consensus/state_test.go @@ -2529,6 +2529,7 @@ func TestStateOutputsBlockPartsStats(t *testing.T) { cs, _, err := makeState(ctx, config, logger, 1) require.NoError(t, err) peerID, err := types.NewNodeID("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA") + require.NoError(t, err) // 1) new block part parts := types.NewPartSetFromData(tmrand.Bytes(100), 10)