diff --git a/blockchain/pool_test.go b/blockchain/pool_test.go index 8e6dbee94..220bc5ce5 100644 --- a/blockchain/pool_test.go +++ b/blockchain/pool_test.go @@ -35,6 +35,7 @@ func TestBasic(t *testing.T) { requestsCh := make(chan BlockRequest, 100) pool := NewBlockPool(start, requestsCh, timeoutsCh) pool.Start() + defer pool.Stop() // Introduce each peer. go func() { @@ -76,8 +77,6 @@ func TestBasic(t *testing.T) { }() } } - - pool.Stop() } func TestTimeout(t *testing.T) { @@ -87,6 +86,7 @@ func TestTimeout(t *testing.T) { requestsCh := make(chan BlockRequest, 100) pool := NewBlockPool(start, requestsCh, timeoutsCh) pool.Start() + defer pool.Stop() for _, peer := range peers { log.Info("Peer", "id", peer.id) @@ -131,6 +131,4 @@ func TestTimeout(t *testing.T) { log.Info("TEST: Pulled new BlockRequest", "request", request) } } - - pool.Stop() } diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 90258825e..f88bccc3d 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -251,7 +251,6 @@ FOR_LOOP: // TODO This is bad, are we zombie? cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) } - bcR.state.Save() } } continue FOR_LOOP diff --git a/config/tendermint/config.go b/config/tendermint/config.go index 3edf2df18..5ddde460d 100644 --- a/config/tendermint/config.go +++ b/config/tendermint/config.go @@ -101,6 +101,8 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetDefault("mempool_broadcast", true) mapConfig.SetDefault("mempool_wal_dir", rootDir+"/data/mempool.wal") + mapConfig.SetDefault("tx_index", "kv") + return mapConfig } diff --git a/config/tendermint_test/config.go b/config/tendermint_test/config.go index 26a483358..9d405dc95 100644 --- a/config/tendermint_test/config.go +++ b/config/tendermint_test/config.go @@ -107,6 +107,8 @@ func ResetConfig(localPath string) cfg.Config { mapConfig.SetDefault("mempool_broadcast", true) mapConfig.SetDefault("mempool_wal_dir", "") + mapConfig.SetDefault("tx_index", "kv") + logger.SetLogLevel(mapConfig.GetString("log_level")) return mapConfig diff --git a/consensus/replay.go b/consensus/replay.go index 731e7d216..bd0975f4d 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -100,27 +100,51 @@ func (cs *ConsensusState) catchupReplay(csHeight int) error { cs.replayMode = true defer func() { cs.replayMode = false }() - // Ensure that height+1 doesn't exist - gr, found, err := cs.wal.group.Search("#HEIGHT: ", makeHeightSearchFunc(csHeight+1)) - if found { - return errors.New(Fmt("WAL should not contain height %d.", csHeight+1)) - } + // Ensure that ENDHEIGHT for this height doesn't exist + // NOTE: This is just a sanity check. As far as we know things work fine without it, + // and Handshake could reuse ConsensusState if it weren't for this check (since we can crash after writing ENDHEIGHT). + gr, found, err := cs.wal.group.Search("#ENDHEIGHT: ", makeHeightSearchFunc(csHeight)) if gr != nil { gr.Close() } + if found { + return errors.New(Fmt("WAL should not contain #ENDHEIGHT %d.", csHeight)) + } - // Search for height marker - gr, found, err = cs.wal.group.Search("#HEIGHT: ", makeHeightSearchFunc(csHeight)) + // Search for last height marker + gr, found, err = cs.wal.group.Search("#ENDHEIGHT: ", makeHeightSearchFunc(csHeight-1)) if err == io.EOF { - log.Warn("Replay: wal.group.Search returned EOF", "height", csHeight) - return nil + log.Warn("Replay: wal.group.Search returned EOF", "#ENDHEIGHT", csHeight-1) + // if we upgraded from 0.9 to 0.9.1, we may have #HEIGHT instead + // TODO (0.10.0): remove this + gr, found, err = cs.wal.group.Search("#HEIGHT: ", makeHeightSearchFunc(csHeight)) + if err == io.EOF { + log.Warn("Replay: wal.group.Search returned EOF", "#HEIGHT", csHeight) + return nil + } else if err != nil { + return err + } } else if err != nil { return err + } else { + defer gr.Close() } if !found { - return errors.New(Fmt("WAL does not contain height %d.", csHeight)) + // if we upgraded from 0.9 to 0.9.1, we may have #HEIGHT instead + // TODO (0.10.0): remove this + gr, found, err = cs.wal.group.Search("#HEIGHT: ", makeHeightSearchFunc(csHeight)) + if err == io.EOF { + log.Warn("Replay: wal.group.Search returned EOF", "#HEIGHT", csHeight) + return nil + } else if err != nil { + return err + } else { + defer gr.Close() + } + + // TODO (0.10.0): uncomment + // return errors.New(Fmt("Cannot replay height %d. WAL does not contain #ENDHEIGHT for %d.", csHeight, csHeight-1)) } - defer gr.Close() log.Notice("Catchup by replaying consensus messages", "height", csHeight) @@ -147,7 +171,7 @@ func (cs *ConsensusState) catchupReplay(csHeight int) error { //-------------------------------------------------------------------------------- // Parses marker lines of the form: -// #HEIGHT: 12345 +// #ENDHEIGHT: 12345 func makeHeightSearchFunc(height int) auto.SearchFunc { return func(line string) (int, error) { line = strings.TrimRight(line, "\n") @@ -273,15 +297,18 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, proxyApp p } else if appBlockHeight == stateBlockHeight { // We haven't run Commit (both the state and app are one block behind), - // so run through consensus with the real app + // so replayBlock with the real app. + // NOTE: We could instead use the cs.WAL on cs.Start, + // but we'd have to allow the WAL to replay a block that wrote it's ENDHEIGHT log.Info("Replay last block using real app") - return h.replayLastBlock(proxyApp.Consensus()) + return h.replayBlock(storeBlockHeight, proxyApp.Consensus()) } else if appBlockHeight == storeBlockHeight { - // We ran Commit, but didn't save the state, so run through consensus with mock app - mockApp := newMockProxyApp(appHash) + // We ran Commit, but didn't save the state, so replayBlock with mock app + abciResponses := h.state.LoadABCIResponses() + mockApp := newMockProxyApp(appHash, abciResponses) log.Info("Replay last block using mock app") - return h.replayLastBlock(mockApp) + return h.replayBlock(storeBlockHeight, mockApp) } } @@ -290,24 +317,23 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, proxyApp p return nil, nil } -func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, storeBlockHeight int, useReplayFunc bool) ([]byte, error) { +func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, storeBlockHeight int, mutateState bool) ([]byte, error) { // App is further behind than it should be, so we need to replay blocks. // We replay all blocks from appBlockHeight+1. - // If useReplayFunc == true, stop short of the last block - // so it can be replayed using the WAL in ReplayBlocks. // Note that we don't have an old version of the state, - // so we by-pass state validation using sm.ApplyBlock. + // so we by-pass state validation/mutation using sm.ExecCommitBlock. + // If mutateState == true, the final block is replayed with h.replayBlock() var appHash []byte var err error finalBlock := storeBlockHeight - if useReplayFunc { + if mutateState { finalBlock -= 1 } for i := appBlockHeight + 1; i <= finalBlock; i++ { log.Info("Applying block", "height", i) block := h.store.LoadBlock(i) - appHash, err = sm.ApplyBlock(proxyApp.Consensus(), block) + appHash, err = sm.ExecCommitBlock(proxyApp.Consensus(), block) if err != nil { return nil, err } @@ -315,44 +341,29 @@ func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, store h.nBlocks += 1 } - if useReplayFunc { + if mutateState { // sync the final block - return h.ReplayBlocks(appHash, finalBlock, proxyApp) + return h.replayBlock(storeBlockHeight, proxyApp.Consensus()) } return appHash, h.checkAppHash(appHash) } -// Replay the last block through the consensus and return the AppHash from after Commit. -func (h *Handshaker) replayLastBlock(proxyApp proxy.AppConnConsensus) ([]byte, error) { +// ApplyBlock on the proxyApp with the last block. +func (h *Handshaker) replayBlock(height int, proxyApp proxy.AppConnConsensus) ([]byte, error) { mempool := types.MockMempool{} - cs := NewConsensusState(h.config, h.state, proxyApp, h.store, mempool) - evsw := types.NewEventSwitch() - evsw.Start() - defer evsw.Stop() - cs.SetEventSwitch(evsw) - newBlockCh := subscribeToEvent(evsw, "consensus-replay", types.EventStringNewBlock(), 1) + var eventCache types.Fireable // nil + block := h.store.LoadBlock(height) + meta := h.store.LoadBlockMeta(height) - // run through the WAL, commit new block, stop - if _, err := cs.Start(); err != nil { + if err := h.state.ApplyBlock(eventCache, proxyApp, block, meta.BlockID.PartsHeader, mempool); err != nil { return nil, err } - defer cs.Stop() - - timeout := h.config.GetInt("timeout_handshake") - timer := time.NewTimer(time.Duration(timeout) * time.Millisecond) - log.Notice("Attempting to replay last block", "height", h.store.Height(), "timeout", timeout) - - select { - case <-newBlockCh: - case <-timer.C: - return nil, ErrReplayLastBlockTimeout - } h.nBlocks += 1 - return cs.state.AppHash, nil + return h.state.AppHash, nil } func (h *Handshaker) checkAppHash(appHash []byte) error { @@ -364,9 +375,14 @@ func (h *Handshaker) checkAppHash(appHash []byte) error { } //-------------------------------------------------------------------------------- +// mockProxyApp uses ABCIResponses to give the right results +// Useful because we don't want to call Commit() twice for the same block on the real app. -func newMockProxyApp(appHash []byte) proxy.AppConnConsensus { - clientCreator := proxy.NewLocalClientCreator(&mockProxyApp{appHash: appHash}) +func newMockProxyApp(appHash []byte, abciResponses *sm.ABCIResponses) proxy.AppConnConsensus { + clientCreator := proxy.NewLocalClientCreator(&mockProxyApp{ + appHash: appHash, + abciResponses: abciResponses, + }) cli, _ := clientCreator.NewABCIClient() return proxy.NewAppConnConsensus(cli) } @@ -374,7 +390,24 @@ func newMockProxyApp(appHash []byte) proxy.AppConnConsensus { type mockProxyApp struct { abci.BaseApplication - appHash []byte + appHash []byte + txCount int + abciResponses *sm.ABCIResponses +} + +func (mock *mockProxyApp) DeliverTx(tx []byte) abci.Result { + r := mock.abciResponses.DeliverTx[mock.txCount] + mock.txCount += 1 + return abci.Result{ + r.Code, + r.Data, + r.Log, + } +} + +func (mock *mockProxyApp) EndBlock(height uint64) abci.ResponseEndBlock { + mock.txCount = 0 + return mock.abciResponses.EndBlock } func (mock *mockProxyApp) Commit() abci.Result { diff --git a/consensus/replay_test.go b/consensus/replay_test.go index c70b60fa0..43204ab72 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -443,7 +443,7 @@ func buildTMStateFromChain(config cfg.Config, state *sm.State, chain []*types.Bl func makeBlockchainFromWAL(wal *WAL) ([]*types.Block, []*types.Commit, error) { // Search for height marker - gr, found, err := wal.group.Search("#HEIGHT: ", makeHeightSearchFunc(1)) + gr, found, err := wal.group.Search("#ENDHEIGHT: ", makeHeightSearchFunc(0)) if err != nil { return nil, nil, err } diff --git a/consensus/state.go b/consensus/state.go index 63616cbd3..6ff97ddc8 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -283,7 +283,7 @@ func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.Ap //---------------------------------------- // Public interface -// implements events.Eventable +// SetEventSwitch implements events.Eventable func (cs *ConsensusState) SetEventSwitch(evsw types.EventSwitch) { cs.evsw = evsw } @@ -1216,13 +1216,26 @@ func (cs *ConsensusState) finalizeCommit(height int) { fail.Fail() // XXX + // Finish writing to the WAL for this height. + // NOTE: If we fail before writing this, we'll never write it, + // and just recover by running ApplyBlock in the Handshake. + // If we moved it before persisting the block, we'd have to allow + // WAL replay for blocks with an #ENDHEIGHT + // As is, ConsensusState should not be started again + // until we successfully call ApplyBlock (ie. here or in Handshake after restart) + if cs.wal != nil { + cs.wal.writeEndHeight(height) + } + + fail.Fail() // XXX + // Create a copy of the state for staging // and an event cache for txs stateCopy := cs.state.Copy() eventCache := types.NewEventCache(cs.evsw) - // Execute and commit the block, and update the mempool. - // All calls to the proxyAppConn should come here. + // Execute and commit the block, update and save the state, and update the mempool. + // All calls to the proxyAppConn come here. // NOTE: the block.AppHash wont reflect these txs until the next block err := stateCopy.ApplyBlock(eventCache, cs.proxyAppConn, block, blockParts.Header(), cs.mempool) if err != nil { @@ -1232,20 +1245,24 @@ func (cs *ConsensusState) finalizeCommit(height int) { fail.Fail() // XXX - // Fire off event for new block. - // TODO: Handle app failure. See #177 + // Fire event for new block. + // NOTE: If we fail before firing, these events will never fire + // + // TODO: Either + // * Fire before persisting state, in ApplyBlock + // * Fire on start up if we haven't written any new WAL msgs + // Both options mean we may fire more than once. Is that fine ? types.FireEventNewBlock(cs.evsw, types.EventDataNewBlock{block}) types.FireEventNewBlockHeader(cs.evsw, types.EventDataNewBlockHeader{block.Header}) eventCache.Flush() - // Save the state. - stateCopy.Save() - fail.Fail() // XXX // NewHeightStep! cs.updateToState(stateCopy) + fail.Fail() // XXX + // cs.StartTime is already set. // Schedule Round0 to start soon. cs.scheduleRound0(&cs.RoundState) diff --git a/consensus/test_data/build.sh b/consensus/test_data/build.sh index ea0c9604a..2759c0e38 100644 --- a/consensus/test_data/build.sh +++ b/consensus/test_data/build.sh @@ -27,7 +27,7 @@ killall tendermint # /q would print up to and including the match, then quit. # /Q doesn't include the match. # http://unix.stackexchange.com/questions/11305/grep-show-all-the-file-up-to-the-match -sed '/HEIGHT: 2/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/empty_block.cswal +sed '/ENDHEIGHT: 1/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/empty_block.cswal reset } @@ -41,7 +41,7 @@ sleep 7 killall tendermint kill -9 $PID -sed '/HEIGHT: 7/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/many_blocks.cswal +sed '/ENDHEIGHT: 6/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/many_blocks.cswal reset } @@ -56,7 +56,7 @@ sleep 10 killall tendermint kill -9 $PID -sed '/HEIGHT: 2/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/small_block1.cswal +sed '/ENDHEIGHT: 1/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/small_block1.cswal reset } @@ -73,7 +73,7 @@ sleep 5 killall tendermint kill -9 $PID -sed '/HEIGHT: 2/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/small_block2.cswal +sed '/ENDHEIGHT: 1/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/small_block2.cswal reset } diff --git a/consensus/test_data/empty_block.cswal b/consensus/test_data/empty_block.cswal index a3a3585ce..aa5b232c9 100644 --- a/consensus/test_data/empty_block.cswal +++ b/consensus/test_data/empty_block.cswal @@ -1,4 +1,4 @@ -#HEIGHT: 1 +#ENDHEIGHT: 0 {"time":"2016-12-18T05:05:33.502Z","msg":[3,{"duration":974084551,"height":1,"round":0,"step":1}]} {"time":"2016-12-18T05:05:33.505Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]} {"time":"2016-12-18T05:05:33.505Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":1,"hash":"71D2DA2336A9F84C22A28FF6C67F35F3478FC0AF"},"pol_round":-1,"pol_block_id":{"hash":"","parts":{"total":0,"hash":""}},"signature":[1,"62C0F2BCCB491399EEDAF8E85837ADDD4E25BAB7A84BFC4F0E88594531FBC6D4755DEC7E6427F04AD7EB8BB89502762AB4380C7BBA93A4C297E6180EC78E3504"]}}],"peer_key":""}]} diff --git a/consensus/test_data/many_blocks.cswal b/consensus/test_data/many_blocks.cswal index 9ef06c32c..fd103cb1e 100644 --- a/consensus/test_data/many_blocks.cswal +++ b/consensus/test_data/many_blocks.cswal @@ -1,4 +1,4 @@ -#HEIGHT: 1 +#ENDHEIGHT: 0 {"time":"2017-02-17T23:54:19.013Z","msg":[3,{"duration":969121813,"height":1,"round":0,"step":1}]} {"time":"2017-02-17T23:54:19.014Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]} {"time":"2017-02-17T23:54:19.014Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":1,"hash":"2E32C8D500E936D27A47FCE3FF4BE7C1AFB3FAE1"},"pol_round":-1,"pol_block_id":{"hash":"","parts":{"total":0,"hash":""}},"signature":[1,"105A5A834E9AE2FA2191CAB5CB20D63594BA7859BD3EB92F055C8A35476D71F0D89F9FD5B0FF030D021533C71A81BF6E8F026BF4A37FC637CF38CA35291A9D00"]}}],"peer_key":""}]} @@ -8,7 +8,7 @@ {"time":"2017-02-17T23:54:19.016Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPrecommit"}]} {"time":"2017-02-17T23:54:19.016Z","msg":[2,{"msg":[20,{"Vote":{"validator_address":"D028C9981F7A87F3093672BF0D5B0E2A1B3ED456","validator_index":0,"height":1,"round":0,"type":2,"block_id":{"hash":"3F32EE37F9EA674A2173CAD651836A8EE628B5C7","parts":{"total":1,"hash":"2E32C8D500E936D27A47FCE3FF4BE7C1AFB3FAE1"}},"signature":[1,"2B1070A5AB9305612A3AE74A8036D82B5E49E0DBBFBC7D723DB985CC8A8E72A52FF8E34D85273FEB8B901945CA541FA5142C3C4D43A04E9205ACECF53FD19B01"]}}],"peer_key":""}]} {"time":"2017-02-17T23:54:19.017Z","msg":[1,{"height":1,"round":0,"step":"RoundStepCommit"}]} -#HEIGHT: 2 +#ENDHEIGHT: 1 {"time":"2017-02-17T23:54:19.019Z","msg":[1,{"height":2,"round":0,"step":"RoundStepNewHeight"}]} {"time":"2017-02-17T23:54:20.017Z","msg":[3,{"duration":998073370,"height":2,"round":0,"step":1}]} {"time":"2017-02-17T23:54:20.018Z","msg":[1,{"height":2,"round":0,"step":"RoundStepPropose"}]} @@ -19,7 +19,7 @@ {"time":"2017-02-17T23:54:20.021Z","msg":[1,{"height":2,"round":0,"step":"RoundStepPrecommit"}]} {"time":"2017-02-17T23:54:20.021Z","msg":[2,{"msg":[20,{"Vote":{"validator_address":"D028C9981F7A87F3093672BF0D5B0E2A1B3ED456","validator_index":0,"height":2,"round":0,"type":2,"block_id":{"hash":"32310D174A99844713693C9815D2CA660364E028","parts":{"total":1,"hash":"D008E9014CDDEA8EC95E1E99E21333241BD52DFC"}},"signature":[1,"AA9F03D0707752301D7CBFCF4F0BCDBD666A46C1CAED3910BD64A3C5C2874AAF328172646C951C5E2FD962359C382A3CBBA2C73EC9B533668C6386995B83EC08"]}}],"peer_key":""}]} {"time":"2017-02-17T23:54:20.022Z","msg":[1,{"height":2,"round":0,"step":"RoundStepCommit"}]} -#HEIGHT: 3 +#ENDHEIGHT: 2 {"time":"2017-02-17T23:54:20.025Z","msg":[1,{"height":3,"round":0,"step":"RoundStepNewHeight"}]} {"time":"2017-02-17T23:54:21.022Z","msg":[3,{"duration":997103974,"height":3,"round":0,"step":1}]} {"time":"2017-02-17T23:54:21.024Z","msg":[1,{"height":3,"round":0,"step":"RoundStepPropose"}]} @@ -30,7 +30,7 @@ {"time":"2017-02-17T23:54:21.028Z","msg":[1,{"height":3,"round":0,"step":"RoundStepPrecommit"}]} {"time":"2017-02-17T23:54:21.028Z","msg":[2,{"msg":[20,{"Vote":{"validator_address":"D028C9981F7A87F3093672BF0D5B0E2A1B3ED456","validator_index":0,"height":3,"round":0,"type":2,"block_id":{"hash":"37AF6866DA8C3167CFC280FAE47B6ED441B00D5B","parts":{"total":1,"hash":"2E5DE5777A5AD899CD2531304F42A470509DE989"}},"signature":[1,"C900519E305EC03392E7D197D5FAB535DB240C9C0BA5375A1679C75BAAA07C7410C0EF43CF97D98F2C08A1D739667D5ACFF6233A1FAE75D3DA275AEA422EFD0F"]}}],"peer_key":""}]} {"time":"2017-02-17T23:54:21.028Z","msg":[1,{"height":3,"round":0,"step":"RoundStepCommit"}]} -#HEIGHT: 4 +#ENDHEIGHT: 3 {"time":"2017-02-17T23:54:21.032Z","msg":[1,{"height":4,"round":0,"step":"RoundStepNewHeight"}]} {"time":"2017-02-17T23:54:22.028Z","msg":[3,{"duration":996302067,"height":4,"round":0,"step":1}]} {"time":"2017-02-17T23:54:22.030Z","msg":[1,{"height":4,"round":0,"step":"RoundStepPropose"}]} @@ -41,7 +41,7 @@ {"time":"2017-02-17T23:54:22.033Z","msg":[1,{"height":4,"round":0,"step":"RoundStepPrecommit"}]} {"time":"2017-02-17T23:54:22.033Z","msg":[2,{"msg":[20,{"Vote":{"validator_address":"D028C9981F7A87F3093672BF0D5B0E2A1B3ED456","validator_index":0,"height":4,"round":0,"type":2,"block_id":{"hash":"04715E223BF4327FFA9B0D5AD849B74A099D5DEC","parts":{"total":1,"hash":"24CEBCBEB833F56D47AD14354071B3B7A243068A"}},"signature":[1,"F544743F17479A61F94B0F68C63D254BD60493D78E818D48A5859133619AEE5E92C47CAD89C654DF64E0911C3152091E047555D5F14655D95B9681AE9B336505"]}}],"peer_key":""}]} {"time":"2017-02-17T23:54:22.034Z","msg":[1,{"height":4,"round":0,"step":"RoundStepCommit"}]} -#HEIGHT: 5 +#ENDHEIGHT: 4 {"time":"2017-02-17T23:54:22.036Z","msg":[1,{"height":5,"round":0,"step":"RoundStepNewHeight"}]} {"time":"2017-02-17T23:54:23.034Z","msg":[3,{"duration":997096276,"height":5,"round":0,"step":1}]} {"time":"2017-02-17T23:54:23.035Z","msg":[1,{"height":5,"round":0,"step":"RoundStepPropose"}]} @@ -52,7 +52,7 @@ {"time":"2017-02-17T23:54:23.038Z","msg":[1,{"height":5,"round":0,"step":"RoundStepPrecommit"}]} {"time":"2017-02-17T23:54:23.038Z","msg":[2,{"msg":[20,{"Vote":{"validator_address":"D028C9981F7A87F3093672BF0D5B0E2A1B3ED456","validator_index":0,"height":5,"round":0,"type":2,"block_id":{"hash":"FDC6D837995BEBBBFCBF3E7D7CF44F8FDA448543","parts":{"total":1,"hash":"A52BAA9C2E52E633A1605F4B930205613E3E7A2F"}},"signature":[1,"DF51D23D5D2C57598F67791D953A6C2D9FC5865A3048ADA4469B37500D2996B95732E0DC6F99EAEAEA12B4818CE355C7B701D16857D2AC767D740C2E30E9260C"]}}],"peer_key":""}]} {"time":"2017-02-17T23:54:23.038Z","msg":[1,{"height":5,"round":0,"step":"RoundStepCommit"}]} -#HEIGHT: 6 +#ENDHEIGHT: 5 {"time":"2017-02-17T23:54:23.041Z","msg":[1,{"height":6,"round":0,"step":"RoundStepNewHeight"}]} {"time":"2017-02-17T23:54:24.038Z","msg":[3,{"duration":997341910,"height":6,"round":0,"step":1}]} {"time":"2017-02-17T23:54:24.040Z","msg":[1,{"height":6,"round":0,"step":"RoundStepPropose"}]} diff --git a/consensus/test_data/small_block1.cswal b/consensus/test_data/small_block1.cswal index 90103dff3..d4eff73f1 100644 --- a/consensus/test_data/small_block1.cswal +++ b/consensus/test_data/small_block1.cswal @@ -1,4 +1,4 @@ -#HEIGHT: 1 +#ENDHEIGHT: 0 {"time":"2016-12-18T05:05:38.593Z","msg":[3,{"duration":970717663,"height":1,"round":0,"step":1}]} {"time":"2016-12-18T05:05:38.595Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]} {"time":"2016-12-18T05:05:38.595Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":1,"hash":"A434EC796DF1CECC01296E953839C4675863A4E5"},"pol_round":-1,"pol_block_id":{"hash":"","parts":{"total":0,"hash":""}},"signature":[1,"39563C3C7EDD9855B2971457A5DABF05CFDAF52805658847EB1F05115B8341344A77761CC85E670AF1B679DA9FC0905231957174699FE8326DBE7706209BDD0B"]}}],"peer_key":""}]} diff --git a/consensus/test_data/small_block2.cswal b/consensus/test_data/small_block2.cswal index 1be6c4c93..b5d1d282b 100644 --- a/consensus/test_data/small_block2.cswal +++ b/consensus/test_data/small_block2.cswal @@ -1,4 +1,4 @@ -#HEIGHT: 1 +#ENDHEIGHT: 0 {"time":"2016-12-18T05:05:43.641Z","msg":[3,{"duration":969409681,"height":1,"round":0,"step":1}]} {"time":"2016-12-18T05:05:43.643Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]} {"time":"2016-12-18T05:05:43.643Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":5,"hash":"C916905C3C444501DDDAA1BF52E959B7531E762E"},"pol_round":-1,"pol_block_id":{"hash":"","parts":{"total":0,"hash":""}},"signature":[1,"F1A8E9928889C68FD393F3983B5362AECA4A95AA13FE3C78569B2515EC046893CB718071CAF54F3F1507DCD851B37CD5557EA17BB5471D2DC6FB5AC5FBB72E02"]}}],"peer_key":""}]} diff --git a/consensus/wal.go b/consensus/wal.go index 6d8eb3819..a89eff5e4 100644 --- a/consensus/wal.go +++ b/consensus/wal.go @@ -59,7 +59,7 @@ func (wal *WAL) OnStart() error { if err != nil { return err } else if size == 0 { - wal.writeHeight(1) + wal.writeEndHeight(0) } _, err = wal.group.Start() return err @@ -83,12 +83,6 @@ func (wal *WAL) Save(wmsg WALMessage) { } } } - // Write #HEIGHT: XYZ if new height - if edrs, ok := wmsg.(types.EventDataRoundState); ok { - if edrs.Step == RoundStepNewHeight.String() { - wal.writeHeight(edrs.Height) - } - } // Write the wal message var wmsgBytes = wire.JSONBytes(TimedWALMessage{time.Now(), wmsg}) err := wal.group.WriteLine(string(wmsgBytes)) @@ -101,8 +95,8 @@ func (wal *WAL) Save(wmsg WALMessage) { } } -func (wal *WAL) writeHeight(height int) { - wal.group.WriteLine(Fmt("#HEIGHT: %v", height)) +func (wal *WAL) writeEndHeight(height int) { + wal.group.WriteLine(Fmt("#ENDHEIGHT: %v", height)) // TODO: only flush when necessary if err := wal.group.Flush(); err != nil { diff --git a/glide.lock b/glide.lock index 9c2a25a31..f0ede402e 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ hash: d9724aa287c40d1b3856b6565f09235d809c8b2f7c6537c04f597137c0d6cd26 -updated: 2017-04-12T13:43:29.528413882-04:00 +updated: 2017-04-14T17:17:31.933202871-04:00 imports: - name: github.com/btcsuite/btcd version: b8df516b4b267acf2de46be593a9d948d1d2c420 @@ -85,7 +85,7 @@ imports: - name: github.com/tendermint/go-clist version: 3baa390bbaf7634251c42ad69a8682e7e3990552 - name: github.com/tendermint/go-common - version: 6af2364fa91ef2f3afc8ba0db33b66d9d3ae006c + version: 714fdaee3bb3f8670e721a75c5ddda8787b256dd subpackages: - test - name: github.com/tendermint/go-config @@ -111,13 +111,13 @@ imports: subpackages: - upnp - name: github.com/tendermint/go-rpc - version: c3295f4878019ff3fdfcac37a4c0e4bcf4bb02a7 + version: 4671c44b2d124f7f6f6243dbfbf4ae2bf42ee809 subpackages: - client - server - types - name: github.com/tendermint/go-wire - version: ad797c70affa2c81fccc5edaed63ac25144397c6 + version: 50889e2b4a9ba65b67be86a486f25853d514b937 - name: github.com/tendermint/log15 version: ae0f3d6450da9eac7074b439c8e1c3cabf0d5ce6 subpackages: diff --git a/node/node.go b/node/node.go index 6815c779b..990779486 100644 --- a/node/node.go +++ b/node/node.go @@ -10,12 +10,12 @@ import ( abci "github.com/tendermint/abci/types" cmn "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" - "github.com/tendermint/go-crypto" + crypto "github.com/tendermint/go-crypto" dbm "github.com/tendermint/go-db" - "github.com/tendermint/go-p2p" - "github.com/tendermint/go-rpc" - "github.com/tendermint/go-rpc/server" - "github.com/tendermint/go-wire" + p2p "github.com/tendermint/go-p2p" + rpc "github.com/tendermint/go-rpc" + rpcserver "github.com/tendermint/go-rpc/server" + wire "github.com/tendermint/go-wire" bc "github.com/tendermint/tendermint/blockchain" "github.com/tendermint/tendermint/consensus" mempl "github.com/tendermint/tendermint/mempool" @@ -23,6 +23,9 @@ import ( rpccore "github.com/tendermint/tendermint/rpc/core" grpccore "github.com/tendermint/tendermint/rpc/grpc" sm "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/state/txindex" + "github.com/tendermint/tendermint/state/txindex/kv" + "github.com/tendermint/tendermint/state/txindex/null" "github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/version" @@ -51,6 +54,7 @@ type Node struct { consensusReactor *consensus.ConsensusReactor // for participating in the consensus proxyApp proxy.AppConns // connection to the application rpcListeners []net.Listener // rpc servers + txIndexer txindex.TxIndexer } func NewNodeDefault(config cfg.Config) *Node { @@ -84,6 +88,17 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato // reload the state (it may have been updated by the handshake) state = sm.LoadState(stateDB) + // Transaction indexing + var txIndexer txindex.TxIndexer + switch config.GetString("tx_index") { + case "kv": + store := dbm.NewDB("tx_index", config.GetString("db_backend"), config.GetString("db_dir")) + txIndexer = kv.NewTxIndex(store) + default: + txIndexer = &null.TxIndex{} + } + state.TxIndexer = txIndexer + // Generate node PrivKey privKey := crypto.GenPrivKeyEd25519() @@ -188,6 +203,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato consensusState: consensusState, consensusReactor: consensusReactor, proxyApp: proxyApp, + txIndexer: txIndexer, } node.BaseService = *cmn.NewBaseService(log, "Node", node) return node @@ -201,7 +217,7 @@ func (n *Node) OnStart() error { n.sw.AddListener(l) // Start the switch - n.sw.SetNodeInfo(makeNodeInfo(n.config, n.sw, n.privKey)) + n.sw.SetNodeInfo(n.makeNodeInfo()) n.sw.SetNodePrivKey(n.privKey) _, err := n.sw.Start() if err != nil { @@ -278,6 +294,7 @@ func (n *Node) ConfigureRPC() { rpccore.SetGenesisDoc(n.genesisDoc) rpccore.SetAddrBook(n.addrBook) rpccore.SetProxyAppQuery(n.proxyApp.Query()) + rpccore.SetTxIndexer(n.txIndexer) } func (n *Node) startRPC() ([]net.Listener, error) { @@ -348,34 +365,39 @@ func (n *Node) ProxyApp() proxy.AppConns { return n.proxyApp } -func makeNodeInfo(config cfg.Config, sw *p2p.Switch, privKey crypto.PrivKeyEd25519) *p2p.NodeInfo { +func (n *Node) makeNodeInfo() *p2p.NodeInfo { + txIndexerStatus := "on" + if _, ok := n.txIndexer.(*null.TxIndex); ok { + txIndexerStatus = "off" + } nodeInfo := &p2p.NodeInfo{ - PubKey: privKey.PubKey().(crypto.PubKeyEd25519), - Moniker: config.GetString("moniker"), - Network: config.GetString("chain_id"), + PubKey: n.privKey.PubKey().(crypto.PubKeyEd25519), + Moniker: n.config.GetString("moniker"), + Network: n.config.GetString("chain_id"), Version: version.Version, Other: []string{ cmn.Fmt("wire_version=%v", wire.Version), cmn.Fmt("p2p_version=%v", p2p.Version), cmn.Fmt("consensus_version=%v", consensus.Version), cmn.Fmt("rpc_version=%v/%v", rpc.Version, rpccore.Version), + cmn.Fmt("tx_index=%v", txIndexerStatus), }, } // include git hash in the nodeInfo if available - if rev, err := cmn.ReadFile(config.GetString("revision_file")); err == nil { + if rev, err := cmn.ReadFile(n.config.GetString("revision_file")); err == nil { nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("revision=%v", string(rev))) } - if !sw.IsListening() { + if !n.sw.IsListening() { return nodeInfo } - p2pListener := sw.Listeners()[0] + p2pListener := n.sw.Listeners()[0] p2pHost := p2pListener.ExternalAddress().IP.String() p2pPort := p2pListener.ExternalAddress().Port - rpcListenAddr := config.GetString("rpc_laddr") + rpcListenAddr := n.config.GetString("rpc_laddr") // We assume that the rpcListener has the same ExternalAddress. // This is probably true because both P2P and RPC listeners use UPnP, diff --git a/rpc/client/httpclient.go b/rpc/client/httpclient.go index 07059bca0..04595e766 100644 --- a/rpc/client/httpclient.go +++ b/rpc/client/httpclient.go @@ -160,6 +160,19 @@ func (c *HTTP) Commit(height int) (*ctypes.ResultCommit, error) { return (*tmResult).(*ctypes.ResultCommit), nil } +func (c *HTTP) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) { + tmResult := new(ctypes.TMResult) + query := map[string]interface{}{ + "hash": hash, + "prove": prove, + } + _, err := c.rpc.Call("tx", query, tmResult) + if err != nil { + return nil, errors.Wrap(err, "Tx") + } + return (*tmResult).(*ctypes.ResultTx), nil +} + func (c *HTTP) Validators() (*ctypes.ResultValidators, error) { tmResult := new(ctypes.TMResult) _, err := c.rpc.Call("validators", map[string]interface{}{}, tmResult) diff --git a/rpc/client/interface.go b/rpc/client/interface.go index 9a5ba668b..2ba890798 100644 --- a/rpc/client/interface.go +++ b/rpc/client/interface.go @@ -44,6 +44,7 @@ type SignClient interface { Block(height int) (*ctypes.ResultBlock, error) Commit(height int) (*ctypes.ResultCommit, error) Validators() (*ctypes.ResultValidators, error) + Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) } // HistoryClient shows us data from genesis to now in large chunks. diff --git a/rpc/client/localclient.go b/rpc/client/localclient.go index 0a83db05d..d0f0d11b1 100644 --- a/rpc/client/localclient.go +++ b/rpc/client/localclient.go @@ -103,3 +103,7 @@ func (c Local) Commit(height int) (*ctypes.ResultCommit, error) { func (c Local) Validators() (*ctypes.ResultValidators, error) { return core.Validators() } + +func (c Local) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) { + return core.Tx(hash, prove) +} diff --git a/rpc/client/mock/abci.go b/rpc/client/mock/abci.go index 13e187412..6f6fa1d47 100644 --- a/rpc/client/mock/abci.go +++ b/rpc/client/mock/abci.go @@ -45,7 +45,7 @@ func (a ABCIApp) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error if c.IsOK() { go func() { a.App.DeliverTx(tx) }() } - return &ctypes.ResultBroadcastTx{c.Code, c.Data, c.Log}, nil + return &ctypes.ResultBroadcastTx{c.Code, c.Data, c.Log, tx.Hash()}, nil } func (a ABCIApp) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { @@ -54,7 +54,7 @@ func (a ABCIApp) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) if c.IsOK() { go func() { a.App.DeliverTx(tx) }() } - return &ctypes.ResultBroadcastTx{c.Code, c.Data, c.Log}, nil + return &ctypes.ResultBroadcastTx{c.Code, c.Data, c.Log, tx.Hash()}, nil } // ABCIMock will send all abci related request to the named app, diff --git a/rpc/client/rpc_test.go b/rpc/client/rpc_test.go index c5f32d97f..18f0f1aab 100644 --- a/rpc/client/rpc_test.go +++ b/rpc/client/rpc_test.go @@ -3,7 +3,6 @@ package client_test import ( "strings" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -11,6 +10,7 @@ import ( merktest "github.com/tendermint/merkleeyes/testutil" "github.com/tendermint/tendermint/rpc/client" rpctest "github.com/tendermint/tendermint/rpc/test" + "github.com/tendermint/tendermint/types" ) func getHTTPClient() *client.HTTP { @@ -117,49 +117,58 @@ func TestAppCalls(t *testing.T) { // write something k, v, tx := merktest.MakeTxKV() - _, err = c.BroadcastTxCommit(tx) + bres, err := c.BroadcastTxCommit(tx) require.Nil(err, "%d: %+v", i, err) + require.True(bres.DeliverTx.GetCode().IsOK()) + txh := bres.Height + apph := txh + 1 // this is where the tx will be applied to the state + // wait before querying - time.Sleep(time.Second * 1) + client.WaitForHeight(c, apph, nil) qres, err := c.ABCIQuery("/key", k, false) if assert.Nil(err) && assert.True(qres.Response.Code.IsOK()) { data := qres.Response // assert.Equal(k, data.GetKey()) // only returned for proofs assert.Equal(v, data.GetValue()) } - // +/- 1 making my head hurt - h := int(qres.Response.Height) - 1 + + // make sure we can lookup the tx with proof + // ptx, err := c.Tx(bres.TxID, true) + ptx, err := c.Tx(bres.TxID, true) + require.Nil(err, "%d: %+v", i, err) + assert.Equal(txh, ptx.Height) + assert.Equal(types.Tx(tx), ptx.Tx) // and we can even check the block is added - block, err := c.Block(h) + block, err := c.Block(apph) require.Nil(err, "%d: %+v", i, err) appHash := block.BlockMeta.Header.AppHash assert.True(len(appHash) > 0) - assert.EqualValues(h, block.BlockMeta.Header.Height) + assert.EqualValues(apph, block.BlockMeta.Header.Height) // check blockchain info, now that we know there is info // TODO: is this commented somewhere that they are returned // in order of descending height??? - info, err := c.BlockchainInfo(h-2, h) + info, err := c.BlockchainInfo(apph, apph) require.Nil(err, "%d: %+v", i, err) - assert.True(info.LastHeight > 2) - if assert.Equal(3, len(info.BlockMetas)) { + assert.True(info.LastHeight >= apph) + if assert.Equal(1, len(info.BlockMetas)) { lastMeta := info.BlockMetas[0] - assert.EqualValues(h, lastMeta.Header.Height) + assert.EqualValues(apph, lastMeta.Header.Height) bMeta := block.BlockMeta assert.Equal(bMeta.Header.AppHash, lastMeta.Header.AppHash) assert.Equal(bMeta.BlockID, lastMeta.BlockID) } // and get the corresponding commit with the same apphash - commit, err := c.Commit(h) + commit, err := c.Commit(apph) require.Nil(err, "%d: %+v", i, err) cappHash := commit.Header.AppHash assert.Equal(appHash, cappHash) assert.NotNil(commit.Commit) // compare the commits (note Commit(2) has commit from Block(3)) - commit2, err := c.Commit(h - 1) + commit2, err := c.Commit(apph - 1) require.Nil(err, "%d: %+v", i, err) assert.Equal(block.Block.LastCommit, commit2.Commit) diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index eefc226ad..c4fc3b1a1 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -4,9 +4,9 @@ import ( "fmt" "time" + abci "github.com/tendermint/abci/types" ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" - abci "github.com/tendermint/abci/types" ) //----------------------------------------------------------------------------- @@ -18,7 +18,7 @@ func BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { if err != nil { return nil, fmt.Errorf("Error broadcasting transaction: %v", err) } - return &ctypes.ResultBroadcastTx{}, nil + return &ctypes.ResultBroadcastTx{TxID: tx.Hash()}, nil } // Returns with the response from CheckTx @@ -36,6 +36,7 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { Code: r.Code, Data: r.Data, Log: r.Log, + TxID: tx.Hash(), }, nil } @@ -65,8 +66,9 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { if checkTxR.Code != abci.CodeType_OK { // CheckTx failed! return &ctypes.ResultBroadcastTxCommit{ - CheckTx: checkTxR, + CheckTx: checkTxR, DeliverTx: nil, + TxID: tx.Hash(), }, nil } @@ -84,14 +86,17 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { } log.Notice("DeliverTx passed ", "tx", []byte(tx), "response", deliverTxR) return &ctypes.ResultBroadcastTxCommit{ - CheckTx: checkTxR, + CheckTx: checkTxR, DeliverTx: deliverTxR, + TxID: tx.Hash(), + Height: deliverTxRes.Height, }, nil case <-timer.C: log.Error("failed to include tx") return &ctypes.ResultBroadcastTxCommit{ - CheckTx: checkTxR, + CheckTx: checkTxR, DeliverTx: nil, + TxID: tx.Hash(), }, fmt.Errorf("Timed out waiting for transaction to be included in a block") } diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index 123b13dd8..4993ed992 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -2,11 +2,12 @@ package core import ( cfg "github.com/tendermint/go-config" - "github.com/tendermint/go-crypto" - "github.com/tendermint/go-p2p" + crypto "github.com/tendermint/go-crypto" + p2p "github.com/tendermint/go-p2p" "github.com/tendermint/tendermint/consensus" "github.com/tendermint/tendermint/proxy" + "github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/types" ) @@ -42,9 +43,10 @@ var ( p2pSwitch P2P // objects - pubKey crypto.PubKey - genDoc *types.GenesisDoc // cache the genesis structure - addrBook *p2p.AddrBook + pubKey crypto.PubKey + genDoc *types.GenesisDoc // cache the genesis structure + addrBook *p2p.AddrBook + txIndexer txindex.TxIndexer ) func SetConfig(c cfg.Config) { @@ -86,3 +88,7 @@ func SetAddrBook(book *p2p.AddrBook) { func SetProxyAppQuery(appConn proxy.AppConnQuery) { proxyAppQuery = appConn } + +func SetTxIndexer(indexer txindex.TxIndexer) { + txIndexer = indexer +} diff --git a/rpc/core/routes.go b/rpc/core/routes.go index 643b2bf02..38e609601 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -19,6 +19,7 @@ var Routes = map[string]*rpc.RPCFunc{ "genesis": rpc.NewRPCFunc(GenesisResult, ""), "block": rpc.NewRPCFunc(BlockResult, "height"), "commit": rpc.NewRPCFunc(CommitResult, "height"), + "tx": rpc.NewRPCFunc(TxResult, "hash,prove"), "validators": rpc.NewRPCFunc(ValidatorsResult, ""), "dump_consensus_state": rpc.NewRPCFunc(DumpConsensusStateResult, ""), "unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxsResult, ""), @@ -45,185 +46,100 @@ var Routes = map[string]*rpc.RPCFunc{ } func SubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (ctypes.TMResult, error) { - if r, err := Subscribe(wsCtx, event); err != nil { - return nil, err - } else { - return r, nil - } + return Subscribe(wsCtx, event) } func UnsubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (ctypes.TMResult, error) { - if r, err := Unsubscribe(wsCtx, event); err != nil { - return nil, err - } else { - return r, nil - } + return Unsubscribe(wsCtx, event) } func StatusResult() (ctypes.TMResult, error) { - if r, err := Status(); err != nil { - return nil, err - } else { - return r, nil - } + return Status() } func NetInfoResult() (ctypes.TMResult, error) { - if r, err := NetInfo(); err != nil { - return nil, err - } else { - return r, nil - } + return NetInfo() } func UnsafeDialSeedsResult(seeds []string) (ctypes.TMResult, error) { - if r, err := UnsafeDialSeeds(seeds); err != nil { - return nil, err - } else { - return r, nil - } + return UnsafeDialSeeds(seeds) } func BlockchainInfoResult(min, max int) (ctypes.TMResult, error) { - if r, err := BlockchainInfo(min, max); err != nil { - return nil, err - } else { - return r, nil - } + return BlockchainInfo(min, max) } func GenesisResult() (ctypes.TMResult, error) { - if r, err := Genesis(); err != nil { - return nil, err - } else { - return r, nil - } + return Genesis() } func BlockResult(height int) (ctypes.TMResult, error) { - if r, err := Block(height); err != nil { - return nil, err - } else { - return r, nil - } + return Block(height) } func CommitResult(height int) (ctypes.TMResult, error) { - if r, err := Commit(height); err != nil { - return nil, err - } else { - return r, nil - } + return Commit(height) } func ValidatorsResult() (ctypes.TMResult, error) { - if r, err := Validators(); err != nil { - return nil, err - } else { - return r, nil - } + return Validators() } func DumpConsensusStateResult() (ctypes.TMResult, error) { - if r, err := DumpConsensusState(); err != nil { - return nil, err - } else { - return r, nil - } + return DumpConsensusState() } func UnconfirmedTxsResult() (ctypes.TMResult, error) { - if r, err := UnconfirmedTxs(); err != nil { - return nil, err - } else { - return r, nil - } + return UnconfirmedTxs() } func NumUnconfirmedTxsResult() (ctypes.TMResult, error) { - if r, err := NumUnconfirmedTxs(); err != nil { - return nil, err - } else { - return r, nil - } + return NumUnconfirmedTxs() +} + +// Tx allow user to query the transaction results. `nil` could mean the +// transaction is in the mempool, invalidated, or was not send in the first +// place. +func TxResult(hash []byte, prove bool) (ctypes.TMResult, error) { + return Tx(hash, prove) } func BroadcastTxCommitResult(tx []byte) (ctypes.TMResult, error) { - if r, err := BroadcastTxCommit(tx); err != nil { - return nil, err - } else { - return r, nil - } + return BroadcastTxCommit(tx) } func BroadcastTxSyncResult(tx []byte) (ctypes.TMResult, error) { - if r, err := BroadcastTxSync(tx); err != nil { - return nil, err - } else { - return r, nil - } + return BroadcastTxSync(tx) } func BroadcastTxAsyncResult(tx []byte) (ctypes.TMResult, error) { - if r, err := BroadcastTxAsync(tx); err != nil { - return nil, err - } else { - return r, nil - } + return BroadcastTxAsync(tx) } func ABCIQueryResult(path string, data []byte, prove bool) (ctypes.TMResult, error) { - if r, err := ABCIQuery(path, data, prove); err != nil { - return nil, err - } else { - return r, nil - } + return ABCIQuery(path, data, prove) } func ABCIInfoResult() (ctypes.TMResult, error) { - if r, err := ABCIInfo(); err != nil { - return nil, err - } else { - return r, nil - } + return ABCIInfo() } func UnsafeFlushMempoolResult() (ctypes.TMResult, error) { - if r, err := UnsafeFlushMempool(); err != nil { - return nil, err - } else { - return r, nil - } + return UnsafeFlushMempool() } func UnsafeSetConfigResult(typ, key, value string) (ctypes.TMResult, error) { - if r, err := UnsafeSetConfig(typ, key, value); err != nil { - return nil, err - } else { - return r, nil - } + return UnsafeSetConfig(typ, key, value) } func UnsafeStartCPUProfilerResult(filename string) (ctypes.TMResult, error) { - if r, err := UnsafeStartCPUProfiler(filename); err != nil { - return nil, err - } else { - return r, nil - } + return UnsafeStartCPUProfiler(filename) } func UnsafeStopCPUProfilerResult() (ctypes.TMResult, error) { - if r, err := UnsafeStopCPUProfiler(); err != nil { - return nil, err - } else { - return r, nil - } + return UnsafeStopCPUProfiler() } func UnsafeWriteHeapProfileResult(filename string) (ctypes.TMResult, error) { - if r, err := UnsafeWriteHeapProfile(filename); err != nil { - return nil, err - } else { - return r, nil - } + return UnsafeWriteHeapProfile(filename) } diff --git a/rpc/core/tx.go b/rpc/core/tx.go new file mode 100644 index 000000000..7f3cdd037 --- /dev/null +++ b/rpc/core/tx.go @@ -0,0 +1,43 @@ +package core + +import ( + "fmt" + + ctypes "github.com/tendermint/tendermint/rpc/core/types" + "github.com/tendermint/tendermint/state/txindex/null" + "github.com/tendermint/tendermint/types" +) + +func Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) { + + // if index is disabled, return error + if _, ok := txIndexer.(*null.TxIndex); ok { + return nil, fmt.Errorf("Transaction indexing is disabled.") + } + + r, err := txIndexer.Get(hash) + if err != nil { + return nil, err + } + + if r == nil { + return nil, fmt.Errorf("Tx (%X) not found", hash) + } + + height := int(r.Height) // XXX + index := int(r.Index) + + var proof types.TxProof + if prove { + block := blockStore.LoadBlock(height) + proof = block.Data.Txs.Proof(index) + } + + return &ctypes.ResultTx{ + Height: height, + Index: index, + TxResult: r.Result, + Tx: r.Tx, + Proof: proof, + }, nil +} diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index 82087e252..940aa433f 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -1,6 +1,8 @@ package core_types import ( + "strings" + abci "github.com/tendermint/abci/types" "github.com/tendermint/go-crypto" "github.com/tendermint/go-p2p" @@ -38,6 +40,19 @@ type ResultStatus struct { LatestBlockTime int64 `json:"latest_block_time"` // nano } +func (s *ResultStatus) TxIndexEnabled() bool { + if s == nil || s.NodeInfo == nil { + return false + } + for _, s := range s.NodeInfo.Other { + info := strings.Split(s, "=") + if len(info) == 2 && info[0] == "tx_index" { + return info[1] == "on" + } + } + return false +} + type ResultNetInfo struct { Listening bool `json:"listening"` Listeners []string `json:"listeners"` @@ -68,11 +83,23 @@ type ResultBroadcastTx struct { Code abci.CodeType `json:"code"` Data []byte `json:"data"` Log string `json:"log"` + + TxID []byte `json:"tx_id"` } type ResultBroadcastTxCommit struct { CheckTx *abci.ResponseCheckTx `json:"check_tx"` DeliverTx *abci.ResponseDeliverTx `json:"deliver_tx"` + TxID []byte `json:"tx_id"` + Height int `json:"height"` +} + +type ResultTx struct { + Height int `json:"height"` + Index int `json:"index"` + TxResult abci.ResponseDeliverTx `json:"tx_result"` + Tx types.Tx `json:"tx"` + Proof types.TxProof `json:"proof,omitempty"` } type ResultUnconfirmedTxs struct { @@ -128,6 +155,7 @@ const ( ResultTypeBroadcastTx = byte(0x60) ResultTypeUnconfirmedTxs = byte(0x61) ResultTypeBroadcastTxCommit = byte(0x62) + ResultTypeTx = byte(0x63) // 0x7 bytes are for querying the application ResultTypeABCIQuery = byte(0x70) @@ -164,6 +192,7 @@ var _ = wire.RegisterInterface( wire.ConcreteType{&ResultDumpConsensusState{}, ResultTypeDumpConsensusState}, wire.ConcreteType{&ResultBroadcastTx{}, ResultTypeBroadcastTx}, wire.ConcreteType{&ResultBroadcastTxCommit{}, ResultTypeBroadcastTxCommit}, + wire.ConcreteType{&ResultTx{}, ResultTypeTx}, wire.ConcreteType{&ResultUnconfirmedTxs{}, ResultTypeUnconfirmedTxs}, wire.ConcreteType{&ResultSubscribe{}, ResultTypeSubscribe}, wire.ConcreteType{&ResultUnsubscribe{}, ResultTypeUnsubscribe}, diff --git a/rpc/core/types/responses_test.go b/rpc/core/types/responses_test.go new file mode 100644 index 000000000..69ee4faec --- /dev/null +++ b/rpc/core/types/responses_test.go @@ -0,0 +1,38 @@ +package core_types + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/tendermint/go-p2p" +) + +func TestStatusIndexer(t *testing.T) { + assert := assert.New(t) + + var status *ResultStatus + assert.False(status.TxIndexEnabled()) + + status = &ResultStatus{} + assert.False(status.TxIndexEnabled()) + + status.NodeInfo = &p2p.NodeInfo{} + assert.False(status.TxIndexEnabled()) + + cases := []struct { + expected bool + other []string + }{ + {false, nil}, + {false, []string{}}, + {false, []string{"a=b"}}, + {false, []string{"tx_indexiskv", "some=dood"}}, + {true, []string{"tx_index=on", "tx_index=other"}}, + {true, []string{"^(*^(", "tx_index=on", "a=n=b=d="}}, + } + + for _, tc := range cases { + status.NodeInfo.Other = tc.other + assert.Equal(tc.expected, status.TxIndexEnabled()) + } +} diff --git a/rpc/test/client_test.go b/rpc/test/client_test.go index 43409c723..50e326050 100644 --- a/rpc/test/client_test.go +++ b/rpc/test/client_test.go @@ -13,7 +13,9 @@ import ( abci "github.com/tendermint/abci/types" . "github.com/tendermint/go-common" rpc "github.com/tendermint/go-rpc/client" + "github.com/tendermint/tendermint/rpc/core" ctypes "github.com/tendermint/tendermint/rpc/core/types" + "github.com/tendermint/tendermint/state/txindex/null" "github.com/tendermint/tendermint/types" ) @@ -149,6 +151,91 @@ func testBroadcastTxCommit(t *testing.T, client rpc.HTTPClient) { // TODO: find tx in block } +//-------------------------------------------------------------------------------- +// query tx + +func TestURITx(t *testing.T) { + testTx(t, GetURIClient(), true) + + core.SetTxIndexer(&null.TxIndex{}) + testTx(t, GetJSONClient(), false) + core.SetTxIndexer(node.ConsensusState().GetState().TxIndexer) +} + +func TestJSONTx(t *testing.T) { + testTx(t, GetJSONClient(), true) + + core.SetTxIndexer(&null.TxIndex{}) + testTx(t, GetJSONClient(), false) + core.SetTxIndexer(node.ConsensusState().GetState().TxIndexer) +} + +func testTx(t *testing.T, client rpc.HTTPClient, withIndexer bool) { + assert, require := assert.New(t), require.New(t) + + // first we broadcast a tx + tmResult := new(ctypes.TMResult) + txBytes := randBytes(t) + tx := types.Tx(txBytes) + _, err := client.Call("broadcast_tx_commit", map[string]interface{}{"tx": txBytes}, tmResult) + require.Nil(err) + + res := (*tmResult).(*ctypes.ResultBroadcastTxCommit) + checkTx := res.CheckTx + require.Equal(abci.CodeType_OK, checkTx.Code) + deliverTx := res.DeliverTx + require.Equal(abci.CodeType_OK, deliverTx.Code) + mem := node.MempoolReactor().Mempool + require.Equal(0, mem.Size()) + + txHash := tx.Hash() + txHash2 := types.Tx("a different tx").Hash() + + cases := []struct { + valid bool + hash []byte + prove bool + }{ + // only valid if correct hash provided + {true, txHash, false}, + {true, txHash, true}, + {false, txHash2, false}, + {false, txHash2, true}, + {false, nil, false}, + {false, nil, true}, + } + + for i, tc := range cases { + idx := fmt.Sprintf("%d", i) + + // now we query for the tx. + // since there's only one tx, we know index=0. + tmResult = new(ctypes.TMResult) + query := map[string]interface{}{ + "hash": tc.hash, + "prove": tc.prove, + } + _, err = client.Call("tx", query, tmResult) + valid := (withIndexer && tc.valid) + if !valid { + require.NotNil(err, idx) + } else { + require.Nil(err, idx) + res2 := (*tmResult).(*ctypes.ResultTx) + assert.Equal(tx, res2.Tx, idx) + assert.Equal(res.Height, res2.Height, idx) + assert.Equal(0, res2.Index, idx) + assert.Equal(abci.CodeType_OK, res2.TxResult.Code, idx) + // time to verify the proof + proof := res2.Proof + if tc.prove && assert.Equal(tx, proof.Data, idx) { + assert.True(proof.Proof.Verify(proof.Index, proof.Total, tx.Hash(), proof.RootHash), idx) + } + } + } + +} + //-------------------------------------------------------------------------------- // Test the websocket service diff --git a/state/execution.go b/state/execution.go index aa9113011..0b1aff699 100644 --- a/state/execution.go +++ b/state/execution.go @@ -2,68 +2,49 @@ package state import ( "errors" + "fmt" - "github.com/ebuchman/fail-test" - + fail "github.com/ebuchman/fail-test" abci "github.com/tendermint/abci/types" . "github.com/tendermint/go-common" - "github.com/tendermint/go-crypto" + crypto "github.com/tendermint/go-crypto" "github.com/tendermint/tendermint/proxy" + "github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/types" ) //-------------------------------------------------- // Execute the block -// Execute the block to mutate State. -// Validates block and then executes Data.Txs in the block. -func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block, blockPartsHeader types.PartSetHeader) error { - +// ValExecBlock executes the block, but does NOT mutate State. +// + validates the block +// + executes block.Txs on the proxyAppConn +func (s *State) ValExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) (*ABCIResponses, error) { // Validate the block. if err := s.validateBlock(block); err != nil { - return ErrInvalidBlock(err) + return nil, ErrInvalidBlock(err) } - // compute bitarray of validators that signed - signed := commitBitArrayFromBlock(block) - _ = signed // TODO send on begin block - - // copy the valset - valSet := s.Validators.Copy() - nextValSet := valSet.Copy() - // Execute the block txs - changedValidators, err := execBlockOnProxyApp(eventCache, proxyAppConn, block) + abciResponses, err := execBlockOnProxyApp(eventCache, proxyAppConn, block) if err != nil { // There was some error in proxyApp // TODO Report error and wait for proxyApp to be available. - return ErrProxyAppConn(err) + return nil, ErrProxyAppConn(err) } - // update the validator set - err = updateValidators(nextValSet, changedValidators) - if err != nil { - log.Warn("Error changing validator set", "error", err) - // TODO: err or carry on? - } - - // All good! - // Update validator accums and set state variables - nextValSet.IncrementAccum(1) - s.SetBlockAndValidators(block.Header, blockPartsHeader, valSet, nextValSet) - - fail.Fail() // XXX - - return nil + return abciResponses, nil } // Executes block's transactions on proxyAppConn. -// Returns a list of updates to the validator set +// Returns a list of transaction results and updates to the validator set // TODO: Generate a bitmap or otherwise store tx validity in state. -func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) ([]*abci.Validator, error) { - +func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) (*ABCIResponses, error) { var validTxs, invalidTxs = 0, 0 + txIndex := 0 + abciResponses := NewABCIResponses(block) + // Execute transactions and get hash proxyCb := func(req *abci.Request, res *abci.Response) { switch r := res.Value.(type) { @@ -73,22 +54,27 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo // Blocks may include invalid txs. // reqDeliverTx := req.(abci.RequestDeliverTx) txError := "" - apTx := r.DeliverTx - if apTx.Code == abci.CodeType_OK { - validTxs += 1 + txResult := r.DeliverTx + if txResult.Code == abci.CodeType_OK { + validTxs++ } else { - log.Debug("Invalid tx", "code", r.DeliverTx.Code, "log", r.DeliverTx.Log) - invalidTxs += 1 - txError = apTx.Code.String() + log.Debug("Invalid tx", "code", txResult.Code, "log", txResult.Log) + invalidTxs++ + txError = txResult.Code.String() } + + abciResponses.DeliverTx[txIndex] = txResult + txIndex++ + // NOTE: if we count we can access the tx from the block instead of // pulling it from the req event := types.EventDataTx{ - Tx: req.GetDeliverTx().Tx, - Data: apTx.Data, - Code: apTx.Code, - Log: apTx.Log, - Error: txError, + Height: block.Height, + Tx: types.Tx(req.GetDeliverTx().Tx), + Data: txResult.Data, + Code: txResult.Code, + Log: txResult.Log, + Error: txError, } types.FireEventTx(eventCache, event) } @@ -102,33 +88,29 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo return nil, err } - fail.Fail() // XXX - // Run txs of block for _, tx := range block.Txs { - fail.FailRand(len(block.Txs)) // XXX proxyAppConn.DeliverTxAsync(tx) if err := proxyAppConn.Error(); err != nil { return nil, err } } - fail.Fail() // XXX - // End block - respEndBlock, err := proxyAppConn.EndBlockSync(uint64(block.Height)) + abciResponses.EndBlock, err = proxyAppConn.EndBlockSync(uint64(block.Height)) if err != nil { log.Warn("Error in proxyAppConn.EndBlock", "error", err) return nil, err } - fail.Fail() // XXX + valDiff := abciResponses.EndBlock.Diffs log.Info("Executed block", "height", block.Height, "valid txs", validTxs, "invalid txs", invalidTxs) - if len(respEndBlock.Diffs) > 0 { - log.Info("Update to validator set", "updates", abci.ValidatorsString(respEndBlock.Diffs)) + if len(valDiff) > 0 { + log.Info("Update to validator set", "updates", abci.ValidatorsString(valDiff)) } - return respEndBlock.Diffs, nil + + return abciResponses, nil } func updateValidators(validators *types.ValidatorSet, changedValidators []*abci.Validator) error { @@ -219,25 +201,43 @@ func (s *State) validateBlock(block *types.Block) error { } //----------------------------------------------------------------------------- -// ApplyBlock executes the block, then commits and updates the mempool atomically +// ApplyBlock validates & executes the block, updates state w/ ABCI responses, +// then commits and updates the mempool atomically, then saves state. +// Transaction results are optionally indexed. -// Execute and commit block against app, save block and state +// Validate, execute, and commit block against app, save block and state func (s *State) ApplyBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block, partsHeader types.PartSetHeader, mempool types.Mempool) error { - // Run the block on the State: - // + update validator sets - // + run txs on the proxyAppConn - err := s.ExecBlock(eventCache, proxyAppConn, block, partsHeader) + abciResponses, err := s.ValExecBlock(eventCache, proxyAppConn, block) if err != nil { - return errors.New(Fmt("Exec failed for application: %v", err)) + return fmt.Errorf("Exec failed for application: %v", err) } + fail.Fail() // XXX + + // index txs. This could run in the background + s.indexTxs(abciResponses) + + // save the results before we commit + s.SaveABCIResponses(abciResponses) + + fail.Fail() // XXX + + // now update the block and validators + s.SetBlockAndValidators(block.Header, partsHeader, abciResponses) + // lock mempool, commit state, update mempoool err = s.CommitStateUpdateMempool(proxyAppConn, block, mempool) if err != nil { - return errors.New(Fmt("Commit failed for application: %v", err)) + return fmt.Errorf("Commit failed for application: %v", err) } + + fail.Fail() // XXX + + // save the state + s.Save() + return nil } @@ -268,9 +268,25 @@ func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, bl return nil } -// Apply and commit a block, but without all the state validation. +func (s *State) indexTxs(abciResponses *ABCIResponses) { + // save the tx results using the TxIndexer + // NOTE: these may be overwriting, but the values should be the same. + batch := txindex.NewBatch(len(abciResponses.DeliverTx)) + for i, d := range abciResponses.DeliverTx { + tx := abciResponses.txs[i] + batch.Add(types.TxResult{ + Height: uint64(abciResponses.Height), + Index: uint32(i), + Tx: tx, + Result: *d, + }) + } + s.TxIndexer.AddBatch(batch) +} + +// Exec and commit a block on the proxyApp without validating or mutating the state // Returns the application root hash (result of abci.Commit) -func ApplyBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block) ([]byte, error) { +func ExecCommitBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block) ([]byte, error) { var eventCache types.Fireable // nil _, err := execBlockOnProxyApp(eventCache, appConnConsensus, block) if err != nil { diff --git a/state/execution_test.go b/state/execution_test.go new file mode 100644 index 000000000..299c6baa2 --- /dev/null +++ b/state/execution_test.go @@ -0,0 +1,90 @@ +package state + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tendermint/abci/example/dummy" + crypto "github.com/tendermint/go-crypto" + dbm "github.com/tendermint/go-db" + cfg "github.com/tendermint/tendermint/config/tendermint_test" + "github.com/tendermint/tendermint/mempool" + "github.com/tendermint/tendermint/proxy" + "github.com/tendermint/tendermint/state/txindex" + "github.com/tendermint/tendermint/types" +) + +var ( + privKey = crypto.GenPrivKeyEd25519FromSecret([]byte("execution_test")) + chainID = "execution_chain" + testPartSize = 65536 + nTxsPerBlock = 10 +) + +func TestApplyBlock(t *testing.T) { + cc := proxy.NewLocalClientCreator(dummy.NewDummyApplication()) + config := cfg.ResetConfig("execution_test_") + proxyApp := proxy.NewAppConns(config, cc, nil) + _, err := proxyApp.Start() + require.Nil(t, err) + defer proxyApp.Stop() + mempool := mempool.NewMempool(config, proxyApp.Mempool()) + + state := state() + indexer := &dummyIndexer{0} + state.TxIndexer = indexer + + // make block + block := makeBlock(1, state) + + err = state.ApplyBlock(nil, proxyApp.Consensus(), block, block.MakePartSet(testPartSize).Header(), mempool) + + require.Nil(t, err) + assert.Equal(t, nTxsPerBlock, indexer.Indexed) // test indexing works + + // TODO check state and mempool +} + +//---------------------------------------------------------------------------- + +// make some bogus txs +func makeTxs(blockNum int) (txs []types.Tx) { + for i := 0; i < nTxsPerBlock; i++ { + txs = append(txs, types.Tx([]byte{byte(blockNum), byte(i)})) + } + return txs +} + +func state() *State { + return MakeGenesisState(dbm.NewMemDB(), &types.GenesisDoc{ + ChainID: chainID, + Validators: []types.GenesisValidator{ + types.GenesisValidator{privKey.PubKey(), 10000, "test"}, + }, + AppHash: nil, + }) +} + +func makeBlock(num int, state *State) *types.Block { + prevHash := state.LastBlockID.Hash + prevParts := types.PartSetHeader{} + valHash := state.Validators.Hash() + prevBlockID := types.BlockID{prevHash, prevParts} + block, _ := types.MakeBlock(num, chainID, makeTxs(num), new(types.Commit), + prevBlockID, valHash, state.AppHash, testPartSize) + return block +} + +// dummyIndexer increments counter every time we index transaction. +type dummyIndexer struct { + Indexed int +} + +func (indexer *dummyIndexer) Get(hash []byte) (*types.TxResult, error) { + return nil, nil +} +func (indexer *dummyIndexer) AddBatch(batch *txindex.Batch) error { + indexer.Indexed += batch.Size() + return nil +} diff --git a/state/state.go b/state/state.go index c4c6d7489..086b0e710 100644 --- a/state/state.go +++ b/state/state.go @@ -6,15 +6,19 @@ import ( "sync" "time" + abci "github.com/tendermint/abci/types" . "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" dbm "github.com/tendermint/go-db" "github.com/tendermint/go-wire" + "github.com/tendermint/tendermint/state/txindex" + "github.com/tendermint/tendermint/state/txindex/null" "github.com/tendermint/tendermint/types" ) var ( - stateKey = []byte("stateKey") + stateKey = []byte("stateKey") + abciResponsesKey = []byte("abciResponsesKey") ) //----------------------------------------------------------------------------- @@ -29,7 +33,7 @@ type State struct { GenesisDoc *types.GenesisDoc ChainID string - // updated at end of ExecBlock + // updated at end of SetBlockAndValidators LastBlockHeight int // Genesis state has this set to 0. So, Block(H=0) does not exist. LastBlockID types.BlockID LastBlockTime time.Time @@ -38,6 +42,12 @@ type State struct { // AppHash is updated after Commit AppHash []byte + + TxIndexer txindex.TxIndexer `json:"-"` // Transaction indexer. + + // Intermediate results from processing + // Persisted separately from the state + abciResponses *ABCIResponses } func LoadState(db dbm.DB) *State { @@ -45,7 +55,7 @@ func LoadState(db dbm.DB) *State { } func loadState(db dbm.DB, key []byte) *State { - s := &State{db: db} + s := &State{db: db, TxIndexer: &null.TxIndex{}} buf := db.Get(key) if len(buf) == 0 { return nil @@ -54,7 +64,7 @@ func loadState(db dbm.DB, key []byte) *State { wire.ReadBinaryPtr(&s, r, 0, n, err) if *err != nil { // DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED - Exit(Fmt("Data has been corrupted or its spec has changed: %v\n", *err)) + Exit(Fmt("LoadState: Data has been corrupted or its spec has changed: %v\n", *err)) } // TODO: ensure that buf is completely read. } @@ -72,6 +82,7 @@ func (s *State) Copy() *State { Validators: s.Validators.Copy(), LastValidators: s.LastValidators.Copy(), AppHash: s.AppHash, + TxIndexer: s.TxIndexer, // pointer here, not value } } @@ -81,6 +92,29 @@ func (s *State) Save() { s.db.SetSync(stateKey, s.Bytes()) } +// Sets the ABCIResponses in the state and writes them to disk +// in case we crash after app.Commit and before s.Save() +func (s *State) SaveABCIResponses(abciResponses *ABCIResponses) { + // save the validators to the db + s.db.SetSync(abciResponsesKey, abciResponses.Bytes()) +} + +func (s *State) LoadABCIResponses() *ABCIResponses { + abciResponses := new(ABCIResponses) + + buf := s.db.Get(abciResponsesKey) + if len(buf) != 0 { + r, n, err := bytes.NewReader(buf), new(int), new(error) + wire.ReadBinaryPtr(abciResponses, r, 0, n, err) + if *err != nil { + // DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED + Exit(Fmt("LoadABCIResponses: Data has been corrupted or its spec has changed: %v\n", *err)) + } + // TODO: ensure that buf is completely read. + } + return abciResponses +} + func (s *State) Equals(s2 *State) bool { return bytes.Equal(s.Bytes(), s2.Bytes()) } @@ -96,7 +130,22 @@ func (s *State) Bytes() []byte { // Mutate state variables to match block and validators // after running EndBlock -func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader types.PartSetHeader, prevValSet, nextValSet *types.ValidatorSet) { +func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader types.PartSetHeader, abciResponses *ABCIResponses) { + + // copy the valset so we can apply changes from EndBlock + // and update s.LastValidators and s.Validators + prevValSet := s.Validators.Copy() + nextValSet := prevValSet.Copy() + + // update the validator set with the latest abciResponses + err := updateValidators(nextValSet, abciResponses.EndBlock.Diffs) + if err != nil { + log.Warn("Error changing validator set", "error", err) + // TODO: err or carry on? + } + // Update validator accums and set state variables + nextValSet.IncrementAccum(1) + s.setBlockAndValidators(header.Height, types.BlockID{header.Hash(), blockPartsHeader}, header.Time, prevValSet, nextValSet) @@ -125,12 +174,46 @@ func GetState(config cfg.Config, stateDB dbm.DB) *State { state = MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) state.Save() } + return state } +//-------------------------------------------------- +// ABCIResponses holds intermediate state during block processing + +type ABCIResponses struct { + Height int + + DeliverTx []*abci.ResponseDeliverTx + EndBlock abci.ResponseEndBlock + + txs types.Txs // reference for indexing results by hash +} + +func NewABCIResponses(block *types.Block) *ABCIResponses { + return &ABCIResponses{ + Height: block.Height, + DeliverTx: make([]*abci.ResponseDeliverTx, block.NumTxs), + txs: block.Data.Txs, + } +} + +// Serialize the ABCIResponse +func (a *ABCIResponses) Bytes() []byte { + buf, n, err := new(bytes.Buffer), new(int), new(error) + wire.WriteBinary(*a, buf, n, err) + if *err != nil { + PanicCrisis(*err) + } + return buf.Bytes() +} + //----------------------------------------------------------------------------- // Genesis +// MakeGenesisStateFromFile reads and unmarshals state from the given file. +// +// Used during replay and in tests. func MakeGenesisStateFromFile(db dbm.DB, genDocFile string) *State { genDocJSON, err := ioutil.ReadFile(genDocFile) if err != nil { @@ -143,6 +226,9 @@ func MakeGenesisStateFromFile(db dbm.DB, genDocFile string) *State { return MakeGenesisState(db, genDoc) } +// MakeGenesisState creates state from types.GenesisDoc. +// +// Used in tests. func MakeGenesisState(db dbm.DB, genDoc *types.GenesisDoc) *State { if len(genDoc.Validators) == 0 { Exit(Fmt("The genesis file has no validators")) @@ -176,5 +262,6 @@ func MakeGenesisState(db dbm.DB, genDoc *types.GenesisDoc) *State { Validators: types.NewValidatorSet(validators), LastValidators: types.NewValidatorSet(nil), AppHash: genDoc.AppHash, + TxIndexer: &null.TxIndex{}, // we do not need indexer during replay and in tests } } diff --git a/state/state_test.go b/state/state_test.go index a534cb695..dca83e801 100644 --- a/state/state_test.go +++ b/state/state_test.go @@ -1,8 +1,12 @@ package state import ( + "fmt" "testing" + "github.com/stretchr/testify/assert" + abci "github.com/tendermint/abci/types" + "github.com/tendermint/go-crypto" dbm "github.com/tendermint/go-db" "github.com/tendermint/tendermint/config/tendermint_test" ) @@ -40,3 +44,30 @@ func TestStateSaveLoad(t *testing.T) { t.Fatal("expected state and its copy to be identical. got %v\n expected %v\n", loadedState, state) } } + +func TestABCIResponsesSaveLoad(t *testing.T) { + assert := assert.New(t) + + config := tendermint_test.ResetConfig("state_") + stateDB := dbm.NewDB("state", config.GetString("db_backend"), config.GetString("db_dir")) + state := GetState(config, stateDB) + + state.LastBlockHeight += 1 + + // build mock responses + block := makeBlock(2, state) + abciResponses := NewABCIResponses(block) + abciResponses.DeliverTx[0] = &abci.ResponseDeliverTx{Data: []byte("foo")} + abciResponses.DeliverTx[1] = &abci.ResponseDeliverTx{Data: []byte("bar"), Log: "ok"} + abciResponses.EndBlock = abci.ResponseEndBlock{Diffs: []*abci.Validator{ + { + PubKey: crypto.GenPrivKeyEd25519().PubKey().Bytes(), + Power: 10, + }, + }} + abciResponses.txs = nil + + state.SaveABCIResponses(abciResponses) + abciResponses2 := state.LoadABCIResponses() + assert.Equal(abciResponses, abciResponses2, fmt.Sprintf("ABCIResponses don't match: Got %v, Expected %v", abciResponses2, abciResponses)) +} diff --git a/state/txindex/indexer.go b/state/txindex/indexer.go new file mode 100644 index 000000000..1c311830a --- /dev/null +++ b/state/txindex/indexer.go @@ -0,0 +1,57 @@ +package txindex + +import ( + "errors" + + "github.com/tendermint/tendermint/types" +) + +// Indexer interface defines methods to index and search transactions. +type TxIndexer interface { + + // Batch analyzes, indexes or stores a batch of transactions. + // + // NOTE We do not specify Index method for analyzing a single transaction + // here because it bears heavy perfomance loses. Almost all advanced indexers + // support batching. + AddBatch(b *Batch) error + + // Tx returns specified transaction or nil if the transaction is not indexed + // or stored. + Get(hash []byte) (*types.TxResult, error) +} + +//---------------------------------------------------- +// Txs are written as a batch + +// A Batch groups together multiple Index operations you would like performed +// at the same time. The Batch structure is NOT thread-safe. You should only +// perform operations on a batch from a single thread at a time. Once batch +// execution has started, you may not modify it. +type Batch struct { + Ops []types.TxResult +} + +// NewBatch creates a new Batch. +func NewBatch(n int) *Batch { + return &Batch{ + Ops: make([]types.TxResult, n), + } +} + +// Index adds or updates entry for the given result.Index. +func (b *Batch) Add(result types.TxResult) error { + b.Ops[result.Index] = result + return nil +} + +// Size returns the total number of operations inside the batch. +func (b *Batch) Size() int { + return len(b.Ops) +} + +//---------------------------------------------------- +// Errors + +// ErrorEmptyHash indicates empty hash +var ErrorEmptyHash = errors.New("Transaction hash cannot be empty") diff --git a/state/txindex/kv/kv.go b/state/txindex/kv/kv.go new file mode 100644 index 000000000..03acc8dae --- /dev/null +++ b/state/txindex/kv/kv.go @@ -0,0 +1,56 @@ +package kv + +import ( + "bytes" + "fmt" + + db "github.com/tendermint/go-db" + "github.com/tendermint/go-wire" + "github.com/tendermint/tendermint/state/txindex" + "github.com/tendermint/tendermint/types" +) + +// TxIndex is the simplest possible indexer, backed by Key-Value storage (levelDB). +// It could only index transaction by its identifier. +type TxIndex struct { + store db.DB +} + +// NewTxIndex returns new instance of TxIndex. +func NewTxIndex(store db.DB) *TxIndex { + return &TxIndex{store: store} +} + +// Get gets transaction from the TxIndex storage and returns it or nil if the +// transaction is not found. +func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) { + if len(hash) == 0 { + return nil, txindex.ErrorEmptyHash + } + + rawBytes := txi.store.Get(hash) + if rawBytes == nil { + return nil, nil + } + + r := bytes.NewReader(rawBytes) + var n int + var err error + txResult := wire.ReadBinary(&types.TxResult{}, r, 0, &n, &err).(*types.TxResult) + if err != nil { + return nil, fmt.Errorf("Error reading TxResult: %v", err) + } + + return txResult, nil +} + +// Batch writes a batch of transactions into the TxIndex storage. +func (txi *TxIndex) AddBatch(b *txindex.Batch) error { + storeBatch := txi.store.NewBatch() + for _, result := range b.Ops { + rawBytes := wire.BinaryBytes(&result) + storeBatch.Set(result.Tx.Hash(), rawBytes) + } + storeBatch.Write() + return nil +} diff --git a/state/txindex/kv/kv_test.go b/state/txindex/kv/kv_test.go new file mode 100644 index 000000000..9a1898d7e --- /dev/null +++ b/state/txindex/kv/kv_test.go @@ -0,0 +1,63 @@ +package kv + +import ( + "io/ioutil" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + abci "github.com/tendermint/abci/types" + db "github.com/tendermint/go-db" + "github.com/tendermint/tendermint/state/txindex" + "github.com/tendermint/tendermint/types" +) + +func TestTxIndex(t *testing.T) { + indexer := &TxIndex{store: db.NewMemDB()} + + tx := types.Tx("HELLO WORLD") + txResult := &types.TxResult{1, 0, tx, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: ""}} + hash := tx.Hash() + + batch := txindex.NewBatch(1) + batch.Add(*txResult) + err := indexer.AddBatch(batch) + require.Nil(t, err) + + loadedTxResult, err := indexer.Get(hash) + require.Nil(t, err) + assert.Equal(t, txResult, loadedTxResult) +} + +func benchmarkTxIndex(txsCount int, b *testing.B) { + tx := types.Tx("HELLO WORLD") + txResult := &types.TxResult{1, 0, tx, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: ""}} + + dir, err := ioutil.TempDir("", "tx_index_db") + if err != nil { + b.Fatal(err) + } + defer os.RemoveAll(dir) + + store := db.NewDB("tx_index", "leveldb", dir) + indexer := &TxIndex{store: store} + + batch := txindex.NewBatch(txsCount) + for i := 0; i < txsCount; i++ { + txResult.Index += 1 + batch.Add(*txResult) + } + + b.ResetTimer() + + for n := 0; n < b.N; n++ { + err = indexer.AddBatch(batch) + } +} + +func BenchmarkTxIndex1(b *testing.B) { benchmarkTxIndex(1, b) } +func BenchmarkTxIndex500(b *testing.B) { benchmarkTxIndex(500, b) } +func BenchmarkTxIndex1000(b *testing.B) { benchmarkTxIndex(1000, b) } +func BenchmarkTxIndex2000(b *testing.B) { benchmarkTxIndex(2000, b) } +func BenchmarkTxIndex10000(b *testing.B) { benchmarkTxIndex(10000, b) } diff --git a/state/txindex/null/null.go b/state/txindex/null/null.go new file mode 100644 index 000000000..4999bbdeb --- /dev/null +++ b/state/txindex/null/null.go @@ -0,0 +1,21 @@ +package null + +import ( + "errors" + + "github.com/tendermint/tendermint/state/txindex" + "github.com/tendermint/tendermint/types" +) + +// TxIndex acts as a /dev/null. +type TxIndex struct{} + +// Tx panics. +func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) { + return nil, errors.New(`Indexing is disabled (set 'tx_index = "kv"' in config)`) +} + +// Batch returns nil. +func (txi *TxIndex) AddBatch(batch *txindex.Batch) error { + return nil +} diff --git a/types/events.go b/types/events.go index aa5896e9b..114979047 100644 --- a/types/events.go +++ b/types/events.go @@ -2,10 +2,10 @@ package types import ( // for registering TMEventData as events.EventData + abci "github.com/tendermint/abci/types" . "github.com/tendermint/go-common" "github.com/tendermint/go-events" "github.com/tendermint/go-wire" - abci "github.com/tendermint/abci/types" ) // Functions to generate eventId strings @@ -73,11 +73,12 @@ type EventDataNewBlockHeader struct { // All txs fire EventDataTx type EventDataTx struct { - Tx Tx `json:"tx"` - Data []byte `json:"data"` - Log string `json:"log"` - Code abci.CodeType `json:"code"` - Error string `json:"error"` // this is redundant information for now + Height int `json:"height"` + Tx Tx `json:"tx"` + Data []byte `json:"data"` + Log string `json:"log"` + Code abci.CodeType `json:"code"` + Error string `json:"error"` // this is redundant information for now } // NOTE: This goes into the replay WAL diff --git a/types/tx.go b/types/tx.go index a67206d4e..df7f0e71a 100644 --- a/types/tx.go +++ b/types/tx.go @@ -1,6 +1,10 @@ package types import ( + "bytes" + "errors" + + abci "github.com/tendermint/abci/types" "github.com/tendermint/go-merkle" ) @@ -30,3 +34,80 @@ func (txs Txs) Hash() []byte { return merkle.SimpleHashFromTwoHashes(left, right) } } + +// Index returns the index of this transaction in the list, or -1 if not found +func (txs Txs) Index(tx Tx) int { + for i := range txs { + if bytes.Equal(txs[i], tx) { + return i + } + } + return -1 +} + +// Index returns the index of this transaction hash in the list, or -1 if not found +func (txs Txs) IndexByHash(hash []byte) int { + for i := range txs { + if bytes.Equal(txs[i].Hash(), hash) { + return i + } + } + return -1 +} + +// Proof returns a simple merkle proof for this node. +// +// Panics if i < 0 or i >= len(txs) +// +// TODO: optimize this! +func (txs Txs) Proof(i int) TxProof { + l := len(txs) + hashables := make([]merkle.Hashable, l) + for i := 0; i < l; i++ { + hashables[i] = txs[i] + } + root, proofs := merkle.SimpleProofsFromHashables(hashables) + + return TxProof{ + Index: i, + Total: l, + RootHash: root, + Data: txs[i], + Proof: *proofs[i], + } +} + +type TxProof struct { + Index, Total int + RootHash []byte + Data Tx + Proof merkle.SimpleProof +} + +func (tp TxProof) LeafHash() []byte { + return tp.Data.Hash() +} + +// Validate returns nil if it matches the dataHash, and is internally consistent +// otherwise, returns a sensible error +func (tp TxProof) Validate(dataHash []byte) error { + if !bytes.Equal(dataHash, tp.RootHash) { + return errors.New("Proof matches different data hash") + } + + valid := tp.Proof.Verify(tp.Index, tp.Total, tp.LeafHash(), tp.RootHash) + if !valid { + return errors.New("Proof is not internally consistent") + } + return nil +} + +// TxResult contains results of executing the transaction. +// +// One usage is indexing transaction results. +type TxResult struct { + Height uint64 `json:"height"` + Index uint32 `json:"index"` + Tx Tx `json:"tx"` + Result abci.ResponseDeliverTx `json:"result"` +} diff --git a/types/tx_test.go b/types/tx_test.go new file mode 100644 index 000000000..7688a9bf1 --- /dev/null +++ b/types/tx_test.go @@ -0,0 +1,122 @@ +package types + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/assert" + cmn "github.com/tendermint/go-common" + ctest "github.com/tendermint/go-common/test" + wire "github.com/tendermint/go-wire" +) + +func makeTxs(cnt, size int) Txs { + txs := make(Txs, cnt) + for i := 0; i < cnt; i++ { + txs[i] = cmn.RandBytes(size) + } + return txs +} + +func randInt(low, high int) int { + off := cmn.RandInt() % (high - low) + return low + off +} + +func TestTxIndex(t *testing.T) { + assert := assert.New(t) + for i := 0; i < 20; i++ { + txs := makeTxs(15, 60) + for j := 0; j < len(txs); j++ { + tx := txs[j] + idx := txs.Index(tx) + assert.Equal(j, idx) + } + assert.Equal(-1, txs.Index(nil)) + assert.Equal(-1, txs.Index(Tx("foodnwkf"))) + } +} + +func TestValidTxProof(t *testing.T) { + assert := assert.New(t) + cases := []struct { + txs Txs + }{ + {Txs{{1, 4, 34, 87, 163, 1}}}, + {Txs{{5, 56, 165, 2}, {4, 77}}}, + {Txs{Tx("foo"), Tx("bar"), Tx("baz")}}, + {makeTxs(20, 5)}, + {makeTxs(7, 81)}, + {makeTxs(61, 15)}, + } + + for h, tc := range cases { + txs := tc.txs + root := txs.Hash() + // make sure valid proof for every tx + for i := range txs { + leaf := txs[i] + leafHash := leaf.Hash() + proof := txs.Proof(i) + assert.Equal(i, proof.Index, "%d: %d", h, i) + assert.Equal(len(txs), proof.Total, "%d: %d", h, i) + assert.Equal(root, proof.RootHash, "%d: %d", h, i) + assert.Equal(leaf, proof.Data, "%d: %d", h, i) + assert.Equal(leafHash, proof.LeafHash(), "%d: %d", h, i) + assert.Nil(proof.Validate(root), "%d: %d", h, i) + assert.NotNil(proof.Validate([]byte("foobar")), "%d: %d", h, i) + + // read-write must also work + var p2 TxProof + bin := wire.BinaryBytes(proof) + err := wire.ReadBinaryBytes(bin, &p2) + if assert.Nil(err, "%d: %d: %+v", h, i, err) { + assert.Nil(p2.Validate(root), "%d: %d", h, i) + } + } + } +} + +func TestTxProofUnchangable(t *testing.T) { + // run the other test a bunch... + for i := 0; i < 40; i++ { + testTxProofUnchangable(t) + } +} + +func testTxProofUnchangable(t *testing.T) { + assert := assert.New(t) + + // make some proof + txs := makeTxs(randInt(2, 100), randInt(16, 128)) + root := txs.Hash() + i := randInt(0, len(txs)-1) + proof := txs.Proof(i) + + // make sure it is valid to start with + assert.Nil(proof.Validate(root)) + bin := wire.BinaryBytes(proof) + + // try mutating the data and make sure nothing breaks + for j := 0; j < 500; j++ { + bad := ctest.MutateByteSlice(bin) + if !bytes.Equal(bad, bin) { + assertBadProof(t, root, bad, proof) + } + } +} + +// this make sure the proof doesn't deserialize into something valid +func assertBadProof(t *testing.T, root []byte, bad []byte, good TxProof) { + var proof TxProof + err := wire.ReadBinaryBytes(bad, &proof) + if err == nil { + err = proof.Validate(root) + if err == nil { + // okay, this can happen if we have a slightly different total + // (where the path ends up the same), if it is something else, we have + // a real problem + assert.NotEqual(t, proof.Total, good.Total, "bad: %#v\ngood: %#v", proof, good) + } + } +}