blockchain v2: update message usage

This commit is contained in:
Aleksandr Bezobchuk
2021-01-05 12:05:13 -05:00
parent faa0e09f17
commit 537464d26a
3 changed files with 84 additions and 23 deletions

View File

@@ -3,7 +3,7 @@ package v2
import (
"errors"
bc "github.com/tendermint/tendermint/blockchain"
"github.com/gogo/protobuf/proto"
"github.com/tendermint/tendermint/p2p"
bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain"
"github.com/tendermint/tendermint/state"
@@ -48,7 +48,13 @@ type consensusReactor interface {
}
func (sio *switchIO) sendBlockRequest(peer p2p.Peer, height int64) error {
msgBytes, err := bc.EncodeMsg(&bcproto.BlockRequest{Height: height})
msgProto := new(bcproto.Message)
if err := msgProto.Wrap(&bcproto.BlockRequest{Height: height}); err != nil {
return err
}
msgBytes, err := proto.Marshal(msgProto)
if err != nil {
return err
}
@@ -61,7 +67,13 @@ func (sio *switchIO) sendBlockRequest(peer p2p.Peer, height int64) error {
}
func (sio *switchIO) sendStatusResponse(base int64, height int64, peer p2p.Peer) error {
msgBytes, err := bc.EncodeMsg(&bcproto.StatusResponse{Height: height, Base: base})
msgProto := new(bcproto.Message)
if err := msgProto.Wrap(&bcproto.StatusResponse{Height: height, Base: base}); err != nil {
return err
}
msgBytes, err := proto.Marshal(msgProto)
if err != nil {
return err
}
@@ -83,10 +95,17 @@ func (sio *switchIO) sendBlockToPeer(block *types.Block, peer p2p.Peer) error {
return err
}
msgBytes, err := bc.EncodeMsg(&bcproto.BlockResponse{Block: bpb})
msgProto := new(bcproto.Message)
if err := msgProto.Wrap(&bcproto.BlockResponse{Block: bpb}); err != nil {
return err
}
msgBytes, err := proto.Marshal(msgProto)
if err != nil {
return err
}
if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued {
return errPeerQueueFull
}
@@ -95,7 +114,13 @@ func (sio *switchIO) sendBlockToPeer(block *types.Block, peer p2p.Peer) error {
}
func (sio *switchIO) sendBlockNotFound(height int64, peer p2p.Peer) error {
msgBytes, err := bc.EncodeMsg(&bcproto.NoBlockResponse{Height: height})
msgProto := new(bcproto.Message)
if err := msgProto.Wrap(&bcproto.NoBlockResponse{Height: height}); err != nil {
return err
}
msgBytes, err := proto.Marshal(msgProto)
if err != nil {
return err
}
@@ -116,7 +141,13 @@ func (sio *switchIO) trySwitchToConsensus(state state.State, skipWAL bool) bool
}
func (sio *switchIO) sendStatusRequest(peer p2p.Peer) error {
msgBytes, err := bc.EncodeMsg(&bcproto.StatusRequest{})
msgProto := new(bcproto.Message)
if err := msgProto.Wrap(&bcproto.StatusRequest{}); err != nil {
return err
}
msgBytes, err := proto.Marshal(msgProto)
if err != nil {
return err
}
@@ -129,7 +160,13 @@ func (sio *switchIO) sendStatusRequest(peer p2p.Peer) error {
}
func (sio *switchIO) broadcastStatusRequest() error {
msgBytes, err := bc.EncodeMsg(&bcproto.StatusRequest{})
msgProto := new(bcproto.Message)
if err := msgProto.Wrap(&bcproto.StatusRequest{}); err != nil {
return err
}
msgBytes, err := proto.Marshal(msgProto)
if err != nil {
return err
}

View File

@@ -5,6 +5,8 @@ import (
"fmt"
"time"
proto "github.com/gogo/protobuf/proto"
"github.com/tendermint/tendermint/behaviour"
bc "github.com/tendermint/tendermint/blockchain"
"github.com/tendermint/tendermint/libs/log"
@@ -466,20 +468,28 @@ func (r *BlockchainReactor) Stop() error {
func (r *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
logger := r.logger.With("src", src.ID(), "chID", chID)
msg, err := bc.DecodeMsg(msgBytes)
if err != nil {
msgProto := new(bcproto.Message)
if err := proto.Unmarshal(msgBytes, msgProto); err != nil {
logger.Error("error decoding message", "err", err)
_ = r.reporter.Report(behaviour.BadMessage(src.ID(), err.Error()))
return
}
if err = bc.ValidateMsg(msg); err != nil {
logger.Error("peer sent us invalid msg", "msg", msg, "err", err)
if err := msgProto.Validate(); err != nil {
logger.Error("peer sent us an invalid msg", "msg", msgProto, "err", err)
_ = r.reporter.Report(behaviour.BadMessage(src.ID(), err.Error()))
return
}
r.logger.Debug("Receive", "msg", msg)
msg, err := msgProto.Unwrap()
if err != nil {
logger.Error("peer sent us an invalid msg", "msg", msgProto, "err", err)
_ = r.reporter.Report(behaviour.BadMessage(src.ID(), err.Error()))
return
}
r.logger.Debug("received", "msg", msg)
switch msg := msg.(type) {
case *bcproto.StatusRequest:

View File

@@ -9,13 +9,13 @@ import (
"testing"
"time"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
dbm "github.com/tendermint/tm-db"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/behaviour"
bc "github.com/tendermint/tendermint/blockchain"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
@@ -409,23 +409,37 @@ func TestReactorHelperMode(t *testing.T) {
switch ev := step.event.(type) {
case bcproto.StatusRequest:
old := mockSwitch.numStatusResponse
msg, err := bc.EncodeMsg(&ev)
assert.NoError(t, err)
reactor.Receive(channelID, mockPeer{id: p2p.NodeID(step.peer)}, msg)
msgProto := new(bcproto.Message)
require.NoError(t, msgProto.Wrap(&ev))
msgBz, err := proto.Marshal(msgProto)
require.NoError(t, err)
reactor.Receive(channelID, mockPeer{id: p2p.NodeID(step.peer)}, msgBz)
assert.Equal(t, old+1, mockSwitch.numStatusResponse)
case bcproto.BlockRequest:
if ev.Height > params.startHeight {
old := mockSwitch.numNoBlockResponse
msg, err := bc.EncodeMsg(&ev)
assert.NoError(t, err)
reactor.Receive(channelID, mockPeer{id: p2p.NodeID(step.peer)}, msg)
msgProto := new(bcproto.Message)
require.NoError(t, msgProto.Wrap(&ev))
msgBz, err := proto.Marshal(msgProto)
require.NoError(t, err)
reactor.Receive(channelID, mockPeer{id: p2p.NodeID(step.peer)}, msgBz)
assert.Equal(t, old+1, mockSwitch.numNoBlockResponse)
} else {
old := mockSwitch.numBlockResponse
msg, err := bc.EncodeMsg(&ev)
assert.NoError(t, err)
assert.NoError(t, err)
reactor.Receive(channelID, mockPeer{id: p2p.NodeID(step.peer)}, msg)
msgProto := new(bcproto.Message)
require.NoError(t, msgProto.Wrap(&ev))
msgBz, err := proto.Marshal(msgProto)
require.NoError(t, err)
reactor.Receive(channelID, mockPeer{id: p2p.NodeID(step.peer)}, msgBz)
assert.Equal(t, old+1, mockSwitch.numBlockResponse)
}
}