proto: move mempool to proto (#4940)

* proto: move mempool to proto

- changes according to moving the mempool reactor to proto

Signed-off-by: Marko Baricevic <marbar3778@yahoo.com>

Closes: #2883
This commit is contained in:
Marko
2020-06-03 07:30:52 +02:00
committed by GitHub
parent 26bea83694
commit 0da7d87351
11 changed files with 62 additions and 167 deletions

View File

@@ -234,7 +234,7 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx
return err
}
// The size of the corresponding amino-encoded TxMessage
// The size of the corresponding TxMessage
// can't be larger than the maxMsgSize, otherwise we can't
// relay it to peers.
if txSize > mem.config.MaxTxBytes {
@@ -518,11 +518,10 @@ func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
for e := mem.txs.Front(); e != nil; e = e.Next() {
memTx := e.Value.(*mempoolTx)
// Check total size requirement
aminoOverhead := types.ComputeAminoOverhead(memTx.tx, 1)
if maxBytes > -1 && totalBytes+int64(len(memTx.tx))+aminoOverhead > maxBytes {
if maxBytes > -1 && totalBytes+int64(len(memTx.tx)) > maxBytes {
return txs
}
totalBytes += int64(len(memTx.tx)) + aminoOverhead
totalBytes += int64(len(memTx.tx))
// Check total gas requirement.
// If maxGas is negative, skip this check.
// Since newTotalGas < masGas, which

View File

@@ -12,11 +12,11 @@ import (
"testing"
"time"
"github.com/gogo/protobuf/proto"
gogotypes "github.com/gogo/protobuf/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
amino "github.com/tendermint/go-amino"
"github.com/tendermint/tendermint/abci/example/counter"
"github.com/tendermint/tendermint/abci/example/kvstore"
abciserver "github.com/tendermint/tendermint/abci/server"
@@ -107,7 +107,7 @@ func TestReapMaxBytesMaxGas(t *testing.T) {
mempool.Flush()
// each table driven test creates numTxsToCreate txs with checkTx, and at the end clears all remaining txs.
// each tx has 20 bytes + amino overhead = 21 bytes, 1 gas
// each tx has 20 bytes
tests := []struct {
numTxsToCreate int
maxBytes int64
@@ -121,11 +121,11 @@ func TestReapMaxBytesMaxGas(t *testing.T) {
{20, 0, -1, 0},
{20, 0, 10, 0},
{20, 10, 10, 0},
{20, 22, 10, 1},
{20, 220, -1, 10},
{20, 220, 5, 5},
{20, 220, 10, 10},
{20, 220, 15, 10},
{20, 20, 10, 1},
{20, 100, 5, 5},
{20, 200, -1, 10},
{20, 200, 10, 10},
{20, 200, 15, 10},
{20, 20000, -1, 20},
{20, 20000, 5, 5},
{20, 20000, 30, 20},
@@ -150,7 +150,7 @@ func TestMempoolFilters(t *testing.T) {
nopPostFilter := func(tx types.Tx, res *abci.ResponseCheckTx) error { return nil }
// each table driven test creates numTxsToCreate txs with checkTx, and at the end clears all remaining txs.
// each tx has 20 bytes + amino overhead = 21 bytes, 1 gas
// each tx has 20 bytes
tests := []struct {
numTxsToCreate int
preFilter PreCheckFunc
@@ -158,17 +158,16 @@ func TestMempoolFilters(t *testing.T) {
expectedNumTxs int
}{
{10, nopPreFilter, nopPostFilter, 10},
{10, PreCheckAminoMaxBytes(10), nopPostFilter, 0},
{10, PreCheckAminoMaxBytes(20), nopPostFilter, 0},
{10, PreCheckAminoMaxBytes(22), nopPostFilter, 10},
{10, PreCheckMaxBytes(10), nopPostFilter, 0},
{10, PreCheckMaxBytes(20), nopPostFilter, 10},
{10, nopPreFilter, PostCheckMaxGas(-1), 10},
{10, nopPreFilter, PostCheckMaxGas(0), 0},
{10, nopPreFilter, PostCheckMaxGas(1), 10},
{10, nopPreFilter, PostCheckMaxGas(3000), 10},
{10, PreCheckAminoMaxBytes(10), PostCheckMaxGas(20), 0},
{10, PreCheckAminoMaxBytes(30), PostCheckMaxGas(20), 10},
{10, PreCheckAminoMaxBytes(22), PostCheckMaxGas(1), 10},
{10, PreCheckAminoMaxBytes(22), PostCheckMaxGas(0), 0},
{10, PreCheckMaxBytes(10), PostCheckMaxGas(20), 0},
{10, PreCheckMaxBytes(30), PostCheckMaxGas(20), 10},
{10, PreCheckMaxBytes(20), PostCheckMaxGas(1), 10},
{10, PreCheckMaxBytes(20), PostCheckMaxGas(0), 0},
}
for tcIndex, tt := range tests {
mempool.Update(1, emptyTxArr, abciResponses(len(emptyTxArr), abci.CodeTypeOK), tt.preFilter, tt.postFilter)
@@ -414,13 +413,6 @@ func TestMempoolCloseWAL(t *testing.T) {
require.Equal(t, 1, len(m3), "expecting the wal match in")
}
// Size of the amino encoded TxMessage is the length of the
// encoded byte array, plus 1 for the struct field, plus 4
// for the amino prefix.
func txMessageSize(tx types.Tx) int {
return amino.ByteSliceSize(tx) + 1 + 4
}
func TestMempoolMaxMsgSize(t *testing.T) {
app := kvstore.NewApplication()
cc := proxy.NewLocalClientCreator(app)
@@ -458,18 +450,18 @@ func TestMempoolMaxMsgSize(t *testing.T) {
tx := tmrand.Bytes(testCase.len)
err := mempl.CheckTx(tx, nil, TxInfo{})
msg := &TxMessage{tx}
encoded := cdc.MustMarshalBinaryBare(msg)
require.Equal(t, len(encoded), txMessageSize(tx), caseString)
bv := gogotypes.BytesValue{Value: tx}
bz, err2 := bv.Marshal()
require.NoError(t, err2)
require.Equal(t, len(bz), proto.Size(&bv), caseString)
if !testCase.err {
require.True(t, len(encoded) <= maxMsgSize, caseString)
require.True(t, len(bz) <= maxMsgSize, caseString)
require.NoError(t, err, caseString)
} else {
require.True(t, len(encoded) > maxMsgSize, caseString)
require.True(t, len(bz) > maxMsgSize, caseString)
require.Equal(t, err, ErrTxTooLarge{maxTxSize, testCase.len}, caseString)
}
}
}
func TestMempoolTxsBytes(t *testing.T) {

View File

@@ -1,11 +0,0 @@
package mempool
import (
amino "github.com/tendermint/go-amino"
)
var cdc = amino.NewCodec()
func init() {
RegisterMessages(cdc)
}

View File

@@ -102,19 +102,12 @@ type TxInfo struct {
//--------------------------------------------------------------------------------
// PreCheckAminoMaxBytes checks that the size of the transaction plus the amino
// overhead is smaller or equal to the expected maxBytes.
func PreCheckAminoMaxBytes(maxBytes int64) PreCheckFunc {
// PreCheckMaxBytes checks that the size of the transaction is smaller or equal to the expected maxBytes.
func PreCheckMaxBytes(maxBytes int64) PreCheckFunc {
return func(tx types.Tx) error {
// We have to account for the amino overhead in the tx size as well
// NOTE: fieldNum = 1 as types.Block.Data contains Txs []Tx as first field.
// If this field order ever changes this needs to updated here accordingly.
// NOTE: if some []Tx are encoded without a parenting struct, the
// fieldNum is also equal to 1.
aminoOverhead := types.ComputeAminoOverhead(tx, 1)
txSize := int64(len(tx)) + aminoOverhead
txSize := int64(len(tx))
if txSize > maxBytes {
return fmt.Errorf("tx size (including amino overhead) is too big: %d, max: %d",
return fmt.Errorf("tx size is too big: %d, max: %d",
txSize, maxBytes)
}
return nil

View File

@@ -3,12 +3,10 @@ package mempool
import (
"fmt"
"math"
"reflect"
"sync"
"time"
amino "github.com/tendermint/go-amino"
gogotypes "github.com/gogo/protobuf/types"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/clist"
"github.com/tendermint/tendermint/libs/log"
@@ -19,7 +17,7 @@ import (
const (
MempoolChannel = byte(0x30)
aminoOverheadForTxMessage = 8
protoOverheadForTxMessage = 4
peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount
@@ -172,20 +170,15 @@ func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
}
memR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
switch msg := msg.(type) {
case *TxMessage:
txInfo := TxInfo{SenderID: memR.ids.GetForPeer(src)}
if src != nil {
txInfo.SenderP2PID = src.ID()
}
err := memR.mempool.CheckTx(msg.Tx, nil, txInfo)
if err != nil {
memR.Logger.Info("Could not check tx", "tx", txID(msg.Tx), "err", err)
}
// broadcasting happens from go routines per peer
default:
memR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
txInfo := TxInfo{SenderID: memR.ids.GetForPeer(src)}
if src != nil {
txInfo.SenderP2PID = src.ID()
}
err = memR.mempool.CheckTx(msg.Tx, nil, txInfo)
if err != nil {
memR.Logger.Info("Could not check tx", "tx", txID(msg.Tx), "err", err)
}
// broadcasting happens from go routines per peer
}
// PeerState describes the state of a peer.
@@ -238,9 +231,12 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
// ensure peer hasn't already sent us this tx
if _, ok := memTx.senders.Load(peerID); !ok {
// send memTx
msg := &TxMessage{Tx: memTx.tx}
success := peer.Send(MempoolChannel, cdc.MustMarshalBinaryBare(msg))
msg := gogotypes.BytesValue{Value: []byte(memTx.tx)}
bz, err := msg.Marshal()
if err != nil {
panic(err)
}
success := peer.Send(MempoolChannel, bz)
if !success {
time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
continue
@@ -262,17 +258,16 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
//-----------------------------------------------------------------------------
// Messages
// Message is a message sent or received by the Reactor.
type Message interface{}
func RegisterMessages(cdc *amino.Codec) {
cdc.RegisterInterface((*Message)(nil), nil)
cdc.RegisterConcrete(&TxMessage{}, "tendermint/mempool/TxMessage", nil)
}
func (memR *Reactor) decodeMsg(bz []byte) (msg Message, err error) {
err = cdc.UnmarshalBinaryBare(bz, &msg)
return
func (memR *Reactor) decodeMsg(bz []byte) (TxMessage, error) {
msg := gogotypes.BytesValue{}
err := msg.Unmarshal(bz)
if err != nil {
return TxMessage{}, err
}
txMsg := TxMessage{
Tx: types.Tx(msg.Value),
}
return txMsg, err
}
//-------------------------------------
@@ -288,7 +283,7 @@ func (m *TxMessage) String() string {
}
// calcMaxMsgSize returns the max size of TxMessage
// account for amino overhead of TxMessage
// account for proto overhead of bytesValue
func calcMaxMsgSize(maxTxSize int) int {
return maxTxSize + aminoOverheadForTxMessage
return maxTxSize + protoOverheadForTxMessage
}