internal/consensus: refactor ensure functions to use a common function (#7373)

* internal/consensus: refactor the common_test functions to use a single timeout function

* remove ensurePrecommit

* Update internal/consensus/common_test.go

Co-authored-by: M. J. Fromberger <fromberger@interchain.io>

* join lines for fatal messages

Co-authored-by: M. J. Fromberger <fromberger@interchain.io>

Co-authored-by: M. J. Fromberger <fromberger@interchain.io>
This commit is contained in:
William Banfield
2021-12-02 19:09:02 -05:00
committed by William Banfield
parent e7f77049eb
commit 315ee5aac5
2 changed files with 70 additions and 103 deletions

View File

@@ -541,7 +541,8 @@ func makeState(ctx context.Context, t *testing.T, cfg *config.Config, logger log
//-------------------------------------------------------------------------------
func ensureNoNewEvent(t *testing.T, ch <-chan tmpubsub.Message, timeout time.Duration, errorMessage string) {
func ensureNoMessageBeforeTimeout(t *testing.T, ch <-chan tmpubsub.Message, timeout time.Duration,
errorMessage string) {
t.Helper()
select {
case <-time.After(timeout):
@@ -553,7 +554,8 @@ func ensureNoNewEvent(t *testing.T, ch <-chan tmpubsub.Message, timeout time.Dur
func ensureNoNewEventOnChannel(t *testing.T, ch <-chan tmpubsub.Message) {
t.Helper()
ensureNoNewEvent(t,
ensureNoMessageBeforeTimeout(
t,
ch,
ensureTimeout,
"We should be stuck waiting, not receiving new event on the channel")
@@ -561,7 +563,7 @@ func ensureNoNewEventOnChannel(t *testing.T, ch <-chan tmpubsub.Message) {
func ensureNoNewRoundStep(t *testing.T, stepCh <-chan tmpubsub.Message) {
t.Helper()
ensureNoNewEvent(
ensureNoMessageBeforeTimeout(
t,
stepCh,
ensureTimeout,
@@ -571,127 +573,98 @@ func ensureNoNewRoundStep(t *testing.T, stepCh <-chan tmpubsub.Message) {
func ensureNoNewTimeout(t *testing.T, stepCh <-chan tmpubsub.Message, timeout int64) {
t.Helper()
timeoutDuration := time.Duration(timeout*10) * time.Nanosecond
ensureNoNewEvent(t,
ensureNoMessageBeforeTimeout(
t,
stepCh,
timeoutDuration,
"We should be stuck waiting, not receiving NewTimeout event")
}
func ensureNewEvent(t *testing.T, ch <-chan tmpubsub.Message, height int64, round int32, timeout time.Duration, errorMessage string) {
func ensureNewEvent(t *testing.T, ch <-chan tmpubsub.Message, height int64, round int32, timeout time.Duration) {
t.Helper()
select {
case <-time.After(timeout):
t.Fatal(errorMessage)
case msg := <-ch:
roundStateEvent, ok := msg.Data().(types.EventDataRoundState)
require.True(t, ok,
"expected a EventDataRoundState, got %T. Wrong subscription channel?",
msg.Data())
msg := ensureMessageBeforeTimeout(t, ch, ensureTimeout)
roundStateEvent, ok := msg.Data().(types.EventDataRoundState)
require.True(t, ok,
"expected a EventDataRoundState, got %T. Wrong subscription channel?",
msg.Data())
require.Equal(t, height, roundStateEvent.Height)
require.Equal(t, round, roundStateEvent.Round)
// TODO: We could check also for a step at this point!
}
require.Equal(t, height, roundStateEvent.Height)
require.Equal(t, round, roundStateEvent.Round)
// TODO: We could check also for a step at this point!
}
func ensureNewRound(t *testing.T, roundCh <-chan tmpubsub.Message, height int64, round int32) {
t.Helper()
select {
case <-time.After(ensureTimeout):
t.Fatal("Timeout expired while waiting for NewRound event")
case msg := <-roundCh:
newRoundEvent, ok := msg.Data().(types.EventDataNewRound)
require.True(t, ok, "expected a EventDataNewRound, got %T. Wrong subscription channel?",
msg.Data())
msg := ensureMessageBeforeTimeout(t, roundCh, ensureTimeout)
newRoundEvent, ok := msg.Data().(types.EventDataNewRound)
require.True(t, ok, "expected a EventDataNewRound, got %T. Wrong subscription channel?",
msg.Data())
require.Equal(t, height, newRoundEvent.Height)
require.Equal(t, round, newRoundEvent.Round)
}
require.Equal(t, height, newRoundEvent.Height)
require.Equal(t, round, newRoundEvent.Round)
}
func ensureNewTimeout(t *testing.T, timeoutCh <-chan tmpubsub.Message, height int64, round int32, timeout int64) {
t.Helper()
timeoutDuration := time.Duration(timeout*10) * time.Nanosecond
ensureNewEvent(t, timeoutCh, height, round, timeoutDuration,
"Timeout expired while waiting for NewTimeout event")
ensureNewEvent(t, timeoutCh, height, round, timeoutDuration)
}
func ensureNewProposal(t *testing.T, proposalCh <-chan tmpubsub.Message, height int64, round int32) {
t.Helper()
select {
case <-time.After(ensureTimeout):
t.Fatal("Timeout expired while waiting for NewProposal event")
case msg := <-proposalCh:
proposalEvent, ok := msg.Data().(types.EventDataCompleteProposal)
require.True(t, ok, "expected a EventDataCompleteProposal, got %T. Wrong subscription channel?",
msg.Data())
require.Equal(t, height, proposalEvent.Height)
require.Equal(t, round, proposalEvent.Round)
}
msg := ensureMessageBeforeTimeout(t, proposalCh, ensureTimeout)
proposalEvent, ok := msg.Data().(types.EventDataCompleteProposal)
require.True(t, ok, "expected a EventDataCompleteProposal, got %T. Wrong subscription channel?",
msg.Data())
require.Equal(t, height, proposalEvent.Height)
require.Equal(t, round, proposalEvent.Round)
}
func ensureNewValidBlock(t *testing.T, validBlockCh <-chan tmpubsub.Message, height int64, round int32) {
t.Helper()
ensureNewEvent(t, validBlockCh, height, round, ensureTimeout,
"Timeout expired while waiting for NewValidBlock event")
ensureNewEvent(t, validBlockCh, height, round, ensureTimeout)
}
func ensureNewBlock(t *testing.T, blockCh <-chan tmpubsub.Message, height int64) {
t.Helper()
select {
case <-time.After(ensureTimeout):
t.Fatal("Timeout expired while waiting for NewBlock event")
case msg := <-blockCh:
blockEvent, ok := msg.Data().(types.EventDataNewBlock)
require.True(t, ok, "expected a EventDataNewBlock, got %T. Wrong subscription channel?",
msg.Data())
require.Equal(t, height, blockEvent.Block.Height)
}
msg := ensureMessageBeforeTimeout(t, blockCh, ensureTimeout)
blockEvent, ok := msg.Data().(types.EventDataNewBlock)
require.True(t, ok, "expected a EventDataNewBlock, got %T. Wrong subscription channel?",
msg.Data())
require.Equal(t, height, blockEvent.Block.Height)
}
func ensureNewBlockHeader(t *testing.T, blockCh <-chan tmpubsub.Message, height int64, blockHash tmbytes.HexBytes) {
t.Helper()
select {
case <-time.After(ensureTimeout):
t.Fatal("Timeout expired while waiting for NewBlockHeader event")
case msg := <-blockCh:
blockHeaderEvent, ok := msg.Data().(types.EventDataNewBlockHeader)
require.True(t, ok, "expected a EventDataNewBlockHeader, got %T. Wrong subscription channel?",
msg.Data())
msg := ensureMessageBeforeTimeout(t, blockCh, ensureTimeout)
blockHeaderEvent, ok := msg.Data().(types.EventDataNewBlockHeader)
require.True(t, ok, "expected a EventDataNewBlockHeader, got %T. Wrong subscription channel?",
msg.Data())
require.Equal(t, height, blockHeaderEvent.Header.Height)
require.True(t, bytes.Equal(blockHeaderEvent.Header.Hash(), blockHash))
}
require.Equal(t, height, blockHeaderEvent.Header.Height)
require.True(t, bytes.Equal(blockHeaderEvent.Header.Hash(), blockHash))
}
func ensureLock(t *testing.T, lockCh <-chan tmpubsub.Message, height int64, round int32) {
t.Helper()
ensureNewEvent(t, lockCh, height, round, ensureTimeout,
"Timeout expired while waiting for LockValue event")
ensureNewEvent(t, lockCh, height, round, ensureTimeout)
}
func ensureRelock(t *testing.T, relockCh <-chan tmpubsub.Message, height int64, round int32) {
t.Helper()
ensureNewEvent(t, relockCh, height, round, ensureTimeout,
"Timeout expired while waiting for RelockValue event")
ensureNewEvent(t, relockCh, height, round, ensureTimeout)
}
func ensureProposal(t *testing.T, proposalCh <-chan tmpubsub.Message, height int64, round int32, propID types.BlockID) {
t.Helper()
select {
case <-time.After(ensureTimeout):
t.Fatal("Timeout expired while waiting for NewProposal event")
case msg := <-proposalCh:
proposalEvent, ok := msg.Data().(types.EventDataCompleteProposal)
require.True(t, ok, "expected a EventDataCompleteProposal, got %T. Wrong subscription channel?",
msg.Data())
require.Equal(t, height, proposalEvent.Height)
require.Equal(t, round, proposalEvent.Round)
require.True(t, proposalEvent.BlockID.Equals(propID),
"Proposed block does not match expected block (%v != %v)", proposalEvent.BlockID, propID)
}
msg := ensureMessageBeforeTimeout(t, proposalCh, ensureTimeout)
proposalEvent, ok := msg.Data().(types.EventDataCompleteProposal)
require.True(t, ok, "expected a EventDataCompleteProposal, got %T. Wrong subscription channel?",
msg.Data())
require.Equal(t, height, proposalEvent.Height)
require.Equal(t, round, proposalEvent.Round)
require.True(t, proposalEvent.BlockID.Equals(propID),
"Proposed block does not match expected block (%v != %v)", proposalEvent.BlockID, propID)
}
func ensurePrecommit(t *testing.T, voteCh <-chan tmpubsub.Message, height int64, round int32) {
@@ -706,38 +679,32 @@ func ensurePrevote(t *testing.T, voteCh <-chan tmpubsub.Message, height int64, r
func ensureVote(t *testing.T, voteCh <-chan tmpubsub.Message, height int64, round int32, voteType tmproto.SignedMsgType) {
t.Helper()
select {
case <-time.After(ensureTimeout):
t.Fatal("Timeout expired while waiting for NewVote event")
case msg := <-voteCh:
voteEvent, ok := msg.Data().(types.EventDataVote)
require.True(t, ok, "expected a EventDataVote, got %T. Wrong subscription channel?",
msg.Data())
msg := ensureMessageBeforeTimeout(t, voteCh, ensureTimeout)
voteEvent, ok := msg.Data().(types.EventDataVote)
require.True(t, ok, "expected a EventDataVote, got %T. Wrong subscription channel?",
msg.Data())
vote := voteEvent.Vote
require.Equal(t, height, vote.Height)
require.Equal(t, round, vote.Round)
vote := voteEvent.Vote
require.Equal(t, height, vote.Height)
require.Equal(t, round, vote.Round)
require.Equal(t, voteType, vote.Type)
}
}
func ensurePrecommitTimeout(t *testing.T, ch <-chan tmpubsub.Message) {
t.Helper()
select {
case <-time.After(ensureTimeout):
t.Fatal("Timeout expired while waiting for the Precommit to Timeout")
case <-ch:
}
require.Equal(t, voteType, vote.Type)
}
func ensureNewEventOnChannel(t *testing.T, ch <-chan tmpubsub.Message) {
t.Helper()
ensureMessageBeforeTimeout(t, ch, ensureTimeout)
}
func ensureMessageBeforeTimeout(t *testing.T, ch <-chan tmpubsub.Message, to time.Duration) tmpubsub.Message {
t.Helper()
select {
case <-time.After(ensureTimeout):
t.Fatal("Timeout expired while waiting for new activity on the channel")
case <-ch:
case <-time.After(to):
t.Fatalf("Timeout expired while waiting for message")
case msg := <-ch:
return msg
}
panic("unreachable")
}
//-------------------------------------------------------------------------------

View File

@@ -2278,7 +2278,7 @@ func TestStartNextHeightCorrectlyAfterTimeout(t *testing.T) {
signAddVotes(ctx, t, cs1, tmproto.PrecommitType, config.ChainID(), blockID, vs3)
// wait till timeout occurs
ensurePrecommitTimeout(t, precommitTimeoutCh)
ensureNewTimeout(t, precommitTimeoutCh, height, round, cs1.config.TimeoutPrecommit.Nanoseconds())
ensureNewRound(t, newRoundCh, height, round+1)