blockchain: proto migration (#4969)

## Description

migration of blockchain reactors to protobuf

Closes: #XXX
This commit is contained in:
Marko
2020-06-08 14:45:03 +02:00
committed by GitHub
parent db8f1b3df3
commit a89f2581fc
22 changed files with 519 additions and 811 deletions

View File

@@ -1,14 +0,0 @@
package v0
import (
amino "github.com/tendermint/go-amino"
"github.com/tendermint/tendermint/types"
)
var cdc = amino.NewCodec()
func init() {
RegisterBlockchainMessages(cdc)
types.RegisterBlockAmino(cdc)
}

View File

@@ -1,15 +1,14 @@
package v0
import (
"errors"
"fmt"
"reflect"
"time"
amino "github.com/tendermint/go-amino"
bc "github.com/tendermint/tendermint/blockchain"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
bcproto "github.com/tendermint/tendermint/proto/blockchain"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
@@ -29,13 +28,6 @@ const (
statusUpdateIntervalSeconds = 10
// check if we should switch to consensus reactor
switchToConsensusIntervalSeconds = 1
// NOTE: keep up to date with bcBlockResponseMessage
bcBlockResponseMessagePrefixSize = 4
bcBlockResponseMessageFieldKeySize = 1
maxMsgSize = types.MaxBlockSizeBytes +
bcBlockResponseMessagePrefixSize +
bcBlockResponseMessageFieldKeySize
)
type consensusReactor interface {
@@ -147,17 +139,21 @@ func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
Priority: 10,
SendQueueCapacity: 1000,
RecvBufferCapacity: 50 * 4096,
RecvMessageCapacity: maxMsgSize,
RecvMessageCapacity: bc.MaxMsgSize,
},
}
}
// AddPeer implements Reactor by sending our state to peer.
func (bcR *BlockchainReactor) AddPeer(peer p2p.Peer) {
msgBytes := cdc.MustMarshalBinaryBare(&bcStatusResponseMessage{
Height: bcR.store.Height(),
msgBytes, err := bc.EncodeMsg(&bcproto.StatusResponse{
Base: bcR.store.Base(),
})
Height: bcR.store.Height()})
if err != nil {
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
return
}
peer.Send(BlockchainChannel, msgBytes)
// it's OK if send fails. will try later in poolRoutine
@@ -172,31 +168,47 @@ func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// respondToPeer loads a block and sends it to the requesting peer,
// if we have it. Otherwise, we'll respond saying we don't have it.
func (bcR *BlockchainReactor) respondToPeer(msg *bcBlockRequestMessage,
func (bcR *BlockchainReactor) respondToPeer(msg *bcproto.BlockRequest,
src p2p.Peer) (queued bool) {
block := bcR.store.LoadBlock(msg.Height)
if block != nil {
msgBytes := cdc.MustMarshalBinaryBare(&bcBlockResponseMessage{Block: block})
bl, err := block.ToProto()
if err != nil {
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
return false
}
msgBytes, err := bc.EncodeMsg(&bcproto.BlockResponse{Block: bl})
if err != nil {
bcR.Logger.Error("could not marshal msg", "err", err)
return false
}
return src.TrySend(BlockchainChannel, msgBytes)
}
bcR.Logger.Info("Peer asking for a block we don't have", "src", src, "height", msg.Height)
msgBytes := cdc.MustMarshalBinaryBare(&bcNoBlockResponseMessage{Height: msg.Height})
msgBytes, err := bc.EncodeMsg(&bcproto.NoBlockResponse{Height: msg.Height})
if err != nil {
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
return false
}
return src.TrySend(BlockchainChannel, msgBytes)
}
// Receive implements Reactor by handling 4 types of messages (look below).
func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
msg, err := decodeMsg(msgBytes)
msg, err := bc.DecodeMsg(msgBytes)
if err != nil {
bcR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
bcR.Switch.StopPeerForError(src, err)
return
}
if err = msg.ValidateBasic(); err != nil {
if err = bc.ValidateMsg(msg); err != nil {
bcR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
bcR.Switch.StopPeerForError(src, err)
return
@@ -205,20 +217,30 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
bcR.Logger.Debug("Receive", "src", src, "chID", chID, "msg", msg)
switch msg := msg.(type) {
case *bcBlockRequestMessage:
case *bcproto.BlockRequest:
bcR.respondToPeer(msg, src)
case *bcBlockResponseMessage:
bcR.pool.AddBlock(src.ID(), msg.Block, len(msgBytes))
case *bcStatusRequestMessage:
case *bcproto.BlockResponse:
bi, err := types.BlockFromProto(msg.Block)
if err != nil {
bcR.Logger.Error("Block content is invalid", "err", err)
return
}
bcR.pool.AddBlock(src.ID(), bi, len(msgBytes))
case *bcproto.StatusRequest:
// Send peer our state.
src.TrySend(BlockchainChannel, cdc.MustMarshalBinaryBare(&bcStatusResponseMessage{
msgBytes, err := bc.EncodeMsg(&bcproto.StatusResponse{
Height: bcR.store.Height(),
Base: bcR.store.Base(),
}))
case *bcStatusResponseMessage:
})
if err != nil {
bcR.Logger.Error("could not convert msg to protobut", "err", err)
return
}
src.TrySend(BlockchainChannel, msgBytes)
case *bcproto.StatusResponse:
// Got a peer status. Unverified.
bcR.pool.SetPeerRange(src.ID(), msg.Base, msg.Height)
case *bcNoBlockResponseMessage:
case *bcproto.NoBlockResponse:
bcR.Logger.Debug("Peer does not have requested block", "peer", src, "height", msg.Height)
default:
bcR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
@@ -255,7 +277,12 @@ func (bcR *BlockchainReactor) poolRoutine(stateSynced bool) {
if peer == nil {
continue
}
msgBytes := cdc.MustMarshalBinaryBare(&bcBlockRequestMessage{request.Height})
msgBytes, err := bc.EncodeMsg(&bcproto.BlockRequest{Height: request.Height})
if err != nil {
bcR.Logger.Error("could not convert msg to proto", "err", err)
continue
}
queued := peer.TrySend(BlockchainChannel, msgBytes)
if !queued {
bcR.Logger.Debug("Send queue is full, drop block request", "peer", peer.ID(), "height", request.Height)
@@ -381,132 +408,16 @@ FOR_LOOP:
// BroadcastStatusRequest broadcasts `BlockStore` base and height.
func (bcR *BlockchainReactor) BroadcastStatusRequest() error {
msgBytes := cdc.MustMarshalBinaryBare(&bcStatusRequestMessage{
bm, err := bc.EncodeMsg(&bcproto.StatusRequest{
Base: bcR.store.Base(),
Height: bcR.store.Height(),
})
bcR.Switch.Broadcast(BlockchainChannel, msgBytes)
if err != nil {
bcR.Logger.Error("could not convert msg to proto", "err", err)
return fmt.Errorf("could not convert msg to proto: %w", err)
}
bcR.Switch.Broadcast(BlockchainChannel, bm)
return nil
}
//-----------------------------------------------------------------------------
// Messages
// BlockchainMessage is a generic message for this reactor.
type BlockchainMessage interface {
ValidateBasic() error
}
// RegisterBlockchainMessages registers the fast sync messages for amino encoding.
func RegisterBlockchainMessages(cdc *amino.Codec) {
cdc.RegisterInterface((*BlockchainMessage)(nil), nil)
cdc.RegisterConcrete(&bcBlockRequestMessage{}, "tendermint/blockchain/BlockRequest", nil)
cdc.RegisterConcrete(&bcBlockResponseMessage{}, "tendermint/blockchain/BlockResponse", nil)
cdc.RegisterConcrete(&bcNoBlockResponseMessage{}, "tendermint/blockchain/NoBlockResponse", nil)
cdc.RegisterConcrete(&bcStatusResponseMessage{}, "tendermint/blockchain/StatusResponse", nil)
cdc.RegisterConcrete(&bcStatusRequestMessage{}, "tendermint/blockchain/StatusRequest", nil)
}
func decodeMsg(bz []byte) (msg BlockchainMessage, err error) {
err = cdc.UnmarshalBinaryBare(bz, &msg)
return
}
//-------------------------------------
type bcBlockRequestMessage struct {
Height int64
}
// ValidateBasic performs basic validation.
func (m *bcBlockRequestMessage) ValidateBasic() error {
if m.Height < 0 {
return errors.New("negative Height")
}
return nil
}
func (m *bcBlockRequestMessage) String() string {
return fmt.Sprintf("[bcBlockRequestMessage %v]", m.Height)
}
type bcNoBlockResponseMessage struct {
Height int64
}
// ValidateBasic performs basic validation.
func (m *bcNoBlockResponseMessage) ValidateBasic() error {
if m.Height < 0 {
return errors.New("negative Height")
}
return nil
}
func (m *bcNoBlockResponseMessage) String() string {
return fmt.Sprintf("[bcNoBlockResponseMessage %d]", m.Height)
}
//-------------------------------------
type bcBlockResponseMessage struct {
Block *types.Block
}
// ValidateBasic performs basic validation.
func (m *bcBlockResponseMessage) ValidateBasic() error {
return m.Block.ValidateBasic()
}
func (m *bcBlockResponseMessage) String() string {
return fmt.Sprintf("[bcBlockResponseMessage %v]", m.Block.Height)
}
//-------------------------------------
type bcStatusRequestMessage struct {
Height int64
Base int64
}
// ValidateBasic performs basic validation.
func (m *bcStatusRequestMessage) ValidateBasic() error {
if m.Base < 0 {
return errors.New("negative Base")
}
if m.Height < 0 {
return errors.New("negative Height")
}
if m.Base > m.Height {
return fmt.Errorf("base %v cannot be greater than height %v", m.Base, m.Height)
}
return nil
}
func (m *bcStatusRequestMessage) String() string {
return fmt.Sprintf("[bcStatusRequestMessage %v:%v]", m.Base, m.Height)
}
//-------------------------------------
type bcStatusResponseMessage struct {
Height int64
Base int64
}
// ValidateBasic performs basic validation.
func (m *bcStatusResponseMessage) ValidateBasic() error {
if m.Base < 0 {
return errors.New("negative Base")
}
if m.Height < 0 {
return errors.New("negative Height")
}
if m.Base > m.Height {
return fmt.Errorf("base %v cannot be greater than height %v", m.Base, m.Height)
}
return nil
}
func (m *bcStatusResponseMessage) String() string {
return fmt.Sprintf("[bcStatusResponseMessage %v:%v]", m.Base, m.Height)
}

View File

@@ -263,86 +263,6 @@ func TestBadBlockStopsPeer(t *testing.T) {
assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)-1)
}
func TestBcBlockRequestMessageValidateBasic(t *testing.T) {
testCases := []struct {
testName string
requestHeight int64
expectErr bool
}{
{"Valid Request Message", 0, false},
{"Valid Request Message", 1, false},
{"Invalid Request Message", -1, true},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.testName, func(t *testing.T) {
request := bcBlockRequestMessage{Height: tc.requestHeight}
assert.Equal(t, tc.expectErr, request.ValidateBasic() != nil, "Validate Basic had an unexpected result")
})
}
}
func TestBcNoBlockResponseMessageValidateBasic(t *testing.T) {
testCases := []struct {
testName string
nonResponseHeight int64
expectErr bool
}{
{"Valid Non-Response Message", 0, false},
{"Valid Non-Response Message", 1, false},
{"Invalid Non-Response Message", -1, true},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.testName, func(t *testing.T) {
nonResponse := bcNoBlockResponseMessage{Height: tc.nonResponseHeight}
assert.Equal(t, tc.expectErr, nonResponse.ValidateBasic() != nil, "Validate Basic had an unexpected result")
})
}
}
func TestBcStatusRequestMessageValidateBasic(t *testing.T) {
testCases := []struct {
testName string
requestHeight int64
expectErr bool
}{
{"Valid Request Message", 0, false},
{"Valid Request Message", 1, false},
{"Invalid Request Message", -1, true},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.testName, func(t *testing.T) {
request := bcStatusRequestMessage{Height: tc.requestHeight}
assert.Equal(t, tc.expectErr, request.ValidateBasic() != nil, "Validate Basic had an unexpected result")
})
}
}
func TestBcStatusResponseMessageValidateBasic(t *testing.T) {
testCases := []struct {
testName string
responseHeight int64
expectErr bool
}{
{"Valid Response Message", 0, false},
{"Valid Response Message", 1, false},
{"Invalid Response Message", -1, true},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.testName, func(t *testing.T) {
response := bcStatusResponseMessage{Height: tc.responseHeight}
assert.Equal(t, tc.expectErr, response.ValidateBasic() != nil, "Validate Basic had an unexpected result")
})
}
}
//----------------------------------------------
// utility funcs