Allow for selective subscription by vote type

This is an attempt to fix the intermittently failing
`TestPrepareProposalReceivesVoteExtensions` test in the internal
consensus package.

Occasionally we get prevote messages via the subscription channel, and
we're not interested in those. This change allows us to specify what
types of votes we're interested in (i.e. precommits) and discard the
rest.

Signed-off-by: Thane Thomson <connect@thanethomson.com>
This commit is contained in:
Thane Thomson
2022-04-11 20:35:45 -04:00
parent 3124ac5c72
commit e7523c430a
2 changed files with 17 additions and 6 deletions

View File

@@ -366,18 +366,29 @@ func validatePrecommit(
}
}
func subscribeToVoter(ctx context.Context, t *testing.T, cs *State, addr []byte) <-chan tmpubsub.Message {
// subscribeToVoter allows one to subscribe to vote messages from the validator
// associated with the given address. If supplied, only the specified vote
// types will be published via the return channel; if not supplied, all vote
// types will be published via the return channel.
func subscribeToVoter(ctx context.Context, t *testing.T, cs *State, addr []byte, voteTypes ...tmproto.SignedMsgType) <-chan tmpubsub.Message {
t.Helper()
vt := make(map[tmproto.SignedMsgType]struct{})
for _, t := range voteTypes {
vt[t] = struct{}{}
}
ch := make(chan tmpubsub.Message, 1)
if err := cs.eventBus.Observe(ctx, func(msg tmpubsub.Message) error {
vote := msg.Data().(types.EventDataVote)
// we only fire for our own votes
if bytes.Equal(addr, vote.Vote.ValidatorAddress) {
select {
case <-ctx.Done():
return ctx.Err()
case ch <- msg:
if _, wanted := vt[vote.Vote.Type]; len(vt) == 0 || wanted {
select {
case <-ctx.Done():
return ctx.Err()
case ch <- msg:
}
}
}
return nil

View File

@@ -2211,7 +2211,7 @@ func TestPrepareProposalReceivesVoteExtensions(t *testing.T) {
pv1, err := cs1.privValidator.GetPubKey(ctx)
require.NoError(t, err)
addr := pv1.Address()
voteCh := subscribeToVoter(ctx, t, cs1, addr)
voteCh := subscribeToVoter(ctx, t, cs1, addr, tmproto.PrecommitType)
// ensure that the height is committed.
ensurePrecommit(t, voteCh, height, round)