From 4db618db7aa4df70fe3f8ae30ebb7baf7f2598e7 Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Wed, 28 Jul 2021 17:39:14 +0200 Subject: [PATCH] separate out replaying of blocks to application and run in OnStart --- internal/consensus/replay.go | 362 ---------- internal/consensus/replay_file.go | 7 - internal/consensus/replay_stubs.go | 47 -- internal/consensus/replay_test.go | 356 ---------- node/handshake.go | 397 +++++++++++ node/handshake_test.go | 1033 ++++++++++++++++++++++++++++ node/node.go | 88 +-- node/setup.go | 26 +- 8 files changed, 1482 insertions(+), 834 deletions(-) create mode 100644 node/handshake.go create mode 100644 node/handshake_test.go diff --git a/internal/consensus/replay.go b/internal/consensus/replay.go index 9b22f4631..0f1e473e2 100644 --- a/internal/consensus/replay.go +++ b/internal/consensus/replay.go @@ -1,19 +1,12 @@ package consensus import ( - "bytes" - "context" "fmt" "hash/crc32" "io" "reflect" "time" - abci "github.com/tendermint/tendermint/abci/types" - "github.com/tendermint/tendermint/crypto/merkle" - "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/proxy" - sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" ) @@ -191,358 +184,3 @@ func makeHeightSearchFunc(height int64) auto.SearchFunc { } } }*/ - -//--------------------------------------------------- -// 2. Recover from failure while applying the block. -// (by handshaking with the app to figure out where -// we were last, and using the WAL to recover there.) 
-//--------------------------------------------------- - -type Handshaker struct { - stateStore sm.Store - initialState sm.State - store sm.BlockStore - eventBus types.BlockEventPublisher - genDoc *types.GenesisDoc - logger log.Logger - - nBlocks int // number of blocks applied to the state -} - -func NewHandshaker(stateStore sm.Store, state sm.State, - store sm.BlockStore, genDoc *types.GenesisDoc) *Handshaker { - - return &Handshaker{ - stateStore: stateStore, - initialState: state, - store: store, - eventBus: types.NopEventBus{}, - genDoc: genDoc, - logger: log.NewNopLogger(), - nBlocks: 0, - } -} - -func (h *Handshaker) SetLogger(l log.Logger) { - h.logger = l -} - -// SetEventBus - sets the event bus for publishing block related events. -// If not called, it defaults to types.NopEventBus. -func (h *Handshaker) SetEventBus(eventBus types.BlockEventPublisher) { - h.eventBus = eventBus -} - -// NBlocks returns the number of blocks applied to the state. -func (h *Handshaker) NBlocks() int { - return h.nBlocks -} - -// TODO: retry the handshake/replay if it fails ? -func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error { - - // Handshake is done via ABCI Info on the query conn. - res, err := proxyApp.Query().InfoSync(context.Background(), proxy.RequestInfo) - if err != nil { - return fmt.Errorf("error calling Info: %v", err) - } - - blockHeight := res.LastBlockHeight - if blockHeight < 0 { - return fmt.Errorf("got a negative last block height (%d) from the app", blockHeight) - } - appHash := res.LastBlockAppHash - - h.logger.Info("ABCI Handshake App Info", - "height", blockHeight, - "hash", appHash, - "software-version", res.Version, - "protocol-version", res.AppVersion, - ) - - // Only set the version if there is no existing state. - if h.initialState.LastBlockHeight == 0 { - h.initialState.Version.Consensus.App = res.AppVersion - } - - // Replay blocks up to the latest in the blockstore. 
- _, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp) - if err != nil { - return fmt.Errorf("error on replay: %v", err) - } - - h.logger.Info("Completed ABCI Handshake - Tendermint and App are synced", - "appHeight", blockHeight, "appHash", appHash) - - // TODO: (on restart) replay mempool - - return nil -} - -// ReplayBlocks replays all blocks since appBlockHeight and ensures the result -// matches the current state. -// Returns the final AppHash or an error. -func (h *Handshaker) ReplayBlocks( - state sm.State, - appHash []byte, - appBlockHeight int64, - proxyApp proxy.AppConns, -) ([]byte, error) { - storeBlockBase := h.store.Base() - storeBlockHeight := h.store.Height() - stateBlockHeight := state.LastBlockHeight - h.logger.Info( - "ABCI Replay Blocks", - "appHeight", - appBlockHeight, - "storeHeight", - storeBlockHeight, - "stateHeight", - stateBlockHeight) - - // If appBlockHeight == 0 it means that we are at genesis and hence should send InitChain. - if appBlockHeight == 0 { - validators := make([]*types.Validator, len(h.genDoc.Validators)) - for i, val := range h.genDoc.Validators { - validators[i] = types.NewValidator(val.PubKey, val.Power) - } - validatorSet := types.NewValidatorSet(validators) - nextVals := types.TM2PB.ValidatorUpdates(validatorSet) - pbParams := h.genDoc.ConsensusParams.ToProto() - req := abci.RequestInitChain{ - Time: h.genDoc.GenesisTime, - ChainId: h.genDoc.ChainID, - InitialHeight: h.genDoc.InitialHeight, - ConsensusParams: &pbParams, - Validators: nextVals, - AppStateBytes: h.genDoc.AppState, - } - res, err := proxyApp.Consensus().InitChainSync(context.Background(), req) - if err != nil { - return nil, err - } - - appHash = res.AppHash - - if stateBlockHeight == 0 { // we only update state when we are in initial state - // If the app did not return an app hash, we keep the one set from the genesis doc in - // the state. 
We don't set appHash since we don't want the genesis doc app hash - // recorded in the genesis block. We should probably just remove GenesisDoc.AppHash. - if len(res.AppHash) > 0 { - state.AppHash = res.AppHash - } - // If the app returned validators or consensus params, update the state. - if len(res.Validators) > 0 { - vals, err := types.PB2TM.ValidatorUpdates(res.Validators) - if err != nil { - return nil, err - } - state.Validators = types.NewValidatorSet(vals) - state.NextValidators = types.NewValidatorSet(vals).CopyIncrementProposerPriority(1) - } else if len(h.genDoc.Validators) == 0 { - // If validator set is not set in genesis and still empty after InitChain, exit. - return nil, fmt.Errorf("validator set is nil in genesis and still empty after InitChain") - } - - if res.ConsensusParams != nil { - state.ConsensusParams = state.ConsensusParams.UpdateConsensusParams(res.ConsensusParams) - state.Version.Consensus.App = state.ConsensusParams.Version.AppVersion - } - // We update the last results hash with the empty hash, to conform with RFC-6962. - state.LastResultsHash = merkle.HashFromByteSlices(nil) - if err := h.stateStore.Save(state); err != nil { - return nil, err - } - } - } - - // First handle edge cases and constraints on the storeBlockHeight and storeBlockBase. 
- switch { - case storeBlockHeight == 0: - assertAppHashEqualsOneFromState(appHash, state) - return appHash, nil - - case appBlockHeight == 0 && state.InitialHeight < storeBlockBase: - // the app has no state, and the block store is truncated above the initial height - return appHash, sm.ErrAppBlockHeightTooLow{AppHeight: appBlockHeight, StoreBase: storeBlockBase} - - case appBlockHeight > 0 && appBlockHeight < storeBlockBase-1: - // the app is too far behind truncated store (can be 1 behind since we replay the next) - return appHash, sm.ErrAppBlockHeightTooLow{AppHeight: appBlockHeight, StoreBase: storeBlockBase} - - case storeBlockHeight < appBlockHeight: - // the app should never be ahead of the store (but this is under app's control) - return appHash, sm.ErrAppBlockHeightTooHigh{CoreHeight: storeBlockHeight, AppHeight: appBlockHeight} - - case storeBlockHeight < stateBlockHeight: - // the state should never be ahead of the store (this is under tendermint's control) - panic(fmt.Sprintf("StateBlockHeight (%d) > StoreBlockHeight (%d)", stateBlockHeight, storeBlockHeight)) - - case storeBlockHeight > stateBlockHeight+1: - // store should be at most one ahead of the state (this is under tendermint's control) - panic(fmt.Sprintf("StoreBlockHeight (%d) > StateBlockHeight + 1 (%d)", storeBlockHeight, stateBlockHeight+1)) - } - - var err error - // Now either store is equal to state, or one ahead. - // For each, consider all cases of where the app could be, given app <= store - if storeBlockHeight == stateBlockHeight { - // Tendermint ran Commit and saved the state. - // Either the app is asking for replay, or we're all synced up. - if appBlockHeight < storeBlockHeight { - // the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store) - return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, false) - - } else if appBlockHeight == storeBlockHeight { - // We're good! 
- assertAppHashEqualsOneFromState(appHash, state) - return appHash, nil - } - - } else if storeBlockHeight == stateBlockHeight+1 { - // We saved the block in the store but haven't updated the state, - // so we'll need to replay a block using the WAL. - switch { - case appBlockHeight < stateBlockHeight: - // the app is further behind than it should be, so replay blocks - // but leave the last block to go through the WAL - return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, true) - - case appBlockHeight == stateBlockHeight: - // We haven't run Commit (both the state and app are one block behind), - // so replayBlock with the real app. - // NOTE: We could instead use the cs.WAL on cs.Start, - // but we'd have to allow the WAL to replay a block that wrote it's #ENDHEIGHT - h.logger.Info("Replay last block using real app") - state, err = h.replayBlock(state, storeBlockHeight, proxyApp.Consensus()) - return state.AppHash, err - - case appBlockHeight == storeBlockHeight: - // We ran Commit, but didn't save the state, so replayBlock with mock app. - abciResponses, err := h.stateStore.LoadABCIResponses(storeBlockHeight) - if err != nil { - return nil, err - } - mockApp := newMockProxyApp(appHash, abciResponses) - h.logger.Info("Replay last block using mock app") - state, err = h.replayBlock(state, storeBlockHeight, mockApp) - return state.AppHash, err - } - - } - - panic(fmt.Sprintf("uncovered case! appHeight: %d, storeHeight: %d, stateHeight: %d", - appBlockHeight, storeBlockHeight, stateBlockHeight)) -} - -func (h *Handshaker) replayBlocks( - state sm.State, - proxyApp proxy.AppConns, - appBlockHeight, - storeBlockHeight int64, - mutateState bool) ([]byte, error) { - // App is further behind than it should be, so we need to replay blocks. - // We replay all blocks from appBlockHeight+1. - // - // Note that we don't have an old version of the state, - // so we by-pass state validation/mutation using sm.ExecCommitBlock. 
- // This also means we won't be saving validator sets if they change during this period. - // TODO: Load the historical information to fix this and just use state.ApplyBlock - // - // If mutateState == true, the final block is replayed with h.replayBlock() - - var appHash []byte - var err error - finalBlock := storeBlockHeight - if mutateState { - finalBlock-- - } - firstBlock := appBlockHeight + 1 - if firstBlock == 1 { - firstBlock = state.InitialHeight - } - for i := firstBlock; i <= finalBlock; i++ { - h.logger.Info("Applying block", "height", i) - block := h.store.LoadBlock(i) - // Extra check to ensure the app was not changed in a way it shouldn't have. - if len(appHash) > 0 { - assertAppHashEqualsOneFromBlock(appHash, block) - } - - if i == finalBlock && !mutateState { - // We emit events for the index services at the final block due to the sync issue when - // the node shutdown during the block committing status. - blockExec := sm.NewBlockExecutor( - h.stateStore, h.logger, proxyApp.Consensus(), emptyMempool{}, sm.EmptyEvidencePool{}, h.store) - blockExec.SetEventBus(h.eventBus) - appHash, err = sm.ExecCommitBlock( - blockExec, proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight, state) - if err != nil { - return nil, err - } - } else { - appHash, err = sm.ExecCommitBlock( - nil, proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight, state) - if err != nil { - return nil, err - } - } - - h.nBlocks++ - } - - if mutateState { - // sync the final block - state, err = h.replayBlock(state, storeBlockHeight, proxyApp.Consensus()) - if err != nil { - return nil, err - } - appHash = state.AppHash - } - - assertAppHashEqualsOneFromState(appHash, state) - return appHash, nil -} - -// ApplyBlock on the proxyApp with the last block. 
-func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.AppConnConsensus) (sm.State, error) { - block := h.store.LoadBlock(height) - meta := h.store.LoadBlockMeta(height) - - // Use stubs for both mempool and evidence pool since no transactions nor - // evidence are needed here - block already exists. - blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, proxyApp, emptyMempool{}, sm.EmptyEvidencePool{}, h.store) - blockExec.SetEventBus(h.eventBus) - - var err error - state, err = blockExec.ApplyBlock(state, meta.BlockID, block) - if err != nil { - return sm.State{}, err - } - - h.nBlocks++ - - return state, nil -} - -func assertAppHashEqualsOneFromBlock(appHash []byte, block *types.Block) { - if !bytes.Equal(appHash, block.AppHash) { - panic(fmt.Sprintf(`block.AppHash does not match AppHash after replay. Got %X, expected %X. - -Block: %v -`, - appHash, block.AppHash, block)) - } -} - -func assertAppHashEqualsOneFromState(appHash []byte, state sm.State) { - if !bytes.Equal(appHash, state.AppHash) { - panic(fmt.Sprintf(`state.AppHash does not match AppHash after replay. Got -%X, expected %X. 
- -State: %v - -Did you reset Tendermint without resetting your application's data?`, - appHash, state.AppHash, state)) - } -} diff --git a/internal/consensus/replay_file.go b/internal/consensus/replay_file.go index 51cb090d7..5d15c9a62 100644 --- a/internal/consensus/replay_file.go +++ b/internal/consensus/replay_file.go @@ -323,13 +323,6 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo tmos.Exit(fmt.Sprintf("Failed to start event bus: %v", err)) } - handshaker := NewHandshaker(stateStore, state, blockStore, gdoc) - handshaker.SetEventBus(eventBus) - err = handshaker.Handshake(proxyApp) - if err != nil { - tmos.Exit(fmt.Sprintf("Error on handshake: %v", err)) - } - mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{} blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore) diff --git a/internal/consensus/replay_stubs.go b/internal/consensus/replay_stubs.go index c79340a0c..21ac565d8 100644 --- a/internal/consensus/replay_stubs.go +++ b/internal/consensus/replay_stubs.go @@ -6,8 +6,6 @@ import ( abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/internal/libs/clist" mempl "github.com/tendermint/tendermint/internal/mempool" - tmstate "github.com/tendermint/tendermint/proto/tendermint/state" - "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" ) @@ -45,48 +43,3 @@ func (emptyMempool) TxsWaitChan() <-chan struct{} { return nil } func (emptyMempool) InitWAL() error { return nil } func (emptyMempool) CloseWAL() {} - -//----------------------------------------------------------------------------- -// mockProxyApp uses ABCIResponses to give the right results. -// -// Useful because we don't want to call Commit() twice for the same block on -// the real app. 
- -func newMockProxyApp(appHash []byte, abciResponses *tmstate.ABCIResponses) proxy.AppConnConsensus { - clientCreator := proxy.NewLocalClientCreator(&mockProxyApp{ - appHash: appHash, - abciResponses: abciResponses, - }) - cli, _ := clientCreator.NewABCIClient() - err := cli.Start() - if err != nil { - panic(err) - } - return proxy.NewAppConnConsensus(cli) -} - -type mockProxyApp struct { - abci.BaseApplication - - appHash []byte - txCount int - abciResponses *tmstate.ABCIResponses -} - -func (mock *mockProxyApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx { - r := mock.abciResponses.DeliverTxs[mock.txCount] - mock.txCount++ - if r == nil { - return abci.ResponseDeliverTx{} - } - return *r -} - -func (mock *mockProxyApp) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock { - mock.txCount = 0 - return *mock.abciResponses.EndBlock -} - -func (mock *mockProxyApp) Commit() abci.ResponseCommit { - return abci.ResponseCommit{Data: mock.appHash} -} diff --git a/internal/consensus/replay_test.go b/internal/consensus/replay_test.go index e7c480cea..50abdeaf4 100644 --- a/internal/consensus/replay_test.go +++ b/internal/consensus/replay_test.go @@ -1,12 +1,10 @@ package consensus import ( - "bytes" "context" "fmt" "io" "io/ioutil" - "math/rand" "os" "path/filepath" "runtime" @@ -25,15 +23,11 @@ import ( "github.com/tendermint/tendermint/crypto" cryptoenc "github.com/tendermint/tendermint/crypto/encoding" mempl "github.com/tendermint/tendermint/internal/mempool" - "github.com/tendermint/tendermint/internal/test/factory" "github.com/tendermint/tendermint/libs/log" tmrand "github.com/tendermint/tendermint/libs/rand" - "github.com/tendermint/tendermint/privval" - tmstate "github.com/tendermint/tendermint/proto/tendermint/state" tmproto "github.com/tendermint/tendermint/proto/tendermint/types" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" - sf "github.com/tendermint/tendermint/state/test/factory" 
"github.com/tendermint/tendermint/store" "github.com/tendermint/tendermint/types" ) @@ -559,109 +553,6 @@ func setupSimulator(t *testing.T) *simulatorTestSuite { return sim } -// Sync from scratch -func TestHandshakeReplayAll(t *testing.T) { - sim := setupSimulator(t) - - for _, m := range modes { - testHandshakeReplay(t, sim, 0, m, false) - } - for _, m := range modes { - testHandshakeReplay(t, sim, 0, m, true) - } -} - -// Sync many, not from scratch -func TestHandshakeReplaySome(t *testing.T) { - sim := setupSimulator(t) - - for _, m := range modes { - testHandshakeReplay(t, sim, 2, m, false) - } - for _, m := range modes { - testHandshakeReplay(t, sim, 2, m, true) - } -} - -// Sync from lagging by one -func TestHandshakeReplayOne(t *testing.T) { - sim := setupSimulator(t) - - for _, m := range modes { - testHandshakeReplay(t, sim, numBlocks-1, m, false) - } - for _, m := range modes { - testHandshakeReplay(t, sim, numBlocks-1, m, true) - } -} - -// Sync from caught up -func TestHandshakeReplayNone(t *testing.T) { - sim := setupSimulator(t) - - for _, m := range modes { - testHandshakeReplay(t, sim, numBlocks, m, false) - } - for _, m := range modes { - testHandshakeReplay(t, sim, numBlocks, m, true) - } -} - -// Test mockProxyApp should not panic when app return ABCIResponses with some empty ResponseDeliverTx -func TestMockProxyApp(t *testing.T) { - sim := setupSimulator(t) // setup config and simulator - config := sim.Config - assert.NotNil(t, config) - - logger := log.TestingLogger() - var validTxs, invalidTxs = 0, 0 - txIndex := 0 - - assert.NotPanics(t, func() { - abciResWithEmptyDeliverTx := new(tmstate.ABCIResponses) - abciResWithEmptyDeliverTx.DeliverTxs = make([]*abci.ResponseDeliverTx, 0) - abciResWithEmptyDeliverTx.DeliverTxs = append(abciResWithEmptyDeliverTx.DeliverTxs, &abci.ResponseDeliverTx{}) - - // called when saveABCIResponses: - bytes, err := proto.Marshal(abciResWithEmptyDeliverTx) - require.NoError(t, err) - loadedAbciRes := 
new(tmstate.ABCIResponses) - - // this also happens sm.LoadABCIResponses - err = proto.Unmarshal(bytes, loadedAbciRes) - require.NoError(t, err) - - mock := newMockProxyApp([]byte("mock_hash"), loadedAbciRes) - - abciRes := new(tmstate.ABCIResponses) - abciRes.DeliverTxs = make([]*abci.ResponseDeliverTx, len(loadedAbciRes.DeliverTxs)) - // Execute transactions and get hash. - proxyCb := func(req *abci.Request, res *abci.Response) { - if r, ok := res.Value.(*abci.Response_DeliverTx); ok { - // TODO: make use of res.Log - // TODO: make use of this info - // Blocks may include invalid txs. - txRes := r.DeliverTx - if txRes.Code == abci.CodeTypeOK { - validTxs++ - } else { - logger.Debug("Invalid tx", "code", txRes.Code, "log", txRes.Log) - invalidTxs++ - } - abciRes.DeliverTxs[txIndex] = txRes - txIndex++ - } - } - mock.SetResponseCallback(proxyCb) - - someTx := []byte("tx") - _, err = mock.DeliverTxAsync(context.Background(), abci.RequestDeliverTx{Tx: someTx}) - assert.NoError(t, err) - }) - assert.True(t, validTxs == 1) - assert.True(t, invalidTxs == 0) -} - func tempWALWithData(data []byte) string { walFile, err := ioutil.TempFile("", "wal") if err != nil { @@ -677,138 +568,6 @@ func tempWALWithData(data []byte) string { return walFile.Name() } -// Make some blocks. Start a fresh app and apply nBlocks blocks. -// Then restart the app and sync it up with the remaining blocks -func testHandshakeReplay(t *testing.T, sim *simulatorTestSuite, nBlocks int, mode uint, testValidatorsChange bool) { - var chain []*types.Block - var commits []*types.Commit - var store *mockBlockStore - var stateDB dbm.DB - var genesisState sm.State - - config := sim.Config - - if testValidatorsChange { - testConfig := ResetConfig(fmt.Sprintf("%s_%v_m", t.Name(), mode)) - defer func() { _ = os.RemoveAll(testConfig.RootDir) }() - stateDB = dbm.NewMemDB() - - genesisState = sim.GenesisState - config = sim.Config - chain = append([]*types.Block{}, sim.Chain...) 
// copy chain - commits = sim.Commits - store = newMockBlockStore(config, genesisState.ConsensusParams) - } else { // test single node - testConfig := ResetConfig(fmt.Sprintf("%s_%v_s", t.Name(), mode)) - defer func() { _ = os.RemoveAll(testConfig.RootDir) }() - walBody, err := WALWithNBlocks(t, numBlocks) - require.NoError(t, err) - walFile := tempWALWithData(walBody) - config.Consensus.SetWalFile(walFile) - - privVal, err := privval.LoadFilePV(config.PrivValidator.KeyFile(), config.PrivValidator.StateFile()) - require.NoError(t, err) - - wal, err := NewWAL(walFile) - require.NoError(t, err) - wal.SetLogger(log.TestingLogger()) - err = wal.Start() - require.NoError(t, err) - t.Cleanup(func() { - if err := wal.Stop(); err != nil { - t.Error(err) - } - }) - chain, commits, err = makeBlockchainFromWAL(wal) - require.NoError(t, err) - pubKey, err := privVal.GetPubKey(context.Background()) - require.NoError(t, err) - stateDB, genesisState, store = stateAndStore(config, pubKey, kvstore.ProtocolVersion) - - } - stateStore := sm.NewStore(stateDB) - store.chain = chain - store.commits = commits - - state := genesisState.Copy() - // run the chain through state.ApplyBlock to build up the tendermint state - state = buildTMStateFromChain(config, sim.Mempool, sim.Evpool, stateStore, state, chain, nBlocks, mode, store) - latestAppHash := state.AppHash - - // make a new client creator - kvstoreApp := kvstore.NewPersistentKVStoreApplication( - filepath.Join(config.DBDir(), fmt.Sprintf("replay_test_%d_%d_a_r%d", nBlocks, mode, rand.Int()))) - t.Cleanup(func() { require.NoError(t, kvstoreApp.Close()) }) - - clientCreator2 := proxy.NewLocalClientCreator(kvstoreApp) - if nBlocks > 0 { - // run nBlocks against a new client to build up the app state. 
- // use a throwaway tendermint state - proxyApp := proxy.NewAppConns(clientCreator2) - stateDB1 := dbm.NewMemDB() - stateStore := sm.NewStore(stateDB1) - err := stateStore.Save(genesisState) - require.NoError(t, err) - buildAppStateFromChain(proxyApp, stateStore, sim.Mempool, sim.Evpool, genesisState, chain, nBlocks, mode, store) - } - - // Prune block store if requested - expectError := false - if mode == 3 { - pruned, err := store.PruneBlocks(2) - require.NoError(t, err) - require.EqualValues(t, 1, pruned) - expectError = int64(nBlocks) < 2 - } - - // now start the app using the handshake - it should sync - genDoc, _ := sm.MakeGenesisDocFromFile(config.GenesisFile()) - handshaker := NewHandshaker(stateStore, state, store, genDoc) - proxyApp := proxy.NewAppConns(clientCreator2) - if err := proxyApp.Start(); err != nil { - t.Fatalf("Error starting proxy app connections: %v", err) - } - - t.Cleanup(func() { - if err := proxyApp.Stop(); err != nil { - t.Error(err) - } - }) - - err := handshaker.Handshake(proxyApp) - if expectError { - require.Error(t, err) - return - } else if err != nil { - t.Fatalf("Error on abci handshake: %v", err) - } - - // get the latest app hash from the app - res, err := proxyApp.Query().InfoSync(context.Background(), abci.RequestInfo{Version: ""}) - if err != nil { - t.Fatal(err) - } - - // the app hash should be synced up - if !bytes.Equal(latestAppHash, res.LastBlockAppHash) { - t.Fatalf( - "Expected app hashes to match after handshake/replay. 
got %X, expected %X", - res.LastBlockAppHash, - latestAppHash) - } - - expectedBlocksToSync := numBlocks - nBlocks - if nBlocks == numBlocks && mode > 0 { - expectedBlocksToSync++ - } else if nBlocks > 0 && mode == 1 { - expectedBlocksToSync++ - } - - if handshaker.NBlocks() != expectedBlocksToSync { - t.Fatalf("Expected handshake to sync %d blocks, got %d", expectedBlocksToSync, handshaker.NBlocks()) - } -} - func applyBlock(stateStore sm.Store, mempool mempl.Mempool, evpool sm.EvidencePool, @@ -932,75 +691,6 @@ func buildTMStateFromChain( return state } -func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) { - // 1. Initialize tendermint and commit 3 blocks with the following app hashes: - // - 0x01 - // - 0x02 - // - 0x03 - config := ResetConfig("handshake_test_") - t.Cleanup(func() { os.RemoveAll(config.RootDir) }) - privVal, err := privval.LoadFilePV(config.PrivValidator.KeyFile(), config.PrivValidator.StateFile()) - require.NoError(t, err) - const appVersion = 0x0 - pubKey, err := privVal.GetPubKey(context.Background()) - require.NoError(t, err) - stateDB, state, store := stateAndStore(config, pubKey, appVersion) - stateStore := sm.NewStore(stateDB) - genDoc, _ := sm.MakeGenesisDocFromFile(config.GenesisFile()) - state.LastValidators = state.Validators.Copy() - // mode = 0 for committing all the blocks - blocks := sf.MakeBlocks(3, &state, privVal) - store.chain = blocks - - // 2. Tendermint must panic if app returns wrong hash for the first block - // - RANDOM HASH - // - 0x02 - // - 0x03 - { - app := &badApp{numBlocks: 3, allHashesAreWrong: true} - clientCreator := proxy.NewLocalClientCreator(app) - proxyApp := proxy.NewAppConns(clientCreator) - err := proxyApp.Start() - require.NoError(t, err) - t.Cleanup(func() { - if err := proxyApp.Stop(); err != nil { - t.Error(err) - } - }) - - assert.Panics(t, func() { - h := NewHandshaker(stateStore, state, store, genDoc) - if err = h.Handshake(proxyApp); err != nil { - t.Log(err) - } - }) - } - - // 3. 
Tendermint must panic if app returns wrong hash for the last block - // - 0x01 - // - 0x02 - // - RANDOM HASH - { - app := &badApp{numBlocks: 3, onlyLastHashIsWrong: true} - clientCreator := proxy.NewLocalClientCreator(app) - proxyApp := proxy.NewAppConns(clientCreator) - err := proxyApp.Start() - require.NoError(t, err) - t.Cleanup(func() { - if err := proxyApp.Stop(); err != nil { - t.Error(err) - } - }) - - assert.Panics(t, func() { - h := NewHandshaker(stateStore, state, store, genDoc) - if err = h.Handshake(proxyApp); err != nil { - t.Log(err) - } - }) - } -} - type badApp struct { abci.BaseApplication numBlocks byte @@ -1218,52 +908,6 @@ func (bs *mockBlockStore) PruneBlocks(height int64) (uint64, error) { return pruned, nil } -//--------------------------------------- -// Test handshake/init chain - -func TestHandshakeUpdatesValidators(t *testing.T) { - val, _ := factory.RandValidator(true, 10) - vals := types.NewValidatorSet([]*types.Validator{val}) - app := &initChainApp{vals: types.TM2PB.ValidatorUpdates(vals)} - clientCreator := proxy.NewLocalClientCreator(app) - - config := ResetConfig("handshake_test_") - t.Cleanup(func() { _ = os.RemoveAll(config.RootDir) }) - - privVal, err := privval.LoadFilePV(config.PrivValidator.KeyFile(), config.PrivValidator.StateFile()) - require.NoError(t, err) - pubKey, err := privVal.GetPubKey(context.Background()) - require.NoError(t, err) - stateDB, state, store := stateAndStore(config, pubKey, 0x0) - stateStore := sm.NewStore(stateDB) - - oldValAddr := state.Validators.Validators[0].Address - - // now start the app using the handshake - it should sync - genDoc, _ := sm.MakeGenesisDocFromFile(config.GenesisFile()) - handshaker := NewHandshaker(stateStore, state, store, genDoc) - proxyApp := proxy.NewAppConns(clientCreator) - if err := proxyApp.Start(); err != nil { - t.Fatalf("Error starting proxy app connections: %v", err) - } - t.Cleanup(func() { - if err := proxyApp.Stop(); err != nil { - t.Error(err) - } - }) - if err 
:= handshaker.Handshake(proxyApp); err != nil { - t.Fatalf("Error on abci handshake: %v", err) - } - // reload the state, check the validator set was updated - state, err = stateStore.Load() - require.NoError(t, err) - - newValAddr := state.Validators.Validators[0].Address - expectValAddr := val.Address - assert.NotEqual(t, oldValAddr, newValAddr) - assert.Equal(t, newValAddr, expectValAddr) -} - // returns the vals on InitChain type initChainApp struct { abci.BaseApplication diff --git a/node/handshake.go b/node/handshake.go new file mode 100644 index 000000000..cec34a9b5 --- /dev/null +++ b/node/handshake.go @@ -0,0 +1,397 @@ +package node + +import ( + "bytes" + "context" + "fmt" + + abci "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/crypto/merkle" + "github.com/tendermint/tendermint/internal/libs/clist" + mempl "github.com/tendermint/tendermint/internal/mempool" + "github.com/tendermint/tendermint/libs/log" + tmstate "github.com/tendermint/tendermint/proto/tendermint/state" + "github.com/tendermint/tendermint/proxy" + sm "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/types" +) + +// syncWithApplication is called every time on start up. It handshakes with the app +// to understand the current state of the app. If this is the first time running +// Tendermint it will call InitChain, feeding in the genesis doc and produce +// the initial state. If the application is behind, Tendermint will replay all +// blocks in between and assert that the app hashes produced are the same as +// before. +func syncWithApplication( + stateStore sm.Store, + blockStore sm.BlockStore, + genDoc *types.GenesisDoc, + state sm.State, + eventBus types.BlockEventPublisher, + proxyApp proxy.AppConns, + stateSync bool, + logger log.Logger, +) (sm.State, error) { + + // Handshake is done via ABCI Info on the query conn. 
This gives the node + // information on the app's latest height, hash and version + res, err := proxyApp.Query().InfoSync(context.Background(), proxy.RequestInfo) + if err != nil { + return sm.State{}, fmt.Errorf("error calling Info: %v", err) + } + appHash := res.LastBlockAppHash + appBlockHeight := res.LastBlockHeight + + // Only set the version if there is no existing state. + if state.LastBlockHeight == 0 { + state.Version.Consensus.App = res.AppVersion + } + + // If application's blockHeight is 0 it means that the app is at genesis and + // if the node is not running state sync it will need to InitChain. If + // Tendermint itself is at genesis it will save this initial state in the + // state store. + // NOTE: that the Tendermint node might have existing state and will need to + // catch up the application. We nonetheless still need to run InitChain. + if appBlockHeight == 0 || !stateSync { + state, err = initializeChain(proxyApp, stateStore, state, genDoc, logger) + if err != nil { + return sm.State{}, fmt.Errorf("error initializing chain: %w", err) + } + + appHash = state.AppHash + } + + // Check if the Tendermint has prior state. If so it may need to replay + // blocks to the application + storeBlockBase := blockStore.Base() + storeBlockHeight := blockStore.Height() + stateBlockHeight := state.LastBlockHeight + + // First handle edge cases and constraints on the storeBlockHeight and storeBlockBase. + switch { + case storeBlockHeight == 0: + // Fresh instance of Tendermint. Return early + return state, nil + + case appBlockHeight == 0 && state.InitialHeight < storeBlockBase: + // the app has no state, and the block store is truncated above the + // initial height. The node can't replay those blocks. 
+ return sm.State{}, sm.ErrAppBlockHeightTooLow{AppHeight: appBlockHeight, StoreBase: storeBlockBase} + + case appBlockHeight > 0 && appBlockHeight < storeBlockBase-1: + // the app is too far behind truncated store (can be 1 behind since we replay the next) + return sm.State{}, sm.ErrAppBlockHeightTooLow{AppHeight: appBlockHeight, StoreBase: storeBlockBase} + + case storeBlockHeight < appBlockHeight: + // the app should never be ahead of the store (but this is under app's control) + return sm.State{}, sm.ErrAppBlockHeightTooHigh{CoreHeight: storeBlockHeight, AppHeight: appBlockHeight} + + case storeBlockHeight < stateBlockHeight: + // the state should never be ahead of the store (this is under Tendermint's control) + panic(fmt.Sprintf("StateBlockHeight (%d) > StoreBlockHeight (%d)", stateBlockHeight, storeBlockHeight)) + + case storeBlockHeight > stateBlockHeight+1: + // store should be at most one ahead of the state (this is under Tendermint's control) + panic(fmt.Sprintf("StoreBlockHeight (%d) > StateBlockHeight + 1 (%d)", storeBlockHeight, stateBlockHeight+1)) + } + + // Now either store is equal to state, or one ahead. + // For each, consider all cases of where the app could be, given app <= store + if storeBlockHeight == stateBlockHeight { + // Tendermint ran Commit and saved the state. + // Either the app is asking for replay, or we're all synced up. + if appBlockHeight < storeBlockHeight { + // the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store) + return state, replayBlocks( + state, proxyApp, blockStore, stateStore, appBlockHeight, storeBlockHeight, false, eventBus, genDoc, logger, + ) + + } else if appBlockHeight == storeBlockHeight { + // We're all synced up + return state, nil + } + + } else if storeBlockHeight == stateBlockHeight+1 { + // We saved the block in the store but haven't updated the state, + // so we'll need to replay a block using the WAL. 
+ switch { + case appBlockHeight < stateBlockHeight: + // the app is further behind than it should be, so replay blocks + // up to storeBlockHeight and run the + return state, replayBlocks( + state, proxyApp, blockStore, stateStore, appBlockHeight, storeBlockHeight, true, eventBus, genDoc, logger, + ) + + case appBlockHeight == stateBlockHeight: + // We haven't run Commit (both the state and app are one block behind), + // so replayBlock with the real app. + // NOTE: We could instead use the cs.WAL on cs.Start, + // but we'd have to allow the WAL to replay a block that wrote it's #ENDHEIGHT + logger.Info("Replay last block using real app") + _, err = replayBlock(state, blockStore, stateStore, storeBlockHeight, proxyApp.Consensus(), eventBus, logger) + return state, err + + case appBlockHeight == storeBlockHeight: + // We ran Commit, but didn't save the state, so replayBlock with mock app. + abciResponses, err := stateStore.LoadABCIResponses(storeBlockHeight) + if err != nil { + return state, err + } + mockApp := newMockProxyApp(appHash, abciResponses) + logger.Info("Replay last block using mock app") + _, err = replayBlock(state, blockStore, stateStore, storeBlockHeight, mockApp, eventBus, logger) + return state, err + } + + } + + panic(fmt.Sprintf("uncovered case! appHeight: %d, storeHeight: %d, stateHeight: %d", + appBlockHeight, storeBlockHeight, stateBlockHeight)) +} + +// initializeChain sends the genDoc information to the app as part of InitChain. +// If Tendermint state hasn't been initialized yet it takes any state changes +// from the app, applies them and then persists the initial state. 
+func initializeChain( + proxyApp proxy.AppConns, + stateStore sm.Store, + currentState sm.State, + genDoc *types.GenesisDoc, + logger log.Logger, +) (sm.State, error) { + logger.Info("Initializing Chain with Application") + + validators := make([]*types.Validator, len(genDoc.Validators)) + for i, val := range genDoc.Validators { + validators[i] = types.NewValidator(val.PubKey, val.Power) + } + validatorSet := types.NewValidatorSet(validators) + nextVals := types.TM2PB.ValidatorUpdates(validatorSet) + pbParams := genDoc.ConsensusParams.ToProto() + req := abci.RequestInitChain{ + Time: genDoc.GenesisTime, + ChainId: genDoc.ChainID, + InitialHeight: genDoc.InitialHeight, + ConsensusParams: &pbParams, + Validators: nextVals, + AppStateBytes: genDoc.AppState, + } + res, err := proxyApp.Consensus().InitChainSync(context.Background(), req) + if err != nil { + return sm.State{}, err + } + + // we only update state when we are in initial state + if currentState.LastBlockHeight == 0 { + // If the app did not return an app hash, we keep the one set from the genesis doc in + // the state. We don't set appHash since we don't want the genesis doc app hash + // recorded in the genesis block. We should probably just remove GenesisDoc.AppHash. + if len(res.AppHash) > 0 { + currentState.AppHash = res.AppHash + } + // If the app returned validators or consensus params, update the state. + if len(res.Validators) > 0 { + vals, err := types.PB2TM.ValidatorUpdates(res.Validators) + if err != nil { + return sm.State{}, err + } + currentState.Validators = types.NewValidatorSet(vals) + currentState.NextValidators = types.NewValidatorSet(vals).CopyIncrementProposerPriority(1) + } else if len(genDoc.Validators) == 0 { + // If validator set is not set in genesis and still empty after InitChain, exit. 
+ return sm.State{}, fmt.Errorf("validator set is nil in genesis and still empty after InitChain") + } + + if res.ConsensusParams != nil { + currentState.ConsensusParams = currentState.ConsensusParams.UpdateConsensusParams(res.ConsensusParams) + currentState.Version.Consensus.App = currentState.ConsensusParams.Version.AppVersion + } + // We update the last results hash with the empty hash, to conform with RFC-6962. + currentState.LastResultsHash = merkle.HashFromByteSlices(nil) + + // We now save the initial state to the stateStore + if err := stateStore.Save(currentState); err != nil { + return sm.State{}, err + } + } + + return currentState, nil +} + +// replayBlocks loads blocks from appBlockHeight to storeBlockHeight and +// executes each block against the application. It then checks that the app hash +// produced from executing the block matches that of the next block. It does not +// mutate Tendermint state in anyway except for when mutateState is true in +// which case we persist the response from the final block. +func replayBlocks( + state sm.State, + proxyApp proxy.AppConns, + blockStore sm.BlockStore, + stateStore sm.Store, + appBlockHeight, + storeBlockHeight int64, + mutateState bool, + eventBus types.BlockEventPublisher, + genDoc *types.GenesisDoc, + logger log.Logger, +) error { + var appHash []byte + var err error + finalBlock := storeBlockHeight + if mutateState { + finalBlock-- + } + firstBlock := appBlockHeight + 1 + if firstBlock == 1 { + firstBlock = state.InitialHeight + } + for i := firstBlock; i <= finalBlock; i++ { + logger.Info("Applying block", "height", i) + block := blockStore.LoadBlock(i) + + // Extra check to ensure the app was not changed in a way which changes + // the app hash + if !bytes.Equal(appHash, block.AppHash) { + return fmt.Errorf("block.AppHash does not match AppHash at height %d during replay. 
Got %X, expected %X", + block.Height, appHash, block.AppHash) + } + + if i == finalBlock && !mutateState { + // We emit events for the index services at the final block due to the sync issue when + // the node shutdown during the block committing status. + blockExec := sm.NewBlockExecutor( + stateStore, logger, proxyApp.Consensus(), emptyMempool{}, sm.EmptyEvidencePool{}, blockStore) + blockExec.SetEventBus(eventBus) + appHash, err = sm.ExecCommitBlock( + blockExec, proxyApp.Consensus(), block, logger, stateStore, genDoc.InitialHeight, state) + if err != nil { + return err + } + } else { + appHash, err = sm.ExecCommitBlock( + nil, proxyApp.Consensus(), block, logger, stateStore, genDoc.InitialHeight, state) + if err != nil { + return err + } + } + } + + if mutateState { + // sync the final block + state, err = replayBlock(state, blockStore, stateStore, storeBlockHeight, proxyApp.Consensus(), eventBus, logger) + if err != nil { + return err + } + appHash = state.AppHash + } + + return nil +} + +// replayBlock uses the block executor to +func replayBlock( + state sm.State, + store sm.BlockStore, + stateStore sm.Store, + height int64, + proxyApp proxy.AppConnConsensus, + eventBus types.BlockEventPublisher, + logger log.Logger, +) (sm.State, error) { + block := store.LoadBlock(height) + meta := store.LoadBlockMeta(height) + + // Use stubs for both mempool and evidence pool since no transactions nor + // evidence are needed here - block already exists. + blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp, emptyMempool{}, sm.EmptyEvidencePool{}, store) + blockExec.SetEventBus(eventBus) + + var err error + state, err = blockExec.ApplyBlock(state, meta.BlockID, block) + if err != nil { + return sm.State{}, err + } + + return state, nil +} + +//----------------------------------------------------------------------------- +// mockProxyApp uses ABCIResponses to give the right results. 
+// +// Useful because we don't want to call Commit() twice for the same block on +// the real app. + +func newMockProxyApp(appHash []byte, abciResponses *tmstate.ABCIResponses) proxy.AppConnConsensus { + clientCreator := proxy.NewLocalClientCreator(&mockProxyApp{ + appHash: appHash, + abciResponses: abciResponses, + }) + cli, _ := clientCreator.NewABCIClient() + err := cli.Start() + if err != nil { + panic(err) + } + return proxy.NewAppConnConsensus(cli) +} + +type mockProxyApp struct { + abci.BaseApplication + + appHash []byte + txCount int + abciResponses *tmstate.ABCIResponses +} + +func (mock *mockProxyApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx { + r := mock.abciResponses.DeliverTxs[mock.txCount] + mock.txCount++ + if r == nil { + return abci.ResponseDeliverTx{} + } + return *r +} + +func (mock *mockProxyApp) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock { + mock.txCount = 0 + return *mock.abciResponses.EndBlock +} + +func (mock *mockProxyApp) Commit() abci.ResponseCommit { + return abci.ResponseCommit{Data: mock.appHash} +} + +//----------------------------------------------------------------------------- + +type emptyMempool struct{} + +var _ mempl.Mempool = emptyMempool{} + +func (emptyMempool) Lock() {} +func (emptyMempool) Unlock() {} +func (emptyMempool) Size() int { return 0 } +func (emptyMempool) CheckTx(_ context.Context, _ types.Tx, _ func(*abci.Response), _ mempl.TxInfo) error { + return nil +} +func (emptyMempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} } +func (emptyMempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} } +func (emptyMempool) Update( + _ int64, + _ types.Txs, + _ []*abci.ResponseDeliverTx, + _ mempl.PreCheckFunc, + _ mempl.PostCheckFunc, +) error { + return nil +} +func (emptyMempool) Flush() {} +func (emptyMempool) FlushAppConn() error { return nil } +func (emptyMempool) TxsAvailable() <-chan struct{} { return make(chan struct{}) } +func (emptyMempool) EnableTxsAvailable() 
{} +func (emptyMempool) SizeBytes() int64 { return 0 } + +func (emptyMempool) TxsFront() *clist.CElement { return nil } +func (emptyMempool) TxsWaitChan() <-chan struct{} { return nil } + +func (emptyMempool) InitWAL() error { return nil } +func (emptyMempool) CloseWAL() {} diff --git a/node/handshake_test.go b/node/handshake_test.go new file mode 100644 index 000000000..677e1ef80 --- /dev/null +++ b/node/handshake_test.go @@ -0,0 +1,1033 @@ +package node + +import ( + "bytes" + "context" + "fmt" + "io" + "io/ioutil" + "math/rand" + "os" + "path/filepath" + "runtime" + "sort" + "testing" + "time" + + "github.com/gogo/protobuf/proto" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + dbm "github.com/tendermint/tm-db" + + "github.com/tendermint/tendermint/abci/example/kvstore" + abci "github.com/tendermint/tendermint/abci/types" + cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/crypto" + cryptoenc "github.com/tendermint/tendermint/crypto/encoding" + mempl "github.com/tendermint/tendermint/internal/mempool" + "github.com/tendermint/tendermint/internal/test/factory" + "github.com/tendermint/tendermint/libs/log" + tmrand "github.com/tendermint/tendermint/libs/rand" + "github.com/tendermint/tendermint/privval" + tmstate "github.com/tendermint/tendermint/proto/tendermint/state" + tmproto "github.com/tendermint/tendermint/proto/tendermint/types" + "github.com/tendermint/tendermint/proxy" + sm "github.com/tendermint/tendermint/state" + sf "github.com/tendermint/tendermint/state/test/factory" + "github.com/tendermint/tendermint/store" + "github.com/tendermint/tendermint/types" +) + +//------------------------------------------------------------------------------------------ +type simulatorTestSuite struct { + GenesisState sm.State + Config *cfg.Config + Chain []*types.Block + Commits []*types.Commit + CleanupFunc cleanupFunc + + Mempool mempl.Mempool + Evpool sm.EvidencePool +} + +const ( + numBlocks = 6 +) + 
+//--------------------------------------- +// Test handshake/replay + +// 0 - all synced up +// 1 - saved block but app and state are behind +// 2 - save block and committed but state is behind +// 3 - save block and committed with truncated block store and state behind +var modes = []uint{0, 1, 2, 3} + +// This is actually not a test, it's for storing validator change tx data for testHandshakeReplay +func setupSimulator(t *testing.T) *simulatorTestSuite { + t.Helper() + config := configSetup(t) + + sim := &simulatorTestSuite{ + Mempool: emptyMempool{}, + Evpool: sm.EmptyEvidencePool{}, + } + + nPeers := 7 + nVals := 4 + + css, genDoc, config, cleanup := randConsensusNetWithPeers( + config, + nVals, + nPeers, + "replay_test", + newMockTickerFunc(true), + newPersistentKVStoreWithPath) + sim.Config = config + sim.GenesisState, _ = sm.MakeGenesisState(genDoc) + sim.CleanupFunc = cleanup + + partSize := types.BlockPartSizeBytes + + newRoundCh := subscribe(css[0].eventBus, types.EventQueryNewRound) + proposalCh := subscribe(css[0].eventBus, types.EventQueryCompleteProposal) + + vss := make([]*validatorStub, nPeers) + for i := 0; i < nPeers; i++ { + vss[i] = newValidatorStub(css[i].privValidator, int32(i)) + } + height, round := css[0].Height, css[0].Round + + // start the machine + startTestRound(css[0], height, round) + incrementHeight(vss...) + ensureNewRound(newRoundCh, height, 0) + ensureNewProposal(proposalCh, height, round) + rs := css[0].GetRoundState() + + signAddVotes(sim.Config, css[0], tmproto.PrecommitType, + rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header(), + vss[1:nVals]...) + + ensureNewRound(newRoundCh, height+1, 0) + + // HEIGHT 2 + height++ + incrementHeight(vss...) 
+ newValidatorPubKey1, err := css[nVals].privValidator.GetPubKey(context.Background()) + require.NoError(t, err) + valPubKey1ABCI, err := cryptoenc.PubKeyToProto(newValidatorPubKey1) + require.NoError(t, err) + newValidatorTx1 := kvstore.MakeValSetChangeTx(valPubKey1ABCI, testMinPower) + err = assertMempool(css[0].txNotifier).CheckTx(context.Background(), newValidatorTx1, nil, mempl.TxInfo{}) + assert.Nil(t, err) + propBlock, _ := css[0].createProposalBlock() // changeProposer(t, cs1, vs2) + propBlockParts := propBlock.MakePartSet(partSize) + blockID := types.BlockID{Hash: propBlock.Hash(), PartSetHeader: propBlockParts.Header()} + + proposal := types.NewProposal(vss[1].Height, round, -1, blockID) + p := proposal.ToProto() + if err := vss[1].SignProposal(context.Background(), config.ChainID(), p); err != nil { + t.Fatal("failed to sign bad proposal", err) + } + proposal.Signature = p.Signature + + // set the proposal block + if err := css[0].SetProposalAndBlock(proposal, propBlock, propBlockParts, "some peer"); err != nil { + t.Fatal(err) + } + ensureNewProposal(proposalCh, height, round) + rs = css[0].GetRoundState() + signAddVotes(sim.Config, css[0], tmproto.PrecommitType, + rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header(), + vss[1:nVals]...) + ensureNewRound(newRoundCh, height+1, 0) + + // HEIGHT 3 + height++ + incrementHeight(vss...) 
+ updateValidatorPubKey1, err := css[nVals].privValidator.GetPubKey(context.Background()) + require.NoError(t, err) + updatePubKey1ABCI, err := cryptoenc.PubKeyToProto(updateValidatorPubKey1) + require.NoError(t, err) + updateValidatorTx1 := kvstore.MakeValSetChangeTx(updatePubKey1ABCI, 25) + err = assertMempool(css[0].txNotifier).CheckTx(context.Background(), updateValidatorTx1, nil, mempl.TxInfo{}) + assert.Nil(t, err) + propBlock, _ = css[0].createProposalBlock() // changeProposer(t, cs1, vs2) + propBlockParts = propBlock.MakePartSet(partSize) + blockID = types.BlockID{Hash: propBlock.Hash(), PartSetHeader: propBlockParts.Header()} + + proposal = types.NewProposal(vss[2].Height, round, -1, blockID) + p = proposal.ToProto() + if err := vss[2].SignProposal(context.Background(), config.ChainID(), p); err != nil { + t.Fatal("failed to sign bad proposal", err) + } + proposal.Signature = p.Signature + + // set the proposal block + if err := css[0].SetProposalAndBlock(proposal, propBlock, propBlockParts, "some peer"); err != nil { + t.Fatal(err) + } + ensureNewProposal(proposalCh, height, round) + rs = css[0].GetRoundState() + signAddVotes(sim.Config, css[0], tmproto.PrecommitType, + rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header(), + vss[1:nVals]...) + ensureNewRound(newRoundCh, height+1, 0) + + // HEIGHT 4 + height++ + incrementHeight(vss...) 
+ newValidatorPubKey2, err := css[nVals+1].privValidator.GetPubKey(context.Background()) + require.NoError(t, err) + newVal2ABCI, err := cryptoenc.PubKeyToProto(newValidatorPubKey2) + require.NoError(t, err) + newValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, testMinPower) + err = assertMempool(css[0].txNotifier).CheckTx(context.Background(), newValidatorTx2, nil, mempl.TxInfo{}) + assert.Nil(t, err) + newValidatorPubKey3, err := css[nVals+2].privValidator.GetPubKey(context.Background()) + require.NoError(t, err) + newVal3ABCI, err := cryptoenc.PubKeyToProto(newValidatorPubKey3) + require.NoError(t, err) + newValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, testMinPower) + err = assertMempool(css[0].txNotifier).CheckTx(context.Background(), newValidatorTx3, nil, mempl.TxInfo{}) + assert.Nil(t, err) + propBlock, _ = css[0].createProposalBlock() // changeProposer(t, cs1, vs2) + propBlockParts = propBlock.MakePartSet(partSize) + blockID = types.BlockID{Hash: propBlock.Hash(), PartSetHeader: propBlockParts.Header()} + newVss := make([]*validatorStub, nVals+1) + copy(newVss, vss[:nVals+1]) + sort.Sort(ValidatorStubsByPower(newVss)) + + valIndexFn := func(cssIdx int) int { + for i, vs := range newVss { + vsPubKey, err := vs.GetPubKey(context.Background()) + require.NoError(t, err) + + cssPubKey, err := css[cssIdx].privValidator.GetPubKey(context.Background()) + require.NoError(t, err) + + if vsPubKey.Equals(cssPubKey) { + return i + } + } + panic(fmt.Sprintf("validator css[%d] not found in newVss", cssIdx)) + } + + selfIndex := valIndexFn(0) + + proposal = types.NewProposal(vss[3].Height, round, -1, blockID) + p = proposal.ToProto() + if err := vss[3].SignProposal(context.Background(), config.ChainID(), p); err != nil { + t.Fatal("failed to sign bad proposal", err) + } + proposal.Signature = p.Signature + + // set the proposal block + if err := css[0].SetProposalAndBlock(proposal, propBlock, propBlockParts, "some peer"); err != nil { + t.Fatal(err) + } + 
ensureNewProposal(proposalCh, height, round) + + removeValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, 0) + err = assertMempool(css[0].txNotifier).CheckTx(context.Background(), removeValidatorTx2, nil, mempl.TxInfo{}) + assert.Nil(t, err) + + rs = css[0].GetRoundState() + for i := 0; i < nVals+1; i++ { + if i == selfIndex { + continue + } + signAddVotes(sim.Config, css[0], + tmproto.PrecommitType, rs.ProposalBlock.Hash(), + rs.ProposalBlockParts.Header(), newVss[i]) + } + + ensureNewRound(newRoundCh, height+1, 0) + + // HEIGHT 5 + height++ + incrementHeight(vss...) + // Reflect the changes to vss[nVals] at height 3 and resort newVss. + newVssIdx := valIndexFn(nVals) + newVss[newVssIdx].VotingPower = 25 + sort.Sort(ValidatorStubsByPower(newVss)) + selfIndex = valIndexFn(0) + ensureNewProposal(proposalCh, height, round) + rs = css[0].GetRoundState() + for i := 0; i < nVals+1; i++ { + if i == selfIndex { + continue + } + signAddVotes(sim.Config, css[0], + tmproto.PrecommitType, rs.ProposalBlock.Hash(), + rs.ProposalBlockParts.Header(), newVss[i]) + } + ensureNewRound(newRoundCh, height+1, 0) + + // HEIGHT 6 + height++ + incrementHeight(vss...) 
+ removeValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, 0) + err = assertMempool(css[0].txNotifier).CheckTx(context.Background(), removeValidatorTx3, nil, mempl.TxInfo{}) + assert.Nil(t, err) + propBlock, _ = css[0].createProposalBlock() // changeProposer(t, cs1, vs2) + propBlockParts = propBlock.MakePartSet(partSize) + blockID = types.BlockID{Hash: propBlock.Hash(), PartSetHeader: propBlockParts.Header()} + newVss = make([]*validatorStub, nVals+3) + copy(newVss, vss[:nVals+3]) + sort.Sort(ValidatorStubsByPower(newVss)) + + selfIndex = valIndexFn(0) + proposal = types.NewProposal(vss[1].Height, round, -1, blockID) + p = proposal.ToProto() + if err := vss[1].SignProposal(context.Background(), config.ChainID(), p); err != nil { + t.Fatal("failed to sign bad proposal", err) + } + proposal.Signature = p.Signature + + // set the proposal block + if err := css[0].SetProposalAndBlock(proposal, propBlock, propBlockParts, "some peer"); err != nil { + t.Fatal(err) + } + ensureNewProposal(proposalCh, height, round) + rs = css[0].GetRoundState() + for i := 0; i < nVals+3; i++ { + if i == selfIndex { + continue + } + signAddVotes(sim.Config, css[0], + tmproto.PrecommitType, rs.ProposalBlock.Hash(), + rs.ProposalBlockParts.Header(), newVss[i]) + } + ensureNewRound(newRoundCh, height+1, 0) + + sim.Chain = make([]*types.Block, 0) + sim.Commits = make([]*types.Commit, 0) + for i := 1; i <= numBlocks; i++ { + sim.Chain = append(sim.Chain, css[0].blockStore.LoadBlock(int64(i))) + sim.Commits = append(sim.Commits, css[0].blockStore.LoadBlockCommit(int64(i))) + } + if sim.CleanupFunc != nil { + t.Cleanup(sim.CleanupFunc) + } + + return sim +} + +// Sync from scratch +func TestHandshakeReplayAll(t *testing.T) { + sim := setupSimulator(t) + + for _, m := range modes { + testHandshakeReplay(t, sim, 0, m, false) + } + for _, m := range modes { + testHandshakeReplay(t, sim, 0, m, true) + } +} + +// Sync many, not from scratch +func TestHandshakeReplaySome(t *testing.T) { + sim := 
setupSimulator(t) + + for _, m := range modes { + testHandshakeReplay(t, sim, 2, m, false) + } + for _, m := range modes { + testHandshakeReplay(t, sim, 2, m, true) + } +} + +// Sync from lagging by one +func TestHandshakeReplayOne(t *testing.T) { + sim := setupSimulator(t) + + for _, m := range modes { + testHandshakeReplay(t, sim, numBlocks-1, m, false) + } + for _, m := range modes { + testHandshakeReplay(t, sim, numBlocks-1, m, true) + } +} + +// Sync from caught up +func TestHandshakeReplayNone(t *testing.T) { + sim := setupSimulator(t) + + for _, m := range modes { + testHandshakeReplay(t, sim, numBlocks, m, false) + } + for _, m := range modes { + testHandshakeReplay(t, sim, numBlocks, m, true) + } +} + +// Test mockProxyApp should not panic when app return ABCIResponses with some empty ResponseDeliverTx +func TestMockProxyApp(t *testing.T) { + sim := setupSimulator(t) // setup config and simulator + config := sim.Config + assert.NotNil(t, config) + + logger := log.TestingLogger() + var validTxs, invalidTxs = 0, 0 + txIndex := 0 + + assert.NotPanics(t, func() { + abciResWithEmptyDeliverTx := new(tmstate.ABCIResponses) + abciResWithEmptyDeliverTx.DeliverTxs = make([]*abci.ResponseDeliverTx, 0) + abciResWithEmptyDeliverTx.DeliverTxs = append(abciResWithEmptyDeliverTx.DeliverTxs, &abci.ResponseDeliverTx{}) + + // called when saveABCIResponses: + bytes, err := proto.Marshal(abciResWithEmptyDeliverTx) + require.NoError(t, err) + loadedAbciRes := new(tmstate.ABCIResponses) + + // this also happens sm.LoadABCIResponses + err = proto.Unmarshal(bytes, loadedAbciRes) + require.NoError(t, err) + + mock := newMockProxyApp([]byte("mock_hash"), loadedAbciRes) + + abciRes := new(tmstate.ABCIResponses) + abciRes.DeliverTxs = make([]*abci.ResponseDeliverTx, len(loadedAbciRes.DeliverTxs)) + // Execute transactions and get hash. 
+ proxyCb := func(req *abci.Request, res *abci.Response) { + if r, ok := res.Value.(*abci.Response_DeliverTx); ok { + // TODO: make use of res.Log + // TODO: make use of this info + // Blocks may include invalid txs. + txRes := r.DeliverTx + if txRes.Code == abci.CodeTypeOK { + validTxs++ + } else { + logger.Debug("Invalid tx", "code", txRes.Code, "log", txRes.Log) + invalidTxs++ + } + abciRes.DeliverTxs[txIndex] = txRes + txIndex++ + } + } + mock.SetResponseCallback(proxyCb) + + someTx := []byte("tx") + _, err = mock.DeliverTxAsync(context.Background(), abci.RequestDeliverTx{Tx: someTx}) + assert.NoError(t, err) + }) + assert.True(t, validTxs == 1) + assert.True(t, invalidTxs == 0) +} + +func tempWALWithData(data []byte) string { + walFile, err := ioutil.TempFile("", "wal") + if err != nil { + panic(fmt.Sprintf("failed to create temp WAL file: %v", err)) + } + _, err = walFile.Write(data) + if err != nil { + panic(fmt.Sprintf("failed to write to temp WAL file: %v", err)) + } + if err := walFile.Close(); err != nil { + panic(fmt.Sprintf("failed to close temp WAL file: %v", err)) + } + return walFile.Name() +} + +// Make some blocks. Start a fresh app and apply nBlocks blocks. +// Then restart the app and sync it up with the remaining blocks +func testHandshakeReplay(t *testing.T, sim *simulatorTestSuite, nBlocks int, mode uint, testValidatorsChange bool) { + var chain []*types.Block + var commits []*types.Commit + var store *mockBlockStore + var stateDB dbm.DB + var genesisState sm.State + + config := sim.Config + + if testValidatorsChange { + testConfig := ResetConfig(fmt.Sprintf("%s_%v_m", t.Name(), mode)) + defer func() { _ = os.RemoveAll(testConfig.RootDir) }() + stateDB = dbm.NewMemDB() + + genesisState = sim.GenesisState + config = sim.Config + chain = append([]*types.Block{}, sim.Chain...) 
// copy chain + commits = sim.Commits + store = newMockBlockStore(config, genesisState.ConsensusParams) + } else { // test single node + testConfig := ResetConfig(fmt.Sprintf("%s_%v_s", t.Name(), mode)) + defer func() { _ = os.RemoveAll(testConfig.RootDir) }() + walBody, err := WALWithNBlocks(t, numBlocks) + require.NoError(t, err) + walFile := tempWALWithData(walBody) + config.Consensus.SetWalFile(walFile) + + privVal, err := privval.LoadFilePV(config.PrivValidator.KeyFile(), config.PrivValidator.StateFile()) + require.NoError(t, err) + + wal, err := NewWAL(walFile) + require.NoError(t, err) + wal.SetLogger(log.TestingLogger()) + err = wal.Start() + require.NoError(t, err) + t.Cleanup(func() { + if err := wal.Stop(); err != nil { + t.Error(err) + } + }) + chain, commits, err = makeBlockchainFromWAL(wal) + require.NoError(t, err) + pubKey, err := privVal.GetPubKey(context.Background()) + require.NoError(t, err) + stateDB, genesisState, store = stateAndStore(config, pubKey, kvstore.ProtocolVersion) + + } + stateStore := sm.NewStore(stateDB) + store.chain = chain + store.commits = commits + + state := genesisState.Copy() + // run the chain through state.ApplyBlock to build up the tendermint state + state = buildTMStateFromChain(config, sim.Mempool, sim.Evpool, stateStore, state, chain, nBlocks, mode, store) + latestAppHash := state.AppHash + + // make a new client creator + kvstoreApp := kvstore.NewPersistentKVStoreApplication( + filepath.Join(config.DBDir(), fmt.Sprintf("replay_test_%d_%d_a_r%d", nBlocks, mode, rand.Int()))) + t.Cleanup(func() { require.NoError(t, kvstoreApp.Close()) }) + + clientCreator2 := proxy.NewLocalClientCreator(kvstoreApp) + if nBlocks > 0 { + // run nBlocks against a new client to build up the app state. 
+ // use a throwaway tendermint state + proxyApp := proxy.NewAppConns(clientCreator2) + stateDB1 := dbm.NewMemDB() + stateStore := sm.NewStore(stateDB1) + err := stateStore.Save(genesisState) + require.NoError(t, err) + buildAppStateFromChain(proxyApp, stateStore, sim.Mempool, sim.Evpool, genesisState, chain, nBlocks, mode, store) + } + + // Prune block store if requested + expectError := false + if mode == 3 { + pruned, err := store.PruneBlocks(2) + require.NoError(t, err) + require.EqualValues(t, 1, pruned) + expectError = int64(nBlocks) < 2 + } + + // now start the app using the handshake - it should sync + genDoc, _ := sm.MakeGenesisDocFromFile(config.GenesisFile()) + handshaker := NewHandshaker(stateStore, state, store, genDoc) + proxyApp := proxy.NewAppConns(clientCreator2) + if err := proxyApp.Start(); err != nil { + t.Fatalf("Error starting proxy app connections: %v", err) + } + + t.Cleanup(func() { + if err := proxyApp.Stop(); err != nil { + t.Error(err) + } + }) + + err := handshaker.Handshake(proxyApp) + if expectError { + require.Error(t, err) + return + } else if err != nil { + t.Fatalf("Error on abci handshake: %v", err) + } + + // get the latest app hash from the app + res, err := proxyApp.Query().InfoSync(context.Background(), abci.RequestInfo{Version: ""}) + if err != nil { + t.Fatal(err) + } + + // the app hash should be synced up + if !bytes.Equal(latestAppHash, res.LastBlockAppHash) { + t.Fatalf( + "Expected app hashes to match after handshake/replay. 
got %X, expected %X", + res.LastBlockAppHash, + latestAppHash) + } + + expectedBlocksToSync := numBlocks - nBlocks + if nBlocks == numBlocks && mode > 0 { + expectedBlocksToSync++ + } else if nBlocks > 0 && mode == 1 { + expectedBlocksToSync++ + } + + if handshaker.NBlocks() != expectedBlocksToSync { + t.Fatalf("Expected handshake to sync %d blocks, got %d", expectedBlocksToSync, handshaker.NBlocks()) + } +} + +func applyBlock(stateStore sm.Store, + mempool mempl.Mempool, + evpool sm.EvidencePool, + st sm.State, + blk *types.Block, + proxyApp proxy.AppConns, + blockStore *mockBlockStore) sm.State { + testPartSize := types.BlockPartSizeBytes + blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore) + + blkID := types.BlockID{Hash: blk.Hash(), PartSetHeader: blk.MakePartSet(testPartSize).Header()} + newState, err := blockExec.ApplyBlock(st, blkID, blk) + if err != nil { + panic(err) + } + return newState +} + +func buildAppStateFromChain( + proxyApp proxy.AppConns, + stateStore sm.Store, + mempool mempl.Mempool, + evpool sm.EvidencePool, + state sm.State, + chain []*types.Block, + nBlocks int, + mode uint, + blockStore *mockBlockStore) { + // start a new app without handshake, play nBlocks blocks + if err := proxyApp.Start(); err != nil { + panic(err) + } + defer proxyApp.Stop() //nolint:errcheck // ignore + + state.Version.Consensus.App = kvstore.ProtocolVersion // simulate handshake, receive app version + validators := types.TM2PB.ValidatorUpdates(state.Validators) + if _, err := proxyApp.Consensus().InitChainSync(context.Background(), abci.RequestInitChain{ + Validators: validators, + }); err != nil { + panic(err) + } + if err := stateStore.Save(state); err != nil { // save height 1's validatorsInfo + panic(err) + } + switch mode { + case 0: + for i := 0; i < nBlocks; i++ { + block := chain[i] + state = applyBlock(stateStore, mempool, evpool, state, block, proxyApp, blockStore) + } + case 1, 2, 3: + for 
i := 0; i < nBlocks-1; i++ { + block := chain[i] + state = applyBlock(stateStore, mempool, evpool, state, block, proxyApp, blockStore) + } + + if mode == 2 || mode == 3 { + // update the kvstore height and apphash + // as if we ran commit but not + state = applyBlock(stateStore, mempool, evpool, state, chain[nBlocks-1], proxyApp, blockStore) + } + default: + panic(fmt.Sprintf("unknown mode %v", mode)) + } + +} + +func buildTMStateFromChain( + config *cfg.Config, + mempool mempl.Mempool, + evpool sm.EvidencePool, + stateStore sm.Store, + state sm.State, + chain []*types.Block, + nBlocks int, + mode uint, + blockStore *mockBlockStore) sm.State { + // run the whole chain against this client to build up the tendermint state + kvstoreApp := kvstore.NewPersistentKVStoreApplication( + filepath.Join(config.DBDir(), fmt.Sprintf("replay_test_%d_%d_t", nBlocks, mode))) + defer kvstoreApp.Close() + clientCreator := proxy.NewLocalClientCreator(kvstoreApp) + + proxyApp := proxy.NewAppConns(clientCreator) + if err := proxyApp.Start(); err != nil { + panic(err) + } + defer proxyApp.Stop() //nolint:errcheck + + state.Version.Consensus.App = kvstore.ProtocolVersion // simulate handshake, receive app version + validators := types.TM2PB.ValidatorUpdates(state.Validators) + if _, err := proxyApp.Consensus().InitChainSync(context.Background(), abci.RequestInitChain{ + Validators: validators, + }); err != nil { + panic(err) + } + if err := stateStore.Save(state); err != nil { // save height 1's validatorsInfo + panic(err) + } + switch mode { + case 0: + // sync right up + for _, block := range chain { + state = applyBlock(stateStore, mempool, evpool, state, block, proxyApp, blockStore) + } + + case 1, 2, 3: + // sync up to the penultimate as if we stored the block. 
+ // whether we commit or not depends on the appHash + for _, block := range chain[:len(chain)-1] { + state = applyBlock(stateStore, mempool, evpool, state, block, proxyApp, blockStore) + } + + // apply the final block to a state copy so we can + // get the right next appHash but keep the state back + applyBlock(stateStore, mempool, evpool, state, chain[len(chain)-1], proxyApp, blockStore) + default: + panic(fmt.Sprintf("unknown mode %v", mode)) + } + + return state +} + +func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) { + // 1. Initialize tendermint and commit 3 blocks with the following app hashes: + // - 0x01 + // - 0x02 + // - 0x03 + config := ResetConfig("handshake_test_") + t.Cleanup(func() { os.RemoveAll(config.RootDir) }) + privVal, err := privval.LoadFilePV(config.PrivValidator.KeyFile(), config.PrivValidator.StateFile()) + require.NoError(t, err) + const appVersion = 0x0 + pubKey, err := privVal.GetPubKey(context.Background()) + require.NoError(t, err) + stateDB, state, store := stateAndStore(config, pubKey, appVersion) + stateStore := sm.NewStore(stateDB) + genDoc, _ := sm.MakeGenesisDocFromFile(config.GenesisFile()) + state.LastValidators = state.Validators.Copy() + // mode = 0 for committing all the blocks + blocks := sf.MakeBlocks(3, &state, privVal) + store.chain = blocks + + // 2. Tendermint must panic if app returns wrong hash for the first block + // - RANDOM HASH + // - 0x02 + // - 0x03 + { + app := &badApp{numBlocks: 3, allHashesAreWrong: true} + clientCreator := proxy.NewLocalClientCreator(app) + proxyApp := proxy.NewAppConns(clientCreator) + err := proxyApp.Start() + require.NoError(t, err) + t.Cleanup(func() { + if err := proxyApp.Stop(); err != nil { + t.Error(err) + } + }) + + assert.Panics(t, func() { + h := NewHandshaker(stateStore, state, store, genDoc) + if err = h.Handshake(proxyApp); err != nil { + t.Log(err) + } + }) + } + + // 3. 
Tendermint must panic if app returns wrong hash for the last block + // - 0x01 + // - 0x02 + // - RANDOM HASH + { + app := &badApp{numBlocks: 3, onlyLastHashIsWrong: true} + clientCreator := proxy.NewLocalClientCreator(app) + proxyApp := proxy.NewAppConns(clientCreator) + err := proxyApp.Start() + require.NoError(t, err) + t.Cleanup(func() { + if err := proxyApp.Stop(); err != nil { + t.Error(err) + } + }) + + assert.Panics(t, func() { + h := NewHandshaker(stateStore, state, store, genDoc) + if err = h.Handshake(proxyApp); err != nil { + t.Log(err) + } + }) + } +} + +type badApp struct { + abci.BaseApplication + numBlocks byte + height byte + allHashesAreWrong bool + onlyLastHashIsWrong bool +} + +func (app *badApp) Commit() abci.ResponseCommit { + app.height++ + if app.onlyLastHashIsWrong { + if app.height == app.numBlocks { + return abci.ResponseCommit{Data: tmrand.Bytes(8)} + } + return abci.ResponseCommit{Data: []byte{app.height}} + } else if app.allHashesAreWrong { + return abci.ResponseCommit{Data: tmrand.Bytes(8)} + } + + panic("either allHashesAreWrong or onlyLastHashIsWrong must be set") +} + +//-------------------------- +// utils for making blocks + +func makeBlockchainFromWAL(wal WAL) ([]*types.Block, []*types.Commit, error) { + var height int64 + + // Search for height marker + gr, found, err := wal.SearchForEndHeight(height, &WALSearchOptions{}) + if err != nil { + return nil, nil, err + } + if !found { + return nil, nil, fmt.Errorf("wal does not contain height %d", height) + } + defer gr.Close() + + // log.Notice("Build a blockchain by reading from the WAL") + + var ( + blocks []*types.Block + commits []*types.Commit + thisBlockParts *types.PartSet + thisBlockCommit *types.Commit + ) + + dec := NewWALDecoder(gr) + for { + msg, err := dec.Decode() + if err == io.EOF { + break + } else if err != nil { + return nil, nil, err + } + + piece := readPieceFromWAL(msg) + if piece == nil { + continue + } + + switch p := piece.(type) { + case EndHeightMessage: 
+ // if its not the first one, we have a full block + if thisBlockParts != nil { + var pbb = new(tmproto.Block) + bz, err := ioutil.ReadAll(thisBlockParts.GetReader()) + if err != nil { + panic(err) + } + err = proto.Unmarshal(bz, pbb) + if err != nil { + panic(err) + } + block, err := types.BlockFromProto(pbb) + if err != nil { + panic(err) + } + + if block.Height != height+1 { + panic(fmt.Sprintf("read bad block from wal. got height %d, expected %d", block.Height, height+1)) + } + commitHeight := thisBlockCommit.Height + if commitHeight != height+1 { + panic(fmt.Sprintf("commit doesnt match. got height %d, expected %d", commitHeight, height+1)) + } + blocks = append(blocks, block) + commits = append(commits, thisBlockCommit) + height++ + } + case *types.PartSetHeader: + thisBlockParts = types.NewPartSetFromHeader(*p) + case *types.Part: + _, err := thisBlockParts.AddPart(p) + if err != nil { + return nil, nil, err + } + case *types.Vote: + if p.Type == tmproto.PrecommitType { + thisBlockCommit = types.NewCommit(p.Height, p.Round, + p.BlockID, []types.CommitSig{p.CommitSig()}) + } + } + } + // grab the last block too + bz, err := ioutil.ReadAll(thisBlockParts.GetReader()) + if err != nil { + panic(err) + } + var pbb = new(tmproto.Block) + err = proto.Unmarshal(bz, pbb) + if err != nil { + panic(err) + } + block, err := types.BlockFromProto(pbb) + if err != nil { + panic(err) + } + if block.Height != height+1 { + panic(fmt.Sprintf("read bad block from wal. got height %d, expected %d", block.Height, height+1)) + } + commitHeight := thisBlockCommit.Height + if commitHeight != height+1 { + panic(fmt.Sprintf("commit doesnt match. 
got height %d, expected %d", commitHeight, height+1)) + } + blocks = append(blocks, block) + commits = append(commits, thisBlockCommit) + return blocks, commits, nil +} + +func readPieceFromWAL(msg *TimedWALMessage) interface{} { + // for logging + switch m := msg.Msg.(type) { + case msgInfo: + switch msg := m.Msg.(type) { + case *ProposalMessage: + return &msg.Proposal.BlockID.PartSetHeader + case *BlockPartMessage: + return msg.Part + case *VoteMessage: + return msg.Vote + } + case EndHeightMessage: + return m + } + + return nil +} + +// fresh state and mock store +func stateAndStore( + config *cfg.Config, + pubKey crypto.PubKey, + appVersion uint64) (dbm.DB, sm.State, *mockBlockStore) { + stateDB := dbm.NewMemDB() + stateStore := sm.NewStore(stateDB) + state, _ := sm.MakeGenesisStateFromFile(config.GenesisFile()) + state.Version.Consensus.App = appVersion + store := newMockBlockStore(config, state.ConsensusParams) + if err := stateStore.Save(state); err != nil { + panic(err) + } + return stateDB, state, store +} + +//---------------------------------- +// mock block store + +type mockBlockStore struct { + config *cfg.Config + params types.ConsensusParams + chain []*types.Block + commits []*types.Commit + base int64 +} + +// TODO: NewBlockStore(db.NewMemDB) ... 
+func newMockBlockStore(config *cfg.Config, params types.ConsensusParams) *mockBlockStore { + return &mockBlockStore{config, params, nil, nil, 0} +} + +func (bs *mockBlockStore) Height() int64 { return int64(len(bs.chain)) } +func (bs *mockBlockStore) Base() int64 { return bs.base } +func (bs *mockBlockStore) Size() int64 { return bs.Height() - bs.Base() + 1 } +func (bs *mockBlockStore) LoadBaseMeta() *types.BlockMeta { return bs.LoadBlockMeta(bs.base) } +func (bs *mockBlockStore) LoadBlock(height int64) *types.Block { return bs.chain[height-1] } +func (bs *mockBlockStore) LoadBlockByHash(hash []byte) *types.Block { + return bs.chain[int64(len(bs.chain))-1] +} +func (bs *mockBlockStore) LoadBlockMeta(height int64) *types.BlockMeta { + block := bs.chain[height-1] + return &types.BlockMeta{ + BlockID: types.BlockID{Hash: block.Hash(), PartSetHeader: block.MakePartSet(types.BlockPartSizeBytes).Header()}, + Header: block.Header, + } +} +func (bs *mockBlockStore) LoadBlockPart(height int64, index int) *types.Part { return nil } +func (bs *mockBlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) { +} +func (bs *mockBlockStore) LoadBlockCommit(height int64) *types.Commit { + return bs.commits[height-1] +} +func (bs *mockBlockStore) LoadSeenCommit(height int64) *types.Commit { + return bs.commits[height-1] +} + +func (bs *mockBlockStore) PruneBlocks(height int64) (uint64, error) { + pruned := uint64(0) + for i := int64(0); i < height-1; i++ { + bs.chain[i] = nil + bs.commits[i] = nil + pruned++ + } + bs.base = height + return pruned, nil +} + +//--------------------------------------- +// Test handshake/init chain + +func TestHandshakeUpdatesValidators(t *testing.T) { + val, _ := factory.RandValidator(true, 10) + vals := types.NewValidatorSet([]*types.Validator{val}) + app := &initChainApp{vals: types.TM2PB.ValidatorUpdates(vals)} + clientCreator := proxy.NewLocalClientCreator(app) + + config := ResetConfig("handshake_test_") + 
t.Cleanup(func() { _ = os.RemoveAll(config.RootDir) }) + + privVal, err := privval.LoadFilePV(config.PrivValidator.KeyFile(), config.PrivValidator.StateFile()) + require.NoError(t, err) + pubKey, err := privVal.GetPubKey(context.Background()) + require.NoError(t, err) + stateDB, state, store := stateAndStore(config, pubKey, 0x0) + stateStore := sm.NewStore(stateDB) + + oldValAddr := state.Validators.Validators[0].Address + + // now start the app using the handshake - it should sync + genDoc, _ := sm.MakeGenesisDocFromFile(config.GenesisFile()) + handshaker := NewHandshaker(stateStore, state, store, genDoc) + proxyApp := proxy.NewAppConns(clientCreator) + if err := proxyApp.Start(); err != nil { + t.Fatalf("Error starting proxy app connections: %v", err) + } + t.Cleanup(func() { + if err := proxyApp.Stop(); err != nil { + t.Error(err) + } + }) + if err := handshaker.Handshake(proxyApp); err != nil { + t.Fatalf("Error on abci handshake: %v", err) + } + // reload the state, check the validator set was updated + state, err = stateStore.Load() + require.NoError(t, err) + + newValAddr := state.Validators.Validators[0].Address + expectValAddr := val.Address + assert.NotEqual(t, oldValAddr, newValAddr) + assert.Equal(t, newValAddr, expectValAddr) +} + +// returns the vals on InitChain +type initChainApp struct { + abci.BaseApplication + vals []abci.ValidatorUpdate +} + +func (ica *initChainApp) InitChain(req abci.RequestInitChain) abci.ResponseInitChain { + return abci.ResponseInitChain{ + Validators: ica.vals, + } +} diff --git a/node/node.go b/node/node.go index cb32c5ce6..dda6b6b1b 100644 --- a/node/node.go +++ b/node/node.go @@ -147,6 +147,8 @@ func makeNode(config *cfg.Config, return nil, fmt.Errorf("error in genesis doc: %w", err) } + // Either load state from a previous run or generate the genesis state from + // genesis doc state, err := loadStateFromDBOrGenesisDocProvider(stateStore, genDoc) if err != nil { return nil, err @@ -208,31 +210,11 @@ func 
makeNode(config *cfg.Config, stateSync = false } - // Create the handshaker, which calls RequestInfo, sets the AppVersion on the state, - // and replays any blocks as necessary to sync tendermint with the app. - consensusLogger := logger.With("module", "consensus") - if !stateSync { - if err := doHandshake(stateStore, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil { - return nil, err - } - - // Reload the state. It will have the Version.Consensus.App set by the - // Handshake, and may have other modifications as well (ie. depending on - // what happened during block replay). - state, err = stateStore.Load() - if err != nil { - return nil, fmt.Errorf("cannot load state: %w", err) - } - } - // Determine whether we should do fast sync. This must happen after the handshake, since the // app may modify the validator set, specifying ourself as the only validator. fastSync := config.FastSyncMode && !onlyValidatorIsUs(state, pubKey) - logNodeStartupInfo(state, pubKey, logger, consensusLogger, config.Mode) - // TODO: Fetch and provide real options and do proper p2p bootstrapping. - // TODO: Use a persistent peer database. nodeInfo, err := makeNodeInfo(config, nodeKey, eventSinks, genDoc, state) if err != nil { return nil, err @@ -279,6 +261,7 @@ func makeNode(config *cfg.Config, sm.BlockExecutorWithMetrics(smMetrics), ) + consensusLogger := logger.With("module", "consensus") csReactorShim, csReactor, csState := createConsensusReactor( config, state, blockExec, blockStore, mp, evPool, privValidator, csMetrics, stateSync || fastSync, eventBus, @@ -347,7 +330,7 @@ func makeNode(config *cfg.Config, config.StateSync.TempDir, ) - // add the channel descriptors to both the transports + // add the channel descriptors to the transport // FIXME: This should be removed when the legacy p2p stack is removed and // transports can either be agnostic to channel descriptors or can be // declared in the constructor. 
@@ -568,6 +551,30 @@ func (n *nodeImpl) OnStart() error { time.Sleep(genTime.Sub(now)) } + // Either load state from a previous run or generate the genesis state from + // genesis doc + state, err := loadStateFromDBOrGenesisDocProvider(n.stateStore, n.genesisDoc) + if err != nil { + return err + } + + // If we are not using state sync, we need to do a handshake with the + // application. This calls `RequestInfo`, sets the AppVersion on + // the state, can call `InitChain` if this is the first time that the + // application has run and replays any blocks as necessary to sync + // tendermint with the app. We do all this before starting any other service + state, err = syncWithApplication(n.stateStore, n.blockStore, n.genesisDoc, state, n.eventBus, n.proxyApp, n.stateSync, n.Logger) + if err != nil { + return err + } + + // Retrieve the validator's public key + pubKey, err := n.privValidator.GetPubKey(context.TODO()) + if err != nil { + return fmt.Errorf("failed to retrieve pubKey: %w", err) + } + logNodeStartupInfo(state, pubKey, n.Logger, n.config.Mode) + + // Start the RPC server before the P2P server + // so we can eg. receive txs for the first block + if n.config.RPC.ListenAddress != "" && n.config.Mode != cfg.ModeSeed { @@ -578,6 +585,7 @@ func (n *nodeImpl) OnStart() error { n.rpcListeners = listeners } + + // If we are using prometheus for metric gathering, start that server.
if n.config.Instrumentation.Prometheus && n.config.Instrumentation.PrometheusListenAddr != "" { n.prometheusSrv = n.startPrometheusServer(n.config.Instrumentation.PrometheusListenAddr) @@ -592,19 +600,32 @@ func (n *nodeImpl) OnStart() error { return err } + // Start the P2P layer n.isListening = true - n.Logger.Info("p2p service", "legacy_enabled", !n.config.P2P.DisableLegacy) - if n.config.P2P.DisableLegacy { - err = n.router.Start() + if err := n.router.Start(); err != nil { + return err + } } else { // Add private IDs to addrbook to block those peers being added n.addrBook.AddPrivateIDs(strings.SplitAndTrimEmpty(n.config.P2P.PrivatePeerIDs, ",", " ")) - err = n.sw.Start() + if err := n.sw.Start(); err != nil { + return err + } + + // Always connect to persistent peers + err = n.sw.DialPeersAsync(strings.SplitAndTrimEmpty(n.config.P2P.PersistentPeers, ",", " ")) + if err != nil { + return fmt.Errorf("could not dial peers from persistent-peers field: %w", err) + } } - if err != nil { - return err + + // Start the Peer Exchange reactor so we can discover new peers + if n.config.P2P.DisableLegacy && n.pexReactorV2 != nil { + if err := n.pexReactorV2.Start(); err != nil { + return err + } } if n.config.Mode != cfg.ModeSeed { @@ -636,19 +657,6 @@ func (n *nodeImpl) OnStart() error { } } - if n.config.P2P.DisableLegacy && n.pexReactorV2 != nil { - if err := n.pexReactorV2.Start(); err != nil { - return err - } - } else { - // Always connect to persistent peers - err = n.sw.DialPeersAsync(strings.SplitAndTrimEmpty(n.config.P2P.PersistentPeers, ",", " ")) - if err != nil { - return fmt.Errorf("could not dial peers from persistent-peers field: %w", err) - } - - } - // Run state sync if n.stateSync { bcR, ok := n.bcReactor.(cs.FastSyncReactor) diff --git a/node/setup.go b/node/setup.go index cd1875fb6..642fc8c88 100644 --- a/node/setup.go +++ b/node/setup.go @@ -140,25 +140,7 @@ loop: return indexerService, eventSinks, nil } -func doHandshake( - stateStore sm.Store, -
state sm.State, - blockStore sm.BlockStore, - genDoc *types.GenesisDoc, - eventBus types.BlockEventPublisher, - proxyApp proxy.AppConns, - consensusLogger log.Logger) error { - - handshaker := cs.NewHandshaker(stateStore, state, blockStore, genDoc) - handshaker.SetLogger(consensusLogger) - handshaker.SetEventBus(eventBus) - if err := handshaker.Handshake(proxyApp); err != nil { - return fmt.Errorf("error during handshake: %v", err) - } - return nil -} - -func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusLogger log.Logger, mode string) { +func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger log.Logger, mode string) { // Log the version info. logger.Info("Version info", "tmVersion", version.TMVersion, @@ -176,14 +158,14 @@ func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusL } switch { case mode == cfg.ModeFull: - consensusLogger.Info("This node is a fullnode") + logger.Info("This node is a fullnode") case mode == cfg.ModeValidator: addr := pubKey.Address() // Log whether this node is a validator or an observer if state.Validators.HasAddress(addr) { - consensusLogger.Info("This node is a validator", "addr", addr, "pubKey", pubKey.Bytes()) + logger.Info("This node is a validator", "addr", addr, "pubKey", pubKey.Bytes()) } else { - consensusLogger.Info("This node is a validator (NOT in the active validator set)", + logger.Info("This node is a validator (NOT in the active validator set)", "addr", addr, "pubKey", pubKey.Bytes()) } }