Divergences in comparison with #9620. Part 3: Fix problems found in blocksync reactor (#9926)

* Fix problems in blocksync reactor logic & test the fixes

* diagnose messages on TestCheckSwitchToConsensusLastHeightZero

* Fixed test logic

* Apply suggestions from code review

Co-authored-by: Lasaro <lasaro@informal.systems>

* Addressed @lasarojc's comments

Co-authored-by: Lasaro <lasaro@informal.systems>
Commit 1332be0831 (parent 12f7e31610), authored by Sergio Mena on 2022-12-22 13:58:08 +01:00 and committed by GitHub.
4 changed files with 119 additions and 23 deletions


@@ -59,6 +59,8 @@ type Reactor struct {
requestsCh <-chan BlockRequest
errorsCh <-chan peerError
+ switchToConsensusMs int
metrics *Metrics
}
@@ -279,7 +281,10 @@ func (bcR *Reactor) poolRoutine(stateSynced bool) {
statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
defer statusUpdateTicker.Stop()
- switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
+ if bcR.switchToConsensusMs == 0 {
+     bcR.switchToConsensusMs = switchToConsensusIntervalSeconds * 1000
+ }
+ switchToConsensusTicker := time.NewTicker(time.Duration(bcR.switchToConsensusMs) * time.Millisecond)
defer switchToConsensusTicker.Stop()
blocksSynced := uint64(0)
@@ -334,26 +339,35 @@ FOR_LOOP:
height, numPending, lenRequesters := bcR.pool.GetStatus()
outbound, inbound, _ := bcR.Switch.NumPeers()
bcR.Logger.Debug("Consensus ticker", "numPending", numPending, "total", lenRequesters,
"outbound", outbound, "inbound", inbound)
"outbound", outbound, "inbound", inbound, "lastHeight", state.LastBlockHeight)
// The "if" statement below is a bit confusing, so here is a breakdown
// of its logic and purpose:
//
// If VoteExtensions are enabled we cannot switch to consensus without
// the vote extension data for the previous height, i.e. state.LastBlockHeight.
+ // If we are at genesis (no block in the chain), we don't need VoteExtensions
+ // because the first block's LastCommit is empty anyway.
//
- // If extensions were required during state.LastBlockHeight and we have
- // sync'd at least one block, then we are guaranteed to have extensions.
- // BlockSync requires that the blocks it fetches have extensions if
- // extensions were enabled during the height.
+ // If VoteExtensions were disabled for the previous height then we don't need
+ // VoteExtensions.
//
- // If extensions were required during state.LastBlockHeight and we have
- // not sync'd any blocks, then we can only transition to Consensus
- // if we already had extensions for the initial height.
- // If any of these conditions is not met, we continue the loop, looking
- // for extensions.
- if state.ConsensusParams.ABCI.VoteExtensionsEnabled(state.LastBlockHeight) &&
-     (blocksSynced == 0 && !initialCommitHasExtensions) {
+ // If we have sync'd at least one block, then we are guaranteed to have extensions
+ // if we need them by the logic inside loop FOR_LOOP: it requires that the blocks
+ // it fetches have extensions if extensions were enabled during the height.
+ //
+ // If we already had extensions for the initial height (e.g. we are recovering),
+ // then we are guaranteed to have extensions for the last block (if required) even
+ // if we did not blocksync any block.
+ //
+ missingExtension := true
+ if state.LastBlockHeight == 0 ||
+     !state.ConsensusParams.ABCI.VoteExtensionsEnabled(state.LastBlockHeight) ||
+     blocksSynced > 0 ||
+     initialCommitHasExtensions {
+     missingExtension = false
+ }
+ // If extensions are required but we don't have them yet, we cannot switch to consensus yet.
+ if missingExtension {
bcR.Logger.Info(
"no extended commit yet",
"height", height,
@@ -396,15 +410,24 @@ FOR_LOOP:
// See if there are any blocks to sync.
first, second, extCommit := bcR.pool.PeekTwoBlocks()
- if first != nil && extCommit == nil &&
-     state.ConsensusParams.ABCI.VoteExtensionsEnabled(first.Height) {
-     // See https://github.com/tendermint/tendermint/pull/8433#discussion_r866790631
-     panic(fmt.Errorf("peeked first block without extended commit at height %d - possible node store corruption", first.Height))
- } else if first == nil || second == nil {
+ if first == nil || second == nil {
// we need to have fetched two consecutive blocks in order to
// perform blocksync verification
continue FOR_LOOP
}
+ // Some sanity checks on heights
+ if state.LastBlockHeight > 0 && state.LastBlockHeight+1 != first.Height {
+     // Panicking because the block pool's height MUST keep consistent with the state; the block pool is totally under our control
+     panic(fmt.Errorf("peeked first block has unexpected height; expected %d, got %d", state.LastBlockHeight+1, first.Height))
+ }
+ if first.Height+1 != second.Height {
+     // Panicking because this is an obvious bug in the block pool, which is totally under our control
panic(fmt.Errorf("heights of first and second block are not consecutive; expected %d, got %d", state.LastBlockHeight, first.Height))
+ }
+ if extCommit == nil && state.ConsensusParams.ABCI.VoteExtensionsEnabled(first.Height) {
+     // See https://github.com/tendermint/tendermint/pull/8433#discussion_r866790631
+     panic(fmt.Errorf("peeked first block without extended commit at height %d - possible node store corruption", first.Height))
+ }
// Try again quickly next loop.
didProcessCh <- struct{}{}


@@ -42,10 +42,13 @@ func randGenesisDoc(numValidators int, randPower bool, minPower int64) (*types.G
}
sort.Sort(types.PrivValidatorsByAddress(privValidators))
+ consPar := types.DefaultConsensusParams()
+ consPar.ABCI.VoteExtensionsEnableHeight = 1
return &types.GenesisDoc{
-     GenesisTime: tmtime.Now(),
-     ChainID:     test.DefaultTestChainID,
-     Validators:  validators,
+     GenesisTime:     tmtime.Now(),
+     ChainID:         test.DefaultTestChainID,
+     Validators:      validators,
+     ConsensusParams: consPar,
}, privValidators
}
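The only functional change in this helper is that the test genesis now enables vote extensions from height 1, so every block produced in these tests requires an extended commit and the reactor's "missing extension" logic is actually exercised. For reference, a hypothetical helper sketching the assumed semantics of ABCIParams.VoteExtensionsEnableHeight (0 means never enabled; otherwise extensions are required from that height on); this is not the repository's code:

// voteExtensionsEnabled is an illustrative stand-in for
// ConsensusParams.ABCI.VoteExtensionsEnabled.
func voteExtensionsEnabled(enableHeight, height int64) bool {
    if enableHeight == 0 {
        return false // extensions never enabled
    }
    return height >= enableHeight
}

With enableHeight = 1, this returns true for every height the new test below reaches.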
@@ -310,3 +313,66 @@ func TestBadBlockStopsPeer(t *testing.T) {
assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)-1)
}
+ func TestCheckSwitchToConsensusLastHeightZero(t *testing.T) {
+     const maxBlockHeight = int64(45)
+
+     config = test.ResetTestRoot("blocksync_reactor_test")
+     defer os.RemoveAll(config.RootDir)
+     genDoc, privVals := randGenesisDoc(1, false, 30)
+
+     reactorPairs := make([]ReactorPair, 1, 2)
+     reactorPairs[0] = newReactor(t, log.TestingLogger(), genDoc, privVals, 0)
+     reactorPairs[0].reactor.switchToConsensusMs = 50
+     defer func() {
+         for _, r := range reactorPairs {
+             err := r.reactor.Stop()
+             require.NoError(t, err)
+             err = r.app.Stop()
+             require.NoError(t, err)
+         }
+     }()
+
+     reactorPairs = append(reactorPairs, newReactor(t, log.TestingLogger(), genDoc, privVals, maxBlockHeight))
+
+     var switches []*p2p.Switch
+     for _, r := range reactorPairs {
+         switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch {
+             s.AddReactor("BLOCKSYNC", r.reactor)
+             return s
+         }, p2p.Connect2Switches)...)
+     }
+
+     time.Sleep(60 * time.Millisecond)
+
+     // Connect both switches
+     p2p.Connect2Switches(switches, 0, 1)
+
+     startTime := time.Now()
+     for {
+         time.Sleep(20 * time.Millisecond)
+         caughtUp := true
+         for _, r := range reactorPairs {
+             if !r.reactor.pool.IsCaughtUp() {
+                 caughtUp = false
+                 break
+             }
+         }
+         if caughtUp {
+             break
+         }
+         if time.Since(startTime) > 90*time.Second {
+             msg := "timeout: reactors didn't catch up;"
+             for i, r := range reactorPairs {
+                 h, p, lr := r.reactor.pool.GetStatus()
+                 c := r.reactor.pool.IsCaughtUp()
+                 msg += fmt.Sprintf(" reactor#%d (h %d, p %d, lr %d, c %t);", i, h, p, lr, c)
+             }
+             require.Fail(t, msg)
+         }
+     }
+
+     for _, r := range reactorPairs {
+         assert.GreaterOrEqual(t, r.reactor.store.Height(), maxBlockHeight-2)
+     }
+ }
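To run just the new test locally (the package path is an assumption; adjust it to wherever the blocksync reactor lives in your checkout):

go test -v -run TestCheckSwitchToConsensusLastHeightZero ./blocksync/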