diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index b2246ce54..5d36a7118 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -122,8 +122,8 @@ func NewCounterApplication() *CounterApplication { return &CounterApplication{} } -func (app *CounterApplication) Info() string { - return Fmt("txs:%v", app.txCount) +func (app *CounterApplication) Info() (string, *tmsp.TMSPInfo, *tmsp.LastBlockInfo, *tmsp.ConfigInfo) { + return Fmt("txs:%v", app.txCount), nil, nil, nil } func (app *CounterApplication) SetOption(key string, value string) (log string) { diff --git a/consensus/reactor.go b/consensus/reactor.go index 0ece82ce1..60d5a8f5d 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -658,12 +658,7 @@ OUTER_LOOP: { prs := ps.GetRoundState() if prs.CatchupCommitRound != -1 && 0 < prs.Height && prs.Height <= conR.conS.blockStore.Height() { - var commit *types.Commit - if prs.Height == conR.conS.blockStore.Height() { - commit = conR.conS.blockStore.LoadSeenCommit(prs.Height) - } else { - commit = conR.conS.blockStore.LoadBlockCommit(prs.Height) - } + commit := conR.conS.LoadCommit(prs.Height) peer.TrySend(StateChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{ Height: prs.Height, Round: commit.Round(), diff --git a/consensus/state.go b/consensus/state.go index e5580f192..85d4d1679 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -8,6 +8,8 @@ import ( "sync" "time" + "github.com/ebuchman/fail-test" + . "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" "github.com/tendermint/go-wire" @@ -317,6 +319,15 @@ func (cs *ConsensusState) SetPrivValidator(priv PrivValidator) { cs.privValidator = priv } +func (cs *ConsensusState) LoadCommit(height int) *types.Commit { + cs.mtx.Lock() + defer cs.mtx.Unlock() + if height == cs.blockStore.Height() { + return cs.blockStore.LoadSeenCommit(height) + } + return cs.blockStore.LoadBlockCommit(height) +} + func (cs *ConsensusState) OnStart() error { cs.BaseService.OnStart() @@ -930,25 +941,8 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts // Mempool validated transactions txs := cs.mempool.Reap(cs.config.GetInt("block_size")) - block = &types.Block{ - Header: &types.Header{ - ChainID: cs.state.ChainID, - Height: cs.Height, - Time: time.Now(), - NumTxs: len(txs), - LastBlockID: cs.state.LastBlockID, - ValidatorsHash: cs.state.Validators.Hash(), - AppHash: cs.state.AppHash, // state merkle root of txs from the previous block. - }, - LastCommit: commit, - Data: &types.Data{ - Txs: txs, - }, - } - block.FillHeader() - blockParts = block.MakePartSet(cs.config.GetInt("block_part_size")) - - return block, blockParts + return types.MakeBlock(cs.Height, cs.state.ChainID, txs, commit, + cs.state.LastBlockID, cs.state.Validators.Hash(), cs.state.AppHash, cs.config.GetInt("block_part_size")) } // Enter: `timeoutPropose` after entering Propose. @@ -1251,50 +1245,46 @@ func (cs *ConsensusState) finalizeCommit(height int) { PanicConsensus(Fmt("+2/3 committed an invalid block: %v", err)) } - log.Notice(Fmt("Finalizing commit of block with %d txs", block.NumTxs), "height", block.Height, "hash", block.Hash()) + log.Notice(Fmt("Finalizing commit of block with %d txs", block.NumTxs), + "height", block.Height, "hash", block.Hash(), "root", block.AppHash) log.Info(Fmt("%v", block)) - // Fire off event for new block. - // TODO: Handle app failure. See #177 - types.FireEventNewBlock(cs.evsw, types.EventDataNewBlock{block}) - types.FireEventNewBlockHeader(cs.evsw, types.EventDataNewBlockHeader{block.Header}) - - // Create a copy of the state for staging - stateCopy := cs.state.Copy() - - // event cache for txs - eventCache := types.NewEventCache(cs.evsw) - - // Run the block on the State: - // + update validator sets - // + run txs on the proxyAppConn - err := stateCopy.ExecBlock(eventCache, cs.proxyAppConn, block, blockParts.Header()) - if err != nil { - // TODO: handle this gracefully. - PanicQ(Fmt("Exec failed for application: %v", err)) - } - - // lock mempool, commit state, update mempoool - err = cs.commitStateUpdateMempool(stateCopy, block) - if err != nil { - // TODO: handle this gracefully. - PanicQ(Fmt("Commit failed for application: %v", err)) - } - - // txs committed, bad ones removed from mepool; fire events - // NOTE: the block.AppHash wont reflect these txs until the next block - eventCache.Flush() + fail.Fail() // XXX // Save to blockStore. if cs.blockStore.Height() < block.Height { precommits := cs.Votes.Precommits(cs.CommitRound) seenCommit := precommits.MakeCommit() cs.blockStore.SaveBlock(block, blockParts, seenCommit) + } else { + log.Warn("Why are we finalizeCommitting a block height we already have?", "height", block.Height) } + fail.Fail() // XXX + + // Create a copy of the state for staging + // and an event cache for txs + stateCopy := cs.state.Copy() + eventCache := types.NewEventCache(cs.evsw) + + // Execute and commit the block, and update the mempool. + // All calls to the proxyAppConn should come here. + // NOTE: the block.AppHash wont reflect these txs until the next block + stateCopy.ApplyBlock(eventCache, cs.proxyAppConn, block, blockParts.Header(), cs.mempool) + + fail.Fail() // XXX + + // Fire off event for new block. + // TODO: Handle app failure. See #177 + types.FireEventNewBlock(cs.evsw, types.EventDataNewBlock{block}) + types.FireEventNewBlockHeader(cs.evsw, types.EventDataNewBlockHeader{block.Header}) + eventCache.Flush() + // Save the state. stateCopy.Save() + fail.Fail() // XXX + // NewHeightStep! cs.updateToState(stateCopy) @@ -1309,32 +1299,6 @@ func (cs *ConsensusState) finalizeCommit(height int) { return } -// mempool must be locked during commit and update -// because state is typically reset on Commit and old txs must be replayed -// against committed state before new txs are run in the mempool, lest they be invalid -func (cs *ConsensusState) commitStateUpdateMempool(s *sm.State, block *types.Block) error { - cs.mempool.Lock() - defer cs.mempool.Unlock() - - // Commit block, get hash back - res := cs.proxyAppConn.CommitSync() - if res.IsErr() { - log.Warn("Error in proxyAppConn.CommitSync", "error", res) - return res - } - if res.Log != "" { - log.Debug("Commit.Log: " + res.Log) - } - - // Set the state's new AppHash - s.AppHash = res.Data - - // Update mempool. - cs.mempool.Update(block.Height, block.Txs) - - return nil -} - //----------------------------------------------------------------------------- func (cs *ConsensusState) defaultSetProposal(proposal *types.Proposal) error { diff --git a/glide.lock b/glide.lock index b897e4f99..6853c7668 100644 --- a/glide.lock +++ b/glide.lock @@ -1,14 +1,16 @@ hash: 20cb38481a78b73ba3a42af08e34cd825ddb7c826833d67cc61e45c1b3a4c484 -updated: 2016-11-15T15:54:25.75591193-05:00 +updated: 2016-11-16T16:25:10.693961906-05:00 imports: - name: github.com/btcsuite/btcd - version: d9a674e1b7bc09d0830d6986c71cf5f535d753c3 + version: b134beb3b7809de6370a93cc5f6a684d6942e2e8 subpackages: - btcec - name: github.com/btcsuite/fastsha256 version: 637e656429416087660c84436a2a035d69d54e2e - name: github.com/BurntSushi/toml version: 99064174e013895bbd9b025c31100bd1d9b590ca +- name: github.com/ebuchman/fail-test + version: c1eddaa09da2b4017351245b0d43234955276798 - name: github.com/go-stack/stack version: 100eb0c0a9c5b306ca2fb4f165df21d80ada4b82 - name: github.com/gogo/protobuf @@ -16,7 +18,7 @@ imports: subpackages: - proto - name: github.com/golang/protobuf - version: da116c3771bf4a398a43f44e069195ef1c9688ef + version: 224aaba33b1ac32a92a165f27489409fb8133d08 subpackages: - proto - name: github.com/golang/snappy @@ -92,7 +94,7 @@ imports: subpackages: - term - name: github.com/tendermint/tmsp - version: eece35eeebacee1ab94b8338e77e0d1c2d880ecc + version: 0bdb3b887e70b1ef16d32eece0248ec071fd8490 subpackages: - client - example/counter @@ -112,7 +114,7 @@ imports: - ripemd160 - salsa20/salsa - name: golang.org/x/net - version: cac22060de4e495155959e69adcb4b45763ccb10 + version: 4971afdc2f162e82d185353533d3cf16188a9f4e subpackages: - context - http2 @@ -126,7 +128,7 @@ imports: subpackages: - unix - name: google.golang.org/grpc - version: 0d9891286aca15aeb2b0a73be9f5946c3cfefa85 + version: 941cc894cea3c87a12943fd12b594964541b6d28 subpackages: - codes - credentials diff --git a/mempool/mempool.go b/mempool/mempool.go index a5426991e..80841b171 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -282,8 +282,7 @@ func (mem *Mempool) collectTxs(maxTxs int) []types.Tx { // NOTE: this should be called *after* block is committed by consensus. // NOTE: unsafe; Lock/Unlock must be managed by caller func (mem *Mempool) Update(height int, txs []types.Tx) { - // mem.proxyMtx.Lock() - // defer mem.proxyMtx.Unlock() + // TODO: check err ? mem.proxyAppConn.FlushSync() // To flush async resCb calls e.g. from CheckTx // First, create a lookup map of txns in new txs. diff --git a/node/node.go b/node/node.go index 45e7ae001..f4a04d82c 100644 --- a/node/node.go +++ b/node/node.go @@ -60,11 +60,10 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato stateDB := dbm.NewDB("state", config.GetString("db_backend"), config.GetString("db_dir")) // Get State - state := getState(config, stateDB) + state := sm.GetState(config, stateDB) - // Create the proxyApp, which houses three connections: - // query, consensus, and mempool - proxyApp := proxy.NewAppConns(config, clientCreator, state, blockStore) + // Create the proxyApp, which manages connections (consensus, mempool, query) + proxyApp := proxy.NewAppConns(config, clientCreator, sm.NewHandshaker(config, state, blockStore)) if _, err := proxyApp.Start(); err != nil { Exit(Fmt("Error starting proxy app connections: %v", err)) } @@ -296,17 +295,6 @@ func makeNodeInfo(config cfg.Config, sw *p2p.Switch, privKey crypto.PrivKeyEd255 return nodeInfo } -// Load the most recent state from "state" db, -// or create a new one (and save) from genesis. -func getState(config cfg.Config, stateDB dbm.DB) *sm.State { - state := sm.LoadState(stateDB) - if state == nil { - state = sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) - state.Save() - } - return state -} - //------------------------------------------------------------------------------ // Users wishing to: @@ -391,17 +379,19 @@ func newConsensusState(config cfg.Config) *consensus.ConsensusState { stateDB := dbm.NewDB("state", config.GetString("db_backend"), config.GetString("db_dir")) state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) - // Create two proxyAppConn connections, - // one for the consensus and one for the mempool. - proxyApp := proxy.NewAppConns(config, proxy.DefaultClientCreator(config), state, blockStore) + // Create proxyAppConn connection (consensus, mempool, query) + proxyApp := proxy.NewAppConns(config, proxy.DefaultClientCreator(config), sm.NewHandshaker(config, state, blockStore)) + _, err := proxyApp.Start() + if err != nil { + Exit(Fmt("Error starting proxy app conns: %v", err)) + } // add the chainid to the global config config.Set("chain_id", state.ChainID) // Make event switch eventSwitch := types.NewEventSwitch() - _, err := eventSwitch.Start() - if err != nil { + if _, err := eventSwitch.Start(); err != nil { Exit(Fmt("Failed to start event switch: %v", err)) } diff --git a/proxy/app_conn.go b/proxy/app_conn.go index 5ba1bc158..c2f383d3a 100644 --- a/proxy/app_conn.go +++ b/proxy/app_conn.go @@ -14,7 +14,7 @@ type AppConnConsensus interface { InitChainSync(validators []*types.Validator) (err error) - BeginBlockSync(height uint64) (err error) + BeginBlockSync(hash []byte, header *types.Header) (err error) AppendTxAsync(tx []byte) *tmspcli.ReqRes EndBlockSync(height uint64) (changedValidators []*types.Validator, err error) CommitSync() (res types.Result) @@ -34,7 +34,7 @@ type AppConnQuery interface { Error() error EchoSync(string) (res types.Result) - InfoSync() (res types.Result) + InfoSync() (types.Result, *types.TMSPInfo, *types.LastBlockInfo, *types.ConfigInfo) QuerySync(tx []byte) (res types.Result) // SetOptionSync(key string, value string) (res types.Result) @@ -56,15 +56,19 @@ func NewAppConnConsensus(appConn tmspcli.Client) *appConnConsensus { func (app *appConnConsensus) SetResponseCallback(cb tmspcli.Callback) { app.appConn.SetResponseCallback(cb) } + func (app *appConnConsensus) Error() error { return app.appConn.Error() } + func (app *appConnConsensus) InitChainSync(validators []*types.Validator) (err error) { return app.appConn.InitChainSync(validators) } -func (app *appConnConsensus) BeginBlockSync(height uint64) (err error) { - return app.appConn.BeginBlockSync(height) + +func (app *appConnConsensus) BeginBlockSync(hash []byte, header *types.Header) (err error) { + return app.appConn.BeginBlockSync(hash, header) } + func (app *appConnConsensus) AppendTxAsync(tx []byte) *tmspcli.ReqRes { return app.appConn.AppendTxAsync(tx) } @@ -131,7 +135,7 @@ func (app *appConnQuery) EchoSync(msg string) (res types.Result) { return app.appConn.EchoSync(msg) } -func (app *appConnQuery) InfoSync() (res types.Result) { +func (app *appConnQuery) InfoSync() (types.Result, *types.TMSPInfo, *types.LastBlockInfo, *types.ConfigInfo) { return app.appConn.InfoSync() } diff --git a/proxy/app_conn_test.go b/proxy/app_conn_test.go index 3b7a3413e..d9814ec3c 100644 --- a/proxy/app_conn_test.go +++ b/proxy/app_conn_test.go @@ -16,7 +16,7 @@ import ( type AppConnTest interface { EchoAsync(string) *tmspcli.ReqRes FlushSync() error - InfoSync() (res types.Result) + InfoSync() (types.Result, *types.TMSPInfo, *types.LastBlockInfo, *types.ConfigInfo) } type appConnTest struct { @@ -35,7 +35,7 @@ func (app *appConnTest) FlushSync() error { return app.appConn.FlushSync() } -func (app *appConnTest) InfoSync() types.Result { +func (app *appConnTest) InfoSync() (types.Result, *types.TMSPInfo, *types.LastBlockInfo, *types.ConfigInfo) { return app.appConn.InfoSync() } @@ -114,7 +114,7 @@ func TestInfo(t *testing.T) { proxy := NewAppConnTest(cli) t.Log("Connected") - res := proxy.InfoSync() + res, _, _, _ := proxy.InfoSync() if res.IsErr() { t.Errorf("Unexpected error: %v", err) } diff --git a/proxy/multi_app_conn.go b/proxy/multi_app_conn.go index 8e4c84aa2..7095697d2 100644 --- a/proxy/multi_app_conn.go +++ b/proxy/multi_app_conn.go @@ -5,6 +5,8 @@ import ( cfg "github.com/tendermint/go-config" ) +//----------------------------- + // Tendermint's interface to the application consists of multiple connections type AppConns interface { Service @@ -14,19 +16,27 @@ type AppConns interface { Query() AppConnQuery } -func NewAppConns(config cfg.Config, clientCreator ClientCreator, state State, blockStore BlockStore) AppConns { - return NewMultiAppConn(config, clientCreator, state, blockStore) +func NewAppConns(config cfg.Config, clientCreator ClientCreator, handshaker Handshaker) AppConns { + return NewMultiAppConn(config, clientCreator, handshaker) +} + +//----------------------------- +// multiAppConn implements AppConns + +type Handshaker interface { + Handshake(AppConns) error } // a multiAppConn is made of a few appConns (mempool, consensus, query) -// and manages their underlying tmsp clients, ensuring they reboot together +// and manages their underlying tmsp clients, including the handshake +// which ensures the app and tendermint are synced. +// TODO: on app restart, clients must reboot together type multiAppConn struct { BaseService config cfg.Config - state State - blockStore BlockStore + handshaker Handshaker mempoolConn *appConnMempool consensusConn *appConnConsensus @@ -36,11 +46,10 @@ type multiAppConn struct { } // Make all necessary tmsp connections to the application -func NewMultiAppConn(config cfg.Config, clientCreator ClientCreator, state State, blockStore BlockStore) *multiAppConn { +func NewMultiAppConn(config cfg.Config, clientCreator ClientCreator, handshaker Handshaker) *multiAppConn { multiAppConn := &multiAppConn{ config: config, - state: state, - blockStore: blockStore, + handshaker: handshaker, clientCreator: clientCreator, } multiAppConn.BaseService = *NewBaseService(log, "multiAppConn", multiAppConn) @@ -57,6 +66,7 @@ func (app *multiAppConn) Consensus() AppConnConsensus { return app.consensusConn } +// Returns the query Connection func (app *multiAppConn) Query() AppConnQuery { return app.queryConn } @@ -85,11 +95,9 @@ func (app *multiAppConn) OnStart() error { } app.consensusConn = NewAppConnConsensus(concli) - // TODO: handshake - - // TODO: replay blocks - - // TODO: (on restart) replay mempool - + // ensure app is synced to the latest state + if app.handshaker != nil { + return app.handshaker.Handshake(app) + } return nil } diff --git a/proxy/state.go b/proxy/state.go deleted file mode 100644 index a52dd0f8a..000000000 --- a/proxy/state.go +++ /dev/null @@ -1,9 +0,0 @@ -package proxy - -type State interface { - // TODO -} - -type BlockStore interface { - // TODO -} diff --git a/rpc/core/tmsp.go b/rpc/core/tmsp.go index 9a19e6eeb..cecd71dbb 100644 --- a/rpc/core/tmsp.go +++ b/rpc/core/tmsp.go @@ -12,6 +12,6 @@ func TMSPQuery(query []byte) (*ctypes.ResultTMSPQuery, error) { } func TMSPInfo() (*ctypes.ResultTMSPInfo, error) { - res := proxyAppQuery.InfoSync() - return &ctypes.ResultTMSPInfo{res}, nil + res, tmspInfo, lastBlockInfo, configInfo := proxyAppQuery.InfoSync() + return &ctypes.ResultTMSPInfo{res, tmspInfo, lastBlockInfo, configInfo}, nil } diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index cd68addd5..f5f6bae02 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -69,7 +69,10 @@ type ResultUnconfirmedTxs struct { } type ResultTMSPInfo struct { - Result tmsp.Result `json:"result"` + Result tmsp.Result `json:"result"` + TMSPInfo *tmsp.TMSPInfo `json:"tmsp_info"` + LastBlockInfo *tmsp.LastBlockInfo `json:"last_block_info"` + ConfigInfo *tmsp.ConfigInfo `json:"config_info"` } type ResultTMSPQuery struct { diff --git a/state/errors.go b/state/errors.go new file mode 100644 index 000000000..0d0eae14c --- /dev/null +++ b/state/errors.go @@ -0,0 +1,55 @@ +package state + +import ( + . "github.com/tendermint/go-common" +) + +type ( + ErrInvalidBlock error + ErrProxyAppConn error + + ErrUnknownBlock struct { + height int + } + + ErrBlockHashMismatch struct { + coreHash []byte + appHash []byte + height int + } + + ErrAppBlockHeightTooHigh struct { + coreHeight int + appHeight int + } + + ErrLastStateMismatch struct { + height int + core []byte + app []byte + } + + ErrStateMismatch struct { + got *State + expected *State + } +) + +func (e ErrUnknownBlock) Error() string { + return Fmt("Could not find block #%d", e.height) +} + +func (e ErrBlockHashMismatch) Error() string { + return Fmt("App block hash (%X) does not match core block hash (%X) for height %d", e.appHash, e.coreHash, e.height) +} + +func (e ErrAppBlockHeightTooHigh) Error() string { + return Fmt("App block height (%d) is higher than core (%d)", e.appHeight, e.coreHeight) +} +func (e ErrLastStateMismatch) Error() string { + return Fmt("Latest tendermint block (%d) LastAppHash (%X) does not match app's AppHash (%X)", e.height, e.core, e.app) +} + +func (e ErrStateMismatch) Error() string { + return Fmt("State after replay does not match saved state. Got ----\n%v\nExpected ----\n%v\n", e.got, e.expected) +} diff --git a/state/execution.go b/state/execution.go index e5653e7e7..c52be41dc 100644 --- a/state/execution.go +++ b/state/execution.go @@ -1,19 +1,20 @@ package state import ( + "bytes" "errors" - "fmt" + + "github.com/ebuchman/fail-test" . "github.com/tendermint/go-common" + cfg "github.com/tendermint/go-config" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" tmsp "github.com/tendermint/tmsp/types" ) -// Validate block -func (s *State) ValidateBlock(block *types.Block) error { - return s.validateBlock(block) -} +//-------------------------------------------------- +// Execute the block // Execute the block to mutate State. // Validates block and then executes Data.Txs in the block. @@ -22,7 +23,7 @@ func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnC // Validate the block. err := s.validateBlock(block) if err != nil { - return err + return ErrInvalidBlock(err) } // Update the validator set @@ -37,16 +38,17 @@ func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnC if err != nil { // There was some error in proxyApp // TODO Report error and wait for proxyApp to be available. - return err + return ErrProxyAppConn(err) } // All good! + // Update validator accums and set state variables nextValSet.IncrementAccum(1) - s.LastBlockHeight = block.Height - s.LastBlockID = types.BlockID{block.Hash(), blockPartsHeader} - s.LastBlockTime = block.Time - s.Validators = nextValSet - s.LastValidators = valSet + s.SetBlockAndValidators(block.Header, blockPartsHeader, valSet, nextValSet) + + // save state with updated height/blockhash/validators + // but stale apphash, in case we fail between Commit and Save + s.Save() return nil } @@ -89,26 +91,34 @@ func (s *State) execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn prox proxyAppConn.SetResponseCallback(proxyCb) // Begin block - err := proxyAppConn.BeginBlockSync(uint64(block.Height)) + err := proxyAppConn.BeginBlockSync(block.Hash(), types.TM2PB.Header(block.Header)) if err != nil { log.Warn("Error in proxyAppConn.BeginBlock", "error", err) return err } + fail.Fail() // XXX + // Run txs of block for _, tx := range block.Txs { + fail.FailRand(len(block.Txs)) // XXX proxyAppConn.AppendTxAsync(tx) if err := proxyAppConn.Error(); err != nil { return err } } + fail.Fail() // XXX + // End block changedValidators, err := proxyAppConn.EndBlockSync(uint64(block.Height)) if err != nil { log.Warn("Error in proxyAppConn.EndBlock", "error", err) return err } + + fail.Fail() // XXX + // TODO: Do something with changedValidators log.Debug("TODO: Do something with changedValidators", "changedValidators", changedValidators) @@ -116,33 +126,6 @@ func (s *State) execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn prox return nil } -func (s *State) validateBlock(block *types.Block) error { - // Basic block validation. - err := block.ValidateBasic(s.ChainID, s.LastBlockHeight, s.LastBlockID, s.LastBlockTime, s.AppHash) - if err != nil { - return err - } - - // Validate block LastCommit. - if block.Height == 1 { - if len(block.LastCommit.Precommits) != 0 { - return errors.New("Block at height 1 (first block) should have no LastCommit precommits") - } - } else { - if len(block.LastCommit.Precommits) != s.LastValidators.Size() { - return fmt.Errorf("Invalid block commit size. Expected %v, got %v", - s.LastValidators.Size(), len(block.LastCommit.Precommits)) - } - err := s.LastValidators.VerifyCommit( - s.ChainID, s.LastBlockID, block.Height-1, block.LastCommit) - if err != nil { - return err - } - } - - return nil -} - // Updates the LastCommitHeight of the validators in valSet, in place. // Assumes that lastValSet matches the valset of block.LastCommit // CONTRACT: lastValSet is not mutated. @@ -171,13 +154,285 @@ func updateValidatorsWithBlock(lastValSet *types.ValidatorSet, valSet *types.Val } +//----------------------------------------------------- +// Validate block + +func (s *State) ValidateBlock(block *types.Block) error { + return s.validateBlock(block) +} + +func (s *State) validateBlock(block *types.Block) error { + // Basic block validation. + err := block.ValidateBasic(s.ChainID, s.LastBlockHeight, s.LastBlockID, s.LastBlockTime, s.AppHash) + if err != nil { + return err + } + + // Validate block LastCommit. + if block.Height == 1 { + if len(block.LastCommit.Precommits) != 0 { + return errors.New("Block at height 1 (first block) should have no LastCommit precommits") + } + } else { + if len(block.LastCommit.Precommits) != s.LastValidators.Size() { + return errors.New(Fmt("Invalid block commit size. Expected %v, got %v", + s.LastValidators.Size(), len(block.LastCommit.Precommits))) + } + err := s.LastValidators.VerifyCommit( + s.ChainID, s.LastBlockID, block.Height-1, block.LastCommit) + if err != nil { + return err + } + } + + return nil +} + //----------------------------------------------------------------------------- +// ApplyBlock executes the block, then commits and updates the mempool atomically -type InvalidTxError struct { - Tx types.Tx - Code tmsp.CodeType +// Execute and commit block against app, save block and state +func (s *State) ApplyBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, + block *types.Block, partsHeader types.PartSetHeader, mempool Mempool) error { + + // Run the block on the State: + // + update validator sets + // + run txs on the proxyAppConn + err := s.ExecBlock(eventCache, proxyAppConn, block, partsHeader) + if err != nil { + return errors.New(Fmt("Exec failed for application: %v", err)) + } + + // lock mempool, commit state, update mempoool + err = s.CommitStateUpdateMempool(proxyAppConn, block, mempool) + if err != nil { + return errors.New(Fmt("Commit failed for application: %v", err)) + } + return nil } -func (txErr InvalidTxError) Error() string { - return Fmt("Invalid tx: [%v] code: [%v]", txErr.Tx, txErr.Code) +// mempool must be locked during commit and update +// because state is typically reset on Commit and old txs must be replayed +// against committed state before new txs are run in the mempool, lest they be invalid +func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, block *types.Block, mempool Mempool) error { + mempool.Lock() + defer mempool.Unlock() + + // Commit block, get hash back + res := proxyAppConn.CommitSync() + if res.IsErr() { + log.Warn("Error in proxyAppConn.CommitSync", "error", res) + return res + } + if res.Log != "" { + log.Debug("Commit.Log: " + res.Log) + } + + // Set the state's new AppHash + s.AppHash = res.Data + + // Update mempool. + mempool.Update(block.Height, block.Txs) + + return nil +} + +// Updates to the mempool need to be synchronized with committing a block +// so apps can reset their transient state on Commit +type Mempool interface { + Lock() + Unlock() + Update(height int, txs []types.Tx) +} + +type mockMempool struct { +} + +func (m mockMempool) Lock() {} +func (m mockMempool) Unlock() {} +func (m mockMempool) Update(height int, txs []types.Tx) {} + +//---------------------------------------------------------------- +// Handshake with app to sync to latest state of core by replaying blocks + +// TODO: Should we move blockchain/store.go to its own package? +type BlockStore interface { + Height() int + LoadBlock(height int) *types.Block +} + +type Handshaker struct { + config cfg.Config + state *State + store BlockStore + + nBlocks int // number of blocks applied to the state +} + +func NewHandshaker(config cfg.Config, state *State, store BlockStore) *Handshaker { + return &Handshaker{config, state, store, 0} +} + +// TODO: retry the handshake once if it fails the first time +// ... let Info take an argument determining its behaviour +func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error { + // handshake is done via info request on the query conn + res, tmspInfo, blockInfo, configInfo := proxyApp.Query().InfoSync() + if res.IsErr() { + return errors.New(Fmt("Error calling Info. Code: %v; Data: %X; Log: %s", res.Code, res.Data, res.Log)) + } + + if blockInfo == nil { + log.Warn("blockInfo is nil, aborting handshake") + return nil + } + + log.Notice("TMSP Handshake", "height", blockInfo.BlockHeight, "block_hash", blockInfo.BlockHash, "app_hash", blockInfo.AppHash) + + blockHeight := int(blockInfo.BlockHeight) // safe, should be an int32 + blockHash := blockInfo.BlockHash + appHash := blockInfo.AppHash + + if tmspInfo != nil { + // TODO: check tmsp version (or do this in the tmspcli?) + _ = tmspInfo + } + + // last block (nil if we starting from 0) + var header *types.Header + var partsHeader types.PartSetHeader + + // replay all blocks after blockHeight + // if blockHeight == 0, we will replay everything + if blockHeight != 0 { + block := h.store.LoadBlock(blockHeight) + if block == nil { + return ErrUnknownBlock{blockHeight} + } + + // check block hash + if !bytes.Equal(block.Hash(), blockHash) { + return ErrBlockHashMismatch{block.Hash(), blockHash, blockHeight} + } + + // NOTE: app hash should be in the next block ... + // check app hash + /*if !bytes.Equal(block.Header.AppHash, appHash) { + return fmt.Errorf("Handshake error. App hash at height %d does not match. Got %X, expected %X", blockHeight, appHash, block.Header.AppHash) + }*/ + + header = block.Header + partsHeader = block.MakePartSet(h.config.GetInt("block_part_size")).Header() + } + + if configInfo != nil { + // TODO: set config info + _ = configInfo + } + + // replay blocks up to the latest in the blockstore + err := h.ReplayBlocks(appHash, header, partsHeader, proxyApp.Consensus()) + if err != nil { + return errors.New(Fmt("Error on replay: %v", err)) + } + + // TODO: (on restart) replay mempool + + return nil +} + +// Replay all blocks after blockHeight and ensure the result matches the current state. +func (h *Handshaker) ReplayBlocks(appHash []byte, header *types.Header, partsHeader types.PartSetHeader, + appConnConsensus proxy.AppConnConsensus) error { + + // NOTE/TODO: tendermint may crash after the app commits + // but before it can save the new state root. + // it should save all eg. valset changes before calling Commit. + // then, if tm state is behind app state, the only thing missing can be app hash + + // get a fresh state and reset to the apps latest + stateCopy := h.state.Copy() + + // TODO: put validators in iavl tree so we can set the state with an older validator set + lastVals, nextVals := stateCopy.GetValidators() + if header == nil { + stateCopy.LastBlockHeight = 0 + stateCopy.LastBlockID = types.BlockID{} + // stateCopy.LastBlockTime = ... doesnt matter + stateCopy.Validators = nextVals + stateCopy.LastValidators = lastVals + } else { + stateCopy.SetBlockAndValidators(header, partsHeader, lastVals, nextVals) + } + stateCopy.Stale = false + stateCopy.AppHash = appHash + + appBlockHeight := stateCopy.LastBlockHeight + coreBlockHeight := h.store.Height() + if coreBlockHeight < appBlockHeight { + // if the app is ahead, there's nothing we can do + return ErrAppBlockHeightTooHigh{coreBlockHeight, appBlockHeight} + + } else if coreBlockHeight == appBlockHeight { + // if we crashed between Commit and SaveState, + // the state's app hash is stale. + // otherwise we're synced + if h.state.Stale { + h.state.Stale = false + h.state.AppHash = appHash + } + return checkState(h.state, stateCopy) + + } else if h.state.LastBlockHeight == appBlockHeight { + // core is ahead of app but core's state height is at apps height + // this happens if we crashed after saving the block, + // but before committing it. We should be 1 ahead + if coreBlockHeight != appBlockHeight+1 { + PanicSanity(Fmt("core.state.height == app.height but core.height (%d) > app.height+1 (%d)", coreBlockHeight, appBlockHeight+1)) + } + + // check that the blocks last apphash is the states apphash + block := h.store.LoadBlock(coreBlockHeight) + if !bytes.Equal(block.Header.AppHash, appHash) { + return ErrLastStateMismatch{coreBlockHeight, block.Header.AppHash, appHash} + } + + // replay the block against the actual tendermint state (not the copy) + return h.loadApplyBlock(coreBlockHeight, h.state, appConnConsensus) + + } else { + // either we're caught up or there's blocks to replay + // replay all blocks starting with appBlockHeight+1 + for i := appBlockHeight + 1; i <= coreBlockHeight; i++ { + h.loadApplyBlock(i, stateCopy, appConnConsensus) + } + return checkState(h.state, stateCopy) + } +} + +func checkState(s, stateCopy *State) error { + // The computed state and the previously set state should be identical + if !s.Equals(stateCopy) { + return ErrStateMismatch{stateCopy, s} + } + return nil +} + +func (h *Handshaker) loadApplyBlock(blockIndex int, state *State, appConnConsensus proxy.AppConnConsensus) error { + h.nBlocks += 1 + block := h.store.LoadBlock(blockIndex) + panicOnNilBlock(blockIndex, h.store.Height(), block) // XXX + var eventCache types.Fireable // nil + return state.ApplyBlock(eventCache, appConnConsensus, block, block.MakePartSet(h.config.GetInt("block_part_size")).Header(), mockMempool{}) +} + +func panicOnNilBlock(height, bsHeight int, block *types.Block) { + if block == nil { + // Sanity? + PanicCrisis(Fmt(` +block is nil for height <= blockStore.Height() (%d <= %d). +Block: %v, +`, height, bsHeight, block)) + + } } diff --git a/state/execution_test.go b/state/execution_test.go new file mode 100644 index 000000000..dabcada63 --- /dev/null +++ b/state/execution_test.go @@ -0,0 +1,193 @@ +package state + +import ( + "bytes" + "fmt" + "path" + "testing" + + "github.com/tendermint/tendermint/config/tendermint_test" + // . "github.com/tendermint/go-common" + "github.com/tendermint/go-crypto" + dbm "github.com/tendermint/go-db" + "github.com/tendermint/tendermint/proxy" + "github.com/tendermint/tendermint/types" + "github.com/tendermint/tmsp/example/dummy" +) + +var ( + privKey = crypto.GenPrivKeyEd25519FromSecret([]byte("handshake_test")) + chainID = "handshake_chain" + nBlocks = 5 + mempool = mockMempool{} + testPartSize = 65536 +) + +func TestExecBlock(t *testing.T) { + // TODO +} + +// Sync from scratch +func TestHandshakeReplayAll(t *testing.T) { + testHandshakeReplay(t, 0) +} + +// Sync many, not from scratch +func TestHandshakeReplaySome(t *testing.T) { + testHandshakeReplay(t, 1) +} + +// Sync from lagging by one +func TestHandshakeReplayOne(t *testing.T) { + testHandshakeReplay(t, nBlocks-1) +} + +// Sync from caught up +func TestHandshakeReplayNone(t *testing.T) { + testHandshakeReplay(t, nBlocks) +} + +// Make some blocks. Start a fresh app and apply n blocks. Then restart the app and sync it up with the remaining blocks +func testHandshakeReplay(t *testing.T, n int) { + config := tendermint_test.ResetConfig("proxy_test_") + + state, store := stateAndStore() + clientCreator := proxy.NewLocalClientCreator(dummy.NewPersistentDummyApplication(path.Join(config.GetString("db_dir"), "1"))) + clientCreator2 := proxy.NewLocalClientCreator(dummy.NewPersistentDummyApplication(path.Join(config.GetString("db_dir"), "2"))) + proxyApp := proxy.NewAppConns(config, clientCreator, NewHandshaker(config, state, store)) + if _, err := proxyApp.Start(); err != nil { + t.Fatalf("Error starting proxy app connections: %v", err) + } + chain := makeBlockchain(t, proxyApp, state) + store.chain = chain // + latestAppHash := state.AppHash + proxyApp.Stop() + + if n > 0 { + // start a new app without handshake, play n blocks + proxyApp = proxy.NewAppConns(config, clientCreator2, nil) + if _, err := proxyApp.Start(); err != nil { + t.Fatalf("Error starting proxy app connections: %v", err) + } + state2, _ := stateAndStore() + for i := 0; i < n; i++ { + block := chain[i] + err := state2.ApplyBlock(nil, proxyApp.Consensus(), block, block.MakePartSet(testPartSize).Header(), mempool) + if err != nil { + t.Fatal(err) + } + } + proxyApp.Stop() + } + + // now start it with the handshake + handshaker := NewHandshaker(config, state, store) + proxyApp = proxy.NewAppConns(config, clientCreator2, handshaker) + if _, err := proxyApp.Start(); err != nil { + t.Fatalf("Error starting proxy app connections: %v", err) + } + + // get the latest app hash from the app + r, _, blockInfo, _ := proxyApp.Query().InfoSync() + if r.IsErr() { + t.Fatal(r) + } + + // the app hash should be synced up + if !bytes.Equal(latestAppHash, blockInfo.AppHash) { + t.Fatalf("Expected app hashes to match after handshake/replay. got %X, expected %X", blockInfo.AppHash, latestAppHash) + } + + if handshaker.nBlocks != nBlocks-n { + t.Fatalf("Expected handshake to sync %d blocks, got %d", nBlocks-n, handshaker.nBlocks) + } + +} + +//-------------------------- + +// make some bogus txs +func txsFunc(blockNum int) (txs []types.Tx) { + for i := 0; i < 10; i++ { + txs = append(txs, types.Tx([]byte{byte(blockNum), byte(i)})) + } + return txs +} + +// sign a commit vote +func signCommit(height, round int, hash []byte, header types.PartSetHeader) *types.Vote { + vote := &types.Vote{ + ValidatorIndex: 0, + ValidatorAddress: privKey.PubKey().Address(), + Height: height, + Round: round, + Type: types.VoteTypePrecommit, + BlockID: types.BlockID{hash, header}, + } + + sig := privKey.Sign(types.SignBytes(chainID, vote)) + vote.Signature = sig.(crypto.SignatureEd25519) + return vote +} + +// make a blockchain with one validator +func makeBlockchain(t *testing.T, proxyApp proxy.AppConns, state *State) (blockchain []*types.Block) { + + prevHash := state.LastBlockID.Hash + lastCommit := new(types.Commit) + prevParts := types.PartSetHeader{} + valHash := state.Validators.Hash() + prevBlockID := types.BlockID{prevHash, prevParts} + + for i := 1; i < nBlocks+1; i++ { + block, parts := types.MakeBlock(i, chainID, txsFunc(i), lastCommit, + prevBlockID, valHash, state.AppHash, testPartSize) + fmt.Println(i) + fmt.Println(prevBlockID) + fmt.Println(block.LastBlockID) + err := state.ApplyBlock(nil, proxyApp.Consensus(), block, block.MakePartSet(testPartSize).Header(), mempool) + if err != nil { + t.Fatal(i, err) + } + + voteSet := types.NewVoteSet(chainID, i, 0, types.VoteTypePrecommit, state.Validators) + vote := signCommit(i, 0, block.Hash(), parts.Header()) + _, err = voteSet.AddVote(vote) + if err != nil { + t.Fatal(err) + } + + blockchain = append(blockchain, block) + prevHash = block.Hash() + prevParts = parts.Header() + lastCommit = voteSet.MakeCommit() + prevBlockID = types.BlockID{prevHash, prevParts} + } + return blockchain +} + +// fresh state and mock store +func stateAndStore() (*State, *mockBlockStore) { + stateDB := dbm.NewMemDB() + return MakeGenesisState(stateDB, &types.GenesisDoc{ + ChainID: chainID, + Validators: []types.GenesisValidator{ + types.GenesisValidator{privKey.PubKey(), 10000, "test"}, + }, + AppHash: nil, + }), NewMockBlockStore(nil) +} + +//---------------------------------- +// mock block store + +type mockBlockStore struct { + chain []*types.Block +} + +func NewMockBlockStore(chain []*types.Block) *mockBlockStore { + return &mockBlockStore{chain} +} + +func (bs *mockBlockStore) Height() int { return len(bs.chain) } +func (bs *mockBlockStore) LoadBlock(height int) *types.Block { return bs.chain[height-1] } diff --git a/state/state.go b/state/state.go index 213484863..4a54dfe6d 100644 --- a/state/state.go +++ b/state/state.go @@ -7,6 +7,7 @@ import ( "time" . "github.com/tendermint/go-common" + cfg "github.com/tendermint/go-config" dbm "github.com/tendermint/go-db" "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/types" @@ -20,16 +21,25 @@ var ( // NOTE: not goroutine-safe. type State struct { - mtx sync.Mutex - db dbm.DB - GenesisDoc *types.GenesisDoc - ChainID string + // mtx for writing to db + mtx sync.Mutex + db dbm.DB + + // should not change + GenesisDoc *types.GenesisDoc + ChainID string + + // updated at end of ExecBlock LastBlockHeight int // Genesis state has this set to 0. So, Block(H=0) does not exist. LastBlockID types.BlockID LastBlockTime time.Time Validators *types.ValidatorSet LastValidators *types.ValidatorSet - AppHash []byte + + // AppHash is updated after Commit; + // it's stale after ExecBlock and before Commit + Stale bool + AppHash []byte } func LoadState(db dbm.DB) *State { @@ -59,6 +69,7 @@ func (s *State) Copy() *State { LastBlockTime: s.LastBlockTime, Validators: s.Validators.Copy(), LastValidators: s.LastValidators.Copy(), + Stale: s.Stale, // XXX: but really state shouldnt be copied while its stale AppHash: s.AppHash, } } @@ -66,13 +77,47 @@ func (s *State) Copy() *State { func (s *State) Save() { s.mtx.Lock() defer s.mtx.Unlock() + s.db.Set(stateKey, s.Bytes()) +} +func (s *State) Equals(s2 *State) bool { + return bytes.Equal(s.Bytes(), s2.Bytes()) +} + +func (s *State) Bytes() []byte { buf, n, err := new(bytes.Buffer), new(int), new(error) wire.WriteBinary(s, buf, n, err) if *err != nil { PanicCrisis(*err) } - s.db.Set(stateKey, buf.Bytes()) + return buf.Bytes() +} + +// Mutate state variables to match block and validators +// Since we don't have the new AppHash yet, we set s.Stale=true +func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader types.PartSetHeader, prevValSet, nextValSet *types.ValidatorSet) { + s.LastBlockHeight = header.Height + s.LastBlockID = types.BlockID{header.Hash(), blockPartsHeader} + s.LastBlockTime = header.Time + s.Validators = nextValSet + s.LastValidators = prevValSet + + s.Stale = true +} + +func (s *State) GetValidators() (*types.ValidatorSet, *types.ValidatorSet) { + return s.LastValidators, s.Validators +} + +// Load the most recent state from "state" db, +// or create a new one (and save) from genesis. +func GetState(config cfg.Config, stateDB dbm.DB) *State { + state := LoadState(stateDB) + if state == nil { + state = MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) + state.Save() + } + return state } //----------------------------------------------------------------------------- diff --git a/state/state_test.go b/state/state_test.go new file mode 100644 index 000000000..a534cb695 --- /dev/null +++ b/state/state_test.go @@ -0,0 +1,42 @@ +package state + +import ( + "testing" + + dbm "github.com/tendermint/go-db" + "github.com/tendermint/tendermint/config/tendermint_test" +) + +func TestStateCopyEquals(t *testing.T) { + config := tendermint_test.ResetConfig("state_") + // Get State db + stateDB := dbm.NewDB("state", config.GetString("db_backend"), config.GetString("db_dir")) + state := GetState(config, stateDB) + + stateCopy := state.Copy() + + if !state.Equals(stateCopy) { + t.Fatal("expected state and its copy to be identical. got %v\n expected %v\n", stateCopy, state) + } + + stateCopy.LastBlockHeight += 1 + + if state.Equals(stateCopy) { + t.Fatal("expected states to be different. got same %v", state) + } +} + +func TestStateSaveLoad(t *testing.T) { + config := tendermint_test.ResetConfig("state_") + // Get State db + stateDB := dbm.NewDB("state", config.GetString("db_backend"), config.GetString("db_dir")) + state := GetState(config, stateDB) + + state.LastBlockHeight += 1 + state.Save() + + loadedState := LoadState(stateDB) + if !state.Equals(loadedState) { + t.Fatal("expected state and its copy to be identical. got %v\n expected %v\n", loadedState, state) + } +} diff --git a/test/persist/test.sh b/test/persist/test.sh new file mode 100644 index 000000000..5c1e12411 --- /dev/null +++ b/test/persist/test.sh @@ -0,0 +1,68 @@ +#! /bin/bash + + +export TMROOT=$HOME/.tendermint_persist + +rm -rf $TMROOT +tendermint init + +function start_procs(){ + name=$1 + echo "Starting persistent dummy and tendermint" + dummy --persist $TMROOT/dummy &> "dummy_${name}.log" & + PID_DUMMY=$! + tendermint node &> tendermint_${name}.log & + PID_TENDERMINT=$! + sleep 5 +} + +function kill_procs(){ + kill -9 $PID_DUMMY $PID_TENDERMINT +} + + +function send_txs(){ + # send a bunch of txs over a few blocks + echo "Sending txs" + for i in `seq 1 5`; do + for j in `seq 1 100`; do + tx=`head -c 8 /dev/urandom | hexdump -ve '1/1 "%.2X"'` + curl -s 127.0.0.1:46657/broadcast_tx_async?tx=\"$tx\" &> /dev/null + done + sleep 1 + done +} + + +start_procs 1 +send_txs +kill_procs +start_procs 2 + +# wait for node to handshake and make a new block +addr="localhost:46657" +curl -s $addr/status > /dev/null +ERR=$? +i=0 +while [ "$ERR" != 0 ]; do + sleep 1 + curl -s $addr/status > /dev/null + ERR=$? + i=$(($i + 1)) + if [[ $i == 10 ]]; then + echo "Timed out waiting for tendermint to start" + exit 1 + fi +done + +# wait for a new block +h1=`curl -s $addr/status | jq .result[1].latest_block_height` +h2=$h1 +while [ "$h2" == "$h1" ]; do + sleep 1 + h2=`curl -s $addr/status | jq .result[1].latest_block_height` +done + +kill_procs + +echo "Passed Test: Persistence" diff --git a/test/persist/test2.sh b/test/persist/test2.sh new file mode 100644 index 000000000..509deee79 --- /dev/null +++ b/test/persist/test2.sh @@ -0,0 +1,104 @@ +#! /bin/bash + + +export TMROOT=$HOME/.tendermint_persist + +rm -rf $TMROOT +tendermint init + +function start_procs(){ + name=$1 + indexToFail=$2 + echo "Starting persistent dummy and tendermint" + dummy --persist $TMROOT/dummy &> "dummy_${name}.log" & + PID_DUMMY=$! + if [[ "$indexToFail" == "" ]]; then + # run in background, dont fail + tendermint node &> tendermint_${name}.log & + PID_TENDERMINT=$! + else + # run in foreground, fail + FAIL_TEST_INDEX=$indexToFail tendermint node &> tendermint_${name}.log + PID_TENDERMINT=$! + fi +} + +function kill_procs(){ + kill -9 $PID_DUMMY $PID_TENDERMINT + wait $PID_DUMMY + wait $PID_TENDERMINT +} + + +# wait till node is up, send txs +function send_txs(){ + addr="127.0.0.1:46657" + curl -s $addr/status > /dev/null + ERR=$? + while [ "$ERR" != 0 ]; do + sleep 1 + curl -s $addr/status > /dev/null + ERR=$? + done + + # send a bunch of txs over a few blocks + echo "Node is up, sending txs" + for i in `seq 1 5`; do + for j in `seq 1 100`; do + tx=`head -c 8 /dev/urandom | hexdump -ve '1/1 "%.2X"'` + curl -s $addr/broadcast_tx_async?tx=\"$tx\" &> /dev/null + done + sleep 1 + done +} + + +failsStart=0 +fails=`grep -r "fail.Fail" --include \*.go . | wc -l` +failsEnd=$(($fails-1)) + +for failIndex in `seq $failsStart $failsEnd`; do + echo "" + echo "* Test FailIndex $failIndex" + # test failure at failIndex + + send_txs & + start_procs 1 $failIndex + + # tendermint should fail when it hits the fail index + kill -9 $PID_DUMMY + wait $PID_DUMMY + + start_procs 2 + + # wait for node to handshake and make a new block + addr="localhost:46657" + curl -s $addr/status > /dev/null + ERR=$? + i=0 + while [ "$ERR" != 0 ]; do + sleep 1 + curl -s $addr/status > /dev/null + ERR=$? + i=$(($i + 1)) + if [[ $i == 10 ]]; then + echo "Timed out waiting for tendermint to start" + exit 1 + fi + done + + # wait for a new block + h1=`curl -s $addr/status | jq .result[1].latest_block_height` + h2=$h1 + while [ "$h2" == "$h1" ]; do + sleep 1 + h2=`curl -s $addr/status | jq .result[1].latest_block_height` + done + + kill_procs + + echo "* Passed Test for FailIndex $failIndex" + echo "" +done + +echo "Passed Test: Persistence" diff --git a/test/run_test.sh b/test/run_test.sh index 9ef4388f2..ba4e1b0e4 100644 --- a/test/run_test.sh +++ b/test/run_test.sh @@ -11,6 +11,9 @@ bash test/test_cover.sh # run the app tests bash test/app/test.sh +# run the persistence test +bash test/persist/test.sh + if [[ "$BRANCH" == "master" || $(echo "$BRANCH" | grep "release-") != "" ]]; then echo "" echo "* branch $BRANCH; testing libs" diff --git a/types/block.go b/types/block.go index bf6ec2b2a..80a59f8a3 100644 --- a/types/block.go +++ b/types/block.go @@ -21,6 +21,27 @@ type Block struct { LastCommit *Commit `json:"last_commit"` } +func MakeBlock(height int, chainID string, txs []Tx, commit *Commit, + prevBlockID BlockID, valHash, appHash []byte, partSize int) (*Block, *PartSet) { + block := &Block{ + Header: &Header{ + ChainID: chainID, + Height: height, + Time: time.Now(), + NumTxs: len(txs), + LastBlockID: prevBlockID, + ValidatorsHash: valHash, + AppHash: appHash, // state merkle root of txs from the previous block. + }, + LastCommit: commit, + Data: &Data{ + Txs: txs, + }, + } + block.FillHeader() + return block, block.MakePartSet(partSize) +} + // Basic validation that doesn't involve state data. func (b *Block) ValidateBasic(chainID string, lastBlockHeight int, lastBlockID BlockID, lastBlockTime time.Time, appHash []byte) error { diff --git a/types/events.go b/types/events.go index fc7e0ac6b..c6eb7611a 100644 --- a/types/events.go +++ b/types/events.go @@ -130,7 +130,9 @@ func NewEventCache(evsw EventSwitch) EventCache { // All events should be based on this FireEvent to ensure they are TMEventData func fireEvent(fireable events.Fireable, event string, data TMEventData) { - fireable.FireEvent(event, data) + if fireable != nil { + fireable.FireEvent(event, data) + } } func AddListenerForEvent(evsw EventSwitch, id, event string, cb func(data TMEventData)) { diff --git a/types/protobuf.go b/types/protobuf.go new file mode 100644 index 000000000..a7a95d121 --- /dev/null +++ b/types/protobuf.go @@ -0,0 +1,44 @@ +package types + +import ( + "github.com/tendermint/tmsp/types" +) + +// Convert tendermint types to protobuf types +var TM2PB = tm2pb{} + +type tm2pb struct{} + +func (tm2pb) Header(header *Header) *types.Header { + return &types.Header{ + ChainId: header.ChainID, + Height: int32(header.Height), + Time: uint64(header.Time.Unix()), + NumTxs: uint64(header.NumTxs), + LastBlockId: TM2PB.BlockID(header.LastBlockID), + LastCommitHash: header.LastCommitHash, + DataHash: header.DataHash, + AppHash: header.AppHash, + } +} + +func (tm2pb) BlockID(blockID BlockID) *types.BlockID { + return &types.BlockID{ + Hash: blockID.Hash, + Parts: TM2PB.PartSetHeader(blockID.PartsHeader), + } +} + +func (tm2pb) PartSetHeader(partSetHeader PartSetHeader) *types.PartSetHeader { + return &types.PartSetHeader{ + Total: uint64(partSetHeader.Total), + Hash: partSetHeader.Hash, + } +} + +func (tm2pb) Validator(val *Validator) *types.Validator { + return &types.Validator{ + PubKey: val.PubKey.Bytes(), + Power: uint64(val.VotingPower), + } +} diff --git a/types/validator.go b/types/validator.go index 699114f22..2e45ebba9 100644 --- a/types/validator.go +++ b/types/validator.go @@ -11,8 +11,9 @@ import ( ) // Volatile state for each Validator -// Also persisted with the state, but fields change -// every height|round so they don't go in merkle.Tree +// TODO: make non-volatile identity +// - Remove LastCommitHeight, send bitarray of vals that signed in BeginBlock +// - Remove Accum - it can be computed, and now valset becomes identifying type Validator struct { Address []byte `json:"address"` PubKey crypto.PubKey `json:"pub_key"` diff --git a/types/validator_set.go b/types/validator_set.go index 92400f67a..3f5a17d9a 100644 --- a/types/validator_set.go +++ b/types/validator_set.go @@ -20,7 +20,7 @@ import ( // NOTE: Not goroutine-safe. // NOTE: All get/set to validators should copy the value for safety. // TODO: consider validator Accum overflow -// TODO: replace validators []*Validator with github.com/jaekwon/go-ibbs? +// TODO: move valset into an iavl tree where key is 'blockbonded|pubkey' type ValidatorSet struct { Validators []*Validator // NOTE: persisted via reflect, must be exported.