Files
tendermint/internal/consensus/replay.go
M. J. Fromberger c8e8a62084 abci/client: simplify client interface (#7607)
This change has two main effects:

1. Remove most of the Async methods from the abci.Client interface.
   Remaining are FlushAsync, CommitTxAsync, and DeliverTxAsync.

2. Rename the synchronous methods to remove the "Sync" suffix.

The rest of the change is updating the implementations, subsets, and mocks of
the interface, along with the call sites that point to them.

* Fix stringly-typed mock stubs.
* Rename helper method.
2022-01-19 10:58:56 -08:00

562 lines
18 KiB
Go

package consensus
import (
"bytes"
"context"
"errors"
"fmt"
"hash/crc32"
"io"
"reflect"
"time"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/crypto/merkle"
"github.com/tendermint/tendermint/internal/eventbus"
"github.com/tendermint/tendermint/internal/proxy"
sm "github.com/tendermint/tendermint/internal/state"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/types"
)
var crc32c = crc32.MakeTable(crc32.Castagnoli)
// Functionality to replay blocks and messages on recovery from a crash.
// There are two general failure scenarios:
//
// 1. failure during consensus
// 2. failure while applying the block
//
// The former is handled by the WAL, the latter by the proxyApp Handshake on
// restart, which ultimately hands off the work to the WAL.
//-----------------------------------------
// 1. Recover from failure during consensus
// (by replaying messages from the WAL)
//-----------------------------------------
// Unmarshal and apply a single message to the consensus state as if it were
// received in receiveRoutine. Lines that start with "#" are ignored.
// NOTE: receiveRoutine should not be running.
func (cs *State) readReplayMessage(ctx context.Context, msg *TimedWALMessage, newStepSub eventbus.Subscription) error {
// Skip meta messages which exist for demarcating boundaries.
if _, ok := msg.Msg.(EndHeightMessage); ok {
return nil
}
// for logging
switch m := msg.Msg.(type) {
case types.EventDataRoundState:
cs.logger.Info("Replay: New Step", "height", m.Height, "round", m.Round, "step", m.Step)
// these are playback checks
if newStepSub != nil {
ctxto, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
stepMsg, err := newStepSub.Next(ctxto)
if errors.Is(err, context.DeadlineExceeded) {
return fmt.Errorf("subscription timed out: %w", err)
} else if err != nil {
return fmt.Errorf("subscription canceled: %w", err)
}
m2 := stepMsg.Data().(types.EventDataRoundState)
if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step {
return fmt.Errorf("roundState mismatch. Got %v; Expected %v", m2, m)
}
}
case msgInfo:
peerID := m.PeerID
if peerID == "" {
peerID = "local"
}
switch msg := m.Msg.(type) {
case *ProposalMessage:
p := msg.Proposal
cs.logger.Info("Replay: Proposal", "height", p.Height, "round", p.Round, "header",
p.BlockID.PartSetHeader, "pol", p.POLRound, "peer", peerID)
case *BlockPartMessage:
cs.logger.Info("Replay: BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerID)
case *VoteMessage:
v := msg.Vote
cs.logger.Info("Replay: Vote", "height", v.Height, "round", v.Round, "type", v.Type,
"blockID", v.BlockID, "peer", peerID)
}
cs.handleMsg(ctx, m)
case timeoutInfo:
cs.logger.Info("Replay: Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration)
cs.handleTimeout(ctx, m, cs.RoundState)
default:
return fmt.Errorf("replay: Unknown TimedWALMessage type: %v", reflect.TypeOf(msg.Msg))
}
return nil
}
// Replay only those messages since the last block. `timeoutRoutine` should
// run concurrently to read off tickChan.
func (cs *State) catchupReplay(ctx context.Context, csHeight int64) error {
// Set replayMode to true so we don't log signing errors.
cs.replayMode = true
defer func() { cs.replayMode = false }()
// Ensure that #ENDHEIGHT for this height doesn't exist.
// NOTE: This is just a sanity check. As far as we know things work fine
// without it, and Handshake could reuse State if it weren't for
// this check (since we can crash after writing #ENDHEIGHT).
//
// Ignore data corruption errors since this is a sanity check.
gr, found, err := cs.wal.SearchForEndHeight(csHeight, &WALSearchOptions{IgnoreDataCorruptionErrors: true})
if err != nil {
return err
}
if gr != nil {
if err := gr.Close(); err != nil {
return err
}
}
if found {
return fmt.Errorf("wal should not contain #ENDHEIGHT %d", csHeight)
}
// Search for last height marker.
//
// Ignore data corruption errors in previous heights because we only care about last height
if csHeight < cs.state.InitialHeight {
return fmt.Errorf("cannot replay height %v, below initial height %v", csHeight, cs.state.InitialHeight)
}
endHeight := csHeight - 1
if csHeight == cs.state.InitialHeight {
endHeight = 0
}
gr, found, err = cs.wal.SearchForEndHeight(endHeight, &WALSearchOptions{IgnoreDataCorruptionErrors: true})
if err == io.EOF {
cs.logger.Error("Replay: wal.group.Search returned EOF", "#ENDHEIGHT", endHeight)
} else if err != nil {
return err
}
if !found {
return fmt.Errorf("cannot replay height %d. WAL does not contain #ENDHEIGHT for %d", csHeight, endHeight)
}
defer gr.Close()
cs.logger.Info("Catchup by replaying consensus messages", "height", csHeight)
var msg *TimedWALMessage
dec := WALDecoder{gr}
LOOP:
for {
msg, err = dec.Decode()
switch {
case err == io.EOF:
break LOOP
case IsDataCorruptionError(err):
cs.logger.Error("data has been corrupted in last height of consensus WAL", "err", err, "height", csHeight)
return err
case err != nil:
return err
}
// NOTE: since the priv key is set when the msgs are received
// it will attempt to eg double sign but we can just ignore it
// since the votes will be replayed and we'll get to the next step
if err := cs.readReplayMessage(ctx, msg, nil); err != nil {
return err
}
}
cs.logger.Info("Replay: Done")
return nil
}
//--------------------------------------------------------------------------------
// Parses marker lines of the form:
// #ENDHEIGHT: 12345
/*
func makeHeightSearchFunc(height int64) auto.SearchFunc {
return func(line string) (int, error) {
line = strings.TrimRight(line, "\n")
parts := strings.Split(line, " ")
if len(parts) != 2 {
return -1, errors.New("line did not have 2 parts")
}
i, err := strconv.Atoi(parts[1])
if err != nil {
return -1, errors.New("failed to parse INFO: " + err.Error())
}
if height < i {
return 1, nil
} else if height == i {
return 0, nil
} else {
return -1, nil
}
}
}*/
//---------------------------------------------------
// 2. Recover from failure while applying the block.
// (by handshaking with the app to figure out where
// we were last, and using the WAL to recover there.)
//---------------------------------------------------
type Handshaker struct {
stateStore sm.Store
initialState sm.State
store sm.BlockStore
eventBus types.BlockEventPublisher
genDoc *types.GenesisDoc
logger log.Logger
nBlocks int // number of blocks applied to the state
}
func NewHandshaker(
logger log.Logger,
stateStore sm.Store,
state sm.State,
store sm.BlockStore,
eventBus types.BlockEventPublisher,
genDoc *types.GenesisDoc,
) *Handshaker {
return &Handshaker{
stateStore: stateStore,
initialState: state,
store: store,
eventBus: eventBus,
genDoc: genDoc,
logger: logger,
nBlocks: 0,
}
}
// NBlocks returns the number of blocks applied to the state.
func (h *Handshaker) NBlocks() int {
return h.nBlocks
}
// TODO: retry the handshake/replay if it fails ?
func (h *Handshaker) Handshake(ctx context.Context, proxyApp proxy.AppConns) error {
// Handshake is done via ABCI Info on the query conn.
res, err := proxyApp.Query().Info(ctx, proxy.RequestInfo)
if err != nil {
return fmt.Errorf("error calling Info: %w", err)
}
blockHeight := res.LastBlockHeight
if blockHeight < 0 {
return fmt.Errorf("got a negative last block height (%d) from the app", blockHeight)
}
appHash := res.LastBlockAppHash
h.logger.Info("ABCI Handshake App Info",
"height", blockHeight,
"hash", appHash,
"software-version", res.Version,
"protocol-version", res.AppVersion,
)
// Only set the version if there is no existing state.
if h.initialState.LastBlockHeight == 0 {
h.initialState.Version.Consensus.App = res.AppVersion
}
// Replay blocks up to the latest in the blockstore.
_, err = h.ReplayBlocks(ctx, h.initialState, appHash, blockHeight, proxyApp)
if err != nil {
return fmt.Errorf("error on replay: %w", err)
}
h.logger.Info("Completed ABCI Handshake - Tendermint and App are synced",
"appHeight", blockHeight, "appHash", appHash)
// TODO: (on restart) replay mempool
return nil
}
// ReplayBlocks replays all blocks since appBlockHeight and ensures the result
// matches the current state.
// Returns the final AppHash or an error.
func (h *Handshaker) ReplayBlocks(
ctx context.Context,
state sm.State,
appHash []byte,
appBlockHeight int64,
proxyApp proxy.AppConns,
) ([]byte, error) {
storeBlockBase := h.store.Base()
storeBlockHeight := h.store.Height()
stateBlockHeight := state.LastBlockHeight
h.logger.Info(
"ABCI Replay Blocks",
"appHeight",
appBlockHeight,
"storeHeight",
storeBlockHeight,
"stateHeight",
stateBlockHeight)
// If appBlockHeight == 0 it means that we are at genesis and hence should send InitChain.
if appBlockHeight == 0 {
validators := make([]*types.Validator, len(h.genDoc.Validators))
for i, val := range h.genDoc.Validators {
validators[i] = types.NewValidator(val.PubKey, val.Power)
}
validatorSet := types.NewValidatorSet(validators)
nextVals := types.TM2PB.ValidatorUpdates(validatorSet)
pbParams := h.genDoc.ConsensusParams.ToProto()
req := abci.RequestInitChain{
Time: h.genDoc.GenesisTime,
ChainId: h.genDoc.ChainID,
InitialHeight: h.genDoc.InitialHeight,
ConsensusParams: &pbParams,
Validators: nextVals,
AppStateBytes: h.genDoc.AppState,
}
res, err := proxyApp.Consensus().InitChain(ctx, req)
if err != nil {
return nil, err
}
appHash = res.AppHash
if stateBlockHeight == 0 { // we only update state when we are in initial state
// If the app did not return an app hash, we keep the one set from the genesis doc in
// the state. We don't set appHash since we don't want the genesis doc app hash
// recorded in the genesis block. We should probably just remove GenesisDoc.AppHash.
if len(res.AppHash) > 0 {
state.AppHash = res.AppHash
}
// If the app returned validators or consensus params, update the state.
if len(res.Validators) > 0 {
vals, err := types.PB2TM.ValidatorUpdates(res.Validators)
if err != nil {
return nil, err
}
state.Validators = types.NewValidatorSet(vals)
state.NextValidators = types.NewValidatorSet(vals).CopyIncrementProposerPriority(1)
} else if len(h.genDoc.Validators) == 0 {
// If validator set is not set in genesis and still empty after InitChain, exit.
return nil, fmt.Errorf("validator set is nil in genesis and still empty after InitChain")
}
if res.ConsensusParams != nil {
state.ConsensusParams = state.ConsensusParams.UpdateConsensusParams(res.ConsensusParams)
state.Version.Consensus.App = state.ConsensusParams.Version.AppVersion
}
// We update the last results hash with the empty hash, to conform with RFC-6962.
state.LastResultsHash = merkle.HashFromByteSlices(nil)
if err := h.stateStore.Save(state); err != nil {
return nil, err
}
}
}
// First handle edge cases and constraints on the storeBlockHeight and storeBlockBase.
switch {
case storeBlockHeight == 0:
assertAppHashEqualsOneFromState(appHash, state)
return appHash, nil
case appBlockHeight == 0 && state.InitialHeight < storeBlockBase:
// the app has no state, and the block store is truncated above the initial height
return appHash, sm.ErrAppBlockHeightTooLow{AppHeight: appBlockHeight, StoreBase: storeBlockBase}
case appBlockHeight > 0 && appBlockHeight < storeBlockBase-1:
// the app is too far behind truncated store (can be 1 behind since we replay the next)
return appHash, sm.ErrAppBlockHeightTooLow{AppHeight: appBlockHeight, StoreBase: storeBlockBase}
case storeBlockHeight < appBlockHeight:
// the app should never be ahead of the store (but this is under app's control)
return appHash, sm.ErrAppBlockHeightTooHigh{CoreHeight: storeBlockHeight, AppHeight: appBlockHeight}
case storeBlockHeight < stateBlockHeight:
// the state should never be ahead of the store (this is under tendermint's control)
panic(fmt.Sprintf("StateBlockHeight (%d) > StoreBlockHeight (%d)", stateBlockHeight, storeBlockHeight))
case storeBlockHeight > stateBlockHeight+1:
// store should be at most one ahead of the state (this is under tendermint's control)
panic(fmt.Sprintf("StoreBlockHeight (%d) > StateBlockHeight + 1 (%d)", storeBlockHeight, stateBlockHeight+1))
}
var err error
// Now either store is equal to state, or one ahead.
// For each, consider all cases of where the app could be, given app <= store
if storeBlockHeight == stateBlockHeight {
// Tendermint ran Commit and saved the state.
// Either the app is asking for replay, or we're all synced up.
if appBlockHeight < storeBlockHeight {
// the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store)
return h.replayBlocks(ctx, state, proxyApp, appBlockHeight, storeBlockHeight, false)
} else if appBlockHeight == storeBlockHeight {
// We're good!
assertAppHashEqualsOneFromState(appHash, state)
return appHash, nil
}
} else if storeBlockHeight == stateBlockHeight+1 {
// We saved the block in the store but haven't updated the state,
// so we'll need to replay a block using the WAL.
switch {
case appBlockHeight < stateBlockHeight:
// the app is further behind than it should be, so replay blocks
// but leave the last block to go through the WAL
return h.replayBlocks(ctx, state, proxyApp, appBlockHeight, storeBlockHeight, true)
case appBlockHeight == stateBlockHeight:
// We haven't run Commit (both the state and app are one block behind),
// so replayBlock with the real app.
// NOTE: We could instead use the cs.WAL on cs.Start,
// but we'd have to allow the WAL to replay a block that wrote it's #ENDHEIGHT
h.logger.Info("Replay last block using real app")
state, err = h.replayBlock(ctx, state, storeBlockHeight, proxyApp.Consensus())
return state.AppHash, err
case appBlockHeight == storeBlockHeight:
// We ran Commit, but didn't save the state, so replayBlock with mock app.
abciResponses, err := h.stateStore.LoadABCIResponses(storeBlockHeight)
if err != nil {
return nil, err
}
mockApp, err := newMockProxyApp(ctx, h.logger, appHash, abciResponses)
if err != nil {
return nil, err
}
h.logger.Info("Replay last block using mock app")
state, err = h.replayBlock(ctx, state, storeBlockHeight, mockApp)
if err != nil {
return nil, err
}
return state.AppHash, err
}
}
panic(fmt.Sprintf("uncovered case! appHeight: %d, storeHeight: %d, stateHeight: %d",
appBlockHeight, storeBlockHeight, stateBlockHeight))
}
func (h *Handshaker) replayBlocks(
ctx context.Context,
state sm.State,
proxyApp proxy.AppConns,
appBlockHeight,
storeBlockHeight int64,
mutateState bool) ([]byte, error) {
// App is further behind than it should be, so we need to replay blocks.
// We replay all blocks from appBlockHeight+1.
//
// Note that we don't have an old version of the state,
// so we by-pass state validation/mutation using sm.ExecCommitBlock.
// This also means we won't be saving validator sets if they change during this period.
// TODO: Load the historical information to fix this and just use state.ApplyBlock
//
// If mutateState == true, the final block is replayed with h.replayBlock()
var appHash []byte
var err error
finalBlock := storeBlockHeight
if mutateState {
finalBlock--
}
firstBlock := appBlockHeight + 1
if firstBlock == 1 {
firstBlock = state.InitialHeight
}
for i := firstBlock; i <= finalBlock; i++ {
h.logger.Info("Applying block", "height", i)
block := h.store.LoadBlock(i)
// Extra check to ensure the app was not changed in a way it shouldn't have.
if len(appHash) > 0 {
assertAppHashEqualsOneFromBlock(appHash, block)
}
if i == finalBlock && !mutateState {
// We emit events for the index services at the final block due to the sync issue when
// the node shutdown during the block committing status.
blockExec := sm.NewBlockExecutor(
h.stateStore, h.logger, proxyApp.Consensus(), emptyMempool{}, sm.EmptyEvidencePool{}, h.store)
blockExec.SetEventBus(h.eventBus)
appHash, err = sm.ExecCommitBlock(ctx,
blockExec, proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight, state)
if err != nil {
return nil, err
}
} else {
appHash, err = sm.ExecCommitBlock(ctx,
nil, proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight, state)
if err != nil {
return nil, err
}
}
h.nBlocks++
}
if mutateState {
// sync the final block
state, err = h.replayBlock(ctx, state, storeBlockHeight, proxyApp.Consensus())
if err != nil {
return nil, err
}
appHash = state.AppHash
}
assertAppHashEqualsOneFromState(appHash, state)
return appHash, nil
}
// ApplyBlock on the proxyApp with the last block.
func (h *Handshaker) replayBlock(
ctx context.Context,
state sm.State,
height int64,
proxyApp proxy.AppConnConsensus,
) (sm.State, error) {
block := h.store.LoadBlock(height)
meta := h.store.LoadBlockMeta(height)
// Use stubs for both mempool and evidence pool since no transactions nor
// evidence are needed here - block already exists.
blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, proxyApp, emptyMempool{}, sm.EmptyEvidencePool{}, h.store)
blockExec.SetEventBus(h.eventBus)
var err error
state, err = blockExec.ApplyBlock(ctx, state, meta.BlockID, block)
if err != nil {
return sm.State{}, err
}
h.nBlocks++
return state, nil
}
func assertAppHashEqualsOneFromBlock(appHash []byte, block *types.Block) {
if !bytes.Equal(appHash, block.AppHash) {
panic(fmt.Sprintf(`block.AppHash does not match AppHash after replay. Got %X, expected %X.
Block: %v
`,
appHash, block.AppHash, block))
}
}
func assertAppHashEqualsOneFromState(appHash []byte, state sm.State) {
if !bytes.Equal(appHash, state.AppHash) {
panic(fmt.Sprintf(`state.AppHash does not match AppHash after replay. Got
%X, expected %X.
State: %v
Did you reset Tendermint without resetting your application's data?`,
appHash, state.AppHash, state))
}
}