diff --git a/.gitignore b/.gitignore index 23ae616ee..22a6be0b9 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,6 @@ test/logs coverage.txt docs/_build docs/tools + +scripts/wal2json/wal2json +scripts/cutWALUntil/cutWALUntil diff --git a/consensus/replay.go b/consensus/replay.go index c5cb42191..d3c5cd5d1 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -4,6 +4,7 @@ import ( "bytes" "errors" "fmt" + "hash/crc32" "io" "reflect" "strconv" @@ -11,7 +12,6 @@ import ( "time" abci "github.com/tendermint/abci/types" - wire "github.com/tendermint/go-wire" auto "github.com/tendermint/tmlibs/autofile" cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" @@ -22,6 +22,8 @@ import ( "github.com/tendermint/tendermint/version" ) +var crc32c = crc32.MakeTable(crc32.Castagnoli) + // Functionality to replay blocks and messages on recovery from a crash. // There are two general failure scenarios: failure during consensus, and failure while applying the block. // The former is handled by the WAL, the latter by the proxyApp Handshake on restart, @@ -35,18 +37,11 @@ import ( // as if it were received in receiveRoutine // Lines that start with "#" are ignored. // NOTE: receiveRoutine should not be running -func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan interface{}) error { - // Skip over empty and meta lines - if len(msgBytes) == 0 || msgBytes[0] == '#' { +func (cs *ConsensusState) readReplayMessage(msg *TimedWALMessage, newStepCh chan interface{}) error { + // skip meta messages + if _, ok := msg.Msg.(EndHeightMessage); ok { return nil } - var err error - var msg TimedWALMessage - wire.ReadJSON(&msg, msgBytes, &err) - if err != nil { - fmt.Println("MsgBytes:", msgBytes, string(msgBytes)) - return fmt.Errorf("Error reading json data: %v", err) - } // for logging switch m := msg.Msg.(type) { @@ -104,7 +99,7 @@ func (cs *ConsensusState) catchupReplay(csHeight int) error { // Ensure that ENDHEIGHT for this height doesn't exist // NOTE: This is just a sanity check. As far as we know things work fine without it, // and Handshake could reuse ConsensusState if it weren't for this check (since we can crash after writing ENDHEIGHT). - gr, found, err := cs.wal.group.Search("#ENDHEIGHT: ", makeHeightSearchFunc(csHeight)) + gr, found, err := cs.wal.SearchForEndHeight(uint64(csHeight)) if gr != nil { gr.Close() } @@ -113,33 +108,33 @@ func (cs *ConsensusState) catchupReplay(csHeight int) error { } // Search for last height marker - gr, found, err = cs.wal.group.Search("#ENDHEIGHT: ", makeHeightSearchFunc(csHeight-1)) + gr, found, err = cs.wal.SearchForEndHeight(uint64(csHeight - 1)) if err == io.EOF { cs.Logger.Error("Replay: wal.group.Search returned EOF", "#ENDHEIGHT", csHeight-1) } else if err != nil { return err - } else { - defer gr.Close() } if !found { return errors.New(cmn.Fmt("Cannot replay height %d. WAL does not contain #ENDHEIGHT for %d.", csHeight, csHeight-1)) } + defer gr.Close() cs.Logger.Info("Catchup by replaying consensus messages", "height", csHeight) + var msg *TimedWALMessage + dec := WALDecoder{gr} + for { - line, err := gr.ReadLine() - if err != nil { - if err == io.EOF { - break - } else { - return err - } + msg, err = dec.Decode() + if err == io.EOF { + break + } else if err != nil { + return err } // NOTE: since the priv key is set when the msgs are received // it will attempt to eg double sign but we can just ignore it // since the votes will be replayed and we'll get to the next step - if err := cs.readReplayMessage([]byte(line), nil); err != nil { + if err := cs.readReplayMessage(msg, nil); err != nil { return err } } diff --git a/consensus/replay_file.go b/consensus/replay_file.go index 1182aaf04..24df20fb3 100644 --- a/consensus/replay_file.go +++ b/consensus/replay_file.go @@ -4,6 +4,7 @@ import ( "bufio" "errors" "fmt" + "io" "os" "strconv" "strings" @@ -53,12 +54,20 @@ func (cs *ConsensusState) ReplayFile(file string, console bool) error { defer pb.fp.Close() var nextN int // apply N msgs in a row - for pb.scanner.Scan() { + var msg *TimedWALMessage + for { if nextN == 0 && console { nextN = pb.replayConsoleLoop() } - if err := pb.cs.readReplayMessage(pb.scanner.Bytes(), newStepCh); err != nil { + msg, err = pb.dec.Decode() + if err == io.EOF { + return nil + } else if err != nil { + return err + } + + if err := pb.cs.readReplayMessage(msg, newStepCh); err != nil { return err } @@ -76,9 +85,9 @@ func (cs *ConsensusState) ReplayFile(file string, console bool) error { type playback struct { cs *ConsensusState - fp *os.File - scanner *bufio.Scanner - count int // how many lines/msgs into the file are we + fp *os.File + dec *WALDecoder + count int // how many lines/msgs into the file are we // replays can be reset to beginning fileName string // so we can close/reopen the file @@ -91,7 +100,7 @@ func newPlayback(fileName string, fp *os.File, cs *ConsensusState, genState *sm. fp: fp, fileName: fileName, genesisState: genState, - scanner: bufio.NewScanner(fp), + dec: NewWALDecoder(fp), } } @@ -111,13 +120,20 @@ func (pb *playback) replayReset(count int, newStepCh chan interface{}) error { return err } pb.fp = fp - pb.scanner = bufio.NewScanner(fp) + pb.dec = NewWALDecoder(fp) count = pb.count - count fmt.Printf("Reseting from %d to %d\n", pb.count, count) pb.count = 0 pb.cs = newCS - for i := 0; pb.scanner.Scan() && i < count; i++ { - if err := pb.cs.readReplayMessage(pb.scanner.Bytes(), newStepCh); err != nil { + var msg *TimedWALMessage + for i := 0; i < count; i++ { + msg, err = pb.dec.Decode() + if err == io.EOF { + return nil + } else if err != nil { + return err + } + if err := pb.cs.readReplayMessage(msg, newStepCh); err != nil { return err } pb.count += 1 diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 86c61035a..7d882dc1b 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -8,7 +8,6 @@ import ( "io/ioutil" "os" "path" - "strings" "testing" "time" @@ -58,14 +57,14 @@ var baseStepChanges = []int{3, 6, 8} // test recovery from each line in each testCase var testCases = []*testCase{ - newTestCase("empty_block", baseStepChanges), // empty block (has 1 block part) - newTestCase("small_block1", baseStepChanges), // small block with txs in 1 block part - newTestCase("small_block2", []int{3, 9, 11}), // small block with txs across 6 smaller block parts + newTestCase("empty_block", baseStepChanges), // empty block (has 1 block part) + newTestCase("small_block1", baseStepChanges), // small block with txs in 1 block part + newTestCase("small_block2", []int{3, 12, 14}), // small block with txs across 6 smaller block parts } type testCase struct { name string - log string //full cs wal + log []byte //full cs wal stepMap map[int]int8 // map lines of log to privval step proposeLine int @@ -100,29 +99,27 @@ func newMapFromChanges(changes []int) map[int]int8 { return m } -func readWAL(p string) string { +func readWAL(p string) []byte { b, err := ioutil.ReadFile(p) if err != nil { panic(err) } - return string(b) + return b } -func writeWAL(walMsgs string) string { - tempDir := os.TempDir() - walDir := path.Join(tempDir, "/wal"+cmn.RandStr(12)) - walFile := path.Join(walDir, "wal") - // Create WAL directory - err := cmn.EnsureDir(walDir, 0700) +func writeWAL(walMsgs []byte) string { + walFile, err := ioutil.TempFile("", "wal") if err != nil { - panic(err) + panic(fmt.Errorf("failed to create temp WAL file: %v", err)) } - // Write the needed WAL to file - err = cmn.WriteFile(walFile, []byte(walMsgs), 0600) + _, err = walFile.Write(walMsgs) if err != nil { - panic(err) + panic(fmt.Errorf("failed to write to temp WAL file: %v", err)) } - return walFile + if err := walFile.Close(); err != nil { + panic(fmt.Errorf("failed to close temp WAL file: %v", err)) + } + return walFile.Name() } func waitForBlock(newBlockCh chan interface{}, thisCase *testCase, i int) { @@ -167,7 +164,7 @@ func toPV(pv types.PrivValidator) *types.PrivValidatorFS { return pv.(*types.PrivValidatorFS) } -func setupReplayTest(t *testing.T, thisCase *testCase, nLines int, crashAfter bool) (*ConsensusState, chan interface{}, string, string) { +func setupReplayTest(t *testing.T, thisCase *testCase, nLines int, crashAfter bool) (*ConsensusState, chan interface{}, []byte, string) { t.Log("-------------------------------------") t.Logf("Starting replay test %v (of %d lines of WAL). Crash after = %v", thisCase.name, nLines, crashAfter) @@ -176,11 +173,13 @@ func setupReplayTest(t *testing.T, thisCase *testCase, nLines int, crashAfter bo lineStep -= 1 } - split := strings.Split(thisCase.log, "\n") + split := bytes.Split(thisCase.log, walSeparator) lastMsg := split[nLines] // we write those lines up to (not including) one with the signature - walFile := writeWAL(strings.Join(split[:nLines], "\n") + "\n") + b := bytes.Join(split[:nLines], walSeparator) + b = append(b, walSeparator...) + walFile := writeWAL(b) cs := fixedConsensusStateDummy() @@ -195,14 +194,19 @@ func setupReplayTest(t *testing.T, thisCase *testCase, nLines int, crashAfter bo return cs, newBlockCh, lastMsg, walFile } -func readTimedWALMessage(t *testing.T, walMsg string) TimedWALMessage { - var err error - var msg TimedWALMessage - wire.ReadJSON(&msg, []byte(walMsg), &err) +func readTimedWALMessage(t *testing.T, rawMsg []byte) TimedWALMessage { + b := bytes.NewBuffer(rawMsg) + // because rawMsg does not contain a separator and WALDecoder#Decode expects it + _, err := b.Write(walSeparator) + if err != nil { + t.Fatal(err) + } + dec := NewWALDecoder(b) + msg, err := dec.Decode() if err != nil { t.Fatalf("Error reading json data: %v", err) } - return msg + return *msg } //----------------------------------------------- @@ -211,10 +215,15 @@ func readTimedWALMessage(t *testing.T, walMsg string) TimedWALMessage { func TestWALCrashAfterWrite(t *testing.T) { for _, thisCase := range testCases { - split := strings.Split(thisCase.log, "\n") - for i := 0; i < len(split)-1; i++ { - cs, newBlockCh, _, walFile := setupReplayTest(t, thisCase, i+1, true) - runReplayTest(t, cs, walFile, newBlockCh, thisCase, i+1) + splitSize := bytes.Count(thisCase.log, walSeparator) + for i := 0; i < splitSize-1; i++ { + t.Run(fmt.Sprintf("%s:%d", thisCase.name, i), func(t *testing.T) { + cs, newBlockCh, _, walFile := setupReplayTest(t, thisCase, i+1, true) + cs.config.TimeoutPropose = 100 + runReplayTest(t, cs, walFile, newBlockCh, thisCase, i+1) + // cleanup + os.Remove(walFile) + }) } } } @@ -226,14 +235,19 @@ func TestWALCrashAfterWrite(t *testing.T) { func TestWALCrashBeforeWritePropose(t *testing.T) { for _, thisCase := range testCases { lineNum := thisCase.proposeLine - // setup replay test where last message is a proposal - cs, newBlockCh, proposalMsg, walFile := setupReplayTest(t, thisCase, lineNum, false) - msg := readTimedWALMessage(t, proposalMsg) - proposal := msg.Msg.(msgInfo).Msg.(*ProposalMessage) - // Set LastSig - toPV(cs.privValidator).LastSignBytes = types.SignBytes(cs.state.ChainID, proposal.Proposal) - toPV(cs.privValidator).LastSignature = proposal.Proposal.Signature - runReplayTest(t, cs, walFile, newBlockCh, thisCase, lineNum) + t.Run(fmt.Sprintf("%s:%d", thisCase.name, lineNum), func(t *testing.T) { + // setup replay test where last message is a proposal + cs, newBlockCh, proposalMsg, walFile := setupReplayTest(t, thisCase, lineNum, false) + cs.config.TimeoutPropose = 100 + msg := readTimedWALMessage(t, proposalMsg) + proposal := msg.Msg.(msgInfo).Msg.(*ProposalMessage) + // Set LastSig + toPV(cs.privValidator).LastSignBytes = types.SignBytes(cs.state.ChainID, proposal.Proposal) + toPV(cs.privValidator).LastSignature = proposal.Proposal.Signature + runReplayTest(t, cs, walFile, newBlockCh, thisCase, lineNum) + // cleanup + os.Remove(walFile) + }) } } @@ -315,7 +329,7 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) { if err != nil { t.Fatal(err) } - walFile := writeWAL(string(walBody)) + walFile := writeWAL(walBody) config.Consensus.SetWalFile(walFile) privVal := types.LoadPrivValidatorFS(config.PrivValidatorFile()) @@ -465,7 +479,7 @@ func buildTMStateFromChain(config *cfg.Config, state *sm.State, chain []*types.B func makeBlockchainFromWAL(wal *WAL) ([]*types.Block, []*types.Commit, error) { // Search for height marker - gr, found, err := wal.group.Search("#ENDHEIGHT: ", makeHeightSearchFunc(0)) + gr, found, err := wal.SearchForEndHeight(0) if err != nil { return nil, nil, err } @@ -479,20 +493,17 @@ func makeBlockchainFromWAL(wal *WAL) ([]*types.Block, []*types.Commit, error) { var blockParts *types.PartSet var blocks []*types.Block var commits []*types.Commit - for { - line, err := gr.ReadLine() - if err != nil { - if err == io.EOF { - break - } else { - return nil, nil, err - } - } - piece, err := readPieceFromWAL([]byte(line)) - if err != nil { + dec := NewWALDecoder(gr) + for { + msg, err := dec.Decode() + if err == io.EOF { + break + } else if err != nil { return nil, nil, err } + + piece := readPieceFromWAL(msg) if piece == nil { continue } @@ -528,17 +539,10 @@ func makeBlockchainFromWAL(wal *WAL) ([]*types.Block, []*types.Commit, error) { return blocks, commits, nil } -func readPieceFromWAL(msgBytes []byte) (interface{}, error) { - // Skip over empty and meta lines - if len(msgBytes) == 0 || msgBytes[0] == '#' { - return nil, nil - } - var err error - var msg TimedWALMessage - wire.ReadJSON(&msg, msgBytes, &err) - if err != nil { - fmt.Println("MsgBytes:", msgBytes, string(msgBytes)) - return nil, fmt.Errorf("Error reading json data: %v", err) +func readPieceFromWAL(msg *TimedWALMessage) interface{} { + // skip meta messages + if _, ok := msg.Msg.(EndHeightMessage); ok { + return nil } // for logging @@ -546,14 +550,15 @@ func readPieceFromWAL(msgBytes []byte) (interface{}, error) { case msgInfo: switch msg := m.Msg.(type) { case *ProposalMessage: - return &msg.Proposal.BlockPartsHeader, nil + return &msg.Proposal.BlockPartsHeader case *BlockPartMessage: - return msg.Part, nil + return msg.Part case *VoteMessage: - return msg.Vote, nil + return msg.Vote } } - return nil, nil + + return nil } // fresh state and mock store diff --git a/consensus/state.go b/consensus/state.go index 448dda158..e01c2ab25 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -1188,7 +1188,7 @@ func (cs *ConsensusState) finalizeCommit(height int) { // As is, ConsensusState should not be started again // until we successfully call ApplyBlock (ie. here or in Handshake after restart) if cs.wal != nil { - cs.wal.writeEndHeight(height) + cs.wal.Save(EndHeightMessage{uint64(height)}) } fail.Fail() // XXX diff --git a/consensus/test_data/build.sh b/consensus/test_data/build.sh index bbcb810f5..dcec6f2a0 100755 --- a/consensus/test_data/build.sh +++ b/consensus/test_data/build.sh @@ -1,112 +1,148 @@ #!/usr/bin/env bash -# XXX: removes tendermint dir +# Requires: killall command and jq JSON processor. -cd "$GOPATH/src/github.com/tendermint/tendermint" || exit 1 +# Get the parent directory of where this script is. +SOURCE="${BASH_SOURCE[0]}" +while [ -h "$SOURCE" ] ; do SOURCE="$(readlink "$SOURCE")"; done +DIR="$( cd -P "$( dirname "$SOURCE" )/../.." && pwd )" + +# Change into that dir because we expect that. +cd "$DIR" || exit 1 # Make sure we have a tendermint command. if ! hash tendermint 2>/dev/null; then - make install + make install +fi + +# Make sure we have a cutWALUntil binary. +cutWALUntil=./scripts/cutWALUntil/cutWALUntil +cutWALUntilDir=$(dirname $cutWALUntil) +if ! hash $cutWALUntil 2>/dev/null; then + cd "$cutWALUntilDir" && go build && cd - || exit 1 +fi + +TMHOME=$(mktemp -d) +export TMHOME="$TMHOME" + +if [[ ! -d "$TMHOME" ]]; then + echo "Could not create temp directory" + exit 1 +else + echo "TMHOME: ${TMHOME}" fi -# specify a dir to copy # TODO: eventually we should replace with `tendermint init --test` DIR_TO_COPY=$HOME/.tendermint_test/consensus_state_test +if [ ! -d "$DIR_TO_COPY" ]; then + echo "$DIR_TO_COPY does not exist. Please run: go test ./consensus" + exit 1 +fi +echo "==> Copying ${DIR_TO_COPY} to ${TMHOME} directory..." +cp -r "$DIR_TO_COPY"/* "$TMHOME" -TMHOME="$HOME/.tendermint" -#rm -rf "$TMHOME" -#cp -r "$DIR_TO_COPY" "$TMHOME" -#mv $TMHOME/config.toml $TMHOME/config.toml.bak -cp $TMHOME/genesis.json $TMHOME/genesis.json.bak +# preserve original genesis file because later it will be modified (see small_block2) +cp "$TMHOME/genesis.json" "$TMHOME/genesis.json.bak" function reset(){ -tendermint unsafe_reset_all -cp $TMHOME/genesis.json.bak $TMHOME/genesis.json + echo "==> Resetting tendermint..." + tendermint unsafe_reset_all + cp "$TMHOME/genesis.json.bak" "$TMHOME/genesis.json" } reset -# empty block function empty_block(){ -tendermint node --proxy_app=persistent_dummy &> /dev/null & -sleep 5 -killall tendermint + echo "==> Starting tendermint..." + tendermint node --proxy_app=persistent_dummy &> /dev/null & + sleep 5 + echo "==> Killing tendermint..." + killall tendermint -# /q would print up to and including the match, then quit. -# /Q doesn't include the match. -# http://unix.stackexchange.com/questions/11305/grep-show-all-the-file-up-to-the-match -# note on macbook we need `gnu-sed` for Q to work -sed '/ENDHEIGHT: 1/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/empty_block.cswal + echo "==> Copying WAL log..." + $cutWALUntil "$TMHOME/data/cs.wal/wal" 1 consensus/test_data/new_empty_block.cswal + mv consensus/test_data/new_empty_block.cswal consensus/test_data/empty_block.cswal -reset + reset } -# many blocks function many_blocks(){ -bash scripts/txs/random.sh 1000 36657 &> /dev/null & -PID=$! -tendermint node --proxy_app=persistent_dummy &> /dev/null & -sleep 10 -killall tendermint -kill -9 $PID + bash scripts/txs/random.sh 1000 36657 &> /dev/null & + PID=$! + echo "==> Starting tendermint..." + tendermint node --proxy_app=persistent_dummy &> /dev/null & + sleep 10 + echo "==> Killing tendermint..." + kill -9 $PID + killall tendermint -sed '/ENDHEIGHT: 6/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/many_blocks.cswal + echo "==> Copying WAL log..." + $cutWALUntil "$TMHOME/data/cs.wal/wal" 6 consensus/test_data/new_many_blocks.cswal + mv consensus/test_data/new_many_blocks.cswal consensus/test_data/many_blocks.cswal -reset + reset } -# small block 1 function small_block1(){ -bash scripts/txs/random.sh 1000 36657 &> /dev/null & -PID=$! -tendermint node --proxy_app=persistent_dummy &> /dev/null & -sleep 10 -killall tendermint -kill -9 $PID + bash scripts/txs/random.sh 1000 36657 &> /dev/null & + PID=$! + echo "==> Starting tendermint..." + tendermint node --proxy_app=persistent_dummy &> /dev/null & + sleep 10 + echo "==> Killing tendermint..." + kill -9 $PID + killall tendermint -sed '/ENDHEIGHT: 1/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/small_block1.cswal + echo "==> Copying WAL log..." + $cutWALUntil "$TMHOME/data/cs.wal/wal" 1 consensus/test_data/new_small_block1.cswal + mv consensus/test_data/new_small_block1.cswal consensus/test_data/small_block1.cswal -reset + reset } -# small block 2 (part size = 512) +# block part size = 512 function small_block2(){ -cat ~/.tendermint/genesis.json | jq '. + {"consensus_params": {"block_size_params": {"max_bytes":1000000}, "block_gossip_params": {"block_part_size_bytes":512}}}' > genesis.json.new -mv genesis.json.new ~/.tendermint/genesis.json -bash scripts/txs/random.sh 1000 36657 &> /dev/null & -PID=$! -tendermint node --proxy_app=persistent_dummy &> /dev/null & -sleep 10 -killall tendermint -kill -9 $PID + cat "$TMHOME/genesis.json" | jq '. + {consensus_params: {block_size_params: {max_bytes: 22020096}, block_gossip_params: {block_part_size_bytes: 512}}}' > "$TMHOME/new_genesis.json" + mv "$TMHOME/new_genesis.json" "$TMHOME/genesis.json" + bash scripts/txs/random.sh 1000 36657 &> /dev/null & + PID=$! + echo "==> Starting tendermint..." + tendermint node --proxy_app=persistent_dummy &> /dev/null & + sleep 5 + echo "==> Killing tendermint..." + kill -9 $PID + killall tendermint -sed '/ENDHEIGHT: 1/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/small_block2.cswal + echo "==> Copying WAL log..." + $cutWALUntil "$TMHOME/data/cs.wal/wal" 1 consensus/test_data/new_small_block2.cswal + mv consensus/test_data/new_small_block2.cswal consensus/test_data/small_block2.cswal -reset + reset } case "$1" in - "small_block1") - small_block1 - ;; - "small_block2") - small_block2 - ;; - "empty_block") - empty_block - ;; - "many_blocks") - many_blocks - ;; - *) - small_block1 - small_block2 - empty_block - many_blocks + "small_block1") + small_block1 + ;; + "small_block2") + small_block2 + ;; + "empty_block") + empty_block + ;; + "many_blocks") + many_blocks + ;; + *) + small_block1 + small_block2 + empty_block + many_blocks esac - +echo "==> Cleaning up..." +rm -rf "$TMHOME" diff --git a/consensus/test_data/empty_block.cswal b/consensus/test_data/empty_block.cswal index e8021312a..609f4ddf3 100644 Binary files a/consensus/test_data/empty_block.cswal and b/consensus/test_data/empty_block.cswal differ diff --git a/consensus/test_data/many_blocks.cswal b/consensus/test_data/many_blocks.cswal index cf6822809..ab486b5a1 100644 Binary files a/consensus/test_data/many_blocks.cswal and b/consensus/test_data/many_blocks.cswal differ diff --git a/consensus/test_data/small_block1.cswal b/consensus/test_data/small_block1.cswal index 6705b8a8a..b7c7e777f 100644 Binary files a/consensus/test_data/small_block1.cswal and b/consensus/test_data/small_block1.cswal differ diff --git a/consensus/test_data/small_block2.cswal b/consensus/test_data/small_block2.cswal index 5177f1b20..2ef077dcd 100644 Binary files a/consensus/test_data/small_block2.cswal and b/consensus/test_data/small_block2.cswal differ diff --git a/consensus/wal.go b/consensus/wal.go index f9a2a8015..80f4b809e 100644 --- a/consensus/wal.go +++ b/consensus/wal.go @@ -1,6 +1,11 @@ package consensus import ( + "bytes" + "encoding/binary" + "fmt" + "hash/crc32" + "io" "time" wire "github.com/tendermint/go-wire" @@ -12,11 +17,21 @@ import ( //-------------------------------------------------------- // types and functions for savings consensus messages +var ( + walSeparator = []byte{55, 127, 6, 130} // 0x377f0682 - magic number +) + type TimedWALMessage struct { - Time time.Time `json:"time"` + Time time.Time `json:"time"` // for debugging purposes Msg WALMessage `json:"msg"` } +// EndHeightMessage marks the end of the given height inside WAL. +// @internal used by scripts/cutWALUntil util. +type EndHeightMessage struct { + Height uint64 `json:"height"` +} + type WALMessage interface{} var _ = wire.RegisterInterface( @@ -24,6 +39,7 @@ var _ = wire.RegisterInterface( wire.ConcreteType{types.EventDataRoundState{}, 0x01}, wire.ConcreteType{msgInfo{}, 0x02}, wire.ConcreteType{timeoutInfo{}, 0x03}, + wire.ConcreteType{EndHeightMessage{}, 0x04}, ) //-------------------------------------------------------- @@ -38,6 +54,8 @@ type WAL struct { group *auto.Group light bool // ignore block parts + + enc *WALEncoder } func NewWAL(walFile string, light bool) (*WAL, error) { @@ -48,6 +66,7 @@ func NewWAL(walFile string, light bool) (*WAL, error) { wal := &WAL{ group: group, light: light, + enc: NewWALEncoder(group), } wal.BaseService = *cmn.NewBaseService(nil, "WAL", wal) return wal, nil @@ -58,7 +77,7 @@ func (wal *WAL) OnStart() error { if err != nil { return err } else if size == 0 { - wal.writeEndHeight(0) + wal.Save(EndHeightMessage{0}) } _, err = wal.group.Start() return err @@ -70,35 +89,191 @@ func (wal *WAL) OnStop() { } // called in newStep and for each pass in receiveRoutine -func (wal *WAL) Save(wmsg WALMessage) { +func (wal *WAL) Save(msg WALMessage) { if wal == nil { return } + if wal.light { // in light mode we only write new steps, timeouts, and our own votes (no proposals, block parts) - if mi, ok := wmsg.(msgInfo); ok { + if mi, ok := msg.(msgInfo); ok { if mi.PeerKey != "" { return } } } + // Write the wal message - var wmsgBytes = wire.JSONBytes(TimedWALMessage{time.Now(), wmsg}) - err := wal.group.WriteLine(string(wmsgBytes)) + if err := wal.enc.Encode(&TimedWALMessage{time.Now(), msg}); err != nil { + cmn.PanicQ(cmn.Fmt("Error writing msg to consensus wal: %v \n\nMessage: %v", err, msg)) + } + + // TODO: only flush when necessary + if err := wal.group.Flush(); err != nil { + cmn.PanicQ(cmn.Fmt("Error flushing consensus wal buf to file. Error: %v \n", err)) + } +} + +// SearchForEndHeight searches for the EndHeightMessage with the height and +// returns an auto.GroupReader, whenever it was found or not and an error. +// Group reader will be nil if found equals false. +// +// CONTRACT: caller must close group reader. +func (wal *WAL) SearchForEndHeight(height uint64) (gr *auto.GroupReader, found bool, err error) { + var msg *TimedWALMessage + + // NOTE: starting from the last file in the group because we're usually + // searching for the last height. See replay.go + min, max := wal.group.MinIndex(), wal.group.MaxIndex() + wal.Logger.Debug("Searching for height", "height", height, "min", min, "max", max) + for index := max; index >= min; index-- { + gr, err = wal.group.NewReader(index) + if err != nil { + return nil, false, err + } + + dec := NewWALDecoder(gr) + for { + msg, err = dec.Decode() + if err == io.EOF { + // check next file + break + } + if err != nil { + gr.Close() + return nil, false, err + } + + if m, ok := msg.Msg.(EndHeightMessage); ok { + if m.Height == height { // found + wal.Logger.Debug("Found", "height", height, "index", index) + return gr, true, nil + } + } + } + + gr.Close() + } + + return nil, false, nil +} + +/////////////////////////////////////////////////////////////////////////////// + +// A WALEncoder writes custom-encoded WAL messages to an output stream. +// +// Format: 4 bytes CRC sum + 4 bytes length + arbitrary-length value (go-wire encoded) +type WALEncoder struct { + wr io.Writer +} + +// NewWALEncoder returns a new encoder that writes to wr. +func NewWALEncoder(wr io.Writer) *WALEncoder { + return &WALEncoder{wr} +} + +// Encode writes the custom encoding of v to the stream. +func (enc *WALEncoder) Encode(v interface{}) error { + data := wire.BinaryBytes(v) + + crc := crc32.Checksum(data, crc32c) + length := uint32(len(data)) + totalLength := 8 + int(length) + + msg := make([]byte, totalLength) + binary.BigEndian.PutUint32(msg[0:4], crc) + binary.BigEndian.PutUint32(msg[4:8], length) + copy(msg[8:], data) + + _, err := enc.wr.Write(msg) + + if err == nil { + // TODO [Anton Kaliaev 23 Oct 2017]: remove separator + _, err = enc.wr.Write(walSeparator) + } + + return err +} + +/////////////////////////////////////////////////////////////////////////////// + +// A WALDecoder reads and decodes custom-encoded WAL messages from an input +// stream. See WALEncoder for the format used. +// +// It will also compare the checksums and make sure data size is equal to the +// length from the header. If that is not the case, error will be returned. +type WALDecoder struct { + rd io.Reader +} + +// NewWALDecoder returns a new decoder that reads from rd. +func NewWALDecoder(rd io.Reader) *WALDecoder { + return &WALDecoder{rd} +} + +// Decode reads the next custom-encoded value from its reader and returns it. +func (dec *WALDecoder) Decode() (*TimedWALMessage, error) { + b := make([]byte, 4) + + n, err := dec.rd.Read(b) + if err == io.EOF { + return nil, err + } if err != nil { - cmn.PanicQ(cmn.Fmt("Error writing msg to consensus wal. Error: %v \n\nMessage: %v", err, wmsg)) + return nil, fmt.Errorf("failed to read checksum: %v", err) } - // TODO: only flush when necessary - if err := wal.group.Flush(); err != nil { - cmn.PanicQ(cmn.Fmt("Error flushing consensus wal buf to file. Error: %v \n", err)) + crc := binary.BigEndian.Uint32(b) + + b = make([]byte, 4) + n, err = dec.rd.Read(b) + if err == io.EOF { + return nil, err } + if err != nil { + return nil, fmt.Errorf("failed to read length: %v", err) + } + length := binary.BigEndian.Uint32(b) + + data := make([]byte, length) + n, err = dec.rd.Read(data) + if err == io.EOF { + return nil, err + } + if err != nil { + return nil, fmt.Errorf("not enough bytes for data: %v (want: %d, read: %v)", err, length, n) + } + + // check checksum before decoding data + actualCRC := crc32.Checksum(data, crc32c) + if actualCRC != crc { + return nil, fmt.Errorf("checksums do not match: (read: %v, actual: %v)", crc, actualCRC) + } + + var nn int + var res *TimedWALMessage + res = wire.ReadBinary(&TimedWALMessage{}, bytes.NewBuffer(data), int(length), &nn, &err).(*TimedWALMessage) + if err != nil { + return nil, fmt.Errorf("failed to decode data: %v", err) + } + + // TODO [Anton Kaliaev 23 Oct 2017]: remove separator + if err = readSeparator(dec.rd); err != nil { + return nil, err + } + + return res, err } -func (wal *WAL) writeEndHeight(height int) { - wal.group.WriteLine(cmn.Fmt("#ENDHEIGHT: %v", height)) - - // TODO: only flush when necessary - if err := wal.group.Flush(); err != nil { - cmn.PanicQ(cmn.Fmt("Error flushing consensus wal buf to file. Error: %v \n", err)) +// readSeparator reads a separator from r. It returns any error from underlying +// reader or if it's not a separator. +func readSeparator(r io.Reader) error { + b := make([]byte, len(walSeparator)) + _, err := r.Read(b) + if err != nil { + return fmt.Errorf("failed to read separator: %v", err) } + if !bytes.Equal(b, walSeparator) { + return fmt.Errorf("not a separator: %v", b) + } + return nil } diff --git a/consensus/wal_test.go b/consensus/wal_test.go new file mode 100644 index 000000000..0235afab3 --- /dev/null +++ b/consensus/wal_test.go @@ -0,0 +1,62 @@ +package consensus + +import ( + "bytes" + "path" + "testing" + "time" + + "github.com/tendermint/tendermint/consensus/types" + tmtypes "github.com/tendermint/tendermint/types" + cmn "github.com/tendermint/tmlibs/common" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestWALEncoderDecoder(t *testing.T) { + now := time.Now() + msgs := []TimedWALMessage{ + TimedWALMessage{Time: now, Msg: EndHeightMessage{0}}, + TimedWALMessage{Time: now, Msg: timeoutInfo{Duration: time.Second, Height: 1, Round: 1, Step: types.RoundStepPropose}}, + } + + b := new(bytes.Buffer) + + for _, msg := range msgs { + b.Reset() + + enc := NewWALEncoder(b) + err := enc.Encode(&msg) + require.NoError(t, err) + + dec := NewWALDecoder(b) + decoded, err := dec.Decode() + require.NoError(t, err) + + assert.Equal(t, msg.Time.Truncate(time.Millisecond), decoded.Time) + assert.Equal(t, msg.Msg, decoded.Msg) + } +} + +func TestSearchForEndHeight(t *testing.T) { + wal, err := NewWAL(path.Join(data_dir, "many_blocks.cswal"), false) + if err != nil { + t.Fatal(err) + } + + h := 3 + gr, found, err := wal.SearchForEndHeight(uint64(h)) + assert.NoError(t, err, cmn.Fmt("expected not to err on height %d", h)) + assert.True(t, found, cmn.Fmt("expected to find end height for %d", h)) + assert.NotNil(t, gr, "expected group not to be nil") + defer gr.Close() + + dec := NewWALDecoder(gr) + msg, err := dec.Decode() + assert.NoError(t, err, "expected to decode a message") + rs, ok := msg.Msg.(tmtypes.EventDataRoundState) + assert.True(t, ok, "expected message of type EventDataRoundState") + assert.Equal(t, rs.Height, h+1, cmn.Fmt("wrong height")) + +} diff --git a/glide.lock b/glide.lock index a8e571de5..ab3d5e856 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 9867fa4543ca4daea1a96a3883a7f483819c067ca34ed6d3aa67aace4a289e93 -updated: 2017-10-25T07:15:06.075544403Z +hash: 58d209dee0c21d507226d6b56f7b7f49d24f60090ef9a6c1d89bc27ff00f90e4 +updated: 2017-10-26T00:04:10.142172009-04:00 imports: - name: github.com/btcsuite/btcd version: c7588cbf7690cd9f047a28efa2dcd8f2435a4e5e @@ -10,7 +10,7 @@ imports: - name: github.com/fsnotify/fsnotify version: 4da3e2cfbabc9f751898f250b49f2439785783a1 - name: github.com/go-kit/kit - version: e2b298466b32c7cd5579a9b9b07e968fc9d9452c + version: 4dc7be5d2d12881735283bcab7352178e190fc71 subpackages: - log - log/level @@ -26,7 +26,7 @@ imports: - name: github.com/go-stack/stack version: 817915b46b97fd7bb80e8ab6b69f01a53ac3eebf - name: github.com/gogo/protobuf - version: 117892bf1866fbaa2318c03e50e40564c8845457 + version: 342cbe0a04158f6dcb03ca0079991a51a4248c02 subpackages: - proto - name: github.com/golang/protobuf @@ -40,7 +40,7 @@ imports: - name: github.com/golang/snappy version: 553a641470496b2327abcac10b36396bd98e45c9 - name: github.com/gorilla/websocket - version: 71fa72d4842364bc5f74185f4161e0099ea3624a + version: ea4d1f681babbce9545c9c5f3d5194a789c89f5b - name: github.com/hashicorp/hcl version: 23c074d0eceb2b8a5bfdbb271ab780cde70f05a8 subpackages: @@ -63,7 +63,7 @@ imports: - name: github.com/mitchellh/mapstructure version: 06020f85339e21b2478f756a78e295255ffa4d6a - name: github.com/pelletier/go-toml - version: 8c31c2ec65b208cc2ad1608bf25a3ff91adf1944 + version: 4e9e0ee19b60b13eb79915933f44d8ed5f268bdd - name: github.com/pkg/errors version: 645ef00459ed84a119197bfb8d8205042c6df63d - name: github.com/rcrowley/go-metrics @@ -81,7 +81,7 @@ imports: - name: github.com/spf13/pflag version: 97afa5e7ca8a08a383cb259e06636b5e2cc7897f - name: github.com/spf13/viper - version: 8ef37cbca71638bf32f3d5e194117d4cb46da163 + version: 25b30aa063fc18e48662b86996252eabdcf2f0c7 - name: github.com/syndtr/goleveldb version: b89cc31ef7977104127d34c1bd31ebd1a9db2199 subpackages: @@ -98,7 +98,7 @@ imports: - leveldb/table - leveldb/util - name: github.com/tendermint/abci - version: bb9bb4aa465a31fd6a272765be381888e6898c74 + version: a0e38dc58374f485481ea07b23659d85f670a694 subpackages: - client - example/counter @@ -118,11 +118,11 @@ imports: - data - data/base58 - name: github.com/tendermint/iavl - version: 721710e7aa59f61dbfbf558943a207ba3fe6b926 + version: 595f3dcd5b6cd4a292e90757ae6d367fd7a6e653 subpackages: - iavl - name: github.com/tendermint/tmlibs - version: 0a652499ead7cd20a57a6a592f0491a2b493bb85 + version: b30e3ba26d4077edeed83c50a4e0c38b0ec9ddb3 subpackages: - autofile - cli @@ -136,7 +136,7 @@ imports: - merkle - test - name: golang.org/x/crypto - version: edd5e9b0879d13ee6970a50153d85b8fec9f7686 + version: 2509b142fb2b797aa7587dad548f113b2c0f20ce subpackages: - curve25519 - nacl/box @@ -147,7 +147,7 @@ imports: - ripemd160 - salsa20/salsa - name: golang.org/x/net - version: cd69bc3fc700721b709c3a59e16e24c67b58f6ff + version: 4b14673ba32bee7f5ac0f990a48f033919fd418b subpackages: - context - http2 @@ -157,11 +157,11 @@ imports: - lex/httplex - trace - name: golang.org/x/sys - version: 8dbc5d05d6edcc104950cc299a1ce6641235bc86 + version: 176de7413414c01569163271c745672ff04a7267 subpackages: - unix - name: golang.org/x/text - version: c01e4764d870b77f8abe5096ee19ad20d80e8075 + version: 6eab0e8f74e86c598ec3b6fad4888e0c11482d48 subpackages: - secure/bidirule - transform @@ -172,10 +172,9 @@ imports: subpackages: - googleapis/rpc/status - name: google.golang.org/grpc - version: a5986a5c88227370a9c0a82e5277167229c034cd + version: f7bf885db0b7479a537ec317c6e48ce53145f3db subpackages: - balancer - - balancer/roundrobin - codes - connectivity - credentials @@ -187,8 +186,6 @@ imports: - naming - peer - resolver - - resolver/dns - - resolver/passthrough - stats - status - tap diff --git a/glide.yaml b/glide.yaml index a74870ab2..82c2cd7b6 100644 --- a/glide.yaml +++ b/glide.yaml @@ -2,17 +2,21 @@ package: github.com/tendermint/tendermint import: - package: github.com/ebuchman/fail-test - package: github.com/gogo/protobuf + version: v0.5 subpackages: - proto - package: github.com/golang/protobuf subpackages: - proto - package: github.com/gorilla/websocket + version: v1.2.0 - package: github.com/pkg/errors version: ~0.8.0 - package: github.com/rcrowley/go-metrics - package: github.com/spf13/cobra + version: v0.0.1 - package: github.com/spf13/viper + version: v1.0.0 - package: github.com/tendermint/abci version: develop subpackages: @@ -50,6 +54,7 @@ import: subpackages: - context - package: google.golang.org/grpc + version: v1.7.0 testImport: - package: github.com/go-kit/kit subpackages: diff --git a/scripts/cutWALUntil/main.go b/scripts/cutWALUntil/main.go new file mode 100644 index 000000000..a7948a267 --- /dev/null +++ b/scripts/cutWALUntil/main.go @@ -0,0 +1,65 @@ +/* + cutWALUntil is a small utility for cutting a WAL until the given height + (inclusively). Note it does not include last cs.EndHeightMessage. + + Usage: + cutWALUntil height-to-stop +*/ +package main + +import ( + "fmt" + "io" + "os" + "strconv" + + cs "github.com/tendermint/tendermint/consensus" +) + +func main() { + if len(os.Args) < 4 { + fmt.Println("3 arguments required: ") + os.Exit(1) + } + + var heightToStop uint64 + var err error + if heightToStop, err = strconv.ParseUint(os.Args[2], 10, 64); err != nil { + panic(fmt.Errorf("failed to parse height: %v", err)) + } + + in, err := os.Open(os.Args[1]) + if err != nil { + panic(fmt.Errorf("failed to open input WAL file: %v", err)) + } + defer in.Close() + + out, err := os.Create(os.Args[3]) + if err != nil { + panic(fmt.Errorf("failed to open output WAL file: %v", err)) + } + defer out.Close() + + enc := cs.NewWALEncoder(out) + dec := cs.NewWALDecoder(in) + + for { + msg, err := dec.Decode() + if err == io.EOF { + break + } else if err != nil { + panic(fmt.Errorf("failed to decode msg: %v", err)) + } + + if m, ok := msg.Msg.(cs.EndHeightMessage); ok { + if m.Height == heightToStop { + break + } + } + + err = enc.Encode(msg) + if err != nil { + panic(fmt.Errorf("failed to encode msg: %v", err)) + } + } +} diff --git a/scripts/wal2json/main.go b/scripts/wal2json/main.go new file mode 100644 index 000000000..2cf40c57d --- /dev/null +++ b/scripts/wal2json/main.go @@ -0,0 +1,50 @@ +/* + wal2json converts binary WAL file to JSON. + + Usage: + wal2json +*/ +package main + +import ( + "encoding/json" + "fmt" + "io" + "os" + + cs "github.com/tendermint/tendermint/consensus" +) + +func main() { + if len(os.Args) < 2 { + fmt.Println("missing one argument: ") + os.Exit(1) + } + + f, err := os.Open(os.Args[1]) + if err != nil { + panic(fmt.Errorf("failed to open WAL file: %v", err)) + } + defer f.Close() + + dec := cs.NewWALDecoder(f) + for { + msg, err := dec.Decode() + if err == io.EOF { + break + } else if err != nil { + panic(fmt.Errorf("failed to decode msg: %v", err)) + } + + json, err := json.Marshal(msg) + if err != nil { + panic(fmt.Errorf("failed to marshal msg: %v", err)) + } + + os.Stdout.Write(json) + os.Stdout.Write([]byte("\n")) + if end, ok := msg.Msg.(cs.EndHeightMessage); ok { + os.Stdout.Write([]byte(fmt.Sprintf("ENDHEIGHT %d\n", end.Height))) + } + } +}