diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 1609e9755..354163ee8 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -47,6 +47,7 @@ Special thanks to external contributors on this release: - [rpc] [\#7270](https://github.com/tendermint/tendermint/pull/7270) Add `header` and `header_by_hash` RPC Client queries. (@fedekunze) - [cli] [#7033](https://github.com/tendermint/tendermint/pull/7033) Add a `rollback` command to rollback to the previous tendermint state in the event of non-determinstic app hash or reverting an upgrade. - [mempool, rpc] \#7041 Add removeTx operation to the RPC layer. (@tychoish) +- [consensus] \#7376 Update the proposal logic per the Propose-based timestamps specification so that the proposer will wait for the previous block time to occur before proposing the next block. (@williambanfield) ### IMPROVEMENTS - [internal/protoio] \#7325 Optimized `MarshalDelimited` by inlining the common case and using a `sync.Pool` in the worst case. (@odeke-em) diff --git a/internal/consensus/common_test.go b/internal/consensus/common_test.go index 6417909c2..4cf80cf15 100644 --- a/internal/consensus/common_test.go +++ b/internal/consensus/common_test.go @@ -40,7 +40,7 @@ const ( testSubscriber = "test-client" // genesis, chain_id, priv_val - ensureTimeout = time.Millisecond * 200 + ensureTimeout = time.Millisecond * 500 ) // A cleanupFunc cleans up any config / test files created for a particular @@ -656,8 +656,13 @@ func ensureRelock(t *testing.T, relockCh <-chan tmpubsub.Message, height int64, } func ensureProposal(t *testing.T, proposalCh <-chan tmpubsub.Message, height int64, round int32, propID types.BlockID) { + ensureProposalWithTimeout(t, proposalCh, height, round, propID, ensureTimeout) +} + +// nolint: lll +func ensureProposalWithTimeout(t *testing.T, proposalCh <-chan tmpubsub.Message, height int64, round int32, propID types.BlockID, timeout time.Duration) { t.Helper() - msg := ensureMessageBeforeTimeout(t, proposalCh, ensureTimeout) + msg := ensureMessageBeforeTimeout(t, proposalCh, timeout) proposalEvent, ok := msg.Data().(types.EventDataCompleteProposal) require.True(t, ok, "expected a EventDataCompleteProposal, got %T. Wrong subscription channel?", msg.Data()) @@ -666,7 +671,6 @@ func ensureProposal(t *testing.T, proposalCh <-chan tmpubsub.Message, height int require.True(t, proposalEvent.BlockID.Equals(propID), "Proposed block does not match expected block (%v != %v)", proposalEvent.BlockID, propID) } - func ensurePrecommit(t *testing.T, voteCh <-chan tmpubsub.Message, height int64, round int32) { t.Helper() ensureVote(t, voteCh, height, round, tmproto.PrecommitType) diff --git a/internal/consensus/pbts_test.go b/internal/consensus/pbts_test.go index e4e271051..d991f43f2 100644 --- a/internal/consensus/pbts_test.go +++ b/internal/consensus/pbts_test.go @@ -18,6 +18,12 @@ import ( "github.com/tendermint/tendermint/types" ) +const ( + // blockTimeIota is used in the test harness as the time between + // blocks when not otherwise specified. + blockTimeIota = time.Millisecond +) + // pbtsTestHarness constructs a Tendermint network that can be used for testing the // implementation of the Proposer-Based timestamps algorithm. // It runs a series of consensus heights and captures timing of votes and events. @@ -47,7 +53,9 @@ type pbtsTestHarness struct { // channels for verifying that the observed validator completes certain actions. ensureProposalCh, roundCh, blockCh, ensureVoteCh <-chan tmpubsub.Message - resultCh <-chan heightResult + // channel of events from the observed validator annotated with the timestamp + // the event was received. + eventCh <-chan timestampedEvent currentHeight int64 currentRound int32 @@ -71,6 +79,11 @@ type pbtsTestConfiguration struct { // The timestamp of the block proposed at height 2. height2ProposedBlockTime time.Time + + // The timestamp of the block proposed at height 4. + // At height 4, the proposed block time and the deliver time are the same so + // that timely-ness does not affect height 4. + height4ProposedBlockTime time.Time } func newPBTSTestHarness(ctx context.Context, t *testing.T, tc pbtsTestConfiguration) pbtsTestHarness { @@ -78,6 +91,15 @@ func newPBTSTestHarness(ctx context.Context, t *testing.T, tc pbtsTestConfigurat const validators = 4 cfg := configSetup(t) clock := new(tmtimemocks.Source) + if tc.height4ProposedBlockTime.IsZero() { + + // Set a default height4ProposedBlockTime. + // Use a proposed block time that is greater than the time that the + // block at height 2 was delivered. Height 3 is not relevant for testing + // and always occurs blockTimeIota before height 4. If not otherwise specified, + // height 4 therefore occurs 2*blockTimeIota after height 2. + tc.height4ProposedBlockTime = tc.height2ProposalDeliverTime.Add(2 * blockTimeIota) + } cfg.Consensus.TimeoutPropose = tc.timeoutPropose consensusParams := types.DefaultConsensusParams() consensusParams.Timing = tc.timingParams @@ -87,7 +109,7 @@ func newPBTSTestHarness(ctx context.Context, t *testing.T, tc pbtsTestConfigurat Time: tc.genesisTime, Validators: validators, }) - cs := newState(ctx, t, log.TestingLogger(), state, privVals[0], kvstore.NewApplication()) + cs := newState(ctx, t, log.NewTestingLogger(t), state, privVals[0], kvstore.NewApplication()) vss := make([]*validatorStub, validators) for i := 0; i < validators; i++ { vss[i] = newValidatorStub(privVals[i], int32(i)) @@ -100,7 +122,7 @@ func newPBTSTestHarness(ctx context.Context, t *testing.T, tc pbtsTestConfigurat pubKey, err := vss[0].PrivValidator.GetPubKey(ctx) require.NoError(t, err) - resultCh := registerResultCollector(ctx, t, cs.eventBus, pubKey.Address()) + eventCh := timestampedCollector(ctx, t, cs.eventBus) return pbtsTestHarness{ pbtsTestConfiguration: tc, @@ -114,21 +136,22 @@ func newPBTSTestHarness(ctx context.Context, t *testing.T, tc pbtsTestConfigurat ensureProposalCh: subscribe(ctx, t, cs.eventBus, types.EventQueryCompleteProposal), blockCh: subscribe(ctx, t, cs.eventBus, types.EventQueryNewBlock), ensureVoteCh: subscribeToVoterBuffered(ctx, t, cs, pubKey.Address()), - resultCh: resultCh, + eventCh: eventCh, t: t, ctx: ctx, } } -func (p *pbtsTestHarness) genesisHeight() heightResult { - p.validatorClock.On("Now").Return(p.height2ProposedBlockTime).Times(8) +func (p *pbtsTestHarness) observedValidatorProposerHeight(previousBlockTime time.Time) heightResult { + p.validatorClock.On("Now").Return(p.height2ProposedBlockTime).Times(6) - startTestRound(p.ctx, p.observedState, p.currentHeight, p.currentRound) ensureNewRound(p.t, p.roundCh, p.currentHeight, p.currentRound) propBlock, partSet, err := p.observedState.createProposalBlock() require.NoError(p.t, err) bid := types.BlockID{Hash: propBlock.Hash(), PartSetHeader: partSet.Header()} - ensureProposal(p.t, p.ensureProposalCh, p.currentHeight, p.currentRound, bid) + + timeout := time.Until(previousBlockTime.Add(ensureTimeout)) + ensureProposalWithTimeout(p.t, p.ensureProposalCh, p.currentHeight, p.currentRound, bid, timeout) ensurePrevote(p.t, p.ensureVoteCh, p.currentHeight, p.currentRound) signAddVotes(p.ctx, p.t, p.observedState, tmproto.PrevoteType, p.chainID, bid, p.otherValidators...) @@ -136,19 +159,37 @@ func (p *pbtsTestHarness) genesisHeight() heightResult { ensurePrecommit(p.t, p.ensureVoteCh, p.currentHeight, p.currentRound) ensureNewBlock(p.t, p.blockCh, p.currentHeight) + + vk, err := p.observedValidator.GetPubKey(context.Background()) + require.NoError(p.t, err) + res := collectHeightResults(p.ctx, p.t, p.eventCh, p.currentHeight, vk.Address()) + p.currentHeight++ incrementHeight(p.otherValidators...) - return <-p.resultCh + return res } func (p *pbtsTestHarness) height2() heightResult { signer := p.otherValidators[0].PrivValidator - return p.nextHeight(signer, p.height2ProposalDeliverTime, p.height2ProposedBlockTime, time.Now()) + height3BlockTime := p.height2ProposedBlockTime.Add(-blockTimeIota) + return p.nextHeight(signer, p.height2ProposalDeliverTime, p.height2ProposedBlockTime, height3BlockTime) +} + +func (p *pbtsTestHarness) intermediateHeights() { + signer := p.otherValidators[1].PrivValidator + blockTimeHeight3 := p.height4ProposedBlockTime.Add(-blockTimeIota) + p.nextHeight(signer, blockTimeHeight3, blockTimeHeight3, p.height4ProposedBlockTime) + + signer = p.otherValidators[2].PrivValidator + p.nextHeight(signer, p.height4ProposedBlockTime, p.height4ProposedBlockTime, time.Now()) +} + +func (p *pbtsTestHarness) height5() heightResult { + return p.observedValidatorProposerHeight(p.height4ProposedBlockTime) } -// nolint: lll func (p *pbtsTestHarness) nextHeight(proposer types.PrivValidator, deliverTime, proposedTime, nextProposedTime time.Time) heightResult { - p.validatorClock.On("Now").Return(nextProposedTime).Times(8) + p.validatorClock.On("Now").Return(nextProposedTime).Times(6) ensureNewRound(p.t, p.roundCh, p.currentHeight, p.currentRound) @@ -184,51 +225,103 @@ func (p *pbtsTestHarness) nextHeight(proposer types.PrivValidator, deliverTime, signAddVotes(p.ctx, p.t, p.observedState, tmproto.PrecommitType, p.chainID, bid, p.otherValidators...) ensurePrecommit(p.t, p.ensureVoteCh, p.currentHeight, p.currentRound) + vk, err := p.observedValidator.GetPubKey(context.Background()) + require.NoError(p.t, err) + res := collectHeightResults(p.ctx, p.t, p.eventCh, p.currentHeight, vk.Address()) + ensureNewBlock(p.t, p.blockCh, p.currentHeight) + p.currentHeight++ incrementHeight(p.otherValidators...) - return <-p.resultCh + return res } -// nolint: lll -func registerResultCollector(ctx context.Context, t *testing.T, eb *eventbus.EventBus, address []byte) <-chan heightResult { +func timestampedCollector(ctx context.Context, t *testing.T, eb *eventbus.EventBus) <-chan timestampedEvent { t.Helper() - resultCh := make(chan heightResult, 2) - var res heightResult + + // Since eventCh is not read until the end of each height, it must be large + // enough to hold all of the events produced during a single height. + eventCh := make(chan timestampedEvent, 100) + if err := eb.Observe(ctx, func(msg tmpubsub.Message) error { - ts := time.Now() - vote := msg.Data().(types.EventDataVote) - // we only fire for our own votes - if !bytes.Equal(address, vote.Vote.ValidatorAddress) { - return nil + eventCh <- timestampedEvent{ + ts: time.Now(), + m: msg, } - if vote.Vote.Type != tmproto.PrevoteType { - return nil - } - res.prevoteIssuedAt = ts - res.prevote = vote.Vote - resultCh <- res return nil - }, types.EventQueryVote); err != nil { + }, types.EventQueryVote, types.EventQueryCompleteProposal); err != nil { t.Fatalf("Failed to observe query %v: %v", types.EventQueryVote, err) } - return resultCh + return eventCh +} + +func collectHeightResults(ctx context.Context, t *testing.T, eventCh <-chan timestampedEvent, height int64, address []byte) heightResult { + t.Helper() + var res heightResult + for event := range eventCh { + switch v := event.m.Data().(type) { + case types.EventDataVote: + if v.Vote.Height > height { + t.Fatalf("received prevote from unexpected height, expected: %d, saw: %d", height, v.Vote.Height) + } + if !bytes.Equal(address, v.Vote.ValidatorAddress) { + continue + } + if v.Vote.Type != tmproto.PrevoteType { + continue + } + res.prevote = v.Vote + res.prevoteIssuedAt = event.ts + + case types.EventDataCompleteProposal: + if v.Height > height { + t.Fatalf("received proposal from unexpected height, expected: %d, saw: %d", height, v.Height) + } + res.proposalIssuedAt = event.ts + } + if res.isComplete() { + return res + } + } + t.Fatalf("complete height result never seen for height %d", height) + + panic("unreachable") +} + +type timestampedEvent struct { + ts time.Time + m tmpubsub.Message } func (p *pbtsTestHarness) run() resultSet { - p.genesisHeight() + startTestRound(p.ctx, p.observedState, p.currentHeight, p.currentRound) + + r1 := p.observedValidatorProposerHeight(p.genesisTime) r2 := p.height2() + p.intermediateHeights() + r5 := p.height5() + err := p.observedState.Stop() + require.NoError(p.t, err) return resultSet{ - height2: r2, + genesisHeight: r1, + height2: r2, + height5: r5, } } type resultSet struct { - height2 heightResult + genesisHeight heightResult + height2 heightResult + height5 heightResult } type heightResult struct { - prevote *types.Vote - prevoteIssuedAt time.Time + proposalIssuedAt time.Time + prevote *types.Vote + prevoteIssuedAt time.Time +} + +func (hr heightResult) isComplete() bool { + return !hr.proposalIssuedAt.IsZero() && !hr.prevoteIssuedAt.IsZero() && hr.prevote != nil } // TestReceiveProposalWaitsForPreviousBlockTime tests that a validator receiving @@ -283,7 +376,7 @@ func TestReceiveProposalTimesOutOnSlowDelivery(t *testing.T) { }, timeoutPropose: 50 * time.Millisecond, genesisTime: initialTime, - height2ProposalDeliverTime: initialTime.Add(610 * time.Millisecond), + height2ProposalDeliverTime: initialTime.Add(660 * time.Millisecond), height2ProposedBlockTime: initialTime.Add(350 * time.Millisecond), } @@ -299,6 +392,67 @@ func TestReceiveProposalTimesOutOnSlowDelivery(t *testing.T) { assert.Nil(t, results.height2.prevote.BlockID.Hash) } +// TestProposerWaitsForGenesisTime tests that a proposer will not propose a block +// until after the genesis time has passed. The test sets the genesis time in the +// future and then ensures that the observed validator waits to propose a block. +func TestProposerWaitsForGenesisTime(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // create a genesis time far (enough) in the future. + initialTime := time.Now().Add(800 * time.Millisecond) + cfg := pbtsTestConfiguration{ + timingParams: types.TimingParams{ + Precision: 10 * time.Millisecond, + MessageDelay: 10 * time.Millisecond, + }, + timeoutPropose: 10 * time.Millisecond, + genesisTime: initialTime, + height2ProposalDeliverTime: initialTime.Add(10 * time.Millisecond), + height2ProposedBlockTime: initialTime.Add(10 * time.Millisecond), + } + + pbtsTest := newPBTSTestHarness(ctx, t, cfg) + results := pbtsTest.run() + + // ensure that the proposal was issued after the genesis time. + assert.True(t, results.genesisHeight.proposalIssuedAt.After(cfg.genesisTime)) +} + +// TestProposerWaitsForPreviousBlock tests that the proposer of a block waits until +// the block time of the previous height has passed to propose the next block. +// The test harness ensures that the observed validator will be the proposer at +// height 1 and height 5. The test sets the block time of height 4 in the future +// and then verifies that the observed validator waits until after the block time +// of height 4 to propose a block at height 5. +func TestProposerWaitsForPreviousBlock(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + initialTime := time.Now().Add(time.Millisecond * 50) + cfg := pbtsTestConfiguration{ + timingParams: types.TimingParams{ + Precision: 100 * time.Millisecond, + MessageDelay: 500 * time.Millisecond, + }, + timeoutPropose: 50 * time.Millisecond, + genesisTime: initialTime, + height2ProposalDeliverTime: initialTime.Add(150 * time.Millisecond), + height2ProposedBlockTime: initialTime.Add(100 * time.Millisecond), + height4ProposedBlockTime: initialTime.Add(800 * time.Millisecond), + } + + pbtsTest := newPBTSTestHarness(ctx, t, cfg) + results := pbtsTest.run() + + // the observed validator is the proposer at height 5. + // ensure that the observed validator did not propose a block until after + // the time configured for height 4. + assert.True(t, results.height5.proposalIssuedAt.After(cfg.height4ProposedBlockTime)) + + // Ensure that the validator issued a prevote for a non-nil block. + assert.NotNil(t, results.height5.prevote.BlockID.Hash) +} + func TestProposerWaitTime(t *testing.T) { genesisTime, err := time.Parse(time.RFC3339, "2019-03-13T23:00:00Z") require.NoError(t, err) @@ -350,7 +504,7 @@ func TestProposalTimeout(t *testing.T) { expectedDuration time.Duration }{ { - name: "MsgDelay + Precision has not quite elapsed", + name: "MessageDelay + Precision has not quite elapsed", localTime: genesisTime.Add(525 * time.Millisecond), previousBlockTime: genesisTime.Add(6 * time.Millisecond), precision: time.Millisecond * 20, @@ -358,7 +512,7 @@ func TestProposalTimeout(t *testing.T) { expectedDuration: 1 * time.Millisecond, }, { - name: "MsgDelay + Precision equals current time", + name: "MessageDelay + Precision equals current time", localTime: genesisTime.Add(525 * time.Millisecond), previousBlockTime: genesisTime.Add(5 * time.Millisecond), precision: time.Millisecond * 20, @@ -366,7 +520,7 @@ func TestProposalTimeout(t *testing.T) { expectedDuration: 0, }, { - name: "MsgDelay + Precision has elapsed", + name: "MessageDelay + Precision has elapsed", localTime: genesisTime.Add(725 * time.Millisecond), previousBlockTime: genesisTime.Add(5 * time.Millisecond), precision: time.Millisecond * 20, diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 042157fd8..76705c8c7 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -1116,9 +1116,10 @@ func (cs *State) enterNewRound(ctx context.Context, height int64, round int32) { cs.scheduleTimeout(cs.config.CreateEmptyBlocksInterval, height, round, cstypes.RoundStepNewRound) } - } else { - cs.enterPropose(ctx, height, round) + return } + + cs.enterPropose(ctx, height, round) } // needProofBlock returns true on the first height (so the genesis app hash is signed right away) @@ -1151,6 +1152,16 @@ func (cs *State) enterPropose(ctx context.Context, height int64, round int32) { return } + // If this validator is the proposer of this round, and the previous block time is later than + // our local clock time, wait to propose until our local clock time has passed the block time. + if cs.privValidatorPubKey != nil && cs.isProposer(cs.privValidatorPubKey.Address()) { + proposerWaitTime := proposerWaitTime(tmtime.DefaultSource{}, cs.state.LastBlockTime) + if proposerWaitTime > 0 { + cs.scheduleTimeout(proposerWaitTime, height, round, cstypes.RoundStepNewRound) + return + } + } + logger.Debug("entering propose step", "current", fmt.Sprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step)) defer func() { @@ -1178,8 +1189,6 @@ func (cs *State) enterPropose(ctx context.Context, height int64, round int32) { return } - logger.Debug("node is a validator") - if cs.privValidatorPubKey == nil { // If this node is a validator & proposer in the current round, it will // miss the opportunity to create a block. @@ -1187,18 +1196,20 @@ func (cs *State) enterPropose(ctx context.Context, height int64, round int32) { return } - address := cs.privValidatorPubKey.Address() + addr := cs.privValidatorPubKey.Address() // if not a validator, we're done - if !cs.Validators.HasAddress(address) { - logger.Debug("node is not a validator", "addr", address, "vals", cs.Validators) + if !cs.Validators.HasAddress(addr) { + logger.Debug("node is not a validator", "addr", addr, "vals", cs.Validators) return } - if cs.isProposer(address) { + logger.Debug("node is a validator") + + if cs.isProposer(addr) { logger.Debug( "propose step; our turn to propose", - "proposer", address, + "proposer", addr, ) cs.decideProposal(ctx, height, round)