mirror of
https://github.com/tendermint/tendermint.git
synced 2026-05-31 11:26:20 +00:00
cs: panic only when WAL#WriteSync fails
- modify WAL#Write and WAL#WriteSync to return an error
This commit is contained in:
committed by
Jack Zampolin
parent
c4ba93a1e6
commit
470a23f9b4
@@ -228,15 +228,15 @@ func (e ReachedHeightToStopError) Error() string {
|
||||
|
||||
// Write simulate WAL's crashing by sending an error to the panicCh and then
|
||||
// exiting the cs.receiveRoutine.
|
||||
func (w *crashingWAL) Write(m WALMessage) {
|
||||
func (w *crashingWAL) Write(m WALMessage) error {
|
||||
if endMsg, ok := m.(EndHeightMessage); ok {
|
||||
if endMsg.Height == w.heightToStop {
|
||||
w.panicCh <- ReachedHeightToStopError{endMsg.Height}
|
||||
runtime.Goexit()
|
||||
} else {
|
||||
w.next.Write(m)
|
||||
return nil
|
||||
}
|
||||
return
|
||||
|
||||
return w.next.Write(m)
|
||||
}
|
||||
|
||||
if w.msgIndex > w.lastPanickedForMsgIndex {
|
||||
@@ -244,14 +244,15 @@ func (w *crashingWAL) Write(m WALMessage) {
|
||||
_, file, line, _ := runtime.Caller(1)
|
||||
w.panicCh <- WALWriteError{fmt.Sprintf("failed to write %T to WAL (fileline: %s:%d)", m, file, line)}
|
||||
runtime.Goexit()
|
||||
} else {
|
||||
w.msgIndex++
|
||||
w.next.Write(m)
|
||||
return nil
|
||||
}
|
||||
|
||||
w.msgIndex++
|
||||
return w.next.Write(m)
|
||||
}
|
||||
|
||||
func (w *crashingWAL) WriteSync(m WALMessage) {
|
||||
w.Write(m)
|
||||
func (w *crashingWAL) WriteSync(m WALMessage) error {
|
||||
return w.Write(m)
|
||||
}
|
||||
|
||||
func (w *crashingWAL) FlushAndSync() error { return w.next.FlushAndSync() }
|
||||
|
||||
@@ -632,7 +632,10 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) {
|
||||
// may generate internal events (votes, complete proposals, 2/3 majorities)
|
||||
cs.handleMsg(mi)
|
||||
case mi = <-cs.internalMsgQueue:
|
||||
cs.wal.WriteSync(mi) // NOTE: fsync
|
||||
err := cs.wal.WriteSync(mi) // NOTE: fsync
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Failed to write %v msg to consensus wal due to %v. Check your FS and restart the node", mi, err))
|
||||
}
|
||||
|
||||
if _, ok := mi.Msg.(*VoteMessage); ok {
|
||||
// we actually want to simulate failing during
|
||||
@@ -1329,7 +1332,10 @@ func (cs *ConsensusState) finalizeCommit(height int64) {
|
||||
// Either way, the ConsensusState should not be resumed until we
|
||||
// successfully call ApplyBlock (ie. later here, or in Handshake after
|
||||
// restart).
|
||||
cs.wal.WriteSync(EndHeightMessage{height}) // NOTE: fsync
|
||||
me := EndHeightMessage{height}
|
||||
if err := cs.wal.WriteSync(me); err != nil { // NOTE: fsync
|
||||
panic(fmt.Sprintf("Failed to write %v msg to consensus wal due to %v. Check your FS and restart the node", me, err))
|
||||
}
|
||||
|
||||
fail.Fail() // XXX
|
||||
|
||||
|
||||
@@ -29,8 +29,9 @@ const (
|
||||
//--------------------------------------------------------
|
||||
// types and functions for savings consensus messages
|
||||
|
||||
// TimedWALMessage wraps WALMessage and adds Time for debugging purposes.
|
||||
type TimedWALMessage struct {
|
||||
Time time.Time `json:"time"` // for debugging purposes
|
||||
Time time.Time `json:"time"`
|
||||
Msg WALMessage `json:"msg"`
|
||||
}
|
||||
|
||||
@@ -55,8 +56,8 @@ func RegisterWALMessages(cdc *amino.Codec) {
|
||||
|
||||
// WAL is an interface for any write-ahead logger.
|
||||
type WAL interface {
|
||||
Write(WALMessage)
|
||||
WriteSync(WALMessage)
|
||||
Write(WALMessage) error
|
||||
WriteSync(WALMessage) error
|
||||
FlushAndSync() error
|
||||
|
||||
SearchForEndHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error)
|
||||
@@ -174,29 +175,39 @@ func (wal *baseWAL) Wait() {
|
||||
// Write is called in newStep and for each receive on the
|
||||
// peerMsgQueue and the timeoutTicker.
|
||||
// NOTE: does not call fsync()
|
||||
func (wal *baseWAL) Write(msg WALMessage) {
|
||||
func (wal *baseWAL) Write(msg WALMessage) error {
|
||||
if wal == nil {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
// Write the wal message
|
||||
if err := wal.enc.Encode(&TimedWALMessage{tmtime.Now(), msg}); err != nil {
|
||||
panic(fmt.Sprintf("Error writing msg to consensus wal: %v \n\nMessage: %v", err, msg))
|
||||
wal.Logger.Error("Error writing msg to consensus wal. WARNING: recover may not be possible for the current height",
|
||||
"err", err, "msg", msg)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// WriteSync is called when we receive a msg from ourselves
|
||||
// so that we write to disk before sending signed messages.
|
||||
// NOTE: calls fsync()
|
||||
func (wal *baseWAL) WriteSync(msg WALMessage) {
|
||||
func (wal *baseWAL) WriteSync(msg WALMessage) error {
|
||||
if wal == nil {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
wal.Write(msg)
|
||||
if err := wal.FlushAndSync(); err != nil {
|
||||
panic(fmt.Sprintf("Error flushing consensus wal buf to file. Error: %v \n", err))
|
||||
if err := wal.Write(msg); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := wal.FlushAndSync(); err != nil {
|
||||
wal.Logger.Error("WriteSync failed to flush consensus wal. WARNING: may result in creating alternative proposals / votes for the current height iff the node restarted",
|
||||
"err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// WALSearchOptions are optional arguments to SearchForEndHeight.
|
||||
@@ -285,7 +296,7 @@ func (enc *WALEncoder) Encode(v *TimedWALMessage) error {
|
||||
crc := crc32.Checksum(data, crc32c)
|
||||
length := uint32(len(data))
|
||||
if length > maxMsgSizeBytes {
|
||||
return fmt.Errorf("Msg is too big: %d bytes, max: %d bytes", length, maxMsgSizeBytes)
|
||||
return fmt.Errorf("msg is too big: %d bytes, max: %d bytes", length, maxMsgSizeBytes)
|
||||
}
|
||||
totalLength := 8 + int(length)
|
||||
|
||||
@@ -295,7 +306,6 @@ func (enc *WALEncoder) Encode(v *TimedWALMessage) error {
|
||||
copy(msg[8:], data)
|
||||
|
||||
_, err := enc.wr.Write(msg)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -383,9 +393,9 @@ type nilWAL struct{}
|
||||
|
||||
var _ WAL = nilWAL{}
|
||||
|
||||
func (nilWAL) Write(m WALMessage) {}
|
||||
func (nilWAL) WriteSync(m WALMessage) {}
|
||||
func (nilWAL) FlushAndSync() error { return nil }
|
||||
func (nilWAL) Write(m WALMessage) error { return nil }
|
||||
func (nilWAL) WriteSync(m WALMessage) error { return nil }
|
||||
func (nilWAL) FlushAndSync() error { return nil }
|
||||
func (nilWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
@@ -168,10 +168,10 @@ func newByteBufferWAL(logger log.Logger, enc *WALEncoder, nBlocks int64, signalS
|
||||
// Save writes message to the internal buffer except when heightToStop is
|
||||
// reached, in which case it will signal the caller via signalWhenStopsTo and
|
||||
// skip writing.
|
||||
func (w *byteBufferWAL) Write(m WALMessage) {
|
||||
func (w *byteBufferWAL) Write(m WALMessage) error {
|
||||
if w.stopped {
|
||||
w.logger.Debug("WAL already stopped. Not writing message", "msg", m)
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
if endMsg, ok := m.(EndHeightMessage); ok {
|
||||
@@ -180,7 +180,7 @@ func (w *byteBufferWAL) Write(m WALMessage) {
|
||||
w.logger.Debug("Stopping WAL at height", "height", endMsg.Height)
|
||||
w.signalWhenStopsTo <- struct{}{}
|
||||
w.stopped = true
|
||||
return
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -189,10 +189,12 @@ func (w *byteBufferWAL) Write(m WALMessage) {
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("failed to encode the msg %v", m))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *byteBufferWAL) WriteSync(m WALMessage) {
|
||||
w.Write(m)
|
||||
func (w *byteBufferWAL) WriteSync(m WALMessage) error {
|
||||
return w.Write(m)
|
||||
}
|
||||
|
||||
func (w *byteBufferWAL) FlushAndSync() error { return nil }
|
||||
|
||||
@@ -11,14 +11,15 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/tendermint/tendermint/consensus/types"
|
||||
"github.com/tendermint/tendermint/crypto/merkle"
|
||||
"github.com/tendermint/tendermint/libs/autofile"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
tmtypes "github.com/tendermint/tendermint/types"
|
||||
tmtime "github.com/tendermint/tendermint/types/time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -103,7 +104,7 @@ func TestWALEncoderDecoder(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestWALWritePanicsIfMsgIsTooBig(t *testing.T) {
|
||||
func TestWALWrite(t *testing.T) {
|
||||
walDir, err := ioutil.TempDir("", "wal")
|
||||
require.NoError(t, err)
|
||||
defer os.RemoveAll(walDir)
|
||||
@@ -120,7 +121,24 @@ func TestWALWritePanicsIfMsgIsTooBig(t *testing.T) {
|
||||
wal.Wait()
|
||||
}()
|
||||
|
||||
assert.Panics(t, func() { wal.Write(make([]byte, maxMsgSizeBytes+1)) })
|
||||
// 1) Write returns an error if msg is too big
|
||||
msg := &BlockPartMessage{
|
||||
Height: 1,
|
||||
Round: 1,
|
||||
Part: &tmtypes.Part{
|
||||
Index: 1,
|
||||
Bytes: make([]byte, 1),
|
||||
Proof: merkle.SimpleProof{
|
||||
Total: 1,
|
||||
Index: 1,
|
||||
LeafHash: make([]byte, maxMsgSizeBytes-30),
|
||||
},
|
||||
},
|
||||
}
|
||||
err = wal.Write(msg)
|
||||
if assert.Error(t, err) {
|
||||
assert.Equal(t, "msg is too big: 1048593 bytes, max: 1048576 bytes", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func TestWALSearchForEndHeight(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user