fix replay test

This commit is contained in:
Callum Waters
2022-11-23 16:21:23 +01:00
parent 39ff07e648
commit 6ce3ccca49
3 changed files with 69 additions and 36 deletions

View File

@@ -77,9 +77,6 @@ func NewInMemoryApplication() *Application {
// Tendermint will ensure it is in sync with the application by potentially replaying the blocks it has. If the
// Application returns a 0 appBlockHeight, Tendermint will call InitChain to initialize the application with consensus related data
func (app *Application) Info(_ context.Context, req *types.RequestInfo) (*types.ResponseInfo, error) {
appHash := make([]byte, 8)
binary.PutVarint(appHash, app.state.Size)
// Tendermint expects the application to persist validators, on start-up we need to reload them to memory if they exist
if len(app.valAddrToPubKeyMap) == 0 && app.state.Height > 0 {
validators := app.getValidators()
@@ -97,7 +94,7 @@ func (app *Application) Info(_ context.Context, req *types.RequestInfo) (*types.
Version: version.ABCIVersion,
AppVersion: AppVersion,
LastBlockHeight: app.state.Height,
LastBlockAppHash: appHash,
LastBlockAppHash: app.state.Hash(),
}, nil
}

View File

@@ -266,7 +266,7 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
}
// Replay blocks up to the latest in the blockstore.
_, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp)
appHash, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp)
if err != nil {
return fmt.Errorf("error on replay: %v", err)
}
@@ -391,6 +391,7 @@ func (h *Handshaker) ReplayBlocks(
// Either the app is asking for replay, or we're all synced up.
if appBlockHeight < storeBlockHeight {
// the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store)
fmt.Println("here3")
return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, false)
} else if appBlockHeight == storeBlockHeight {
@@ -404,6 +405,7 @@ func (h *Handshaker) ReplayBlocks(
// so we'll need to replay a block using the WAL.
switch {
case appBlockHeight < stateBlockHeight:
fmt.Println("here2")
// the app is further behind than it should be, so replay blocks
// but leave the last block to go through the WAL
return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, true)
@@ -414,10 +416,12 @@ func (h *Handshaker) ReplayBlocks(
// NOTE: We could instead use the cs.WAL on cs.Start,
// but we'd have to allow the WAL to replay a block that wrote it's #ENDHEIGHT
h.logger.Info("Replay last block using real app")
fmt.Println("here4")
state, err = h.replayBlock(state, storeBlockHeight, proxyApp.Consensus())
return state.AppHash, err
case appBlockHeight == storeBlockHeight:
fmt.Println("here1")
// We ran Commit, but didn't save the state, so replayBlock with mock app.
finalizeBlockResponse, err := h.stateStore.LoadLastFinalizeBlockResponse(storeBlockHeight)
if err != nil {

View File

@@ -32,6 +32,7 @@ import (
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state"
smmocks "github.com/tendermint/tendermint/state/mocks"
"github.com/tendermint/tendermint/types"
)
@@ -311,7 +312,7 @@ const numBlocks = 6
var modes = []uint{0, 1, 2, 3}
// This is actually not a test, it's for storing validator change tx data for testHandshakeReplay
func setupChainWithChangingValidators(t *testing.T, name string) (*cfg.Config, []*types.Block, []*types.Commit, sm.State) {
func setupChainWithChangingValidators(t *testing.T, name string, nBlocks int) (*cfg.Config, []*types.Block, []*types.Commit, sm.State) {
nPeers := 7
nVals := 4
css, genDoc, config, cleanup := randConsensusNetWithPeers(
@@ -320,7 +321,9 @@ func setupChainWithChangingValidators(t *testing.T, name string) (*cfg.Config, [
nPeers,
name,
newMockTickerFunc(true),
newPersistentKVStoreWithPath)
func(_ string) abci.Application {
return newKVStore()
})
genesisState, err := sm.MakeGenesisState(genDoc)
require.NoError(t, err)
t.Cleanup(cleanup)
@@ -536,7 +539,7 @@ func setupChainWithChangingValidators(t *testing.T, name string) (*cfg.Config, [
chain := make([]*types.Block, 0)
commits := make([]*types.Commit, 0)
for i := 1; i <= numBlocks; i++ {
for i := 1; i <= nBlocks; i++ {
chain = append(chain, css[0].blockStore.LoadBlock(int64(i)))
commits = append(commits, css[0].blockStore.LoadBlockCommit(int64(i)))
}
@@ -558,24 +561,36 @@ func TestHandshakeReplayAll(t *testing.T) {
// Sync many, not from scratch
func TestHandshakeReplaySome(t *testing.T) {
for _, m := range modes {
testHandshakeReplay(t, config, 2, m, false)
testHandshakeReplay(t, config, 2, m, true)
t.Run(fmt.Sprintf("mode_%d_single", m), func(t *testing.T) {
testHandshakeReplay(t, config, 2, m, false)
})
t.Run(fmt.Sprintf("mode_%d_multi", m), func(t *testing.T) {
testHandshakeReplay(t, config, 2, m, true)
})
}
}
// Sync from lagging by one
func TestHandshakeReplayOne(t *testing.T) {
for _, m := range modes {
testHandshakeReplay(t, config, numBlocks-1, m, false)
testHandshakeReplay(t, config, numBlocks-1, m, true)
t.Run(fmt.Sprintf("mode_%d_single", m), func(t *testing.T) {
testHandshakeReplay(t, config, numBlocks-1, m, false)
})
t.Run(fmt.Sprintf("mode_%d_multi", m), func(t *testing.T) {
testHandshakeReplay(t, config, numBlocks-1, m, true)
})
}
}
// Sync from caught up
func TestHandshakeReplayNone(t *testing.T) {
for _, m := range modes {
testHandshakeReplay(t, config, numBlocks, m, false)
testHandshakeReplay(t, config, numBlocks, m, true)
t.Run(fmt.Sprintf("mode_%d_single", m), func(t *testing.T) {
testHandshakeReplay(t, config, numBlocks, m, false)
})
t.Run(fmt.Sprintf("mode_%d_multi", m), func(t *testing.T) {
testHandshakeReplay(t, config, numBlocks, m, true)
})
}
}
@@ -609,7 +624,8 @@ func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uin
)
if testValidatorsChange {
testConfig, chain, commits, genesisState = setupChainWithChangingValidators(t, fmt.Sprintf("%d_%d_m", nBlocks, mode))
testConfig, chain, commits, genesisState = setupChainWithChangingValidators(t, fmt.Sprintf("%d_%d_m", nBlocks, mode), numBlocks)
stateDB = dbm.NewMemDB()
store = newMockBlockStore(t, config, genesisState.ConsensusParams)
} else {
testConfig = ResetConfig(fmt.Sprintf("%d_%d_s", nBlocks, mode))
@@ -619,9 +635,9 @@ func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uin
walBody, err := WALWithNBlocks(t, numBlocks, testConfig)
require.NoError(t, err)
walFile := tempWALWithData(walBody)
config.Consensus.SetWalFile(walFile)
testConfig.Consensus.SetWalFile(walFile)
privVal := privval.LoadFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile())
privVal := privval.LoadFilePV(testConfig.PrivValidatorKeyFile(), testConfig.PrivValidatorStateFile())
wal, err := NewWAL(walFile)
require.NoError(t, err)
@@ -637,7 +653,7 @@ func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uin
require.NoError(t, err)
pubKey, err := privVal.GetPubKey()
require.NoError(t, err)
stateDB, genesisState, store = stateAndStore(t, config, pubKey, kvstore.AppVersion)
stateDB, genesisState, store = stateAndStore(t, testConfig, pubKey, kvstore.AppVersion)
}
stateStore := sm.NewStore(stateDB, sm.StoreOptions{
@@ -651,12 +667,11 @@ func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uin
state := genesisState.Copy()
// run the chain through state.ApplyBlock to build up the tendermint state
state = buildTMStateFromChain(t, config, stateStore, mempool, evpool, state, chain, nBlocks, mode, store)
latestAppHash := state.AppHash
state, latestAppHash := buildTMStateFromChain(t, testConfig, stateStore, mempool, evpool, state, chain, nBlocks, mode, store)
// make a new client creator
kvstoreApp := kvstore.NewPersistentApplication(
filepath.Join(config.DBDir(), fmt.Sprintf("replay_test_%d_%d_a", nBlocks, mode)))
filepath.Join(testConfig.DBDir(), fmt.Sprintf("replay_test_%d_%d_a", nBlocks, mode)))
t.Cleanup(func() {
_ = kvstoreApp.Close()
})
@@ -667,12 +682,12 @@ func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uin
// use a throwaway tendermint state
proxyApp := proxy.NewAppConns(clientCreator2, proxy.NopMetrics())
stateDB1 := dbm.NewMemDB()
stateStore := sm.NewStore(stateDB1, sm.StoreOptions{
dummyStateStore := sm.NewStore(stateDB1, sm.StoreOptions{
DiscardABCIResponses: false,
})
err := stateStore.Save(genesisState)
err := dummyStateStore.Save(genesisState)
require.NoError(t, err)
buildAppStateFromChain(t, proxyApp, stateStore, mempool, evpool, genesisState, chain, nBlocks, mode, store)
buildAppStateFromChain(t, proxyApp, dummyStateStore, mempool, evpool, genesisState, chain, nBlocks, mode, store)
}
// Prune block store if requested
@@ -685,7 +700,8 @@ func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uin
}
// now start the app using the handshake - it should sync
genDoc, _ := sm.MakeGenesisDocFromFile(config.GenesisFile())
genDoc, err := sm.MakeGenesisDocFromFile(testConfig.GenesisFile())
require.NoError(t, err)
handshaker := NewHandshaker(stateStore, state, store, genDoc)
proxyApp := proxy.NewAppConns(clientCreator2, proxy.NopMetrics())
if err := proxyApp.Start(); err != nil {
@@ -698,19 +714,29 @@ func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uin
}
})
err := handshaker.Handshake(proxyApp)
// perform the replay protocol to sync Tendermint and the application
err = handshaker.Handshake(proxyApp)
if expectError {
require.Error(t, err)
return
} else if err != nil {
t.Fatalf("Error on abci handshake: %v", err)
} else {
require.NoError(t, err)
}
// get the latest app hash from the app
res, err := proxyApp.Query().Info(context.Background(), &abci.RequestInfo{Version: ""})
if err != nil {
t.Fatal(err)
}
res, err := proxyApp.Query().Info(context.Background(), proxy.RequestInfo)
require.NoError(t, err)
// block store and app height should be in sync
require.Equal(t, store.Height(), res.LastBlockHeight)
// tendermint state height and app height should be in sync
state, err = stateStore.Load()
require.NoError(t, err)
require.Equal(t, state.LastBlockHeight, res.LastBlockHeight)
require.Equal(t, int64(numBlocks), res.LastBlockHeight)
fmt.Printf("mode: %d, appHash: %X, data: %s\n", mode, latestAppHash, res.Data)
// the app hash should be synced up
if !bytes.Equal(latestAppHash, res.LastBlockAppHash) {
@@ -795,7 +821,7 @@ func buildTMStateFromChain(
chain []*types.Block,
nBlocks int,
mode uint,
bs sm.BlockStore) sm.State {
bs sm.BlockStore) (sm.State, []byte) {
// run the whole chain against this client to build up the tendermint state
clientCreator := proxy.NewLocalClientCreator(
kvstore.NewPersistentApplication(
@@ -822,6 +848,7 @@ func buildTMStateFromChain(
for _, block := range chain {
state = applyBlock(t, stateStore, mempool, evpool, state, block, proxyApp, bs)
}
return state, state.AppHash
case 1, 2, 3:
// sync up to the penultimate as if we stored the block.
@@ -830,14 +857,19 @@ func buildTMStateFromChain(
state = applyBlock(t, stateStore, mempool, evpool, state, block, proxyApp, bs)
}
dummyStateStore := &smmocks.Store{}
vals, _ := stateStore.LoadValidators(int64(len(chain) - 1))
dummyStateStore.On("LoadValidators", int64(5)).Return(vals, nil)
dummyStateStore.On("Save", mock.Anything).Return(nil)
dummyStateStore.On("SaveFinalizeBlockResponse", mock.Anything, mock.Anything).Return(nil)
// apply the final block to a state copy so we can
// get the right next appHash but keep the state back
applyBlock(t, stateStore, mempool, evpool, state, chain[len(chain)-1], proxyApp, bs)
s := applyBlock(t, dummyStateStore, mempool, evpool, state, chain[len(chain)-1], proxyApp, bs)
return state, s.AppHash
default:
panic(fmt.Sprintf("unknown mode %v", mode))
}
return state
}
func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {