From 61c5791fa3aa0d16451285f705f48526d26ce7be Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Fri, 29 Jun 2018 12:17:26 +0400 Subject: [PATCH] revert back to Jae's original payload size limit except now we calculate the max size using the maxPacketMsgSize() function, which frees developers from having to know amino encoding details. plus, 10 additional bytes are added to leave the room for amino upgrades (both making it more efficient / less efficient) --- benchmarks/codec_test.go | 2 +- config/config.go | 39 +++++++------- config/toml.go | 5 +- crypto/merkle/doc.go | 2 +- crypto/merkle/simple_tree.go | 8 +-- crypto/merkle/simple_tree_test.go | 2 +- docs/running-in-production.md | 2 +- docs/specification/configuration.md | 2 +- evidence/wire.go | 2 +- p2p/conn/connection.go | 81 +++++++++++++---------------- p2p/conn/connection_test.go | 4 +- p2p/switch.go | 2 +- privval/priv_validator.go | 2 +- privval/priv_validator_test.go | 2 +- rpc/core/pipe.go | 2 +- 15 files changed, 73 insertions(+), 84 deletions(-) diff --git a/benchmarks/codec_test.go b/benchmarks/codec_test.go index ee61cc9a4..53cbf632c 100644 --- a/benchmarks/codec_test.go +++ b/benchmarks/codec_test.go @@ -6,8 +6,8 @@ import ( "github.com/tendermint/go-amino" - "github.com/tendermint/tendermint/crypto" proto "github.com/tendermint/tendermint/benchmarks/proto" + "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/p2p" ctypes "github.com/tendermint/tendermint/rpc/core/types" ) diff --git a/config/config.go b/config/config.go index f5361c22b..e01819305 100644 --- a/config/config.go +++ b/config/config.go @@ -299,9 +299,8 @@ type P2PConfig struct { // Time to wait before flushing messages out on the connection, in ms FlushThrottleTimeout int `mapstructure:"flush_throttle_timeout"` - // Maximum size of a message packet, in bytes - // Includes a header, which is ~13 bytes - MaxPacketMsgSize int `mapstructure:"max_packet_msg_size"` + // Maximum size of a message packet payload, in bytes + MaxPacketMsgPayloadSize int `mapstructure:"max_packet_msg_payload_size"` // Rate at which packets can be sent, in bytes/second SendRate int64 `mapstructure:"send_rate"` @@ -340,23 +339,23 @@ type P2PConfig struct { // DefaultP2PConfig returns a default configuration for the peer-to-peer layer func DefaultP2PConfig() *P2PConfig { return &P2PConfig{ - ListenAddress: "tcp://0.0.0.0:26656", - UPNP: false, - AddrBook: defaultAddrBookPath, - AddrBookStrict: true, - MaxNumPeers: 50, - FlushThrottleTimeout: 100, - MaxPacketMsgSize: 1024, // 1 kB - SendRate: 512000, // 500 kB/s - RecvRate: 512000, // 500 kB/s - PexReactor: true, - SeedMode: false, - AllowDuplicateIP: true, // so non-breaking yet - HandshakeTimeout: 20 * time.Second, - DialTimeout: 3 * time.Second, - TestDialFail: false, - TestFuzz: false, - TestFuzzConfig: DefaultFuzzConnConfig(), + ListenAddress: "tcp://0.0.0.0:26656", + UPNP: false, + AddrBook: defaultAddrBookPath, + AddrBookStrict: true, + MaxNumPeers: 50, + FlushThrottleTimeout: 100, + MaxPacketMsgPayloadSize: 1024, // 1 kB + SendRate: 512000, // 500 kB/s + RecvRate: 512000, // 500 kB/s + PexReactor: true, + SeedMode: false, + AllowDuplicateIP: true, // so non-breaking yet + HandshakeTimeout: 20 * time.Second, + DialTimeout: 3 * time.Second, + TestDialFail: false, + TestFuzz: false, + TestFuzzConfig: DefaultFuzzConnConfig(), } } diff --git a/config/toml.go b/config/toml.go index b3745d3c8..4569291d4 100644 --- a/config/toml.go +++ b/config/toml.go @@ -164,9 +164,8 @@ flush_throttle_timeout = {{ .P2P.FlushThrottleTimeout }} # Maximum number of peers to connect to max_num_peers = {{ .P2P.MaxNumPeers }} -# Maximum size of a message packet, in bytes -# Includes a header, which is ~13 bytes -max_packet_msg_size = {{ .P2P.MaxPacketMsgSize }} +# Maximum size of a message packet payload, in bytes +max_packet_msg_payload_size = {{ .P2P.MaxPacketMsgPayloadSize }} # Rate at which packets can be sent, in bytes/second send_rate = {{ .P2P.SendRate }} diff --git a/crypto/merkle/doc.go b/crypto/merkle/doc.go index da65dd858..865c30217 100644 --- a/crypto/merkle/doc.go +++ b/crypto/merkle/doc.go @@ -28,4 +28,4 @@ https://bitcointalk.org/?topic=102395 TODO(ismail): add 2nd pre-image protection or clarify further on how we use this and why this secure. */ -package merkle \ No newline at end of file +package merkle diff --git a/crypto/merkle/simple_tree.go b/crypto/merkle/simple_tree.go index 35a6eaa7c..46a075909 100644 --- a/crypto/merkle/simple_tree.go +++ b/crypto/merkle/simple_tree.go @@ -9,12 +9,12 @@ func SimpleHashFromTwoHashes(left, right []byte) []byte { var hasher = tmhash.New() err := encodeByteSlice(hasher, left) if err != nil { - panic(err) - } + panic(err) + } err = encodeByteSlice(hasher, right) if err != nil { - panic(err) - } + panic(err) + } return hasher.Sum(nil) } diff --git a/crypto/merkle/simple_tree_test.go b/crypto/merkle/simple_tree_test.go index a721bccea..6eef93623 100644 --- a/crypto/merkle/simple_tree_test.go +++ b/crypto/merkle/simple_tree_test.go @@ -6,8 +6,8 @@ import ( cmn "github.com/tendermint/tmlibs/common" . "github.com/tendermint/tmlibs/test" - "testing" "github.com/tendermint/tendermint/crypto/tmhash" + "testing" ) type testItem []byte diff --git a/docs/running-in-production.md b/docs/running-in-production.md index 3ceded499..225a97853 100644 --- a/docs/running-in-production.md +++ b/docs/running-in-production.md @@ -21,7 +21,7 @@ to prevent Denial-of-service attacks. You can read more about it ### P2P The core of the Tendermint peer-to-peer system is `MConnection`. Each -connection has `MaxPacketMsgSize`, which is the maximum packet +connection has `MaxPacketMsgPayloadSize`, which is the maximum packet size and bounded send & receive queues. One can impose restrictions on send & receive rate per connection (`SendRate`, `RecvRate`). diff --git a/docs/specification/configuration.md b/docs/specification/configuration.md index 9ceb8be72..214757b9f 100644 --- a/docs/specification/configuration.md +++ b/docs/specification/configuration.md @@ -119,7 +119,7 @@ flush_throttle_timeout = 100 max_num_peers = 50 # Maximum size of a message packet payload, in bytes -max_msg_packet_payload_size = 1024 +max_packet_msg_payload_size = 1024 # Rate at which packets can be sent, in bytes/second send_rate = 512000 diff --git a/evidence/wire.go b/evidence/wire.go index d4db37c54..fb3a177cc 100644 --- a/evidence/wire.go +++ b/evidence/wire.go @@ -2,8 +2,8 @@ package evidence import ( "github.com/tendermint/go-amino" - "github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/crypto" + "github.com/tendermint/tendermint/types" ) var cdc = amino.NewCodec() diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 1392c8051..b19a1ca19 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -18,7 +18,7 @@ import ( ) const ( - defaultMaxPacketMsgSize = 1024 + defaultMaxPacketMsgPayloadSize = 1024 numBatchPacketMsgs = 10 minReadBufferSize = 1024 @@ -96,7 +96,7 @@ type MConnection struct { created time.Time // time of creation - emptyPacketMsgSize int + _maxPacketMsgSize int } // MConnConfig is a MConnection configuration. @@ -105,7 +105,7 @@ type MConnConfig struct { RecvRate int64 `mapstructure:"recv_rate"` // Maximum payload size - MaxPacketMsgSize int `mapstructure:"max_packet_msg_size"` + MaxPacketMsgPayloadSize int `mapstructure:"max_packet_msg_payload_size"` // Interval to flush writes (throttled) FlushThrottle time.Duration `mapstructure:"flush_throttle"` @@ -120,12 +120,12 @@ type MConnConfig struct { // DefaultMConnConfig returns the default config. func DefaultMConnConfig() MConnConfig { return MConnConfig{ - SendRate: defaultSendRate, - RecvRate: defaultRecvRate, - MaxPacketMsgSize: defaultMaxPacketMsgSize, - FlushThrottle: defaultFlushThrottle, - PingInterval: defaultPingInterval, - PongTimeout: defaultPongTimeout, + SendRate: defaultSendRate, + RecvRate: defaultRecvRate, + MaxPacketMsgPayloadSize: defaultMaxPacketMsgPayloadSize, + FlushThrottle: defaultFlushThrottle, + PingInterval: defaultPingInterval, + PongTimeout: defaultPongTimeout, } } @@ -146,17 +146,16 @@ func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onRec } mconn := &MConnection{ - conn: conn, - bufConnReader: bufio.NewReaderSize(conn, minReadBufferSize), - bufConnWriter: bufio.NewWriterSize(conn, minWriteBufferSize), - sendMonitor: flow.New(0, 0), - recvMonitor: flow.New(0, 0), - send: make(chan struct{}, 1), - pong: make(chan struct{}, 1), - onReceive: onReceive, - onError: onError, - config: config, - emptyPacketMsgSize: emptyPacketMsgSize(config.MaxPacketMsgSize), + conn: conn, + bufConnReader: bufio.NewReaderSize(conn, minReadBufferSize), + bufConnWriter: bufio.NewWriterSize(conn, minWriteBufferSize), + sendMonitor: flow.New(0, 0), + recvMonitor: flow.New(0, 0), + send: make(chan struct{}, 1), + pong: make(chan struct{}, 1), + onReceive: onReceive, + onError: onError, + config: config, } // Create channels @@ -173,6 +172,9 @@ func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onRec mconn.BaseService = *cmn.NewBaseService(nil, "MConnection", mconn) + // maxPacketMsgSize() is a bit heavy, so call just once + mconn._maxPacketMsgSize = mconn.maxPacketMsgSize() + return mconn } @@ -397,7 +399,7 @@ func (c *MConnection) sendSomePacketMsgs() bool { // Block until .sendMonitor says we can write. // Once we're ready we send more than we asked for, // but amortized it should even out. - c.sendMonitor.Limit(c.config.MaxPacketMsgSize, atomic.LoadInt64(&c.config.SendRate), true) + c.sendMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.SendRate), true) // Now send some PacketMsgs. for i := 0; i < numBatchPacketMsgs; i++ { @@ -455,7 +457,7 @@ func (c *MConnection) recvRoutine() { FOR_LOOP: for { // Block until .recvMonitor says we can read. - c.recvMonitor.Limit(c.config.MaxPacketMsgSize, atomic.LoadInt64(&c.config.RecvRate), true) + c.recvMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.RecvRate), true) // Peek into bufConnReader for debugging /* @@ -475,7 +477,7 @@ FOR_LOOP: var packet Packet var _n int64 var err error - _n, err = cdc.UnmarshalBinaryReader(c.bufConnReader, &packet, int64(c.config.MaxPacketMsgSize)) + _n, err = cdc.UnmarshalBinaryReader(c.bufConnReader, &packet, int64(c._maxPacketMsgSize)) c.recvMonitor.Update(int(_n)) if err != nil { if c.IsRunning() { @@ -548,6 +550,16 @@ func (c *MConnection) stopPongTimer() { } } +// maxPacketMsgSize returns a maximum size of PacketMsg, including the overhead +// of amino encoding. +func (c *MConnection) maxPacketMsgSize() int { + return len(cdc.MustMarshalBinary(PacketMsg{ + ChannelID: 0x01, + EOF: 1, + Bytes: make([]byte, c.config.MaxPacketMsgPayloadSize), + })) + 10 // leave room for changes in amino +} + type ConnectionStatus struct { Duration time.Duration SendMonitor flow.Status @@ -631,7 +643,7 @@ func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel { desc: desc, sendQueue: make(chan []byte, desc.SendQueueCapacity), recving: make([]byte, 0, desc.RecvBufferCapacity), - maxPacketMsgPayloadSize: conn.config.MaxPacketMsgSize - conn.emptyPacketMsgSize, + maxPacketMsgPayloadSize: conn.config.MaxPacketMsgPayloadSize, } } @@ -780,24 +792,3 @@ type PacketMsg struct { func (mp PacketMsg) String() string { return fmt.Sprintf("PacketMsg{%X:%X T:%X}", mp.ChannelID, mp.Bytes, mp.EOF) } - -// - Uvarint length of MustMarshalBinary(packet) = 1 or 2 bytes -// (as long as it's less than 16,384 bytes) -// - Prefix bytes = 4 bytes -// - ChannelID field key + byte = 2 bytes -// - EOF field key + byte = 2 bytes -// - Bytes field key = 1 bytes -// - Uvarint length of MustMarshalBinary(bytes) = 1 or 2 bytes -// = up to 13 bytes overhead for the packet. - -func emptyPacketMsgSize(maxPayloadSize int) int { - emptyPacketMsgSize := len(cdc.MustMarshalBinary(PacketMsg{ - ChannelID: 0x01, - EOF: 1, - Bytes: make([]byte, maxPayloadSize), - })) - // -1 byte of data - // +1 byte because uvarint length of MustMarshalBinary(bytes) will be 2 bytes for big packets - // +1 byte because uvarint length of MustMarshalBinary(packet) will be 2 bytes for big packets - return emptyPacketMsgSize - maxPayloadSize + 1 + 1 -} diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index 6e4c1a090..8006b37a8 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -426,7 +426,7 @@ func TestMConnectionReadErrorLongMessage(t *testing.T) { var packet = PacketMsg{ ChannelID: 0x01, EOF: 1, - Bytes: make([]byte, mconnClient.config.MaxPacketMsgSize-emptyPacketMsgSize(mconnClient.config.MaxPacketMsgSize)), + Bytes: make([]byte, mconnClient.config.MaxPacketMsgPayloadSize), } _, err = cdc.MarshalBinaryWriter(buf, packet) assert.Nil(t, err) @@ -440,7 +440,7 @@ func TestMConnectionReadErrorLongMessage(t *testing.T) { packet = PacketMsg{ ChannelID: 0x01, EOF: 1, - Bytes: make([]byte, mconnClient.config.MaxPacketMsgSize+1), + Bytes: make([]byte, mconnClient.config.MaxPacketMsgPayloadSize+100), } _, err = cdc.MarshalBinaryWriter(buf, packet) assert.Nil(t, err) diff --git a/p2p/switch.go b/p2p/switch.go index ae322b54f..bf5f9747f 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -100,7 +100,7 @@ func NewSwitch(cfg *config.P2PConfig, options ...SwitchOption) *Switch { mConfig.FlushThrottle = time.Duration(cfg.FlushThrottleTimeout) * time.Millisecond mConfig.SendRate = cfg.SendRate mConfig.RecvRate = cfg.RecvRate - mConfig.MaxPacketMsgSize = cfg.MaxPacketMsgSize + mConfig.MaxPacketMsgPayloadSize = cfg.MaxPacketMsgPayloadSize sw.mConfig = mConfig diff --git a/privval/priv_validator.go b/privval/priv_validator.go index 9f02482ab..8a54b5ccf 100644 --- a/privval/priv_validator.go +++ b/privval/priv_validator.go @@ -287,7 +287,7 @@ func (pv *FilePV) saveSigned(height int64, round int, step int8, func (pv *FilePV) SignHeartbeat(chainID string, heartbeat *types.Heartbeat) error { pv.mtx.Lock() defer pv.mtx.Unlock() - sig, err:= pv.PrivKey.Sign(heartbeat.SignBytes(chainID)) + sig, err := pv.PrivKey.Sign(heartbeat.SignBytes(chainID)) if err != nil { return err } diff --git a/privval/priv_validator_test.go b/privval/priv_validator_test.go index 4fc8f97fc..345b51438 100644 --- a/privval/priv_validator_test.go +++ b/privval/priv_validator_test.go @@ -9,8 +9,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/crypto" + "github.com/tendermint/tendermint/types" cmn "github.com/tendermint/tmlibs/common" ) diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index 9fcb75e19..bf32c9c66 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -3,8 +3,8 @@ package core import ( "time" - crypto "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/consensus" + crypto "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state"