diff --git a/blockchain/v2/io.go b/blockchain/v2/io.go index 69995e4c5..595b67615 100644 --- a/blockchain/v2/io.go +++ b/blockchain/v2/io.go @@ -3,7 +3,7 @@ package v2 import ( "errors" - bc "github.com/tendermint/tendermint/blockchain" + "github.com/gogo/protobuf/proto" "github.com/tendermint/tendermint/p2p" bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain" "github.com/tendermint/tendermint/state" @@ -48,7 +48,13 @@ type consensusReactor interface { } func (sio *switchIO) sendBlockRequest(peer p2p.Peer, height int64) error { - msgBytes, err := bc.EncodeMsg(&bcproto.BlockRequest{Height: height}) + msgProto := new(bcproto.Message) + + if err := msgProto.Wrap(&bcproto.BlockRequest{Height: height}); err != nil { + return err + } + + msgBytes, err := proto.Marshal(msgProto) if err != nil { return err } @@ -61,7 +67,13 @@ func (sio *switchIO) sendBlockRequest(peer p2p.Peer, height int64) error { } func (sio *switchIO) sendStatusResponse(base int64, height int64, peer p2p.Peer) error { - msgBytes, err := bc.EncodeMsg(&bcproto.StatusResponse{Height: height, Base: base}) + msgProto := new(bcproto.Message) + + if err := msgProto.Wrap(&bcproto.StatusResponse{Height: height, Base: base}); err != nil { + return err + } + + msgBytes, err := proto.Marshal(msgProto) if err != nil { return err } @@ -83,10 +95,17 @@ func (sio *switchIO) sendBlockToPeer(block *types.Block, peer p2p.Peer) error { return err } - msgBytes, err := bc.EncodeMsg(&bcproto.BlockResponse{Block: bpb}) + msgProto := new(bcproto.Message) + + if err := msgProto.Wrap(&bcproto.BlockResponse{Block: bpb}); err != nil { + return err + } + + msgBytes, err := proto.Marshal(msgProto) if err != nil { return err } + if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued { return errPeerQueueFull } @@ -95,7 +114,13 @@ func (sio *switchIO) sendBlockToPeer(block *types.Block, peer p2p.Peer) error { } func (sio *switchIO) sendBlockNotFound(height int64, peer p2p.Peer) error { - msgBytes, err := bc.EncodeMsg(&bcproto.NoBlockResponse{Height: height}) + msgProto := new(bcproto.Message) + + if err := msgProto.Wrap(&bcproto.NoBlockResponse{Height: height}); err != nil { + return err + } + + msgBytes, err := proto.Marshal(msgProto) if err != nil { return err } @@ -116,7 +141,13 @@ func (sio *switchIO) trySwitchToConsensus(state state.State, skipWAL bool) bool } func (sio *switchIO) sendStatusRequest(peer p2p.Peer) error { - msgBytes, err := bc.EncodeMsg(&bcproto.StatusRequest{}) + msgProto := new(bcproto.Message) + + if err := msgProto.Wrap(&bcproto.StatusRequest{}); err != nil { + return err + } + + msgBytes, err := proto.Marshal(msgProto) if err != nil { return err } @@ -129,7 +160,13 @@ func (sio *switchIO) sendStatusRequest(peer p2p.Peer) error { } func (sio *switchIO) broadcastStatusRequest() error { - msgBytes, err := bc.EncodeMsg(&bcproto.StatusRequest{}) + msgProto := new(bcproto.Message) + + if err := msgProto.Wrap(&bcproto.StatusRequest{}); err != nil { + return err + } + + msgBytes, err := proto.Marshal(msgProto) if err != nil { return err } diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go index 6386b0f1b..5fb4b794b 100644 --- a/blockchain/v2/reactor.go +++ b/blockchain/v2/reactor.go @@ -5,6 +5,8 @@ import ( "fmt" "time" + proto "github.com/gogo/protobuf/proto" + "github.com/tendermint/tendermint/behaviour" bc "github.com/tendermint/tendermint/blockchain" "github.com/tendermint/tendermint/libs/log" @@ -466,20 +468,28 @@ func (r *BlockchainReactor) Stop() error { func (r *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { logger := r.logger.With("src", src.ID(), "chID", chID) - msg, err := bc.DecodeMsg(msgBytes) - if err != nil { + msgProto := new(bcproto.Message) + + if err := proto.Unmarshal(msgBytes, msgProto); err != nil { logger.Error("error decoding message", "err", err) _ = r.reporter.Report(behaviour.BadMessage(src.ID(), err.Error())) return } - if err = bc.ValidateMsg(msg); err != nil { - logger.Error("peer sent us invalid msg", "msg", msg, "err", err) + if err := msgProto.Validate(); err != nil { + logger.Error("peer sent us an invalid msg", "msg", msgProto, "err", err) _ = r.reporter.Report(behaviour.BadMessage(src.ID(), err.Error())) return } - r.logger.Debug("Receive", "msg", msg) + msg, err := msgProto.Unwrap() + if err != nil { + logger.Error("peer sent us an invalid msg", "msg", msgProto, "err", err) + _ = r.reporter.Report(behaviour.BadMessage(src.ID(), err.Error())) + return + } + + r.logger.Debug("received", "msg", msg) switch msg := msg.(type) { case *bcproto.StatusRequest: diff --git a/blockchain/v2/reactor_test.go b/blockchain/v2/reactor_test.go index fe0154f81..099d311a0 100644 --- a/blockchain/v2/reactor_test.go +++ b/blockchain/v2/reactor_test.go @@ -9,13 +9,13 @@ import ( "testing" "time" + "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" dbm "github.com/tendermint/tm-db" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/behaviour" - bc "github.com/tendermint/tendermint/blockchain" cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/service" @@ -409,23 +409,37 @@ func TestReactorHelperMode(t *testing.T) { switch ev := step.event.(type) { case bcproto.StatusRequest: old := mockSwitch.numStatusResponse - msg, err := bc.EncodeMsg(&ev) - assert.NoError(t, err) - reactor.Receive(channelID, mockPeer{id: p2p.NodeID(step.peer)}, msg) + + msgProto := new(bcproto.Message) + require.NoError(t, msgProto.Wrap(&ev)) + + msgBz, err := proto.Marshal(msgProto) + require.NoError(t, err) + + reactor.Receive(channelID, mockPeer{id: p2p.NodeID(step.peer)}, msgBz) assert.Equal(t, old+1, mockSwitch.numStatusResponse) case bcproto.BlockRequest: if ev.Height > params.startHeight { old := mockSwitch.numNoBlockResponse - msg, err := bc.EncodeMsg(&ev) - assert.NoError(t, err) - reactor.Receive(channelID, mockPeer{id: p2p.NodeID(step.peer)}, msg) + + msgProto := new(bcproto.Message) + require.NoError(t, msgProto.Wrap(&ev)) + + msgBz, err := proto.Marshal(msgProto) + require.NoError(t, err) + + reactor.Receive(channelID, mockPeer{id: p2p.NodeID(step.peer)}, msgBz) assert.Equal(t, old+1, mockSwitch.numNoBlockResponse) } else { old := mockSwitch.numBlockResponse - msg, err := bc.EncodeMsg(&ev) - assert.NoError(t, err) - assert.NoError(t, err) - reactor.Receive(channelID, mockPeer{id: p2p.NodeID(step.peer)}, msg) + + msgProto := new(bcproto.Message) + require.NoError(t, msgProto.Wrap(&ev)) + + msgBz, err := proto.Marshal(msgProto) + require.NoError(t, err) + + reactor.Receive(channelID, mockPeer{id: p2p.NodeID(step.peer)}, msgBz) assert.Equal(t, old+1, mockSwitch.numBlockResponse) } }