diff --git a/DOCKER/README.md b/DOCKER/README.md index 92617c189..e765fc525 100644 --- a/DOCKER/README.md +++ b/DOCKER/README.md @@ -1,5 +1,16 @@ +# Docker + +Tendermint uses docker for deployment of testnets via the [mintnet](github.com/tendermint/mintnet) tool. + +For faster development iterations (ie. to avoid docker builds), +the dockerfile just sets up the OS, and tendermint is fetched/installed at runtime. + +For the deterministic docker builds used in testing, see the [tests directory](https://github.com/tendermint/tendermint/tree/master/test) + # Build and run a docker image and container +These are notes for the dev team. + ``` # Build base Docker image # Make sure ./run.sh exists. diff --git a/INSTALL.md b/INSTALL.md new file mode 100644 index 000000000..eca4f0aa9 --- /dev/null +++ b/INSTALL.md @@ -0,0 +1,57 @@ +# Install Go + +[Install Go, set the `GOPATH`, and put `GOPATH/bin` on your `PATH`](https://github.com/tendermint/tendermint/wiki/Setting-GOPATH). + +# Install Tendermint + +You should be able to install the latest with a simple `go get -u github.com/tendermint/tendermint/cmd/tendermint`. +The `-u` makes sure all dependencies are updated as well. + +Run `tendermint version` and `tendermint --help`. + +If the install falied, see [vendored dependencies below](#vendored-dependencies). + +To start a one-node blockchain with a simple in-process application: + +``` +tendermint init +tendermint node --proxy_app=dummy +``` + +See the [application developers guide](https://github.com/tendermint/tendermint/wiki/Application-Developers) for more details on building and running applications. + + +## Vendored dependencies + +If the `go get` failed, updated dependencies may have broken the build. +Install the correct version of each dependency using `glide`. + +Fist, install `glide`: + +``` +go get github.com/Masterminds/glide +``` + +Now, fetch the dependencies and install them with `glide` and `go`: + +``` +cd $GOPATH/src/github.com/tendermint/tendermint +glide install +go install ./cmd/tendermint +``` + +Sometimes `glide install` is painfully slow. Hang in there champ. + +The latest Tendermint Core version is now installed. Check by running `tendermint version`. + +## Troubleshooting + +If `go get` failing bothers you, fetch the code using `git`: + +``` +mkdir -p $GOPATH/src/github.com/tendermint +git clone https://github.com/tendermint/tendermint $GOPATH/src/github.com/tendermint/tendermint +cd $GOPATH/src/github.com/tendermint/tendermint +glide install +go install ./cmd/tendermint +``` diff --git a/INSTALL/FORK_TESTNET.md b/INSTALL/FORK_TESTNET.md deleted file mode 100644 index 37063069b..000000000 --- a/INSTALL/FORK_TESTNET.md +++ /dev/null @@ -1,21 +0,0 @@ -1. Fork github.com/tendermint/tendermint. -2. Run "make", it should install the daemon, which we named "tendermint". -3. Run "tendermint gen_account". Save the address, pub_key bytes, and priv_key bytes. - This is your developer key for controlling the cloud nodes. -4. Also run "tendermint gen_validator" 5 times, once for each cloud node. Save the output. -5. Create a directory ~/.debora/ and copy cmd/debora/default.cfg into ~/.debora/default.cfg - Copy the priv_key bytes from step 4 into ~/.debora/default.cfg where it says so. - Change the list of hosts in ~/.debora/default.cfg with your own set of 5 cloud nodes. -6. Replace cmd/barak/seed's pubkey with the pub_key bytes from step 3. -7. Update config/tendermint/config.go's genesis with validator pubkeys from step 4. - Give each of your nodes the same amount of voting power. - Set up the accounts however you want. -8. On each cloud node, follow the instructions here: https://github.com/tendermint/tendermint/tree/master/INSTALL - Create tmuser, install go, and also install 'barak'. - Then, run `barak -config="cmd/barak/seed"`. - You don't need to start the node at this time. -9. Now you can run "debora list" on your development machine and post commands to each cloud node. -10. Run scripts/unsafe_upgrade_barak.sh to test that barak is running. - The old barak you started on step 8 should now have quit. - A new instance of barak should be running. Check with `ps -ef | grep "barak"` -11. Run scripts/unsafe_restart_net.sh start your new testnet. diff --git a/INSTALL/README.md b/INSTALL/README.md deleted file mode 100644 index 88e416da5..000000000 --- a/INSTALL/README.md +++ /dev/null @@ -1,30 +0,0 @@ -NOTE: Only Ubuntu 14.04 64bit is supported at this time. - -### Server setup / create `tmuser` - -Secure the server, install dependencies, and create a new user `tmuser` - - curl -L https://raw.githubusercontent.com/tendermint/tendermint/master/INSTALL/install_env.sh > install_env.sh - source install_env.sh - cd /home/tmuser - -### Install Go as `tmuser` - -Don't use `apt-get install golang`, it's still on an old version. - - curl -L https://raw.githubusercontent.com/tendermint/tendermint/master/INSTALL/install_golang.sh > install_golang.sh - source install_golang.sh - -### Run Barak - -WARNING: THIS STEP WILL GIVE CONTROL OF THE CURRENT USER TO THE DEV TEAM. - - go get -u github.com/tendermint/tendermint/cmd/barak - nohup barak -config="$GOPATH/src/github.com/tendermint/tendermint/cmd/barak/seed" & - -### Install/Update MintDB - - go get -u github.com/tendermint/tendermint/cmd/tendermint - mkdir -p ~/.tendermint - cp $GOPATH/src/github.com/tendermint/tendermint/config/tendermint/genesis.json ~/.tendermint/ - tendermint node --seeds="goldenalchemist.chaintest.net:46656" diff --git a/INSTALL/install_env.sh b/INSTALL/install_env.sh deleted file mode 100755 index c947a1f9c..000000000 --- a/INSTALL/install_env.sh +++ /dev/null @@ -1,63 +0,0 @@ -#!/bin/bash -# Run this as root user -# This part is for hardening the server and setting up a user account - -if [ `whoami` != "root" ]; -then - echo "You must run this script as root" - exit 1 -fi - -USER="tmuser" -OPEN_PORTS=(46656 46657 46658 46659 46660 46661 46662 46663 46664 46665 46666 46667 46668 46669 46670 46671) -SSH_PORT=22 -WHITELIST=() - -# update and upgrade -apt-get update -y -apt-get upgrade -y - -# fail2ban for monitoring logins -apt-get install -y fail2ban - -# set up the network time daemon -apt-get install -y ntp - -# install dependencies -apt-get install -y make screen gcc git mercurial libc6-dev pkg-config libgmp-dev - -# set up firewall -echo "ENABLE FIREWALL ..." -set -x -# white list ssh access -for ip in "${WHITELIST[@]}"; do - ufw allow from $ip to any port $SSH_PORT -done -if [ ${#WHITELIST[@]} -eq 0 ]; then - ufw allow $SSH_PORT -fi -# open ports -for port in "${OPEN_PORTS[@]}"; do - ufw allow $port -done -# apply -ufw --force enable -set +x -# set up firewall END - -# watch the logs and have them emailed to me -# apt-get install -y logwatch -# echo "/usr/sbin/logwatch --output mail --mailto $ADMIN_EMAIL --detail high" >> /etc/cron.daily/00logwatch - -# set up user account -echo "CREATE USER $USER ..." -useradd $USER -d /home/$USER -# This user should not have root access. -# usermod -aG sudo $USER -mkdir /home/$USER -cp /etc/skel/.bashrc . -cp /etc/skel/.profile . -chown -R $USER:$USER /home/$USER - -echo "Done setting env. Switching to $USER..." -su $USER diff --git a/INSTALL/install_golang.sh b/INSTALL/install_golang.sh deleted file mode 100755 index 2b69c08f8..000000000 --- a/INSTALL/install_golang.sh +++ /dev/null @@ -1,29 +0,0 @@ -#!/bin/bash -# Run this as tmuser user -# This part is for installing go - -if [ `whoami` == "root" ]; -then - echo "You should not run this script as root" - exit 1 -fi - -USER=`whoami` -PWD=`pwd` - -# get dependencies -# sudo apt-get install -y make screen gcc git mercurial libc6-dev pkg-config libgmp-dev - -# install golang -cd /home/$USER -mkdir gocode -wget https://storage.googleapis.com/golang/go1.4.2.src.tar.gz -tar -xzvf go*.tar.gz -cd go/src -./make.bash -mkdir -p /home/$USER/go/src -echo 'export GOROOT=/home/$USER/go' >> /home/$USER/.bashrc -echo 'export GOPATH=/home/$USER/gocode' >> /home/$USER/.bashrc -echo 'export PATH=$PATH:$GOROOT/bin:$GOPATH/bin' >> /home/$USER/.bashrc -source /home/$USER/.bashrc -cd $PWD diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 207e5d279..ae52c7738 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -8,7 +8,6 @@ import ( "time" . "github.com/tendermint/go-common" - "github.com/tendermint/go-events" "github.com/tendermint/go-p2p" "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/proxy" @@ -52,7 +51,7 @@ type BlockchainReactor struct { timeoutsCh chan string lastBlock *types.Block - evsw *events.EventSwitch + evsw types.EventSwitch } func NewBlockchainReactor(state *sm.State, proxyAppConn proxy.AppConnConsensus, store *BlockStore, fastSync bool) *BlockchainReactor { @@ -130,7 +129,7 @@ func (bcR *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) return } - log.Notice("Receive", "src", src, "chID", chID, "msg", msg) + log.Debug("Receive", "src", src, "chID", chID, "msg", msg) switch msg := msg.(type) { case *bcBlockRequestMessage: @@ -268,7 +267,7 @@ func (bcR *BlockchainReactor) BroadcastStatusRequest() error { } // implements events.Eventable -func (bcR *BlockchainReactor) SetEventSwitch(evsw *events.EventSwitch) { +func (bcR *BlockchainReactor) SetEventSwitch(evsw types.EventSwitch) { bcR.evsw = evsw } diff --git a/cmd/tendermint/init.go b/cmd/tendermint/init.go index 5ee76c379..8a7a7b096 100644 --- a/cmd/tendermint/init.go +++ b/cmd/tendermint/init.go @@ -20,4 +20,5 @@ func init_files() { genDoc.SaveAs(config.GetString("genesis_file")) + log.Notice("Initialized tendermint", "genesis", config.GetString("genesis_file"), "priv_validator", config.GetString("priv_validator_file")) } diff --git a/config/tendermint/config.go b/config/tendermint/config.go index 4d2323ab3..465297ba3 100644 --- a/config/tendermint/config.go +++ b/config/tendermint/config.go @@ -84,6 +84,7 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetDefault("mempool_recheck", true) mapConfig.SetDefault("mempool_recheck_empty", true) mapConfig.SetDefault("mempool_broadcast", true) + mapConfig.SetDefault("mempool_wal", rootDir+"/data/mempool_wal") return mapConfig } diff --git a/config/tendermint_test/config.go b/config/tendermint_test/config.go index 120079858..6f3217475 100644 --- a/config/tendermint_test/config.go +++ b/config/tendermint_test/config.go @@ -97,6 +97,7 @@ func ResetConfig(localPath string) cfg.Config { mapConfig.SetDefault("mempool_recheck", true) mapConfig.SetDefault("mempool_recheck_empty", true) mapConfig.SetDefault("mempool_broadcast", true) + mapConfig.SetDefault("mempool_wal", "") return mapConfig } diff --git a/consensus/common.go b/consensus/common.go index ec1a0fa9f..2e4a4dba1 100644 --- a/consensus/common.go +++ b/consensus/common.go @@ -1,14 +1,14 @@ package consensus import ( - "github.com/tendermint/go-events" + "github.com/tendermint/tendermint/types" ) // NOTE: this is blocking -func subscribeToEvent(evsw *events.EventSwitch, receiver, eventID string, chanCap int) chan interface{} { +func subscribeToEvent(evsw types.EventSwitch, receiver, eventID string, chanCap int) chan interface{} { // listen for new round ch := make(chan interface{}, chanCap) - evsw.AddListenerForEvent(receiver, eventID, func(data events.EventData) { + types.AddListenerForEvent(evsw, receiver, eventID, func(data types.TMEventData) { ch <- data }) return ch diff --git a/consensus/common_test.go b/consensus/common_test.go index 3e05d2564..7cb3418cd 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -10,7 +10,6 @@ import ( cfg "github.com/tendermint/go-config" dbm "github.com/tendermint/go-db" - "github.com/tendermint/go-events" bc "github.com/tendermint/tendermint/blockchain" mempl "github.com/tendermint/tendermint/mempool" sm "github.com/tendermint/tendermint/state" @@ -19,6 +18,7 @@ import ( tmsp "github.com/tendermint/tmsp/types" "github.com/tendermint/tmsp/example/counter" + "github.com/tendermint/tmsp/example/dummy" ) var config cfg.Config // NOTE: must be reset for each _test.go file @@ -321,6 +321,16 @@ func fixedConsensusState() *ConsensusState { return cs } +func fixedConsensusStateDummy() *ConsensusState { + stateDB := dbm.NewMemDB() + state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) + privValidatorFile := config.GetString("priv_validator_file") + privValidator := types.LoadOrGenPrivValidator(privValidatorFile) + privValidator.Reset() + cs := newConsensusState(state, privValidator, dummy.NewDummyApplication()) + return cs +} + func newConsensusState(state *sm.State, pv *types.PrivValidator, app tmsp.Application) *ConsensusState { // Get BlockStore blockDB := dbm.NewMemDB() @@ -338,7 +348,7 @@ func newConsensusState(state *sm.State, pv *types.PrivValidator, app tmsp.Applic cs := NewConsensusState(config, state, proxyAppConnCon, blockStore, mempool) cs.SetPrivValidator(pv) - evsw := events.NewEventSwitch() + evsw := types.NewEventSwitch() cs.SetEventSwitch(evsw) evsw.Start() return cs diff --git a/consensus/height_vote_set.go b/consensus/height_vote_set.go index 583ef3f6f..5cc181b1b 100644 --- a/consensus/height_vote_set.go +++ b/consensus/height_vote_set.go @@ -167,6 +167,8 @@ func (hvs *HeightVoteSet) String() string { } func (hvs *HeightVoteSet) StringIndented(indent string) string { + hvs.mtx.Lock() + defer hvs.mtx.Unlock() vsStrings := make([]string, 0, (len(hvs.roundVoteSets)+1)*2) // rounds 0 ~ hvs.round inclusive for round := 0; round <= hvs.round; round++ { diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index 2a063be75..b2246ce54 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -2,7 +2,6 @@ package consensus import ( "encoding/binary" - // "math/rand" "testing" "time" @@ -52,6 +51,67 @@ func TestTxConcurrentWithCommit(t *testing.T) { } } +func TestRmBadTx(t *testing.T) { + state, privVals := randGenesisState(1, false, 10) + app := NewCounterApplication() + cs := newConsensusState(state, privVals[0], app) + + // increment the counter by 1 + txBytes := make([]byte, 8) + binary.BigEndian.PutUint64(txBytes, uint64(0)) + app.AppendTx(txBytes) + app.Commit() + + ch := make(chan struct{}) + cbCh := make(chan struct{}) + go func() { + // Try to send the tx through the mempool. + // CheckTx should not err, but the app should return a bad tmsp code + // and the tx should get removed from the pool + err := cs.mempool.CheckTx(txBytes, func(r *tmsp.Response) { + if r.GetCheckTx().Code != tmsp.CodeType_BadNonce { + t.Fatalf("expected checktx to return bad nonce, got %v", r) + } + cbCh <- struct{}{} + }) + if err != nil { + t.Fatal("Error after CheckTx: %v", err) + } + + // check for the tx + for { + time.Sleep(time.Second) + select { + case <-ch: + default: + txs := cs.mempool.Reap(1) + if len(txs) == 0 { + ch <- struct{}{} + } + + } + } + }() + + // Wait until the tx returns + ticker := time.After(time.Second * 5) + select { + case <-cbCh: + // success + case <-ticker: + t.Fatalf("Timed out waiting for tx to return") + } + + // Wait until the tx is removed + ticker = time.After(time.Second * 5) + select { + case <-ch: + // success + case <-ticker: + t.Fatalf("Timed out waiting for tx to be removed") + } +} + // CounterApplication that maintains a mempool state and resets it upon commit type CounterApplication struct { txCount int @@ -84,11 +144,7 @@ func runTx(tx []byte, countPtr *int) tmsp.Result { copy(tx8[len(tx8)-len(tx):], tx) txValue := binary.BigEndian.Uint64(tx8) if txValue != uint64(count) { - return tmsp.Result{ - Code: tmsp.CodeType_BadNonce, - Data: nil, - Log: Fmt("Invalid nonce. Expected %v, got %v", count, txValue), - } + return tmsp.ErrBadNonce.AppendLog(Fmt("Invalid nonce. Expected %v, got %v", count, txValue)) } *countPtr += 1 return tmsp.OK diff --git a/consensus/reactor.go b/consensus/reactor.go index 49dc73c2c..dce2b9d12 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -9,7 +9,6 @@ import ( "time" . "github.com/tendermint/go-common" - "github.com/tendermint/go-events" "github.com/tendermint/go-p2p" "github.com/tendermint/go-wire" bc "github.com/tendermint/tendermint/blockchain" @@ -34,7 +33,7 @@ type ConsensusReactor struct { blockStore *bc.BlockStore conS *ConsensusState fastSync bool - evsw *events.EventSwitch + evsw types.EventSwitch } func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, fastSync bool) *ConsensusReactor { @@ -153,7 +152,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) // TODO punish peer? return } - log.Info("Receive", "src", src, "chId", chID, "msg", msg) + log.Debug("Receive", "src", src, "chId", chID, "msg", msg) // Get peer states ps := src.Data.Get(types.PeerStateKey).(*PeerState) @@ -225,7 +224,7 @@ func (conR *ConsensusReactor) SetPrivValidator(priv *types.PrivValidator) { } // implements events.Eventable -func (conR *ConsensusReactor) SetEventSwitch(evsw *events.EventSwitch) { +func (conR *ConsensusReactor) SetEventSwitch(evsw types.EventSwitch) { conR.evsw = evsw conR.conS.SetEventSwitch(evsw) } @@ -236,12 +235,12 @@ func (conR *ConsensusReactor) SetEventSwitch(evsw *events.EventSwitch) { // broadcasting the result to peers func (conR *ConsensusReactor) registerEventCallbacks() { - conR.evsw.AddListenerForEvent("conR", types.EventStringNewRoundStep(), func(data events.EventData) { + types.AddListenerForEvent(conR.evsw, "conR", types.EventStringNewRoundStep(), func(data types.TMEventData) { rs := data.(types.EventDataRoundState).RoundState.(*RoundState) conR.broadcastNewRoundStep(rs) }) - conR.evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data events.EventData) { + types.AddListenerForEvent(conR.evsw, "conR", types.EventStringVote(), func(data types.TMEventData) { edv := data.(types.EventDataVote) conR.broadcastHasVoteMessage(edv.Vote, edv.Index) }) @@ -450,21 +449,21 @@ OUTER_LOOP: // If there are lastCommits to send... if prs.Step == RoundStepNewHeight { if ps.PickSendVote(rs.LastCommit) { - log.Info("Picked rs.LastCommit to send") + log.Debug("Picked rs.LastCommit to send") continue OUTER_LOOP } } // If there are prevotes to send... if prs.Step <= RoundStepPrevote && prs.Round != -1 && prs.Round <= rs.Round { if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) { - log.Info("Picked rs.Prevotes(prs.Round) to send") + log.Debug("Picked rs.Prevotes(prs.Round) to send") continue OUTER_LOOP } } // If there are precommits to send... if prs.Step <= RoundStepPrecommit && prs.Round != -1 && prs.Round <= rs.Round { if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) { - log.Info("Picked rs.Precommits(prs.Round) to send") + log.Debug("Picked rs.Precommits(prs.Round) to send") continue OUTER_LOOP } } @@ -472,7 +471,7 @@ OUTER_LOOP: if prs.ProposalPOLRound != -1 { if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil { if ps.PickSendVote(polPrevotes) { - log.Info("Picked rs.Prevotes(prs.ProposalPOLRound) to send") + log.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send") continue OUTER_LOOP } } @@ -483,7 +482,7 @@ OUTER_LOOP: // If peer is lagging by height 1, send LastCommit. if prs.Height != 0 && rs.Height == prs.Height+1 { if ps.PickSendVote(rs.LastCommit) { - log.Info("Picked rs.LastCommit to send") + log.Debug("Picked rs.LastCommit to send") continue OUTER_LOOP } } @@ -494,9 +493,9 @@ OUTER_LOOP: // Load the block commit for prs.Height, // which contains precommit signatures for prs.Height. commit := conR.blockStore.LoadBlockCommit(prs.Height) - log.Info("Loaded BlockCommit for catch-up", "height", prs.Height, "commit", commit) + log.Debug("Loaded BlockCommit for catch-up", "height", prs.Height, "commit", commit) if ps.PickSendVote(commit) { - log.Info("Picked Catchup commit to send") + log.Debug("Picked Catchup commit to send") continue OUTER_LOOP } } @@ -760,23 +759,23 @@ func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) { switch type_ { case types.VoteTypePrevote: ps.Prevotes.SetIndex(index, true) - log.Info("SetHasVote(round-match)", "prevotes", ps.Prevotes, "index", index) + log.Debug("SetHasVote(round-match)", "prevotes", ps.Prevotes, "index", index) case types.VoteTypePrecommit: ps.Precommits.SetIndex(index, true) - log.Info("SetHasVote(round-match)", "precommits", ps.Precommits, "index", index) + log.Debug("SetHasVote(round-match)", "precommits", ps.Precommits, "index", index) } } else if ps.CatchupCommitRound == round { switch type_ { case types.VoteTypePrevote: case types.VoteTypePrecommit: ps.CatchupCommit.SetIndex(index, true) - log.Info("SetHasVote(CatchupCommit)", "precommits", ps.Precommits, "index", index) + log.Debug("SetHasVote(CatchupCommit)", "precommits", ps.Precommits, "index", index) } } else if ps.ProposalPOLRound == round { switch type_ { case types.VoteTypePrevote: ps.ProposalPOL.SetIndex(index, true) - log.Info("SetHasVote(ProposalPOL)", "prevotes", ps.Prevotes, "index", index) + log.Debug("SetHasVote(ProposalPOL)", "prevotes", ps.Prevotes, "index", index) case types.VoteTypePrecommit: } } @@ -786,7 +785,7 @@ func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) { case types.VoteTypePrevote: case types.VoteTypePrecommit: ps.LastCommit.SetIndex(index, true) - log.Info("setHasVote(LastCommit)", "lastCommit", ps.LastCommit, "index", index) + log.Debug("setHasVote(LastCommit)", "lastCommit", ps.LastCommit, "index", index) } } } else { diff --git a/consensus/replay.go b/consensus/replay.go index 9bb0fb055..afdbcf9f2 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -77,7 +77,7 @@ func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan inte // replay only those messages since the last block. // timeoutRoutine should run concurrently to read off tickChan -func (cs *ConsensusState) catchupReplay(height int) error { +func (cs *ConsensusState) catchupReplay(csHeight int) error { if !cs.wal.Exists() { return nil } @@ -88,6 +88,7 @@ func (cs *ConsensusState) catchupReplay(height int) error { // starting from end of file, // read messages until a new height is found + var walHeight int nLines, err := cs.wal.SeekFromEnd(func(lineBytes []byte) bool { var err error var msg ConsensusLogMessage @@ -96,8 +97,8 @@ func (cs *ConsensusState) catchupReplay(height int) error { panic(Fmt("Failed to read cs_msg_log json: %v", err)) } m, ok := msg.Msg.(types.EventDataRoundState) + walHeight = m.Height if ok && m.Step == RoundStepNewHeight.String() { - // TODO: ensure the height matches return true } return false @@ -107,29 +108,46 @@ func (cs *ConsensusState) catchupReplay(height int) error { return err } + // ensure the height matches + if walHeight != csHeight { + var err error + if walHeight > csHeight { + err = errors.New(Fmt("WAL height (%d) exceeds cs height (%d). Is your cs.state corrupted?", walHeight, csHeight)) + } else { + log.Notice("Replay: nothing to do", "cs.height", csHeight, "wal.height", walHeight) + } + return err + } + var beginning bool // if we had to go back to the beginning if c, _ := cs.wal.fp.Seek(0, 1); c == 0 { beginning = true } - log.Notice("Catchup by replaying consensus messages", "n", nLines) + log.Notice("Catchup by replaying consensus messages", "n", nLines, "height", walHeight) // now we can replay the latest nLines on consensus state // note we can't use scan because we've already been reading from the file - reader := bufio.NewReader(cs.wal.fp) + // XXX: if a msg is too big we need to find out why or increase this for that case ... + maxMsgSize := 1000000 + reader := bufio.NewReaderSize(cs.wal.fp, maxMsgSize) for i := 0; i < nLines; i++ { msgBytes, err := reader.ReadBytes('\n') if err == io.EOF { + log.Warn("Replay: EOF", "bytes", string(msgBytes)) break } else if err != nil { return err } else if len(msgBytes) == 0 { + log.Warn("Replay: msg bytes is 0") continue } else if len(msgBytes) == 1 && msgBytes[0] == '\n' { + log.Warn("Replay: new line") continue } // the first msg is the NewHeight event (if we're not at the beginning), so we can ignore it if !beginning && i == 1 { + log.Warn("Replay: not beginning and 1") continue } @@ -140,7 +158,7 @@ func (cs *ConsensusState) catchupReplay(height int) error { return err } } - log.Notice("Done catchup replay") + log.Notice("Replay: Done") return nil } diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 654cec9c2..1e7c1e810 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -4,72 +4,71 @@ import ( "fmt" "io/ioutil" "os" + "path" "strings" "testing" "time" . "github.com/tendermint/go-common" - "github.com/tendermint/go-events" "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/types" ) -/* - The easiest way to generate this data is to copy ~/.tendermint_test/somedir/* to ~/.tendermint - and to run a local node. - Be sure to set the db to "leveldb" to create a cswal file in ~/.tendermint/data/cswal. +var data_dir = path.Join(GoPath, "src/github.com/tendermint/tendermint/consensus", "test_data") - If you need to change the signatures, you can use a script as follows: - The privBytes comes from config/tendermint_test/... +// the priv validator changes step at these lines for a block with 1 val and 1 part +var baseStepChanges = []int{2, 5, 7} - ``` - package main +// test recovery from each line in each testCase +var testCases = []*testCase{ + newTestCase("empty_block", baseStepChanges), // empty block (has 1 block part) + newTestCase("small_block1", baseStepChanges), // small block with txs in 1 block part + newTestCase("small_block2", []int{2, 7, 9}), // small block with txs across 3 smaller block parts +} - import ( - "encoding/hex" - "fmt" +type testCase struct { + name string + log string //full cswal + stepMap map[int]int8 // map lines of log to privval step - "github.com/tendermint/go-crypto" - ) + proposeLine int + prevoteLine int + precommitLine int +} - func main() { - signBytes, err := hex.DecodeString("7B22636861696E5F6964223A2274656E6465726D696E745F74657374222C22766F7465223A7B22626C6F636B5F68617368223A2242453544373939433846353044354645383533364334333932464443384537423342313830373638222C22626C6F636B5F70617274735F686561646572223A506172745365747B543A31204236323237323535464632307D2C22686569676874223A312C22726F756E64223A302C2274797065223A327D7D") - if err != nil { - panic(err) - } - privBytes, err := hex.DecodeString("27F82582AEFAE7AB151CFB01C48BB6C1A0DA78F9BDDA979A9F70A84D074EB07D3B3069C422E19688B45CBFAE7BB009FC0FA1B1EA86593519318B7214853803C8") - if err != nil { - panic(err) - } - privKey := crypto.PrivKeyEd25519{} - copy(privKey[:], privBytes) - signature := privKey.Sign(signBytes) - signatureEd25519 := signature.(crypto.SignatureEd25519) - fmt.Printf("Signature Bytes: %X\n", signatureEd25519[:]) +func newTestCase(name string, stepChanges []int) *testCase { + if len(stepChanges) != 3 { + panic(Fmt("a full wal has 3 step changes! Got array %v", stepChanges)) } - ``` -*/ + return &testCase{ + name: name, + log: readWAL(path.Join(data_dir, name+".cswal")), + stepMap: newMapFromChanges(stepChanges), -var testLog = `{"time":"2016-04-03T11:23:54.387Z","msg":[3,{"duration":972835254,"height":1,"round":0,"step":1}]} -{"time":"2016-04-03T11:23:54.388Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]} -{"time":"2016-04-03T11:23:54.388Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":1,"hash":"3BA1E90CB868DA6B4FD7F3589826EC461E9EB4EF"},"pol_round":-1,"signature":"3A2ECD5023B21EC144EC16CFF1B992A4321317B83EEDD8969FDFEA6EB7BF4389F38DDA3E7BB109D63A07491C16277A197B241CF1F05F5E485C59882ECACD9E07"}}],"peer_key":""}]} -{"time":"2016-04-03T11:23:54.389Z","msg":[2,{"msg":[19,{"Height":1,"Round":0,"Part":{"index":0,"bytes":"0101010F74656E6465726D696E745F7465737401011441D59F4B718AC00000000000000114C4B01D3810579550997AC5641E759E20D99B51C10001000100","proof":{"aunts":[]}}}],"peer_key":""}]} -{"time":"2016-04-03T11:23:54.390Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPrevote"}]} -{"time":"2016-04-03T11:23:54.390Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":1,"round":0,"type":1,"block_hash":"4291966B8A9DFBA00AEC7C700F2718E61DF4331D","block_parts_header":{"total":1,"hash":"3BA1E90CB868DA6B4FD7F3589826EC461E9EB4EF"},"signature":"47D2A75A4E2F15DB1F0D1B656AC0637AF9AADDFEB6A156874F6553C73895E5D5DC948DBAEF15E61276C5342D0E638DFCB77C971CD282096EA8735A564A90F008"}}],"peer_key":""}]} -{"time":"2016-04-03T11:23:54.392Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPrecommit"}]} -{"time":"2016-04-03T11:23:54.392Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":1,"round":0,"type":2,"block_hash":"4291966B8A9DFBA00AEC7C700F2718E61DF4331D","block_parts_header":{"total":1,"hash":"3BA1E90CB868DA6B4FD7F3589826EC461E9EB4EF"},"signature":"39147DA595F08B73CF8C899967C8403B5872FD9042FFA4E239159E0B6C5D9665C9CA81D766EACA2AE658872F94C2FCD1E34BF51859CD5B274DA8512BACE4B50D"}}],"peer_key":""}]} -` + proposeLine: stepChanges[0], + prevoteLine: stepChanges[1], + precommitLine: stepChanges[2], + } +} -// map lines in the above wal to privVal step -var mapPrivValStep = map[int]int8{ - 0: 0, - 1: 0, - 2: 1, - 3: 1, - 4: 1, - 5: 2, - 6: 2, - 7: 3, +func newMapFromChanges(changes []int) map[int]int8 { + changes = append(changes, changes[2]+1) // so we add the last step change to the map + m := make(map[int]int8) + var count int + for changeNum, nextChange := range changes { + for ; count < nextChange; count++ { + m[count] = int8(changeNum) + } + } + return m +} + +func readWAL(p string) string { + b, err := ioutil.ReadFile(p) + if err != nil { + panic(err) + } + return string(b) } func writeWAL(log string) string { @@ -89,27 +88,29 @@ func writeWAL(log string) string { return name } -func waitForBlock(newBlockCh chan interface{}) { +func waitForBlock(newBlockCh chan interface{}, thisCase *testCase, i int) { after := time.After(time.Second * 10) select { case <-newBlockCh: case <-after: - panic("Timed out waiting for new block") + panic(Fmt("Timed out waiting for new block for case '%s' line %d", thisCase.name, i)) } } -func runReplayTest(t *testing.T, cs *ConsensusState, fileName string, newBlockCh chan interface{}) { +func runReplayTest(t *testing.T, cs *ConsensusState, fileName string, newBlockCh chan interface{}, + thisCase *testCase, i int) { + cs.config.Set("cswal", fileName) cs.Start() // Wait to make a new block. // This is just a signal that we haven't halted; its not something contained in the WAL itself. // Assuming the consensus state is running, replay of any WAL, including the empty one, // should eventually be followed by a new block, or else something is wrong - waitForBlock(newBlockCh) + waitForBlock(newBlockCh, thisCase, i) cs.Stop() } -func setupReplayTest(nLines int, crashAfter bool) (*ConsensusState, chan interface{}, string, string) { +func setupReplayTest(thisCase *testCase, nLines int, crashAfter bool) (*ConsensusState, chan interface{}, string, string) { fmt.Println("-------------------------------------") log.Notice(Fmt("Starting replay test of %d lines of WAL (crash before write)", nLines)) @@ -118,17 +119,17 @@ func setupReplayTest(nLines int, crashAfter bool) (*ConsensusState, chan interfa lineStep -= 1 } - split := strings.Split(testLog, "\n") + split := strings.Split(thisCase.log, "\n") lastMsg := split[nLines] // we write those lines up to (not including) one with the signature fileName := writeWAL(strings.Join(split[:nLines], "\n") + "\n") - cs := fixedConsensusState() + cs := fixedConsensusStateDummy() // set the last step according to when we crashed vs the wal cs.privValidator.LastHeight = 1 // first block - cs.privValidator.LastStep = mapPrivValStep[lineStep] + cs.privValidator.LastStep = thisCase.stepMap[lineStep] fmt.Println("LAST STEP", cs.privValidator.LastStep) @@ -142,10 +143,12 @@ func setupReplayTest(nLines int, crashAfter bool) (*ConsensusState, chan interfa // as if the log was written after signing, before the crash func TestReplayCrashAfterWrite(t *testing.T) { - split := strings.Split(testLog, "\n") - for i := 0; i < len(split)-1; i++ { - cs, newBlockCh, _, f := setupReplayTest(i+1, true) - runReplayTest(t, cs, f, newBlockCh) + for _, thisCase := range testCases { + split := strings.Split(thisCase.log, "\n") + for i := 0; i < len(split)-1; i++ { + cs, newBlockCh, _, f := setupReplayTest(thisCase, i+1, true) + runReplayTest(t, cs, f, newBlockCh, thisCase, i+1) + } } } @@ -154,50 +157,59 @@ func TestReplayCrashAfterWrite(t *testing.T) { // This relies on privValidator.LastSignature being set func TestReplayCrashBeforeWritePropose(t *testing.T) { - cs, newBlockCh, proposalMsg, f := setupReplayTest(2, false) // propose - // Set LastSig - var err error - var msg ConsensusLogMessage - wire.ReadJSON(&msg, []byte(proposalMsg), &err) - proposal := msg.Msg.(msgInfo).Msg.(*ProposalMessage) - if err != nil { - t.Fatalf("Error reading json data: %v", err) + for _, thisCase := range testCases { + lineNum := thisCase.proposeLine + cs, newBlockCh, proposalMsg, f := setupReplayTest(thisCase, lineNum, false) // propose + // Set LastSig + var err error + var msg ConsensusLogMessage + wire.ReadJSON(&msg, []byte(proposalMsg), &err) + proposal := msg.Msg.(msgInfo).Msg.(*ProposalMessage) + if err != nil { + t.Fatalf("Error reading json data: %v", err) + } + cs.privValidator.LastSignBytes = types.SignBytes(cs.state.ChainID, proposal.Proposal) + cs.privValidator.LastSignature = proposal.Proposal.Signature + runReplayTest(t, cs, f, newBlockCh, thisCase, lineNum) } - cs.privValidator.LastSignBytes = types.SignBytes(cs.state.ChainID, proposal.Proposal) - cs.privValidator.LastSignature = proposal.Proposal.Signature - runReplayTest(t, cs, f, newBlockCh) } func TestReplayCrashBeforeWritePrevote(t *testing.T) { - cs, newBlockCh, voteMsg, f := setupReplayTest(5, false) // prevote - cs.evsw.AddListenerForEvent("tester", types.EventStringCompleteProposal(), func(data events.EventData) { - // Set LastSig - var err error - var msg ConsensusLogMessage - wire.ReadJSON(&msg, []byte(voteMsg), &err) - vote := msg.Msg.(msgInfo).Msg.(*VoteMessage) - if err != nil { - t.Fatalf("Error reading json data: %v", err) - } - cs.privValidator.LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote) - cs.privValidator.LastSignature = vote.Vote.Signature - }) - runReplayTest(t, cs, f, newBlockCh) + for _, thisCase := range testCases { + lineNum := thisCase.prevoteLine + cs, newBlockCh, voteMsg, f := setupReplayTest(thisCase, lineNum, false) // prevote + types.AddListenerForEvent(cs.evsw, "tester", types.EventStringCompleteProposal(), func(data types.TMEventData) { + // Set LastSig + var err error + var msg ConsensusLogMessage + wire.ReadJSON(&msg, []byte(voteMsg), &err) + vote := msg.Msg.(msgInfo).Msg.(*VoteMessage) + if err != nil { + t.Fatalf("Error reading json data: %v", err) + } + cs.privValidator.LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote) + cs.privValidator.LastSignature = vote.Vote.Signature + }) + runReplayTest(t, cs, f, newBlockCh, thisCase, lineNum) + } } func TestReplayCrashBeforeWritePrecommit(t *testing.T) { - cs, newBlockCh, voteMsg, f := setupReplayTest(7, false) // precommit - cs.evsw.AddListenerForEvent("tester", types.EventStringPolka(), func(data events.EventData) { - // Set LastSig - var err error - var msg ConsensusLogMessage - wire.ReadJSON(&msg, []byte(voteMsg), &err) - vote := msg.Msg.(msgInfo).Msg.(*VoteMessage) - if err != nil { - t.Fatalf("Error reading json data: %v", err) - } - cs.privValidator.LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote) - cs.privValidator.LastSignature = vote.Vote.Signature - }) - runReplayTest(t, cs, f, newBlockCh) + for _, thisCase := range testCases { + lineNum := thisCase.precommitLine + cs, newBlockCh, voteMsg, f := setupReplayTest(thisCase, lineNum, false) // precommit + types.AddListenerForEvent(cs.evsw, "tester", types.EventStringPolka(), func(data types.TMEventData) { + // Set LastSig + var err error + var msg ConsensusLogMessage + wire.ReadJSON(&msg, []byte(voteMsg), &err) + vote := msg.Msg.(msgInfo).Msg.(*VoteMessage) + if err != nil { + t.Fatalf("Error reading json data: %v", err) + } + cs.privValidator.LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote) + cs.privValidator.LastSignature = vote.Vote.Signature + }) + runReplayTest(t, cs, f, newBlockCh, thisCase, lineNum) + } } diff --git a/consensus/state.go b/consensus/state.go index 51fd8864e..bff351aa9 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -10,7 +10,6 @@ import ( . "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" - "github.com/tendermint/go-events" "github.com/tendermint/go-wire" bc "github.com/tendermint/tendermint/blockchain" mempl "github.com/tendermint/tendermint/mempool" @@ -231,7 +230,7 @@ type ConsensusState struct { tockChan chan timeoutInfo // timeouts are relayed on tockChan to the receiveRoutine timeoutParams *TimeoutParams // parameters and functions for timeout intervals - evsw *events.EventSwitch + evsw types.EventSwitch wal *WAL replayMode bool // so we don't log signing errors during replay @@ -264,7 +263,7 @@ func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.Ap // Public interface // implements events.Eventable -func (cs *ConsensusState) SetEventSwitch(evsw *events.EventSwitch) { +func (cs *ConsensusState) SetEventSwitch(evsw types.EventSwitch) { cs.evsw = evsw } @@ -290,6 +289,12 @@ func (cs *ConsensusState) getRoundState() *RoundState { return &rs } +func (cs *ConsensusState) GetValidators() (int, []*types.Validator) { + cs.mtx.Lock() + defer cs.mtx.Unlock() + return cs.state.LastBlockHeight, cs.state.Validators.Copy().Validators +} + func (cs *ConsensusState) SetPrivValidator(priv *types.PrivValidator) { cs.mtx.Lock() defer cs.mtx.Unlock() @@ -545,7 +550,7 @@ func (cs *ConsensusState) newStep() { cs.nSteps += 1 // newStep is called by updateToStep in NewConsensusState before the evsw is set! if cs.evsw != nil { - cs.evsw.FireEvent(types.EventStringNewRoundStep(), rs) + types.FireEventNewRoundStep(cs.evsw, rs) } } @@ -719,13 +724,13 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) { // XXX: should we fire timeout here? cs.enterNewRound(ti.Height, 0) case RoundStepPropose: - cs.evsw.FireEvent(types.EventStringTimeoutPropose(), cs.RoundStateEvent()) + types.FireEventTimeoutPropose(cs.evsw, cs.RoundStateEvent()) cs.enterPrevote(ti.Height, ti.Round) case RoundStepPrevoteWait: - cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent()) + types.FireEventTimeoutWait(cs.evsw, cs.RoundStateEvent()) cs.enterPrecommit(ti.Height, ti.Round) case RoundStepPrecommitWait: - cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent()) + types.FireEventTimeoutWait(cs.evsw, cs.RoundStateEvent()) cs.enterNewRound(ti.Height, ti.Round+1) default: panic(Fmt("Invalid timeout step: %v", ti.Step)) @@ -777,7 +782,7 @@ func (cs *ConsensusState) enterNewRound(height int, round int) { } cs.Votes.SetRound(round + 1) // also track next round (round+1) to allow round-skipping - cs.evsw.FireEvent(types.EventStringNewRound(), cs.RoundStateEvent()) + types.FireEventNewRound(cs.evsw, cs.RoundStateEvent()) // Immediately go to enterPropose. cs.enterPropose(height, round) @@ -942,7 +947,7 @@ func (cs *ConsensusState) enterPrevote(height int, round int) { // fire event for how we got here if cs.isProposalComplete() { - cs.evsw.FireEvent(types.EventStringCompleteProposal(), cs.RoundStateEvent()) + types.FireEventCompleteProposal(cs.evsw, cs.RoundStateEvent()) } else { // we received +2/3 prevotes for a future round // TODO: catchup event? @@ -1047,7 +1052,7 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) { } // At this point +2/3 prevoted for a particular block or nil - cs.evsw.FireEvent(types.EventStringPolka(), cs.RoundStateEvent()) + types.FireEventPolka(cs.evsw, cs.RoundStateEvent()) // the latest POLRound should be this round if cs.Votes.POLRound() < round { @@ -1063,7 +1068,7 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) { cs.LockedRound = 0 cs.LockedBlock = nil cs.LockedBlockParts = nil - cs.evsw.FireEvent(types.EventStringUnlock(), cs.RoundStateEvent()) + types.FireEventUnlock(cs.evsw, cs.RoundStateEvent()) } cs.signAddVote(types.VoteTypePrecommit, nil, types.PartSetHeader{}) return @@ -1075,7 +1080,7 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) { if cs.LockedBlock.HashesTo(hash) { log.Notice("enterPrecommit: +2/3 prevoted locked block. Relocking") cs.LockedRound = round - cs.evsw.FireEvent(types.EventStringRelock(), cs.RoundStateEvent()) + types.FireEventRelock(cs.evsw, cs.RoundStateEvent()) cs.signAddVote(types.VoteTypePrecommit, hash, partsHeader) return } @@ -1090,7 +1095,7 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) { cs.LockedRound = round cs.LockedBlock = cs.ProposalBlock cs.LockedBlockParts = cs.ProposalBlockParts - cs.evsw.FireEvent(types.EventStringLock(), cs.RoundStateEvent()) + types.FireEventLock(cs.evsw, cs.RoundStateEvent()) cs.signAddVote(types.VoteTypePrecommit, hash, partsHeader) return } @@ -1106,7 +1111,7 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) { cs.ProposalBlock = nil cs.ProposalBlockParts = types.NewPartSetFromHeader(partsHeader) } - cs.evsw.FireEvent(types.EventStringUnlock(), cs.RoundStateEvent()) + types.FireEventUnlock(cs.evsw, cs.RoundStateEvent()) cs.signAddVote(types.VoteTypePrecommit, nil, types.PartSetHeader{}) return } @@ -1191,6 +1196,7 @@ func (cs *ConsensusState) tryFinalizeCommit(height int) { } if !cs.ProposalBlock.HashesTo(hash) { // TODO: this happens every time if we're not a validator (ugly logs) + // TODO: ^^ wait, why does it matter that we're a validator? log.Warn("Attempt to finalize failed. We don't have the commit block.") return } @@ -1226,14 +1232,14 @@ func (cs *ConsensusState) finalizeCommit(height int) { // Fire off event for new block. // TODO: Handle app failure. See #177 - cs.evsw.FireEvent(types.EventStringNewBlock(), types.EventDataNewBlock{block}) - cs.evsw.FireEvent(types.EventStringNewBlockHeader(), types.EventDataNewBlockHeader{block.Header}) + types.FireEventNewBlock(cs.evsw, types.EventDataNewBlock{block}) + types.FireEventNewBlockHeader(cs.evsw, types.EventDataNewBlockHeader{block.Header}) // Create a copy of the state for staging stateCopy := cs.state.Copy() // event cache for txs - eventCache := events.NewEventCache(cs.evsw) + eventCache := types.NewEventCache(cs.evsw) // Run the block on the State: // + update validator sets @@ -1423,7 +1429,7 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string added, address, err = cs.LastCommit.AddByIndex(valIndex, vote) if added { log.Info(Fmt("Added to lastPrecommits: %v", cs.LastCommit.StringShort())) - cs.evsw.FireEvent(types.EventStringVote(), types.EventDataVote{valIndex, address, vote}) + types.FireEventVote(cs.evsw, types.EventDataVote{valIndex, address, vote}) } return @@ -1434,7 +1440,7 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string height := cs.Height added, address, err = cs.Votes.AddByIndex(valIndex, vote, peerKey) if added { - cs.evsw.FireEvent(types.EventStringVote(), types.EventDataVote{valIndex, address, vote}) + types.FireEventVote(cs.evsw, types.EventDataVote{valIndex, address, vote}) switch vote.Type { case types.VoteTypePrevote: @@ -1452,7 +1458,7 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string cs.LockedRound = 0 cs.LockedBlock = nil cs.LockedBlockParts = nil - cs.evsw.FireEvent(types.EventStringUnlock(), cs.RoundStateEvent()) + types.FireEventUnlock(cs.evsw, cs.RoundStateEvent()) } } if cs.Round <= vote.Round && prevotes.HasTwoThirdsAny() { @@ -1499,7 +1505,7 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string } // Height mismatch, bad peer? - log.Info("Vote ignored and not added", "voteHeight", vote.Height, "csHeight", cs.Height) + log.Info("Vote ignored and not added", "voteHeight", vote.Height, "csHeight", cs.Height, "err", err) return } diff --git a/consensus/state_test.go b/consensus/state_test.go index d890c4e93..a09ab16fd 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -6,9 +6,8 @@ import ( "testing" "time" - "github.com/tendermint/tendermint/config/tendermint_test" - //"github.com/tendermint/go-events" . "github.com/tendermint/go-common" + "github.com/tendermint/tendermint/config/tendermint_test" "github.com/tendermint/tendermint/types" ) diff --git a/consensus/test_data/README.md b/consensus/test_data/README.md new file mode 100644 index 000000000..e3bfca70b --- /dev/null +++ b/consensus/test_data/README.md @@ -0,0 +1,36 @@ +# Generating test data + +The easiest way to generate this data is to copy `~/.tendermint_test/somedir/*` to `~/.tendermint` +and to run a local node. +Be sure to set the db to "leveldb" to create a cswal file in `~/.tendermint/data/cswal`. + +If you need to change the signatures, you can use a script as follows: +The privBytes comes from `config/tendermint_test/...`: + +``` +package main + +import ( + "encoding/hex" + "fmt" + + "github.com/tendermint/go-crypto" +) + +func main() { + signBytes, err := hex.DecodeString("7B22636861696E5F6964223A2274656E6465726D696E745F74657374222C22766F7465223A7B22626C6F636B5F68617368223A2242453544373939433846353044354645383533364334333932464443384537423342313830373638222C22626C6F636B5F70617274735F686561646572223A506172745365747B543A31204236323237323535464632307D2C22686569676874223A312C22726F756E64223A302C2274797065223A327D7D") + if err != nil { + panic(err) + } + privBytes, err := hex.DecodeString("27F82582AEFAE7AB151CFB01C48BB6C1A0DA78F9BDDA979A9F70A84D074EB07D3B3069C422E19688B45CBFAE7BB009FC0FA1B1EA86593519318B7214853803C8") + if err != nil { + panic(err) + } + privKey := crypto.PrivKeyEd25519{} + copy(privKey[:], privBytes) + signature := privKey.Sign(signBytes) + signatureEd25519 := signature.(crypto.SignatureEd25519) + fmt.Printf("Signature Bytes: %X\n", signatureEd25519[:]) +} +``` + diff --git a/consensus/test_data/empty_block.cswal b/consensus/test_data/empty_block.cswal new file mode 100644 index 000000000..65800c429 --- /dev/null +++ b/consensus/test_data/empty_block.cswal @@ -0,0 +1,8 @@ +{"time":"2016-04-03T11:23:54.387Z","msg":[3,{"duration":972835254,"height":1,"round":0,"step":1}]} +{"time":"2016-04-03T11:23:54.388Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]} +{"time":"2016-04-03T11:23:54.388Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":1,"hash":"3BA1E90CB868DA6B4FD7F3589826EC461E9EB4EF"},"pol_round":-1,"signature":"3A2ECD5023B21EC144EC16CFF1B992A4321317B83EEDD8969FDFEA6EB7BF4389F38DDA3E7BB109D63A07491C16277A197B241CF1F05F5E485C59882ECACD9E07"}}],"peer_key":""}]} +{"time":"2016-04-03T11:23:54.389Z","msg":[2,{"msg":[19,{"Height":1,"Round":0,"Part":{"index":0,"bytes":"0101010F74656E6465726D696E745F7465737401011441D59F4B718AC00000000000000114C4B01D3810579550997AC5641E759E20D99B51C10001000100","proof":{"aunts":[]}}}],"peer_key":""}]} +{"time":"2016-04-03T11:23:54.390Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPrevote"}]} +{"time":"2016-04-03T11:23:54.390Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":1,"round":0,"type":1,"block_hash":"4291966B8A9DFBA00AEC7C700F2718E61DF4331D","block_parts_header":{"total":1,"hash":"3BA1E90CB868DA6B4FD7F3589826EC461E9EB4EF"},"signature":"47D2A75A4E2F15DB1F0D1B656AC0637AF9AADDFEB6A156874F6553C73895E5D5DC948DBAEF15E61276C5342D0E638DFCB77C971CD282096EA8735A564A90F008"}}],"peer_key":""}]} +{"time":"2016-04-03T11:23:54.392Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPrecommit"}]} +{"time":"2016-04-03T11:23:54.392Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":1,"round":0,"type":2,"block_hash":"4291966B8A9DFBA00AEC7C700F2718E61DF4331D","block_parts_header":{"total":1,"hash":"3BA1E90CB868DA6B4FD7F3589826EC461E9EB4EF"},"signature":"39147DA595F08B73CF8C899967C8403B5872FD9042FFA4E239159E0B6C5D9665C9CA81D766EACA2AE658872F94C2FCD1E34BF51859CD5B274DA8512BACE4B50D"}}],"peer_key":""}]} diff --git a/consensus/test_data/small_block1.cswal b/consensus/test_data/small_block1.cswal new file mode 100644 index 000000000..738f7951a --- /dev/null +++ b/consensus/test_data/small_block1.cswal @@ -0,0 +1,8 @@ +{"time":"2016-10-11T15:29:08.113Z","msg":[3,{"duration":0,"height":1,"round":0,"step":1}]} +{"time":"2016-10-11T15:29:08.115Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]} +{"time":"2016-10-11T15:29:08.115Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":1,"hash":"A2C0B5D384DFF2692FF679D00CEAE93A55DCDD1A"},"pol_round":-1,"signature":"116961B715FB54C09983209F7F3BFD95C7DA2E0D7AB9CFE9F0750F2402E2AEB715CFD55FB2C5DB88F485391D426A48705E0474AB94328463290D086D88BAD704"}}],"peer_key":""}]} +{"time":"2016-10-11T15:29:08.116Z","msg":[2,{"msg":[19,{"Height":1,"Round":0,"Part":{"index":0,"bytes":"0101010F74656E6465726D696E745F746573740101147C83D983CBE6400185000000000114CA4CC7A87B85A9FB7DBFEF8A342B66DF2B03CFB30114C4B01D3810579550997AC5641E759E20D99B51C100010185010F616263643234353D64636261323435010F616263643234363D64636261323436010F616263643234373D64636261323437010F616263643234383D64636261323438010F616263643234393D64636261323439010F616263643235303D64636261323530010F616263643235313D64636261323531010F616263643235323D64636261323532010F616263643235333D64636261323533010F616263643235343D64636261323534010F616263643235353D64636261323535010F616263643235363D64636261323536010F616263643235373D64636261323537010F616263643235383D64636261323538010F616263643235393D64636261323539010F616263643236303D64636261323630010F616263643236313D64636261323631010F616263643236323D64636261323632010F616263643236333D64636261323633010F616263643236343D64636261323634010F616263643236353D64636261323635010F616263643236363D64636261323636010F616263643236373D64636261323637010F616263643236383D64636261323638010F616263643236393D64636261323639010F616263643237303D64636261323730010F616263643237313D64636261323731010F616263643237323D64636261323732010F616263643237333D64636261323733010F616263643237343D64636261323734010F616263643237353D64636261323735010F616263643237363D64636261323736010F616263643237373D64636261323737010F616263643237383D64636261323738010F616263643237393D64636261323739010F616263643238303D64636261323830010F616263643238313D64636261323831010F616263643238323D64636261323832010F616263643238333D64636261323833010F616263643238343D64636261323834010F616263643238353D64636261323835010F616263643238363D64636261323836010F616263643238373D64636261323837010F616263643238383D64636261323838010F616263643238393D64636261323839010F616263643239303D64636261323930010F616263643239313D64636261323931010F616263643239323D64636261323932010F616263643239333D64636261323933010F616263643239343D64636261323934010F616263643239353D64636261323935010F616263643239363D64636261323936010F616263643239373D64636261323937010F616263643239383D64636261323938010F616263643239393D64636261323939010F616263643330303D64636261333030010F616263643330313D64636261333031010F616263643330323D64636261333032010F616263643330333D64636261333033010F616263643330343D64636261333034010F616263643330353D64636261333035010F616263643330363D64636261333036010F616263643330373D64636261333037010F616263643330383D64636261333038010F616263643330393D64636261333039010F616263643331303D64636261333130010F616263643331313D64636261333131010F616263643331323D64636261333132010F616263643331333D64636261333133010F616263643331343D64636261333134010F616263643331353D64636261333135010F616263643331363D64636261333136010F616263643331373D64636261333137010F616263643331383D64636261333138010F616263643331393D64636261333139010F616263643332303D64636261333230010F616263643332313D64636261333231010F616263643332323D64636261333232010F616263643332333D64636261333233010F616263643332343D64636261333234010F616263643332353D64636261333235010F616263643332363D64636261333236010F616263643332373D64636261333237010F616263643332383D64636261333238010F616263643332393D64636261333239010F616263643333303D64636261333330010F616263643333313D64636261333331010F616263643333323D64636261333332010F616263643333333D64636261333333010F616263643333343D64636261333334010F616263643333353D64636261333335010F616263643333363D64636261333336010F616263643333373D64636261333337010F616263643333383D64636261333338010F616263643333393D64636261333339010F616263643334303D64636261333430010F616263643334313D64636261333431010F616263643334323D64636261333432010F616263643334333D64636261333433010F616263643334343D64636261333434010F616263643334353D64636261333435010F616263643334363D64636261333436010F616263643334373D64636261333437010F616263643334383D64636261333438010F616263643334393D64636261333439010F616263643335303D64636261333530010F616263643335313D64636261333531010F616263643335323D64636261333532010F616263643335333D64636261333533010F616263643335343D64636261333534010F616263643335353D64636261333535010F616263643335363D64636261333536010F616263643335373D64636261333537010F616263643335383D64636261333538010F616263643335393D64636261333539010F616263643336303D64636261333630010F616263643336313D64636261333631010F616263643336323D64636261333632010F616263643336333D64636261333633010F616263643336343D64636261333634010F616263643336353D64636261333635010F616263643336363D64636261333636010F616263643336373D64636261333637010F616263643336383D64636261333638010F616263643336393D64636261333639010F616263643337303D64636261333730010F616263643337313D64636261333731010F616263643337323D64636261333732010F616263643337333D64636261333733010F616263643337343D64636261333734010F616263643337353D64636261333735010F616263643337363D64636261333736010F616263643337373D646362613337370100","proof":{"aunts":[]}}}],"peer_key":""}]} +{"time":"2016-10-11T15:29:08.117Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPrevote"}]} +{"time":"2016-10-11T15:29:08.117Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":1,"round":0,"type":1,"block_hash":"FB2F51D0C6D25AD8D4ED9C33DF145E358D414A79","block_parts_header":{"total":1,"hash":"A2C0B5D384DFF2692FF679D00CEAE93A55DCDD1A"},"signature":"9BA7F5DEF2CE51CDF078DE42E3BB74D6DB6BC84767F212A88D34B3393E5915A4DC0E6C478E1C955E099617800722582E4D90AB1AC293EE5C19BC1FCC04C3CA05"}}],"peer_key":""}]} +{"time":"2016-10-11T15:29:08.118Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPrecommit"}]} +{"time":"2016-10-11T15:29:08.118Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":1,"round":0,"type":2,"block_hash":"FB2F51D0C6D25AD8D4ED9C33DF145E358D414A79","block_parts_header":{"total":1,"hash":"A2C0B5D384DFF2692FF679D00CEAE93A55DCDD1A"},"signature":"9DA197CC1D7D0463FF211FB55EA12B3B0647B319E0011308C7AC3FB36E66688B4AC92EA51BD7B055814F9E4E6AB97B1AD0891EDAC42B47877100770FF467BF0A"}}],"peer_key":""}]} diff --git a/consensus/test_data/small_block2.cswal b/consensus/test_data/small_block2.cswal new file mode 100644 index 000000000..fdb07b0b2 --- /dev/null +++ b/consensus/test_data/small_block2.cswal @@ -0,0 +1,10 @@ +{"time":"2016-10-11T16:21:23.438Z","msg":[3,{"duration":0,"height":1,"round":0,"step":1}]} +{"time":"2016-10-11T16:21:23.440Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]} +{"time":"2016-10-11T16:21:23.440Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":3,"hash":"88BC082C86DED0A5E2BBC3677B610D155FEDBCEA"},"pol_round":-1,"signature":"8F74F7032E50DFBC17E8B42DD15FD54858B45EEB1B8DAF6432AFBBB1333AC1E850290DE82DF613A10430EB723023527498D45C106FD2946FEF03A9C8B301020B"}}],"peer_key":""}]} +{"time":"2016-10-11T16:21:23.440Z","msg":[2,{"msg":[19,{"Height":1,"Round":0,"Part":{"index":0,"bytes":"0101010F74656E6465726D696E745F746573740101147C86B383BAB78001A60000000001148A3835062BB5E79BE490FAB65168D69BD716AD530114C4B01D3810579550997AC5641E759E20D99B51C1000101A6010F616263643139363D64636261313936010F616263643139373D64636261313937010F616263643139383D64636261313938010F616263643139393D64636261313939010F616263643230303D64636261323030010F616263643230313D64636261323031010F616263643230323D64636261323032010F616263643230333D64636261323033010F616263643230343D64636261323034010F616263643230353D64636261323035010F616263643230363D64636261323036010F616263643230373D64636261323037010F616263643230383D64636261323038010F616263643230393D64636261323039010F616263643231303D64636261323130010F616263643231313D64636261323131010F616263643231323D64636261323132010F616263643231333D64636261323133010F616263643231343D64636261323134010F616263643231353D64636261323135010F616263643231363D64636261323136010F616263643231373D64636261323137010F616263643231383D64636261323138010F616263643231393D64636261323139010F616263643232303D64636261323230010F616263643232313D64636261323231010F616263643232323D64636261323232010F616263643232333D64636261323233010F616263643232343D64636261323234010F616263643232353D64636261323235010F616263643232363D64636261323236010F616263643232373D64636261323237010F616263643232383D64636261323238010F616263643232393D64636261323239010F616263643233303D64636261323330010F616263643233313D64636261323331010F616263643233323D64636261323332010F616263643233333D64636261323333010F616263643233343D64636261323334010F616263643233353D64636261323335010F616263643233363D64636261323336010F616263643233373D64636261323337010F616263643233383D64636261323338010F616263643233393D64636261323339010F616263643234303D64636261323430010F616263643234313D64636261323431010F616263643234323D64636261323432010F616263643234333D64636261323433010F616263643234343D64636261323434010F616263643234353D64636261323435010F616263643234363D64636261323436010F616263643234373D64636261323437010F616263643234383D64636261323438010F616263643234393D64636261323439010F616263643235303D64636261323530010F61626364","proof":{"aunts":["22516491F7E1B5ADD8F12B309E9E8F6F04C034AB","C65A9589F377F2B6CF44B9BAFEBB535DF3C3A4FB"]}}}],"peer_key":""}]} +{"time":"2016-10-11T16:21:23.441Z","msg":[2,{"msg":[19,{"Height":1,"Round":0,"Part":{"index":1,"bytes":"3235313D64636261323531010F616263643235323D64636261323532010F616263643235333D64636261323533010F616263643235343D64636261323534010F616263643235353D64636261323535010F616263643235363D64636261323536010F616263643235373D64636261323537010F616263643235383D64636261323538010F616263643235393D64636261323539010F616263643236303D64636261323630010F616263643236313D64636261323631010F616263643236323D64636261323632010F616263643236333D64636261323633010F616263643236343D64636261323634010F616263643236353D64636261323635010F616263643236363D64636261323636010F616263643236373D64636261323637010F616263643236383D64636261323638010F616263643236393D64636261323639010F616263643237303D64636261323730010F616263643237313D64636261323731010F616263643237323D64636261323732010F616263643237333D64636261323733010F616263643237343D64636261323734010F616263643237353D64636261323735010F616263643237363D64636261323736010F616263643237373D64636261323737010F616263643237383D64636261323738010F616263643237393D64636261323739010F616263643238303D64636261323830010F616263643238313D64636261323831010F616263643238323D64636261323832010F616263643238333D64636261323833010F616263643238343D64636261323834010F616263643238353D64636261323835010F616263643238363D64636261323836010F616263643238373D64636261323837010F616263643238383D64636261323838010F616263643238393D64636261323839010F616263643239303D64636261323930010F616263643239313D64636261323931010F616263643239323D64636261323932010F616263643239333D64636261323933010F616263643239343D64636261323934010F616263643239353D64636261323935010F616263643239363D64636261323936010F616263643239373D64636261323937010F616263643239383D64636261323938010F616263643239393D64636261323939010F616263643330303D64636261333030010F616263643330313D64636261333031010F616263643330323D64636261333032010F616263643330333D64636261333033010F616263643330343D64636261333034010F616263643330353D64636261333035010F616263643330363D64636261333036010F616263643330373D64636261333037010F616263643330383D64636261333038010F616263643330393D64636261333039010F616263643331303D64636261333130010F616263643331313D","proof":{"aunts":["F730990451BAB63C3CF6AC8E6ED4F52259CA5F53","C65A9589F377F2B6CF44B9BAFEBB535DF3C3A4FB"]}}}],"peer_key":""}]} +{"time":"2016-10-11T16:21:23.441Z","msg":[2,{"msg":[19,{"Height":1,"Round":0,"Part":{"index":2,"bytes":"64636261333131010F616263643331323D64636261333132010F616263643331333D64636261333133010F616263643331343D64636261333134010F616263643331353D64636261333135010F616263643331363D64636261333136010F616263643331373D64636261333137010F616263643331383D64636261333138010F616263643331393D64636261333139010F616263643332303D64636261333230010F616263643332313D64636261333231010F616263643332323D64636261333232010F616263643332333D64636261333233010F616263643332343D64636261333234010F616263643332353D64636261333235010F616263643332363D64636261333236010F616263643332373D64636261333237010F616263643332383D64636261333238010F616263643332393D64636261333239010F616263643333303D64636261333330010F616263643333313D64636261333331010F616263643333323D64636261333332010F616263643333333D64636261333333010F616263643333343D64636261333334010F616263643333353D64636261333335010F616263643333363D64636261333336010F616263643333373D64636261333337010F616263643333383D64636261333338010F616263643333393D64636261333339010F616263643334303D64636261333430010F616263643334313D64636261333431010F616263643334323D64636261333432010F616263643334333D64636261333433010F616263643334343D64636261333434010F616263643334353D64636261333435010F616263643334363D64636261333436010F616263643334373D64636261333437010F616263643334383D64636261333438010F616263643334393D64636261333439010F616263643335303D64636261333530010F616263643335313D64636261333531010F616263643335323D64636261333532010F616263643335333D64636261333533010F616263643335343D64636261333534010F616263643335353D64636261333535010F616263643335363D64636261333536010F616263643335373D64636261333537010F616263643335383D64636261333538010F616263643335393D64636261333539010F616263643336303D64636261333630010F616263643336313D646362613336310100","proof":{"aunts":["56EF782EE04E0359D0B38271FD22B312A546FC3A"]}}}],"peer_key":""}]} +{"time":"2016-10-11T16:21:23.447Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPrevote"}]} +{"time":"2016-10-11T16:21:23.447Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":1,"round":0,"type":1,"block_hash":"AAE0ECF64D818A61F6E3D6D11E60F343C3FC8800","block_parts_header":{"total":3,"hash":"88BC082C86DED0A5E2BBC3677B610D155FEDBCEA"},"signature":"0870A9C3FF59DE0F5574B77F030BD160C1E2966AECE815E7C97CFA8BC4A6B01D7A10D91416B1AA02D49EFF7F08A239048CD9CD93E7AE4F80871FBFFF7DBFC50C"}}],"peer_key":""}]} +{"time":"2016-10-11T16:21:23.448Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPrecommit"}]} +{"time":"2016-10-11T16:21:23.448Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":1,"round":0,"type":2,"block_hash":"AAE0ECF64D818A61F6E3D6D11E60F343C3FC8800","block_parts_header":{"total":3,"hash":"88BC082C86DED0A5E2BBC3677B610D155FEDBCEA"},"signature":"0CEEA8A987D88D0A0870C0076DB8D1B57D3B051D017745B46C4710BBE6DF0F9AE8D5A95B49E4158A1A8C8C6475B8A8E91275303B9C10A5C0C18F40EBB0DA0905"}}],"peer_key":""}]} diff --git a/glide.lock b/glide.lock index 722c82a71..39ead0864 100644 --- a/glide.lock +++ b/glide.lock @@ -64,17 +64,17 @@ imports: - name: github.com/tendermint/go-db version: 31fdd21c7eaeed53e0ea7ca597fb1e960e2988a5 - name: github.com/tendermint/go-events - version: 48fa21511b259278b871a37b6951da2d5bef698d + version: 1652dc8b3f7780079aa98c3ce20a83ee90b9758b - name: github.com/tendermint/go-logger version: cefb3a45c0bf3c493a04e9bcd9b1540528be59f2 - name: github.com/tendermint/go-merkle version: 05042c6ab9cad51d12e4cecf717ae68e3b1409a8 - name: github.com/tendermint/go-p2p - version: f508f3f20b5bb36f03d3bc83647b7a92425139d1 + version: 1eb390680d33299ba0e3334490eca587efd18414 subpackages: - upnp - name: github.com/tendermint/go-rpc - version: 479510be0e80dd9e5d6b1f941adad168df0af85f + version: 855255d73eecd25097288be70f3fb208a5817d80 subpackages: - client - server @@ -86,7 +86,7 @@ imports: subpackages: - term - name: github.com/tendermint/tmsp - version: ead192adbbbf85ac581cf775b18ae70d59f86457 + version: 5d3eb0328a615ba55b580ce871033e605aa8b97d subpackages: - client - example/counter diff --git a/mempool/mempool.go b/mempool/mempool.go index acc52ae6e..c3c9a5f06 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -59,8 +59,10 @@ type Mempool struct { // Keep a cache of already-seen txs. // This reduces the pressure on the proxyApp. - cacheMap map[string]struct{} - cacheList *list.List // to remove oldest tx when cache gets too big + cache *txCache + + // A log of mempool txs + wal *AutoFile } func NewMempool(config cfg.Config, proxyAppConn proxy.AppConnMempool) *Mempool { @@ -74,13 +76,24 @@ func NewMempool(config cfg.Config, proxyAppConn proxy.AppConnMempool) *Mempool { recheckCursor: nil, recheckEnd: nil, - cacheMap: make(map[string]struct{}, cacheSize), - cacheList: list.New(), + cache: newTxCache(cacheSize), } + mempool.initWAL() proxyAppConn.SetResponseCallback(mempool.resCb) return mempool } +func (mem *Mempool) initWAL() { + walFileName := mem.config.GetString("mempool_wal") + if walFileName != "" { + af, err := OpenAutoFile(walFileName) + if err != nil { + PanicSanity(err) + } + mem.wal = af + } +} + // consensus must be able to hold lock to safely update func (mem *Mempool) Lock() { mem.proxyMtx.Lock() @@ -100,8 +113,7 @@ func (mem *Mempool) Flush() { mem.proxyMtx.Lock() defer mem.proxyMtx.Unlock() - mem.cacheMap = make(map[string]struct{}, cacheSize) - mem.cacheList.Init() + mem.cache.Reset() for e := mem.txs.Front(); e != nil; e = e.Next() { mem.txs.Remove(e) @@ -125,7 +137,7 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*tmsp.Response)) (err error) { defer mem.proxyMtx.Unlock() // CACHE - if _, exists := mem.cacheMap[string(tx)]; exists { + if mem.cache.Exists(tx) { if cb != nil { cb(&tmsp.Response{ Value: &tmsp.Response_CheckTx{ @@ -138,18 +150,17 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*tmsp.Response)) (err error) { } return nil } - if mem.cacheList.Len() >= cacheSize { - popped := mem.cacheList.Front() - poppedTx := popped.Value.(types.Tx) - // NOTE: the tx may have already been removed from the map - // but deleting a non-existant element is fine - delete(mem.cacheMap, string(poppedTx)) - mem.cacheList.Remove(popped) - } - mem.cacheMap[string(tx)] = struct{}{} - mem.cacheList.PushBack(tx) + mem.cache.Push(tx) // END CACHE + // WAL + if mem.wal != nil { + // TODO: Notify administrators when WAL fails + mem.wal.Write([]byte(tx)) + mem.wal.Write([]byte("\n")) + } + // END WAL + // NOTE: proxyAppConn may error if tx buffer is full if err = mem.proxyAppConn.Error(); err != nil { return err @@ -162,13 +173,6 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*tmsp.Response)) (err error) { return nil } -func (mem *Mempool) removeTxFromCacheMap(tx []byte) { - mem.proxyMtx.Lock() - // NOTE tx not removed from cacheList - delete(mem.cacheMap, string(tx)) - mem.proxyMtx.Unlock() -} - // TMSP callback function func (mem *Mempool) resCb(req *tmsp.Request, res *tmsp.Response) { if mem.recheckCursor == nil { @@ -194,9 +198,7 @@ func (mem *Mempool) resCbNormal(req *tmsp.Request, res *tmsp.Response) { log.Info("Bad Transaction", "res", r) // remove from cache (it might be good later) - // note this is an async callback, - // so we need to grab the lock in removeTxFromCacheMap - mem.removeTxFromCacheMap(req.GetCheckTx().Tx) + mem.cache.Remove(req.GetCheckTx().Tx) // TODO: handle other retcodes } @@ -221,7 +223,7 @@ func (mem *Mempool) resCbRecheck(req *tmsp.Request, res *tmsp.Response) { mem.recheckCursor.DetachPrev() // remove from cache (it might be good later) - mem.removeTxFromCacheMap(req.GetCheckTx().Tx) + mem.cache.Remove(req.GetCheckTx().Tx) } if mem.recheckCursor == mem.recheckEnd { mem.recheckCursor = nil @@ -348,3 +350,62 @@ type mempoolTx struct { func (memTx *mempoolTx) Height() int { return int(atomic.LoadInt64(&memTx.height)) } + +//-------------------------------------------------------------------------------- + +type txCache struct { + mtx sync.Mutex + size int + map_ map[string]struct{} + list *list.List // to remove oldest tx when cache gets too big +} + +func newTxCache(cacheSize int) *txCache { + return &txCache{ + size: cacheSize, + map_: make(map[string]struct{}, cacheSize), + list: list.New(), + } +} + +func (cache *txCache) Reset() { + cache.mtx.Lock() + cache.map_ = make(map[string]struct{}, cacheSize) + cache.list.Init() + cache.mtx.Unlock() +} + +func (cache *txCache) Exists(tx types.Tx) bool { + cache.mtx.Lock() + _, exists := cache.map_[string(tx)] + cache.mtx.Unlock() + return exists +} + +// Returns false if tx is in cache. +func (cache *txCache) Push(tx types.Tx) bool { + cache.mtx.Lock() + defer cache.mtx.Unlock() + + if _, exists := cache.map_[string(tx)]; exists { + return false + } + + if cache.list.Len() >= cache.size { + popped := cache.list.Front() + poppedTx := popped.Value.(types.Tx) + // NOTE: the tx may have already been removed from the map + // but deleting a non-existant element is fine + delete(cache.map_, string(poppedTx)) + cache.list.Remove(popped) + } + cache.map_[string(tx)] = struct{}{} + cache.list.PushBack(tx) + return true +} + +func (cache *txCache) Remove(tx types.Tx) { + cache.mtx.Lock() + delete(cache.map_, string(tx)) + cache.mtx.Unlock() +} diff --git a/mempool/mempool_test.go b/mempool/mempool_test.go index d5bd6b130..4755bf096 100644 --- a/mempool/mempool_test.go +++ b/mempool/mempool_test.go @@ -2,12 +2,11 @@ package mempool import ( "encoding/binary" - "sync" "testing" "github.com/tendermint/tendermint/config/tendermint_test" + "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" - tmspcli "github.com/tendermint/tmsp/client" "github.com/tendermint/tmsp/example/counter" ) @@ -16,9 +15,9 @@ func TestSerialReap(t *testing.T) { app := counter.NewCounterApplication(true) app.SetOption("serial", "on") - mtx := new(sync.Mutex) - appConnMem := tmspcli.NewLocalClient(mtx, app) - appConnCon := tmspcli.NewLocalClient(mtx, app) + cc := proxy.NewLocalClientCreator(app) + appConnMem, _ := cc.NewTMSPClient() + appConnCon, _ := cc.NewTMSPClient() mempool := NewMempool(config, appConnMem) appendTxsRange := func(start, end int) { @@ -66,13 +65,13 @@ func TestSerialReap(t *testing.T) { for i := start; i < end; i++ { txBytes := make([]byte, 8) binary.BigEndian.PutUint64(txBytes, uint64(i)) - res := appConnCon.AppendTx(txBytes) + res := appConnCon.AppendTxSync(txBytes) if !res.IsOK() { t.Errorf("Error committing tx. Code:%v result:%X log:%v", res.Code, res.Data, res.Log) } } - res := appConnCon.Commit() + res := appConnCon.CommitSync() if len(res.Data) != 8 { t.Errorf("Error committing. Hash:%X log:%v", res.Data, res.Log) } diff --git a/mempool/reactor.go b/mempool/reactor.go index 25fe454fb..626315de9 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -9,7 +9,6 @@ import ( "github.com/tendermint/go-clist" . "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" - "github.com/tendermint/go-events" "github.com/tendermint/go-p2p" "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/types" @@ -28,7 +27,7 @@ type MempoolReactor struct { p2p.BaseReactor config cfg.Config Mempool *Mempool - evsw *events.EventSwitch + evsw types.EventSwitch } func NewMempoolReactor(config cfg.Config, mempool *Mempool) *MempoolReactor { @@ -67,7 +66,7 @@ func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) { log.Warn("Error decoding message", "error", err) return } - log.Info("Receive", "src", src, "chId", chID, "msg", msg) + log.Debug("Receive", "src", src, "chId", chID, "msg", msg) switch msg := msg.(type) { case *TxMessage: @@ -110,7 +109,7 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer Peer) { var next *clist.CElement for { - if !memR.IsRunning() { + if !memR.IsRunning() || !peer.IsRunning() { return // Quit! } if next == nil { @@ -143,7 +142,7 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer Peer) { } // implements events.Eventable -func (memR *MempoolReactor) SetEventSwitch(evsw *events.EventSwitch) { +func (memR *MempoolReactor) SetEventSwitch(evsw types.EventSwitch) { memR.evsw = evsw } diff --git a/node/node.go b/node/node.go index e8099c7f4..bb191b55e 100644 --- a/node/node.go +++ b/node/node.go @@ -12,7 +12,6 @@ import ( cfg "github.com/tendermint/go-config" "github.com/tendermint/go-crypto" dbm "github.com/tendermint/go-db" - "github.com/tendermint/go-events" "github.com/tendermint/go-p2p" "github.com/tendermint/go-rpc" "github.com/tendermint/go-rpc/server" @@ -32,7 +31,7 @@ import _ "net/http/pprof" type Node struct { config cfg.Config sw *p2p.Switch - evsw *events.EventSwitch + evsw types.EventSwitch blockStore *bc.BlockStore bcReactor *bc.BlockchainReactor mempoolReactor *mempl.MempoolReactor @@ -80,7 +79,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato privKey := crypto.GenPrivKeyEd25519() // Make event switch - eventSwitch := events.NewEventSwitch() + eventSwitch := types.NewEventSwitch() _, err := eventSwitch.Start() if err != nil { Exit(Fmt("Failed to start switch: %v", err)) @@ -110,12 +109,6 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato consensusReactor.SetPrivValidator(privValidator) } - // deterministic accountability - err = consensusState.OpenWAL(config.GetString("cswal")) - if err != nil { - log.Error("Failed to open cswal", "error", err.Error()) - } - // Make p2p network switch sw := p2p.NewSwitch(config.GetConfig("p2p")) sw.AddReactor("MEMPOOL", mempoolReactor) @@ -187,7 +180,7 @@ func (n *Node) Stop() { } // Add the event switch to reactors, mempool, etc. -func SetEventSwitch(evsw *events.EventSwitch, eventables ...events.Eventable) { +func SetEventSwitch(evsw types.EventSwitch, eventables ...types.Eventable) { for _, e := range eventables { e.SetEventSwitch(evsw) } @@ -207,10 +200,9 @@ func (n *Node) StartRPC() ([]net.Listener, error) { rpccore.SetEventSwitch(n.evsw) rpccore.SetBlockStore(n.blockStore) rpccore.SetConsensusState(n.consensusState) - rpccore.SetConsensusReactor(n.consensusReactor) - rpccore.SetMempoolReactor(n.mempoolReactor) + rpccore.SetMempool(n.mempoolReactor.Mempool) rpccore.SetSwitch(n.sw) - rpccore.SetPrivValidator(n.privValidator) + rpccore.SetPubKey(n.privValidator.PubKey) rpccore.SetGenesisDoc(n.genesisDoc) rpccore.SetProxyAppQuery(n.proxyApp.Query()) @@ -252,7 +244,7 @@ func (n *Node) MempoolReactor() *mempl.MempoolReactor { return n.mempoolReactor } -func (n *Node) EventSwitch() *events.EventSwitch { +func (n *Node) EventSwitch() types.EventSwitch { return n.evsw } @@ -261,6 +253,14 @@ func (n *Node) PrivValidator() *types.PrivValidator { return n.privValidator } +func (n *Node) GenesisDoc() *types.GenesisDoc { + return n.genesisDoc +} + +func (n *Node) ProxyApp() proxy.AppConns { + return n.proxyApp +} + func makeNodeInfo(config cfg.Config, sw *p2p.Switch, privKey crypto.PrivKeyEd25519) *p2p.NodeInfo { nodeInfo := &p2p.NodeInfo{ @@ -401,7 +401,7 @@ func newConsensusState(config cfg.Config) *consensus.ConsensusState { config.Set("chain_id", state.ChainID) // Make event switch - eventSwitch := events.NewEventSwitch() + eventSwitch := types.NewEventSwitch() _, err := eventSwitch.Start() if err != nil { Exit(Fmt("Failed to start event switch: %v", err)) diff --git a/rpc/core/blocks.go b/rpc/core/blocks.go index fd5e2ff6d..92ed7bc7e 100644 --- a/rpc/core/blocks.go +++ b/rpc/core/blocks.go @@ -18,7 +18,7 @@ func BlockchainInfo(minHeight, maxHeight int) (*ctypes.ResultBlockchainInfo, err if minHeight == 0 { minHeight = MaxInt(1, maxHeight-20) } - log.Info("BlockchainInfoHandler", "maxHeight", maxHeight, "minHeight", minHeight) + log.Debug("BlockchainInfoHandler", "maxHeight", maxHeight, "minHeight", minHeight) blockMetas := []*types.BlockMeta{} for height := maxHeight; height >= minHeight; height-- { diff --git a/rpc/core/consensus.go b/rpc/core/consensus.go index 2a0b43614..e2ccce442 100644 --- a/rpc/core/consensus.go +++ b/rpc/core/consensus.go @@ -8,18 +8,7 @@ import ( ) func Validators() (*ctypes.ResultValidators, error) { - var blockHeight int - var validators []*types.Validator - - // XXX: this is racy. - // Either use state.LoadState(db) or make state atomic (see #165) - state := consensusState.GetState() - blockHeight = state.LastBlockHeight - state.Validators.Iterate(func(index int, val *types.Validator) bool { - validators = append(validators, val) - return false - }) - + blockHeight, validators := consensusState.GetValidators() return &ctypes.ResultValidators{blockHeight, validators}, nil } diff --git a/rpc/core/dev.go b/rpc/core/dev.go index 6ae2014b2..43a989534 100644 --- a/rpc/core/dev.go +++ b/rpc/core/dev.go @@ -10,7 +10,7 @@ import ( ) func UnsafeFlushMempool() (*ctypes.ResultUnsafeFlushMempool, error) { - mempoolReactor.Mempool.Flush() + mempool.Flush() return &ctypes.ResultUnsafeFlushMempool{}, nil } diff --git a/rpc/core/events.go b/rpc/core/events.go index ab6fd35ec..7dc3c7c31 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -1,7 +1,6 @@ package core import ( - "github.com/tendermint/go-events" "github.com/tendermint/go-rpc/types" ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" @@ -9,10 +8,10 @@ import ( func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscribe, error) { log.Notice("Subscribe to event", "remote", wsCtx.GetRemoteAddr(), "event", event) - wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg events.EventData) { + types.AddListenerForEvent(wsCtx.GetEventSwitch(), wsCtx.GetRemoteAddr(), event, func(msg types.TMEventData) { // NOTE: EventSwitch callbacks must be nonblocking // NOTE: RPCResponses of subscribed events have id suffix "#event" - tmResult := ctypes.TMResult(&ctypes.ResultEvent{event, types.TMEventData(msg)}) + tmResult := ctypes.TMResult(&ctypes.ResultEvent{event, msg}) wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &tmResult, "")) }) return &ctypes.ResultSubscribe{}, nil diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index bef85c796..ad599d228 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -4,7 +4,6 @@ import ( "fmt" "time" - "github.com/tendermint/go-events" ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" tmsp "github.com/tendermint/tmsp/types" @@ -15,7 +14,7 @@ import ( // Returns right away, with no response func BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { - err := mempoolReactor.BroadcastTx(tx, nil) + err := mempool.CheckTx(tx, nil) if err != nil { return nil, fmt.Errorf("Error broadcasting transaction: %v", err) } @@ -25,7 +24,7 @@ func BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { // Returns with the response from CheckTx func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { resCh := make(chan *tmsp.Response, 1) - err := mempoolReactor.BroadcastTx(tx, func(res *tmsp.Response) { + err := mempool.CheckTx(tx, func(res *tmsp.Response) { resCh <- res }) if err != nil { @@ -52,14 +51,14 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { // subscribe to tx being committed in block - appendTxResCh := make(chan *tmsp.Response, 1) - eventSwitch.AddListenerForEvent("rpc", types.EventStringTx(tx), func(data events.EventData) { - appendTxResCh <- data.(*tmsp.Response) + appendTxResCh := make(chan types.EventDataTx, 1) + types.AddListenerForEvent(eventSwitch, "rpc", types.EventStringTx(tx), func(data types.TMEventData) { + appendTxResCh <- data.(types.EventDataTx) }) // broadcast the tx and register checktx callback checkTxResCh := make(chan *tmsp.Response, 1) - err := mempoolReactor.BroadcastTx(tx, func(res *tmsp.Response) { + err := mempool.CheckTx(tx, func(res *tmsp.Response) { checkTxResCh <- res }) if err != nil { @@ -84,11 +83,10 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { // The tx was included in a block. // NOTE we don't return an error regardless of the AppendTx code; // clients must check this to see if they need to send a new tx! - r := appendTxRes.GetAppendTx() return &ctypes.ResultBroadcastTx{ - Code: r.Code, - Data: r.Data, - Log: r.Log, + Code: appendTxRes.Code, + Data: appendTxRes.Result, + Log: appendTxRes.Log, }, nil case <-timer.C: r := checkTxR @@ -103,10 +101,10 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { } func UnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) { - txs := mempoolReactor.Mempool.Reap(-1) + txs := mempool.Reap(-1) return &ctypes.ResultUnconfirmedTxs{len(txs), txs}, nil } func NumUnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) { - return &ctypes.ResultUnconfirmedTxs{N: mempoolReactor.Mempool.Size()}, nil + return &ctypes.ResultUnconfirmedTxs{N: mempool.Size()}, nil } diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index 90febf0b0..ac776c7f6 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -2,58 +2,89 @@ package core import ( cfg "github.com/tendermint/go-config" + "github.com/tendermint/go-crypto" "github.com/tendermint/go-p2p" - "github.com/tendermint/go-events" - bc "github.com/tendermint/tendermint/blockchain" "github.com/tendermint/tendermint/consensus" - mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" + tmsp "github.com/tendermint/tmsp/types" ) -var eventSwitch *events.EventSwitch -var blockStore *bc.BlockStore -var consensusState *consensus.ConsensusState -var consensusReactor *consensus.ConsensusReactor -var mempoolReactor *mempl.MempoolReactor -var p2pSwitch *p2p.Switch -var privValidator *types.PrivValidator -var genDoc *types.GenesisDoc // cache the genesis structure -var proxyAppQuery proxy.AppConnQuery +//----------------------------------------------------- +// Interfaces for use by RPC +// NOTE: these methods must be thread safe! -var config cfg.Config = nil +type BlockStore interface { + Height() int + LoadBlockMeta(height int) *types.BlockMeta + LoadBlock(height int) *types.Block +} + +type Consensus interface { + GetValidators() (int, []*types.Validator) + GetRoundState() *consensus.RoundState +} + +type Mempool interface { + Size() int + CheckTx(types.Tx, func(*tmsp.Response)) error + Reap(int) []types.Tx + Flush() +} + +type P2P interface { + Listeners() []p2p.Listener + Peers() p2p.IPeerSet + NumPeers() (outbound, inbound, dialig int) + NodeInfo() *p2p.NodeInfo + IsListening() bool + DialSeeds([]string) +} + +var ( + // external, thread safe interfaces + eventSwitch types.EventSwitch + proxyAppQuery proxy.AppConnQuery + config cfg.Config + + // interfaces defined above + blockStore BlockStore + consensusState Consensus + mempool Mempool + p2pSwitch P2P + + // objects + pubKey crypto.PubKey + genDoc *types.GenesisDoc // cache the genesis structure +) func SetConfig(c cfg.Config) { config = c } -func SetEventSwitch(evsw *events.EventSwitch) { +func SetEventSwitch(evsw types.EventSwitch) { eventSwitch = evsw } -func SetBlockStore(bs *bc.BlockStore) { +func SetBlockStore(bs BlockStore) { blockStore = bs } -func SetConsensusState(cs *consensus.ConsensusState) { +func SetConsensusState(cs Consensus) { consensusState = cs } -func SetConsensusReactor(cr *consensus.ConsensusReactor) { - consensusReactor = cr +func SetMempool(mem Mempool) { + mempool = mem } -func SetMempoolReactor(mr *mempl.MempoolReactor) { - mempoolReactor = mr -} - -func SetSwitch(sw *p2p.Switch) { +func SetSwitch(sw P2P) { p2pSwitch = sw } -func SetPrivValidator(pv *types.PrivValidator) { - privValidator = pv +func SetPubKey(pk crypto.PubKey) { + pubKey = pk } func SetGenesisDoc(doc *types.GenesisDoc) { diff --git a/rpc/core/status.go b/rpc/core/status.go index bf3d69ffe..8edadf136 100644 --- a/rpc/core/status.go +++ b/rpc/core/status.go @@ -22,7 +22,7 @@ func Status() (*ctypes.ResultStatus, error) { return &ctypes.ResultStatus{ NodeInfo: p2pSwitch.NodeInfo(), - PubKey: privValidator.PubKey, + PubKey: pubKey, LatestBlockHash: latestBlockHash, LatestAppHash: latestAppHash, LatestBlockHeight: latestHeight, diff --git a/rpc/test/client_test.go b/rpc/test/client_test.go index 345049d5a..ddc84ef01 100644 --- a/rpc/test/client_test.go +++ b/rpc/test/client_test.go @@ -257,6 +257,40 @@ func TestWSBlockchainGrowth(t *testing.T) { } } +func TestWSTxEvent(t *testing.T) { + wsc := newWSClient(t) + tx := randBytes() + + // listen for the tx I am about to submit + eid := types.EventStringTx(types.Tx(tx)) + subscribe(t, wsc, eid) + defer func() { + unsubscribe(t, wsc, eid) + wsc.Stop() + }() + + // send an tx + tmResult := new(ctypes.TMResult) + _, err := clientJSON.Call("broadcast_tx_sync", []interface{}{tx}, tmResult) + if err != nil { + t.Fatal("Error submitting event") + } + + waitForEvent(t, wsc, eid, true, func() {}, func(eid string, b interface{}) error { + evt, ok := b.(types.EventDataTx) + if !ok { + t.Fatal("Got wrong event type", b) + } + if bytes.Compare([]byte(evt.Tx), tx) != 0 { + t.Error("Event returned different tx") + } + if evt.Code != tmsp.CodeType_OK { + t.Error("Event returned tx error code", evt.Code) + } + return nil + }) +} + /* TODO: this with dummy app.. func TestWSDoubleFire(t *testing.T) { if testing.Short() { diff --git a/scripts/glide/checkout.sh b/scripts/glide/checkout.sh new file mode 100644 index 000000000..4dcc243ca --- /dev/null +++ b/scripts/glide/checkout.sh @@ -0,0 +1,27 @@ +#! /bin/bash +set -u + +function parseGlide() { + cat $1 | grep -A1 $2 | grep -v $2 | awk '{print $2}' +} + + +# fetch and checkout vendored dep + +glide=$1 +lib=$2 + +echo "----------------------------------" +echo "Getting $lib ..." +go get -t github.com/tendermint/$lib/... + +VENDORED=$(parseGlide $glide $lib) +cd $GOPATH/src/github.com/tendermint/$lib +MASTER=$(git rev-parse origin/master) + +if [[ "$VENDORED" != "$MASTER" ]]; then + echo "... VENDORED != MASTER ($VENDORED != $MASTER)" + echo "... Checking out commit $VENDORED" + git checkout $VENDORED &> /dev/null +fi + diff --git a/scripts/glide/parse.sh b/scripts/glide/parse.sh index a92f70bc1..86b5567b7 100644 --- a/scripts/glide/parse.sh +++ b/scripts/glide/parse.sh @@ -3,6 +3,8 @@ set -euo pipefail LIB=$1 -GLIDE=$GOPATH/src/github.com/tendermint/tendermint/glide.lock +if [[ "$GLIDE" == "" ]]; then + GLIDE=$GOPATH/src/github.com/tendermint/tendermint/glide.lock +fi cat $GLIDE | grep -A1 $LIB | grep -v $LIB | awk '{print $2}' diff --git a/scripts/glide/status.sh b/scripts/glide/status.sh index 374fe38f2..40503dbda 100644 --- a/scripts/glide/status.sh +++ b/scripts/glide/status.sh @@ -2,7 +2,9 @@ # for every github.com/tendermint dependency, warn is if its not synced with origin/master -GLIDE=$GOPATH/src/github.com/tendermint/tendermint/glide.lock +if [[ "$GLIDE" == "" ]]; then + GLIDE=$GOPATH/src/github.com/tendermint/tendermint/glide.lock +fi # make list of libs LIBS=($(grep "github.com/tendermint" $GLIDE | awk '{print $3}')) @@ -31,6 +33,11 @@ for lib in "${LIBS[@]}"; do echo "Vendored: $VENDORED" echo "Master: $MASTER" fi + elif [[ "$VENDORED" != "$HEAD" ]]; then + echo "" + echo "Vendored version of $lib matches origin/master but differs from HEAD" + echo "Vendored: $VENDORED" + echo "Head: $HEAD" fi done diff --git a/scripts/glide/update.sh b/scripts/glide/update.sh index 64dd8cda0..52d7c219a 100644 --- a/scripts/glide/update.sh +++ b/scripts/glide/update.sh @@ -6,9 +6,12 @@ IFS=$'\n\t' LIB=$1 -GLIDE=$GOPATH/src/github.com/tendermint/tendermint/glide.lock +TMCORE=$GOPATH/src/github.com/tendermint/tendermint +if [[ "$GLIDE" == "" ]]; then + GLIDE=$TMCORE/glide.lock +fi -OLD_COMMIT=`bash scripts/glide/parse.sh $LIB` +OLD_COMMIT=`bash $TMCORE/scripts/glide/parse.sh $LIB` PWD=`pwd` cd $GOPATH/src/github.com/tendermint/$LIB @@ -16,4 +19,12 @@ cd $GOPATH/src/github.com/tendermint/$LIB NEW_COMMIT=$(git rev-parse HEAD) cd $PWD -sed -i "s/$OLD_COMMIT/$NEW_COMMIT/g" $GLIDE + +uname -a | grep Linux > /dev/null +if [[ "$?" == 0 ]]; then + # linux + sed -i "s/$OLD_COMMIT/$NEW_COMMIT/g" $GLIDE +else + # mac + sed -i "" "s/$OLD_COMMIT/$NEW_COMMIT/g" $GLIDE +fi diff --git a/state/execution.go b/state/execution.go index 14ea965c1..088c693e2 100644 --- a/state/execution.go +++ b/state/execution.go @@ -5,7 +5,6 @@ import ( "fmt" . "github.com/tendermint/go-common" - "github.com/tendermint/go-events" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" tmsp "github.com/tendermint/tmsp/types" @@ -18,7 +17,7 @@ func (s *State) ValidateBlock(block *types.Block) error { // Execute the block to mutate State. // Validates block and then executes Data.Txs in the block. -func (s *State) ExecBlock(eventCache events.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block, blockPartsHeader types.PartSetHeader) error { +func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block, blockPartsHeader types.PartSetHeader) error { // Validate the block. err := s.validateBlock(block) @@ -55,7 +54,7 @@ func (s *State) ExecBlock(eventCache events.Fireable, proxyAppConn proxy.AppConn // Executes block's transactions on proxyAppConn. // TODO: Generate a bitmap or otherwise store tx validity in state. -func (s *State) execBlockOnProxyApp(eventCache events.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) error { +func (s *State) execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) error { var validTxs, invalidTxs = 0, 0 @@ -67,15 +66,25 @@ func (s *State) execBlockOnProxyApp(eventCache events.Fireable, proxyAppConn pro // TODO: make use of this info // Blocks may include invalid txs. // reqAppendTx := req.(tmsp.RequestAppendTx) - if r.AppendTx.Code == tmsp.CodeType_OK { + txError := "" + apTx := r.AppendTx + if apTx.Code == tmsp.CodeType_OK { validTxs += 1 } else { log.Debug("Invalid tx", "code", r.AppendTx.Code, "log", r.AppendTx.Log) invalidTxs += 1 + txError = apTx.Code.String() } // NOTE: if we count we can access the tx from the block instead of // pulling it from the req - eventCache.FireEvent(types.EventStringTx(req.GetAppendTx().Tx), res) + event := types.EventDataTx{ + Tx: req.GetAppendTx().Tx, + Result: apTx.Data, + Code: apTx.Code, + Log: apTx.Log, + Error: txError, + } + types.FireEventTx(eventCache, event) } } proxyAppConn.SetResponseCallback(proxyCb) @@ -97,7 +106,7 @@ func (s *State) execBlockOnProxyApp(eventCache events.Fireable, proxyAppConn pro return err } // TODO: Do something with changedValidators - log.Info("TODO: Do something with changedValidators", changedValidators) + log.Info("TODO: Do something with changedValidators", "changedValidators", changedValidators) log.Info(Fmt("ExecBlock got %v valid txs and %v invalid txs", validTxs, invalidTxs)) return nil diff --git a/test/docker/Dockerfile b/test/docker/Dockerfile index 83435adcf..7cc952545 100644 --- a/test/docker/Dockerfile +++ b/test/docker/Dockerfile @@ -6,6 +6,8 @@ RUN apt-get update && \ apt-get install -y --no-install-recommends \ jq bsdmainutils vim-common psmisc +# Setup tendermint repo with vendored dependencies +# but without code - docker caching prevents reinstall on code change! ENV REPO $GOPATH/src/github.com/tendermint/tendermint WORKDIR $REPO ADD glide.yaml glide.yaml @@ -13,6 +15,7 @@ ADD glide.lock glide.lock ADD Makefile Makefile RUN make get_vendor_deps +# Now copy in the code COPY . $REPO RUN go install ./cmd/tendermint diff --git a/test/net/test.sh b/test/net/test.sh index e3029a0dd..ed818dc28 100644 --- a/test/net/test.sh +++ b/test/net/test.sh @@ -20,25 +20,39 @@ set -u export TMHEAD=`git rev-parse --abbrev-ref HEAD` export TM_IMAGE="tendermint/tmbase" -# not a go repo +# grab glide for dependency mgmt +go get github.com/Masterminds/glide + +# grab network monitor, install mintnet, netmon +# these might err +echo "... fetching repos. ignore go get errors" set +e go get github.com/tendermint/network_testing -set -e - -# install mintnet, netmon -# TODO: specify branch - go get github.com/tendermint/mintnet go get github.com/tendermint/netmon +set -e +# install vendored deps +echo "GOPATH $GOPATH" + +cd $GOPATH/src/github.com/tendermint/mintnet +echo "... install mintnet dir $(pwd)" +glide install +go install +cd $GOPATH/src/github.com/tendermint/netmon +echo "... install netmon dir $(pwd)" +glide install +go install cd $GOPATH/src/github.com/tendermint/network_testing +echo "... running network test $(pwd)" bash experiments/exp_throughput.sh $DATACENTER $VALSETSIZE $BLOCKSIZE $TX_SIZE $NTXS $MACH_PREFIX $RESULTSDIR $CLOUD_PROVIDER # TODO: publish result! # cleanup +echo "... destroying machines" mintnet destroy --machines $MACH_PREFIX[1-$VALSETSIZE] diff --git a/test/test_libs.sh b/test/test_libs.sh index 6824e3141..9f601f788 100644 --- a/test/test_libs.sh +++ b/test/test_libs.sh @@ -6,27 +6,6 @@ if [[ "$GLIDE" == "" ]]; then fi # get vendored commit for given lib -function parseGlide() { - cat $1 | grep -A1 $2 | grep -v $2 | awk '{print $2}' -} - -# fetch and checkout vendored dep -function getDep() { - lib=$1 - echo "----------------------------------" - echo "Getting $lib ..." - go get -t github.com/tendermint/$lib/... - - VENDORED=$(parseGlide $GLIDE $lib) - cd $GOPATH/src/github.com/tendermint/$lib - MASTER=$(git rev-parse origin/master) - - if [[ "$VENDORED" != "$MASTER" ]]; then - echo "... VENDORED != MASTER ($VENDORED != $MASTER)" - echo "... Checking out commit $VENDORED" - git checkout $VENDORED &> /dev/null - fi -} #################### # libs we depend on @@ -36,7 +15,9 @@ LIBS_GO_TEST=(go-clist go-common go-config go-crypto go-db go-events go-merkle g LIBS_MAKE_TEST=(go-rpc go-wire tmsp) for lib in "${LIBS_GO_TEST[@]}"; do - getDep $lib + + # checkout vendored version of lib + bash scripts/glide/checkout.sh $GLIDE $lib echo "Testing $lib ..." go test --race github.com/tendermint/$lib/... @@ -46,7 +27,6 @@ for lib in "${LIBS_GO_TEST[@]}"; do fi done - for lib in "${LIBS_MAKE_TEST[@]}"; do getDep $lib diff --git a/types/events.go b/types/events.go index 68313ff25..4fe4d4075 100644 --- a/types/events.go +++ b/types/events.go @@ -5,6 +5,7 @@ import ( . "github.com/tendermint/go-common" "github.com/tendermint/go-events" "github.com/tendermint/go-wire" + tmsp "github.com/tendermint/tmsp/types" ) // Functions to generate eventId strings @@ -35,7 +36,7 @@ func EventStringVote() string { return "Vote" } // implements events.EventData type TMEventData interface { events.EventData - // AssertIsTMEventData() + AssertIsTMEventData() } const ( @@ -72,10 +73,11 @@ type EventDataNewBlockHeader struct { // All txs fire EventDataTx type EventDataTx struct { - Tx Tx `json:"tx"` - Result []byte `json:"result"` - Log string `json:"log"` - Error string `json:"error"` + Tx Tx `json:"tx"` + Result []byte `json:"result"` + Log string `json:"log"` + Code tmsp.CodeType `json:"code"` + Error string `json:"error"` } // NOTE: This goes into the replay WAL @@ -99,3 +101,99 @@ func (_ EventDataNewBlockHeader) AssertIsTMEventData() {} func (_ EventDataTx) AssertIsTMEventData() {} func (_ EventDataRoundState) AssertIsTMEventData() {} func (_ EventDataVote) AssertIsTMEventData() {} + +//---------------------------------------- +// Wrappers for type safety + +type Fireable interface { + events.Fireable +} + +type Eventable interface { + SetEventSwitch(EventSwitch) +} + +type EventSwitch interface { + events.EventSwitch +} + +type EventCache interface { + Fireable + Flush() +} + +func NewEventSwitch() EventSwitch { + return events.NewEventSwitch() +} + +func NewEventCache(evsw EventSwitch) EventCache { + return events.NewEventCache(evsw) +} + +// All events should be based on this FireEvent to ensure they are TMEventData +func fireEvent(fireable events.Fireable, event string, data TMEventData) { + fireable.FireEvent(event, data) +} + +func AddListenerForEvent(evsw EventSwitch, id, event string, cb func(data TMEventData)) { + evsw.AddListenerForEvent(id, event, func(data events.EventData) { + cb(data.(TMEventData)) + }) + +} + +//--- block, tx, and vote events + +func FireEventNewBlock(fireable events.Fireable, block EventDataNewBlock) { + fireEvent(fireable, EventStringNewBlock(), block) +} + +func FireEventNewBlockHeader(fireable events.Fireable, header EventDataNewBlockHeader) { + fireEvent(fireable, EventStringNewBlockHeader(), header) +} + +func FireEventVote(fireable events.Fireable, vote EventDataVote) { + fireEvent(fireable, EventStringVote(), vote) +} + +func FireEventTx(fireable events.Fireable, tx EventDataTx) { + fireEvent(fireable, EventStringTx(tx.Tx), tx) +} + +//--- EventDataRoundState events + +func FireEventNewRoundStep(fireable events.Fireable, rs EventDataRoundState) { + fireEvent(fireable, EventStringNewRoundStep(), rs) +} + +func FireEventTimeoutPropose(fireable events.Fireable, rs EventDataRoundState) { + fireEvent(fireable, EventStringTimeoutPropose(), rs) +} + +func FireEventTimeoutWait(fireable events.Fireable, rs EventDataRoundState) { + fireEvent(fireable, EventStringTimeoutWait(), rs) +} + +func FireEventNewRound(fireable events.Fireable, rs EventDataRoundState) { + fireEvent(fireable, EventStringNewRound(), rs) +} + +func FireEventCompleteProposal(fireable events.Fireable, rs EventDataRoundState) { + fireEvent(fireable, EventStringCompleteProposal(), rs) +} + +func FireEventPolka(fireable events.Fireable, rs EventDataRoundState) { + fireEvent(fireable, EventStringPolka(), rs) +} + +func FireEventUnlock(fireable events.Fireable, rs EventDataRoundState) { + fireEvent(fireable, EventStringUnlock(), rs) +} + +func FireEventRelock(fireable events.Fireable, rs EventDataRoundState) { + fireEvent(fireable, EventStringRelock(), rs) +} + +func FireEventLock(fireable events.Fireable, rs EventDataRoundState) { + fireEvent(fireable, EventStringLock(), rs) +} diff --git a/version/version.go b/version/version.go index 104353142..beb7e8615 100644 --- a/version/version.go +++ b/version/version.go @@ -2,6 +2,6 @@ package version const Maj = "0" const Min = "7" // tmsp useability (protobuf, unix); optimizations; broadcast_tx_commit -const Fix = "2" // query conn, peer filter, fast sync fix (+hot fix to tmsp connecting) +const Fix = "3" // fixes to event safety, mempool deadlock, hvs race, replay non-empty blocks const Version = Maj + "." + Min + "." + Fix