register result chan separately

This commit is contained in:
William Banfield
2021-11-29 18:53:45 -05:00
parent dccca05541
commit 9e4cf5bd07
3 changed files with 30 additions and 35 deletions

View File

@@ -405,13 +405,13 @@ func subscribeToVoterBuffered(ctx context.Context, t *testing.T, cs *State, addr
if err != nil {
t.Fatalf("failed to subscribe %s to %v", testSubscriber, types.EventQueryVote)
}
ch := make(chan tmpubsub.Message)
ch := make(chan tmpubsub.Message, 10)
go func() {
for {
msg, err := votesSub.Next(ctx)
if err != nil {
if !errors.Is(err, pubsub.ErrTerminated) && !errors.Is(err, context.Canceled) {
t.Fatalf("error retrieving value of subscription %s", err)
t.Logf("error terminating pubsub %s", err)
}
return
}

View File

@@ -47,6 +47,8 @@ type pbtsTestHarness struct {
// channels for verifying that the observed validator completes certain actions.
ensureProposalCh, roundCh, blockCh, ensureVoteCh <-chan tmpubsub.Message
resultCh <-chan heightResult
currentHeight int64
currentRound int32
@@ -99,6 +101,8 @@ func newPBTSTestHarness(ctx context.Context, t *testing.T, tc pbtsTestConfigurat
pubKey, err := vss[0].PrivValidator.GetPubKey(ctx)
require.NoError(t, err)
resultCh := registerResultCollector(ctx, t, cs.eventBus, pubKey.Address())
return pbtsTestHarness{
pbtsTestConfiguration: tc,
observedValidator: vss[0],
@@ -111,12 +115,13 @@ func newPBTSTestHarness(ctx context.Context, t *testing.T, tc pbtsTestConfigurat
ensureProposalCh: subscribe(ctx, t, cs.eventBus, types.EventQueryCompleteProposal),
blockCh: subscribe(ctx, t, cs.eventBus, types.EventQueryNewBlock),
ensureVoteCh: subscribeToVoterBuffered(ctx, t, cs, pubKey.Address()),
resultCh: resultCh,
t: t,
ctx: ctx,
}
}
func (p *pbtsTestHarness) genesisHeight() {
func (p *pbtsTestHarness) genesisHeight() heightResult {
p.validatorClock.On("Now").Return(p.height2ProposedBlockTime).Times(8)
startTestRound(p.ctx, p.observedState, p.currentHeight, p.currentRound)
@@ -133,6 +138,7 @@ func (p *pbtsTestHarness) genesisHeight() {
ensureNewBlock(p.t, p.blockCh, p.currentHeight)
p.currentHeight++
incrementHeight(p.otherValidators...)
return <-p.resultCh
}
func (p *pbtsTestHarness) height2() heightResult {
@@ -143,9 +149,6 @@ func (p *pbtsTestHarness) height2() heightResult {
// nolint: lll
func (p *pbtsTestHarness) nextHeight(proposer types.PrivValidator, deliverTime, proposedTime, nextProposedTime time.Time) heightResult {
p.validatorClock.On("Now").Return(nextProposedTime).Times(8)
pubKey, err := p.observedValidator.PrivValidator.GetPubKey(p.ctx)
require.NoError(p.t, err)
resultCh := collectResults(p.ctx, p.t, p.observedState.eventBus, pubKey.Address())
ensureNewRound(p.t, p.roundCh, p.currentHeight, p.currentRound)
@@ -181,39 +184,30 @@ func (p *pbtsTestHarness) nextHeight(proposer types.PrivValidator, deliverTime,
p.currentHeight++
incrementHeight(p.otherValidators...)
return <-resultCh
return <-p.resultCh
}
func collectResults(ctx context.Context, t *testing.T, eb *eventbus.EventBus, address []byte) <-chan heightResult {
func registerResultCollector(ctx context.Context, t *testing.T, eb *eventbus.EventBus, address []byte) <-chan heightResult {
t.Helper()
resultCh := make(chan heightResult)
voteSub, err := eb.SubscribeWithArgs(ctx, tmpubsub.SubscribeArgs{
ClientID: "voteSubscriber",
Query: types.EventQueryVote})
require.NoError(t, err)
go func() {
var res heightResult
for {
voteMsg, err := voteSub.Next(ctx)
require.NoError(t, err)
ts := time.Now()
vote := voteMsg.Data().(types.EventDataVote)
if !bytes.Equal(address, vote.Vote.ValidatorAddress) {
continue
}
voteEvent, _ := voteMsg.Data().(types.EventDataVote)
if voteEvent.Vote.Type != tmproto.PrevoteType {
continue
}
res.prevoteIssuedAt = ts
res.prevote = voteEvent.Vote
break
resultCh := make(chan heightResult, 2)
var res heightResult
if err := eb.Observe(ctx, func(msg tmpubsub.Message) error {
ts := time.Now()
vote := msg.Data().(types.EventDataVote)
// we only fire for our own votes
if !bytes.Equal(address, vote.Vote.ValidatorAddress) {
return nil
}
err := eb.UnsubscribeAll(context.Background(), "voteSubscriber")
require.NoError(t, err)
if vote.Vote.Type != tmproto.PrevoteType {
return nil
}
res.prevoteIssuedAt = ts
res.prevote = vote.Vote
resultCh <- res
close(resultCh)
}()
return nil
}, types.EventQueryVote); err != nil {
t.Fatalf("Failed to observe query %v: %v", types.EventQueryVote, err)
}
return resultCh
}
@@ -259,7 +253,7 @@ func TestReceiveProposalWaitsForPreviousBlockTime(t *testing.T) {
results := pbtsTest.run()
// Check that the validator waited until after the proposer-based timestamp
// waitinTime bound.
// waitingTime bound.
assert.True(t, results.height2.prevoteIssuedAt.After(cfg.height2ProposalDeliverTime))
maxWaitingTime := cfg.genesisTime.Add(cfg.timestampParams.Precision).Add(cfg.timestampParams.MsgDelay)
assert.True(t, results.height2.prevoteIssuedAt.Before(maxWaitingTime))

View File

@@ -2529,6 +2529,7 @@ func TestStateOutputsBlockPartsStats(t *testing.T) {
cs, _, err := makeState(ctx, config, logger, 1)
require.NoError(t, err)
peerID, err := types.NewNodeID("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA")
require.NoError(t, err)
// 1) new block part
parts := types.NewPartSetFromData(tmrand.Bytes(100), 10)