mempool: batch txs per peer in broadcastTxRoutine (#5321)

Closes #625
This commit is contained in:
Anton Kaliaev
2020-09-23 14:13:13 +04:00
committed by GitHub
parent 4b99502d5b
commit ffe2742a6c
11 changed files with 238 additions and 147 deletions

View File

@@ -22,22 +22,22 @@ var _ = math.Inf
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
type Tx struct {
Tx []byte `protobuf:"bytes,1,opt,name=tx,proto3" json:"tx,omitempty"`
type Txs struct {
Txs [][]byte `protobuf:"bytes,1,rep,name=txs,proto3" json:"txs,omitempty"`
}
func (m *Tx) Reset() { *m = Tx{} }
func (m *Tx) String() string { return proto.CompactTextString(m) }
func (*Tx) ProtoMessage() {}
func (*Tx) Descriptor() ([]byte, []int) {
func (m *Txs) Reset() { *m = Txs{} }
func (m *Txs) String() string { return proto.CompactTextString(m) }
func (*Txs) ProtoMessage() {}
func (*Txs) Descriptor() ([]byte, []int) {
return fileDescriptor_2af51926fdbcbc05, []int{0}
}
func (m *Tx) XXX_Unmarshal(b []byte) error {
func (m *Txs) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *Tx) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
func (m *Txs) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_Tx.Marshal(b, m, deterministic)
return xxx_messageInfo_Txs.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
@@ -47,28 +47,28 @@ func (m *Tx) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return b[:n], nil
}
}
func (m *Tx) XXX_Merge(src proto.Message) {
xxx_messageInfo_Tx.Merge(m, src)
func (m *Txs) XXX_Merge(src proto.Message) {
xxx_messageInfo_Txs.Merge(m, src)
}
func (m *Tx) XXX_Size() int {
func (m *Txs) XXX_Size() int {
return m.Size()
}
func (m *Tx) XXX_DiscardUnknown() {
xxx_messageInfo_Tx.DiscardUnknown(m)
func (m *Txs) XXX_DiscardUnknown() {
xxx_messageInfo_Txs.DiscardUnknown(m)
}
var xxx_messageInfo_Tx proto.InternalMessageInfo
var xxx_messageInfo_Txs proto.InternalMessageInfo
func (m *Tx) GetTx() []byte {
func (m *Txs) GetTxs() [][]byte {
if m != nil {
return m.Tx
return m.Txs
}
return nil
}
type Message struct {
// Types that are valid to be assigned to Sum:
// *Message_Tx
// *Message_Txs
Sum isMessage_Sum `protobuf_oneof:"sum"`
}
@@ -111,11 +111,11 @@ type isMessage_Sum interface {
Size() int
}
type Message_Tx struct {
Tx *Tx `protobuf:"bytes,1,opt,name=tx,proto3,oneof" json:"tx,omitempty"`
type Message_Txs struct {
Txs *Txs `protobuf:"bytes,1,opt,name=txs,proto3,oneof" json:"txs,omitempty"`
}
func (*Message_Tx) isMessage_Sum() {}
func (*Message_Txs) isMessage_Sum() {}
func (m *Message) GetSum() isMessage_Sum {
if m != nil {
@@ -124,9 +124,9 @@ func (m *Message) GetSum() isMessage_Sum {
return nil
}
func (m *Message) GetTx() *Tx {
if x, ok := m.GetSum().(*Message_Tx); ok {
return x.Tx
func (m *Message) GetTxs() *Txs {
if x, ok := m.GetSum().(*Message_Txs); ok {
return x.Txs
}
return nil
}
@@ -134,33 +134,34 @@ func (m *Message) GetTx() *Tx {
// XXX_OneofWrappers is for the internal use of the proto package.
func (*Message) XXX_OneofWrappers() []interface{} {
return []interface{}{
(*Message_Tx)(nil),
(*Message_Txs)(nil),
}
}
func init() {
proto.RegisterType((*Tx)(nil), "tendermint.mempool.Tx")
proto.RegisterType((*Txs)(nil), "tendermint.mempool.Txs")
proto.RegisterType((*Message)(nil), "tendermint.mempool.Message")
}
func init() { proto.RegisterFile("tendermint/mempool/types.proto", fileDescriptor_2af51926fdbcbc05) }
var fileDescriptor_2af51926fdbcbc05 = []byte{
// 175 bytes of a gzipped FileDescriptorProto
// 179 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x2b, 0x49, 0xcd, 0x4b,
0x49, 0x2d, 0xca, 0xcd, 0xcc, 0x2b, 0xd1, 0xcf, 0x4d, 0xcd, 0x2d, 0xc8, 0xcf, 0xcf, 0xd1, 0x2f,
0xa9, 0x2c, 0x48, 0x2d, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x12, 0x42, 0xc8, 0xeb, 0x41,
0xe5, 0x95, 0x44, 0xb8, 0x98, 0x42, 0x2a, 0x84, 0xf8, 0xb8, 0x98, 0x4a, 0x2a, 0x24, 0x18, 0x15,
0x18, 0x35, 0x78, 0x82, 0x98, 0x4a, 0x2a, 0x94, 0xac, 0xb8, 0xd8, 0x7d, 0x53, 0x8b, 0x8b, 0x13,
0xd3, 0x53, 0x85, 0x34, 0xe0, 0x52, 0xdc, 0x46, 0x62, 0x7a, 0x98, 0x26, 0xe8, 0x85, 0x54, 0x78,
0x30, 0x80, 0x34, 0x39, 0xb1, 0x72, 0x31, 0x17, 0x97, 0xe6, 0x3a, 0x05, 0x9f, 0x78, 0x24, 0xc7,
0x78, 0xe1, 0x91, 0x1c, 0xe3, 0x83, 0x47, 0x72, 0x8c, 0x13, 0x1e, 0xcb, 0x31, 0x5c, 0x78, 0x2c,
0xc7, 0x70, 0xe3, 0xb1, 0x1c, 0x43, 0x94, 0x65, 0x7a, 0x66, 0x49, 0x46, 0x69, 0x92, 0x5e, 0x72,
0x7e, 0xae, 0x3e, 0x92, 0x53, 0x91, 0x98, 0x60, 0x77, 0xea, 0x63, 0x7a, 0x23, 0x89, 0x0d, 0x2c,
0x63, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0x21, 0xe2, 0x11, 0x57, 0xe3, 0x00, 0x00, 0x00,
0xe5, 0x95, 0xc4, 0xb9, 0x98, 0x43, 0x2a, 0x8a, 0x85, 0x04, 0xb8, 0x98, 0x4b, 0x2a, 0x8a, 0x25,
0x18, 0x15, 0x98, 0x35, 0x78, 0x82, 0x40, 0x4c, 0x25, 0x5b, 0x2e, 0x76, 0xdf, 0xd4, 0xe2, 0xe2,
0xc4, 0xf4, 0x54, 0x21, 0x6d, 0x98, 0x24, 0xa3, 0x06, 0xb7, 0x91, 0xb8, 0x1e, 0xa6, 0x29, 0x7a,
0x21, 0x15, 0xc5, 0x1e, 0x0c, 0x60, 0x7d, 0x4e, 0xac, 0x5c, 0xcc, 0xc5, 0xa5, 0xb9, 0x4e, 0xc1,
0x27, 0x1e, 0xc9, 0x31, 0x5e, 0x78, 0x24, 0xc7, 0xf8, 0xe0, 0x91, 0x1c, 0xe3, 0x84, 0xc7, 0x72,
0x0c, 0x17, 0x1e, 0xcb, 0x31, 0xdc, 0x78, 0x2c, 0xc7, 0x10, 0x65, 0x99, 0x9e, 0x59, 0x92, 0x51,
0x9a, 0xa4, 0x97, 0x9c, 0x9f, 0xab, 0x8f, 0xe4, 0x60, 0x24, 0x26, 0xd8, 0xb5, 0xfa, 0x98, 0x9e,
0x49, 0x62, 0x03, 0xcb, 0x18, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff, 0xca, 0xc3, 0xa0, 0xfc, 0xe9,
0x00, 0x00, 0x00,
}
func (m *Tx) Marshal() (dAtA []byte, err error) {
func (m *Txs) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
@@ -170,22 +171,24 @@ func (m *Tx) Marshal() (dAtA []byte, err error) {
return dAtA[:n], nil
}
func (m *Tx) MarshalTo(dAtA []byte) (int, error) {
func (m *Txs) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *Tx) MarshalToSizedBuffer(dAtA []byte) (int, error) {
func (m *Txs) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.Tx) > 0 {
i -= len(m.Tx)
copy(dAtA[i:], m.Tx)
i = encodeVarintTypes(dAtA, i, uint64(len(m.Tx)))
i--
dAtA[i] = 0xa
if len(m.Txs) > 0 {
for iNdEx := len(m.Txs) - 1; iNdEx >= 0; iNdEx-- {
i -= len(m.Txs[iNdEx])
copy(dAtA[i:], m.Txs[iNdEx])
i = encodeVarintTypes(dAtA, i, uint64(len(m.Txs[iNdEx])))
i--
dAtA[i] = 0xa
}
}
return len(dAtA) - i, nil
}
@@ -222,16 +225,16 @@ func (m *Message) MarshalToSizedBuffer(dAtA []byte) (int, error) {
return len(dAtA) - i, nil
}
func (m *Message_Tx) MarshalTo(dAtA []byte) (int, error) {
func (m *Message_Txs) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *Message_Tx) MarshalToSizedBuffer(dAtA []byte) (int, error) {
func (m *Message_Txs) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
if m.Tx != nil {
if m.Txs != nil {
{
size, err := m.Tx.MarshalToSizedBuffer(dAtA[:i])
size, err := m.Txs.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
@@ -254,15 +257,17 @@ func encodeVarintTypes(dAtA []byte, offset int, v uint64) int {
dAtA[offset] = uint8(v)
return base
}
func (m *Tx) Size() (n int) {
func (m *Txs) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.Tx)
if l > 0 {
n += 1 + l + sovTypes(uint64(l))
if len(m.Txs) > 0 {
for _, b := range m.Txs {
l = len(b)
n += 1 + l + sovTypes(uint64(l))
}
}
return n
}
@@ -279,14 +284,14 @@ func (m *Message) Size() (n int) {
return n
}
func (m *Message_Tx) Size() (n int) {
func (m *Message_Txs) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.Tx != nil {
l = m.Tx.Size()
if m.Txs != nil {
l = m.Txs.Size()
n += 1 + l + sovTypes(uint64(l))
}
return n
@@ -298,7 +303,7 @@ func sovTypes(x uint64) (n int) {
func sozTypes(x uint64) (n int) {
return sovTypes(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *Tx) Unmarshal(dAtA []byte) error {
func (m *Txs) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
@@ -321,15 +326,15 @@ func (m *Tx) Unmarshal(dAtA []byte) error {
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: Tx: wiretype end group for non-group")
return fmt.Errorf("proto: Txs: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: Tx: illegal tag %d (wire type %d)", fieldNum, wire)
return fmt.Errorf("proto: Txs: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Tx", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field Txs", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
@@ -356,10 +361,8 @@ func (m *Tx) Unmarshal(dAtA []byte) error {
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Tx = append(m.Tx[:0], dAtA[iNdEx:postIndex]...)
if m.Tx == nil {
m.Tx = []byte{}
}
m.Txs = append(m.Txs, make([]byte, postIndex-iNdEx))
copy(m.Txs[len(m.Txs)-1], dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
@@ -416,7 +419,7 @@ func (m *Message) Unmarshal(dAtA []byte) error {
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Tx", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field Txs", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
@@ -443,11 +446,11 @@ func (m *Message) Unmarshal(dAtA []byte) error {
if postIndex > l {
return io.ErrUnexpectedEOF
}
v := &Tx{}
v := &Txs{}
if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
m.Sum = &Message_Tx{v}
m.Sum = &Message_Txs{v}
iNdEx = postIndex
default:
iNdEx = preIndex

View File

@@ -3,12 +3,12 @@ package tendermint.mempool;
option go_package = "github.com/tendermint/tendermint/proto/tendermint/mempool";
message Tx {
bytes tx = 1;
message Txs {
repeated bytes txs = 1;
}
message Message {
oneof sum {
Tx tx = 1;
Txs txs = 1;
}
}