internal/consensus: add failing tests for proposer wait

This commit is contained in:
William Banfield
2021-11-30 18:54:42 -05:00
parent a9aab99b41
commit 9e76e604dd
2 changed files with 143 additions and 28 deletions

View File

@@ -698,6 +698,28 @@ func ensureProposal(t *testing.T, proposalCh <-chan tmpubsub.Message, height int
}
}
func ensureProposalWithTimeout(t *testing.T, proposalCh <-chan tmpubsub.Message, height int64, round int32, propID types.BlockID, timeout time.Duration) {
t.Helper()
select {
case <-time.After(timeout):
t.Fatalf("Timeout expired while waiting for NewProposal event")
case msg := <-proposalCh:
proposalEvent, ok := msg.Data().(types.EventDataCompleteProposal)
if !ok {
t.Fatalf("expected a EventDataCompleteProposal, got %T. Wrong subscription channel?",
msg.Data())
}
if proposalEvent.Height != height {
t.Fatalf("expected height %v, got %v", height, proposalEvent.Height)
}
if proposalEvent.Round != round {
t.Fatalf("expected round %v, got %v", round, proposalEvent.Round)
}
if !proposalEvent.BlockID.Equals(propID) {
t.Fatalf("Proposed block does not match expected block (%v != %v)", proposalEvent.BlockID, propID)
}
}
}
func ensurePrecommit(t *testing.T, voteCh <-chan tmpubsub.Message, height int64, round int32) {
t.Helper()
ensureVote(t, voteCh, height, round, tmproto.PrecommitType)

View File

@@ -47,7 +47,7 @@ type pbtsTestHarness struct {
// channels for verifying that the observed validator completes certain actions.
ensureProposalCh, roundCh, blockCh, ensureVoteCh <-chan tmpubsub.Message
resultCh <-chan heightResult
eventCh <-chan timestampedEvent
currentHeight int64
currentRound int32
@@ -101,7 +101,7 @@ func newPBTSTestHarness(ctx context.Context, t *testing.T, tc pbtsTestConfigurat
pubKey, err := vss[0].PrivValidator.GetPubKey(ctx)
require.NoError(t, err)
resultCh := registerResultCollector(ctx, t, cs.eventBus, pubKey.Address())
eventCh := timestampedCollector(ctx, t, cs.eventBus)
return pbtsTestHarness{
pbtsTestConfiguration: tc,
@@ -115,7 +115,7 @@ func newPBTSTestHarness(ctx context.Context, t *testing.T, tc pbtsTestConfigurat
ensureProposalCh: subscribe(ctx, t, cs.eventBus, types.EventQueryCompleteProposal),
blockCh: subscribe(ctx, t, cs.eventBus, types.EventQueryNewBlock),
ensureVoteCh: subscribeToVoterBuffered(ctx, t, cs, pubKey.Address()),
resultCh: resultCh,
eventCh: eventCh,
t: t,
ctx: ctx,
}
@@ -128,7 +128,8 @@ func (p *pbtsTestHarness) genesisHeight() heightResult {
ensureNewRound(p.t, p.roundCh, p.currentHeight, p.currentRound)
propBlock, partSet := p.observedState.createProposalBlock()
bid := types.BlockID{Hash: propBlock.Hash(), PartSetHeader: partSet.Header()}
ensureProposal(p.t, p.ensureProposalCh, p.currentHeight, p.currentRound, bid)
it := p.pbtsTestConfiguration.genesisTime
ensureProposalWithTimeout(p.t, p.ensureProposalCh, p.currentHeight, p.currentRound, bid, it.Add(ensureTimeout).Sub(time.Now()))
ensurePrevote(p.t, p.ensureVoteCh, p.currentHeight, p.currentRound)
signAddVotes(p.ctx, p.observedState, tmproto.PrevoteType, p.chainID, bid, p.otherValidators...)
@@ -136,9 +137,14 @@ func (p *pbtsTestHarness) genesisHeight() heightResult {
ensurePrecommit(p.t, p.ensureVoteCh, p.currentHeight, p.currentRound)
ensureNewBlock(p.t, p.blockCh, p.currentHeight)
vk, err := p.observedValidator.GetPubKey(context.Background())
require.NoError(p.t, err)
res := collectHeightResults(p.ctx, p.t, p.eventCh, p.currentHeight, vk.Address())
p.currentHeight++
incrementHeight(p.otherValidators...)
return <-p.resultCh
return res
}
func (p *pbtsTestHarness) height2() heightResult {
@@ -182,51 +188,90 @@ func (p *pbtsTestHarness) nextHeight(proposer types.PrivValidator, deliverTime,
signAddVotes(p.ctx, p.observedState, tmproto.PrecommitType, p.chainID, bid, p.otherValidators...)
ensurePrecommit(p.t, p.ensureVoteCh, p.currentHeight, p.currentRound)
vk, err := p.observedValidator.GetPubKey(context.Background())
require.NoError(p.t, err)
res := collectHeightResults(p.ctx, p.t, p.eventCh, p.currentHeight, vk.Address())
p.currentHeight++
incrementHeight(p.otherValidators...)
return <-p.resultCh
return res
}
func timestampedCollector(ctx context.Context, t *testing.T, eb *eventbus.EventBus) <-chan timestampedEvent {
t.Helper()
eventCh := make(chan timestampedEvent, 100)
if err := eb.Observe(ctx, func(msg tmpubsub.Message) error {
eventCh <- timestampedEvent{
ts: time.Now(),
m: msg,
}
return nil
}, types.EventQueryVote, types.EventQueryCompleteProposal); err != nil {
t.Fatalf("Failed to observe query %v: %v", types.EventQueryVote, err)
}
return eventCh
}
// nolint: lll
func registerResultCollector(ctx context.Context, t *testing.T, eb *eventbus.EventBus, address []byte) <-chan heightResult {
func collectHeightResults(ctx context.Context, t *testing.T, eventCh <-chan timestampedEvent, height int64, address []byte) heightResult {
t.Helper()
resultCh := make(chan heightResult, 2)
var res heightResult
if err := eb.Observe(ctx, func(msg tmpubsub.Message) error {
ts := time.Now()
vote := msg.Data().(types.EventDataVote)
// we only fire for our own votes
if !bytes.Equal(address, vote.Vote.ValidatorAddress) {
return nil
LOOP:
for event := range eventCh {
switch v := event.m.Data().(type) {
case types.EventDataVote:
if v.Vote.Height > height {
break LOOP
}
if !bytes.Equal(address, v.Vote.ValidatorAddress) {
continue
}
if v.Vote.Type != tmproto.PrevoteType {
continue
}
res.prevote = v.Vote
res.prevoteIssuedAt = event.ts
case types.EventDataCompleteProposal:
if v.Height > height {
break LOOP
}
res.proposalIssuedAt = event.ts
}
if vote.Vote.Type != tmproto.PrevoteType {
return nil
if res.isComplete() {
break
}
res.prevoteIssuedAt = ts
res.prevote = vote.Vote
resultCh <- res
return nil
}, types.EventQueryVote); err != nil {
t.Fatalf("Failed to observe query %v: %v", types.EventQueryVote, err)
}
return resultCh
return res
}
type timestampedEvent struct {
ts time.Time
m tmpubsub.Message
}
func (p *pbtsTestHarness) run() resultSet {
p.genesisHeight()
r1 := p.genesisHeight()
r2 := p.height2()
return resultSet{
height2: r2,
genesisHeight: r1,
height2: r2,
}
}
type resultSet struct {
height2 heightResult
genesisHeight heightResult
height2 heightResult
}
type heightResult struct {
prevote *types.Vote
prevoteIssuedAt time.Time
proposalIssuedAt time.Time
prevote *types.Vote
prevoteIssuedAt time.Time
}
func (hr heightResult) isComplete() bool {
return !hr.proposalIssuedAt.IsZero() && !hr.prevoteIssuedAt.IsZero() && hr.prevote != nil
}
// TestReceiveProposalWaitsForPreviousBlockTime tests that a validator receiving
@@ -297,6 +342,54 @@ func TestReceiveProposalTimesOutOnSlowDelivery(t *testing.T) {
assert.Nil(t, results.height2.prevote.BlockID.Hash)
}
func TestProposerWaitsForGenesisTime(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
initialTime := time.Now().Add(800 * time.Millisecond)
cfg := pbtsTestConfiguration{
timingParams: types.TimingParams{
Precision: 10 * time.Millisecond,
MessageDelay: 10 * time.Millisecond,
},
timeoutPropose: 10 * time.Millisecond,
genesisTime: initialTime,
height2ProposalDeliverTime: initialTime.Add(10 * time.Millisecond),
height2ProposedBlockTime: initialTime.Add(10 * time.Millisecond),
}
pbtsTest := newPBTSTestHarness(ctx, t, cfg)
results := pbtsTest.run()
assert.True(t, results.genesisHeight.proposalIssuedAt.After(cfg.genesisTime))
}
/*
func TestProposeStepWaitsForPreviousBlock(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
initialTime := time.Now()
cfg := pbtsTestConfiguration{
timingParams: types.TimingParams{
Precision: 100 * time.Millisecond,
MessageDelay: 500 * time.Millisecond,
},
timeoutPropose: 50 * time.Millisecond,
genesisTime: initialTime,
}
pbtsTest := newPBTSTestHarness(ctx, t, cfg)
results := pbtsTest.run()
// Check that the validator waited until after the proposer-based timestamp
// waitinTime bound.
maxWaitingTime := initialTime.Add(cfg.timingParams.Precision).Add(cfg.timingParams.MessageDelay)
assert.True(t, results.height2.prevoteIssuedAt.After(maxWaitingTime))
// Ensure that the validator issued a prevote for nil.
assert.Nil(t, results.height2.prevote.BlockID.Hash)
}
*/
func TestProposerWaitTime(t *testing.T) {
genesisTime, err := time.Parse(time.RFC3339, "2019-03-13T23:00:00Z")
require.NoError(t, err)