mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-16 09:42:50 +00:00
Compare commits
5 Commits
master
...
callum/han
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e2d8666782 | ||
|
|
102f1f5af6 | ||
|
|
64fb13a561 | ||
|
|
5682dd55f9 | ||
|
|
4db618db7a |
@@ -1,19 +1,12 @@
|
||||
package consensus
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"hash/crc32"
|
||||
"io"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
"github.com/tendermint/tendermint/crypto/merkle"
|
||||
"github.com/tendermint/tendermint/internal/proxy"
|
||||
sm "github.com/tendermint/tendermint/internal/state"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
@@ -191,358 +184,3 @@ func makeHeightSearchFunc(height int64) auto.SearchFunc {
|
||||
}
|
||||
}
|
||||
}*/
|
||||
|
||||
//---------------------------------------------------
|
||||
// 2. Recover from failure while applying the block.
|
||||
// (by handshaking with the app to figure out where
|
||||
// we were last, and using the WAL to recover there.)
|
||||
//---------------------------------------------------
|
||||
|
||||
type Handshaker struct {
|
||||
stateStore sm.Store
|
||||
initialState sm.State
|
||||
store sm.BlockStore
|
||||
eventBus types.BlockEventPublisher
|
||||
genDoc *types.GenesisDoc
|
||||
logger log.Logger
|
||||
|
||||
nBlocks int // number of blocks applied to the state
|
||||
}
|
||||
|
||||
func NewHandshaker(stateStore sm.Store, state sm.State,
|
||||
store sm.BlockStore, genDoc *types.GenesisDoc) *Handshaker {
|
||||
|
||||
return &Handshaker{
|
||||
stateStore: stateStore,
|
||||
initialState: state,
|
||||
store: store,
|
||||
eventBus: types.NopEventBus{},
|
||||
genDoc: genDoc,
|
||||
logger: log.NewNopLogger(),
|
||||
nBlocks: 0,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Handshaker) SetLogger(l log.Logger) {
|
||||
h.logger = l
|
||||
}
|
||||
|
||||
// SetEventBus - sets the event bus for publishing block related events.
|
||||
// If not called, it defaults to types.NopEventBus.
|
||||
func (h *Handshaker) SetEventBus(eventBus types.BlockEventPublisher) {
|
||||
h.eventBus = eventBus
|
||||
}
|
||||
|
||||
// NBlocks returns the number of blocks applied to the state.
|
||||
func (h *Handshaker) NBlocks() int {
|
||||
return h.nBlocks
|
||||
}
|
||||
|
||||
// TODO: retry the handshake/replay if it fails ?
|
||||
func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
|
||||
|
||||
// Handshake is done via ABCI Info on the query conn.
|
||||
res, err := proxyApp.Query().InfoSync(context.Background(), proxy.RequestInfo)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error calling Info: %v", err)
|
||||
}
|
||||
|
||||
blockHeight := res.LastBlockHeight
|
||||
if blockHeight < 0 {
|
||||
return fmt.Errorf("got a negative last block height (%d) from the app", blockHeight)
|
||||
}
|
||||
appHash := res.LastBlockAppHash
|
||||
|
||||
h.logger.Info("ABCI Handshake App Info",
|
||||
"height", blockHeight,
|
||||
"hash", appHash,
|
||||
"software-version", res.Version,
|
||||
"protocol-version", res.AppVersion,
|
||||
)
|
||||
|
||||
// Only set the version if there is no existing state.
|
||||
if h.initialState.LastBlockHeight == 0 {
|
||||
h.initialState.Version.Consensus.App = res.AppVersion
|
||||
}
|
||||
|
||||
// Replay blocks up to the latest in the blockstore.
|
||||
_, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error on replay: %v", err)
|
||||
}
|
||||
|
||||
h.logger.Info("Completed ABCI Handshake - Tendermint and App are synced",
|
||||
"appHeight", blockHeight, "appHash", appHash)
|
||||
|
||||
// TODO: (on restart) replay mempool
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ReplayBlocks replays all blocks since appBlockHeight and ensures the result
|
||||
// matches the current state.
|
||||
// Returns the final AppHash or an error.
|
||||
func (h *Handshaker) ReplayBlocks(
|
||||
state sm.State,
|
||||
appHash []byte,
|
||||
appBlockHeight int64,
|
||||
proxyApp proxy.AppConns,
|
||||
) ([]byte, error) {
|
||||
storeBlockBase := h.store.Base()
|
||||
storeBlockHeight := h.store.Height()
|
||||
stateBlockHeight := state.LastBlockHeight
|
||||
h.logger.Info(
|
||||
"ABCI Replay Blocks",
|
||||
"appHeight",
|
||||
appBlockHeight,
|
||||
"storeHeight",
|
||||
storeBlockHeight,
|
||||
"stateHeight",
|
||||
stateBlockHeight)
|
||||
|
||||
// If appBlockHeight == 0 it means that we are at genesis and hence should send InitChain.
|
||||
if appBlockHeight == 0 {
|
||||
validators := make([]*types.Validator, len(h.genDoc.Validators))
|
||||
for i, val := range h.genDoc.Validators {
|
||||
validators[i] = types.NewValidator(val.PubKey, val.Power)
|
||||
}
|
||||
validatorSet := types.NewValidatorSet(validators)
|
||||
nextVals := types.TM2PB.ValidatorUpdates(validatorSet)
|
||||
pbParams := h.genDoc.ConsensusParams.ToProto()
|
||||
req := abci.RequestInitChain{
|
||||
Time: h.genDoc.GenesisTime,
|
||||
ChainId: h.genDoc.ChainID,
|
||||
InitialHeight: h.genDoc.InitialHeight,
|
||||
ConsensusParams: &pbParams,
|
||||
Validators: nextVals,
|
||||
AppStateBytes: h.genDoc.AppState,
|
||||
}
|
||||
res, err := proxyApp.Consensus().InitChainSync(context.Background(), req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
appHash = res.AppHash
|
||||
|
||||
if stateBlockHeight == 0 { // we only update state when we are in initial state
|
||||
// If the app did not return an app hash, we keep the one set from the genesis doc in
|
||||
// the state. We don't set appHash since we don't want the genesis doc app hash
|
||||
// recorded in the genesis block. We should probably just remove GenesisDoc.AppHash.
|
||||
if len(res.AppHash) > 0 {
|
||||
state.AppHash = res.AppHash
|
||||
}
|
||||
// If the app returned validators or consensus params, update the state.
|
||||
if len(res.Validators) > 0 {
|
||||
vals, err := types.PB2TM.ValidatorUpdates(res.Validators)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
state.Validators = types.NewValidatorSet(vals)
|
||||
state.NextValidators = types.NewValidatorSet(vals).CopyIncrementProposerPriority(1)
|
||||
} else if len(h.genDoc.Validators) == 0 {
|
||||
// If validator set is not set in genesis and still empty after InitChain, exit.
|
||||
return nil, fmt.Errorf("validator set is nil in genesis and still empty after InitChain")
|
||||
}
|
||||
|
||||
if res.ConsensusParams != nil {
|
||||
state.ConsensusParams = state.ConsensusParams.UpdateConsensusParams(res.ConsensusParams)
|
||||
state.Version.Consensus.App = state.ConsensusParams.Version.AppVersion
|
||||
}
|
||||
// We update the last results hash with the empty hash, to conform with RFC-6962.
|
||||
state.LastResultsHash = merkle.HashFromByteSlices(nil)
|
||||
if err := h.stateStore.Save(state); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// First handle edge cases and constraints on the storeBlockHeight and storeBlockBase.
|
||||
switch {
|
||||
case storeBlockHeight == 0:
|
||||
assertAppHashEqualsOneFromState(appHash, state)
|
||||
return appHash, nil
|
||||
|
||||
case appBlockHeight == 0 && state.InitialHeight < storeBlockBase:
|
||||
// the app has no state, and the block store is truncated above the initial height
|
||||
return appHash, sm.ErrAppBlockHeightTooLow{AppHeight: appBlockHeight, StoreBase: storeBlockBase}
|
||||
|
||||
case appBlockHeight > 0 && appBlockHeight < storeBlockBase-1:
|
||||
// the app is too far behind truncated store (can be 1 behind since we replay the next)
|
||||
return appHash, sm.ErrAppBlockHeightTooLow{AppHeight: appBlockHeight, StoreBase: storeBlockBase}
|
||||
|
||||
case storeBlockHeight < appBlockHeight:
|
||||
// the app should never be ahead of the store (but this is under app's control)
|
||||
return appHash, sm.ErrAppBlockHeightTooHigh{CoreHeight: storeBlockHeight, AppHeight: appBlockHeight}
|
||||
|
||||
case storeBlockHeight < stateBlockHeight:
|
||||
// the state should never be ahead of the store (this is under tendermint's control)
|
||||
panic(fmt.Sprintf("StateBlockHeight (%d) > StoreBlockHeight (%d)", stateBlockHeight, storeBlockHeight))
|
||||
|
||||
case storeBlockHeight > stateBlockHeight+1:
|
||||
// store should be at most one ahead of the state (this is under tendermint's control)
|
||||
panic(fmt.Sprintf("StoreBlockHeight (%d) > StateBlockHeight + 1 (%d)", storeBlockHeight, stateBlockHeight+1))
|
||||
}
|
||||
|
||||
var err error
|
||||
// Now either store is equal to state, or one ahead.
|
||||
// For each, consider all cases of where the app could be, given app <= store
|
||||
if storeBlockHeight == stateBlockHeight {
|
||||
// Tendermint ran Commit and saved the state.
|
||||
// Either the app is asking for replay, or we're all synced up.
|
||||
if appBlockHeight < storeBlockHeight {
|
||||
// the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store)
|
||||
return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, false)
|
||||
|
||||
} else if appBlockHeight == storeBlockHeight {
|
||||
// We're good!
|
||||
assertAppHashEqualsOneFromState(appHash, state)
|
||||
return appHash, nil
|
||||
}
|
||||
|
||||
} else if storeBlockHeight == stateBlockHeight+1 {
|
||||
// We saved the block in the store but haven't updated the state,
|
||||
// so we'll need to replay a block using the WAL.
|
||||
switch {
|
||||
case appBlockHeight < stateBlockHeight:
|
||||
// the app is further behind than it should be, so replay blocks
|
||||
// but leave the last block to go through the WAL
|
||||
return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, true)
|
||||
|
||||
case appBlockHeight == stateBlockHeight:
|
||||
// We haven't run Commit (both the state and app are one block behind),
|
||||
// so replayBlock with the real app.
|
||||
// NOTE: We could instead use the cs.WAL on cs.Start,
|
||||
// but we'd have to allow the WAL to replay a block that wrote it's #ENDHEIGHT
|
||||
h.logger.Info("Replay last block using real app")
|
||||
state, err = h.replayBlock(state, storeBlockHeight, proxyApp.Consensus())
|
||||
return state.AppHash, err
|
||||
|
||||
case appBlockHeight == storeBlockHeight:
|
||||
// We ran Commit, but didn't save the state, so replayBlock with mock app.
|
||||
abciResponses, err := h.stateStore.LoadABCIResponses(storeBlockHeight)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
mockApp := newMockProxyApp(appHash, abciResponses)
|
||||
h.logger.Info("Replay last block using mock app")
|
||||
state, err = h.replayBlock(state, storeBlockHeight, mockApp)
|
||||
return state.AppHash, err
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
panic(fmt.Sprintf("uncovered case! appHeight: %d, storeHeight: %d, stateHeight: %d",
|
||||
appBlockHeight, storeBlockHeight, stateBlockHeight))
|
||||
}
|
||||
|
||||
func (h *Handshaker) replayBlocks(
|
||||
state sm.State,
|
||||
proxyApp proxy.AppConns,
|
||||
appBlockHeight,
|
||||
storeBlockHeight int64,
|
||||
mutateState bool) ([]byte, error) {
|
||||
// App is further behind than it should be, so we need to replay blocks.
|
||||
// We replay all blocks from appBlockHeight+1.
|
||||
//
|
||||
// Note that we don't have an old version of the state,
|
||||
// so we by-pass state validation/mutation using sm.ExecCommitBlock.
|
||||
// This also means we won't be saving validator sets if they change during this period.
|
||||
// TODO: Load the historical information to fix this and just use state.ApplyBlock
|
||||
//
|
||||
// If mutateState == true, the final block is replayed with h.replayBlock()
|
||||
|
||||
var appHash []byte
|
||||
var err error
|
||||
finalBlock := storeBlockHeight
|
||||
if mutateState {
|
||||
finalBlock--
|
||||
}
|
||||
firstBlock := appBlockHeight + 1
|
||||
if firstBlock == 1 {
|
||||
firstBlock = state.InitialHeight
|
||||
}
|
||||
for i := firstBlock; i <= finalBlock; i++ {
|
||||
h.logger.Info("Applying block", "height", i)
|
||||
block := h.store.LoadBlock(i)
|
||||
// Extra check to ensure the app was not changed in a way it shouldn't have.
|
||||
if len(appHash) > 0 {
|
||||
assertAppHashEqualsOneFromBlock(appHash, block)
|
||||
}
|
||||
|
||||
if i == finalBlock && !mutateState {
|
||||
// We emit events for the index services at the final block due to the sync issue when
|
||||
// the node shutdown during the block committing status.
|
||||
blockExec := sm.NewBlockExecutor(
|
||||
h.stateStore, h.logger, proxyApp.Consensus(), emptyMempool{}, sm.EmptyEvidencePool{}, h.store)
|
||||
blockExec.SetEventBus(h.eventBus)
|
||||
appHash, err = sm.ExecCommitBlock(
|
||||
blockExec, proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight, state)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
appHash, err = sm.ExecCommitBlock(
|
||||
nil, proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight, state)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
h.nBlocks++
|
||||
}
|
||||
|
||||
if mutateState {
|
||||
// sync the final block
|
||||
state, err = h.replayBlock(state, storeBlockHeight, proxyApp.Consensus())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
appHash = state.AppHash
|
||||
}
|
||||
|
||||
assertAppHashEqualsOneFromState(appHash, state)
|
||||
return appHash, nil
|
||||
}
|
||||
|
||||
// ApplyBlock on the proxyApp with the last block.
|
||||
func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.AppConnConsensus) (sm.State, error) {
|
||||
block := h.store.LoadBlock(height)
|
||||
meta := h.store.LoadBlockMeta(height)
|
||||
|
||||
// Use stubs for both mempool and evidence pool since no transactions nor
|
||||
// evidence are needed here - block already exists.
|
||||
blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, proxyApp, emptyMempool{}, sm.EmptyEvidencePool{}, h.store)
|
||||
blockExec.SetEventBus(h.eventBus)
|
||||
|
||||
var err error
|
||||
state, err = blockExec.ApplyBlock(state, meta.BlockID, block)
|
||||
if err != nil {
|
||||
return sm.State{}, err
|
||||
}
|
||||
|
||||
h.nBlocks++
|
||||
|
||||
return state, nil
|
||||
}
|
||||
|
||||
func assertAppHashEqualsOneFromBlock(appHash []byte, block *types.Block) {
|
||||
if !bytes.Equal(appHash, block.AppHash) {
|
||||
panic(fmt.Sprintf(`block.AppHash does not match AppHash after replay. Got %X, expected %X.
|
||||
|
||||
Block: %v
|
||||
`,
|
||||
appHash, block.AppHash, block))
|
||||
}
|
||||
}
|
||||
|
||||
func assertAppHashEqualsOneFromState(appHash []byte, state sm.State) {
|
||||
if !bytes.Equal(appHash, state.AppHash) {
|
||||
panic(fmt.Sprintf(`state.AppHash does not match AppHash after replay. Got
|
||||
%X, expected %X.
|
||||
|
||||
State: %v
|
||||
|
||||
Did you reset Tendermint without resetting your application's data?`,
|
||||
appHash, state.AppHash, state))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -323,13 +323,6 @@ func newConsensusStateForReplay(cfg config.BaseConfig, csConfig *config.Consensu
|
||||
tmos.Exit(fmt.Sprintf("Failed to start event bus: %v", err))
|
||||
}
|
||||
|
||||
handshaker := NewHandshaker(stateStore, state, blockStore, gdoc)
|
||||
handshaker.SetEventBus(eventBus)
|
||||
err = handshaker.Handshake(proxyApp)
|
||||
if err != nil {
|
||||
tmos.Exit(fmt.Sprintf("Error on handshake: %v", err))
|
||||
}
|
||||
|
||||
mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{}
|
||||
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore)
|
||||
|
||||
|
||||
@@ -3,12 +3,9 @@ package consensus
|
||||
import (
|
||||
"context"
|
||||
|
||||
abciclient "github.com/tendermint/tendermint/abci/client"
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
"github.com/tendermint/tendermint/internal/libs/clist"
|
||||
"github.com/tendermint/tendermint/internal/mempool"
|
||||
"github.com/tendermint/tendermint/internal/proxy"
|
||||
tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
@@ -47,48 +44,3 @@ func (emptyMempool) TxsWaitChan() <-chan struct{} { return nil }
|
||||
|
||||
func (emptyMempool) InitWAL() error { return nil }
|
||||
func (emptyMempool) CloseWAL() {}
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
// mockProxyApp uses ABCIResponses to give the right results.
|
||||
//
|
||||
// Useful because we don't want to call Commit() twice for the same block on
|
||||
// the real app.
|
||||
|
||||
func newMockProxyApp(appHash []byte, abciResponses *tmstate.ABCIResponses) proxy.AppConnConsensus {
|
||||
clientCreator := abciclient.NewLocalCreator(&mockProxyApp{
|
||||
appHash: appHash,
|
||||
abciResponses: abciResponses,
|
||||
})
|
||||
cli, _ := clientCreator()
|
||||
err := cli.Start()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return proxy.NewAppConnConsensus(cli, proxy.NopMetrics())
|
||||
}
|
||||
|
||||
type mockProxyApp struct {
|
||||
abci.BaseApplication
|
||||
|
||||
appHash []byte
|
||||
txCount int
|
||||
abciResponses *tmstate.ABCIResponses
|
||||
}
|
||||
|
||||
func (mock *mockProxyApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx {
|
||||
r := mock.abciResponses.DeliverTxs[mock.txCount]
|
||||
mock.txCount++
|
||||
if r == nil {
|
||||
return abci.ResponseDeliverTx{}
|
||||
}
|
||||
return *r
|
||||
}
|
||||
|
||||
func (mock *mockProxyApp) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock {
|
||||
mock.txCount = 0
|
||||
return *mock.abciResponses.EndBlock
|
||||
}
|
||||
|
||||
func (mock *mockProxyApp) Commit() abci.ResponseCommit {
|
||||
return abci.ResponseCommit{Data: mock.appHash}
|
||||
}
|
||||
|
||||
@@ -1,12 +1,10 @@
|
||||
package consensus
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
@@ -26,16 +24,12 @@ import (
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
"github.com/tendermint/tendermint/crypto/encoding"
|
||||
"github.com/tendermint/tendermint/internal/mempool"
|
||||
"github.com/tendermint/tendermint/internal/proxy"
|
||||
sm "github.com/tendermint/tendermint/internal/state"
|
||||
sf "github.com/tendermint/tendermint/internal/state/test/factory"
|
||||
"github.com/tendermint/tendermint/internal/store"
|
||||
"github.com/tendermint/tendermint/internal/test/factory"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
tmrand "github.com/tendermint/tendermint/libs/rand"
|
||||
"github.com/tendermint/tendermint/privval"
|
||||
tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
|
||||
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
|
||||
"github.com/tendermint/tendermint/internal/proxy"
|
||||
sm "github.com/tendermint/tendermint/internal/state"
|
||||
"github.com/tendermint/tendermint/internal/store"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
@@ -560,109 +554,6 @@ func setupSimulator(t *testing.T) *simulatorTestSuite {
|
||||
return sim
|
||||
}
|
||||
|
||||
// Sync from scratch
|
||||
func TestHandshakeReplayAll(t *testing.T) {
|
||||
sim := setupSimulator(t)
|
||||
|
||||
for _, m := range modes {
|
||||
testHandshakeReplay(t, sim, 0, m, false)
|
||||
}
|
||||
for _, m := range modes {
|
||||
testHandshakeReplay(t, sim, 0, m, true)
|
||||
}
|
||||
}
|
||||
|
||||
// Sync many, not from scratch
|
||||
func TestHandshakeReplaySome(t *testing.T) {
|
||||
sim := setupSimulator(t)
|
||||
|
||||
for _, m := range modes {
|
||||
testHandshakeReplay(t, sim, 2, m, false)
|
||||
}
|
||||
for _, m := range modes {
|
||||
testHandshakeReplay(t, sim, 2, m, true)
|
||||
}
|
||||
}
|
||||
|
||||
// Sync from lagging by one
|
||||
func TestHandshakeReplayOne(t *testing.T) {
|
||||
sim := setupSimulator(t)
|
||||
|
||||
for _, m := range modes {
|
||||
testHandshakeReplay(t, sim, numBlocks-1, m, false)
|
||||
}
|
||||
for _, m := range modes {
|
||||
testHandshakeReplay(t, sim, numBlocks-1, m, true)
|
||||
}
|
||||
}
|
||||
|
||||
// Sync from caught up
|
||||
func TestHandshakeReplayNone(t *testing.T) {
|
||||
sim := setupSimulator(t)
|
||||
|
||||
for _, m := range modes {
|
||||
testHandshakeReplay(t, sim, numBlocks, m, false)
|
||||
}
|
||||
for _, m := range modes {
|
||||
testHandshakeReplay(t, sim, numBlocks, m, true)
|
||||
}
|
||||
}
|
||||
|
||||
// Test mockProxyApp should not panic when app return ABCIResponses with some empty ResponseDeliverTx
|
||||
func TestMockProxyApp(t *testing.T) {
|
||||
sim := setupSimulator(t) // setup config and simulator
|
||||
cfg := sim.Config
|
||||
assert.NotNil(t, cfg)
|
||||
|
||||
logger := log.TestingLogger()
|
||||
var validTxs, invalidTxs = 0, 0
|
||||
txIndex := 0
|
||||
|
||||
assert.NotPanics(t, func() {
|
||||
abciResWithEmptyDeliverTx := new(tmstate.ABCIResponses)
|
||||
abciResWithEmptyDeliverTx.DeliverTxs = make([]*abci.ResponseDeliverTx, 0)
|
||||
abciResWithEmptyDeliverTx.DeliverTxs = append(abciResWithEmptyDeliverTx.DeliverTxs, &abci.ResponseDeliverTx{})
|
||||
|
||||
// called when saveABCIResponses:
|
||||
bytes, err := proto.Marshal(abciResWithEmptyDeliverTx)
|
||||
require.NoError(t, err)
|
||||
loadedAbciRes := new(tmstate.ABCIResponses)
|
||||
|
||||
// this also happens sm.LoadABCIResponses
|
||||
err = proto.Unmarshal(bytes, loadedAbciRes)
|
||||
require.NoError(t, err)
|
||||
|
||||
mock := newMockProxyApp([]byte("mock_hash"), loadedAbciRes)
|
||||
|
||||
abciRes := new(tmstate.ABCIResponses)
|
||||
abciRes.DeliverTxs = make([]*abci.ResponseDeliverTx, len(loadedAbciRes.DeliverTxs))
|
||||
// Execute transactions and get hash.
|
||||
proxyCb := func(req *abci.Request, res *abci.Response) {
|
||||
if r, ok := res.Value.(*abci.Response_DeliverTx); ok {
|
||||
// TODO: make use of res.Log
|
||||
// TODO: make use of this info
|
||||
// Blocks may include invalid txs.
|
||||
txRes := r.DeliverTx
|
||||
if txRes.Code == abci.CodeTypeOK {
|
||||
validTxs++
|
||||
} else {
|
||||
logger.Debug("Invalid tx", "code", txRes.Code, "log", txRes.Log)
|
||||
invalidTxs++
|
||||
}
|
||||
abciRes.DeliverTxs[txIndex] = txRes
|
||||
txIndex++
|
||||
}
|
||||
}
|
||||
mock.SetResponseCallback(proxyCb)
|
||||
|
||||
someTx := []byte("tx")
|
||||
_, err = mock.DeliverTxAsync(context.Background(), abci.RequestDeliverTx{Tx: someTx})
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
assert.True(t, validTxs == 1)
|
||||
assert.True(t, invalidTxs == 0)
|
||||
}
|
||||
|
||||
func tempWALWithData(data []byte) string {
|
||||
walFile, err := ioutil.TempFile("", "wal")
|
||||
if err != nil {
|
||||
@@ -678,138 +569,6 @@ func tempWALWithData(data []byte) string {
|
||||
return walFile.Name()
|
||||
}
|
||||
|
||||
// Make some blocks. Start a fresh app and apply nBlocks blocks.
|
||||
// Then restart the app and sync it up with the remaining blocks
|
||||
func testHandshakeReplay(t *testing.T, sim *simulatorTestSuite, nBlocks int, mode uint, testValidatorsChange bool) {
|
||||
var chain []*types.Block
|
||||
var commits []*types.Commit
|
||||
var store *mockBlockStore
|
||||
var stateDB dbm.DB
|
||||
var genesisState sm.State
|
||||
|
||||
cfg := sim.Config
|
||||
|
||||
if testValidatorsChange {
|
||||
testConfig := ResetConfig(fmt.Sprintf("%s_%v_m", t.Name(), mode))
|
||||
defer func() { _ = os.RemoveAll(testConfig.RootDir) }()
|
||||
stateDB = dbm.NewMemDB()
|
||||
|
||||
genesisState = sim.GenesisState
|
||||
cfg = sim.Config
|
||||
chain = append([]*types.Block{}, sim.Chain...) // copy chain
|
||||
commits = sim.Commits
|
||||
store = newMockBlockStore(cfg, genesisState.ConsensusParams)
|
||||
} else { // test single node
|
||||
testConfig := ResetConfig(fmt.Sprintf("%s_%v_s", t.Name(), mode))
|
||||
defer func() { _ = os.RemoveAll(testConfig.RootDir) }()
|
||||
walBody, err := WALWithNBlocks(t, numBlocks)
|
||||
require.NoError(t, err)
|
||||
walFile := tempWALWithData(walBody)
|
||||
cfg.Consensus.SetWalFile(walFile)
|
||||
|
||||
privVal, err := privval.LoadFilePV(cfg.PrivValidator.KeyFile(), cfg.PrivValidator.StateFile())
|
||||
require.NoError(t, err)
|
||||
|
||||
wal, err := NewWAL(walFile)
|
||||
require.NoError(t, err)
|
||||
wal.SetLogger(log.TestingLogger())
|
||||
err = wal.Start()
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
if err := wal.Stop(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
})
|
||||
chain, commits, err = makeBlockchainFromWAL(wal)
|
||||
require.NoError(t, err)
|
||||
pubKey, err := privVal.GetPubKey(context.Background())
|
||||
require.NoError(t, err)
|
||||
stateDB, genesisState, store = stateAndStore(cfg, pubKey, kvstore.ProtocolVersion)
|
||||
|
||||
}
|
||||
stateStore := sm.NewStore(stateDB)
|
||||
store.chain = chain
|
||||
store.commits = commits
|
||||
|
||||
state := genesisState.Copy()
|
||||
// run the chain through state.ApplyBlock to build up the tendermint state
|
||||
state = buildTMStateFromChain(cfg, sim.Mempool, sim.Evpool, stateStore, state, chain, nBlocks, mode, store)
|
||||
latestAppHash := state.AppHash
|
||||
|
||||
// make a new client creator
|
||||
kvstoreApp := kvstore.NewPersistentKVStoreApplication(
|
||||
filepath.Join(cfg.DBDir(), fmt.Sprintf("replay_test_%d_%d_a_r%d", nBlocks, mode, rand.Int())))
|
||||
t.Cleanup(func() { require.NoError(t, kvstoreApp.Close()) })
|
||||
|
||||
clientCreator2 := abciclient.NewLocalCreator(kvstoreApp)
|
||||
if nBlocks > 0 {
|
||||
// run nBlocks against a new client to build up the app state.
|
||||
// use a throwaway tendermint state
|
||||
proxyApp := proxy.NewAppConns(clientCreator2, proxy.NopMetrics())
|
||||
stateDB1 := dbm.NewMemDB()
|
||||
stateStore := sm.NewStore(stateDB1)
|
||||
err := stateStore.Save(genesisState)
|
||||
require.NoError(t, err)
|
||||
buildAppStateFromChain(proxyApp, stateStore, sim.Mempool, sim.Evpool, genesisState, chain, nBlocks, mode, store)
|
||||
}
|
||||
|
||||
// Prune block store if requested
|
||||
expectError := false
|
||||
if mode == 3 {
|
||||
pruned, err := store.PruneBlocks(2)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 1, pruned)
|
||||
expectError = int64(nBlocks) < 2
|
||||
}
|
||||
|
||||
// now start the app using the handshake - it should sync
|
||||
genDoc, _ := sm.MakeGenesisDocFromFile(cfg.GenesisFile())
|
||||
handshaker := NewHandshaker(stateStore, state, store, genDoc)
|
||||
proxyApp := proxy.NewAppConns(clientCreator2, proxy.NopMetrics())
|
||||
if err := proxyApp.Start(); err != nil {
|
||||
t.Fatalf("Error starting proxy app connections: %v", err)
|
||||
}
|
||||
|
||||
t.Cleanup(func() {
|
||||
if err := proxyApp.Stop(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
})
|
||||
|
||||
err := handshaker.Handshake(proxyApp)
|
||||
if expectError {
|
||||
require.Error(t, err)
|
||||
return
|
||||
} else if err != nil {
|
||||
t.Fatalf("Error on abci handshake: %v", err)
|
||||
}
|
||||
|
||||
// get the latest app hash from the app
|
||||
res, err := proxyApp.Query().InfoSync(context.Background(), abci.RequestInfo{Version: ""})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// the app hash should be synced up
|
||||
if !bytes.Equal(latestAppHash, res.LastBlockAppHash) {
|
||||
t.Fatalf(
|
||||
"Expected app hashes to match after handshake/replay. got %X, expected %X",
|
||||
res.LastBlockAppHash,
|
||||
latestAppHash)
|
||||
}
|
||||
|
||||
expectedBlocksToSync := numBlocks - nBlocks
|
||||
if nBlocks == numBlocks && mode > 0 {
|
||||
expectedBlocksToSync++
|
||||
} else if nBlocks > 0 && mode == 1 {
|
||||
expectedBlocksToSync++
|
||||
}
|
||||
|
||||
if handshaker.NBlocks() != expectedBlocksToSync {
|
||||
t.Fatalf("Expected handshake to sync %d blocks, got %d", expectedBlocksToSync, handshaker.NBlocks())
|
||||
}
|
||||
}
|
||||
|
||||
func applyBlock(stateStore sm.Store,
|
||||
mempool mempool.Mempool,
|
||||
evpool sm.EvidencePool,
|
||||
@@ -933,75 +692,6 @@ func buildTMStateFromChain(
|
||||
return state
|
||||
}
|
||||
|
||||
func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {
|
||||
// 1. Initialize tendermint and commit 3 blocks with the following app hashes:
|
||||
// - 0x01
|
||||
// - 0x02
|
||||
// - 0x03
|
||||
cfg := ResetConfig("handshake_test_")
|
||||
t.Cleanup(func() { os.RemoveAll(cfg.RootDir) })
|
||||
privVal, err := privval.LoadFilePV(cfg.PrivValidator.KeyFile(), cfg.PrivValidator.StateFile())
|
||||
require.NoError(t, err)
|
||||
const appVersion = 0x0
|
||||
pubKey, err := privVal.GetPubKey(context.Background())
|
||||
require.NoError(t, err)
|
||||
stateDB, state, store := stateAndStore(cfg, pubKey, appVersion)
|
||||
stateStore := sm.NewStore(stateDB)
|
||||
genDoc, _ := sm.MakeGenesisDocFromFile(cfg.GenesisFile())
|
||||
state.LastValidators = state.Validators.Copy()
|
||||
// mode = 0 for committing all the blocks
|
||||
blocks := sf.MakeBlocks(3, &state, privVal)
|
||||
store.chain = blocks
|
||||
|
||||
// 2. Tendermint must panic if app returns wrong hash for the first block
|
||||
// - RANDOM HASH
|
||||
// - 0x02
|
||||
// - 0x03
|
||||
{
|
||||
app := &badApp{numBlocks: 3, allHashesAreWrong: true}
|
||||
clientCreator := abciclient.NewLocalCreator(app)
|
||||
proxyApp := proxy.NewAppConns(clientCreator, proxy.NopMetrics())
|
||||
err := proxyApp.Start()
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
if err := proxyApp.Stop(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
})
|
||||
|
||||
assert.Panics(t, func() {
|
||||
h := NewHandshaker(stateStore, state, store, genDoc)
|
||||
if err = h.Handshake(proxyApp); err != nil {
|
||||
t.Log(err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// 3. Tendermint must panic if app returns wrong hash for the last block
|
||||
// - 0x01
|
||||
// - 0x02
|
||||
// - RANDOM HASH
|
||||
{
|
||||
app := &badApp{numBlocks: 3, onlyLastHashIsWrong: true}
|
||||
clientCreator := abciclient.NewLocalCreator(app)
|
||||
proxyApp := proxy.NewAppConns(clientCreator, proxy.NopMetrics())
|
||||
err := proxyApp.Start()
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
if err := proxyApp.Stop(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
})
|
||||
|
||||
assert.Panics(t, func() {
|
||||
h := NewHandshaker(stateStore, state, store, genDoc)
|
||||
if err = h.Handshake(proxyApp); err != nil {
|
||||
t.Log(err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type badApp struct {
|
||||
abci.BaseApplication
|
||||
numBlocks byte
|
||||
@@ -1219,52 +909,6 @@ func (bs *mockBlockStore) PruneBlocks(height int64) (uint64, error) {
|
||||
return pruned, nil
|
||||
}
|
||||
|
||||
//---------------------------------------
|
||||
// Test handshake/init chain
|
||||
|
||||
func TestHandshakeUpdatesValidators(t *testing.T) {
|
||||
val, _ := factory.RandValidator(true, 10)
|
||||
vals := types.NewValidatorSet([]*types.Validator{val})
|
||||
app := &initChainApp{vals: types.TM2PB.ValidatorUpdates(vals)}
|
||||
clientCreator := abciclient.NewLocalCreator(app)
|
||||
|
||||
cfg := ResetConfig("handshake_test_")
|
||||
t.Cleanup(func() { _ = os.RemoveAll(cfg.RootDir) })
|
||||
|
||||
privVal, err := privval.LoadFilePV(cfg.PrivValidator.KeyFile(), cfg.PrivValidator.StateFile())
|
||||
require.NoError(t, err)
|
||||
pubKey, err := privVal.GetPubKey(context.Background())
|
||||
require.NoError(t, err)
|
||||
stateDB, state, store := stateAndStore(cfg, pubKey, 0x0)
|
||||
stateStore := sm.NewStore(stateDB)
|
||||
|
||||
oldValAddr := state.Validators.Validators[0].Address
|
||||
|
||||
// now start the app using the handshake - it should sync
|
||||
genDoc, _ := sm.MakeGenesisDocFromFile(cfg.GenesisFile())
|
||||
handshaker := NewHandshaker(stateStore, state, store, genDoc)
|
||||
proxyApp := proxy.NewAppConns(clientCreator, proxy.NopMetrics())
|
||||
if err := proxyApp.Start(); err != nil {
|
||||
t.Fatalf("Error starting proxy app connections: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
if err := proxyApp.Stop(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
})
|
||||
if err := handshaker.Handshake(proxyApp); err != nil {
|
||||
t.Fatalf("Error on abci handshake: %v", err)
|
||||
}
|
||||
// reload the state, check the validator set was updated
|
||||
state, err = stateStore.Load()
|
||||
require.NoError(t, err)
|
||||
|
||||
newValAddr := state.Validators.Validators[0].Address
|
||||
expectValAddr := val.Address
|
||||
assert.NotEqual(t, oldValAddr, newValAddr)
|
||||
assert.Equal(t, newValAddr, expectValAddr)
|
||||
}
|
||||
|
||||
// returns the vals on InitChain
|
||||
type initChainApp struct {
|
||||
abci.BaseApplication
|
||||
|
||||
@@ -1,6 +1,11 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
"github.com/tendermint/tendermint/internal/libs/clist"
|
||||
"github.com/tendermint/tendermint/internal/mempool"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
@@ -59,3 +64,39 @@ func (EmptyEvidencePool) AddEvidence(types.Evidence) error { retu
|
||||
func (EmptyEvidencePool) Update(State, types.EvidenceList) {}
|
||||
func (EmptyEvidencePool) CheckEvidence(evList types.EvidenceList) error { return nil }
|
||||
func (EmptyEvidencePool) ReportConflictingVotes(voteA, voteB *types.Vote) {}
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
|
||||
type emptyMempool struct{}
|
||||
|
||||
var _ mempool.Mempool = emptyMempool{}
|
||||
|
||||
func (emptyMempool) Lock() {}
|
||||
func (emptyMempool) Unlock() {}
|
||||
func (emptyMempool) Size() int { return 0 }
|
||||
func (emptyMempool) CheckTx(_ context.Context, _ types.Tx, _ func(*abci.Response), _ mempool.TxInfo) error {
|
||||
return nil
|
||||
}
|
||||
func (emptyMempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} }
|
||||
func (emptyMempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} }
|
||||
func (emptyMempool) Update(
|
||||
_ int64,
|
||||
_ types.Txs,
|
||||
_ []*abci.ResponseDeliverTx,
|
||||
_ mempool.PreCheckFunc,
|
||||
_ mempool.PostCheckFunc,
|
||||
) error {
|
||||
return nil
|
||||
}
|
||||
func (emptyMempool) Flush() {}
|
||||
func (emptyMempool) RemoveTxByKey(types.TxKey) error { return nil }
|
||||
func (emptyMempool) FlushAppConn() error { return nil }
|
||||
func (emptyMempool) TxsAvailable() <-chan struct{} { return make(chan struct{}) }
|
||||
func (emptyMempool) EnableTxsAvailable() {}
|
||||
func (emptyMempool) SizeBytes() int64 { return 0 }
|
||||
|
||||
func (emptyMempool) TxsFront() *clist.CElement { return nil }
|
||||
func (emptyMempool) TxsWaitChan() <-chan struct{} { return nil }
|
||||
|
||||
func (emptyMempool) InitWAL() error { return nil }
|
||||
func (emptyMempool) CloseWAL() {}
|
||||
|
||||
367
internal/state/sync.go
Normal file
367
internal/state/sync.go
Normal file
@@ -0,0 +1,367 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
"github.com/tendermint/tendermint/crypto/merkle"
|
||||
"github.com/tendermint/tendermint/internal/proxy"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// SyncWithApplication is called every time on start up. It handshakes with the app
|
||||
// to understand the current state of the app. If this is the first time running
|
||||
// Tendermint it will call InitChain, feeding in the genesis doc and produce
|
||||
// the initial state. If the application is behind, Tendermint will replay all
|
||||
// blocks in between and assert that the app hashes produced are the same as
|
||||
// before.
|
||||
func SyncWithApplication(
|
||||
stateStore Store,
|
||||
blockStore BlockStore,
|
||||
genDoc *types.GenesisDoc,
|
||||
state State,
|
||||
eventBus types.BlockEventPublisher,
|
||||
proxyApp proxy.AppConns,
|
||||
stateSync bool,
|
||||
logger log.Logger,
|
||||
) (State, error) {
|
||||
|
||||
// Handshake is done via ABCI Info on the query conn. This gives the node
|
||||
// information on the app's latest height, hash and version
|
||||
res, err := proxyApp.Query().InfoSync(context.Background(), proxy.RequestInfo)
|
||||
if err != nil {
|
||||
return State{}, fmt.Errorf("error calling Info: %v", err)
|
||||
}
|
||||
appHash := res.LastBlockAppHash
|
||||
appBlockHeight := res.LastBlockHeight
|
||||
|
||||
// Only set the version if there is no existing state.
|
||||
if state.LastBlockHeight == 0 {
|
||||
state.Version.Consensus.App = res.AppVersion
|
||||
}
|
||||
|
||||
// If application's blockHeight is 0 it means that the app is at genesis and
|
||||
// if the node is not running state sync it will need to InitChain. If
|
||||
// Tendermint itself is at genesis it will save this initial state in the
|
||||
// state store.
|
||||
// NOTE: that the Tendermint node might have existing state and will need to
|
||||
// catch up the application. We nonetheless still need to run InitChain.
|
||||
if appBlockHeight == 0 || !stateSync {
|
||||
state, err = initializeChain(proxyApp, stateStore, state, genDoc, logger)
|
||||
if err != nil {
|
||||
return State{}, fmt.Errorf("error initializing chain: %w", err)
|
||||
}
|
||||
|
||||
appHash = state.AppHash
|
||||
}
|
||||
|
||||
// Check if the Tendermint has prior state. If so it may need to replay
|
||||
// blocks to the application
|
||||
storeBlockBase := blockStore.Base()
|
||||
storeBlockHeight := blockStore.Height()
|
||||
stateBlockHeight := state.LastBlockHeight
|
||||
|
||||
// First handle edge cases and constraints on the storeBlockHeight and storeBlockBase.
|
||||
switch {
|
||||
case storeBlockHeight == 0:
|
||||
// Fresh instance of Tendermint. Return early
|
||||
return state, nil
|
||||
|
||||
case appBlockHeight == 0 && state.InitialHeight < storeBlockBase:
|
||||
// the app has no state, and the block store is truncated above the
|
||||
// initial height. The node can't replay those blocks.
|
||||
return State{}, ErrAppBlockHeightTooLow{AppHeight: appBlockHeight, StoreBase: storeBlockBase}
|
||||
|
||||
case appBlockHeight > 0 && appBlockHeight < storeBlockBase-1:
|
||||
// the app is too far behind truncated store (can be 1 behind since we replay the next)
|
||||
return State{}, ErrAppBlockHeightTooLow{AppHeight: appBlockHeight, StoreBase: storeBlockBase}
|
||||
|
||||
case storeBlockHeight < appBlockHeight:
|
||||
// the app should never be ahead of the store (but this is under app's control)
|
||||
return State{}, ErrAppBlockHeightTooHigh{CoreHeight: storeBlockHeight, AppHeight: appBlockHeight}
|
||||
|
||||
case storeBlockHeight < stateBlockHeight:
|
||||
// the state should never be ahead of the store (this is under Tendermint's control)
|
||||
panic(fmt.Sprintf("StateBlockHeight (%d) > StoreBlockHeight (%d)", stateBlockHeight, storeBlockHeight))
|
||||
|
||||
case storeBlockHeight > stateBlockHeight+1:
|
||||
// store should be at most one ahead of the state (this is under Tendermint's control)
|
||||
panic(fmt.Sprintf("StoreBlockHeight (%d) > StateBlockHeight + 1 (%d)", storeBlockHeight, stateBlockHeight+1))
|
||||
}
|
||||
|
||||
// Now either store is equal to state, or one ahead.
|
||||
// For each, consider all cases of where the app could be, given app <= store
|
||||
if storeBlockHeight == stateBlockHeight {
|
||||
// Tendermint ran Commit and saved the state.
|
||||
// Either the app is asking for replay, or we're all synced up.
|
||||
if appBlockHeight < storeBlockHeight {
|
||||
// the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store)
|
||||
return state, replayBlocks(
|
||||
state, proxyApp, blockStore, stateStore, appBlockHeight, storeBlockHeight, false, eventBus, genDoc, logger,
|
||||
)
|
||||
|
||||
} else if appBlockHeight == storeBlockHeight {
|
||||
// We're all synced up
|
||||
return state, nil
|
||||
}
|
||||
|
||||
} else if storeBlockHeight == stateBlockHeight+1 {
|
||||
// We saved the block in the store but haven't updated the state,
|
||||
// so we'll need to replay a block using the WAL.
|
||||
// NOTE: In all these cases we generate a new state and store it to disk
|
||||
switch {
|
||||
case appBlockHeight < stateBlockHeight:
|
||||
// the app is further behind than it should be, so replay blocks
|
||||
// up to storeBlockHeight and run the
|
||||
err = replayBlocks(state, proxyApp, blockStore, stateStore, appBlockHeight, storeBlockHeight,
|
||||
true, eventBus, genDoc, logger)
|
||||
|
||||
case appBlockHeight == stateBlockHeight:
|
||||
// We haven't run Commit (both the state and app are one block behind),
|
||||
// so replayBlock with the real app.
|
||||
logger.Info("Replay last block using real app")
|
||||
state, err = replayBlock(state, blockStore, stateStore, storeBlockHeight, proxyApp.Consensus(), eventBus, logger)
|
||||
if err != nil {
|
||||
return State{}, err
|
||||
}
|
||||
|
||||
case appBlockHeight == storeBlockHeight:
|
||||
// We ran Commit, but didn't save the state, so replayBlock with mock app.
|
||||
abciResponses, err := stateStore.LoadABCIResponses(storeBlockHeight)
|
||||
if err != nil {
|
||||
return State{}, err
|
||||
}
|
||||
mockApp := newMockProxyApp(appHash, abciResponses)
|
||||
logger.Info("Replay last block using mock app")
|
||||
state, err = replayBlock(state, blockStore, stateStore, storeBlockHeight, mockApp, eventBus, logger)
|
||||
if err != nil {
|
||||
return State{}, err
|
||||
}
|
||||
}
|
||||
|
||||
// Save the new state.
|
||||
if err := stateStore.Save(state); err != nil {
|
||||
return State{}, err
|
||||
}
|
||||
|
||||
return state, nil
|
||||
}
|
||||
|
||||
panic(fmt.Sprintf("uncovered case! appHeight: %d, storeHeight: %d, stateHeight: %d",
|
||||
appBlockHeight, storeBlockHeight, stateBlockHeight))
|
||||
}
|
||||
|
||||
// initializeChain sends the genDoc information to the app as part of InitChain.
|
||||
// If Tendermint state hasn't been initialized yet it takes any state changes
|
||||
// from the app, applies them and then persists the initial state.
|
||||
func initializeChain(
|
||||
proxyApp proxy.AppConns,
|
||||
stateStore Store,
|
||||
currentState State,
|
||||
genDoc *types.GenesisDoc,
|
||||
logger log.Logger,
|
||||
) (State, error) {
|
||||
logger.Info("Initializing Chain with Application")
|
||||
|
||||
validators := make([]*types.Validator, len(genDoc.Validators))
|
||||
for i, val := range genDoc.Validators {
|
||||
validators[i] = types.NewValidator(val.PubKey, val.Power)
|
||||
}
|
||||
validatorSet := types.NewValidatorSet(validators)
|
||||
nextVals := types.TM2PB.ValidatorUpdates(validatorSet)
|
||||
pbParams := genDoc.ConsensusParams.ToProto()
|
||||
req := abci.RequestInitChain{
|
||||
Time: genDoc.GenesisTime,
|
||||
ChainId: genDoc.ChainID,
|
||||
InitialHeight: genDoc.InitialHeight,
|
||||
ConsensusParams: &pbParams,
|
||||
Validators: nextVals,
|
||||
AppStateBytes: genDoc.AppState,
|
||||
}
|
||||
res, err := proxyApp.Consensus().InitChainSync(context.Background(), req)
|
||||
if err != nil {
|
||||
return State{}, err
|
||||
}
|
||||
|
||||
// we only update state when we are in initial state
|
||||
if currentState.LastBlockHeight == 0 {
|
||||
// If the app did not return an app hash, we keep the one set from the genesis doc in
|
||||
// the state. We don't set appHash since we don't want the genesis doc app hash
|
||||
// recorded in the genesis block. We should probably just remove GenesisDoc.AppHash.
|
||||
if len(res.AppHash) > 0 {
|
||||
currentState.AppHash = res.AppHash
|
||||
}
|
||||
// If the app returned validators or consensus params, update the state.
|
||||
if len(res.Validators) > 0 {
|
||||
vals, err := types.PB2TM.ValidatorUpdates(res.Validators)
|
||||
if err != nil {
|
||||
return State{}, err
|
||||
}
|
||||
currentState.Validators = types.NewValidatorSet(vals)
|
||||
currentState.NextValidators = types.NewValidatorSet(vals).CopyIncrementProposerPriority(1)
|
||||
} else if len(genDoc.Validators) == 0 {
|
||||
// If validator set is not set in genesis and still empty after InitChain, exit.
|
||||
return State{}, fmt.Errorf("validator set is nil in genesis and still empty after InitChain")
|
||||
}
|
||||
|
||||
if res.ConsensusParams != nil {
|
||||
currentState.ConsensusParams = currentState.ConsensusParams.UpdateConsensusParams(res.ConsensusParams)
|
||||
currentState.Version.Consensus.App = currentState.ConsensusParams.Version.AppVersion
|
||||
}
|
||||
// We update the last results hash with the empty hash, to conform with RFC-6962.
|
||||
currentState.LastResultsHash = merkle.HashFromByteSlices(nil)
|
||||
|
||||
// We now save the initial state to the stateStore
|
||||
if err := stateStore.Save(currentState); err != nil {
|
||||
return State{}, err
|
||||
}
|
||||
}
|
||||
|
||||
return currentState, nil
|
||||
}
|
||||
|
||||
// replayBlocks loads blocks from appBlockHeight to storeBlockHeight and
|
||||
// executes each block against the application. It then checks that the app hash
|
||||
// produced from executing the block matches that of the next block. It does not
|
||||
// mutate Tendermint state in anyway except for when mutateState is true in
|
||||
// which case we persist the response from the final block.
|
||||
func replayBlocks(
|
||||
state State,
|
||||
proxyApp proxy.AppConns,
|
||||
blockStore BlockStore,
|
||||
stateStore Store,
|
||||
appBlockHeight,
|
||||
storeBlockHeight int64,
|
||||
mutateState bool,
|
||||
eventBus types.BlockEventPublisher,
|
||||
genDoc *types.GenesisDoc,
|
||||
logger log.Logger,
|
||||
) error {
|
||||
var appHash []byte
|
||||
var err error
|
||||
finalBlock := storeBlockHeight
|
||||
if mutateState {
|
||||
finalBlock--
|
||||
}
|
||||
firstBlock := appBlockHeight + 1
|
||||
if firstBlock == 1 {
|
||||
firstBlock = state.InitialHeight
|
||||
}
|
||||
for i := firstBlock; i <= finalBlock; i++ {
|
||||
logger.Info("Applying block", "height", i)
|
||||
block := blockStore.LoadBlock(i)
|
||||
|
||||
// Extra check to ensure the app was not changed in a way which changes
|
||||
// the app hash
|
||||
if !bytes.Equal(appHash, block.AppHash) {
|
||||
return fmt.Errorf("block.AppHash does not match AppHash at height %d during replay. Got %X, expected %X",
|
||||
block.Height, appHash, block.AppHash)
|
||||
}
|
||||
|
||||
if i == finalBlock && !mutateState {
|
||||
// We emit events for the index services at the final block due to the sync issue when
|
||||
// the node shutdown during the block committing status.
|
||||
blockExec := NewBlockExecutor(
|
||||
stateStore, logger, proxyApp.Consensus(), emptyMempool{}, EmptyEvidencePool{}, blockStore)
|
||||
blockExec.SetEventBus(eventBus)
|
||||
appHash, err = ExecCommitBlock(
|
||||
blockExec, proxyApp.Consensus(), block, logger, stateStore, genDoc.InitialHeight, state)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
appHash, err = ExecCommitBlock(
|
||||
nil, proxyApp.Consensus(), block, logger, stateStore, genDoc.InitialHeight, state)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if mutateState {
|
||||
// sync the final block
|
||||
state, err = replayBlock(state, blockStore, stateStore, storeBlockHeight, proxyApp.Consensus(), eventBus, logger)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
appHash = state.AppHash
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// replayBlock uses the block executor to
|
||||
func replayBlock(
|
||||
state State,
|
||||
store BlockStore,
|
||||
stateStore Store,
|
||||
height int64,
|
||||
proxyApp proxy.AppConnConsensus,
|
||||
eventBus types.BlockEventPublisher,
|
||||
logger log.Logger,
|
||||
) (State, error) {
|
||||
block := store.LoadBlock(height)
|
||||
meta := store.LoadBlockMeta(height)
|
||||
|
||||
// Use stubs for both mempool and evidence pool since no transactions nor
|
||||
// evidence are needed here - block already exists.
|
||||
blockExec := NewBlockExecutor(stateStore, logger, proxyApp, emptyMempool{}, EmptyEvidencePool{}, store)
|
||||
blockExec.SetEventBus(eventBus)
|
||||
|
||||
var err error
|
||||
state, err = blockExec.ApplyBlock(state, meta.BlockID, block)
|
||||
if err != nil {
|
||||
return State{}, err
|
||||
}
|
||||
|
||||
return state, nil
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
// mockProxyApp uses ABCIResponses to give the right results.
|
||||
//
|
||||
// Useful because we don't want to call Commit() twice for the same block on
|
||||
// the real app.
|
||||
|
||||
func newMockProxyApp(appHash []byte, abciResponses *tmstate.ABCIResponses) proxy.AppConnConsensus {
|
||||
clientCreator := proxy.NewLocalClientCreator(&mockProxyApp{
|
||||
appHash: appHash,
|
||||
abciResponses: abciResponses,
|
||||
})
|
||||
cli, _ := clientCreator.NewABCIClient()
|
||||
err := cli.Start()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return proxy.NewAppConnConsensus(cli)
|
||||
}
|
||||
|
||||
type mockProxyApp struct {
|
||||
abci.BaseApplication
|
||||
|
||||
appHash []byte
|
||||
txCount int
|
||||
abciResponses *tmstate.ABCIResponses
|
||||
}
|
||||
|
||||
func (mock *mockProxyApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx {
|
||||
r := mock.abciResponses.DeliverTxs[mock.txCount]
|
||||
mock.txCount++
|
||||
if r == nil {
|
||||
return abci.ResponseDeliverTx{}
|
||||
}
|
||||
return *r
|
||||
}
|
||||
|
||||
func (mock *mockProxyApp) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock {
|
||||
mock.txCount = 0
|
||||
return *mock.abciResponses.EndBlock
|
||||
}
|
||||
|
||||
func (mock *mockProxyApp) Commit() abci.ResponseCommit {
|
||||
return abci.ResponseCommit{Data: mock.appHash}
|
||||
}
|
||||
1033
internal/state/sync_test.go
Normal file
1033
internal/state/sync_test.go
Normal file
File diff suppressed because it is too large
Load Diff
53
node/node.go
53
node/node.go
@@ -149,6 +149,8 @@ func makeNode(cfg *config.Config,
|
||||
makeCloser(closers))
|
||||
}
|
||||
|
||||
// Either load state from a previous run or generate the genesis state from
|
||||
// genesis doc
|
||||
state, err := loadStateFromDBOrGenesisDocProvider(stateStore, genDoc)
|
||||
if err != nil {
|
||||
return nil, combineCloseError(err, makeCloser(closers))
|
||||
@@ -223,34 +225,13 @@ func makeNode(cfg *config.Config,
|
||||
stateSync = false
|
||||
}
|
||||
|
||||
// Create the handshaker, which calls RequestInfo, sets the AppVersion on the state,
|
||||
// and replays any blocks as necessary to sync tendermint with the app.
|
||||
consensusLogger := logger.With("module", "consensus")
|
||||
if !stateSync {
|
||||
if err := doHandshake(stateStore, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil {
|
||||
return nil, combineCloseError(err, makeCloser(closers))
|
||||
|
||||
}
|
||||
|
||||
// Reload the state. It will have the Version.Consensus.App set by the
|
||||
// Handshake, and may have other modifications as well (ie. depending on
|
||||
// what happened during block replay).
|
||||
state, err = stateStore.Load()
|
||||
if err != nil {
|
||||
return nil, combineCloseError(
|
||||
fmt.Errorf("cannot load state: %w", err),
|
||||
makeCloser(closers))
|
||||
}
|
||||
}
|
||||
|
||||
// Determine whether we should do block sync. This must happen after the handshake, since the
|
||||
// app may modify the validator set, specifying ourself as the only validator.
|
||||
blockSync := cfg.BlockSync.Enable && !onlyValidatorIsUs(state, pubKey)
|
||||
|
||||
logNodeStartupInfo(state, pubKey, logger, consensusLogger, cfg.Mode)
|
||||
logNodeStartupInfo(state, pubKey, logger, cfg.Mode)
|
||||
|
||||
// TODO: Fetch and provide real options and do proper p2p bootstrapping.
|
||||
// TODO: Use a persistent peer database.
|
||||
nodeInfo, err := makeNodeInfo(cfg, nodeKey, eventSinks, genDoc, state)
|
||||
if err != nil {
|
||||
return nil, combineCloseError(err, makeCloser(closers))
|
||||
@@ -303,6 +284,7 @@ func makeNode(cfg *config.Config,
|
||||
sm.BlockExecutorWithMetrics(nodeMetrics.state),
|
||||
)
|
||||
|
||||
consensusLogger := logger.With("module", "consensus")
|
||||
csReactor, csState, err := createConsensusReactor(
|
||||
cfg, state, blockExec, blockStore, mp, evPool,
|
||||
privValidator, nodeMetrics.consensus, stateSync || blockSync, eventBus,
|
||||
@@ -540,6 +522,31 @@ func (n *nodeImpl) OnStart() error {
|
||||
time.Sleep(genTime.Sub(now))
|
||||
}
|
||||
|
||||
// Either load state from a previous run or generate the genesis state from
|
||||
// genesis doc
|
||||
state, err := loadStateFromDBOrGenesisDocProvider(n.stateStore, n.genesisDoc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// If we are not using state sync, we need to a handshake with the
|
||||
// application. This calls `RequestInfo`, sets the AppVersion on
|
||||
// the state, can call `InitChain` if this is the first time that the
|
||||
// application has run and replays any blocks as necessary to sync
|
||||
// tendermint with the app. We do all this before starting any other service
|
||||
state, err = sm.SyncWithApplication(n.stateStore, n.blockStore, n.genesisDoc, state,
|
||||
n.eventBus, n.proxyApp, n.stateSync, n.Logger)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Retrieve state
|
||||
pubKey, err := n.privValidator.GetPubKey(context.TODO())
|
||||
if err != nil {
|
||||
fmt.Errorf("failed to retrieve pubKey: %w", err)
|
||||
}
|
||||
logNodeStartupInfo(state, pubKey, n.Logger, n.config.Mode)
|
||||
|
||||
// Start the RPC server before the P2P server
|
||||
// so we can eg. receive txs for the first block
|
||||
if n.config.RPC.ListenAddress != "" && n.config.Mode != config.ModeSeed {
|
||||
@@ -550,6 +557,7 @@ func (n *nodeImpl) OnStart() error {
|
||||
n.rpcListeners = listeners
|
||||
}
|
||||
|
||||
// If we are using prometheus for metric gathering, start that server.
|
||||
if n.config.Instrumentation.Prometheus &&
|
||||
n.config.Instrumentation.PrometheusListenAddr != "" {
|
||||
n.prometheusSrv = n.startPrometheusServer(n.config.Instrumentation.PrometheusListenAddr)
|
||||
@@ -564,6 +572,7 @@ func (n *nodeImpl) OnStart() error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Start the P2P layer
|
||||
n.isListening = true
|
||||
|
||||
if err = n.router.Start(); err != nil {
|
||||
|
||||
@@ -130,25 +130,7 @@ func createAndStartIndexerService(
|
||||
return indexerService, eventSinks, nil
|
||||
}
|
||||
|
||||
func doHandshake(
|
||||
stateStore sm.Store,
|
||||
state sm.State,
|
||||
blockStore sm.BlockStore,
|
||||
genDoc *types.GenesisDoc,
|
||||
eventBus types.BlockEventPublisher,
|
||||
proxyApp proxy.AppConns,
|
||||
consensusLogger log.Logger) error {
|
||||
|
||||
handshaker := consensus.NewHandshaker(stateStore, state, blockStore, genDoc)
|
||||
handshaker.SetLogger(consensusLogger)
|
||||
handshaker.SetEventBus(eventBus)
|
||||
if err := handshaker.Handshake(proxyApp); err != nil {
|
||||
return fmt.Errorf("error during handshake: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusLogger log.Logger, mode string) {
|
||||
func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger log.Logger, mode string) {
|
||||
// Log the version info.
|
||||
logger.Info("Version info",
|
||||
"tmVersion", version.TMVersion,
|
||||
@@ -166,14 +148,14 @@ func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusL
|
||||
}
|
||||
switch {
|
||||
case mode == config.ModeFull:
|
||||
consensusLogger.Info("This node is a fullnode")
|
||||
logger.Info("This node is a fullnode")
|
||||
case mode == config.ModeValidator:
|
||||
addr := pubKey.Address()
|
||||
// Log whether this node is a validator or an observer
|
||||
if state.Validators.HasAddress(addr) {
|
||||
consensusLogger.Info("This node is a validator", "addr", addr, "pubKey", pubKey.Bytes())
|
||||
logger.Info("This node is a validator", "addr", addr, "pubKey", pubKey.Bytes())
|
||||
} else {
|
||||
consensusLogger.Info("This node is a validator (NOT in the active validator set)",
|
||||
logger.Info("This node is a validator (NOT in the active validator set)",
|
||||
"addr", addr, "pubKey", pubKey.Bytes())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user