From 9e76e604ddb6ff78dfe4de774bdc071320970b98 Mon Sep 17 00:00:00 2001 From: William Banfield Date: Tue, 30 Nov 2021 18:54:42 -0500 Subject: [PATCH] internal/consensus: add failing tests for proposer wait --- internal/consensus/common_test.go | 22 +++++ internal/consensus/pbts_test.go | 149 ++++++++++++++++++++++++------ 2 files changed, 143 insertions(+), 28 deletions(-) diff --git a/internal/consensus/common_test.go b/internal/consensus/common_test.go index 072414427..b4c0b503a 100644 --- a/internal/consensus/common_test.go +++ b/internal/consensus/common_test.go @@ -698,6 +698,28 @@ func ensureProposal(t *testing.T, proposalCh <-chan tmpubsub.Message, height int } } +func ensureProposalWithTimeout(t *testing.T, proposalCh <-chan tmpubsub.Message, height int64, round int32, propID types.BlockID, timeout time.Duration) { + t.Helper() + select { + case <-time.After(timeout): + t.Fatalf("Timeout expired while waiting for NewProposal event") + case msg := <-proposalCh: + proposalEvent, ok := msg.Data().(types.EventDataCompleteProposal) + if !ok { + t.Fatalf("expected a EventDataCompleteProposal, got %T. Wrong subscription channel?", + msg.Data()) + } + if proposalEvent.Height != height { + t.Fatalf("expected height %v, got %v", height, proposalEvent.Height) + } + if proposalEvent.Round != round { + t.Fatalf("expected round %v, got %v", round, proposalEvent.Round) + } + if !proposalEvent.BlockID.Equals(propID) { + t.Fatalf("Proposed block does not match expected block (%v != %v)", proposalEvent.BlockID, propID) + } + } +} func ensurePrecommit(t *testing.T, voteCh <-chan tmpubsub.Message, height int64, round int32) { t.Helper() ensureVote(t, voteCh, height, round, tmproto.PrecommitType) diff --git a/internal/consensus/pbts_test.go b/internal/consensus/pbts_test.go index 7a40b67c8..e00c6b9cd 100644 --- a/internal/consensus/pbts_test.go +++ b/internal/consensus/pbts_test.go @@ -47,7 +47,7 @@ type pbtsTestHarness struct { // channels for verifying that the observed validator completes certain actions. ensureProposalCh, roundCh, blockCh, ensureVoteCh <-chan tmpubsub.Message - resultCh <-chan heightResult + eventCh <-chan timestampedEvent currentHeight int64 currentRound int32 @@ -101,7 +101,7 @@ func newPBTSTestHarness(ctx context.Context, t *testing.T, tc pbtsTestConfigurat pubKey, err := vss[0].PrivValidator.GetPubKey(ctx) require.NoError(t, err) - resultCh := registerResultCollector(ctx, t, cs.eventBus, pubKey.Address()) + eventCh := timestampedCollector(ctx, t, cs.eventBus) return pbtsTestHarness{ pbtsTestConfiguration: tc, @@ -115,7 +115,7 @@ func newPBTSTestHarness(ctx context.Context, t *testing.T, tc pbtsTestConfigurat ensureProposalCh: subscribe(ctx, t, cs.eventBus, types.EventQueryCompleteProposal), blockCh: subscribe(ctx, t, cs.eventBus, types.EventQueryNewBlock), ensureVoteCh: subscribeToVoterBuffered(ctx, t, cs, pubKey.Address()), - resultCh: resultCh, + eventCh: eventCh, t: t, ctx: ctx, } @@ -128,7 +128,8 @@ func (p *pbtsTestHarness) genesisHeight() heightResult { ensureNewRound(p.t, p.roundCh, p.currentHeight, p.currentRound) propBlock, partSet := p.observedState.createProposalBlock() bid := types.BlockID{Hash: propBlock.Hash(), PartSetHeader: partSet.Header()} - ensureProposal(p.t, p.ensureProposalCh, p.currentHeight, p.currentRound, bid) + it := p.pbtsTestConfiguration.genesisTime + ensureProposalWithTimeout(p.t, p.ensureProposalCh, p.currentHeight, p.currentRound, bid, it.Add(ensureTimeout).Sub(time.Now())) ensurePrevote(p.t, p.ensureVoteCh, p.currentHeight, p.currentRound) signAddVotes(p.ctx, p.observedState, tmproto.PrevoteType, p.chainID, bid, p.otherValidators...) @@ -136,9 +137,14 @@ func (p *pbtsTestHarness) genesisHeight() heightResult { ensurePrecommit(p.t, p.ensureVoteCh, p.currentHeight, p.currentRound) ensureNewBlock(p.t, p.blockCh, p.currentHeight) + + vk, err := p.observedValidator.GetPubKey(context.Background()) + require.NoError(p.t, err) + res := collectHeightResults(p.ctx, p.t, p.eventCh, p.currentHeight, vk.Address()) + p.currentHeight++ incrementHeight(p.otherValidators...) - return <-p.resultCh + return res } func (p *pbtsTestHarness) height2() heightResult { @@ -182,51 +188,90 @@ func (p *pbtsTestHarness) nextHeight(proposer types.PrivValidator, deliverTime, signAddVotes(p.ctx, p.observedState, tmproto.PrecommitType, p.chainID, bid, p.otherValidators...) ensurePrecommit(p.t, p.ensureVoteCh, p.currentHeight, p.currentRound) + vk, err := p.observedValidator.GetPubKey(context.Background()) + require.NoError(p.t, err) + res := collectHeightResults(p.ctx, p.t, p.eventCh, p.currentHeight, vk.Address()) + p.currentHeight++ incrementHeight(p.otherValidators...) - return <-p.resultCh + return res +} + +func timestampedCollector(ctx context.Context, t *testing.T, eb *eventbus.EventBus) <-chan timestampedEvent { + t.Helper() + eventCh := make(chan timestampedEvent, 100) + if err := eb.Observe(ctx, func(msg tmpubsub.Message) error { + eventCh <- timestampedEvent{ + ts: time.Now(), + m: msg, + } + return nil + }, types.EventQueryVote, types.EventQueryCompleteProposal); err != nil { + t.Fatalf("Failed to observe query %v: %v", types.EventQueryVote, err) + } + return eventCh } // nolint: lll -func registerResultCollector(ctx context.Context, t *testing.T, eb *eventbus.EventBus, address []byte) <-chan heightResult { +func collectHeightResults(ctx context.Context, t *testing.T, eventCh <-chan timestampedEvent, height int64, address []byte) heightResult { t.Helper() - resultCh := make(chan heightResult, 2) var res heightResult - if err := eb.Observe(ctx, func(msg tmpubsub.Message) error { - ts := time.Now() - vote := msg.Data().(types.EventDataVote) - // we only fire for our own votes - if !bytes.Equal(address, vote.Vote.ValidatorAddress) { - return nil +LOOP: + for event := range eventCh { + switch v := event.m.Data().(type) { + case types.EventDataVote: + if v.Vote.Height > height { + break LOOP + } + if !bytes.Equal(address, v.Vote.ValidatorAddress) { + continue + } + if v.Vote.Type != tmproto.PrevoteType { + continue + } + res.prevote = v.Vote + res.prevoteIssuedAt = event.ts + + case types.EventDataCompleteProposal: + if v.Height > height { + break LOOP + } + res.proposalIssuedAt = event.ts } - if vote.Vote.Type != tmproto.PrevoteType { - return nil + if res.isComplete() { + break } - res.prevoteIssuedAt = ts - res.prevote = vote.Vote - resultCh <- res - return nil - }, types.EventQueryVote); err != nil { - t.Fatalf("Failed to observe query %v: %v", types.EventQueryVote, err) } - return resultCh + return res +} + +type timestampedEvent struct { + ts time.Time + m tmpubsub.Message } func (p *pbtsTestHarness) run() resultSet { - p.genesisHeight() + r1 := p.genesisHeight() r2 := p.height2() return resultSet{ - height2: r2, + genesisHeight: r1, + height2: r2, } } type resultSet struct { - height2 heightResult + genesisHeight heightResult + height2 heightResult } type heightResult struct { - prevote *types.Vote - prevoteIssuedAt time.Time + proposalIssuedAt time.Time + prevote *types.Vote + prevoteIssuedAt time.Time +} + +func (hr heightResult) isComplete() bool { + return !hr.proposalIssuedAt.IsZero() && !hr.prevoteIssuedAt.IsZero() && hr.prevote != nil } // TestReceiveProposalWaitsForPreviousBlockTime tests that a validator receiving @@ -297,6 +342,54 @@ func TestReceiveProposalTimesOutOnSlowDelivery(t *testing.T) { assert.Nil(t, results.height2.prevote.BlockID.Hash) } +func TestProposerWaitsForGenesisTime(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + initialTime := time.Now().Add(800 * time.Millisecond) + cfg := pbtsTestConfiguration{ + timingParams: types.TimingParams{ + Precision: 10 * time.Millisecond, + MessageDelay: 10 * time.Millisecond, + }, + timeoutPropose: 10 * time.Millisecond, + genesisTime: initialTime, + height2ProposalDeliverTime: initialTime.Add(10 * time.Millisecond), + height2ProposedBlockTime: initialTime.Add(10 * time.Millisecond), + } + + pbtsTest := newPBTSTestHarness(ctx, t, cfg) + results := pbtsTest.run() + + assert.True(t, results.genesisHeight.proposalIssuedAt.After(cfg.genesisTime)) +} + +/* +func TestProposeStepWaitsForPreviousBlock(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + initialTime := time.Now() + cfg := pbtsTestConfiguration{ + timingParams: types.TimingParams{ + Precision: 100 * time.Millisecond, + MessageDelay: 500 * time.Millisecond, + }, + timeoutPropose: 50 * time.Millisecond, + genesisTime: initialTime, + } + + pbtsTest := newPBTSTestHarness(ctx, t, cfg) + results := pbtsTest.run() + + // Check that the validator waited until after the proposer-based timestamp + // waitinTime bound. + maxWaitingTime := initialTime.Add(cfg.timingParams.Precision).Add(cfg.timingParams.MessageDelay) + assert.True(t, results.height2.prevoteIssuedAt.After(maxWaitingTime)) + + // Ensure that the validator issued a prevote for nil. + assert.Nil(t, results.height2.prevote.BlockID.Hash) +} +*/ + func TestProposerWaitTime(t *testing.T) { genesisTime, err := time.Parse(time.RFC3339, "2019-03-13T23:00:00Z") require.NoError(t, err)