From e91bac3565e2132aebe301852ad08f0292863ce2 Mon Sep 17 00:00:00 2001 From: William Banfield <4561443+williambanfield@users.noreply.github.com> Date: Wed, 8 Dec 2021 11:23:33 -0500 Subject: [PATCH] internal/consensus: proposer waits for previous block time (#7376) This change introduces the logic to have the proposer wait until the previous block time has passed before attempting to propose the next block. The change achieves this by by adding a new clause into the enterPropose state machine method. The method now checks if the validator is the proposer and if the validator's clock is behind the previous block's time. If the validator's clock is behind the previous block time, it schedules a timeout to re-enter the enter propose method after enough time has passed. --- CHANGELOG_PENDING.md | 1 + internal/consensus/common_test.go | 12 +- internal/consensus/pbts_test.go | 224 +++++++++++++++++++++++++----- internal/consensus/state.go | 29 ++-- 4 files changed, 220 insertions(+), 46 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index cb1a79cac..72360901b 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -41,6 +41,7 @@ Special thanks to external contributors on this release: - [cli] [#7033](https://github.com/tendermint/tendermint/pull/7033) Add a `rollback` command to rollback to the previous tendermint state in the event of non-determinstic app hash or reverting an upgrade. - [mempool, rpc] \#7041 Add removeTx operation to the RPC layer. (@tychoish) +- [consensus] \#7376 Update the proposal logic per the Propose-based timestamps specification so that the proposer will wait for the previous block time to occur before proposing the next block. (@williambanfield) ### IMPROVEMENTS diff --git a/internal/consensus/common_test.go b/internal/consensus/common_test.go index 072414427..550c60b2b 100644 --- a/internal/consensus/common_test.go +++ b/internal/consensus/common_test.go @@ -682,10 +682,17 @@ func ensureRelock(t *testing.T, relockCh <-chan tmpubsub.Message, height int64, } func ensureProposal(t *testing.T, proposalCh <-chan tmpubsub.Message, height int64, round int32, propID types.BlockID) { - msg := ensureMessageBeforeTimeout(t, proposalCh, ensureTimeout) + ensureProposalWithTimeout(t, proposalCh, height, round, propID, ensureTimeout) +} + +// nolint: lll +func ensureProposalWithTimeout(t *testing.T, proposalCh <-chan tmpubsub.Message, height int64, round int32, propID types.BlockID, timeout time.Duration) { + t.Helper() + msg := ensureMessageBeforeTimeout(t, proposalCh, timeout) proposalEvent, ok := msg.Data().(types.EventDataCompleteProposal) if !ok { - t.Fatalf("expected a EventDataCompleteProposal, got %T. Wrong subscription channel?", msg.Data()) + t.Fatalf("expected a EventDataCompleteProposal, got %T. Wrong subscription channel?", + msg.Data()) } if proposalEvent.Height != height { t.Fatalf("expected height %v, got %v", height, proposalEvent.Height) @@ -697,7 +704,6 @@ func ensureProposal(t *testing.T, proposalCh <-chan tmpubsub.Message, height int t.Fatalf("Proposed block does not match expected block (%v != %v)", proposalEvent.BlockID, propID) } } - func ensurePrecommit(t *testing.T, voteCh <-chan tmpubsub.Message, height int64, round int32) { t.Helper() ensureVote(t, voteCh, height, round, tmproto.PrecommitType) diff --git a/internal/consensus/pbts_test.go b/internal/consensus/pbts_test.go index 9cf3c9cc0..c0c64a715 100644 --- a/internal/consensus/pbts_test.go +++ b/internal/consensus/pbts_test.go @@ -18,6 +18,12 @@ import ( "github.com/tendermint/tendermint/types" ) +const ( + // blockTimeIota is used in the test harness as the time between + // blocks when not otherwise specified. + blockTimeIota = time.Millisecond +) + // pbtsTestHarness constructs a Tendermint network that can be used for testing the // implementation of the Proposer-Based timestamps algorithm. // It runs a series of consensus heights and captures timing of votes and events. @@ -47,7 +53,9 @@ type pbtsTestHarness struct { // channels for verifying that the observed validator completes certain actions. ensureProposalCh, roundCh, blockCh, ensureVoteCh <-chan tmpubsub.Message - resultCh <-chan heightResult + // channel of events from the observed validator annotated with the timestamp + // the event was received. + eventCh <-chan timestampedEvent currentHeight int64 currentRound int32 @@ -71,6 +79,11 @@ type pbtsTestConfiguration struct { // The timestamp of the block proposed at height 2. height2ProposedBlockTime time.Time + + // The timestamp of the block proposed at height 4. + // At height 4, the proposed block time and the deliver time are the same so + // that timely-ness does not affect height 4. + height4ProposedBlockTime time.Time } func newPBTSTestHarness(ctx context.Context, t *testing.T, tc pbtsTestConfiguration) pbtsTestHarness { @@ -78,6 +91,15 @@ func newPBTSTestHarness(ctx context.Context, t *testing.T, tc pbtsTestConfigurat const validators = 4 cfg := configSetup(t) clock := new(tmtimemocks.Source) + if tc.height4ProposedBlockTime.IsZero() { + + // Set a default height4ProposedBlockTime. + // Use a proposed block time that is greater than the time that the + // block at height 2 was delivered. Height 3 is not relevant for testing + // and always occurs blockTimeIota before height 4. If not otherwise specified, + // height 4 therefore occurs 2*blockTimeIota after height 2. + tc.height4ProposedBlockTime = tc.height2ProposalDeliverTime.Add(2 * blockTimeIota) + } cfg.Consensus.TimeoutPropose = tc.timeoutPropose consensusParams := types.DefaultConsensusParams() consensusParams.Timing = tc.timingParams @@ -101,7 +123,7 @@ func newPBTSTestHarness(ctx context.Context, t *testing.T, tc pbtsTestConfigurat pubKey, err := vss[0].PrivValidator.GetPubKey(ctx) require.NoError(t, err) - resultCh := registerResultCollector(ctx, t, cs.eventBus, pubKey.Address()) + eventCh := timestampedCollector(ctx, t, cs.eventBus) return pbtsTestHarness{ pbtsTestConfiguration: tc, @@ -115,20 +137,21 @@ func newPBTSTestHarness(ctx context.Context, t *testing.T, tc pbtsTestConfigurat ensureProposalCh: subscribe(ctx, t, cs.eventBus, types.EventQueryCompleteProposal), blockCh: subscribe(ctx, t, cs.eventBus, types.EventQueryNewBlock), ensureVoteCh: subscribeToVoterBuffered(ctx, t, cs, pubKey.Address()), - resultCh: resultCh, + eventCh: eventCh, t: t, ctx: ctx, } } -func (p *pbtsTestHarness) genesisHeight() heightResult { - p.validatorClock.On("Now").Return(p.height2ProposedBlockTime).Times(8) +func (p *pbtsTestHarness) observedValidatorProposerHeight(previousBlockTime time.Time) heightResult { + p.validatorClock.On("Now").Return(p.height2ProposedBlockTime).Times(6) - startTestRound(p.ctx, p.observedState, p.currentHeight, p.currentRound) ensureNewRound(p.t, p.roundCh, p.currentHeight, p.currentRound) propBlock, partSet := p.observedState.createProposalBlock() bid := types.BlockID{Hash: propBlock.Hash(), PartSetHeader: partSet.Header()} - ensureProposal(p.t, p.ensureProposalCh, p.currentHeight, p.currentRound, bid) + + timeout := time.Until(previousBlockTime.Add(ensureTimeout)) + ensureProposalWithTimeout(p.t, p.ensureProposalCh, p.currentHeight, p.currentRound, bid, timeout) ensurePrevote(p.t, p.ensureVoteCh, p.currentHeight, p.currentRound) signAddVotes(p.ctx, p.observedState, tmproto.PrevoteType, p.chainID, bid, p.otherValidators...) @@ -136,19 +159,38 @@ func (p *pbtsTestHarness) genesisHeight() heightResult { ensurePrecommit(p.t, p.ensureVoteCh, p.currentHeight, p.currentRound) ensureNewBlock(p.t, p.blockCh, p.currentHeight) + + vk, err := p.observedValidator.GetPubKey(context.Background()) + require.NoError(p.t, err) + res := collectHeightResults(p.ctx, p.t, p.eventCh, p.currentHeight, vk.Address()) + p.currentHeight++ incrementHeight(p.otherValidators...) - return <-p.resultCh + return res } func (p *pbtsTestHarness) height2() heightResult { signer := p.otherValidators[0].PrivValidator - return p.nextHeight(signer, p.height2ProposalDeliverTime, p.height2ProposedBlockTime, time.Now()) + height3BlockTime := p.height2ProposedBlockTime.Add(-blockTimeIota) + return p.nextHeight(signer, p.height2ProposalDeliverTime, p.height2ProposedBlockTime, height3BlockTime) +} + +func (p *pbtsTestHarness) intermediateHeights() { + signer := p.otherValidators[1].PrivValidator + blockTimeHeight3 := p.height4ProposedBlockTime.Add(-blockTimeIota) + p.nextHeight(signer, blockTimeHeight3, blockTimeHeight3, p.height4ProposedBlockTime) + + signer = p.otherValidators[2].PrivValidator + p.nextHeight(signer, p.height4ProposedBlockTime, p.height4ProposedBlockTime, time.Now()) +} + +func (p *pbtsTestHarness) height5() heightResult { + return p.observedValidatorProposerHeight(p.height4ProposedBlockTime) } // nolint: lll func (p *pbtsTestHarness) nextHeight(proposer types.PrivValidator, deliverTime, proposedTime, nextProposedTime time.Time) heightResult { - p.validatorClock.On("Now").Return(nextProposedTime).Times(8) + p.validatorClock.On("Now").Return(nextProposedTime).Times(6) ensureNewRound(p.t, p.roundCh, p.currentHeight, p.currentRound) @@ -182,51 +224,104 @@ func (p *pbtsTestHarness) nextHeight(proposer types.PrivValidator, deliverTime, signAddVotes(p.ctx, p.observedState, tmproto.PrecommitType, p.chainID, bid, p.otherValidators...) ensurePrecommit(p.t, p.ensureVoteCh, p.currentHeight, p.currentRound) + vk, err := p.observedValidator.GetPubKey(context.Background()) + require.NoError(p.t, err) + res := collectHeightResults(p.ctx, p.t, p.eventCh, p.currentHeight, vk.Address()) + ensureNewBlock(p.t, p.blockCh, p.currentHeight) + p.currentHeight++ incrementHeight(p.otherValidators...) - return <-p.resultCh + return res +} + +func timestampedCollector(ctx context.Context, t *testing.T, eb *eventbus.EventBus) <-chan timestampedEvent { + t.Helper() + + // Since eventCh is not read until the end of each height, it must be large + // enough to hold all of the events produced during a single height. + eventCh := make(chan timestampedEvent, 100) + + if err := eb.Observe(ctx, func(msg tmpubsub.Message) error { + eventCh <- timestampedEvent{ + ts: time.Now(), + m: msg, + } + return nil + }, types.EventQueryVote, types.EventQueryCompleteProposal); err != nil { + t.Fatalf("Failed to observe query %v: %v", types.EventQueryVote, err) + } + return eventCh } // nolint: lll -func registerResultCollector(ctx context.Context, t *testing.T, eb *eventbus.EventBus, address []byte) <-chan heightResult { +func collectHeightResults(ctx context.Context, t *testing.T, eventCh <-chan timestampedEvent, height int64, address []byte) heightResult { t.Helper() - resultCh := make(chan heightResult, 2) var res heightResult - if err := eb.Observe(ctx, func(msg tmpubsub.Message) error { - ts := time.Now() - vote := msg.Data().(types.EventDataVote) - // we only fire for our own votes - if !bytes.Equal(address, vote.Vote.ValidatorAddress) { - return nil + for event := range eventCh { + switch v := event.m.Data().(type) { + case types.EventDataVote: + if v.Vote.Height > height { + t.Fatalf("received prevote from unexpected height, expected: %d, saw: %d", height, v.Vote.Height) + } + if !bytes.Equal(address, v.Vote.ValidatorAddress) { + continue + } + if v.Vote.Type != tmproto.PrevoteType { + continue + } + res.prevote = v.Vote + res.prevoteIssuedAt = event.ts + + case types.EventDataCompleteProposal: + if v.Height > height { + t.Fatalf("received proposal from unexpected height, expected: %d, saw: %d", height, v.Height) + } + res.proposalIssuedAt = event.ts } - if vote.Vote.Type != tmproto.PrevoteType { - return nil + if res.isComplete() { + return res } - res.prevoteIssuedAt = ts - res.prevote = vote.Vote - resultCh <- res - return nil - }, types.EventQueryVote); err != nil { - t.Fatalf("Failed to observe query %v: %v", types.EventQueryVote, err) } - return resultCh + t.Fatalf("complete height result never seen for height %d", height) + + panic("unreachable") +} + +type timestampedEvent struct { + ts time.Time + m tmpubsub.Message } func (p *pbtsTestHarness) run() resultSet { - p.genesisHeight() + startTestRound(p.ctx, p.observedState, p.currentHeight, p.currentRound) + + r1 := p.observedValidatorProposerHeight(p.genesisTime) r2 := p.height2() + p.intermediateHeights() + r5 := p.height5() + err := p.observedState.Stop() + require.NoError(p.t, err) return resultSet{ - height2: r2, + genesisHeight: r1, + height2: r2, + height5: r5, } } type resultSet struct { - height2 heightResult + genesisHeight heightResult + height2 heightResult + height5 heightResult } type heightResult struct { - prevote *types.Vote - prevoteIssuedAt time.Time + proposalIssuedAt time.Time + prevote *types.Vote + prevoteIssuedAt time.Time +} + +func (hr heightResult) isComplete() bool { + return !hr.proposalIssuedAt.IsZero() && !hr.prevoteIssuedAt.IsZero() && hr.prevote != nil } // TestReceiveProposalWaitsForPreviousBlockTime tests that a validator receiving @@ -281,7 +376,7 @@ func TestReceiveProposalTimesOutOnSlowDelivery(t *testing.T) { }, timeoutPropose: 50 * time.Millisecond, genesisTime: initialTime, - height2ProposalDeliverTime: initialTime.Add(610 * time.Millisecond), + height2ProposalDeliverTime: initialTime.Add(660 * time.Millisecond), height2ProposedBlockTime: initialTime.Add(350 * time.Millisecond), } @@ -297,6 +392,67 @@ func TestReceiveProposalTimesOutOnSlowDelivery(t *testing.T) { assert.Nil(t, results.height2.prevote.BlockID.Hash) } +// TestProposerWaitsForGenesisTime tests that a proposer will not propose a block +// until after the genesis time has passed. The test sets the genesis time in the +// future and then ensures that the observed validator waits to propose a block. +func TestProposerWaitsForGenesisTime(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // create a genesis time far (enough) in the future. + initialTime := time.Now().Add(800 * time.Millisecond) + cfg := pbtsTestConfiguration{ + timestampParams: types.TimestampParams{ + Precision: 10 * time.Millisecond, + MsgDelay: 10 * time.Millisecond, + }, + timeoutPropose: 10 * time.Millisecond, + genesisTime: initialTime, + height2ProposalDeliverTime: initialTime.Add(10 * time.Millisecond), + height2ProposedBlockTime: initialTime.Add(10 * time.Millisecond), + } + + pbtsTest := newPBTSTestHarness(ctx, t, cfg) + results := pbtsTest.run() + + // ensure that the proposal was issued after the genesis time. + assert.True(t, results.genesisHeight.proposalIssuedAt.After(cfg.genesisTime)) +} + +// TestProposerWaitsForPreviousBlock tests that the proposer of a block waits until +// the block time of the previous height has passed to propose the next block. +// The test harness ensures that the observed validator will be the proposer at +// height 1 and height 5. The test sets the block time of height 4 in the future +// and then verifies that the observed validator waits until after the block time +// of height 4 to propose a block at height 5. +func TestProposerWaitsForPreviousBlock(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + initialTime := time.Now().Add(time.Millisecond * 50) + cfg := pbtsTestConfiguration{ + timestampParams: types.TimestampParams{ + Precision: 100 * time.Millisecond, + MsgDelay: 500 * time.Millisecond, + }, + timeoutPropose: 50 * time.Millisecond, + genesisTime: initialTime, + height2ProposalDeliverTime: initialTime.Add(150 * time.Millisecond), + height2ProposedBlockTime: initialTime.Add(100 * time.Millisecond), + height4ProposedBlockTime: initialTime.Add(800 * time.Millisecond), + } + + pbtsTest := newPBTSTestHarness(ctx, t, cfg) + results := pbtsTest.run() + + // the observed validator is the proposer at height 5. + // ensure that the observed validator did not propose a block until after + // the time configured for height 4. + assert.True(t, results.height5.proposalIssuedAt.After(cfg.height4ProposedBlockTime)) + + // Ensure that the validator issued a prevote for a non-nil block. + assert.NotNil(t, results.height5.prevote.BlockID.Hash) +} + func TestProposerWaitTime(t *testing.T) { genesisTime, err := time.Parse(time.RFC3339, "2019-03-13T23:00:00Z") require.NoError(t, err) diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 28bba333e..29e72d234 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -1069,9 +1069,10 @@ func (cs *State) enterNewRound(height int64, round int32) { cs.scheduleTimeout(cs.config.CreateEmptyBlocksInterval, height, round, cstypes.RoundStepNewRound) } - } else { - cs.enterPropose(height, round) + return } + + cs.enterPropose(height, round) } // needProofBlock returns true on the first height (so the genesis app hash is signed right away) @@ -1104,6 +1105,16 @@ func (cs *State) enterPropose(height int64, round int32) { return } + // If this validator is the proposer of this round, and the previous block time is later than + // our local clock time, wait to propose until our local clock time has passed the block time. + if cs.privValidatorPubKey != nil && cs.isProposer(cs.privValidatorPubKey.Address()) { + proposerWaitTime := proposerWaitTime(tmtime.DefaultSource{}, cs.state.LastBlockTime) + if proposerWaitTime > 0 { + cs.scheduleTimeout(proposerWaitTime, height, round, cstypes.RoundStepNewRound) + return + } + } + logger.Debug("entering propose step", "current", fmt.Sprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step)) defer func() { @@ -1131,8 +1142,6 @@ func (cs *State) enterPropose(height int64, round int32) { return } - logger.Debug("node is a validator") - if cs.privValidatorPubKey == nil { // If this node is a validator & proposer in the current round, it will // miss the opportunity to create a block. @@ -1140,18 +1149,20 @@ func (cs *State) enterPropose(height int64, round int32) { return } - address := cs.privValidatorPubKey.Address() + addr := cs.privValidatorPubKey.Address() // if not a validator, we're done - if !cs.Validators.HasAddress(address) { - logger.Debug("node is not a validator", "addr", address, "vals", cs.Validators) + if !cs.Validators.HasAddress(addr) { + logger.Debug("node is not a validator", "addr", addr, "vals", cs.Validators) return } - if cs.isProposer(address) { + logger.Debug("node is a validator") + + if cs.isProposer(addr) { logger.Debug( "propose step; our turn to propose", - "proposer", address, + "proposer", addr, ) cs.decideProposal(height, round)