Compare commits

...

5 Commits

Author SHA1 Message Date
Callum Waters
e2d8666782 fix merge conflicts 2021-10-20 11:34:44 +02:00
Callum Waters
102f1f5af6 Merge branch 'master' into callum/handshake 2021-10-18 10:25:00 +02:00
Callum Waters
64fb13a561 save state after making changes during sync 2021-08-03 10:20:29 +02:00
Callum Waters
5682dd55f9 Merge branch 'master' into callum/handshake 2021-08-02 12:17:03 +02:00
Callum Waters
4db618db7a separate out replaying of blocks to application and run in OnStart 2021-07-28 17:39:14 +02:00
9 changed files with 1479 additions and 820 deletions

View File

@@ -1,19 +1,12 @@
package consensus
import (
"bytes"
"context"
"fmt"
"hash/crc32"
"io"
"reflect"
"time"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/crypto/merkle"
"github.com/tendermint/tendermint/internal/proxy"
sm "github.com/tendermint/tendermint/internal/state"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/types"
)
@@ -191,358 +184,3 @@ func makeHeightSearchFunc(height int64) auto.SearchFunc {
}
}
}*/
//---------------------------------------------------
// 2. Recover from failure while applying the block.
// (by handshaking with the app to figure out where
// we were last, and using the WAL to recover there.)
//---------------------------------------------------
type Handshaker struct {
stateStore sm.Store
initialState sm.State
store sm.BlockStore
eventBus types.BlockEventPublisher
genDoc *types.GenesisDoc
logger log.Logger
nBlocks int // number of blocks applied to the state
}
func NewHandshaker(stateStore sm.Store, state sm.State,
store sm.BlockStore, genDoc *types.GenesisDoc) *Handshaker {
return &Handshaker{
stateStore: stateStore,
initialState: state,
store: store,
eventBus: types.NopEventBus{},
genDoc: genDoc,
logger: log.NewNopLogger(),
nBlocks: 0,
}
}
func (h *Handshaker) SetLogger(l log.Logger) {
h.logger = l
}
// SetEventBus - sets the event bus for publishing block related events.
// If not called, it defaults to types.NopEventBus.
func (h *Handshaker) SetEventBus(eventBus types.BlockEventPublisher) {
h.eventBus = eventBus
}
// NBlocks returns the number of blocks applied to the state.
func (h *Handshaker) NBlocks() int {
return h.nBlocks
}
// TODO: retry the handshake/replay if it fails ?
func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
// Handshake is done via ABCI Info on the query conn.
res, err := proxyApp.Query().InfoSync(context.Background(), proxy.RequestInfo)
if err != nil {
return fmt.Errorf("error calling Info: %v", err)
}
blockHeight := res.LastBlockHeight
if blockHeight < 0 {
return fmt.Errorf("got a negative last block height (%d) from the app", blockHeight)
}
appHash := res.LastBlockAppHash
h.logger.Info("ABCI Handshake App Info",
"height", blockHeight,
"hash", appHash,
"software-version", res.Version,
"protocol-version", res.AppVersion,
)
// Only set the version if there is no existing state.
if h.initialState.LastBlockHeight == 0 {
h.initialState.Version.Consensus.App = res.AppVersion
}
// Replay blocks up to the latest in the blockstore.
_, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp)
if err != nil {
return fmt.Errorf("error on replay: %v", err)
}
h.logger.Info("Completed ABCI Handshake - Tendermint and App are synced",
"appHeight", blockHeight, "appHash", appHash)
// TODO: (on restart) replay mempool
return nil
}
// ReplayBlocks replays all blocks since appBlockHeight and ensures the result
// matches the current state.
// Returns the final AppHash or an error.
func (h *Handshaker) ReplayBlocks(
state sm.State,
appHash []byte,
appBlockHeight int64,
proxyApp proxy.AppConns,
) ([]byte, error) {
storeBlockBase := h.store.Base()
storeBlockHeight := h.store.Height()
stateBlockHeight := state.LastBlockHeight
h.logger.Info(
"ABCI Replay Blocks",
"appHeight",
appBlockHeight,
"storeHeight",
storeBlockHeight,
"stateHeight",
stateBlockHeight)
// If appBlockHeight == 0 it means that we are at genesis and hence should send InitChain.
if appBlockHeight == 0 {
validators := make([]*types.Validator, len(h.genDoc.Validators))
for i, val := range h.genDoc.Validators {
validators[i] = types.NewValidator(val.PubKey, val.Power)
}
validatorSet := types.NewValidatorSet(validators)
nextVals := types.TM2PB.ValidatorUpdates(validatorSet)
pbParams := h.genDoc.ConsensusParams.ToProto()
req := abci.RequestInitChain{
Time: h.genDoc.GenesisTime,
ChainId: h.genDoc.ChainID,
InitialHeight: h.genDoc.InitialHeight,
ConsensusParams: &pbParams,
Validators: nextVals,
AppStateBytes: h.genDoc.AppState,
}
res, err := proxyApp.Consensus().InitChainSync(context.Background(), req)
if err != nil {
return nil, err
}
appHash = res.AppHash
if stateBlockHeight == 0 { // we only update state when we are in initial state
// If the app did not return an app hash, we keep the one set from the genesis doc in
// the state. We don't set appHash since we don't want the genesis doc app hash
// recorded in the genesis block. We should probably just remove GenesisDoc.AppHash.
if len(res.AppHash) > 0 {
state.AppHash = res.AppHash
}
// If the app returned validators or consensus params, update the state.
if len(res.Validators) > 0 {
vals, err := types.PB2TM.ValidatorUpdates(res.Validators)
if err != nil {
return nil, err
}
state.Validators = types.NewValidatorSet(vals)
state.NextValidators = types.NewValidatorSet(vals).CopyIncrementProposerPriority(1)
} else if len(h.genDoc.Validators) == 0 {
// If validator set is not set in genesis and still empty after InitChain, exit.
return nil, fmt.Errorf("validator set is nil in genesis and still empty after InitChain")
}
if res.ConsensusParams != nil {
state.ConsensusParams = state.ConsensusParams.UpdateConsensusParams(res.ConsensusParams)
state.Version.Consensus.App = state.ConsensusParams.Version.AppVersion
}
// We update the last results hash with the empty hash, to conform with RFC-6962.
state.LastResultsHash = merkle.HashFromByteSlices(nil)
if err := h.stateStore.Save(state); err != nil {
return nil, err
}
}
}
// First handle edge cases and constraints on the storeBlockHeight and storeBlockBase.
switch {
case storeBlockHeight == 0:
assertAppHashEqualsOneFromState(appHash, state)
return appHash, nil
case appBlockHeight == 0 && state.InitialHeight < storeBlockBase:
// the app has no state, and the block store is truncated above the initial height
return appHash, sm.ErrAppBlockHeightTooLow{AppHeight: appBlockHeight, StoreBase: storeBlockBase}
case appBlockHeight > 0 && appBlockHeight < storeBlockBase-1:
// the app is too far behind the truncated store (it can be 1 behind, since we replay the next block)
return appHash, sm.ErrAppBlockHeightTooLow{AppHeight: appBlockHeight, StoreBase: storeBlockBase}
case storeBlockHeight < appBlockHeight:
// the app should never be ahead of the store (but this is under app's control)
return appHash, sm.ErrAppBlockHeightTooHigh{CoreHeight: storeBlockHeight, AppHeight: appBlockHeight}
case storeBlockHeight < stateBlockHeight:
// the state should never be ahead of the store (this is under tendermint's control)
panic(fmt.Sprintf("StateBlockHeight (%d) > StoreBlockHeight (%d)", stateBlockHeight, storeBlockHeight))
case storeBlockHeight > stateBlockHeight+1:
// store should be at most one ahead of the state (this is under tendermint's control)
panic(fmt.Sprintf("StoreBlockHeight (%d) > StateBlockHeight + 1 (%d)", storeBlockHeight, stateBlockHeight+1))
}
var err error
// Now either store is equal to state, or one ahead.
// For each, consider all cases of where the app could be, given app <= store
if storeBlockHeight == stateBlockHeight {
// Tendermint ran Commit and saved the state.
// Either the app is asking for replay, or we're all synced up.
if appBlockHeight < storeBlockHeight {
// the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store)
return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, false)
} else if appBlockHeight == storeBlockHeight {
// We're good!
assertAppHashEqualsOneFromState(appHash, state)
return appHash, nil
}
} else if storeBlockHeight == stateBlockHeight+1 {
// We saved the block in the store but haven't updated the state,
// so we'll need to replay a block using the WAL.
switch {
case appBlockHeight < stateBlockHeight:
// the app is further behind than it should be, so replay blocks
// but leave the last block to go through the WAL
return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, true)
case appBlockHeight == stateBlockHeight:
// We haven't run Commit (both the state and app are one block behind),
// so replayBlock with the real app.
// NOTE: We could instead use the cs.WAL on cs.Start,
// but we'd have to allow the WAL to replay a block that wrote its #ENDHEIGHT
h.logger.Info("Replay last block using real app")
state, err = h.replayBlock(state, storeBlockHeight, proxyApp.Consensus())
return state.AppHash, err
case appBlockHeight == storeBlockHeight:
// We ran Commit, but didn't save the state, so replayBlock with mock app.
abciResponses, err := h.stateStore.LoadABCIResponses(storeBlockHeight)
if err != nil {
return nil, err
}
mockApp := newMockProxyApp(appHash, abciResponses)
h.logger.Info("Replay last block using mock app")
state, err = h.replayBlock(state, storeBlockHeight, mockApp)
return state.AppHash, err
}
}
panic(fmt.Sprintf("uncovered case! appHeight: %d, storeHeight: %d, stateHeight: %d",
appBlockHeight, storeBlockHeight, stateBlockHeight))
}
func (h *Handshaker) replayBlocks(
state sm.State,
proxyApp proxy.AppConns,
appBlockHeight,
storeBlockHeight int64,
mutateState bool) ([]byte, error) {
// App is further behind than it should be, so we need to replay blocks.
// We replay all blocks from appBlockHeight+1.
//
// Note that we don't have an old version of the state,
// so we by-pass state validation/mutation using sm.ExecCommitBlock.
// This also means we won't be saving validator sets if they change during this period.
// TODO: Load the historical information to fix this and just use state.ApplyBlock
//
// If mutateState == true, the final block is replayed with h.replayBlock()
var appHash []byte
var err error
finalBlock := storeBlockHeight
if mutateState {
finalBlock--
}
firstBlock := appBlockHeight + 1
if firstBlock == 1 {
firstBlock = state.InitialHeight
}
for i := firstBlock; i <= finalBlock; i++ {
h.logger.Info("Applying block", "height", i)
block := h.store.LoadBlock(i)
// Extra check to ensure the app was not changed in a way it shouldn't have.
if len(appHash) > 0 {
assertAppHashEqualsOneFromBlock(appHash, block)
}
if i == finalBlock && !mutateState {
// We emit events for the indexer services at the final block to cover the case where
// the node shut down while the block was being committed.
blockExec := sm.NewBlockExecutor(
h.stateStore, h.logger, proxyApp.Consensus(), emptyMempool{}, sm.EmptyEvidencePool{}, h.store)
blockExec.SetEventBus(h.eventBus)
appHash, err = sm.ExecCommitBlock(
blockExec, proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight, state)
if err != nil {
return nil, err
}
} else {
appHash, err = sm.ExecCommitBlock(
nil, proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight, state)
if err != nil {
return nil, err
}
}
h.nBlocks++
}
if mutateState {
// sync the final block
state, err = h.replayBlock(state, storeBlockHeight, proxyApp.Consensus())
if err != nil {
return nil, err
}
appHash = state.AppHash
}
assertAppHashEqualsOneFromState(appHash, state)
return appHash, nil
}
// ApplyBlock on the proxyApp with the last block.
func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.AppConnConsensus) (sm.State, error) {
block := h.store.LoadBlock(height)
meta := h.store.LoadBlockMeta(height)
// Use stubs for both the mempool and the evidence pool since neither transactions nor
// evidence are needed here - the block already exists.
blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, proxyApp, emptyMempool{}, sm.EmptyEvidencePool{}, h.store)
blockExec.SetEventBus(h.eventBus)
var err error
state, err = blockExec.ApplyBlock(state, meta.BlockID, block)
if err != nil {
return sm.State{}, err
}
h.nBlocks++
return state, nil
}
func assertAppHashEqualsOneFromBlock(appHash []byte, block *types.Block) {
if !bytes.Equal(appHash, block.AppHash) {
panic(fmt.Sprintf(`block.AppHash does not match AppHash after replay. Got %X, expected %X.
Block: %v
`,
appHash, block.AppHash, block))
}
}
func assertAppHashEqualsOneFromState(appHash []byte, state sm.State) {
if !bytes.Equal(appHash, state.AppHash) {
panic(fmt.Sprintf(`state.AppHash does not match AppHash after replay. Got
%X, expected %X.
State: %v
Did you reset Tendermint without resetting your application's data?`,
appHash, state.AppHash, state))
}
}
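For context on how this Handshaker gets used: the node setup code (see the doHandshake helper removed further down in this diff) wires it up roughly as follows. This is a condensed sketch; the surrounding variables (stateStore, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger) are assumed to already be in scope.

// Sketch of the pre-existing handshake wiring at node start-up (illustrative only).
handshaker := consensus.NewHandshaker(stateStore, state, blockStore, genDoc)
handshaker.SetLogger(consensusLogger)
handshaker.SetEventBus(eventBus)
if err := handshaker.Handshake(proxyApp); err != nil {
	return fmt.Errorf("error during handshake: %v", err)
}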

View File

@@ -323,13 +323,6 @@ func newConsensusStateForReplay(cfg config.BaseConfig, csConfig *config.Consensu
tmos.Exit(fmt.Sprintf("Failed to start event bus: %v", err))
}
handshaker := NewHandshaker(stateStore, state, blockStore, gdoc)
handshaker.SetEventBus(eventBus)
err = handshaker.Handshake(proxyApp)
if err != nil {
tmos.Exit(fmt.Sprintf("Error on handshake: %v", err))
}
mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore)

View File

@@ -3,12 +3,9 @@ package consensus
import (
"context"
abciclient "github.com/tendermint/tendermint/abci/client"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/internal/libs/clist"
"github.com/tendermint/tendermint/internal/mempool"
"github.com/tendermint/tendermint/internal/proxy"
tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
"github.com/tendermint/tendermint/types"
)
@@ -47,48 +44,3 @@ func (emptyMempool) TxsWaitChan() <-chan struct{} { return nil }
func (emptyMempool) InitWAL() error { return nil }
func (emptyMempool) CloseWAL() {}
//-----------------------------------------------------------------------------
// mockProxyApp uses ABCIResponses to give the right results.
//
// Useful because we don't want to call Commit() twice for the same block on
// the real app.
func newMockProxyApp(appHash []byte, abciResponses *tmstate.ABCIResponses) proxy.AppConnConsensus {
clientCreator := abciclient.NewLocalCreator(&mockProxyApp{
appHash: appHash,
abciResponses: abciResponses,
})
cli, _ := clientCreator()
err := cli.Start()
if err != nil {
panic(err)
}
return proxy.NewAppConnConsensus(cli, proxy.NopMetrics())
}
type mockProxyApp struct {
abci.BaseApplication
appHash []byte
txCount int
abciResponses *tmstate.ABCIResponses
}
func (mock *mockProxyApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx {
r := mock.abciResponses.DeliverTxs[mock.txCount]
mock.txCount++
if r == nil {
return abci.ResponseDeliverTx{}
}
return *r
}
func (mock *mockProxyApp) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock {
mock.txCount = 0
return *mock.abciResponses.EndBlock
}
func (mock *mockProxyApp) Commit() abci.ResponseCommit {
return abci.ResponseCommit{Data: mock.appHash}
}
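As an illustration of what this mock does, the sketch below constructs the unexported struct directly instead of going through the proxy connection; the response values are made up. It simply hands back the pre-recorded responses so the last block can be replayed without re-executing transactions against the real app.

// Pre-recorded responses, as they would be loaded via stateStore.LoadABCIResponses.
responses := &tmstate.ABCIResponses{
	DeliverTxs: []*abci.ResponseDeliverTx{{Code: abci.CodeTypeOK}},
	EndBlock:   &abci.ResponseEndBlock{},
}
mock := &mockProxyApp{appHash: []byte("stored_hash"), abciResponses: responses}
_ = mock.DeliverTx(abci.RequestDeliverTx{Tx: []byte("tx")}) // returns responses.DeliverTxs[0]
_ = mock.EndBlock(abci.RequestEndBlock{})                   // returns responses.EndBlock and resets the tx counter
commit := mock.Commit()                                     // commit.Data holds the stored app hash
_ = commit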

View File

@@ -1,12 +1,10 @@
package consensus
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"runtime"
@@ -26,16 +24,12 @@ import (
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/crypto/encoding"
"github.com/tendermint/tendermint/internal/mempool"
"github.com/tendermint/tendermint/internal/proxy"
sm "github.com/tendermint/tendermint/internal/state"
sf "github.com/tendermint/tendermint/internal/state/test/factory"
"github.com/tendermint/tendermint/internal/store"
"github.com/tendermint/tendermint/internal/test/factory"
"github.com/tendermint/tendermint/libs/log"
tmrand "github.com/tendermint/tendermint/libs/rand"
"github.com/tendermint/tendermint/privval"
tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
"github.com/tendermint/tendermint/internal/proxy"
sm "github.com/tendermint/tendermint/internal/state"
"github.com/tendermint/tendermint/internal/store"
"github.com/tendermint/tendermint/types"
)
@@ -560,109 +554,6 @@ func setupSimulator(t *testing.T) *simulatorTestSuite {
return sim
}
// Sync from scratch
func TestHandshakeReplayAll(t *testing.T) {
sim := setupSimulator(t)
for _, m := range modes {
testHandshakeReplay(t, sim, 0, m, false)
}
for _, m := range modes {
testHandshakeReplay(t, sim, 0, m, true)
}
}
// Sync many, not from scratch
func TestHandshakeReplaySome(t *testing.T) {
sim := setupSimulator(t)
for _, m := range modes {
testHandshakeReplay(t, sim, 2, m, false)
}
for _, m := range modes {
testHandshakeReplay(t, sim, 2, m, true)
}
}
// Sync from lagging by one
func TestHandshakeReplayOne(t *testing.T) {
sim := setupSimulator(t)
for _, m := range modes {
testHandshakeReplay(t, sim, numBlocks-1, m, false)
}
for _, m := range modes {
testHandshakeReplay(t, sim, numBlocks-1, m, true)
}
}
// Sync from caught up
func TestHandshakeReplayNone(t *testing.T) {
sim := setupSimulator(t)
for _, m := range modes {
testHandshakeReplay(t, sim, numBlocks, m, false)
}
for _, m := range modes {
testHandshakeReplay(t, sim, numBlocks, m, true)
}
}
// Test that mockProxyApp does not panic when the app returns ABCIResponses with some empty ResponseDeliverTx entries
func TestMockProxyApp(t *testing.T) {
sim := setupSimulator(t) // setup config and simulator
cfg := sim.Config
assert.NotNil(t, cfg)
logger := log.TestingLogger()
var validTxs, invalidTxs = 0, 0
txIndex := 0
assert.NotPanics(t, func() {
abciResWithEmptyDeliverTx := new(tmstate.ABCIResponses)
abciResWithEmptyDeliverTx.DeliverTxs = make([]*abci.ResponseDeliverTx, 0)
abciResWithEmptyDeliverTx.DeliverTxs = append(abciResWithEmptyDeliverTx.DeliverTxs, &abci.ResponseDeliverTx{})
// called when saveABCIResponses:
bytes, err := proto.Marshal(abciResWithEmptyDeliverTx)
require.NoError(t, err)
loadedAbciRes := new(tmstate.ABCIResponses)
// this also happens in sm.LoadABCIResponses
err = proto.Unmarshal(bytes, loadedAbciRes)
require.NoError(t, err)
mock := newMockProxyApp([]byte("mock_hash"), loadedAbciRes)
abciRes := new(tmstate.ABCIResponses)
abciRes.DeliverTxs = make([]*abci.ResponseDeliverTx, len(loadedAbciRes.DeliverTxs))
// Execute transactions and get hash.
proxyCb := func(req *abci.Request, res *abci.Response) {
if r, ok := res.Value.(*abci.Response_DeliverTx); ok {
// TODO: make use of res.Log
// TODO: make use of this info
// Blocks may include invalid txs.
txRes := r.DeliverTx
if txRes.Code == abci.CodeTypeOK {
validTxs++
} else {
logger.Debug("Invalid tx", "code", txRes.Code, "log", txRes.Log)
invalidTxs++
}
abciRes.DeliverTxs[txIndex] = txRes
txIndex++
}
}
mock.SetResponseCallback(proxyCb)
someTx := []byte("tx")
_, err = mock.DeliverTxAsync(context.Background(), abci.RequestDeliverTx{Tx: someTx})
assert.NoError(t, err)
})
assert.True(t, validTxs == 1)
assert.True(t, invalidTxs == 0)
}
func tempWALWithData(data []byte) string {
walFile, err := ioutil.TempFile("", "wal")
if err != nil {
@@ -678,138 +569,6 @@ func tempWALWithData(data []byte) string {
return walFile.Name()
}
// Make some blocks. Start a fresh app and apply nBlocks blocks.
// Then restart the app and sync it up with the remaining blocks
func testHandshakeReplay(t *testing.T, sim *simulatorTestSuite, nBlocks int, mode uint, testValidatorsChange bool) {
var chain []*types.Block
var commits []*types.Commit
var store *mockBlockStore
var stateDB dbm.DB
var genesisState sm.State
cfg := sim.Config
if testValidatorsChange {
testConfig := ResetConfig(fmt.Sprintf("%s_%v_m", t.Name(), mode))
defer func() { _ = os.RemoveAll(testConfig.RootDir) }()
stateDB = dbm.NewMemDB()
genesisState = sim.GenesisState
cfg = sim.Config
chain = append([]*types.Block{}, sim.Chain...) // copy chain
commits = sim.Commits
store = newMockBlockStore(cfg, genesisState.ConsensusParams)
} else { // test single node
testConfig := ResetConfig(fmt.Sprintf("%s_%v_s", t.Name(), mode))
defer func() { _ = os.RemoveAll(testConfig.RootDir) }()
walBody, err := WALWithNBlocks(t, numBlocks)
require.NoError(t, err)
walFile := tempWALWithData(walBody)
cfg.Consensus.SetWalFile(walFile)
privVal, err := privval.LoadFilePV(cfg.PrivValidator.KeyFile(), cfg.PrivValidator.StateFile())
require.NoError(t, err)
wal, err := NewWAL(walFile)
require.NoError(t, err)
wal.SetLogger(log.TestingLogger())
err = wal.Start()
require.NoError(t, err)
t.Cleanup(func() {
if err := wal.Stop(); err != nil {
t.Error(err)
}
})
chain, commits, err = makeBlockchainFromWAL(wal)
require.NoError(t, err)
pubKey, err := privVal.GetPubKey(context.Background())
require.NoError(t, err)
stateDB, genesisState, store = stateAndStore(cfg, pubKey, kvstore.ProtocolVersion)
}
stateStore := sm.NewStore(stateDB)
store.chain = chain
store.commits = commits
state := genesisState.Copy()
// run the chain through state.ApplyBlock to build up the tendermint state
state = buildTMStateFromChain(cfg, sim.Mempool, sim.Evpool, stateStore, state, chain, nBlocks, mode, store)
latestAppHash := state.AppHash
// make a new client creator
kvstoreApp := kvstore.NewPersistentKVStoreApplication(
filepath.Join(cfg.DBDir(), fmt.Sprintf("replay_test_%d_%d_a_r%d", nBlocks, mode, rand.Int())))
t.Cleanup(func() { require.NoError(t, kvstoreApp.Close()) })
clientCreator2 := abciclient.NewLocalCreator(kvstoreApp)
if nBlocks > 0 {
// run nBlocks against a new client to build up the app state.
// use a throwaway tendermint state
proxyApp := proxy.NewAppConns(clientCreator2, proxy.NopMetrics())
stateDB1 := dbm.NewMemDB()
stateStore := sm.NewStore(stateDB1)
err := stateStore.Save(genesisState)
require.NoError(t, err)
buildAppStateFromChain(proxyApp, stateStore, sim.Mempool, sim.Evpool, genesisState, chain, nBlocks, mode, store)
}
// Prune block store if requested
expectError := false
if mode == 3 {
pruned, err := store.PruneBlocks(2)
require.NoError(t, err)
require.EqualValues(t, 1, pruned)
expectError = int64(nBlocks) < 2
}
// now start the app using the handshake - it should sync
genDoc, _ := sm.MakeGenesisDocFromFile(cfg.GenesisFile())
handshaker := NewHandshaker(stateStore, state, store, genDoc)
proxyApp := proxy.NewAppConns(clientCreator2, proxy.NopMetrics())
if err := proxyApp.Start(); err != nil {
t.Fatalf("Error starting proxy app connections: %v", err)
}
t.Cleanup(func() {
if err := proxyApp.Stop(); err != nil {
t.Error(err)
}
})
err := handshaker.Handshake(proxyApp)
if expectError {
require.Error(t, err)
return
} else if err != nil {
t.Fatalf("Error on abci handshake: %v", err)
}
// get the latest app hash from the app
res, err := proxyApp.Query().InfoSync(context.Background(), abci.RequestInfo{Version: ""})
if err != nil {
t.Fatal(err)
}
// the app hash should be synced up
if !bytes.Equal(latestAppHash, res.LastBlockAppHash) {
t.Fatalf(
"Expected app hashes to match after handshake/replay. got %X, expected %X",
res.LastBlockAppHash,
latestAppHash)
}
expectedBlocksToSync := numBlocks - nBlocks
if nBlocks == numBlocks && mode > 0 {
expectedBlocksToSync++
} else if nBlocks > 0 && mode == 1 {
expectedBlocksToSync++
}
if handshaker.NBlocks() != expectedBlocksToSync {
t.Fatalf("Expected handshake to sync %d blocks, got %d", expectedBlocksToSync, handshaker.NBlocks())
}
}
func applyBlock(stateStore sm.Store,
mempool mempool.Mempool,
evpool sm.EvidencePool,
@@ -933,75 +692,6 @@ func buildTMStateFromChain(
return state
}
func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {
// 1. Initialize tendermint and commit 3 blocks with the following app hashes:
// - 0x01
// - 0x02
// - 0x03
cfg := ResetConfig("handshake_test_")
t.Cleanup(func() { os.RemoveAll(cfg.RootDir) })
privVal, err := privval.LoadFilePV(cfg.PrivValidator.KeyFile(), cfg.PrivValidator.StateFile())
require.NoError(t, err)
const appVersion = 0x0
pubKey, err := privVal.GetPubKey(context.Background())
require.NoError(t, err)
stateDB, state, store := stateAndStore(cfg, pubKey, appVersion)
stateStore := sm.NewStore(stateDB)
genDoc, _ := sm.MakeGenesisDocFromFile(cfg.GenesisFile())
state.LastValidators = state.Validators.Copy()
// mode = 0 for committing all the blocks
blocks := sf.MakeBlocks(3, &state, privVal)
store.chain = blocks
// 2. Tendermint must panic if app returns wrong hash for the first block
// - RANDOM HASH
// - 0x02
// - 0x03
{
app := &badApp{numBlocks: 3, allHashesAreWrong: true}
clientCreator := abciclient.NewLocalCreator(app)
proxyApp := proxy.NewAppConns(clientCreator, proxy.NopMetrics())
err := proxyApp.Start()
require.NoError(t, err)
t.Cleanup(func() {
if err := proxyApp.Stop(); err != nil {
t.Error(err)
}
})
assert.Panics(t, func() {
h := NewHandshaker(stateStore, state, store, genDoc)
if err = h.Handshake(proxyApp); err != nil {
t.Log(err)
}
})
}
// 3. Tendermint must panic if app returns wrong hash for the last block
// - 0x01
// - 0x02
// - RANDOM HASH
{
app := &badApp{numBlocks: 3, onlyLastHashIsWrong: true}
clientCreator := abciclient.NewLocalCreator(app)
proxyApp := proxy.NewAppConns(clientCreator, proxy.NopMetrics())
err := proxyApp.Start()
require.NoError(t, err)
t.Cleanup(func() {
if err := proxyApp.Stop(); err != nil {
t.Error(err)
}
})
assert.Panics(t, func() {
h := NewHandshaker(stateStore, state, store, genDoc)
if err = h.Handshake(proxyApp); err != nil {
t.Log(err)
}
})
}
}
type badApp struct {
abci.BaseApplication
numBlocks byte
@@ -1219,52 +909,6 @@ func (bs *mockBlockStore) PruneBlocks(height int64) (uint64, error) {
return pruned, nil
}
//---------------------------------------
// Test handshake/init chain
func TestHandshakeUpdatesValidators(t *testing.T) {
val, _ := factory.RandValidator(true, 10)
vals := types.NewValidatorSet([]*types.Validator{val})
app := &initChainApp{vals: types.TM2PB.ValidatorUpdates(vals)}
clientCreator := abciclient.NewLocalCreator(app)
cfg := ResetConfig("handshake_test_")
t.Cleanup(func() { _ = os.RemoveAll(cfg.RootDir) })
privVal, err := privval.LoadFilePV(cfg.PrivValidator.KeyFile(), cfg.PrivValidator.StateFile())
require.NoError(t, err)
pubKey, err := privVal.GetPubKey(context.Background())
require.NoError(t, err)
stateDB, state, store := stateAndStore(cfg, pubKey, 0x0)
stateStore := sm.NewStore(stateDB)
oldValAddr := state.Validators.Validators[0].Address
// now start the app using the handshake - it should sync
genDoc, _ := sm.MakeGenesisDocFromFile(cfg.GenesisFile())
handshaker := NewHandshaker(stateStore, state, store, genDoc)
proxyApp := proxy.NewAppConns(clientCreator, proxy.NopMetrics())
if err := proxyApp.Start(); err != nil {
t.Fatalf("Error starting proxy app connections: %v", err)
}
t.Cleanup(func() {
if err := proxyApp.Stop(); err != nil {
t.Error(err)
}
})
if err := handshaker.Handshake(proxyApp); err != nil {
t.Fatalf("Error on abci handshake: %v", err)
}
// reload the state, check the validator set was updated
state, err = stateStore.Load()
require.NoError(t, err)
newValAddr := state.Validators.Validators[0].Address
expectValAddr := val.Address
assert.NotEqual(t, oldValAddr, newValAddr)
assert.Equal(t, newValAddr, expectValAddr)
}
// returns the vals on InitChain
type initChainApp struct {
abci.BaseApplication

View File

@@ -1,6 +1,11 @@
package state
import (
"context"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/internal/libs/clist"
"github.com/tendermint/tendermint/internal/mempool"
"github.com/tendermint/tendermint/types"
)
@@ -59,3 +64,39 @@ func (EmptyEvidencePool) AddEvidence(types.Evidence) error { retu
func (EmptyEvidencePool) Update(State, types.EvidenceList) {}
func (EmptyEvidencePool) CheckEvidence(evList types.EvidenceList) error { return nil }
func (EmptyEvidencePool) ReportConflictingVotes(voteA, voteB *types.Vote) {}
//-----------------------------------------------------------------------------
type emptyMempool struct{}
var _ mempool.Mempool = emptyMempool{}
func (emptyMempool) Lock() {}
func (emptyMempool) Unlock() {}
func (emptyMempool) Size() int { return 0 }
func (emptyMempool) CheckTx(_ context.Context, _ types.Tx, _ func(*abci.Response), _ mempool.TxInfo) error {
return nil
}
func (emptyMempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} }
func (emptyMempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} }
func (emptyMempool) Update(
_ int64,
_ types.Txs,
_ []*abci.ResponseDeliverTx,
_ mempool.PreCheckFunc,
_ mempool.PostCheckFunc,
) error {
return nil
}
func (emptyMempool) Flush() {}
func (emptyMempool) RemoveTxByKey(types.TxKey) error { return nil }
func (emptyMempool) FlushAppConn() error { return nil }
func (emptyMempool) TxsAvailable() <-chan struct{} { return make(chan struct{}) }
func (emptyMempool) EnableTxsAvailable() {}
func (emptyMempool) SizeBytes() int64 { return 0 }
func (emptyMempool) TxsFront() *clist.CElement { return nil }
func (emptyMempool) TxsWaitChan() <-chan struct{} { return nil }
func (emptyMempool) InitWAL() error { return nil }
func (emptyMempool) CloseWAL() {}

internal/state/sync.go (new file, 367 lines)
View File

@@ -0,0 +1,367 @@
package state
import (
"bytes"
"context"
"fmt"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/crypto/merkle"
"github.com/tendermint/tendermint/internal/proxy"
"github.com/tendermint/tendermint/libs/log"
tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
"github.com/tendermint/tendermint/types"
)
// SyncWithApplication is called on every start up. It handshakes with the app
// to understand the app's current state. If this is the first time running
// Tendermint it will call InitChain, feeding in the genesis doc and producing
// the initial state. If the application is behind, Tendermint will replay all
// blocks in between and assert that the app hashes produced match those
// recorded previously.
func SyncWithApplication(
stateStore Store,
blockStore BlockStore,
genDoc *types.GenesisDoc,
state State,
eventBus types.BlockEventPublisher,
proxyApp proxy.AppConns,
stateSync bool,
logger log.Logger,
) (State, error) {
// Handshake is done via ABCI Info on the query conn. This gives the node
// information on the app's latest height, hash and version
res, err := proxyApp.Query().InfoSync(context.Background(), proxy.RequestInfo)
if err != nil {
return State{}, fmt.Errorf("error calling Info: %v", err)
}
appHash := res.LastBlockAppHash
appBlockHeight := res.LastBlockHeight
// Only set the version if there is no existing state.
if state.LastBlockHeight == 0 {
state.Version.Consensus.App = res.AppVersion
}
// If the application's block height is 0 the app is at genesis and, provided the
// node is not running state sync, it will need InitChain. If Tendermint itself
// is at genesis it will save this initial state in the state store.
// NOTE: the Tendermint node might have existing state and need to catch up the
// application; we nonetheless still need to run InitChain.
if appBlockHeight == 0 && !stateSync {
state, err = initializeChain(proxyApp, stateStore, state, genDoc, logger)
if err != nil {
return State{}, fmt.Errorf("error initializing chain: %w", err)
}
appHash = state.AppHash
}
// Check whether Tendermint has prior state. If so, it may need to replay
// blocks to the application.
storeBlockBase := blockStore.Base()
storeBlockHeight := blockStore.Height()
stateBlockHeight := state.LastBlockHeight
// First handle edge cases and constraints on the storeBlockHeight and storeBlockBase.
switch {
case storeBlockHeight == 0:
// Fresh instance of Tendermint. Return early
return state, nil
case appBlockHeight == 0 && state.InitialHeight < storeBlockBase:
// the app has no state, and the block store is truncated above the
// initial height. The node can't replay those blocks.
return State{}, ErrAppBlockHeightTooLow{AppHeight: appBlockHeight, StoreBase: storeBlockBase}
case appBlockHeight > 0 && appBlockHeight < storeBlockBase-1:
// the app is too far behind the truncated store (it can be 1 behind, since we replay the next block)
return State{}, ErrAppBlockHeightTooLow{AppHeight: appBlockHeight, StoreBase: storeBlockBase}
case storeBlockHeight < appBlockHeight:
// the app should never be ahead of the store (but this is under app's control)
return State{}, ErrAppBlockHeightTooHigh{CoreHeight: storeBlockHeight, AppHeight: appBlockHeight}
case storeBlockHeight < stateBlockHeight:
// the state should never be ahead of the store (this is under Tendermint's control)
panic(fmt.Sprintf("StateBlockHeight (%d) > StoreBlockHeight (%d)", stateBlockHeight, storeBlockHeight))
case storeBlockHeight > stateBlockHeight+1:
// store should be at most one ahead of the state (this is under Tendermint's control)
panic(fmt.Sprintf("StoreBlockHeight (%d) > StateBlockHeight + 1 (%d)", storeBlockHeight, stateBlockHeight+1))
}
// Now either store is equal to state, or one ahead.
// For each, consider all cases of where the app could be, given app <= store
if storeBlockHeight == stateBlockHeight {
// Tendermint ran Commit and saved the state.
// Either the app is asking for replay, or we're all synced up.
if appBlockHeight < storeBlockHeight {
// the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store)
return state, replayBlocks(
state, proxyApp, blockStore, stateStore, appBlockHeight, storeBlockHeight, false, eventBus, genDoc, logger,
)
} else if appBlockHeight == storeBlockHeight {
// We're all synced up
return state, nil
}
} else if storeBlockHeight == stateBlockHeight+1 {
// We saved the block in the store but haven't updated the state,
// so we'll need to replay a block using the WAL.
// NOTE: In all these cases we generate a new state and store it to disk
switch {
case appBlockHeight < stateBlockHeight:
// the app is further behind than it should be, so replay blocks up to
// storeBlockHeight, applying the final block against the real app via replayBlock
err = replayBlocks(state, proxyApp, blockStore, stateStore, appBlockHeight, storeBlockHeight,
true, eventBus, genDoc, logger)
case appBlockHeight == stateBlockHeight:
// We haven't run Commit (both the state and app are one block behind),
// so replayBlock with the real app.
logger.Info("Replay last block using real app")
state, err = replayBlock(state, blockStore, stateStore, storeBlockHeight, proxyApp.Consensus(), eventBus, logger)
if err != nil {
return State{}, err
}
case appBlockHeight == storeBlockHeight:
// We ran Commit, but didn't save the state, so replayBlock with mock app.
abciResponses, err := stateStore.LoadABCIResponses(storeBlockHeight)
if err != nil {
return State{}, err
}
mockApp := newMockProxyApp(appHash, abciResponses)
logger.Info("Replay last block using mock app")
state, err = replayBlock(state, blockStore, stateStore, storeBlockHeight, mockApp, eventBus, logger)
if err != nil {
return State{}, err
}
}
// Save the new state.
if err := stateStore.Save(state); err != nil {
return State{}, err
}
return state, nil
}
panic(fmt.Sprintf("uncovered case! appHeight: %d, storeHeight: %d, stateHeight: %d",
appBlockHeight, storeBlockHeight, stateBlockHeight))
}
// initializeChain sends the genDoc information to the app as part of InitChain.
// If Tendermint state hasn't been initialized yet it takes any state changes
// from the app, applies them and then persists the initial state.
func initializeChain(
proxyApp proxy.AppConns,
stateStore Store,
currentState State,
genDoc *types.GenesisDoc,
logger log.Logger,
) (State, error) {
logger.Info("Initializing Chain with Application")
validators := make([]*types.Validator, len(genDoc.Validators))
for i, val := range genDoc.Validators {
validators[i] = types.NewValidator(val.PubKey, val.Power)
}
validatorSet := types.NewValidatorSet(validators)
nextVals := types.TM2PB.ValidatorUpdates(validatorSet)
pbParams := genDoc.ConsensusParams.ToProto()
req := abci.RequestInitChain{
Time: genDoc.GenesisTime,
ChainId: genDoc.ChainID,
InitialHeight: genDoc.InitialHeight,
ConsensusParams: &pbParams,
Validators: nextVals,
AppStateBytes: genDoc.AppState,
}
res, err := proxyApp.Consensus().InitChainSync(context.Background(), req)
if err != nil {
return State{}, err
}
// we only update state when we are in initial state
if currentState.LastBlockHeight == 0 {
// If the app did not return an app hash, we keep the one set from the genesis doc in
// the state. We don't set appHash since we don't want the genesis doc app hash
// recorded in the genesis block. We should probably just remove GenesisDoc.AppHash.
if len(res.AppHash) > 0 {
currentState.AppHash = res.AppHash
}
// If the app returned validators or consensus params, update the state.
if len(res.Validators) > 0 {
vals, err := types.PB2TM.ValidatorUpdates(res.Validators)
if err != nil {
return State{}, err
}
currentState.Validators = types.NewValidatorSet(vals)
currentState.NextValidators = types.NewValidatorSet(vals).CopyIncrementProposerPriority(1)
} else if len(genDoc.Validators) == 0 {
// If validator set is not set in genesis and still empty after InitChain, exit.
return State{}, fmt.Errorf("validator set is nil in genesis and still empty after InitChain")
}
if res.ConsensusParams != nil {
currentState.ConsensusParams = currentState.ConsensusParams.UpdateConsensusParams(res.ConsensusParams)
currentState.Version.Consensus.App = currentState.ConsensusParams.Version.AppVersion
}
// We update the last results hash with the empty hash, to conform with RFC-6962.
currentState.LastResultsHash = merkle.HashFromByteSlices(nil)
// We now save the initial state to the stateStore
if err := stateStore.Save(currentState); err != nil {
return State{}, err
}
}
return currentState, nil
}
// replayBlocks loads blocks from appBlockHeight to storeBlockHeight and
// executes each block against the application. It then checks that the app hash
// produced from executing each block matches that of the next block. It does not
// mutate Tendermint state in any way, except when mutateState is true, in which
// case the final block is replayed with replayBlock and the resulting state is persisted.
func replayBlocks(
state State,
proxyApp proxy.AppConns,
blockStore BlockStore,
stateStore Store,
appBlockHeight,
storeBlockHeight int64,
mutateState bool,
eventBus types.BlockEventPublisher,
genDoc *types.GenesisDoc,
logger log.Logger,
) error {
var appHash []byte
var err error
finalBlock := storeBlockHeight
if mutateState {
finalBlock--
}
firstBlock := appBlockHeight + 1
if firstBlock == 1 {
firstBlock = state.InitialHeight
}
for i := firstBlock; i <= finalBlock; i++ {
logger.Info("Applying block", "height", i)
block := blockStore.LoadBlock(i)
// Extra check to ensure the app was not changed in a way which changes
// the app hash (skipped on the first replayed block, where there is no
// prior hash to compare against)
if len(appHash) > 0 && !bytes.Equal(appHash, block.AppHash) {
return fmt.Errorf("block.AppHash does not match AppHash at height %d during replay. Got %X, expected %X",
block.Height, appHash, block.AppHash)
}
if i == finalBlock && !mutateState {
// We emit events for the indexer services at the final block to cover the case where
// the node shut down while the block was being committed.
blockExec := NewBlockExecutor(
stateStore, logger, proxyApp.Consensus(), emptyMempool{}, EmptyEvidencePool{}, blockStore)
blockExec.SetEventBus(eventBus)
appHash, err = ExecCommitBlock(
blockExec, proxyApp.Consensus(), block, logger, stateStore, genDoc.InitialHeight, state)
if err != nil {
return err
}
} else {
appHash, err = ExecCommitBlock(
nil, proxyApp.Consensus(), block, logger, stateStore, genDoc.InitialHeight, state)
if err != nil {
return err
}
}
}
if mutateState {
// sync the final block
state, err = replayBlock(state, blockStore, stateStore, storeBlockHeight, proxyApp.Consensus(), eventBus, logger)
if err != nil {
return err
}
appHash = state.AppHash
}
return nil
}
// replayBlock uses the block executor to apply the block at the given height and returns the resulting state.
func replayBlock(
state State,
store BlockStore,
stateStore Store,
height int64,
proxyApp proxy.AppConnConsensus,
eventBus types.BlockEventPublisher,
logger log.Logger,
) (State, error) {
block := store.LoadBlock(height)
meta := store.LoadBlockMeta(height)
// Use stubs for both the mempool and the evidence pool since neither transactions nor
// evidence are needed here - the block already exists.
blockExec := NewBlockExecutor(stateStore, logger, proxyApp, emptyMempool{}, EmptyEvidencePool{}, store)
blockExec.SetEventBus(eventBus)
var err error
state, err = blockExec.ApplyBlock(state, meta.BlockID, block)
if err != nil {
return State{}, err
}
return state, nil
}
//-----------------------------------------------------------------------------
// mockProxyApp uses ABCIResponses to give the right results.
//
// Useful because we don't want to call Commit() twice for the same block on
// the real app.
func newMockProxyApp(appHash []byte, abciResponses *tmstate.ABCIResponses) proxy.AppConnConsensus {
clientCreator := proxy.NewLocalClientCreator(&mockProxyApp{
appHash: appHash,
abciResponses: abciResponses,
})
cli, _ := clientCreator.NewABCIClient()
err := cli.Start()
if err != nil {
panic(err)
}
return proxy.NewAppConnConsensus(cli)
}
type mockProxyApp struct {
abci.BaseApplication
appHash []byte
txCount int
abciResponses *tmstate.ABCIResponses
}
func (mock *mockProxyApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx {
r := mock.abciResponses.DeliverTxs[mock.txCount]
mock.txCount++
if r == nil {
return abci.ResponseDeliverTx{}
}
return *r
}
func (mock *mockProxyApp) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock {
mock.txCount = 0
return *mock.abciResponses.EndBlock
}
func (mock *mockProxyApp) Commit() abci.ResponseCommit {
return abci.ResponseCommit{Data: mock.appHash}
}
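The intended call site for SyncWithApplication is the node's OnStart, shown in the node.go hunk further down; condensed here as a sketch, with the receiver fields as they appear in that hunk:

// Load state from a previous run (or derive it from the genesis doc), then
// sync with the application before starting any other service.
state, err := loadStateFromDBOrGenesisDocProvider(n.stateStore, n.genesisDoc)
if err != nil {
	return err
}
state, err = sm.SyncWithApplication(n.stateStore, n.blockStore, n.genesisDoc, state,
	n.eventBus, n.proxyApp, n.stateSync, n.Logger)
if err != nil {
	return err
}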

internal/state/sync_test.go (new file, 1033 lines)

File diff suppressed because it is too large.

View File

@@ -149,6 +149,8 @@ func makeNode(cfg *config.Config,
makeCloser(closers))
}
// Either load state from a previous run or generate the genesis state from
// the genesis doc
state, err := loadStateFromDBOrGenesisDocProvider(stateStore, genDoc)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
@@ -223,34 +225,13 @@ func makeNode(cfg *config.Config,
stateSync = false
}
// Create the handshaker, which calls RequestInfo, sets the AppVersion on the state,
// and replays any blocks as necessary to sync tendermint with the app.
consensusLogger := logger.With("module", "consensus")
if !stateSync {
if err := doHandshake(stateStore, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil {
return nil, combineCloseError(err, makeCloser(closers))
}
// Reload the state. It will have the Version.Consensus.App set by the
// Handshake, and may have other modifications as well (ie. depending on
// what happened during block replay).
state, err = stateStore.Load()
if err != nil {
return nil, combineCloseError(
fmt.Errorf("cannot load state: %w", err),
makeCloser(closers))
}
}
// Determine whether we should do block sync. This must happen after the handshake, since the
// app may modify the validator set, specifying ourself as the only validator.
blockSync := cfg.BlockSync.Enable && !onlyValidatorIsUs(state, pubKey)
logNodeStartupInfo(state, pubKey, logger, consensusLogger, cfg.Mode)
logNodeStartupInfo(state, pubKey, logger, cfg.Mode)
// TODO: Fetch and provide real options and do proper p2p bootstrapping.
// TODO: Use a persistent peer database.
nodeInfo, err := makeNodeInfo(cfg, nodeKey, eventSinks, genDoc, state)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
@@ -303,6 +284,7 @@ func makeNode(cfg *config.Config,
sm.BlockExecutorWithMetrics(nodeMetrics.state),
)
consensusLogger := logger.With("module", "consensus")
csReactor, csState, err := createConsensusReactor(
cfg, state, blockExec, blockStore, mp, evPool,
privValidator, nodeMetrics.consensus, stateSync || blockSync, eventBus,
@@ -540,6 +522,31 @@ func (n *nodeImpl) OnStart() error {
time.Sleep(genTime.Sub(now))
}
// Either load state from a previous run or generate the genesis state from
// the genesis doc
state, err := loadStateFromDBOrGenesisDocProvider(n.stateStore, n.genesisDoc)
if err != nil {
return err
}
// If we are not using state sync, we need to handshake with the
// application. This calls `RequestInfo`, sets the AppVersion on
// the state, calls `InitChain` if this is the first time the
// application has run, and replays any blocks as necessary to sync
// Tendermint with the app. We do all this before starting any other service.
state, err = sm.SyncWithApplication(n.stateStore, n.blockStore, n.genesisDoc, state,
n.eventBus, n.proxyApp, n.stateSync, n.Logger)
if err != nil {
return err
}
// Retrieve the validator public key
pubKey, err := n.privValidator.GetPubKey(context.TODO())
if err != nil {
return fmt.Errorf("failed to retrieve pubKey: %w", err)
}
logNodeStartupInfo(state, pubKey, n.Logger, n.config.Mode)
// Start the RPC server before the P2P server
// so we can eg. receive txs for the first block
if n.config.RPC.ListenAddress != "" && n.config.Mode != config.ModeSeed {
@@ -550,6 +557,7 @@ func (n *nodeImpl) OnStart() error {
n.rpcListeners = listeners
}
// If we are using prometheus for metric gathering, start that server.
if n.config.Instrumentation.Prometheus &&
n.config.Instrumentation.PrometheusListenAddr != "" {
n.prometheusSrv = n.startPrometheusServer(n.config.Instrumentation.PrometheusListenAddr)
@@ -564,6 +572,7 @@ func (n *nodeImpl) OnStart() error {
return err
}
// Start the P2P layer
n.isListening = true
if err = n.router.Start(); err != nil {

View File

@@ -130,25 +130,7 @@ func createAndStartIndexerService(
return indexerService, eventSinks, nil
}
func doHandshake(
stateStore sm.Store,
state sm.State,
blockStore sm.BlockStore,
genDoc *types.GenesisDoc,
eventBus types.BlockEventPublisher,
proxyApp proxy.AppConns,
consensusLogger log.Logger) error {
handshaker := consensus.NewHandshaker(stateStore, state, blockStore, genDoc)
handshaker.SetLogger(consensusLogger)
handshaker.SetEventBus(eventBus)
if err := handshaker.Handshake(proxyApp); err != nil {
return fmt.Errorf("error during handshake: %v", err)
}
return nil
}
func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusLogger log.Logger, mode string) {
func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger log.Logger, mode string) {
// Log the version info.
logger.Info("Version info",
"tmVersion", version.TMVersion,
@@ -166,14 +148,14 @@ func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusL
}
switch {
case mode == config.ModeFull:
consensusLogger.Info("This node is a fullnode")
logger.Info("This node is a fullnode")
case mode == config.ModeValidator:
addr := pubKey.Address()
// Log whether this node is a validator or an observer
if state.Validators.HasAddress(addr) {
consensusLogger.Info("This node is a validator", "addr", addr, "pubKey", pubKey.Bytes())
logger.Info("This node is a validator", "addr", addr, "pubKey", pubKey.Bytes())
} else {
consensusLogger.Info("This node is a validator (NOT in the active validator set)",
logger.Info("This node is a validator (NOT in the active validator set)",
"addr", addr, "pubKey", pubKey.Bytes())
}
}