internal/consensus: proposer waits for previous block time (#7376)

This change introduces the logic to have the proposer wait until the previous block time has passed before attempting to propose the next block.

The change achieves this by by adding a new clause into the enterPropose state machine method. The method now checks if the validator is the proposer and if the validator's clock is behind the previous block's time. If the validator's clock is behind the previous block time, it schedules a timeout to re-enter the enter propose method after enough time has passed.
This commit is contained in:
William Banfield
2021-12-08 11:23:33 -05:00
committed by GitHub
parent a9b2bbd70d
commit e91bac3565
4 changed files with 220 additions and 46 deletions

View File

@@ -41,6 +41,7 @@ Special thanks to external contributors on this release:
- [cli] [#7033](https://github.com/tendermint/tendermint/pull/7033) Add a `rollback` command to rollback to the previous tendermint state in the event of non-determinstic app hash or reverting an upgrade.
- [mempool, rpc] \#7041 Add removeTx operation to the RPC layer. (@tychoish)
- [consensus] \#7376 Update the proposal logic per the Propose-based timestamps specification so that the proposer will wait for the previous block time to occur before proposing the next block. (@williambanfield)
### IMPROVEMENTS

View File

@@ -682,10 +682,17 @@ func ensureRelock(t *testing.T, relockCh <-chan tmpubsub.Message, height int64,
}
func ensureProposal(t *testing.T, proposalCh <-chan tmpubsub.Message, height int64, round int32, propID types.BlockID) {
msg := ensureMessageBeforeTimeout(t, proposalCh, ensureTimeout)
ensureProposalWithTimeout(t, proposalCh, height, round, propID, ensureTimeout)
}
// nolint: lll
func ensureProposalWithTimeout(t *testing.T, proposalCh <-chan tmpubsub.Message, height int64, round int32, propID types.BlockID, timeout time.Duration) {
t.Helper()
msg := ensureMessageBeforeTimeout(t, proposalCh, timeout)
proposalEvent, ok := msg.Data().(types.EventDataCompleteProposal)
if !ok {
t.Fatalf("expected a EventDataCompleteProposal, got %T. Wrong subscription channel?", msg.Data())
t.Fatalf("expected a EventDataCompleteProposal, got %T. Wrong subscription channel?",
msg.Data())
}
if proposalEvent.Height != height {
t.Fatalf("expected height %v, got %v", height, proposalEvent.Height)
@@ -697,7 +704,6 @@ func ensureProposal(t *testing.T, proposalCh <-chan tmpubsub.Message, height int
t.Fatalf("Proposed block does not match expected block (%v != %v)", proposalEvent.BlockID, propID)
}
}
func ensurePrecommit(t *testing.T, voteCh <-chan tmpubsub.Message, height int64, round int32) {
t.Helper()
ensureVote(t, voteCh, height, round, tmproto.PrecommitType)

View File

@@ -18,6 +18,12 @@ import (
"github.com/tendermint/tendermint/types"
)
const (
// blockTimeIota is used in the test harness as the time between
// blocks when not otherwise specified.
blockTimeIota = time.Millisecond
)
// pbtsTestHarness constructs a Tendermint network that can be used for testing the
// implementation of the Proposer-Based timestamps algorithm.
// It runs a series of consensus heights and captures timing of votes and events.
@@ -47,7 +53,9 @@ type pbtsTestHarness struct {
// channels for verifying that the observed validator completes certain actions.
ensureProposalCh, roundCh, blockCh, ensureVoteCh <-chan tmpubsub.Message
resultCh <-chan heightResult
// channel of events from the observed validator annotated with the timestamp
// the event was received.
eventCh <-chan timestampedEvent
currentHeight int64
currentRound int32
@@ -71,6 +79,11 @@ type pbtsTestConfiguration struct {
// The timestamp of the block proposed at height 2.
height2ProposedBlockTime time.Time
// The timestamp of the block proposed at height 4.
// At height 4, the proposed block time and the deliver time are the same so
// that timely-ness does not affect height 4.
height4ProposedBlockTime time.Time
}
func newPBTSTestHarness(ctx context.Context, t *testing.T, tc pbtsTestConfiguration) pbtsTestHarness {
@@ -78,6 +91,15 @@ func newPBTSTestHarness(ctx context.Context, t *testing.T, tc pbtsTestConfigurat
const validators = 4
cfg := configSetup(t)
clock := new(tmtimemocks.Source)
if tc.height4ProposedBlockTime.IsZero() {
// Set a default height4ProposedBlockTime.
// Use a proposed block time that is greater than the time that the
// block at height 2 was delivered. Height 3 is not relevant for testing
// and always occurs blockTimeIota before height 4. If not otherwise specified,
// height 4 therefore occurs 2*blockTimeIota after height 2.
tc.height4ProposedBlockTime = tc.height2ProposalDeliverTime.Add(2 * blockTimeIota)
}
cfg.Consensus.TimeoutPropose = tc.timeoutPropose
consensusParams := types.DefaultConsensusParams()
consensusParams.Timing = tc.timingParams
@@ -101,7 +123,7 @@ func newPBTSTestHarness(ctx context.Context, t *testing.T, tc pbtsTestConfigurat
pubKey, err := vss[0].PrivValidator.GetPubKey(ctx)
require.NoError(t, err)
resultCh := registerResultCollector(ctx, t, cs.eventBus, pubKey.Address())
eventCh := timestampedCollector(ctx, t, cs.eventBus)
return pbtsTestHarness{
pbtsTestConfiguration: tc,
@@ -115,20 +137,21 @@ func newPBTSTestHarness(ctx context.Context, t *testing.T, tc pbtsTestConfigurat
ensureProposalCh: subscribe(ctx, t, cs.eventBus, types.EventQueryCompleteProposal),
blockCh: subscribe(ctx, t, cs.eventBus, types.EventQueryNewBlock),
ensureVoteCh: subscribeToVoterBuffered(ctx, t, cs, pubKey.Address()),
resultCh: resultCh,
eventCh: eventCh,
t: t,
ctx: ctx,
}
}
func (p *pbtsTestHarness) genesisHeight() heightResult {
p.validatorClock.On("Now").Return(p.height2ProposedBlockTime).Times(8)
func (p *pbtsTestHarness) observedValidatorProposerHeight(previousBlockTime time.Time) heightResult {
p.validatorClock.On("Now").Return(p.height2ProposedBlockTime).Times(6)
startTestRound(p.ctx, p.observedState, p.currentHeight, p.currentRound)
ensureNewRound(p.t, p.roundCh, p.currentHeight, p.currentRound)
propBlock, partSet := p.observedState.createProposalBlock()
bid := types.BlockID{Hash: propBlock.Hash(), PartSetHeader: partSet.Header()}
ensureProposal(p.t, p.ensureProposalCh, p.currentHeight, p.currentRound, bid)
timeout := time.Until(previousBlockTime.Add(ensureTimeout))
ensureProposalWithTimeout(p.t, p.ensureProposalCh, p.currentHeight, p.currentRound, bid, timeout)
ensurePrevote(p.t, p.ensureVoteCh, p.currentHeight, p.currentRound)
signAddVotes(p.ctx, p.observedState, tmproto.PrevoteType, p.chainID, bid, p.otherValidators...)
@@ -136,19 +159,38 @@ func (p *pbtsTestHarness) genesisHeight() heightResult {
ensurePrecommit(p.t, p.ensureVoteCh, p.currentHeight, p.currentRound)
ensureNewBlock(p.t, p.blockCh, p.currentHeight)
vk, err := p.observedValidator.GetPubKey(context.Background())
require.NoError(p.t, err)
res := collectHeightResults(p.ctx, p.t, p.eventCh, p.currentHeight, vk.Address())
p.currentHeight++
incrementHeight(p.otherValidators...)
return <-p.resultCh
return res
}
func (p *pbtsTestHarness) height2() heightResult {
signer := p.otherValidators[0].PrivValidator
return p.nextHeight(signer, p.height2ProposalDeliverTime, p.height2ProposedBlockTime, time.Now())
height3BlockTime := p.height2ProposedBlockTime.Add(-blockTimeIota)
return p.nextHeight(signer, p.height2ProposalDeliverTime, p.height2ProposedBlockTime, height3BlockTime)
}
func (p *pbtsTestHarness) intermediateHeights() {
signer := p.otherValidators[1].PrivValidator
blockTimeHeight3 := p.height4ProposedBlockTime.Add(-blockTimeIota)
p.nextHeight(signer, blockTimeHeight3, blockTimeHeight3, p.height4ProposedBlockTime)
signer = p.otherValidators[2].PrivValidator
p.nextHeight(signer, p.height4ProposedBlockTime, p.height4ProposedBlockTime, time.Now())
}
func (p *pbtsTestHarness) height5() heightResult {
return p.observedValidatorProposerHeight(p.height4ProposedBlockTime)
}
// nolint: lll
func (p *pbtsTestHarness) nextHeight(proposer types.PrivValidator, deliverTime, proposedTime, nextProposedTime time.Time) heightResult {
p.validatorClock.On("Now").Return(nextProposedTime).Times(8)
p.validatorClock.On("Now").Return(nextProposedTime).Times(6)
ensureNewRound(p.t, p.roundCh, p.currentHeight, p.currentRound)
@@ -182,51 +224,104 @@ func (p *pbtsTestHarness) nextHeight(proposer types.PrivValidator, deliverTime,
signAddVotes(p.ctx, p.observedState, tmproto.PrecommitType, p.chainID, bid, p.otherValidators...)
ensurePrecommit(p.t, p.ensureVoteCh, p.currentHeight, p.currentRound)
vk, err := p.observedValidator.GetPubKey(context.Background())
require.NoError(p.t, err)
res := collectHeightResults(p.ctx, p.t, p.eventCh, p.currentHeight, vk.Address())
ensureNewBlock(p.t, p.blockCh, p.currentHeight)
p.currentHeight++
incrementHeight(p.otherValidators...)
return <-p.resultCh
return res
}
func timestampedCollector(ctx context.Context, t *testing.T, eb *eventbus.EventBus) <-chan timestampedEvent {
t.Helper()
// Since eventCh is not read until the end of each height, it must be large
// enough to hold all of the events produced during a single height.
eventCh := make(chan timestampedEvent, 100)
if err := eb.Observe(ctx, func(msg tmpubsub.Message) error {
eventCh <- timestampedEvent{
ts: time.Now(),
m: msg,
}
return nil
}, types.EventQueryVote, types.EventQueryCompleteProposal); err != nil {
t.Fatalf("Failed to observe query %v: %v", types.EventQueryVote, err)
}
return eventCh
}
// nolint: lll
func registerResultCollector(ctx context.Context, t *testing.T, eb *eventbus.EventBus, address []byte) <-chan heightResult {
func collectHeightResults(ctx context.Context, t *testing.T, eventCh <-chan timestampedEvent, height int64, address []byte) heightResult {
t.Helper()
resultCh := make(chan heightResult, 2)
var res heightResult
if err := eb.Observe(ctx, func(msg tmpubsub.Message) error {
ts := time.Now()
vote := msg.Data().(types.EventDataVote)
// we only fire for our own votes
if !bytes.Equal(address, vote.Vote.ValidatorAddress) {
return nil
for event := range eventCh {
switch v := event.m.Data().(type) {
case types.EventDataVote:
if v.Vote.Height > height {
t.Fatalf("received prevote from unexpected height, expected: %d, saw: %d", height, v.Vote.Height)
}
if !bytes.Equal(address, v.Vote.ValidatorAddress) {
continue
}
if v.Vote.Type != tmproto.PrevoteType {
continue
}
res.prevote = v.Vote
res.prevoteIssuedAt = event.ts
case types.EventDataCompleteProposal:
if v.Height > height {
t.Fatalf("received proposal from unexpected height, expected: %d, saw: %d", height, v.Height)
}
res.proposalIssuedAt = event.ts
}
if vote.Vote.Type != tmproto.PrevoteType {
return nil
if res.isComplete() {
return res
}
res.prevoteIssuedAt = ts
res.prevote = vote.Vote
resultCh <- res
return nil
}, types.EventQueryVote); err != nil {
t.Fatalf("Failed to observe query %v: %v", types.EventQueryVote, err)
}
return resultCh
t.Fatalf("complete height result never seen for height %d", height)
panic("unreachable")
}
type timestampedEvent struct {
ts time.Time
m tmpubsub.Message
}
func (p *pbtsTestHarness) run() resultSet {
p.genesisHeight()
startTestRound(p.ctx, p.observedState, p.currentHeight, p.currentRound)
r1 := p.observedValidatorProposerHeight(p.genesisTime)
r2 := p.height2()
p.intermediateHeights()
r5 := p.height5()
err := p.observedState.Stop()
require.NoError(p.t, err)
return resultSet{
height2: r2,
genesisHeight: r1,
height2: r2,
height5: r5,
}
}
type resultSet struct {
height2 heightResult
genesisHeight heightResult
height2 heightResult
height5 heightResult
}
type heightResult struct {
prevote *types.Vote
prevoteIssuedAt time.Time
proposalIssuedAt time.Time
prevote *types.Vote
prevoteIssuedAt time.Time
}
func (hr heightResult) isComplete() bool {
return !hr.proposalIssuedAt.IsZero() && !hr.prevoteIssuedAt.IsZero() && hr.prevote != nil
}
// TestReceiveProposalWaitsForPreviousBlockTime tests that a validator receiving
@@ -281,7 +376,7 @@ func TestReceiveProposalTimesOutOnSlowDelivery(t *testing.T) {
},
timeoutPropose: 50 * time.Millisecond,
genesisTime: initialTime,
height2ProposalDeliverTime: initialTime.Add(610 * time.Millisecond),
height2ProposalDeliverTime: initialTime.Add(660 * time.Millisecond),
height2ProposedBlockTime: initialTime.Add(350 * time.Millisecond),
}
@@ -297,6 +392,67 @@ func TestReceiveProposalTimesOutOnSlowDelivery(t *testing.T) {
assert.Nil(t, results.height2.prevote.BlockID.Hash)
}
// TestProposerWaitsForGenesisTime tests that a proposer will not propose a block
// until after the genesis time has passed. The test sets the genesis time in the
// future and then ensures that the observed validator waits to propose a block.
func TestProposerWaitsForGenesisTime(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// create a genesis time far (enough) in the future.
initialTime := time.Now().Add(800 * time.Millisecond)
cfg := pbtsTestConfiguration{
timestampParams: types.TimestampParams{
Precision: 10 * time.Millisecond,
MsgDelay: 10 * time.Millisecond,
},
timeoutPropose: 10 * time.Millisecond,
genesisTime: initialTime,
height2ProposalDeliverTime: initialTime.Add(10 * time.Millisecond),
height2ProposedBlockTime: initialTime.Add(10 * time.Millisecond),
}
pbtsTest := newPBTSTestHarness(ctx, t, cfg)
results := pbtsTest.run()
// ensure that the proposal was issued after the genesis time.
assert.True(t, results.genesisHeight.proposalIssuedAt.After(cfg.genesisTime))
}
// TestProposerWaitsForPreviousBlock tests that the proposer of a block waits until
// the block time of the previous height has passed to propose the next block.
// The test harness ensures that the observed validator will be the proposer at
// height 1 and height 5. The test sets the block time of height 4 in the future
// and then verifies that the observed validator waits until after the block time
// of height 4 to propose a block at height 5.
func TestProposerWaitsForPreviousBlock(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
initialTime := time.Now().Add(time.Millisecond * 50)
cfg := pbtsTestConfiguration{
timestampParams: types.TimestampParams{
Precision: 100 * time.Millisecond,
MsgDelay: 500 * time.Millisecond,
},
timeoutPropose: 50 * time.Millisecond,
genesisTime: initialTime,
height2ProposalDeliverTime: initialTime.Add(150 * time.Millisecond),
height2ProposedBlockTime: initialTime.Add(100 * time.Millisecond),
height4ProposedBlockTime: initialTime.Add(800 * time.Millisecond),
}
pbtsTest := newPBTSTestHarness(ctx, t, cfg)
results := pbtsTest.run()
// the observed validator is the proposer at height 5.
// ensure that the observed validator did not propose a block until after
// the time configured for height 4.
assert.True(t, results.height5.proposalIssuedAt.After(cfg.height4ProposedBlockTime))
// Ensure that the validator issued a prevote for a non-nil block.
assert.NotNil(t, results.height5.prevote.BlockID.Hash)
}
func TestProposerWaitTime(t *testing.T) {
genesisTime, err := time.Parse(time.RFC3339, "2019-03-13T23:00:00Z")
require.NoError(t, err)

View File

@@ -1069,9 +1069,10 @@ func (cs *State) enterNewRound(height int64, round int32) {
cs.scheduleTimeout(cs.config.CreateEmptyBlocksInterval, height, round,
cstypes.RoundStepNewRound)
}
} else {
cs.enterPropose(height, round)
return
}
cs.enterPropose(height, round)
}
// needProofBlock returns true on the first height (so the genesis app hash is signed right away)
@@ -1104,6 +1105,16 @@ func (cs *State) enterPropose(height int64, round int32) {
return
}
// If this validator is the proposer of this round, and the previous block time is later than
// our local clock time, wait to propose until our local clock time has passed the block time.
if cs.privValidatorPubKey != nil && cs.isProposer(cs.privValidatorPubKey.Address()) {
proposerWaitTime := proposerWaitTime(tmtime.DefaultSource{}, cs.state.LastBlockTime)
if proposerWaitTime > 0 {
cs.scheduleTimeout(proposerWaitTime, height, round, cstypes.RoundStepNewRound)
return
}
}
logger.Debug("entering propose step", "current", fmt.Sprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step))
defer func() {
@@ -1131,8 +1142,6 @@ func (cs *State) enterPropose(height int64, round int32) {
return
}
logger.Debug("node is a validator")
if cs.privValidatorPubKey == nil {
// If this node is a validator & proposer in the current round, it will
// miss the opportunity to create a block.
@@ -1140,18 +1149,20 @@ func (cs *State) enterPropose(height int64, round int32) {
return
}
address := cs.privValidatorPubKey.Address()
addr := cs.privValidatorPubKey.Address()
// if not a validator, we're done
if !cs.Validators.HasAddress(address) {
logger.Debug("node is not a validator", "addr", address, "vals", cs.Validators)
if !cs.Validators.HasAddress(addr) {
logger.Debug("node is not a validator", "addr", addr, "vals", cs.Validators)
return
}
if cs.isProposer(address) {
logger.Debug("node is a validator")
if cs.isProposer(addr) {
logger.Debug(
"propose step; our turn to propose",
"proposer", address,
"proposer", addr,
)
cs.decideProposal(height, round)