internal/consensus: proposer waits for previous block time (#7376)

This change introduces the logic to have the proposer wait until the previous block time has passed before attempting to propose the next block.

The change achieves this by by adding a new clause into the enterPropose state machine method. The method now checks if the validator is the proposer and if the validator's clock is behind the previous block's time. If the validator's clock is behind the previous block time, it schedules a timeout to re-enter the enter propose method after enough time has passed.
This commit is contained in:
William Banfield
2021-12-08 11:23:33 -05:00
committed by William Banfield
parent 3022f9192a
commit fa1afef125
4 changed files with 221 additions and 51 deletions

View File

@@ -47,6 +47,7 @@ Special thanks to external contributors on this release:
- [rpc] [\#7270](https://github.com/tendermint/tendermint/pull/7270) Add `header` and `header_by_hash` RPC Client queries. (@fedekunze)
- [cli] [#7033](https://github.com/tendermint/tendermint/pull/7033) Add a `rollback` command to rollback to the previous tendermint state in the event of non-determinstic app hash or reverting an upgrade.
- [mempool, rpc] \#7041 Add removeTx operation to the RPC layer. (@tychoish)
- [consensus] \#7376 Update the proposal logic per the Propose-based timestamps specification so that the proposer will wait for the previous block time to occur before proposing the next block. (@williambanfield)
### IMPROVEMENTS
- [internal/protoio] \#7325 Optimized `MarshalDelimited` by inlining the common case and using a `sync.Pool` in the worst case. (@odeke-em)

View File

@@ -40,7 +40,7 @@ const (
testSubscriber = "test-client"
// genesis, chain_id, priv_val
ensureTimeout = time.Millisecond * 200
ensureTimeout = time.Millisecond * 500
)
// A cleanupFunc cleans up any config / test files created for a particular
@@ -656,8 +656,13 @@ func ensureRelock(t *testing.T, relockCh <-chan tmpubsub.Message, height int64,
}
func ensureProposal(t *testing.T, proposalCh <-chan tmpubsub.Message, height int64, round int32, propID types.BlockID) {
ensureProposalWithTimeout(t, proposalCh, height, round, propID, ensureTimeout)
}
// nolint: lll
func ensureProposalWithTimeout(t *testing.T, proposalCh <-chan tmpubsub.Message, height int64, round int32, propID types.BlockID, timeout time.Duration) {
t.Helper()
msg := ensureMessageBeforeTimeout(t, proposalCh, ensureTimeout)
msg := ensureMessageBeforeTimeout(t, proposalCh, timeout)
proposalEvent, ok := msg.Data().(types.EventDataCompleteProposal)
require.True(t, ok, "expected a EventDataCompleteProposal, got %T. Wrong subscription channel?",
msg.Data())
@@ -666,7 +671,6 @@ func ensureProposal(t *testing.T, proposalCh <-chan tmpubsub.Message, height int
require.True(t, proposalEvent.BlockID.Equals(propID),
"Proposed block does not match expected block (%v != %v)", proposalEvent.BlockID, propID)
}
func ensurePrecommit(t *testing.T, voteCh <-chan tmpubsub.Message, height int64, round int32) {
t.Helper()
ensureVote(t, voteCh, height, round, tmproto.PrecommitType)

View File

@@ -18,6 +18,12 @@ import (
"github.com/tendermint/tendermint/types"
)
const (
// blockTimeIota is used in the test harness as the time between
// blocks when not otherwise specified.
blockTimeIota = time.Millisecond
)
// pbtsTestHarness constructs a Tendermint network that can be used for testing the
// implementation of the Proposer-Based timestamps algorithm.
// It runs a series of consensus heights and captures timing of votes and events.
@@ -47,7 +53,9 @@ type pbtsTestHarness struct {
// channels for verifying that the observed validator completes certain actions.
ensureProposalCh, roundCh, blockCh, ensureVoteCh <-chan tmpubsub.Message
resultCh <-chan heightResult
// channel of events from the observed validator annotated with the timestamp
// the event was received.
eventCh <-chan timestampedEvent
currentHeight int64
currentRound int32
@@ -71,6 +79,11 @@ type pbtsTestConfiguration struct {
// The timestamp of the block proposed at height 2.
height2ProposedBlockTime time.Time
// The timestamp of the block proposed at height 4.
// At height 4, the proposed block time and the deliver time are the same so
// that timely-ness does not affect height 4.
height4ProposedBlockTime time.Time
}
func newPBTSTestHarness(ctx context.Context, t *testing.T, tc pbtsTestConfiguration) pbtsTestHarness {
@@ -78,6 +91,15 @@ func newPBTSTestHarness(ctx context.Context, t *testing.T, tc pbtsTestConfigurat
const validators = 4
cfg := configSetup(t)
clock := new(tmtimemocks.Source)
if tc.height4ProposedBlockTime.IsZero() {
// Set a default height4ProposedBlockTime.
// Use a proposed block time that is greater than the time that the
// block at height 2 was delivered. Height 3 is not relevant for testing
// and always occurs blockTimeIota before height 4. If not otherwise specified,
// height 4 therefore occurs 2*blockTimeIota after height 2.
tc.height4ProposedBlockTime = tc.height2ProposalDeliverTime.Add(2 * blockTimeIota)
}
cfg.Consensus.TimeoutPropose = tc.timeoutPropose
consensusParams := types.DefaultConsensusParams()
consensusParams.Timing = tc.timingParams
@@ -87,7 +109,7 @@ func newPBTSTestHarness(ctx context.Context, t *testing.T, tc pbtsTestConfigurat
Time: tc.genesisTime,
Validators: validators,
})
cs := newState(ctx, t, log.TestingLogger(), state, privVals[0], kvstore.NewApplication())
cs := newState(ctx, t, log.NewTestingLogger(t), state, privVals[0], kvstore.NewApplication())
vss := make([]*validatorStub, validators)
for i := 0; i < validators; i++ {
vss[i] = newValidatorStub(privVals[i], int32(i))
@@ -100,7 +122,7 @@ func newPBTSTestHarness(ctx context.Context, t *testing.T, tc pbtsTestConfigurat
pubKey, err := vss[0].PrivValidator.GetPubKey(ctx)
require.NoError(t, err)
resultCh := registerResultCollector(ctx, t, cs.eventBus, pubKey.Address())
eventCh := timestampedCollector(ctx, t, cs.eventBus)
return pbtsTestHarness{
pbtsTestConfiguration: tc,
@@ -114,21 +136,22 @@ func newPBTSTestHarness(ctx context.Context, t *testing.T, tc pbtsTestConfigurat
ensureProposalCh: subscribe(ctx, t, cs.eventBus, types.EventQueryCompleteProposal),
blockCh: subscribe(ctx, t, cs.eventBus, types.EventQueryNewBlock),
ensureVoteCh: subscribeToVoterBuffered(ctx, t, cs, pubKey.Address()),
resultCh: resultCh,
eventCh: eventCh,
t: t,
ctx: ctx,
}
}
func (p *pbtsTestHarness) genesisHeight() heightResult {
p.validatorClock.On("Now").Return(p.height2ProposedBlockTime).Times(8)
func (p *pbtsTestHarness) observedValidatorProposerHeight(previousBlockTime time.Time) heightResult {
p.validatorClock.On("Now").Return(p.height2ProposedBlockTime).Times(6)
startTestRound(p.ctx, p.observedState, p.currentHeight, p.currentRound)
ensureNewRound(p.t, p.roundCh, p.currentHeight, p.currentRound)
propBlock, partSet, err := p.observedState.createProposalBlock()
require.NoError(p.t, err)
bid := types.BlockID{Hash: propBlock.Hash(), PartSetHeader: partSet.Header()}
ensureProposal(p.t, p.ensureProposalCh, p.currentHeight, p.currentRound, bid)
timeout := time.Until(previousBlockTime.Add(ensureTimeout))
ensureProposalWithTimeout(p.t, p.ensureProposalCh, p.currentHeight, p.currentRound, bid, timeout)
ensurePrevote(p.t, p.ensureVoteCh, p.currentHeight, p.currentRound)
signAddVotes(p.ctx, p.t, p.observedState, tmproto.PrevoteType, p.chainID, bid, p.otherValidators...)
@@ -136,19 +159,37 @@ func (p *pbtsTestHarness) genesisHeight() heightResult {
ensurePrecommit(p.t, p.ensureVoteCh, p.currentHeight, p.currentRound)
ensureNewBlock(p.t, p.blockCh, p.currentHeight)
vk, err := p.observedValidator.GetPubKey(context.Background())
require.NoError(p.t, err)
res := collectHeightResults(p.ctx, p.t, p.eventCh, p.currentHeight, vk.Address())
p.currentHeight++
incrementHeight(p.otherValidators...)
return <-p.resultCh
return res
}
func (p *pbtsTestHarness) height2() heightResult {
signer := p.otherValidators[0].PrivValidator
return p.nextHeight(signer, p.height2ProposalDeliverTime, p.height2ProposedBlockTime, time.Now())
height3BlockTime := p.height2ProposedBlockTime.Add(-blockTimeIota)
return p.nextHeight(signer, p.height2ProposalDeliverTime, p.height2ProposedBlockTime, height3BlockTime)
}
func (p *pbtsTestHarness) intermediateHeights() {
signer := p.otherValidators[1].PrivValidator
blockTimeHeight3 := p.height4ProposedBlockTime.Add(-blockTimeIota)
p.nextHeight(signer, blockTimeHeight3, blockTimeHeight3, p.height4ProposedBlockTime)
signer = p.otherValidators[2].PrivValidator
p.nextHeight(signer, p.height4ProposedBlockTime, p.height4ProposedBlockTime, time.Now())
}
func (p *pbtsTestHarness) height5() heightResult {
return p.observedValidatorProposerHeight(p.height4ProposedBlockTime)
}
// nolint: lll
func (p *pbtsTestHarness) nextHeight(proposer types.PrivValidator, deliverTime, proposedTime, nextProposedTime time.Time) heightResult {
p.validatorClock.On("Now").Return(nextProposedTime).Times(8)
p.validatorClock.On("Now").Return(nextProposedTime).Times(6)
ensureNewRound(p.t, p.roundCh, p.currentHeight, p.currentRound)
@@ -184,51 +225,103 @@ func (p *pbtsTestHarness) nextHeight(proposer types.PrivValidator, deliverTime,
signAddVotes(p.ctx, p.t, p.observedState, tmproto.PrecommitType, p.chainID, bid, p.otherValidators...)
ensurePrecommit(p.t, p.ensureVoteCh, p.currentHeight, p.currentRound)
vk, err := p.observedValidator.GetPubKey(context.Background())
require.NoError(p.t, err)
res := collectHeightResults(p.ctx, p.t, p.eventCh, p.currentHeight, vk.Address())
ensureNewBlock(p.t, p.blockCh, p.currentHeight)
p.currentHeight++
incrementHeight(p.otherValidators...)
return <-p.resultCh
return res
}
// nolint: lll
func registerResultCollector(ctx context.Context, t *testing.T, eb *eventbus.EventBus, address []byte) <-chan heightResult {
func timestampedCollector(ctx context.Context, t *testing.T, eb *eventbus.EventBus) <-chan timestampedEvent {
t.Helper()
resultCh := make(chan heightResult, 2)
var res heightResult
// Since eventCh is not read until the end of each height, it must be large
// enough to hold all of the events produced during a single height.
eventCh := make(chan timestampedEvent, 100)
if err := eb.Observe(ctx, func(msg tmpubsub.Message) error {
ts := time.Now()
vote := msg.Data().(types.EventDataVote)
// we only fire for our own votes
if !bytes.Equal(address, vote.Vote.ValidatorAddress) {
return nil
eventCh <- timestampedEvent{
ts: time.Now(),
m: msg,
}
if vote.Vote.Type != tmproto.PrevoteType {
return nil
}
res.prevoteIssuedAt = ts
res.prevote = vote.Vote
resultCh <- res
return nil
}, types.EventQueryVote); err != nil {
}, types.EventQueryVote, types.EventQueryCompleteProposal); err != nil {
t.Fatalf("Failed to observe query %v: %v", types.EventQueryVote, err)
}
return resultCh
return eventCh
}
func collectHeightResults(ctx context.Context, t *testing.T, eventCh <-chan timestampedEvent, height int64, address []byte) heightResult {
t.Helper()
var res heightResult
for event := range eventCh {
switch v := event.m.Data().(type) {
case types.EventDataVote:
if v.Vote.Height > height {
t.Fatalf("received prevote from unexpected height, expected: %d, saw: %d", height, v.Vote.Height)
}
if !bytes.Equal(address, v.Vote.ValidatorAddress) {
continue
}
if v.Vote.Type != tmproto.PrevoteType {
continue
}
res.prevote = v.Vote
res.prevoteIssuedAt = event.ts
case types.EventDataCompleteProposal:
if v.Height > height {
t.Fatalf("received proposal from unexpected height, expected: %d, saw: %d", height, v.Height)
}
res.proposalIssuedAt = event.ts
}
if res.isComplete() {
return res
}
}
t.Fatalf("complete height result never seen for height %d", height)
panic("unreachable")
}
type timestampedEvent struct {
ts time.Time
m tmpubsub.Message
}
func (p *pbtsTestHarness) run() resultSet {
p.genesisHeight()
startTestRound(p.ctx, p.observedState, p.currentHeight, p.currentRound)
r1 := p.observedValidatorProposerHeight(p.genesisTime)
r2 := p.height2()
p.intermediateHeights()
r5 := p.height5()
err := p.observedState.Stop()
require.NoError(p.t, err)
return resultSet{
height2: r2,
genesisHeight: r1,
height2: r2,
height5: r5,
}
}
type resultSet struct {
height2 heightResult
genesisHeight heightResult
height2 heightResult
height5 heightResult
}
type heightResult struct {
prevote *types.Vote
prevoteIssuedAt time.Time
proposalIssuedAt time.Time
prevote *types.Vote
prevoteIssuedAt time.Time
}
func (hr heightResult) isComplete() bool {
return !hr.proposalIssuedAt.IsZero() && !hr.prevoteIssuedAt.IsZero() && hr.prevote != nil
}
// TestReceiveProposalWaitsForPreviousBlockTime tests that a validator receiving
@@ -283,7 +376,7 @@ func TestReceiveProposalTimesOutOnSlowDelivery(t *testing.T) {
},
timeoutPropose: 50 * time.Millisecond,
genesisTime: initialTime,
height2ProposalDeliverTime: initialTime.Add(610 * time.Millisecond),
height2ProposalDeliverTime: initialTime.Add(660 * time.Millisecond),
height2ProposedBlockTime: initialTime.Add(350 * time.Millisecond),
}
@@ -299,6 +392,67 @@ func TestReceiveProposalTimesOutOnSlowDelivery(t *testing.T) {
assert.Nil(t, results.height2.prevote.BlockID.Hash)
}
// TestProposerWaitsForGenesisTime tests that a proposer will not propose a block
// until after the genesis time has passed. The test sets the genesis time in the
// future and then ensures that the observed validator waits to propose a block.
func TestProposerWaitsForGenesisTime(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// create a genesis time far (enough) in the future.
initialTime := time.Now().Add(800 * time.Millisecond)
cfg := pbtsTestConfiguration{
timingParams: types.TimingParams{
Precision: 10 * time.Millisecond,
MessageDelay: 10 * time.Millisecond,
},
timeoutPropose: 10 * time.Millisecond,
genesisTime: initialTime,
height2ProposalDeliverTime: initialTime.Add(10 * time.Millisecond),
height2ProposedBlockTime: initialTime.Add(10 * time.Millisecond),
}
pbtsTest := newPBTSTestHarness(ctx, t, cfg)
results := pbtsTest.run()
// ensure that the proposal was issued after the genesis time.
assert.True(t, results.genesisHeight.proposalIssuedAt.After(cfg.genesisTime))
}
// TestProposerWaitsForPreviousBlock tests that the proposer of a block waits until
// the block time of the previous height has passed to propose the next block.
// The test harness ensures that the observed validator will be the proposer at
// height 1 and height 5. The test sets the block time of height 4 in the future
// and then verifies that the observed validator waits until after the block time
// of height 4 to propose a block at height 5.
func TestProposerWaitsForPreviousBlock(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
initialTime := time.Now().Add(time.Millisecond * 50)
cfg := pbtsTestConfiguration{
timingParams: types.TimingParams{
Precision: 100 * time.Millisecond,
MessageDelay: 500 * time.Millisecond,
},
timeoutPropose: 50 * time.Millisecond,
genesisTime: initialTime,
height2ProposalDeliverTime: initialTime.Add(150 * time.Millisecond),
height2ProposedBlockTime: initialTime.Add(100 * time.Millisecond),
height4ProposedBlockTime: initialTime.Add(800 * time.Millisecond),
}
pbtsTest := newPBTSTestHarness(ctx, t, cfg)
results := pbtsTest.run()
// the observed validator is the proposer at height 5.
// ensure that the observed validator did not propose a block until after
// the time configured for height 4.
assert.True(t, results.height5.proposalIssuedAt.After(cfg.height4ProposedBlockTime))
// Ensure that the validator issued a prevote for a non-nil block.
assert.NotNil(t, results.height5.prevote.BlockID.Hash)
}
func TestProposerWaitTime(t *testing.T) {
genesisTime, err := time.Parse(time.RFC3339, "2019-03-13T23:00:00Z")
require.NoError(t, err)
@@ -350,7 +504,7 @@ func TestProposalTimeout(t *testing.T) {
expectedDuration time.Duration
}{
{
name: "MsgDelay + Precision has not quite elapsed",
name: "MessageDelay + Precision has not quite elapsed",
localTime: genesisTime.Add(525 * time.Millisecond),
previousBlockTime: genesisTime.Add(6 * time.Millisecond),
precision: time.Millisecond * 20,
@@ -358,7 +512,7 @@ func TestProposalTimeout(t *testing.T) {
expectedDuration: 1 * time.Millisecond,
},
{
name: "MsgDelay + Precision equals current time",
name: "MessageDelay + Precision equals current time",
localTime: genesisTime.Add(525 * time.Millisecond),
previousBlockTime: genesisTime.Add(5 * time.Millisecond),
precision: time.Millisecond * 20,
@@ -366,7 +520,7 @@ func TestProposalTimeout(t *testing.T) {
expectedDuration: 0,
},
{
name: "MsgDelay + Precision has elapsed",
name: "MessageDelay + Precision has elapsed",
localTime: genesisTime.Add(725 * time.Millisecond),
previousBlockTime: genesisTime.Add(5 * time.Millisecond),
precision: time.Millisecond * 20,

View File

@@ -1116,9 +1116,10 @@ func (cs *State) enterNewRound(ctx context.Context, height int64, round int32) {
cs.scheduleTimeout(cs.config.CreateEmptyBlocksInterval, height, round,
cstypes.RoundStepNewRound)
}
} else {
cs.enterPropose(ctx, height, round)
return
}
cs.enterPropose(ctx, height, round)
}
// needProofBlock returns true on the first height (so the genesis app hash is signed right away)
@@ -1151,6 +1152,16 @@ func (cs *State) enterPropose(ctx context.Context, height int64, round int32) {
return
}
// If this validator is the proposer of this round, and the previous block time is later than
// our local clock time, wait to propose until our local clock time has passed the block time.
if cs.privValidatorPubKey != nil && cs.isProposer(cs.privValidatorPubKey.Address()) {
proposerWaitTime := proposerWaitTime(tmtime.DefaultSource{}, cs.state.LastBlockTime)
if proposerWaitTime > 0 {
cs.scheduleTimeout(proposerWaitTime, height, round, cstypes.RoundStepNewRound)
return
}
}
logger.Debug("entering propose step", "current", fmt.Sprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step))
defer func() {
@@ -1178,8 +1189,6 @@ func (cs *State) enterPropose(ctx context.Context, height int64, round int32) {
return
}
logger.Debug("node is a validator")
if cs.privValidatorPubKey == nil {
// If this node is a validator & proposer in the current round, it will
// miss the opportunity to create a block.
@@ -1187,18 +1196,20 @@ func (cs *State) enterPropose(ctx context.Context, height int64, round int32) {
return
}
address := cs.privValidatorPubKey.Address()
addr := cs.privValidatorPubKey.Address()
// if not a validator, we're done
if !cs.Validators.HasAddress(address) {
logger.Debug("node is not a validator", "addr", address, "vals", cs.Validators)
if !cs.Validators.HasAddress(addr) {
logger.Debug("node is not a validator", "addr", addr, "vals", cs.Validators)
return
}
if cs.isProposer(address) {
logger.Debug("node is a validator")
if cs.isProposer(addr) {
logger.Debug(
"propose step; our turn to propose",
"proposer", address,
"proposer", addr,
)
cs.decideProposal(ctx, height, round)