Update master (#4087)

* cs: panic only when WAL#WriteSync fails

- modify WAL#Write and WAL#WriteSync to return an error

* fix test

* types: validate Part#Proof

add ValidateBasic to crypto/merkle/SimpleProof

* cs: limit max bit array size and block parts count

* cs: test new limits

* cs: only assert important stuff

* update changelog and bump version to 0.32.7

* fixes after Ethan's review

* align max wal msg and max consensus msg sizes

* fix tests

* fix test

* add changelog for 0.31.11
This commit is contained in:
Marko
2019-10-30 12:25:58 -07:00
committed by GitHub
parent 51a83f0826
commit 5206ce32a0
19 changed files with 339 additions and 126 deletions

View File

@@ -1468,11 +1468,17 @@ func (m *NewValidBlockMessage) ValidateBasic() error {
if err := m.BlockPartsHeader.ValidateBasic(); err != nil {
return fmt.Errorf("Wrong BlockPartsHeader: %v", err)
}
if m.BlockParts.Size() == 0 {
return errors.New("Empty BlockParts")
}
if m.BlockParts.Size() != m.BlockPartsHeader.Total {
return fmt.Errorf("BlockParts bit array size %d not equal to BlockPartsHeader.Total %d",
m.BlockParts.Size(),
m.BlockPartsHeader.Total)
}
if m.BlockParts.Size() > types.MaxBlockPartsCount {
return errors.Errorf("BlockParts bit array is too big: %d, max: %d", m.BlockParts.Size(), types.MaxBlockPartsCount)
}
return nil
}
@@ -1519,6 +1525,9 @@ func (m *ProposalPOLMessage) ValidateBasic() error {
if m.ProposalPOL.Size() == 0 {
return errors.New("Empty ProposalPOL bit array")
}
if m.ProposalPOL.Size() > types.MaxVotesCount {
return errors.Errorf("ProposalPOL bit array is too big: %d, max: %d", m.ProposalPOL.Size(), types.MaxVotesCount)
}
return nil
}
@@ -1662,6 +1671,9 @@ func (m *VoteSetBitsMessage) ValidateBasic() error {
return fmt.Errorf("Wrong BlockID: %v", err)
}
// NOTE: Votes.Size() can be zero if the node does not have any votes
if m.Votes.Size() > types.MaxVotesCount {
return fmt.Errorf("Votes bit array is too big: %d, max: %d", m.Votes.Size(), types.MaxVotesCount)
}
return nil
}

View File

