p2p: fix MConnection inbound traffic statistics and rate limiting (#5868) (#5870)

Fixes #5866. Inbound traffic monitoring (and by extension inbound rate limiting) was inadvertently removed in 660e72a.
This commit is contained in:
Erik Grinaker
2021-01-06 16:10:28 +01:00
committed by GitHub
parent 17ce2ccc92
commit 13833cba9e
10 changed files with 110 additions and 58 deletions

View File

@@ -46,7 +46,7 @@ type WriteCloser interface {
}
type Reader interface {
ReadMsg(msg proto.Message) error
ReadMsg(msg proto.Message) (int, error)
}
type ReadCloser interface {
@@ -72,25 +72,28 @@ func getSize(v interface{}) (int, bool) {
}
}
// byteReader wraps an io.Reader and implements io.ByteReader. Reading one byte at a
// time is extremely slow, but this is what Amino did already, and the caller can
// wrap the reader in bufio.Reader if appropriate.
// byteReader wraps an io.Reader and implements io.ByteReader, required by
// binary.ReadUvarint(). Reading one byte at a time is extremely slow, but this
// is what Amino did previously anyway, and the caller can wrap the underlying
// reader in a bufio.Reader if appropriate.
type byteReader struct {
io.Reader
bytes []byte
reader io.Reader
buf []byte
bytesRead int // keeps track of bytes read via ReadByte()
}
func newByteReader(r io.Reader) *byteReader {
return &byteReader{
Reader: r,
bytes: make([]byte, 1),
reader: r,
buf: make([]byte, 1),
}
}
func (r *byteReader) ReadByte() (byte, error) {
_, err := r.Read(r.bytes)
n, err := r.reader.Read(r.buf)
r.bytesRead += n
if err != nil {
return 0, err
return 0x00, err
}
return r.bytes[0], nil
return r.buf[0], nil
}

View File

@@ -39,6 +39,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/test"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/protoio"
)
@@ -47,6 +48,7 @@ func iotest(writer protoio.WriteCloser, reader protoio.ReadCloser) error {
varint := make([]byte, binary.MaxVarintLen64)
size := 1000
msgs := make([]*test.NinOptNative, size)
lens := make([]int, size)
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := range msgs {
msgs[i] = test.NewPopulatedNinOptNative(r, true)
@@ -71,6 +73,7 @@ func iotest(writer protoio.WriteCloser, reader protoio.ReadCloser) error {
if n != len(bz)+visize {
return fmt.Errorf("WriteMsg() wrote %v bytes, expected %v", n, len(bz)+visize) // nolint
}
lens[i] = n
}
if err := writer.Close(); err != nil {
return err
@@ -78,11 +81,13 @@ func iotest(writer protoio.WriteCloser, reader protoio.ReadCloser) error {
i := 0
for {
msg := &test.NinOptNative{}
if err := reader.ReadMsg(msg); err != nil {
if n, err := reader.ReadMsg(msg); err != nil {
if err == io.EOF {
break
}
return err
} else if n != lens[i] {
return fmt.Errorf("read %v bytes, expected %v", n, lens[i])
}
if err := msg.VerboseEqual(msgs[i]); err != nil {
return err
@@ -116,21 +121,17 @@ func TestVarintNormal(t *testing.T) {
buf := newBuffer()
writer := protoio.NewDelimitedWriter(buf)
reader := protoio.NewDelimitedReader(buf, 1024*1024)
if err := iotest(writer, reader); err != nil {
t.Error(err)
}
if !buf.closed {
t.Fatalf("did not close buffer")
}
err := iotest(writer, reader)
require.NoError(t, err)
require.True(t, buf.closed, "did not close buffer")
}
func TestVarintNoClose(t *testing.T) {
buf := bytes.NewBuffer(nil)
writer := protoio.NewDelimitedWriter(buf)
reader := protoio.NewDelimitedReader(buf, 1024*1024)
if err := iotest(writer, reader); err != nil {
t.Error(err)
}
err := iotest(writer, reader)
require.NoError(t, err)
}
// issue 32
@@ -138,11 +139,8 @@ func TestVarintMaxSize(t *testing.T) {
buf := newBuffer()
writer := protoio.NewDelimitedWriter(buf)
reader := protoio.NewDelimitedReader(buf, 20)
if err := iotest(writer, reader); err == nil {
t.Error(err)
} else {
t.Logf("%s", err)
}
err := iotest(writer, reader)
require.Error(t, err)
}
func TestVarintError(t *testing.T) {
@@ -150,8 +148,37 @@ func TestVarintError(t *testing.T) {
buf.Write([]byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x7f})
reader := protoio.NewDelimitedReader(buf, 1024*1024)
msg := &test.NinOptNative{}
err := reader.ReadMsg(msg)
if err == nil {
t.Fatalf("Expected error")
}
n, err := reader.ReadMsg(msg)
require.Error(t, err)
require.Equal(t, 10, n)
}
func TestVarintTruncated(t *testing.T) {
buf := newBuffer()
buf.Write([]byte{0xff, 0xff})
reader := protoio.NewDelimitedReader(buf, 1024*1024)
msg := &test.NinOptNative{}
n, err := reader.ReadMsg(msg)
require.Error(t, err)
require.Equal(t, 2, n)
}
func TestShort(t *testing.T) {
buf := newBuffer()
varintBuf := make([]byte, binary.MaxVarintLen64)
varintLen := binary.PutUvarint(varintBuf, 100)
_, err := buf.Write(varintBuf[:varintLen])
require.NoError(t, err)
bz, err := proto.Marshal(&test.NinOptNative{Field15: []byte{0x01, 0x02, 0x03}})
require.NoError(t, err)
buf.Write(bz)
reader := protoio.NewDelimitedReader(buf, 1024*1024)
require.NoError(t, err)
msg := &test.NinOptNative{}
n, err := reader.ReadMsg(msg)
require.Error(t, err)
require.Equal(t, varintLen+len(bz), n)
}

View File

@@ -39,41 +39,58 @@ import (
"github.com/gogo/protobuf/proto"
)
// NewDelimitedReader reads varint-delimited Protobuf messages from a reader. Unlike the gogoproto
// NewDelimitedReader, this does not buffer the reader, which may cause poor performance but is
// necessary when only reading single messages (e.g. in the p2p package).
// NewDelimitedReader reads varint-delimited Protobuf messages from a reader.
// Unlike the gogoproto NewDelimitedReader, this does not buffer the reader,
// which may cause poor performance but is necessary when only reading single
// messages (e.g. in the p2p package). It also returns the number of bytes
// read, which is necessary for the p2p package.
func NewDelimitedReader(r io.Reader, maxSize int) ReadCloser {
var closer io.Closer
if c, ok := r.(io.Closer); ok {
closer = c
}
return &varintReader{newByteReader(r), nil, maxSize, closer}
return &varintReader{r, nil, maxSize, closer}
}
type varintReader struct {
r *byteReader
r io.Reader
buf []byte
maxSize int
closer io.Closer
}
func (r *varintReader) ReadMsg(msg proto.Message) error {
length64, err := binary.ReadUvarint(newByteReader(r.r))
func (r *varintReader) ReadMsg(msg proto.Message) (int, error) {
// ReadUvarint needs an io.ByteReader, and we also need to keep track of the
// number of bytes read, so we use our own byteReader. This can't be
// buffered, so the caller should pass a buffered io.Reader to avoid poor
// performance.
byteReader := newByteReader(r.r)
l, err := binary.ReadUvarint(byteReader)
n := byteReader.bytesRead
if err != nil {
return err
return n, err
}
length := int(length64)
if length < 0 || length > r.maxSize {
return fmt.Errorf("message exceeds max size (%v > %v)", length, r.maxSize)
// Make sure length doesn't overflow the native int size (e.g. 32-bit),
// and that the returned sum of n+length doesn't overflow either.
length := int(l)
if l >= uint64(^uint(0)>>1) || length < 0 || n+length < 0 {
return n, fmt.Errorf("invalid out-of-range message length %v", l)
}
if length > r.maxSize {
return n, fmt.Errorf("message exceeds max size (%v > %v)", length, r.maxSize)
}
if len(r.buf) < length {
r.buf = make([]byte, length)
}
buf := r.buf[:length]
if _, err := io.ReadFull(r.r, buf); err != nil {
return err
nr, err := io.ReadFull(r.r, buf)
n += nr
if err != nil {
return n, err
}
return proto.Unmarshal(buf, msg)
return n, proto.Unmarshal(buf, msg)
}
func (r *varintReader) Close() error {
@@ -84,5 +101,6 @@ func (r *varintReader) Close() error {
}
func UnmarshalDelimited(data []byte, msg proto.Message) error {
return NewDelimitedReader(bytes.NewReader(data), len(data)).ReadMsg(msg)
_, err := NewDelimitedReader(bytes.NewReader(data), len(data)).ReadMsg(msg)
return err
}