From c3d5634efa73dd6ce2c9d7c317aeae76038139ae Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 22 Aug 2016 15:57:20 -0400 Subject: [PATCH 01/13] begin block --- proxy/app_conn.go | 10 +++++----- types/protobuf.go | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 5 deletions(-) create mode 100644 types/protobuf.go diff --git a/proxy/app_conn.go b/proxy/app_conn.go index 5ba1bc158..e19e6f09f 100644 --- a/proxy/app_conn.go +++ b/proxy/app_conn.go @@ -14,7 +14,7 @@ type AppConnConsensus interface { InitChainSync(validators []*types.Validator) (err error) - BeginBlockSync(height uint64) (err error) + BeginBlockSync(header *types.Header) (err error) AppendTxAsync(tx []byte) *tmspcli.ReqRes EndBlockSync(height uint64) (changedValidators []*types.Validator, err error) CommitSync() (res types.Result) @@ -34,7 +34,7 @@ type AppConnQuery interface { Error() error EchoSync(string) (res types.Result) - InfoSync() (res types.Result) + InfoSync() (types.Result, *types.TMSPInfo, *types.LastBlockInfo, *types.ConfigInfo) QuerySync(tx []byte) (res types.Result) // SetOptionSync(key string, value string) (res types.Result) @@ -62,8 +62,8 @@ func (app *appConnConsensus) Error() error { func (app *appConnConsensus) InitChainSync(validators []*types.Validator) (err error) { return app.appConn.InitChainSync(validators) } -func (app *appConnConsensus) BeginBlockSync(height uint64) (err error) { - return app.appConn.BeginBlockSync(height) +func (app *appConnConsensus) BeginBlockSync(header *types.Header) (err error) { + return app.appConn.BeginBlockSync(header) } func (app *appConnConsensus) AppendTxAsync(tx []byte) *tmspcli.ReqRes { return app.appConn.AppendTxAsync(tx) @@ -131,7 +131,7 @@ func (app *appConnQuery) EchoSync(msg string) (res types.Result) { return app.appConn.EchoSync(msg) } -func (app *appConnQuery) InfoSync() (res types.Result) { +func (app *appConnQuery) InfoSync() (types.Result, *types.TMSPInfo, *types.LastBlockInfo, *types.ConfigInfo) { return app.appConn.InfoSync() } diff --git a/types/protobuf.go b/types/protobuf.go new file mode 100644 index 000000000..c6d33cf25 --- /dev/null +++ b/types/protobuf.go @@ -0,0 +1,37 @@ +package types + +import ( + "github.com/tendermint/tmsp/types" +) + +// Convert tendermint types to protobuf types +var TM2PB = tm2pb{} + +type tm2pb struct{} + +func (tm2pb) Header(header *Header) *types.Header { + return &types.Header{ + ChainId: header.ChainID, + Height: uint64(header.Height), + Time: uint64(header.Time.Unix()), + NumTxs: uint64(header.NumTxs), + LastBlockHash: header.LastBlockHash, + LastBlockParts: TM2PB.PartSetHeader(header.LastBlockParts), + LastCommitHash: header.LastCommitHash, + DataHash: header.DataHash, + } +} + +func (tm2pb) PartSetHeader(partSetHeader PartSetHeader) *types.PartSetHeader { + return &types.PartSetHeader{ + Total: uint64(partSetHeader.Total), + Hash: partSetHeader.Hash, + } +} + +func (tm2pb) Validator(val *Validator) *types.Validator { + return &types.Validator{ + PubKey: val.PubKey.Bytes(), + Power: uint64(val.VotingPower), + } +} From a0e4253edc22190a8df2145410f08a502b6dd265 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Tue, 23 Aug 2016 21:44:07 -0400 Subject: [PATCH 02/13] handshake --- node/node.go | 10 ++-- proxy/app_conn.go | 4 ++ proxy/multi_app_conn.go | 110 ++++++++++++++++++++++++++++++++++++++-- proxy/state.go | 10 +++- state/execution.go | 66 +++++++++++++++++++++--- state/state.go | 22 +++++++- 6 files changed, 206 insertions(+), 16 deletions(-) diff --git a/node/node.go b/node/node.go index 45e7ae001..b34d0e037 100644 --- a/node/node.go +++ b/node/node.go @@ -62,8 +62,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato // Get State state := getState(config, stateDB) - // Create the proxyApp, which houses three connections: - // query, consensus, and mempool + // Create the proxyApp, which manages connections (consensus, mempool, query) proxyApp := proxy.NewAppConns(config, clientCreator, state, blockStore) if _, err := proxyApp.Start(); err != nil { Exit(Fmt("Error starting proxy app connections: %v", err)) @@ -391,9 +390,12 @@ func newConsensusState(config cfg.Config) *consensus.ConsensusState { stateDB := dbm.NewDB("state", config.GetString("db_backend"), config.GetString("db_dir")) state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) - // Create two proxyAppConn connections, - // one for the consensus and one for the mempool. + // Create proxyAppConn connection (consensus, mempool, query) proxyApp := proxy.NewAppConns(config, proxy.DefaultClientCreator(config), state, blockStore) + _, err := proxyApp.Start() + if err != nil { + Exit(Fmt("Error starting proxy app conns: %v", err)) + } // add the chainid to the global config config.Set("chain_id", state.ChainID) diff --git a/proxy/app_conn.go b/proxy/app_conn.go index e19e6f09f..382bb3b83 100644 --- a/proxy/app_conn.go +++ b/proxy/app_conn.go @@ -56,15 +56,19 @@ func NewAppConnConsensus(appConn tmspcli.Client) *appConnConsensus { func (app *appConnConsensus) SetResponseCallback(cb tmspcli.Callback) { app.appConn.SetResponseCallback(cb) } + func (app *appConnConsensus) Error() error { return app.appConn.Error() } + func (app *appConnConsensus) InitChainSync(validators []*types.Validator) (err error) { return app.appConn.InitChainSync(validators) } + func (app *appConnConsensus) BeginBlockSync(header *types.Header) (err error) { return app.appConn.BeginBlockSync(header) } + func (app *appConnConsensus) AppendTxAsync(tx []byte) *tmspcli.ReqRes { return app.appConn.AppendTxAsync(tx) } diff --git a/proxy/multi_app_conn.go b/proxy/multi_app_conn.go index 8e4c84aa2..cfd827853 100644 --- a/proxy/multi_app_conn.go +++ b/proxy/multi_app_conn.go @@ -1,10 +1,20 @@ package proxy import ( + "bytes" + "fmt" + "sync" + . "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" + "github.com/tendermint/tendermint/types" // ... + tmspcli "github.com/tendermint/tmsp/client" + "github.com/tendermint/tmsp/example/dummy" + nilapp "github.com/tendermint/tmsp/example/nil" ) +//----------------------------- + // Tendermint's interface to the application consists of multiple connections type AppConns interface { Service @@ -19,7 +29,9 @@ func NewAppConns(config cfg.Config, clientCreator ClientCreator, state State, bl } // a multiAppConn is made of a few appConns (mempool, consensus, query) -// and manages their underlying tmsp clients, ensuring they reboot together +// and manages their underlying tmsp clients, including the handshake +// which ensures the app and tendermint are synced. +// TODO: on app restart, clients must reboot together type multiAppConn struct { BaseService @@ -57,6 +69,7 @@ func (app *multiAppConn) Consensus() AppConnConsensus { return app.consensusConn } +// Returns the query Connection func (app *multiAppConn) Query() AppConnQuery { return app.queryConn } @@ -85,11 +98,102 @@ func (app *multiAppConn) OnStart() error { } app.consensusConn = NewAppConnConsensus(concli) - // TODO: handshake + // ensure app is synced to the latest state + return app.Handshake() +} - // TODO: replay blocks +// TODO: retry the handshake once if it fails the first time +func (app *multiAppConn) Handshake() error { + // handshake is done on the query conn + res, tmspInfo, blockInfo, configInfo := app.queryConn.InfoSync() + if res.IsErr() { + return fmt.Errorf("Error calling Info. Code: %v; Data: %X; Log: %s", res.Code, res.Data, res.Log) + } + + if blockInfo == nil { + log.Warn("blockInfo is nil, aborting handshake") + return nil + } + + log.Notice("TMSP Handshake", "height", blockInfo.BlockHeight, "block_hash", blockInfo.BlockHash, "app_hash", blockInfo.AppHash) + + // TODO: check overflow or change pb to int32 + blockHeight := int(blockInfo.BlockHeight) + blockHash := blockInfo.BlockHash + appHash := blockInfo.AppHash + + if tmspInfo != nil { + // TODO: check tmsp version (or do this in the tmspcli?) + _ = tmspInfo + } + + // of the last block (nil if we starting from 0) + var header *types.Header + var partsHeader types.PartSetHeader + + // check block + // if the blockHeight == 0, we will replay everything + if blockHeight != 0 { + blockMeta := app.blockStore.LoadBlockMeta(blockHeight) + if blockMeta == nil { + return fmt.Errorf("Handshake error. Could not find block #%d", blockHeight) + } + + // check block hash + if !bytes.Equal(blockMeta.Hash, blockHash) { + return fmt.Errorf("Handshake error. Block hash at height %d does not match. Got %X, expected %X", blockHeight, blockHash, blockMeta.Hash) + } + + // check app hash + if !bytes.Equal(blockMeta.Header.AppHash, appHash) { + return fmt.Errorf("Handshake error. App hash at height %d does not match. Got %X, expected %X", blockHeight, appHash, blockMeta.Header.AppHash) + } + + header = blockMeta.Header + partsHeader = blockMeta.PartsHeader + } + + if configInfo != nil { + // TODO: set config info + _ = configInfo + } + + // replay blocks up to the latest in the blockstore + err := app.state.ReplayBlocks(header, partsHeader, app.consensusConn, app.blockStore) + if err != nil { + return fmt.Errorf("Error on replay: %v", err) + } // TODO: (on restart) replay mempool return nil } + +//-------------------------------- + +// Get a connected tmsp client +func NewTMSPClient(addr, transport string) (tmspcli.Client, error) { + var client tmspcli.Client + + // use local app (for testing) + // TODO: local proxy app conn + switch addr { + case "nilapp": + app := nilapp.NewNilApplication() + mtx := new(sync.Mutex) // TODO + client = tmspcli.NewLocalClient(mtx, app) + case "dummy": + app := dummy.NewDummyApplication() + mtx := new(sync.Mutex) // TODO + client = tmspcli.NewLocalClient(mtx, app) + default: + // Run forever in a loop + mustConnect := false + remoteApp, err := tmspcli.NewClient(addr, transport, mustConnect) + if err != nil { + return nil, fmt.Errorf("Failed to connect to proxy for mempool: %v", err) + } + client = remoteApp + } + return client, nil +} diff --git a/proxy/state.go b/proxy/state.go index a52dd0f8a..8091d49b7 100644 --- a/proxy/state.go +++ b/proxy/state.go @@ -1,9 +1,15 @@ package proxy +import ( + "github.com/tendermint/tendermint/types" +) + type State interface { - // TODO + ReplayBlocks(*types.Header, types.PartSetHeader, AppConnConsensus, BlockStore) error } type BlockStore interface { - // TODO + Height() int + LoadBlockMeta(height int) *types.BlockMeta + LoadBlock(height int) *types.Block } diff --git a/state/execution.go b/state/execution.go index e5653e7e7..f5dcadae9 100644 --- a/state/execution.go +++ b/state/execution.go @@ -41,12 +41,9 @@ func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnC } // All good! + // Update validator accums and set state variables nextValSet.IncrementAccum(1) - s.LastBlockHeight = block.Height - s.LastBlockID = types.BlockID{block.Hash(), blockPartsHeader} - s.LastBlockTime = block.Time - s.Validators = nextValSet - s.LastValidators = valSet + s.SetBlockAndValidators(block.Header, blockPartsHeader, valSet, nextValSet) return nil } @@ -89,7 +86,7 @@ func (s *State) execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn prox proxyAppConn.SetResponseCallback(proxyCb) // Begin block - err := proxyAppConn.BeginBlockSync(uint64(block.Height)) + err := proxyAppConn.BeginBlockSync(types.TM2PB.Header(block.Header)) if err != nil { log.Warn("Error in proxyAppConn.BeginBlock", "error", err) return err @@ -181,3 +178,60 @@ type InvalidTxError struct { func (txErr InvalidTxError) Error() string { return Fmt("Invalid tx: [%v] code: [%v]", txErr.Tx, txErr.Code) } + +//----------------------------------------------------------------------------- + +// Replay all blocks after blockHeight and ensure the result matches the current state +func (s *State) ReplayBlocks(header *types.Header, partsHeader types.PartSetHeader, + appConnConsensus proxy.AppConnConsensus, blockStore proxy.BlockStore) error { + + // fresh state to work on + stateCopy := s.Copy() + + // reset to this height (do nothing if its 0) + var blockHeight int + if header != nil { + blockHeight = header.Height + // TODO: put validators in iavl tree so we can set the state with an older validator set + lastVals, nextVals := stateCopy.GetValidators() + stateCopy.SetBlockAndValidators(header, partsHeader, lastVals, nextVals) + } + + // replay all blocks starting with blockHeight+1 + for i := blockHeight + 1; i <= blockStore.Height(); i++ { + blockMeta := blockStore.LoadBlockMeta(i) + if blockMeta == nil { + PanicSanity(Fmt("Nil blockMeta at height %d when blockStore height is %d", i, blockStore.Height())) + } + + block := blockStore.LoadBlock(i) + if block == nil { + PanicSanity(Fmt("Nil block at height %d when blockStore height is %d", i, blockStore.Height())) + } + + // run the transactions + var eventCache events.Fireable // nil + err := stateCopy.ExecBlock(eventCache, appConnConsensus, block, blockMeta.PartsHeader) + if err != nil { + return fmt.Errorf("Error on ExecBlock: %v", err) + } + + // commit the block (app should save the state) + res := appConnConsensus.CommitSync() + if res.IsErr() { + return fmt.Errorf("Error on Commit: %v", res) + } + if res.Log != "" { + log.Debug("Commit.Log: " + res.Log) + } + + // update the state hash + stateCopy.AppHash = res.Data + } + + // The computed state and the previously set state should be identical + if !s.Equals(stateCopy) { + return fmt.Errorf("State after replay does not match saved state. Got ----\n%v\nExpected ----\n%v\n", stateCopy, s) + } + return nil +} diff --git a/state/state.go b/state/state.go index 213484863..699f612e2 100644 --- a/state/state.go +++ b/state/state.go @@ -66,13 +66,33 @@ func (s *State) Copy() *State { func (s *State) Save() { s.mtx.Lock() defer s.mtx.Unlock() + s.db.Set(stateKey, s.Bytes()) +} +func (s *State) Equals(s2 *State) bool { + return bytes.Equal(s.Bytes(), s2.Bytes()) +} + +func (s *State) Bytes() []byte { buf, n, err := new(bytes.Buffer), new(int), new(error) wire.WriteBinary(s, buf, n, err) if *err != nil { PanicCrisis(*err) } - s.db.Set(stateKey, buf.Bytes()) + return buf.Bytes() +} + +// Mutate state variables to match block and validators +func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader types.PartSetHeader, prevValSet, nextValSet *types.ValidatorSet) { + s.LastBlockHeight = header.Height + s.LastBlockID = types.BlockID{block.Hash(), blockPartsHeader} + s.LastBlockTime = header.Time + s.Validators = nextValSet + s.LastValidators = prevValSet +} + +func (s *State) GetValidators() (*types.ValidatorSet, *types.ValidatorSet) { + return s.LastValidators, s.Validators } //----------------------------------------------------------------------------- From f37f56d4f1a71b7031b2914787b115f5f1ba9544 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 24 Aug 2016 01:44:20 -0400 Subject: [PATCH 03/13] fixes --- consensus/mempool_test.go | 4 ++-- proxy/app_conn_test.go | 6 +++--- types/protobuf.go | 1 + 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index b2246ce54..5d36a7118 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -122,8 +122,8 @@ func NewCounterApplication() *CounterApplication { return &CounterApplication{} } -func (app *CounterApplication) Info() string { - return Fmt("txs:%v", app.txCount) +func (app *CounterApplication) Info() (string, *tmsp.TMSPInfo, *tmsp.LastBlockInfo, *tmsp.ConfigInfo) { + return Fmt("txs:%v", app.txCount), nil, nil, nil } func (app *CounterApplication) SetOption(key string, value string) (log string) { diff --git a/proxy/app_conn_test.go b/proxy/app_conn_test.go index 3b7a3413e..d9814ec3c 100644 --- a/proxy/app_conn_test.go +++ b/proxy/app_conn_test.go @@ -16,7 +16,7 @@ import ( type AppConnTest interface { EchoAsync(string) *tmspcli.ReqRes FlushSync() error - InfoSync() (res types.Result) + InfoSync() (types.Result, *types.TMSPInfo, *types.LastBlockInfo, *types.ConfigInfo) } type appConnTest struct { @@ -35,7 +35,7 @@ func (app *appConnTest) FlushSync() error { return app.appConn.FlushSync() } -func (app *appConnTest) InfoSync() types.Result { +func (app *appConnTest) InfoSync() (types.Result, *types.TMSPInfo, *types.LastBlockInfo, *types.ConfigInfo) { return app.appConn.InfoSync() } @@ -114,7 +114,7 @@ func TestInfo(t *testing.T) { proxy := NewAppConnTest(cli) t.Log("Connected") - res := proxy.InfoSync() + res, _, _, _ := proxy.InfoSync() if res.IsErr() { t.Errorf("Unexpected error: %v", err) } diff --git a/types/protobuf.go b/types/protobuf.go index c6d33cf25..8d2f9b819 100644 --- a/types/protobuf.go +++ b/types/protobuf.go @@ -19,6 +19,7 @@ func (tm2pb) Header(header *Header) *types.Header { LastBlockParts: TM2PB.PartSetHeader(header.LastBlockParts), LastCommitHash: header.LastCommitHash, DataHash: header.DataHash, + AppHash: header.AppHash, } } From d3ae920bd0c40be9d88f06ac5ef26d5b0711e624 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Thu, 25 Aug 2016 00:18:03 -0400 Subject: [PATCH 04/13] state: ApplyBlock --- consensus/race.test | 1 + consensus/state.go | 44 +------------- node/node.go | 13 +--- proxy/multi_app_conn.go | 13 ++-- proxy/multi_app_conn_test.go | 22 +++++++ state/execution.go | 112 +++++++++++++++++++++++++++-------- state/state.go | 12 ++++ 7 files changed, 133 insertions(+), 84 deletions(-) create mode 100644 consensus/race.test create mode 100644 proxy/multi_app_conn_test.go diff --git a/consensus/race.test b/consensus/race.test new file mode 100644 index 000000000..46231439d --- /dev/null +++ b/consensus/race.test @@ -0,0 +1 @@ +ok github.com/tendermint/tendermint/consensus 5.928s diff --git a/consensus/state.go b/consensus/state.go index e5580f192..d26a3ed25 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -1265,21 +1265,9 @@ func (cs *ConsensusState) finalizeCommit(height int) { // event cache for txs eventCache := types.NewEventCache(cs.evsw) - // Run the block on the State: - // + update validator sets - // + run txs on the proxyAppConn - err := stateCopy.ExecBlock(eventCache, cs.proxyAppConn, block, blockParts.Header()) - if err != nil { - // TODO: handle this gracefully. - PanicQ(Fmt("Exec failed for application: %v", err)) - } - - // lock mempool, commit state, update mempoool - err = cs.commitStateUpdateMempool(stateCopy, block) - if err != nil { - // TODO: handle this gracefully. - PanicQ(Fmt("Commit failed for application: %v", err)) - } + // Execute and commit the block + // NOTE: All calls to the proxyAppConn should come here + stateCopy.ApplyBlock(eventCache, cs.proxyAppConn, block, blockParts.Header(), cs.mempool) // txs committed, bad ones removed from mepool; fire events // NOTE: the block.AppHash wont reflect these txs until the next block @@ -1309,32 +1297,6 @@ func (cs *ConsensusState) finalizeCommit(height int) { return } -// mempool must be locked during commit and update -// because state is typically reset on Commit and old txs must be replayed -// against committed state before new txs are run in the mempool, lest they be invalid -func (cs *ConsensusState) commitStateUpdateMempool(s *sm.State, block *types.Block) error { - cs.mempool.Lock() - defer cs.mempool.Unlock() - - // Commit block, get hash back - res := cs.proxyAppConn.CommitSync() - if res.IsErr() { - log.Warn("Error in proxyAppConn.CommitSync", "error", res) - return res - } - if res.Log != "" { - log.Debug("Commit.Log: " + res.Log) - } - - // Set the state's new AppHash - s.AppHash = res.Data - - // Update mempool. - cs.mempool.Update(block.Height, block.Txs) - - return nil -} - //----------------------------------------------------------------------------- func (cs *ConsensusState) defaultSetProposal(proposal *types.Proposal) error { diff --git a/node/node.go b/node/node.go index b34d0e037..8ca6662a6 100644 --- a/node/node.go +++ b/node/node.go @@ -60,7 +60,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato stateDB := dbm.NewDB("state", config.GetString("db_backend"), config.GetString("db_dir")) // Get State - state := getState(config, stateDB) + state := sm.GetState(config, stateDB) // Create the proxyApp, which manages connections (consensus, mempool, query) proxyApp := proxy.NewAppConns(config, clientCreator, state, blockStore) @@ -295,17 +295,6 @@ func makeNodeInfo(config cfg.Config, sw *p2p.Switch, privKey crypto.PrivKeyEd255 return nodeInfo } -// Load the most recent state from "state" db, -// or create a new one (and save) from genesis. -func getState(config cfg.Config, stateDB dbm.DB) *sm.State { - state := sm.LoadState(stateDB) - if state == nil { - state = sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) - state.Save() - } - return state -} - //------------------------------------------------------------------------------ // Users wishing to: diff --git a/proxy/multi_app_conn.go b/proxy/multi_app_conn.go index cfd827853..fc8d1df2b 100644 --- a/proxy/multi_app_conn.go +++ b/proxy/multi_app_conn.go @@ -28,6 +28,9 @@ func NewAppConns(config cfg.Config, clientCreator ClientCreator, state State, bl return NewMultiAppConn(config, clientCreator, state, blockStore) } +//----------------------------- +// multiAppConn implements AppConns + // a multiAppConn is made of a few appConns (mempool, consensus, query) // and manages their underlying tmsp clients, including the handshake // which ensures the app and tendermint are synced. @@ -103,8 +106,9 @@ func (app *multiAppConn) OnStart() error { } // TODO: retry the handshake once if it fails the first time +// ... let Info take an argument determining its behaviour func (app *multiAppConn) Handshake() error { - // handshake is done on the query conn + // handshake is done via info request on the query conn res, tmspInfo, blockInfo, configInfo := app.queryConn.InfoSync() if res.IsErr() { return fmt.Errorf("Error calling Info. Code: %v; Data: %X; Log: %s", res.Code, res.Data, res.Log) @@ -127,12 +131,12 @@ func (app *multiAppConn) Handshake() error { _ = tmspInfo } - // of the last block (nil if we starting from 0) + // last block (nil if we starting from 0) var header *types.Header var partsHeader types.PartSetHeader - // check block - // if the blockHeight == 0, we will replay everything + // replay all blocks after blockHeight + // if blockHeight == 0, we will replay everything if blockHeight != 0 { blockMeta := app.blockStore.LoadBlockMeta(blockHeight) if blockMeta == nil { @@ -176,7 +180,6 @@ func NewTMSPClient(addr, transport string) (tmspcli.Client, error) { var client tmspcli.Client // use local app (for testing) - // TODO: local proxy app conn switch addr { case "nilapp": app := nilapp.NewNilApplication() diff --git a/proxy/multi_app_conn_test.go b/proxy/multi_app_conn_test.go new file mode 100644 index 000000000..3ff2520f6 --- /dev/null +++ b/proxy/multi_app_conn_test.go @@ -0,0 +1,22 @@ +package proxy + +import ( + "testing" + "time" + + "github.com/tendermint/go-p2p" + "github.com/tendermint/tendermint/config/tendermint_test" + "github.com/tendermint/tendermint/node" + "github.com/tendermint/tendermint/types" +) + +func TestPersistence(t *testing.T) { + + // create persistent dummy app + // set state on dummy app + // proxy handshake + + config := tendermint_test.ResetConfig("proxy_test_") + multiApp := NewMultiAppConn(config, state, blockStore) + +} diff --git a/state/execution.go b/state/execution.go index f5dcadae9..f37ce9b42 100644 --- a/state/execution.go +++ b/state/execution.go @@ -181,7 +181,58 @@ func (txErr InvalidTxError) Error() string { //----------------------------------------------------------------------------- -// Replay all blocks after blockHeight and ensure the result matches the current state +// mempool must be locked during commit and update +// because state is typically reset on Commit and old txs must be replayed +// against committed state before new txs are run in the mempool, lest they be invalid +func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, block *types.Block, mempool Mempool) error { + mempool.Lock() + defer mempool.Unlock() + + // flush out any CheckTx that have already started + // cs.proxyAppConn.FlushSync() // ?! XXX + + // Commit block, get hash back + res := proxyAppConn.CommitSync() + if res.IsErr() { + log.Warn("Error in proxyAppConn.CommitSync", "error", res) + return res + } + if res.Log != "" { + log.Debug("Commit.Log: " + res.Log) + } + + // Set the state's new AppHash + s.AppHash = res.Data + + // Update mempool. + mempool.Update(block.Height, block.Txs) + + return nil +} + +// Execute and commit block against app, save block and state +func (s *State) ApplyBlock(eventCache events.Fireable, proxyAppConn proxy.AppConnConsensus, + block *types.Block, partsHeader types.PartSetHeader, mempool Mempool) { + + // Run the block on the State: + // + update validator sets + // + run txs on the proxyAppConn + err := s.ExecBlock(eventCache, proxyAppConn, block, partsHeader) + if err != nil { + // TODO: handle this gracefully. + PanicQ(Fmt("Exec failed for application: %v", err)) + } + + // lock mempool, commit state, update mempoool + err = s.CommitStateUpdateMempool(proxyAppConn, block, mempool) + if err != nil { + // TODO: handle this gracefully. + PanicQ(Fmt("Commit failed for application: %v", err)) + } +} + +// Replay all blocks after blockHeight and ensure the result matches the current state. +// XXX: blockStore must guarantee to have blocks for height <= blockStore.Height() func (s *State) ReplayBlocks(header *types.Header, partsHeader types.PartSetHeader, appConnConsensus proxy.AppConnConsensus, blockStore proxy.BlockStore) error { @@ -197,36 +248,16 @@ func (s *State) ReplayBlocks(header *types.Header, partsHeader types.PartSetHead stateCopy.SetBlockAndValidators(header, partsHeader, lastVals, nextVals) } + // run the transactions + var eventCache events.Fireable // nil + // replay all blocks starting with blockHeight+1 for i := blockHeight + 1; i <= blockStore.Height(); i++ { blockMeta := blockStore.LoadBlockMeta(i) - if blockMeta == nil { - PanicSanity(Fmt("Nil blockMeta at height %d when blockStore height is %d", i, blockStore.Height())) - } - block := blockStore.LoadBlock(i) - if block == nil { - PanicSanity(Fmt("Nil block at height %d when blockStore height is %d", i, blockStore.Height())) - } + panicOnNilBlock(i, blockStore.Height(), block, blockMeta) // XXX - // run the transactions - var eventCache events.Fireable // nil - err := stateCopy.ExecBlock(eventCache, appConnConsensus, block, blockMeta.PartsHeader) - if err != nil { - return fmt.Errorf("Error on ExecBlock: %v", err) - } - - // commit the block (app should save the state) - res := appConnConsensus.CommitSync() - if res.IsErr() { - return fmt.Errorf("Error on Commit: %v", res) - } - if res.Log != "" { - log.Debug("Commit.Log: " + res.Log) - } - - // update the state hash - stateCopy.AppHash = res.Data + stateCopy.ApplyBlock(eventCache, appConnConsensus, block, blockMeta.PartsHeader, mockMempool{}) } // The computed state and the previously set state should be identical @@ -235,3 +266,32 @@ func (s *State) ReplayBlocks(header *types.Header, partsHeader types.PartSetHead } return nil } + +func panicOnNilBlock(height, bsHeight int, block *types.Block, blockMeta *types.BlockMeta) { + if block == nil || blockMeta == nil { + // Sanity? + PanicCrisis(Fmt(` +block/blockMeta is nil for height <= blockStore.Height() (%d <= %d). +Block: %v, +BlockMeta: %v +`, height, bsHeight, block, blockMeta)) + + } +} + +//------------------------------------------------ +// Updates to the mempool need to be synchronized with committing a block +// so apps can reset their transient state on Commit + +type Mempool interface { + Lock() + Unlock() + Update(height int, txs []types.Tx) +} + +type mockMempool struct { +} + +func (m mockMempool) Lock() {} +func (m mockMempool) Unlock() {} +func (m mockMempool) Update(height int, txs []types.Tx) {} diff --git a/state/state.go b/state/state.go index 699f612e2..289c7a4d8 100644 --- a/state/state.go +++ b/state/state.go @@ -7,6 +7,7 @@ import ( "time" . "github.com/tendermint/go-common" + cfg "github.com/tendermint/go-config" dbm "github.com/tendermint/go-db" "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/types" @@ -95,6 +96,17 @@ func (s *State) GetValidators() (*types.ValidatorSet, *types.ValidatorSet) { return s.LastValidators, s.Validators } +// Load the most recent state from "state" db, +// or create a new one (and save) from genesis. +func GetState(config cfg.Config, stateDB dbm.DB) *State { + state := LoadState(stateDB) + if state == nil { + state = MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) + state.Save() + } + return state +} + //----------------------------------------------------------------------------- // Genesis From 138de19e1e6c7204f7fd7e835ef67f4c40f69ec4 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Thu, 25 Aug 2016 01:39:03 -0400 Subject: [PATCH 05/13] test: app persistence --- consensus/state.go | 3 +- state/execution.go | 1 + test/persist/test.sh | 68 ++++++++++++++++++++++++++++++++++++++++++++ test/run_test.sh | 3 ++ types/events.go | 4 ++- 5 files changed, 77 insertions(+), 2 deletions(-) create mode 100644 test/persist/test.sh diff --git a/consensus/state.go b/consensus/state.go index d26a3ed25..55f37df99 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -1251,7 +1251,8 @@ func (cs *ConsensusState) finalizeCommit(height int) { PanicConsensus(Fmt("+2/3 committed an invalid block: %v", err)) } - log.Notice(Fmt("Finalizing commit of block with %d txs", block.NumTxs), "height", block.Height, "hash", block.Hash()) + log.Notice(Fmt("Finalizing commit of block with %d txs", block.NumTxs), + "height", block.Height, "hash", block.Hash(), "root", block.AppHash) log.Info(Fmt("%v", block)) // Fire off event for new block. diff --git a/state/execution.go b/state/execution.go index f37ce9b42..37df55dc4 100644 --- a/state/execution.go +++ b/state/execution.go @@ -246,6 +246,7 @@ func (s *State) ReplayBlocks(header *types.Header, partsHeader types.PartSetHead // TODO: put validators in iavl tree so we can set the state with an older validator set lastVals, nextVals := stateCopy.GetValidators() stateCopy.SetBlockAndValidators(header, partsHeader, lastVals, nextVals) + stateCopy.AppHash = header.AppHash } // run the transactions diff --git a/test/persist/test.sh b/test/persist/test.sh new file mode 100644 index 000000000..c51fa7d0a --- /dev/null +++ b/test/persist/test.sh @@ -0,0 +1,68 @@ +#! /bin/bash + + +export TMROOT=$HOME/.tendermint_persist + +rm -rf $TMROOT +tendermint init + +function start_procs(){ + name=$1 + echo "Starting persistent dummy and tendermint" + dummy --persist $TMROOT/dummy &> "dummy_${name}.log" & + PID_DUMMY=$! + tendermint node &> tendermint_${name}.log & + PID_TENDERMINT=$! + sleep 5 +} + +function kill_procs(){ + kill -9 $PID_DUMMY $PID_TENDERMINT +} + + +function send_txs(){ + # send a bunch of txs over a few blocks + echo "Sending txs" +# for i in `seq 1 5`; do +# for j in `seq 1 100`; do + tx=`head -c 8 /dev/urandom | hexdump -ve '1/1 "%.2X"'` + curl -s 127.0.0.1:46657/broadcast_tx_async?tx=\"$tx\" &> /dev/null +# done + sleep 1 +# done +} + + +start_procs 1 +send_txs +kill_procs +start_procs 2 + +# wait for node to handshake and make a new block +addr="localhost:46657" +curl -s $addr/status > /dev/null +ERR=$? +i=0 +while [ "$ERR" != 0 ]; do + sleep 1 + curl -s $addr/status > /dev/null + ERR=$? + i=$(($i + 1)) + if [[ $i == 10 ]]; then + echo "Timed out waiting for tendermint to start" + exit 1 + fi +done + +# wait for a new block +h1=`curl -s $addr/status | jq .result[1].latest_block_height` +h2=$h1 +while [ "$h2" == "$h1" ]; do + sleep 1 + h2=`curl -s $addr/status | jq .result[1].latest_block_height` +done + +kill_procs + +echo "Passed Test: Persistence" diff --git a/test/run_test.sh b/test/run_test.sh index 9ef4388f2..6f625798c 100644 --- a/test/run_test.sh +++ b/test/run_test.sh @@ -11,6 +11,9 @@ bash test/test_cover.sh # run the app tests bash test/app/test.sh +# run the persistence test +bash test/persist.test.sh + if [[ "$BRANCH" == "master" || $(echo "$BRANCH" | grep "release-") != "" ]]; then echo "" echo "* branch $BRANCH; testing libs" diff --git a/types/events.go b/types/events.go index fc7e0ac6b..c6eb7611a 100644 --- a/types/events.go +++ b/types/events.go @@ -130,7 +130,9 @@ func NewEventCache(evsw EventSwitch) EventCache { // All events should be based on this FireEvent to ensure they are TMEventData func fireEvent(fireable events.Fireable, event string, data TMEventData) { - fireable.FireEvent(event, data) + if fireable != nil { + fireable.FireEvent(event, data) + } } func AddListenerForEvent(evsw EventSwitch, id, event string, cb func(data TMEventData)) { From fb9735ef4634394520c617149b659180ba6380cf Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sat, 10 Sep 2016 23:35:18 -0400 Subject: [PATCH 06/13] rebase fixes and BeginBlock(hash,header) --- consensus/state.go | 2 ++ proxy/app_conn.go | 6 +++--- proxy/multi_app_conn.go | 41 +++++-------------------------------- proxy/state.go | 2 +- rpc/core/tmsp.go | 4 ++-- rpc/core/types/responses.go | 5 ++++- state/execution.go | 11 +++++++--- test/run_test.sh | 2 +- 8 files changed, 26 insertions(+), 47 deletions(-) diff --git a/consensus/state.go b/consensus/state.go index 55f37df99..e7a06efde 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -1278,10 +1278,12 @@ func (cs *ConsensusState) finalizeCommit(height int) { if cs.blockStore.Height() < block.Height { precommits := cs.Votes.Precommits(cs.CommitRound) seenCommit := precommits.MakeCommit() + log.Notice("save block", "height", block.Height) cs.blockStore.SaveBlock(block, blockParts, seenCommit) } // Save the state. + log.Notice("save state", "height", stateCopy.LastBlockHeight, "hash", stateCopy.AppHash) stateCopy.Save() // NewHeightStep! diff --git a/proxy/app_conn.go b/proxy/app_conn.go index 382bb3b83..c2f383d3a 100644 --- a/proxy/app_conn.go +++ b/proxy/app_conn.go @@ -14,7 +14,7 @@ type AppConnConsensus interface { InitChainSync(validators []*types.Validator) (err error) - BeginBlockSync(header *types.Header) (err error) + BeginBlockSync(hash []byte, header *types.Header) (err error) AppendTxAsync(tx []byte) *tmspcli.ReqRes EndBlockSync(height uint64) (changedValidators []*types.Validator, err error) CommitSync() (res types.Result) @@ -65,8 +65,8 @@ func (app *appConnConsensus) InitChainSync(validators []*types.Validator) (err e return app.appConn.InitChainSync(validators) } -func (app *appConnConsensus) BeginBlockSync(header *types.Header) (err error) { - return app.appConn.BeginBlockSync(header) +func (app *appConnConsensus) BeginBlockSync(hash []byte, header *types.Header) (err error) { + return app.appConn.BeginBlockSync(hash, header) } func (app *appConnConsensus) AppendTxAsync(tx []byte) *tmspcli.ReqRes { diff --git a/proxy/multi_app_conn.go b/proxy/multi_app_conn.go index fc8d1df2b..906486539 100644 --- a/proxy/multi_app_conn.go +++ b/proxy/multi_app_conn.go @@ -3,14 +3,10 @@ package proxy import ( "bytes" "fmt" - "sync" . "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" - "github.com/tendermint/tendermint/types" // ... - tmspcli "github.com/tendermint/tmsp/client" - "github.com/tendermint/tmsp/example/dummy" - nilapp "github.com/tendermint/tmsp/example/nil" + "github.com/tendermint/tendermint/types" ) //----------------------------- @@ -148,10 +144,11 @@ func (app *multiAppConn) Handshake() error { return fmt.Errorf("Handshake error. Block hash at height %d does not match. Got %X, expected %X", blockHeight, blockHash, blockMeta.Hash) } + // NOTE: app hash should be in the next block ... // check app hash - if !bytes.Equal(blockMeta.Header.AppHash, appHash) { + /*if !bytes.Equal(blockMeta.Header.AppHash, appHash) { return fmt.Errorf("Handshake error. App hash at height %d does not match. Got %X, expected %X", blockHeight, appHash, blockMeta.Header.AppHash) - } + }*/ header = blockMeta.Header partsHeader = blockMeta.PartsHeader @@ -163,7 +160,7 @@ func (app *multiAppConn) Handshake() error { } // replay blocks up to the latest in the blockstore - err := app.state.ReplayBlocks(header, partsHeader, app.consensusConn, app.blockStore) + err := app.state.ReplayBlocks(appHash, header, partsHeader, app.consensusConn, app.blockStore) if err != nil { return fmt.Errorf("Error on replay: %v", err) } @@ -172,31 +169,3 @@ func (app *multiAppConn) Handshake() error { return nil } - -//-------------------------------- - -// Get a connected tmsp client -func NewTMSPClient(addr, transport string) (tmspcli.Client, error) { - var client tmspcli.Client - - // use local app (for testing) - switch addr { - case "nilapp": - app := nilapp.NewNilApplication() - mtx := new(sync.Mutex) // TODO - client = tmspcli.NewLocalClient(mtx, app) - case "dummy": - app := dummy.NewDummyApplication() - mtx := new(sync.Mutex) // TODO - client = tmspcli.NewLocalClient(mtx, app) - default: - // Run forever in a loop - mustConnect := false - remoteApp, err := tmspcli.NewClient(addr, transport, mustConnect) - if err != nil { - return nil, fmt.Errorf("Failed to connect to proxy for mempool: %v", err) - } - client = remoteApp - } - return client, nil -} diff --git a/proxy/state.go b/proxy/state.go index 8091d49b7..2881fd0c4 100644 --- a/proxy/state.go +++ b/proxy/state.go @@ -5,7 +5,7 @@ import ( ) type State interface { - ReplayBlocks(*types.Header, types.PartSetHeader, AppConnConsensus, BlockStore) error + ReplayBlocks([]byte, *types.Header, types.PartSetHeader, AppConnConsensus, BlockStore) error } type BlockStore interface { diff --git a/rpc/core/tmsp.go b/rpc/core/tmsp.go index 9a19e6eeb..cecd71dbb 100644 --- a/rpc/core/tmsp.go +++ b/rpc/core/tmsp.go @@ -12,6 +12,6 @@ func TMSPQuery(query []byte) (*ctypes.ResultTMSPQuery, error) { } func TMSPInfo() (*ctypes.ResultTMSPInfo, error) { - res := proxyAppQuery.InfoSync() - return &ctypes.ResultTMSPInfo{res}, nil + res, tmspInfo, lastBlockInfo, configInfo := proxyAppQuery.InfoSync() + return &ctypes.ResultTMSPInfo{res, tmspInfo, lastBlockInfo, configInfo}, nil } diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index cd68addd5..f5f6bae02 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -69,7 +69,10 @@ type ResultUnconfirmedTxs struct { } type ResultTMSPInfo struct { - Result tmsp.Result `json:"result"` + Result tmsp.Result `json:"result"` + TMSPInfo *tmsp.TMSPInfo `json:"tmsp_info"` + LastBlockInfo *tmsp.LastBlockInfo `json:"last_block_info"` + ConfigInfo *tmsp.ConfigInfo `json:"config_info"` } type ResultTMSPQuery struct { diff --git a/state/execution.go b/state/execution.go index 37df55dc4..afee2753c 100644 --- a/state/execution.go +++ b/state/execution.go @@ -86,7 +86,7 @@ func (s *State) execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn prox proxyAppConn.SetResponseCallback(proxyCb) // Begin block - err := proxyAppConn.BeginBlockSync(types.TM2PB.Header(block.Header)) + err := proxyAppConn.BeginBlockSync(block.Hash(), types.TM2PB.Header(block.Header)) if err != nil { log.Warn("Error in proxyAppConn.BeginBlock", "error", err) return err @@ -233,9 +233,14 @@ func (s *State) ApplyBlock(eventCache events.Fireable, proxyAppConn proxy.AppCon // Replay all blocks after blockHeight and ensure the result matches the current state. // XXX: blockStore must guarantee to have blocks for height <= blockStore.Height() -func (s *State) ReplayBlocks(header *types.Header, partsHeader types.PartSetHeader, +func (s *State) ReplayBlocks(appHash []byte, header *types.Header, partsHeader types.PartSetHeader, appConnConsensus proxy.AppConnConsensus, blockStore proxy.BlockStore) error { + // NOTE/TODO: tendermint may crash after the app commits + // but before it can save the new state root. + // it should save all eg. valset changes before calling Commit. + // then, if tm state is behind app state, the only thing missing can be app hash + // fresh state to work on stateCopy := s.Copy() @@ -246,7 +251,7 @@ func (s *State) ReplayBlocks(header *types.Header, partsHeader types.PartSetHead // TODO: put validators in iavl tree so we can set the state with an older validator set lastVals, nextVals := stateCopy.GetValidators() stateCopy.SetBlockAndValidators(header, partsHeader, lastVals, nextVals) - stateCopy.AppHash = header.AppHash + stateCopy.AppHash = appHash } // run the transactions diff --git a/test/run_test.sh b/test/run_test.sh index 6f625798c..ba4e1b0e4 100644 --- a/test/run_test.sh +++ b/test/run_test.sh @@ -12,7 +12,7 @@ bash test/test_cover.sh bash test/app/test.sh # run the persistence test -bash test/persist.test.sh +bash test/persist/test.sh if [[ "$BRANCH" == "master" || $(echo "$BRANCH" | grep "release-") != "" ]]; then echo "" From 8ec1839f5d15ea65bce77fda2790935841a5b3cf Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sun, 11 Sep 2016 13:16:23 -0400 Subject: [PATCH 07/13] save block b4 apply; track stale apphash --- consensus/state.go | 39 +++++---- mempool/mempool.go | 3 +- state/execution.go | 211 ++++++++++++++++++++++++++------------------- state/state.go | 23 +++-- 4 files changed, 159 insertions(+), 117 deletions(-) diff --git a/consensus/state.go b/consensus/state.go index e7a06efde..4e06dc0db 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -1255,33 +1255,34 @@ func (cs *ConsensusState) finalizeCommit(height int) { "height", block.Height, "hash", block.Hash(), "root", block.AppHash) log.Info(Fmt("%v", block)) - // Fire off event for new block. - // TODO: Handle app failure. See #177 - types.FireEventNewBlock(cs.evsw, types.EventDataNewBlock{block}) - types.FireEventNewBlockHeader(cs.evsw, types.EventDataNewBlockHeader{block.Header}) - - // Create a copy of the state for staging - stateCopy := cs.state.Copy() - - // event cache for txs - eventCache := types.NewEventCache(cs.evsw) - - // Execute and commit the block - // NOTE: All calls to the proxyAppConn should come here - stateCopy.ApplyBlock(eventCache, cs.proxyAppConn, block, blockParts.Header(), cs.mempool) - - // txs committed, bad ones removed from mepool; fire events - // NOTE: the block.AppHash wont reflect these txs until the next block - eventCache.Flush() - // Save to blockStore. if cs.blockStore.Height() < block.Height { precommits := cs.Votes.Precommits(cs.CommitRound) seenCommit := precommits.MakeCommit() log.Notice("save block", "height", block.Height) cs.blockStore.SaveBlock(block, blockParts, seenCommit) + } else { + log.Warn("Why are we finalizeCommitting a block height we already have?", "height", block.Height) } + // Create a copy of the state for staging + // and an event cache for txs + stateCopy := cs.state.Copy() + + // event cache for txs + eventCache := types.NewEventCache(cs.evsw) + + // Execute and commit the block, and update the mempool. + // All calls to the proxyAppConn should come here. + // NOTE: the block.AppHash wont reflect these txs until the next block + stateCopy.ApplyBlock(eventCache, cs.proxyAppConn, block, blockParts.Header(), cs.mempool) + + // Fire off event for new block. + // TODO: Handle app failure. See #177 + types.FireEventNewBlock(cs.evsw, types.EventDataNewBlock{block}) + types.FireEventNewBlockHeader(cs.evsw, types.EventDataNewBlockHeader{block.Header}) + eventCache.Flush() + // Save the state. log.Notice("save state", "height", stateCopy.LastBlockHeight, "hash", stateCopy.AppHash) stateCopy.Save() diff --git a/mempool/mempool.go b/mempool/mempool.go index a5426991e..80841b171 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -282,8 +282,7 @@ func (mem *Mempool) collectTxs(maxTxs int) []types.Tx { // NOTE: this should be called *after* block is committed by consensus. // NOTE: unsafe; Lock/Unlock must be managed by caller func (mem *Mempool) Update(height int, txs []types.Tx) { - // mem.proxyMtx.Lock() - // defer mem.proxyMtx.Unlock() + // TODO: check err ? mem.proxyAppConn.FlushSync() // To flush async resCb calls e.g. from CheckTx // First, create a lookup map of txns in new txs. diff --git a/state/execution.go b/state/execution.go index afee2753c..4d570f0b5 100644 --- a/state/execution.go +++ b/state/execution.go @@ -2,7 +2,6 @@ package state import ( "errors" - "fmt" . "github.com/tendermint/go-common" "github.com/tendermint/tendermint/proxy" @@ -10,10 +9,13 @@ import ( tmsp "github.com/tendermint/tmsp/types" ) -// Validate block -func (s *State) ValidateBlock(block *types.Block) error { - return s.validateBlock(block) -} +//-------------------------------------------------- +// Execute the block + +type ( + ErrInvalidBlock error + ErrProxyAppConn error +) // Execute the block to mutate State. // Validates block and then executes Data.Txs in the block. @@ -22,7 +24,7 @@ func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnC // Validate the block. err := s.validateBlock(block) if err != nil { - return err + return ErrInvalidBlock(err) } // Update the validator set @@ -37,7 +39,7 @@ func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnC if err != nil { // There was some error in proxyApp // TODO Report error and wait for proxyApp to be available. - return err + return ErrProxyAppConn(err) } // All good! @@ -45,6 +47,10 @@ func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnC nextValSet.IncrementAccum(1) s.SetBlockAndValidators(block.Header, blockPartsHeader, valSet, nextValSet) + // save state with updated height/blockhash/validators + // but stale apphash, in case we fail between Commit and Save + s.Save() + return nil } @@ -113,33 +119,6 @@ func (s *State) execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn prox return nil } -func (s *State) validateBlock(block *types.Block) error { - // Basic block validation. - err := block.ValidateBasic(s.ChainID, s.LastBlockHeight, s.LastBlockID, s.LastBlockTime, s.AppHash) - if err != nil { - return err - } - - // Validate block LastCommit. - if block.Height == 1 { - if len(block.LastCommit.Precommits) != 0 { - return errors.New("Block at height 1 (first block) should have no LastCommit precommits") - } - } else { - if len(block.LastCommit.Precommits) != s.LastValidators.Size() { - return fmt.Errorf("Invalid block commit size. Expected %v, got %v", - s.LastValidators.Size(), len(block.LastCommit.Precommits)) - } - err := s.LastValidators.VerifyCommit( - s.ChainID, s.LastBlockID, block.Height-1, block.LastCommit) - if err != nil { - return err - } - } - - return nil -} - // Updates the LastCommitHeight of the validators in valSet, in place. // Assumes that lastValSet matches the valset of block.LastCommit // CONTRACT: lastValSet is not mutated. @@ -168,18 +147,62 @@ func updateValidatorsWithBlock(lastValSet *types.ValidatorSet, valSet *types.Val } -//----------------------------------------------------------------------------- +//----------------------------------------------------- +// Validate block -type InvalidTxError struct { - Tx types.Tx - Code tmsp.CodeType +func (s *State) ValidateBlock(block *types.Block) error { + return s.validateBlock(block) } -func (txErr InvalidTxError) Error() string { - return Fmt("Invalid tx: [%v] code: [%v]", txErr.Tx, txErr.Code) +func (s *State) validateBlock(block *types.Block) error { + // Basic block validation. + err := block.ValidateBasic(s.ChainID, s.LastBlockHeight, s.LastBlockID, s.LastBlockTime, s.AppHash) + if err != nil { + return err + } + + // Validate block LastCommit. + if block.Height == 1 { + if len(block.LastCommit.Precommits) != 0 { + return errors.New("Block at height 1 (first block) should have no LastCommit precommits") + } + } else { + if len(block.LastCommit.Precommits) != s.LastValidators.Size() { + return errors.New(Fmt("Invalid block commit size. Expected %v, got %v", + s.LastValidators.Size(), len(block.LastCommit.Precommits))) + } + err := s.LastValidators.VerifyCommit( + s.ChainID, s.LastBlockID, block.Height-1, block.LastCommit) + if err != nil { + return err + } + } + + return nil } //----------------------------------------------------------------------------- +// ApplyBlock executes the block, then commits and updates the mempool atomically + +// Execute and commit block against app, save block and state +func (s *State) ApplyBlock(eventCache events.Fireable, proxyAppConn proxy.AppConnConsensus, + block *types.Block, partsHeader types.PartSetHeader, mempool Mempool) error { + + // Run the block on the State: + // + update validator sets + // + run txs on the proxyAppConn + err := s.ExecBlock(eventCache, proxyAppConn, block, partsHeader) + if err != nil { + return errors.New(Fmt("Exec failed for application: %v", err)) + } + + // lock mempool, commit state, update mempoool + err = s.CommitStateUpdateMempool(proxyAppConn, block, mempool) + if err != nil { + return errors.New(Fmt("Commit failed for application: %v", err)) + } + return nil +} // mempool must be locked during commit and update // because state is typically reset on Commit and old txs must be replayed @@ -188,9 +211,6 @@ func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, bl mempool.Lock() defer mempool.Unlock() - // flush out any CheckTx that have already started - // cs.proxyAppConn.FlushSync() // ?! XXX - // Commit block, get hash back res := proxyAppConn.CommitSync() if res.IsErr() { @@ -210,25 +230,40 @@ func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, bl return nil } -// Execute and commit block against app, save block and state -func (s *State) ApplyBlock(eventCache events.Fireable, proxyAppConn proxy.AppConnConsensus, - block *types.Block, partsHeader types.PartSetHeader, mempool Mempool) { +// Updates to the mempool need to be synchronized with committing a block +// so apps can reset their transient state on Commit +type Mempool interface { + Lock() + Unlock() + Update(height int, txs []types.Tx) +} - // Run the block on the State: - // + update validator sets - // + run txs on the proxyAppConn - err := s.ExecBlock(eventCache, proxyAppConn, block, partsHeader) - if err != nil { - // TODO: handle this gracefully. - PanicQ(Fmt("Exec failed for application: %v", err)) - } +type mockMempool struct { +} - // lock mempool, commit state, update mempoool - err = s.CommitStateUpdateMempool(proxyAppConn, block, mempool) - if err != nil { - // TODO: handle this gracefully. - PanicQ(Fmt("Commit failed for application: %v", err)) - } +func (m mockMempool) Lock() {} +func (m mockMempool) Unlock() {} +func (m mockMempool) Update(height int, txs []types.Tx) {} + +//---------------------------------------------------------------- +// Replay blocks to sync app to latest state of core + +type ErrAppBlockHeightTooHigh struct { + coreHeight int + appHeight int +} + +func (e ErrAppBlockHeightTooHigh) Error() string { + return Fmt("App block height (%d) is higher than core (%d)", e.appHeight, e.coreHeight) +} + +type ErrStateMismatch struct { + got *State + expected *State +} + +func (e ErrStateMismatch) Error() string { + return Fmt("State after replay does not match saved state. Got ----\n%v\nExpected ----\n%v\n", e.got, e.expected) } // Replay all blocks after blockHeight and ensure the result matches the current state. @@ -241,34 +276,45 @@ func (s *State) ReplayBlocks(appHash []byte, header *types.Header, partsHeader t // it should save all eg. valset changes before calling Commit. // then, if tm state is behind app state, the only thing missing can be app hash - // fresh state to work on + // get a fresh state and reset to the apps latest stateCopy := s.Copy() - - // reset to this height (do nothing if its 0) - var blockHeight int if header != nil { - blockHeight = header.Height // TODO: put validators in iavl tree so we can set the state with an older validator set lastVals, nextVals := stateCopy.GetValidators() stateCopy.SetBlockAndValidators(header, partsHeader, lastVals, nextVals) + stateCopy.Stale = false stateCopy.AppHash = appHash } - // run the transactions - var eventCache events.Fireable // nil + appBlockHeight := stateCopy.LastBlockHeight + coreBlockHeight := blockStore.Height() + if coreBlockHeight < appBlockHeight { + return ErrAppBlockHeightTooHigh{coreBlockHeight, appBlockHeight} - // replay all blocks starting with blockHeight+1 - for i := blockHeight + 1; i <= blockStore.Height(); i++ { - blockMeta := blockStore.LoadBlockMeta(i) - block := blockStore.LoadBlock(i) - panicOnNilBlock(i, blockStore.Height(), block, blockMeta) // XXX + } else if coreBlockHeight == appBlockHeight { + // if we crashed between Commit and SaveState, + // the state's app hash is stale + if s.Stale { + s.Stale = false + s.AppHash = appHash + } - stateCopy.ApplyBlock(eventCache, appConnConsensus, block, blockMeta.PartsHeader, mockMempool{}) + } else { + // the app is behind. + // replay all blocks starting with appBlockHeight+1 + for i := appBlockHeight + 1; i <= coreBlockHeight; i++ { + blockMeta := blockStore.LoadBlockMeta(i) + block := blockStore.LoadBlock(i) + panicOnNilBlock(i, coreBlockHeight, block, blockMeta) // XXX + + var eventCache events.Fireable // nil + stateCopy.ApplyBlock(eventCache, appConnConsensus, block, blockMeta.PartsHeader, mockMempool{}) + } } // The computed state and the previously set state should be identical if !s.Equals(stateCopy) { - return fmt.Errorf("State after replay does not match saved state. Got ----\n%v\nExpected ----\n%v\n", stateCopy, s) + return ErrStateMismatch{stateCopy, s} } return nil } @@ -284,20 +330,3 @@ BlockMeta: %v } } - -//------------------------------------------------ -// Updates to the mempool need to be synchronized with committing a block -// so apps can reset their transient state on Commit - -type Mempool interface { - Lock() - Unlock() - Update(height int, txs []types.Tx) -} - -type mockMempool struct { -} - -func (m mockMempool) Lock() {} -func (m mockMempool) Unlock() {} -func (m mockMempool) Update(height int, txs []types.Tx) {} diff --git a/state/state.go b/state/state.go index 289c7a4d8..e1c6f88a5 100644 --- a/state/state.go +++ b/state/state.go @@ -21,16 +21,25 @@ var ( // NOTE: not goroutine-safe. type State struct { - mtx sync.Mutex - db dbm.DB - GenesisDoc *types.GenesisDoc - ChainID string + // mtx for writing to db + mtx sync.Mutex + db dbm.DB + + // should not change + GenesisDoc *types.GenesisDoc + ChainID string + + // updated at end of ExecBlock LastBlockHeight int // Genesis state has this set to 0. So, Block(H=0) does not exist. LastBlockID types.BlockID LastBlockTime time.Time Validators *types.ValidatorSet LastValidators *types.ValidatorSet - AppHash []byte + + // AppHash is updated after Commit; + // it's stale after ExecBlock and before Commit + Stale bool + AppHash []byte } func LoadState(db dbm.DB) *State { @@ -60,6 +69,7 @@ func (s *State) Copy() *State { LastBlockTime: s.LastBlockTime, Validators: s.Validators.Copy(), LastValidators: s.LastValidators.Copy(), + Stale: s.Stale, // but really state shouldnt be copied while its stale AppHash: s.AppHash, } } @@ -84,12 +94,15 @@ func (s *State) Bytes() []byte { } // Mutate state variables to match block and validators +// Since we don't have the AppHash yet, it becomes stale func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader types.PartSetHeader, prevValSet, nextValSet *types.ValidatorSet) { s.LastBlockHeight = header.Height s.LastBlockID = types.BlockID{block.Hash(), blockPartsHeader} s.LastBlockTime = header.Time s.Validators = nextValSet s.LastValidators = prevValSet + + s.Stale = true } func (s *State) GetValidators() (*types.ValidatorSet, *types.ValidatorSet) { From 3f90fcae48c6642aff2ec910b653e8867c5d2d25 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sun, 11 Sep 2016 15:32:33 -0400 Subject: [PATCH 08/13] fail tests and fix --- consensus/state.go | 12 ++++- state/execution.go | 66 +++++++++++++++++++++++---- test/persist/test.sh | 8 ++-- test/persist/test2.sh | 104 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 176 insertions(+), 14 deletions(-) create mode 100644 test/persist/test2.sh diff --git a/consensus/state.go b/consensus/state.go index 4e06dc0db..f0fa3054a 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -8,6 +8,8 @@ import ( "sync" "time" + "github.com/ebuchman/fail-test" + . "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" "github.com/tendermint/go-wire" @@ -1255,16 +1257,19 @@ func (cs *ConsensusState) finalizeCommit(height int) { "height", block.Height, "hash", block.Hash(), "root", block.AppHash) log.Info(Fmt("%v", block)) + fail.Fail() // XXX + // Save to blockStore. if cs.blockStore.Height() < block.Height { precommits := cs.Votes.Precommits(cs.CommitRound) seenCommit := precommits.MakeCommit() - log.Notice("save block", "height", block.Height) cs.blockStore.SaveBlock(block, blockParts, seenCommit) } else { log.Warn("Why are we finalizeCommitting a block height we already have?", "height", block.Height) } + fail.Fail() // XXX + // Create a copy of the state for staging // and an event cache for txs stateCopy := cs.state.Copy() @@ -1277,6 +1282,8 @@ func (cs *ConsensusState) finalizeCommit(height int) { // NOTE: the block.AppHash wont reflect these txs until the next block stateCopy.ApplyBlock(eventCache, cs.proxyAppConn, block, blockParts.Header(), cs.mempool) + fail.Fail() // XXX + // Fire off event for new block. // TODO: Handle app failure. See #177 types.FireEventNewBlock(cs.evsw, types.EventDataNewBlock{block}) @@ -1284,9 +1291,10 @@ func (cs *ConsensusState) finalizeCommit(height int) { eventCache.Flush() // Save the state. - log.Notice("save state", "height", stateCopy.LastBlockHeight, "hash", stateCopy.AppHash) stateCopy.Save() + fail.Fail() // XXX + // NewHeightStep! cs.updateToState(stateCopy) diff --git a/state/execution.go b/state/execution.go index 4d570f0b5..3208e067c 100644 --- a/state/execution.go +++ b/state/execution.go @@ -1,8 +1,11 @@ package state import ( + "bytes" "errors" + "github.com/ebuchman/fail-test" + . "github.com/tendermint/go-common" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" @@ -98,20 +101,28 @@ func (s *State) execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn prox return err } + fail.Fail() // XXX + // Run txs of block for _, tx := range block.Txs { + fail.FailRand(len(block.Txs)) // XXX proxyAppConn.AppendTxAsync(tx) if err := proxyAppConn.Error(); err != nil { return err } } + fail.Fail() // XXX + // End block changedValidators, err := proxyAppConn.EndBlockSync(uint64(block.Height)) if err != nil { log.Warn("Error in proxyAppConn.EndBlock", "error", err) return err } + + fail.Fail() // XXX + // TODO: Do something with changedValidators log.Debug("TODO: Do something with changedValidators", "changedValidators", changedValidators) @@ -248,6 +259,8 @@ func (m mockMempool) Update(height int, txs []types.Tx) {} //---------------------------------------------------------------- // Replay blocks to sync app to latest state of core +type ErrReplay error + type ErrAppBlockHeightTooHigh struct { coreHeight int appHeight int @@ -257,6 +270,16 @@ func (e ErrAppBlockHeightTooHigh) Error() string { return Fmt("App block height (%d) is higher than core (%d)", e.appHeight, e.coreHeight) } +type ErrLastStateMismatch struct { + height int + core []byte + app []byte +} + +func (e ErrLastStateMismatch) Error() string { + return Fmt("Latest tendermint block (%d) LastAppHash (%X) does not match app's AppHash (%X)", e.height, e.core, e.app) +} + type ErrStateMismatch struct { got *State expected *State @@ -289,29 +312,47 @@ func (s *State) ReplayBlocks(appHash []byte, header *types.Header, partsHeader t appBlockHeight := stateCopy.LastBlockHeight coreBlockHeight := blockStore.Height() if coreBlockHeight < appBlockHeight { + // if the app is ahead, there's nothing we can do return ErrAppBlockHeightTooHigh{coreBlockHeight, appBlockHeight} } else if coreBlockHeight == appBlockHeight { // if we crashed between Commit and SaveState, - // the state's app hash is stale + // the state's app hash is stale. + // otherwise we're synced if s.Stale { s.Stale = false s.AppHash = appHash } + return checkState(s, stateCopy) + + } else if s.LastBlockHeight == appBlockHeight { + // core is ahead of app but core's state height is at apps height + // this happens if we crashed after saving the block, + // but before committing it. We should be 1 ahead + if coreBlockHeight != appBlockHeight+1 { + PanicSanity(Fmt("core.state.height == app.height but core.height (%d) > app.height+1 (%d)", coreBlockHeight, appBlockHeight+1)) + } + + // check that the blocks last apphash is the states apphash + blockMeta := blockStore.LoadBlockMeta(coreBlockHeight) + if !bytes.Equal(blockMeta.Header.AppHash, appHash) { + return ErrLastStateMismatch{coreBlockHeight, blockMeta.Header.AppHash, appHash} + } + + // replay the block against the actual tendermint state (not the copy) + return loadApplyBlock(coreBlockHeight, s, blockStore, appConnConsensus) } else { - // the app is behind. + // either we're caught up or there's blocks to replay // replay all blocks starting with appBlockHeight+1 for i := appBlockHeight + 1; i <= coreBlockHeight; i++ { - blockMeta := blockStore.LoadBlockMeta(i) - block := blockStore.LoadBlock(i) - panicOnNilBlock(i, coreBlockHeight, block, blockMeta) // XXX - - var eventCache events.Fireable // nil - stateCopy.ApplyBlock(eventCache, appConnConsensus, block, blockMeta.PartsHeader, mockMempool{}) + loadApplyBlock(i, stateCopy, blockStore, appConnConsensus) } + return checkState(s, stateCopy) } +} +func checkState(s, stateCopy *State) error { // The computed state and the previously set state should be identical if !s.Equals(stateCopy) { return ErrStateMismatch{stateCopy, s} @@ -319,6 +360,15 @@ func (s *State) ReplayBlocks(appHash []byte, header *types.Header, partsHeader t return nil } +func loadApplyBlock(blockIndex int, s *State, blockStore proxy.BlockStore, appConnConsensus proxy.AppConnConsensus) error { + blockMeta := blockStore.LoadBlockMeta(blockIndex) + block := blockStore.LoadBlock(blockIndex) + panicOnNilBlock(blockIndex, blockStore.Height(), block, blockMeta) // XXX + + var eventCache events.Fireable // nil + return s.ApplyBlock(eventCache, appConnConsensus, block, blockMeta.PartsHeader, mockMempool{}) +} + func panicOnNilBlock(height, bsHeight int, block *types.Block, blockMeta *types.BlockMeta) { if block == nil || blockMeta == nil { // Sanity? diff --git a/test/persist/test.sh b/test/persist/test.sh index c51fa7d0a..5c1e12411 100644 --- a/test/persist/test.sh +++ b/test/persist/test.sh @@ -24,13 +24,13 @@ function kill_procs(){ function send_txs(){ # send a bunch of txs over a few blocks echo "Sending txs" -# for i in `seq 1 5`; do -# for j in `seq 1 100`; do + for i in `seq 1 5`; do + for j in `seq 1 100`; do tx=`head -c 8 /dev/urandom | hexdump -ve '1/1 "%.2X"'` curl -s 127.0.0.1:46657/broadcast_tx_async?tx=\"$tx\" &> /dev/null -# done + done sleep 1 -# done + done } diff --git a/test/persist/test2.sh b/test/persist/test2.sh new file mode 100644 index 000000000..509deee79 --- /dev/null +++ b/test/persist/test2.sh @@ -0,0 +1,104 @@ +#! /bin/bash + + +export TMROOT=$HOME/.tendermint_persist + +rm -rf $TMROOT +tendermint init + +function start_procs(){ + name=$1 + indexToFail=$2 + echo "Starting persistent dummy and tendermint" + dummy --persist $TMROOT/dummy &> "dummy_${name}.log" & + PID_DUMMY=$! + if [[ "$indexToFail" == "" ]]; then + # run in background, dont fail + tendermint node &> tendermint_${name}.log & + PID_TENDERMINT=$! + else + # run in foreground, fail + FAIL_TEST_INDEX=$indexToFail tendermint node &> tendermint_${name}.log + PID_TENDERMINT=$! + fi +} + +function kill_procs(){ + kill -9 $PID_DUMMY $PID_TENDERMINT + wait $PID_DUMMY + wait $PID_TENDERMINT +} + + +# wait till node is up, send txs +function send_txs(){ + addr="127.0.0.1:46657" + curl -s $addr/status > /dev/null + ERR=$? + while [ "$ERR" != 0 ]; do + sleep 1 + curl -s $addr/status > /dev/null + ERR=$? + done + + # send a bunch of txs over a few blocks + echo "Node is up, sending txs" + for i in `seq 1 5`; do + for j in `seq 1 100`; do + tx=`head -c 8 /dev/urandom | hexdump -ve '1/1 "%.2X"'` + curl -s $addr/broadcast_tx_async?tx=\"$tx\" &> /dev/null + done + sleep 1 + done +} + + +failsStart=0 +fails=`grep -r "fail.Fail" --include \*.go . | wc -l` +failsEnd=$(($fails-1)) + +for failIndex in `seq $failsStart $failsEnd`; do + echo "" + echo "* Test FailIndex $failIndex" + # test failure at failIndex + + send_txs & + start_procs 1 $failIndex + + # tendermint should fail when it hits the fail index + kill -9 $PID_DUMMY + wait $PID_DUMMY + + start_procs 2 + + # wait for node to handshake and make a new block + addr="localhost:46657" + curl -s $addr/status > /dev/null + ERR=$? + i=0 + while [ "$ERR" != 0 ]; do + sleep 1 + curl -s $addr/status > /dev/null + ERR=$? + i=$(($i + 1)) + if [[ $i == 10 ]]; then + echo "Timed out waiting for tendermint to start" + exit 1 + fi + done + + # wait for a new block + h1=`curl -s $addr/status | jq .result[1].latest_block_height` + h2=$h1 + while [ "$h2" == "$h1" ]; do + sleep 1 + h2=`curl -s $addr/status | jq .result[1].latest_block_height` + done + + kill_procs + + echo "* Passed Test for FailIndex $failIndex" + echo "" +done + +echo "Passed Test: Persistence" From befd8b0cb26a6a4ff1007ca8ae5f0544dc648ddc Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Thu, 3 Nov 2016 20:38:09 -0400 Subject: [PATCH 09/13] post rebase fixes --- glide.lock | 2 +- node/node.go | 3 +-- state/execution.go | 4 ++-- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/glide.lock b/glide.lock index b897e4f99..55f0b7716 100644 --- a/glide.lock +++ b/glide.lock @@ -92,7 +92,7 @@ imports: subpackages: - term - name: github.com/tendermint/tmsp - version: eece35eeebacee1ab94b8338e77e0d1c2d880ecc + version: fec038bdec3495a2a06c6aa8f63e9716bad335dd subpackages: - client - example/counter diff --git a/node/node.go b/node/node.go index 8ca6662a6..86a725dda 100644 --- a/node/node.go +++ b/node/node.go @@ -391,8 +391,7 @@ func newConsensusState(config cfg.Config) *consensus.ConsensusState { // Make event switch eventSwitch := types.NewEventSwitch() - _, err := eventSwitch.Start() - if err != nil { + if _, err := eventSwitch.Start(); err != nil { Exit(Fmt("Failed to start event switch: %v", err)) } diff --git a/state/execution.go b/state/execution.go index 3208e067c..b3b613578 100644 --- a/state/execution.go +++ b/state/execution.go @@ -196,7 +196,7 @@ func (s *State) validateBlock(block *types.Block) error { // ApplyBlock executes the block, then commits and updates the mempool atomically // Execute and commit block against app, save block and state -func (s *State) ApplyBlock(eventCache events.Fireable, proxyAppConn proxy.AppConnConsensus, +func (s *State) ApplyBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block, partsHeader types.PartSetHeader, mempool Mempool) error { // Run the block on the State: @@ -365,7 +365,7 @@ func loadApplyBlock(blockIndex int, s *State, blockStore proxy.BlockStore, appCo block := blockStore.LoadBlock(blockIndex) panicOnNilBlock(blockIndex, blockStore.Height(), block, blockMeta) // XXX - var eventCache events.Fireable // nil + var eventCache types.Fireable // nil return s.ApplyBlock(eventCache, appConnConsensus, block, blockMeta.PartsHeader, mockMempool{}) } From 4360c360a48b4a5266e28e15b6da65e008688af7 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sun, 6 Nov 2016 01:48:39 +0000 Subject: [PATCH 10/13] move handshake to state, use Handshaker, more tests --- consensus/race.test | 1 - consensus/state.go | 23 +---- node/node.go | 4 +- proxy/multi_app_conn.go | 90 +++-------------- proxy/multi_app_conn_test.go | 22 ----- proxy/state.go | 15 --- state/errors.go | 55 +++++++++++ state/execution.go | 173 +++++++++++++++++++++----------- state/execution_test.go | 186 +++++++++++++++++++++++++++++++++++ state/state.go | 4 +- state/state_test.go | 42 ++++++++ types/block.go | 21 ++++ types/validator.go | 5 +- types/validator_set.go | 2 +- 14 files changed, 439 insertions(+), 204 deletions(-) delete mode 100644 consensus/race.test delete mode 100644 proxy/multi_app_conn_test.go delete mode 100644 proxy/state.go create mode 100644 state/errors.go create mode 100644 state/execution_test.go create mode 100644 state/state_test.go diff --git a/consensus/race.test b/consensus/race.test deleted file mode 100644 index 46231439d..000000000 --- a/consensus/race.test +++ /dev/null @@ -1 +0,0 @@ -ok github.com/tendermint/tendermint/consensus 5.928s diff --git a/consensus/state.go b/consensus/state.go index f0fa3054a..0dc80810e 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -932,25 +932,8 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts // Mempool validated transactions txs := cs.mempool.Reap(cs.config.GetInt("block_size")) - block = &types.Block{ - Header: &types.Header{ - ChainID: cs.state.ChainID, - Height: cs.Height, - Time: time.Now(), - NumTxs: len(txs), - LastBlockID: cs.state.LastBlockID, - ValidatorsHash: cs.state.Validators.Hash(), - AppHash: cs.state.AppHash, // state merkle root of txs from the previous block. - }, - LastCommit: commit, - Data: &types.Data{ - Txs: txs, - }, - } - block.FillHeader() - blockParts = block.MakePartSet(cs.config.GetInt("block_part_size")) - - return block, blockParts + return types.MakeBlock(cs.Height, cs.state.ChainID, txs, commit, + cs.state.LastBlockID, cs.state.Validators.Hash(), cs.state.AppHash) } // Enter: `timeoutPropose` after entering Propose. @@ -1273,8 +1256,6 @@ func (cs *ConsensusState) finalizeCommit(height int) { // Create a copy of the state for staging // and an event cache for txs stateCopy := cs.state.Copy() - - // event cache for txs eventCache := types.NewEventCache(cs.evsw) // Execute and commit the block, and update the mempool. diff --git a/node/node.go b/node/node.go index 86a725dda..8b8fe16d6 100644 --- a/node/node.go +++ b/node/node.go @@ -63,7 +63,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato state := sm.GetState(config, stateDB) // Create the proxyApp, which manages connections (consensus, mempool, query) - proxyApp := proxy.NewAppConns(config, clientCreator, state, blockStore) + proxyApp := proxy.NewAppConns(config, clientCreator, sm.NewHandshaker(state, blockStore)) if _, err := proxyApp.Start(); err != nil { Exit(Fmt("Error starting proxy app connections: %v", err)) } @@ -380,7 +380,7 @@ func newConsensusState(config cfg.Config) *consensus.ConsensusState { state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) // Create proxyAppConn connection (consensus, mempool, query) - proxyApp := proxy.NewAppConns(config, proxy.DefaultClientCreator(config), state, blockStore) + proxyApp := proxy.NewAppConns(config, proxy.DefaultClientCreator(config), sm.NewHandshaker(state, blockStore)) _, err := proxyApp.Start() if err != nil { Exit(Fmt("Error starting proxy app conns: %v", err)) diff --git a/proxy/multi_app_conn.go b/proxy/multi_app_conn.go index 906486539..7095697d2 100644 --- a/proxy/multi_app_conn.go +++ b/proxy/multi_app_conn.go @@ -1,12 +1,8 @@ package proxy import ( - "bytes" - "fmt" - . "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" - "github.com/tendermint/tendermint/types" ) //----------------------------- @@ -20,13 +16,17 @@ type AppConns interface { Query() AppConnQuery } -func NewAppConns(config cfg.Config, clientCreator ClientCreator, state State, blockStore BlockStore) AppConns { - return NewMultiAppConn(config, clientCreator, state, blockStore) +func NewAppConns(config cfg.Config, clientCreator ClientCreator, handshaker Handshaker) AppConns { + return NewMultiAppConn(config, clientCreator, handshaker) } //----------------------------- // multiAppConn implements AppConns +type Handshaker interface { + Handshake(AppConns) error +} + // a multiAppConn is made of a few appConns (mempool, consensus, query) // and manages their underlying tmsp clients, including the handshake // which ensures the app and tendermint are synced. @@ -36,8 +36,7 @@ type multiAppConn struct { config cfg.Config - state State - blockStore BlockStore + handshaker Handshaker mempoolConn *appConnMempool consensusConn *appConnConsensus @@ -47,11 +46,10 @@ type multiAppConn struct { } // Make all necessary tmsp connections to the application -func NewMultiAppConn(config cfg.Config, clientCreator ClientCreator, state State, blockStore BlockStore) *multiAppConn { +func NewMultiAppConn(config cfg.Config, clientCreator ClientCreator, handshaker Handshaker) *multiAppConn { multiAppConn := &multiAppConn{ config: config, - state: state, - blockStore: blockStore, + handshaker: handshaker, clientCreator: clientCreator, } multiAppConn.BaseService = *NewBaseService(log, "multiAppConn", multiAppConn) @@ -98,74 +96,8 @@ func (app *multiAppConn) OnStart() error { app.consensusConn = NewAppConnConsensus(concli) // ensure app is synced to the latest state - return app.Handshake() -} - -// TODO: retry the handshake once if it fails the first time -// ... let Info take an argument determining its behaviour -func (app *multiAppConn) Handshake() error { - // handshake is done via info request on the query conn - res, tmspInfo, blockInfo, configInfo := app.queryConn.InfoSync() - if res.IsErr() { - return fmt.Errorf("Error calling Info. Code: %v; Data: %X; Log: %s", res.Code, res.Data, res.Log) + if app.handshaker != nil { + return app.handshaker.Handshake(app) } - - if blockInfo == nil { - log.Warn("blockInfo is nil, aborting handshake") - return nil - } - - log.Notice("TMSP Handshake", "height", blockInfo.BlockHeight, "block_hash", blockInfo.BlockHash, "app_hash", blockInfo.AppHash) - - // TODO: check overflow or change pb to int32 - blockHeight := int(blockInfo.BlockHeight) - blockHash := blockInfo.BlockHash - appHash := blockInfo.AppHash - - if tmspInfo != nil { - // TODO: check tmsp version (or do this in the tmspcli?) - _ = tmspInfo - } - - // last block (nil if we starting from 0) - var header *types.Header - var partsHeader types.PartSetHeader - - // replay all blocks after blockHeight - // if blockHeight == 0, we will replay everything - if blockHeight != 0 { - blockMeta := app.blockStore.LoadBlockMeta(blockHeight) - if blockMeta == nil { - return fmt.Errorf("Handshake error. Could not find block #%d", blockHeight) - } - - // check block hash - if !bytes.Equal(blockMeta.Hash, blockHash) { - return fmt.Errorf("Handshake error. Block hash at height %d does not match. Got %X, expected %X", blockHeight, blockHash, blockMeta.Hash) - } - - // NOTE: app hash should be in the next block ... - // check app hash - /*if !bytes.Equal(blockMeta.Header.AppHash, appHash) { - return fmt.Errorf("Handshake error. App hash at height %d does not match. Got %X, expected %X", blockHeight, appHash, blockMeta.Header.AppHash) - }*/ - - header = blockMeta.Header - partsHeader = blockMeta.PartsHeader - } - - if configInfo != nil { - // TODO: set config info - _ = configInfo - } - - // replay blocks up to the latest in the blockstore - err := app.state.ReplayBlocks(appHash, header, partsHeader, app.consensusConn, app.blockStore) - if err != nil { - return fmt.Errorf("Error on replay: %v", err) - } - - // TODO: (on restart) replay mempool - return nil } diff --git a/proxy/multi_app_conn_test.go b/proxy/multi_app_conn_test.go deleted file mode 100644 index 3ff2520f6..000000000 --- a/proxy/multi_app_conn_test.go +++ /dev/null @@ -1,22 +0,0 @@ -package proxy - -import ( - "testing" - "time" - - "github.com/tendermint/go-p2p" - "github.com/tendermint/tendermint/config/tendermint_test" - "github.com/tendermint/tendermint/node" - "github.com/tendermint/tendermint/types" -) - -func TestPersistence(t *testing.T) { - - // create persistent dummy app - // set state on dummy app - // proxy handshake - - config := tendermint_test.ResetConfig("proxy_test_") - multiApp := NewMultiAppConn(config, state, blockStore) - -} diff --git a/proxy/state.go b/proxy/state.go deleted file mode 100644 index 2881fd0c4..000000000 --- a/proxy/state.go +++ /dev/null @@ -1,15 +0,0 @@ -package proxy - -import ( - "github.com/tendermint/tendermint/types" -) - -type State interface { - ReplayBlocks([]byte, *types.Header, types.PartSetHeader, AppConnConsensus, BlockStore) error -} - -type BlockStore interface { - Height() int - LoadBlockMeta(height int) *types.BlockMeta - LoadBlock(height int) *types.Block -} diff --git a/state/errors.go b/state/errors.go new file mode 100644 index 000000000..0d0eae14c --- /dev/null +++ b/state/errors.go @@ -0,0 +1,55 @@ +package state + +import ( + . "github.com/tendermint/go-common" +) + +type ( + ErrInvalidBlock error + ErrProxyAppConn error + + ErrUnknownBlock struct { + height int + } + + ErrBlockHashMismatch struct { + coreHash []byte + appHash []byte + height int + } + + ErrAppBlockHeightTooHigh struct { + coreHeight int + appHeight int + } + + ErrLastStateMismatch struct { + height int + core []byte + app []byte + } + + ErrStateMismatch struct { + got *State + expected *State + } +) + +func (e ErrUnknownBlock) Error() string { + return Fmt("Could not find block #%d", e.height) +} + +func (e ErrBlockHashMismatch) Error() string { + return Fmt("App block hash (%X) does not match core block hash (%X) for height %d", e.appHash, e.coreHash, e.height) +} + +func (e ErrAppBlockHeightTooHigh) Error() string { + return Fmt("App block height (%d) is higher than core (%d)", e.appHeight, e.coreHeight) +} +func (e ErrLastStateMismatch) Error() string { + return Fmt("Latest tendermint block (%d) LastAppHash (%X) does not match app's AppHash (%X)", e.height, e.core, e.app) +} + +func (e ErrStateMismatch) Error() string { + return Fmt("State after replay does not match saved state. Got ----\n%v\nExpected ----\n%v\n", e.got, e.expected) +} diff --git a/state/execution.go b/state/execution.go index b3b613578..299abbbf3 100644 --- a/state/execution.go +++ b/state/execution.go @@ -15,11 +15,6 @@ import ( //-------------------------------------------------- // Execute the block -type ( - ErrInvalidBlock error - ErrProxyAppConn error -) - // Execute the block to mutate State. // Validates block and then executes Data.Txs in the block. func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block, blockPartsHeader types.PartSetHeader) error { @@ -257,42 +252,96 @@ func (m mockMempool) Unlock() {} func (m mockMempool) Update(height int, txs []types.Tx) {} //---------------------------------------------------------------- -// Replay blocks to sync app to latest state of core +// Handshake with app to sync to latest state of core by replaying blocks -type ErrReplay error - -type ErrAppBlockHeightTooHigh struct { - coreHeight int - appHeight int +// TODO: Should we move blockchain/store.go to its own package? +type BlockStore interface { + Height() int + LoadBlock(height int) *types.Block } -func (e ErrAppBlockHeightTooHigh) Error() string { - return Fmt("App block height (%d) is higher than core (%d)", e.appHeight, e.coreHeight) +type Handshaker struct { + state *State + store BlockStore + + nBlocks int // number of blocks applied to the state } -type ErrLastStateMismatch struct { - height int - core []byte - app []byte +func NewHandshaker(state *State, store BlockStore) *Handshaker { + return &Handshaker{state, store, 0} } -func (e ErrLastStateMismatch) Error() string { - return Fmt("Latest tendermint block (%d) LastAppHash (%X) does not match app's AppHash (%X)", e.height, e.core, e.app) -} +// TODO: retry the handshake once if it fails the first time +// ... let Info take an argument determining its behaviour +func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error { + // handshake is done via info request on the query conn + res, tmspInfo, blockInfo, configInfo := proxyApp.Query().InfoSync() + if res.IsErr() { + return errors.New(Fmt("Error calling Info. Code: %v; Data: %X; Log: %s", res.Code, res.Data, res.Log)) + } -type ErrStateMismatch struct { - got *State - expected *State -} + if blockInfo == nil { + log.Warn("blockInfo is nil, aborting handshake") + return nil + } -func (e ErrStateMismatch) Error() string { - return Fmt("State after replay does not match saved state. Got ----\n%v\nExpected ----\n%v\n", e.got, e.expected) + log.Notice("TMSP Handshake", "height", blockInfo.BlockHeight, "block_hash", blockInfo.BlockHash, "app_hash", blockInfo.AppHash) + + blockHeight := int(blockInfo.BlockHeight) // safe, should be an int32 + blockHash := blockInfo.BlockHash + appHash := blockInfo.AppHash + + if tmspInfo != nil { + // TODO: check tmsp version (or do this in the tmspcli?) + _ = tmspInfo + } + + // last block (nil if we starting from 0) + var header *types.Header + var partsHeader types.PartSetHeader + + // replay all blocks after blockHeight + // if blockHeight == 0, we will replay everything + if blockHeight != 0 { + block := h.store.LoadBlock(blockHeight) + if block == nil { + return ErrUnknownBlock{blockHeight} + } + + // check block hash + if !bytes.Equal(block.Hash(), blockHash) { + return ErrBlockHashMismatch{block.Hash(), blockHash, blockHeight} + } + + // NOTE: app hash should be in the next block ... + // check app hash + /*if !bytes.Equal(block.Header.AppHash, appHash) { + return fmt.Errorf("Handshake error. App hash at height %d does not match. Got %X, expected %X", blockHeight, appHash, block.Header.AppHash) + }*/ + + header = block.Header + partsHeader = block.MakePartSet().Header() + } + + if configInfo != nil { + // TODO: set config info + _ = configInfo + } + + // replay blocks up to the latest in the blockstore + err := h.ReplayBlocks(appHash, header, partsHeader, proxyApp.Consensus()) + if err != nil { + return errors.New(Fmt("Error on replay: %v", err)) + } + + // TODO: (on restart) replay mempool + + return nil } // Replay all blocks after blockHeight and ensure the result matches the current state. -// XXX: blockStore must guarantee to have blocks for height <= blockStore.Height() -func (s *State) ReplayBlocks(appHash []byte, header *types.Header, partsHeader types.PartSetHeader, - appConnConsensus proxy.AppConnConsensus, blockStore proxy.BlockStore) error { +func (h *Handshaker) ReplayBlocks(appHash []byte, header *types.Header, partsHeader types.PartSetHeader, + appConnConsensus proxy.AppConnConsensus) error { // NOTE/TODO: tendermint may crash after the app commits // but before it can save the new state root. @@ -300,17 +349,25 @@ func (s *State) ReplayBlocks(appHash []byte, header *types.Header, partsHeader t // then, if tm state is behind app state, the only thing missing can be app hash // get a fresh state and reset to the apps latest - stateCopy := s.Copy() - if header != nil { - // TODO: put validators in iavl tree so we can set the state with an older validator set - lastVals, nextVals := stateCopy.GetValidators() + stateCopy := h.state.Copy() + + // TODO: put validators in iavl tree so we can set the state with an older validator set + lastVals, nextVals := stateCopy.GetValidators() + if header == nil { + stateCopy.LastBlockHeight = 0 + stateCopy.LastBlockHash = nil + stateCopy.LastBlockParts = types.PartSetHeader{} + // stateCopy.LastBlockTime = ... doesnt matter + stateCopy.Validators = nextVals + stateCopy.LastValidators = lastVals + } else { stateCopy.SetBlockAndValidators(header, partsHeader, lastVals, nextVals) - stateCopy.Stale = false - stateCopy.AppHash = appHash } + stateCopy.Stale = false + stateCopy.AppHash = appHash appBlockHeight := stateCopy.LastBlockHeight - coreBlockHeight := blockStore.Height() + coreBlockHeight := h.store.Height() if coreBlockHeight < appBlockHeight { // if the app is ahead, there's nothing we can do return ErrAppBlockHeightTooHigh{coreBlockHeight, appBlockHeight} @@ -319,13 +376,13 @@ func (s *State) ReplayBlocks(appHash []byte, header *types.Header, partsHeader t // if we crashed between Commit and SaveState, // the state's app hash is stale. // otherwise we're synced - if s.Stale { - s.Stale = false - s.AppHash = appHash + if h.state.Stale { + h.state.Stale = false + h.state.AppHash = appHash } - return checkState(s, stateCopy) + return checkState(h.state, stateCopy) - } else if s.LastBlockHeight == appBlockHeight { + } else if h.state.LastBlockHeight == appBlockHeight { // core is ahead of app but core's state height is at apps height // this happens if we crashed after saving the block, // but before committing it. We should be 1 ahead @@ -334,21 +391,21 @@ func (s *State) ReplayBlocks(appHash []byte, header *types.Header, partsHeader t } // check that the blocks last apphash is the states apphash - blockMeta := blockStore.LoadBlockMeta(coreBlockHeight) - if !bytes.Equal(blockMeta.Header.AppHash, appHash) { - return ErrLastStateMismatch{coreBlockHeight, blockMeta.Header.AppHash, appHash} + block := h.store.LoadBlock(coreBlockHeight) + if !bytes.Equal(block.Header.AppHash, appHash) { + return ErrLastStateMismatch{coreBlockHeight, block.Header.AppHash, appHash} } // replay the block against the actual tendermint state (not the copy) - return loadApplyBlock(coreBlockHeight, s, blockStore, appConnConsensus) + return h.loadApplyBlock(coreBlockHeight, h.state, appConnConsensus) } else { // either we're caught up or there's blocks to replay // replay all blocks starting with appBlockHeight+1 for i := appBlockHeight + 1; i <= coreBlockHeight; i++ { - loadApplyBlock(i, stateCopy, blockStore, appConnConsensus) + h.loadApplyBlock(i, stateCopy, appConnConsensus) } - return checkState(s, stateCopy) + return checkState(h.state, stateCopy) } } @@ -360,23 +417,21 @@ func checkState(s, stateCopy *State) error { return nil } -func loadApplyBlock(blockIndex int, s *State, blockStore proxy.BlockStore, appConnConsensus proxy.AppConnConsensus) error { - blockMeta := blockStore.LoadBlockMeta(blockIndex) - block := blockStore.LoadBlock(blockIndex) - panicOnNilBlock(blockIndex, blockStore.Height(), block, blockMeta) // XXX - - var eventCache types.Fireable // nil - return s.ApplyBlock(eventCache, appConnConsensus, block, blockMeta.PartsHeader, mockMempool{}) +func (h *Handshaker) loadApplyBlock(blockIndex int, state *State, appConnConsensus proxy.AppConnConsensus) error { + h.nBlocks += 1 + block := h.store.LoadBlock(blockIndex) + panicOnNilBlock(blockIndex, h.store.Height(), block) // XXX + var eventCache types.Fireable // nil + return state.ApplyBlock(eventCache, appConnConsensus, block, block.MakePartSet().Header(), mockMempool{}) } -func panicOnNilBlock(height, bsHeight int, block *types.Block, blockMeta *types.BlockMeta) { - if block == nil || blockMeta == nil { +func panicOnNilBlock(height, bsHeight int, block *types.Block) { + if block == nil { // Sanity? PanicCrisis(Fmt(` -block/blockMeta is nil for height <= blockStore.Height() (%d <= %d). +block is nil for height <= blockStore.Height() (%d <= %d). Block: %v, -BlockMeta: %v -`, height, bsHeight, block, blockMeta)) +`, height, bsHeight, block)) } } diff --git a/state/execution_test.go b/state/execution_test.go new file mode 100644 index 000000000..db724de88 --- /dev/null +++ b/state/execution_test.go @@ -0,0 +1,186 @@ +package state + +import ( + "bytes" + //"fmt" + "path" + "testing" + + "github.com/tendermint/tendermint/config/tendermint_test" + // . "github.com/tendermint/go-common" + "github.com/tendermint/go-crypto" + dbm "github.com/tendermint/go-db" + "github.com/tendermint/tendermint/proxy" + "github.com/tendermint/tendermint/types" + "github.com/tendermint/tmsp/example/dummy" +) + +var ( + privKey = crypto.GenPrivKeyEd25519FromSecret([]byte("handshake_test")) + chainID = "handshake_chain" + nBlocks = 5 + mempool = mockMempool{} +) + +func TestExecBlock(t *testing.T) { + // TODO +} + +// Sync from scratch +func TestHandshakeReplayAll(t *testing.T) { + testHandshakeReplay(t, 0) +} + +// Sync many, not from scratch +func TestHandshakeReplaySome(t *testing.T) { + testHandshakeReplay(t, 1) +} + +// Sync from lagging by one +func TestHandshakeReplayOne(t *testing.T) { + testHandshakeReplay(t, nBlocks-1) +} + +// Sync from caught up +func TestHandshakeReplayNone(t *testing.T) { + testHandshakeReplay(t, nBlocks) +} + +// Make some blocks. Start a fresh app and apply n blocks. Then restart the app and sync it up with the remaining blocks +func testHandshakeReplay(t *testing.T, n int) { + config := tendermint_test.ResetConfig("proxy_test_") + + state, store := stateAndStore() + clientCreator := proxy.NewLocalClientCreator(dummy.NewPersistentDummyApplication(path.Join(config.GetString("db_dir"), "1"))) + clientCreator2 := proxy.NewLocalClientCreator(dummy.NewPersistentDummyApplication(path.Join(config.GetString("db_dir"), "2"))) + proxyApp := proxy.NewAppConns(config, clientCreator, NewHandshaker(state, store)) + if _, err := proxyApp.Start(); err != nil { + t.Fatalf("Error starting proxy app connections: %v", err) + } + chain := makeBlockchain(t, proxyApp, state) + store.chain = chain // + latestAppHash := state.AppHash + proxyApp.Stop() + + if n > 0 { + // start a new app without handshake, play n blocks + proxyApp = proxy.NewAppConns(config, clientCreator2, nil) + if _, err := proxyApp.Start(); err != nil { + t.Fatalf("Error starting proxy app connections: %v", err) + } + state2, _ := stateAndStore() + for i := 0; i < n; i++ { + block := chain[i] + err := state2.ApplyBlock(nil, proxyApp.Consensus(), block, block.MakePartSet().Header(), mempool) + if err != nil { + t.Fatal(err) + } + } + proxyApp.Stop() + } + + // now start it with the handshake + handshaker := NewHandshaker(state, store) + proxyApp = proxy.NewAppConns(config, clientCreator2, handshaker) + if _, err := proxyApp.Start(); err != nil { + t.Fatalf("Error starting proxy app connections: %v", err) + } + + // get the latest app hash from the app + r, _, blockInfo, _ := proxyApp.Query().InfoSync() + if r.IsErr() { + t.Fatal(r) + } + + // the app hash should be synced up + if !bytes.Equal(latestAppHash, blockInfo.AppHash) { + t.Fatalf("Expected app hashes to match after handshake/replay. got %X, expected %X", blockInfo.AppHash, latestAppHash) + } + + if handshaker.nBlocks != nBlocks-n { + t.Fatalf("Expected handshake to sync %d blocks, got %d", nBlocks-n, handshaker.nBlocks) + } + +} + +//-------------------------- + +// make some bogus txs +func txsFunc(blockNum int) (txs []types.Tx) { + for i := 0; i < 10; i++ { + txs = append(txs, types.Tx([]byte{byte(blockNum), byte(i)})) + } + return txs +} + +// sign a commit vote +func signCommit(height, round int, hash []byte, header types.PartSetHeader) *types.Vote { + vote := &types.Vote{ + Height: height, + Round: round, + Type: types.VoteTypePrecommit, + BlockHash: hash, + BlockPartsHeader: header, + } + + sig := privKey.Sign(types.SignBytes(chainID, vote)) + vote.Signature = sig.(crypto.SignatureEd25519) + return vote +} + +// make a blockchain with one validator +func makeBlockchain(t *testing.T, proxyApp proxy.AppConns, state *State) (blockchain []*types.Block) { + + prevHash := state.LastBlockHash + lastCommit := new(types.Commit) + prevParts := types.PartSetHeader{} + valHash := state.Validators.Hash() + + for i := 1; i < nBlocks+1; i++ { + block, parts := types.MakeBlock(i, chainID, txsFunc(i), lastCommit, + prevParts, prevHash, valHash, state.AppHash) + err := state.ApplyBlock(nil, proxyApp.Consensus(), block, block.MakePartSet().Header(), mempool) + if err != nil { + t.Fatal(i, err) + } + + voteSet := types.NewVoteSet(chainID, i, 0, types.VoteTypePrecommit, state.Validators) + vote := signCommit(i, 0, block.Hash(), parts.Header()) + _, _, err = voteSet.AddByIndex(0, vote) + if err != nil { + t.Fatal(err) + } + + blockchain = append(blockchain, block) + prevHash = block.Hash() + prevParts = parts.Header() + lastCommit = voteSet.MakeCommit() + } + return blockchain +} + +// fresh state and mock store +func stateAndStore() (*State, *mockBlockStore) { + stateDB := dbm.NewMemDB() + return MakeGenesisState(stateDB, &types.GenesisDoc{ + ChainID: chainID, + Validators: []types.GenesisValidator{ + types.GenesisValidator{privKey.PubKey(), 10000, "test"}, + }, + AppHash: nil, + }), NewMockBlockStore(nil) +} + +//---------------------------------- +// mock block store + +type mockBlockStore struct { + chain []*types.Block +} + +func NewMockBlockStore(chain []*types.Block) *mockBlockStore { + return &mockBlockStore{chain} +} + +func (bs *mockBlockStore) Height() int { return len(bs.chain) } +func (bs *mockBlockStore) LoadBlock(height int) *types.Block { return bs.chain[height-1] } diff --git a/state/state.go b/state/state.go index e1c6f88a5..033c24132 100644 --- a/state/state.go +++ b/state/state.go @@ -69,7 +69,7 @@ func (s *State) Copy() *State { LastBlockTime: s.LastBlockTime, Validators: s.Validators.Copy(), LastValidators: s.LastValidators.Copy(), - Stale: s.Stale, // but really state shouldnt be copied while its stale + Stale: s.Stale, // XXX: but really state shouldnt be copied while its stale AppHash: s.AppHash, } } @@ -94,7 +94,7 @@ func (s *State) Bytes() []byte { } // Mutate state variables to match block and validators -// Since we don't have the AppHash yet, it becomes stale +// Since we don't have the new AppHash yet, we set s.Stale=true func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader types.PartSetHeader, prevValSet, nextValSet *types.ValidatorSet) { s.LastBlockHeight = header.Height s.LastBlockID = types.BlockID{block.Hash(), blockPartsHeader} diff --git a/state/state_test.go b/state/state_test.go new file mode 100644 index 000000000..a534cb695 --- /dev/null +++ b/state/state_test.go @@ -0,0 +1,42 @@ +package state + +import ( + "testing" + + dbm "github.com/tendermint/go-db" + "github.com/tendermint/tendermint/config/tendermint_test" +) + +func TestStateCopyEquals(t *testing.T) { + config := tendermint_test.ResetConfig("state_") + // Get State db + stateDB := dbm.NewDB("state", config.GetString("db_backend"), config.GetString("db_dir")) + state := GetState(config, stateDB) + + stateCopy := state.Copy() + + if !state.Equals(stateCopy) { + t.Fatal("expected state and its copy to be identical. got %v\n expected %v\n", stateCopy, state) + } + + stateCopy.LastBlockHeight += 1 + + if state.Equals(stateCopy) { + t.Fatal("expected states to be different. got same %v", state) + } +} + +func TestStateSaveLoad(t *testing.T) { + config := tendermint_test.ResetConfig("state_") + // Get State db + stateDB := dbm.NewDB("state", config.GetString("db_backend"), config.GetString("db_dir")) + state := GetState(config, stateDB) + + state.LastBlockHeight += 1 + state.Save() + + loadedState := LoadState(stateDB) + if !state.Equals(loadedState) { + t.Fatal("expected state and its copy to be identical. got %v\n expected %v\n", loadedState, state) + } +} diff --git a/types/block.go b/types/block.go index bf6ec2b2a..ec04e4ab8 100644 --- a/types/block.go +++ b/types/block.go @@ -21,6 +21,27 @@ type Block struct { LastCommit *Commit `json:"last_commit"` } +func MakeBlock(height int, chainID string, txs []Tx, commit *Commit, + prevBlockID BlockID, valHash, appHash []byte) (*Block, *PartSet) { + block := &Block{ + Header: &Header{ + ChainID: chainID, + Height: height, + Time: time.Now(), + NumTxs: len(txs), + LastBlockID: prevBlockID, + ValidatorsHash: valHash, + AppHash: appHash, // state merkle root of txs from the previous block. + }, + LastCommit: commit, + Data: &Data{ + Txs: txs, + }, + } + block.FillHeader() + return block, block.MakePartSet() +} + // Basic validation that doesn't involve state data. func (b *Block) ValidateBasic(chainID string, lastBlockHeight int, lastBlockID BlockID, lastBlockTime time.Time, appHash []byte) error { diff --git a/types/validator.go b/types/validator.go index 699114f22..2e45ebba9 100644 --- a/types/validator.go +++ b/types/validator.go @@ -11,8 +11,9 @@ import ( ) // Volatile state for each Validator -// Also persisted with the state, but fields change -// every height|round so they don't go in merkle.Tree +// TODO: make non-volatile identity +// - Remove LastCommitHeight, send bitarray of vals that signed in BeginBlock +// - Remove Accum - it can be computed, and now valset becomes identifying type Validator struct { Address []byte `json:"address"` PubKey crypto.PubKey `json:"pub_key"` diff --git a/types/validator_set.go b/types/validator_set.go index 92400f67a..3f5a17d9a 100644 --- a/types/validator_set.go +++ b/types/validator_set.go @@ -20,7 +20,7 @@ import ( // NOTE: Not goroutine-safe. // NOTE: All get/set to validators should copy the value for safety. // TODO: consider validator Accum overflow -// TODO: replace validators []*Validator with github.com/jaekwon/go-ibbs? +// TODO: move valset into an iavl tree where key is 'blockbonded|pubkey' type ValidatorSet struct { Validators []*Validator // NOTE: persisted via reflect, must be exported. From 07597dfd45dc514d0c2f7fc0690faf4ac4929771 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 16 Nov 2016 16:13:17 -0500 Subject: [PATCH 11/13] post rebase fixes for BlockID, partSize --- consensus/state.go | 2 +- glide.lock | 2 +- node/node.go | 4 ++-- state/execution.go | 17 +++++++++-------- state/execution_test.go | 35 +++++++++++++++++++++-------------- state/state.go | 2 +- types/block.go | 4 ++-- types/protobuf.go | 12 +++++++++--- 8 files changed, 46 insertions(+), 32 deletions(-) diff --git a/consensus/state.go b/consensus/state.go index 0dc80810e..59bea2315 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -933,7 +933,7 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts txs := cs.mempool.Reap(cs.config.GetInt("block_size")) return types.MakeBlock(cs.Height, cs.state.ChainID, txs, commit, - cs.state.LastBlockID, cs.state.Validators.Hash(), cs.state.AppHash) + cs.state.LastBlockID, cs.state.Validators.Hash(), cs.state.AppHash, cs.config.GetInt("block_part_size")) } // Enter: `timeoutPropose` after entering Propose. diff --git a/glide.lock b/glide.lock index 55f0b7716..613a97ec6 100644 --- a/glide.lock +++ b/glide.lock @@ -92,7 +92,7 @@ imports: subpackages: - term - name: github.com/tendermint/tmsp - version: fec038bdec3495a2a06c6aa8f63e9716bad335dd + version: 60e0842ef9a87c840d0bf95eea7b54a1e3d312b3 subpackages: - client - example/counter diff --git a/node/node.go b/node/node.go index 8b8fe16d6..f4a04d82c 100644 --- a/node/node.go +++ b/node/node.go @@ -63,7 +63,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato state := sm.GetState(config, stateDB) // Create the proxyApp, which manages connections (consensus, mempool, query) - proxyApp := proxy.NewAppConns(config, clientCreator, sm.NewHandshaker(state, blockStore)) + proxyApp := proxy.NewAppConns(config, clientCreator, sm.NewHandshaker(config, state, blockStore)) if _, err := proxyApp.Start(); err != nil { Exit(Fmt("Error starting proxy app connections: %v", err)) } @@ -380,7 +380,7 @@ func newConsensusState(config cfg.Config) *consensus.ConsensusState { state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) // Create proxyAppConn connection (consensus, mempool, query) - proxyApp := proxy.NewAppConns(config, proxy.DefaultClientCreator(config), sm.NewHandshaker(state, blockStore)) + proxyApp := proxy.NewAppConns(config, proxy.DefaultClientCreator(config), sm.NewHandshaker(config, state, blockStore)) _, err := proxyApp.Start() if err != nil { Exit(Fmt("Error starting proxy app conns: %v", err)) diff --git a/state/execution.go b/state/execution.go index 299abbbf3..c52be41dc 100644 --- a/state/execution.go +++ b/state/execution.go @@ -7,6 +7,7 @@ import ( "github.com/ebuchman/fail-test" . "github.com/tendermint/go-common" + cfg "github.com/tendermint/go-config" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" tmsp "github.com/tendermint/tmsp/types" @@ -261,14 +262,15 @@ type BlockStore interface { } type Handshaker struct { - state *State - store BlockStore + config cfg.Config + state *State + store BlockStore nBlocks int // number of blocks applied to the state } -func NewHandshaker(state *State, store BlockStore) *Handshaker { - return &Handshaker{state, store, 0} +func NewHandshaker(config cfg.Config, state *State, store BlockStore) *Handshaker { + return &Handshaker{config, state, store, 0} } // TODO: retry the handshake once if it fails the first time @@ -320,7 +322,7 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error { }*/ header = block.Header - partsHeader = block.MakePartSet().Header() + partsHeader = block.MakePartSet(h.config.GetInt("block_part_size")).Header() } if configInfo != nil { @@ -355,8 +357,7 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, header *types.Header, partsHea lastVals, nextVals := stateCopy.GetValidators() if header == nil { stateCopy.LastBlockHeight = 0 - stateCopy.LastBlockHash = nil - stateCopy.LastBlockParts = types.PartSetHeader{} + stateCopy.LastBlockID = types.BlockID{} // stateCopy.LastBlockTime = ... doesnt matter stateCopy.Validators = nextVals stateCopy.LastValidators = lastVals @@ -422,7 +423,7 @@ func (h *Handshaker) loadApplyBlock(blockIndex int, state *State, appConnConsens block := h.store.LoadBlock(blockIndex) panicOnNilBlock(blockIndex, h.store.Height(), block) // XXX var eventCache types.Fireable // nil - return state.ApplyBlock(eventCache, appConnConsensus, block, block.MakePartSet().Header(), mockMempool{}) + return state.ApplyBlock(eventCache, appConnConsensus, block, block.MakePartSet(h.config.GetInt("block_part_size")).Header(), mockMempool{}) } func panicOnNilBlock(height, bsHeight int, block *types.Block) { diff --git a/state/execution_test.go b/state/execution_test.go index db724de88..dabcada63 100644 --- a/state/execution_test.go +++ b/state/execution_test.go @@ -2,7 +2,7 @@ package state import ( "bytes" - //"fmt" + "fmt" "path" "testing" @@ -16,10 +16,11 @@ import ( ) var ( - privKey = crypto.GenPrivKeyEd25519FromSecret([]byte("handshake_test")) - chainID = "handshake_chain" - nBlocks = 5 - mempool = mockMempool{} + privKey = crypto.GenPrivKeyEd25519FromSecret([]byte("handshake_test")) + chainID = "handshake_chain" + nBlocks = 5 + mempool = mockMempool{} + testPartSize = 65536 ) func TestExecBlock(t *testing.T) { @@ -53,7 +54,7 @@ func testHandshakeReplay(t *testing.T, n int) { state, store := stateAndStore() clientCreator := proxy.NewLocalClientCreator(dummy.NewPersistentDummyApplication(path.Join(config.GetString("db_dir"), "1"))) clientCreator2 := proxy.NewLocalClientCreator(dummy.NewPersistentDummyApplication(path.Join(config.GetString("db_dir"), "2"))) - proxyApp := proxy.NewAppConns(config, clientCreator, NewHandshaker(state, store)) + proxyApp := proxy.NewAppConns(config, clientCreator, NewHandshaker(config, state, store)) if _, err := proxyApp.Start(); err != nil { t.Fatalf("Error starting proxy app connections: %v", err) } @@ -71,7 +72,7 @@ func testHandshakeReplay(t *testing.T, n int) { state2, _ := stateAndStore() for i := 0; i < n; i++ { block := chain[i] - err := state2.ApplyBlock(nil, proxyApp.Consensus(), block, block.MakePartSet().Header(), mempool) + err := state2.ApplyBlock(nil, proxyApp.Consensus(), block, block.MakePartSet(testPartSize).Header(), mempool) if err != nil { t.Fatal(err) } @@ -80,7 +81,7 @@ func testHandshakeReplay(t *testing.T, n int) { } // now start it with the handshake - handshaker := NewHandshaker(state, store) + handshaker := NewHandshaker(config, state, store) proxyApp = proxy.NewAppConns(config, clientCreator2, handshaker) if _, err := proxyApp.Start(); err != nil { t.Fatalf("Error starting proxy app connections: %v", err) @@ -116,11 +117,12 @@ func txsFunc(blockNum int) (txs []types.Tx) { // sign a commit vote func signCommit(height, round int, hash []byte, header types.PartSetHeader) *types.Vote { vote := &types.Vote{ + ValidatorIndex: 0, + ValidatorAddress: privKey.PubKey().Address(), Height: height, Round: round, Type: types.VoteTypePrecommit, - BlockHash: hash, - BlockPartsHeader: header, + BlockID: types.BlockID{hash, header}, } sig := privKey.Sign(types.SignBytes(chainID, vote)) @@ -131,22 +133,26 @@ func signCommit(height, round int, hash []byte, header types.PartSetHeader) *typ // make a blockchain with one validator func makeBlockchain(t *testing.T, proxyApp proxy.AppConns, state *State) (blockchain []*types.Block) { - prevHash := state.LastBlockHash + prevHash := state.LastBlockID.Hash lastCommit := new(types.Commit) prevParts := types.PartSetHeader{} valHash := state.Validators.Hash() + prevBlockID := types.BlockID{prevHash, prevParts} for i := 1; i < nBlocks+1; i++ { block, parts := types.MakeBlock(i, chainID, txsFunc(i), lastCommit, - prevParts, prevHash, valHash, state.AppHash) - err := state.ApplyBlock(nil, proxyApp.Consensus(), block, block.MakePartSet().Header(), mempool) + prevBlockID, valHash, state.AppHash, testPartSize) + fmt.Println(i) + fmt.Println(prevBlockID) + fmt.Println(block.LastBlockID) + err := state.ApplyBlock(nil, proxyApp.Consensus(), block, block.MakePartSet(testPartSize).Header(), mempool) if err != nil { t.Fatal(i, err) } voteSet := types.NewVoteSet(chainID, i, 0, types.VoteTypePrecommit, state.Validators) vote := signCommit(i, 0, block.Hash(), parts.Header()) - _, _, err = voteSet.AddByIndex(0, vote) + _, err = voteSet.AddVote(vote) if err != nil { t.Fatal(err) } @@ -155,6 +161,7 @@ func makeBlockchain(t *testing.T, proxyApp proxy.AppConns, state *State) (blockc prevHash = block.Hash() prevParts = parts.Header() lastCommit = voteSet.MakeCommit() + prevBlockID = types.BlockID{prevHash, prevParts} } return blockchain } diff --git a/state/state.go b/state/state.go index 033c24132..4a54dfe6d 100644 --- a/state/state.go +++ b/state/state.go @@ -97,7 +97,7 @@ func (s *State) Bytes() []byte { // Since we don't have the new AppHash yet, we set s.Stale=true func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader types.PartSetHeader, prevValSet, nextValSet *types.ValidatorSet) { s.LastBlockHeight = header.Height - s.LastBlockID = types.BlockID{block.Hash(), blockPartsHeader} + s.LastBlockID = types.BlockID{header.Hash(), blockPartsHeader} s.LastBlockTime = header.Time s.Validators = nextValSet s.LastValidators = prevValSet diff --git a/types/block.go b/types/block.go index ec04e4ab8..80a59f8a3 100644 --- a/types/block.go +++ b/types/block.go @@ -22,7 +22,7 @@ type Block struct { } func MakeBlock(height int, chainID string, txs []Tx, commit *Commit, - prevBlockID BlockID, valHash, appHash []byte) (*Block, *PartSet) { + prevBlockID BlockID, valHash, appHash []byte, partSize int) (*Block, *PartSet) { block := &Block{ Header: &Header{ ChainID: chainID, @@ -39,7 +39,7 @@ func MakeBlock(height int, chainID string, txs []Tx, commit *Commit, }, } block.FillHeader() - return block, block.MakePartSet() + return block, block.MakePartSet(partSize) } // Basic validation that doesn't involve state data. diff --git a/types/protobuf.go b/types/protobuf.go index 8d2f9b819..a7a95d121 100644 --- a/types/protobuf.go +++ b/types/protobuf.go @@ -12,17 +12,23 @@ type tm2pb struct{} func (tm2pb) Header(header *Header) *types.Header { return &types.Header{ ChainId: header.ChainID, - Height: uint64(header.Height), + Height: int32(header.Height), Time: uint64(header.Time.Unix()), NumTxs: uint64(header.NumTxs), - LastBlockHash: header.LastBlockHash, - LastBlockParts: TM2PB.PartSetHeader(header.LastBlockParts), + LastBlockId: TM2PB.BlockID(header.LastBlockID), LastCommitHash: header.LastCommitHash, DataHash: header.DataHash, AppHash: header.AppHash, } } +func (tm2pb) BlockID(blockID BlockID) *types.BlockID { + return &types.BlockID{ + Hash: blockID.Hash, + Parts: TM2PB.PartSetHeader(blockID.PartsHeader), + } +} + func (tm2pb) PartSetHeader(partSetHeader PartSetHeader) *types.PartSetHeader { return &types.PartSetHeader{ Total: uint64(partSetHeader.Total), From 904eeddf36e878edb788f99edcf9679a7544fa65 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 16 Nov 2016 16:25:52 -0500 Subject: [PATCH 12/13] update glide --- glide.lock | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/glide.lock b/glide.lock index 613a97ec6..6853c7668 100644 --- a/glide.lock +++ b/glide.lock @@ -1,14 +1,16 @@ hash: 20cb38481a78b73ba3a42af08e34cd825ddb7c826833d67cc61e45c1b3a4c484 -updated: 2016-11-15T15:54:25.75591193-05:00 +updated: 2016-11-16T16:25:10.693961906-05:00 imports: - name: github.com/btcsuite/btcd - version: d9a674e1b7bc09d0830d6986c71cf5f535d753c3 + version: b134beb3b7809de6370a93cc5f6a684d6942e2e8 subpackages: - btcec - name: github.com/btcsuite/fastsha256 version: 637e656429416087660c84436a2a035d69d54e2e - name: github.com/BurntSushi/toml version: 99064174e013895bbd9b025c31100bd1d9b590ca +- name: github.com/ebuchman/fail-test + version: c1eddaa09da2b4017351245b0d43234955276798 - name: github.com/go-stack/stack version: 100eb0c0a9c5b306ca2fb4f165df21d80ada4b82 - name: github.com/gogo/protobuf @@ -16,7 +18,7 @@ imports: subpackages: - proto - name: github.com/golang/protobuf - version: da116c3771bf4a398a43f44e069195ef1c9688ef + version: 224aaba33b1ac32a92a165f27489409fb8133d08 subpackages: - proto - name: github.com/golang/snappy @@ -92,7 +94,7 @@ imports: subpackages: - term - name: github.com/tendermint/tmsp - version: 60e0842ef9a87c840d0bf95eea7b54a1e3d312b3 + version: 0bdb3b887e70b1ef16d32eece0248ec071fd8490 subpackages: - client - example/counter @@ -112,7 +114,7 @@ imports: - ripemd160 - salsa20/salsa - name: golang.org/x/net - version: cac22060de4e495155959e69adcb4b45763ccb10 + version: 4971afdc2f162e82d185353533d3cf16188a9f4e subpackages: - context - http2 @@ -126,7 +128,7 @@ imports: subpackages: - unix - name: google.golang.org/grpc - version: 0d9891286aca15aeb2b0a73be9f5946c3cfefa85 + version: 941cc894cea3c87a12943fd12b594964541b6d28 subpackages: - codes - credentials From c6a648fad74179ddd8bae8cd8da705b4c51e6ea5 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 16 Nov 2016 16:47:31 -0500 Subject: [PATCH 13/13] consensus: lock before loading commit --- consensus/reactor.go | 7 +------ consensus/state.go | 9 +++++++++ 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/consensus/reactor.go b/consensus/reactor.go index 0ece82ce1..60d5a8f5d 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -658,12 +658,7 @@ OUTER_LOOP: { prs := ps.GetRoundState() if prs.CatchupCommitRound != -1 && 0 < prs.Height && prs.Height <= conR.conS.blockStore.Height() { - var commit *types.Commit - if prs.Height == conR.conS.blockStore.Height() { - commit = conR.conS.blockStore.LoadSeenCommit(prs.Height) - } else { - commit = conR.conS.blockStore.LoadBlockCommit(prs.Height) - } + commit := conR.conS.LoadCommit(prs.Height) peer.TrySend(StateChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{ Height: prs.Height, Round: commit.Round(), diff --git a/consensus/state.go b/consensus/state.go index 59bea2315..85d4d1679 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -319,6 +319,15 @@ func (cs *ConsensusState) SetPrivValidator(priv PrivValidator) { cs.privValidator = priv } +func (cs *ConsensusState) LoadCommit(height int) *types.Commit { + cs.mtx.Lock() + defer cs.mtx.Unlock() + if height == cs.blockStore.Height() { + return cs.blockStore.LoadSeenCommit(height) + } + return cs.blockStore.LoadBlockCommit(height) +} + func (cs *ConsensusState) OnStart() error { cs.BaseService.OnStart()