@@ -19,6 +19,7 @@ import (
abci "github.com/tendermint/tendermint/abci/types"
cfg "github.com/tendermint/tendermint/config"
cstypes "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/crypto/tmhash"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
mempl "github.com/tendermint/tendermint/mempool"
@@ -698,67 +699,82 @@ func TestNewRoundStepMessageValidateBasic(t *testing.T) {
}
func TestNewValidBlockMessageValidateBasic(t *testing.T) {
testBitArray := cmn.NewBitArray(1)
testCases := []struct {
testName string
messageHeight int64
messageRound int
messageBlockParts *cmn.BitArray
expectErr bool
malleateFn func(*NewValidBlockMessage)
expErr string
}{
{"Valid Message", 0, 0, testBitArray, false},
{"Invalid Message", -1, 0, testBitArray, true},
{"Invalid Message", 0, -1, testBitArray, true},
{"Invalid Message", 0, 0, cmn.NewBitArray(0), true},
{func(msg *NewValidBlockMessage) {}, ""},
{func(msg *NewValidBlockMessage) { msg.Height = -1 }, "Negative Height"},
{func(msg *NewValidBlockMessage) { msg.Round = -1 }, "Negative Round"},
{
func(msg *NewValidBlockMessage) { msg.BlockPartsHeader.Total = 2 },
"BlockParts bit array size 1 not equal to BlockPartsHeader.Total 2",
},
{
func(msg *NewValidBlockMessage) { msg.BlockPartsHeader.Total = 0; msg.BlockParts = cmn.NewBitArray(0) },
"Empty BlockParts",
},
{
func(msg *NewValidBlockMessage) { msg.BlockParts = cmn.NewBitArray(types.MaxBlockPartsCount + 1) },
"BlockParts bit array size 1602 not equal to BlockPartsHeader.Total 1",
},
}
for _, tc := range testCases {
for i, tc := range testCases {
tc := tc
t.Run(tc.testName, func(t *testing.T) {
message := NewValidBlockMessage{
Height: tc.messageHeight,
Round: tc.messageRound,
BlockParts: tc.messageBlockParts,
t.Run(fmt.Sprintf("#%d", i), func(t *testing.T) {
msg := &NewValidBlockMessage{
Height: 1,
Round: 0,
BlockPartsHeader: types.PartSetHeader{
Total: 1,
},
BlockParts: cmn.NewBitArray(1),
}
message.BlockPartsHeader.Total = 1
assert.Equal(t, tc.expectErr, message.ValidateBasic() != nil, "Validate Basic had an unexpected result")
tc.malleateFn(msg)
err := msg.ValidateBasic()
if tc.expErr != "" && assert.Error(t, err) {
assert.Contains(t, err.Error(), tc.expErr)
}
})
}
}
func TestProposalPOLMessageValidateBasic(t *testing.T) {
testBitArray := cmn.NewBitArray(1)
testCases := []struct {
testName string
messageHeight int64
messageProposalPOLRound int
messageProposalPOL *cmn.BitArray
expectErr bool
malleateFn func(*ProposalPOLMessage)
expErr string
}{
{"Valid Message", 0, 0, testBitArray, false},
{"Invalid Message", -1, 0, testBitArray, true},
{"Invalid Message", 0, -1, testBitArray, true},
{"Invalid Message", 0, 0, cmn.NewBitArray(0), true},
{func(msg *ProposalPOLMessage) {}, ""},
{func(msg *ProposalPOLMessage) { msg.Height = -1 }, "Negative Height"},
{func(msg *ProposalPOLMessage) { msg.ProposalPOLRound = -1 }, "Negative ProposalPOLRound"},
{func(msg *ProposalPOLMessage) { msg.ProposalPOL = cmn.NewBitArray(0) }, "Empty ProposalPOL bit array"},
{func(msg *ProposalPOLMessage) { msg.ProposalPOL = cmn.NewBitArray(types.MaxVotesCount + 1) },
"ProposalPOL bit array is too big: 10001, max: 10000"},
}
for _, tc := range testCases {
for i, tc := range testCases {
tc := tc
t.Run(tc.testName, func(t *testing.T) {
message := ProposalPOLMessage{
Height: tc.messageHeight,
ProposalPOLRound: tc.messageProposalPOLRound,
ProposalPOL: tc.messageProposalPOL,
t.Run(fmt.Sprintf("#%d", i), func(t *testing.T) {
msg := &ProposalPOLMessage{
Height: 1,
ProposalPOLRound: 1,
ProposalPOL: cmn.NewBitArray(1),
}
assert.Equal(t, tc.expectErr, message.ValidateBasic() != nil, "Validate Basic had an unexpected result")
tc.malleateFn(msg)
err := msg.ValidateBasic()
if tc.expErr != "" && assert.Error(t, err) {
assert.Contains(t, err.Error(), tc.expErr)
}
})
}
}
func TestBlockPartMessageValidateBasic(t *testing.T) {
testPart := new(types.Part)
testPart.Proof.LeafHash = tmhash.Sum([]byte("leaf"))
testCases := []struct {
testName string
messageHeight int64
@@ -872,49 +888,43 @@ func TestVoteSetMaj23MessageValidateBasic(t *testing.T) {
}
func TestVoteSetBitsMessageValidateBasic(t *testing.T) {
const (
validSignedMsgType types.SignedMsgType = 0x01
invalidSignedMsgType types.SignedMsgType = 0x03
)
validBlockID := types.BlockID{}
invalidBlockID := types.BlockID{
Hash: cmn.HexBytes{},
PartsHeader: types.PartSetHeader{
Total: -1,
Hash: cmn.HexBytes{},
},
}
testBitArray := cmn.NewBitArray(1)
testCases := []struct { // nolint: maligned
expectErr bool
messageRound int
messageHeight int64
testName string
messageType types.SignedMsgType
messageBlockID types.BlockID
messageVotes *cmn.BitArray
malleateFn func(*VoteSetBitsMessage)
expErr string
}{
{false, 0, 0, "Valid Message", validSignedMsgType, validBlockID, testBitArray},
{true, -1, 0, "Invalid Message", validSignedMsgType, validBlockID, testBitArray},
{true, 0, -1, "Invalid Message", validSignedMsgType, validBlockID, testBitArray},
{true, 0, 0, "Invalid Message", invalidSignedMsgType, validBlockID, testBitArray},
{true, 0, 0, "Invalid Message", validSignedMsgType, invalidBlockID, testBitArray},
{func(msg *VoteSetBitsMessage) {}, ""},
{func(msg *VoteSetBitsMessage) { msg.Height = -1 }, "Negative Height"},
{func(msg *VoteSetBitsMessage) { msg.Round = -1 }, "Negative Round"},
{func(msg *VoteSetBitsMessage) { msg.Type = 0x03 }, "Invalid Type"},
{func(msg *VoteSetBitsMessage) {
msg.BlockID = types.BlockID{
Hash: cmn.HexBytes{},
PartsHeader: types.PartSetHeader{
Total: -1,
Hash: cmn.HexBytes{},
},
}
}, "Wrong BlockID: Wrong PartsHeader: Negative Total"},
{func(msg *VoteSetBitsMessage) { msg.Votes = cmn.NewBitArray(types.MaxVotesCount + 1) },
"Votes bit array is too big: 10001, max: 10000"},
}
for _, tc := range testCases {
for i, tc := range testCases {
tc := tc
t.Run(tc.testName, func(t *testing.T) {
message := VoteSetBitsMessage{
Height: tc.messageHeight,
Round: tc.messageRound,
Type: tc.messageType,
// Votes: tc.messageVotes,
BlockID: tc.messageBlockID,
t.Run(fmt.Sprintf("#%d", i), func(t *testing.T) {
msg := &VoteSetBitsMessage{
Height: 1,
Round: 0,
Type: 0x01,
Votes: cmn.NewBitArray(1),
BlockID: types.BlockID{},
}
assert.Equal(t, tc.expectErr, message.ValidateBasic() != nil, "Validate Basic had an unexpected result")
tc.malleateFn(msg)
err := msg.ValidateBasic()
if tc.expErr != "" && assert.Error(t, err) {
assert.Contains(t, err.Error(), tc.expErr)
}
})
}
}

View File

@@ -238,15 +238,15 @@ func (e ReachedHeightToStopError) Error() string {
// Write simulates the WAL crashing by sending an error to the panicCh and then
// exiting the cs.receiveRoutine.
func (w *crashingWAL) Write(m WALMessage) {
func (w *crashingWAL) Write(m WALMessage) error {
if endMsg, ok := m.(EndHeightMessage); ok {
if endMsg.Height == w.heightToStop {
w.panicCh <- ReachedHeightToStopError{endMsg.Height}
runtime.Goexit()
} else {
w.next.Write(m)
return nil
}
return
return w.next.Write(m)
}
if w.msgIndex > w.lastPanickedForMsgIndex {
@@ -254,14 +254,15 @@ func (w *crashingWAL) Write(m WALMessage) {
_, file, line, _ := runtime.Caller(1)
w.panicCh <- WALWriteError{fmt.Sprintf("failed to write %T to WAL (fileline: %s:%d)", m, file, line)}
runtime.Goexit()
} else {
w.msgIndex++
w.next.Write(m)
return nil
}
w.msgIndex++
return w.next.Write(m)
}
func (w *crashingWAL) WriteSync(m WALMessage) {
w.Write(m)
func (w *crashingWAL) WriteSync(m WALMessage) error {
return w.Write(m)
}
func (w *crashingWAL) FlushAndSync() error { return w.next.FlushAndSync() }

View File

@@ -641,7 +641,10 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) {
// may generate internal events (votes, complete proposals, 2/3 majorities)
cs.handleMsg(mi)
case mi = <-cs.internalMsgQueue:
cs.wal.WriteSync(mi) // NOTE: fsync
err := cs.wal.WriteSync(mi) // NOTE: fsync
if err != nil {
panic(fmt.Sprintf("Failed to write %v msg to consensus wal due to %v. Check your FS and restart the node", mi, err))
}
if _, ok := mi.Msg.(*VoteMessage); ok {
// we actually want to simulate failing during
@@ -1406,7 +1409,10 @@ func (cs *ConsensusState) finalizeCommit(height int64) {
// Either way, the ConsensusState should not be resumed until we
// successfully call ApplyBlock (ie. later here, or in Handshake after
// restart).
cs.wal.WriteSync(EndHeightMessage{height}) // NOTE: fsync
endMsg := EndHeightMessage{height}
if err := cs.wal.WriteSync(endMsg); err != nil { // NOTE: fsync
panic(fmt.Sprintf("Failed to write %v msg to consensus wal due to %v. Check your FS and restart the node", endMsg, err))
}
fail.Fail() // XXX

View File

@@ -19,8 +19,8 @@ import (
)
const (
// must be greater than types.BlockPartSizeBytes + a few bytes
maxMsgSizeBytes = 1024 * 1024 // 1MB
// amino overhead + time.Time + max consensus msg size
maxMsgSizeBytes = maxMsgSize + 24
// how often the WAL should be sync'd during periodic sync'ing
walDefaultFlushInterval = 2 * time.Second
@@ -29,8 +29,9 @@ const (
//--------------------------------------------------------
// types and functions for saving consensus messages
// TimedWALMessage wraps WALMessage and adds Time for debugging purposes.
type TimedWALMessage struct {
Time time.Time `json:"time"` // for debugging purposes
Time time.Time `json:"time"`
Msg WALMessage `json:"msg"`
}
@@ -55,8 +56,8 @@ func RegisterWALMessages(cdc *amino.Codec) {
// WAL is an interface for any write-ahead logger.
type WAL interface {
Write(WALMessage)
WriteSync(WALMessage)
Write(WALMessage) error
WriteSync(WALMessage) error
FlushAndSync() error
SearchForEndHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error)
@@ -174,29 +175,39 @@ func (wal *baseWAL) Wait() {
// Write is called in newStep and for each receive on the
// peerMsgQueue and the timeoutTicker.
// NOTE: does not call fsync()
func (wal *baseWAL) Write(msg WALMessage) {
func (wal *baseWAL) Write(msg WALMessage) error {
if wal == nil {
return
return nil
}
// Write the wal message
if err := wal.enc.Encode(&TimedWALMessage{tmtime.Now(), msg}); err != nil {
panic(fmt.Sprintf("Error writing msg to consensus wal: %v \n\nMessage: %v", err, msg))
wal.Logger.Error("Error writing msg to consensus wal. WARNING: recover may not be possible for the current height",
"err", err, "msg", msg)
return err
}
return nil
}
// WriteSync is called when we receive a msg from ourselves
// so that we write to disk before sending signed messages.
// NOTE: calls fsync()
func (wal *baseWAL) WriteSync(msg WALMessage) {
func (wal *baseWAL) WriteSync(msg WALMessage) error {
if wal == nil {
return
return nil
}
wal.Write(msg)
if err := wal.FlushAndSync(); err != nil {
panic(fmt.Sprintf("Error flushing consensus wal buf to file. Error: %v \n", err))
if err := wal.Write(msg); err != nil {
return err
}
if err := wal.FlushAndSync(); err != nil {
wal.Logger.Error("WriteSync failed to flush consensus wal. WARNING: may result in creating alternative proposals / votes for the current height iff the node restarted",
"err", err)
return err
}
return nil
}
// WALSearchOptions are optional arguments to SearchForEndHeight.
@@ -287,7 +298,7 @@ func (enc *WALEncoder) Encode(v *TimedWALMessage) error {
crc := crc32.Checksum(data, crc32c)
length := uint32(len(data))
if length > maxMsgSizeBytes {
return fmt.Errorf("Msg is too big: %d bytes, max: %d bytes", length, maxMsgSizeBytes)
return fmt.Errorf("msg is too big: %d bytes, max: %d bytes", length, maxMsgSizeBytes)
}
totalLength := 8 + int(length)
@@ -297,7 +308,6 @@ func (enc *WALEncoder) Encode(v *TimedWALMessage) error {
copy(msg[8:], data)
_, err := enc.wr.Write(msg)
return err
}
@@ -388,9 +398,9 @@ type nilWAL struct{}
var _ WAL = nilWAL{}
func (nilWAL) Write(m WALMessage) {}
func (nilWAL) WriteSync(m WALMessage) {}
func (nilWAL) FlushAndSync() error { return nil }
func (nilWAL) Write(m WALMessage) error { return nil }
func (nilWAL) WriteSync(m WALMessage) error { return nil }
func (nilWAL) FlushAndSync() error { return nil }
func (nilWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
return nil, false, nil
}

View File

@@ -169,10 +169,10 @@ func newByteBufferWAL(logger log.Logger, enc *WALEncoder, nBlocks int64, signalS
// Write writes message to the internal buffer except when heightToStop is
// reached, in which case it will signal the caller via signalWhenStopsTo and
// skip writing.
func (w *byteBufferWAL) Write(m WALMessage) {
func (w *byteBufferWAL) Write(m WALMessage) error {
if w.stopped {
w.logger.Debug("WAL already stopped. Not writing message", "msg", m)
return
return nil
}
if endMsg, ok := m.(EndHeightMessage); ok {
@@ -181,7 +181,7 @@ func (w *byteBufferWAL) Write(m WALMessage) {
w.logger.Debug("Stopping WAL at height", "height", endMsg.Height)
w.signalWhenStopsTo <- struct{}{}
w.stopped = true
return
return nil
}
}
@@ -190,10 +190,12 @@ func (w *byteBufferWAL) Write(m WALMessage) {
if err != nil {
panic(fmt.Sprintf("failed to encode the msg %v", m))
}
return nil
}
func (w *byteBufferWAL) WriteSync(m WALMessage) {
w.Write(m)
func (w *byteBufferWAL) WriteSync(m WALMessage) error {
return w.Write(m)
}
func (w *byteBufferWAL) FlushAndSync() error { return nil }

View File

@@ -11,14 +11,15 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/crypto/merkle"
"github.com/tendermint/tendermint/libs/autofile"
"github.com/tendermint/tendermint/libs/log"
tmtypes "github.com/tendermint/tendermint/types"
tmtime "github.com/tendermint/tendermint/types/time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const (
@@ -103,7 +104,7 @@ func TestWALEncoderDecoder(t *testing.T) {
}
}
func TestWALWritePanicsIfMsgIsTooBig(t *testing.T) {
func TestWALWrite(t *testing.T) {
walDir, err := ioutil.TempDir("", "wal")
require.NoError(t, err)
defer os.RemoveAll(walDir)
@@ -120,7 +121,24 @@ func TestWALWritePanicsIfMsgIsTooBig(t *testing.T) {
wal.Wait()
}()
assert.Panics(t, func() { wal.Write(make([]byte, maxMsgSizeBytes+1)) })
// 1) Write returns an error if msg is too big
msg := &BlockPartMessage{
Height: 1,
Round: 1,
Part: &tmtypes.Part{
Index: 1,
Bytes: make([]byte, 1),
Proof: merkle.SimpleProof{
Total: 1,
Index: 1,
LeafHash: make([]byte, maxMsgSizeBytes-30),
},
},
}
err = wal.Write(msg)
if assert.Error(t, err) {
assert.Contains(t, err.Error(), "msg is too big")
}
}
func TestWALSearchForEndHeight(t *testing.T) {