Compare commits

..

51 Commits

Author SHA1 Message Date
William Banfield
ce62eeb3d9 update envelope to match v0.35 2022-10-20 14:05:50 -04:00
William Banfield
4d47ec1753 fix block size caculation 2022-10-20 14:01:14 -04:00
William Banfield
f8d37520e9 delete remaining NewReceive and convert to Receive 2022-10-20 13:56:42 -04:00
William Banfield
d0bfcef698 update blocksync receive method 2022-10-20 13:56:25 -04:00
William Banfield
e18106eeda remove old receive funcs 2022-10-20 13:47:54 -04:00
William Banfield
922ba6ab67 update statesync receive 2022-10-20 13:41:22 -04:00
William Banfield
28aeb39bab remove receive from peer base type 2022-10-20 13:31:04 -04:00
William Banfield
4dc961ec0c fix reactor test by adding new receive 2022-10-20 13:23:40 -04:00
William Banfield
68fae0c53d fix test compile fail 2022-10-20 13:12:07 -04:00
William Banfield
934594f231 continue cleanup 2022-10-20 13:10:05 -04:00
William Banfield
934bf233c9 must prefix in pex conversion code 2022-10-20 13:06:36 -04:00
William Banfield
78fbb5db08 further consolidation 2022-10-20 13:02:46 -04:00
William Banfield
7e5f932402 reduce extra vars 2022-10-20 12:57:41 -04:00
William Banfield
b1145cf769 simplify invalid cons code 2022-10-20 12:54:26 -04:00
William Banfield
eda5595a11 simplify byzantine code 2022-10-20 12:32:59 -04:00
William Banfield
7fb6cef26a re-add error to proto convert 2022-10-20 12:29:53 -04:00
William Banfield
5dea81f2ac rename statesync wrap function 2022-10-20 12:19:14 -04:00
William Banfield
ef17093bb4 add todo for log line related to unmarshal in mconn receive 2022-10-20 12:16:02 -04:00
William Banfield
41df563982 clean up TrySend comment 2022-10-20 12:08:43 -04:00
William Banfield
eef513acf8 clean up consensus package use of conversion to proto 2022-10-20 11:41:37 -04:00
William Banfield
5e513fbb4f clean up blocksync proto wrap func 2022-10-20 11:34:56 -04:00
William Banfield
d677121ae0 pex tests passing 2022-10-19 22:25:56 -04:00
William Banfield
d2fb6835c8 mempool v1 tests pass 2022-10-19 22:04:40 -04:00
William Banfield
ae266b2da6 mempool v0 tests pass 2022-10-19 21:58:48 -04:00
William Banfield
3e9223ba79 evidence reactor works 2022-10-19 21:52:21 -04:00
William Banfield
d2fe1d3b36 cons reactor works 2022-10-19 21:52:08 -04:00
William Banfield
396e3d80ac add message type to channel descriptor 2022-10-19 21:21:58 -04:00
William Banfield
1919727904 add tmcons message as message type 2022-10-19 21:13:09 -04:00
William Banfield
0dcec4b545 shim receive bytes metric 2022-10-19 21:07:55 -04:00
William Banfield
4daee74d9c create new map of channel id to message type 2022-10-19 21:05:04 -04:00
William Banfield
1053a49b4e improve NewReceive on cons reactor 2022-10-19 20:52:20 -04:00
William Banfield
7b41c03409 shim in new receive to the reactor interface 2022-10-19 20:51:42 -04:00
William Banfield
e20d28429d receive start 2022-10-19 17:27:23 -04:00
William Banfield
14510c4eb0 fix evidence tests 2022-10-19 17:21:50 -04:00
William Banfield
c529a902eb sync any passes 2022-10-19 14:57:37 -04:00
William Banfield
adbcd0c450 fix statesync test 2022-10-19 14:50:35 -04:00
William Banfield
8644f69aa4 pex suite passing 2022-10-19 14:20:12 -04:00
William Banfield
fd234903b6 fix non-wrapped proto messages in statesync 2022-10-19 14:10:27 -04:00
William Banfield
cfe2ca68da consensus test passed 2022-10-19 14:07:01 -04:00
William Banfield
2786e3db55 tests compile 2022-10-19 13:13:52 -04:00
William Banfield
7e107c45fb remove old TrySends 2022-10-19 13:10:49 -04:00
William Banfield
64c18a4406 remove old Sends 2022-10-19 13:10:11 -04:00
William Banfield
73d0f8116a compiles after NewBroadcast change 2022-10-19 13:05:05 -04:00
William Banfield
01db5dc019 tests compile 2022-10-19 12:28:39 -04:00
William Banfield
5795877dbc update test mocks to include new sends 2022-10-19 11:44:21 -04:00
William Banfield
7f9e609720 broadcast and sends updated in consensus reactor 2022-10-19 11:39:53 -04:00
William Banfield
833e4a15d4 experiment continued 2022-10-19 11:14:18 -04:00
William Banfield
8989fb4a79 broken version of send 2022-10-19 11:07:46 -04:00
William Banfield
6c81ecb1b5 basic envelope type 2022-10-19 10:54:24 -04:00
William Banfield
069f460e16 add type to channel description 2022-10-19 10:48:33 -04:00
William Banfield
c77898aad9 add the initial message metric 2022-10-18 17:09:46 -04:00
29 changed files with 622 additions and 619 deletions

View File

@@ -19,6 +19,55 @@ const (
BlockResponseMessageFieldKeySize
)
func wrapMsg(pb proto.Message) (proto.Message, error) {
msg := bcproto.Message{}
switch pb := pb.(type) {
case *bcproto.BlockRequest:
msg.Sum = &bcproto.Message_BlockRequest{BlockRequest: pb}
case *bcproto.BlockResponse:
msg.Sum = &bcproto.Message_BlockResponse{BlockResponse: pb}
case *bcproto.NoBlockResponse:
msg.Sum = &bcproto.Message_NoBlockResponse{NoBlockResponse: pb}
case *bcproto.StatusRequest:
msg.Sum = &bcproto.Message_StatusRequest{StatusRequest: pb}
case *bcproto.StatusResponse:
msg.Sum = &bcproto.Message_StatusResponse{StatusResponse: pb}
default:
return nil, fmt.Errorf("unknown message type %T", pb)
}
return &msg, nil
}
// DecodeMsg decodes a Protobuf message.
func DecodeMsg(bz []byte) (proto.Message, error) {
pb := &bcproto.Message{}
err := proto.Unmarshal(bz, pb)
if err != nil {
return nil, err
}
return UnwrapMessage(pb)
}
func UnwrapMessage(pb *bcproto.Message) (proto.Message, error) {
switch msg := pb.Sum.(type) {
case *bcproto.Message_BlockRequest:
return msg.BlockRequest, nil
case *bcproto.Message_BlockResponse:
return msg.BlockResponse, nil
case *bcproto.Message_NoBlockResponse:
return msg.NoBlockResponse, nil
case *bcproto.Message_StatusRequest:
return msg.StatusRequest, nil
case *bcproto.Message_StatusResponse:
return msg.StatusResponse, nil
default:
return nil, fmt.Errorf("unknown message type %T", msg)
}
}
// ValidateMsg validates a message.
func ValidateMsg(pb proto.Message) error {
if pb == nil {

View File

@@ -150,12 +150,18 @@ func (bcR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
// AddPeer implements Reactor by sending our state to peer.
func (bcR *Reactor) AddPeer(peer p2p.Peer) {
msg, err := wrapMsg(&bcproto.StatusResponse{
Base: bcR.store.Base(),
Height: bcR.store.Height(),
})
if err != nil {
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
return
}
peer.Send(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.StatusResponse{
Base: bcR.store.Base(),
Height: bcR.store.Height(),
},
Message: msg,
})
// it's OK if send fails. will try later in poolRoutine
@@ -181,30 +187,50 @@ func (bcR *Reactor) respondToPeer(msg *bcproto.BlockRequest,
return false
}
wm, err := wrapMsg(&bcproto.BlockResponse{Block: bl})
if err != nil {
bcR.Logger.Error("could not convert msg to proto message", "err", err)
return false
}
return src.TrySend(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.BlockResponse{Block: bl},
Message: wm,
})
}
bcR.Logger.Info("Peer asking for a block we don't have", "src", src, "height", msg.Height)
wm, err := wrapMsg(&bcproto.NoBlockResponse{Height: msg.Height})
if err != nil {
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
return false
}
return src.TrySend(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.NoBlockResponse{Height: msg.Height},
Message: wm,
})
}
// Receive implements Reactor by handling 4 types of messages (look below).
func (bcR *Reactor) Receive(e p2p.Envelope) {
if err := ValidateMsg(e.Message); err != nil {
bcR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", e.Message, "err", err)
msg, err := UnwrapMessage(e.Message.(*bcproto.Message))
if err != nil {
bcR.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err)
bcR.Switch.StopPeerForError(e.Src, err)
return
}
bcR.Logger.Debug("Receive", "e.Src", e.Src, "chID", e.ChannelID, "msg", e.Message)
if err = ValidateMsg(msg); err != nil {
bcR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", msg, "err", err)
bcR.Switch.StopPeerForError(e.Src, err)
return
}
switch msg := e.Message.(type) {
bcR.Logger.Debug("Receive", "e.Src", e.Src, "chID", e.ChannelID, "msg", msg)
switch msg := msg.(type) {
case *bcproto.BlockRequest:
bcR.respondToPeer(msg, e.Src)
case *bcproto.BlockResponse:
@@ -216,12 +242,18 @@ func (bcR *Reactor) Receive(e p2p.Envelope) {
bcR.pool.AddBlock(e.Src.ID(), bi, msg.Block.Size())
case *bcproto.StatusRequest:
// Send peer our state.
wm, err := wrapMsg(&bcproto.StatusResponse{
Height: bcR.store.Height(),
Base: bcR.store.Base(),
})
if err != nil {
bcR.Logger.Error("could not convert msg to proto message", "err", err)
return
}
e.Src.TrySend(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.StatusResponse{
Height: bcR.store.Height(),
Base: bcR.store.Base(),
},
Message: wm,
})
case *bcproto.StatusResponse:
// Got a peer status. Unverified.
@@ -268,9 +300,14 @@ func (bcR *Reactor) poolRoutine(stateSynced bool) {
if peer == nil {
continue
}
wm, err := wrapMsg(&bcproto.BlockRequest{Height: request.Height})
if err != nil {
bcR.Logger.Error("could not convert msg to proto", "err", err)
continue
}
queued := peer.TrySend(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.BlockRequest{Height: request.Height},
Message: wm,
})
if !queued {
bcR.Logger.Debug("Send queue is full, drop block request", "peer", peer.ID(), "height", request.Height)
@@ -410,9 +447,15 @@ FOR_LOOP:
// BroadcastStatusRequest broadcasts `BlockStore` base and height.
func (bcR *Reactor) BroadcastStatusRequest() error {
wm, err := wrapMsg(&bcproto.StatusRequest{})
if err != nil {
bcR.Logger.Error("could not convert msg to proto message", "err", err)
return fmt.Errorf("could not convert msg to proto message: %w", err)
}
bcR.Switch.NewBroadcast(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.StatusRequest{},
Message: wm,
})
return nil
}

View File

@@ -26,7 +26,6 @@ import (
mempoolv0 "github.com/tendermint/tendermint/mempool/v0"
mempoolv1 "github.com/tendermint/tendermint/mempool/v1"
"github.com/tendermint/tendermint/p2p"
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/store"
@@ -167,13 +166,13 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
if i < len(peerList)/2 {
bcs.Logger.Info("Signed and pushed vote", "vote", prevote1, "peer", peer)
peer.Send(p2p.Envelope{
Message: &tmcons.Vote{prevote1.ToProto()},
Message: MustMsgToProto(&VoteMessage{prevote1}),
ChannelID: VoteChannel,
})
} else {
bcs.Logger.Info("Signed and pushed vote", "vote", prevote2, "peer", peer)
peer.Send(p2p.Envelope{
Message: &tmcons.Vote{prevote2.ToProto()},
Message: MustMsgToProto(&VoteMessage{prevote2}),
ChannelID: VoteChannel,
})
}
@@ -527,25 +526,23 @@ func sendProposalAndParts(
parts *types.PartSet,
) {
// proposal
msg := &ProposalMessage{Proposal: proposal}
peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: &tmcons.Proposal{Proposal: *proposal.ToProto()},
Message: MustMsgToProto(msg),
})
// parts
for i := 0; i < int(parts.Total()); i++ {
part := parts.GetPart(i)
pp, err := part.ToProto()
if err != nil {
panic(err) // TODO: wbanfield better error handling
msg := &BlockPartMessage{
Height: height, // This tells peer that this part applies to us.
Round: round, // This tells peer that this part applies to us.
Part: part,
}
peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: &tmcons.BlockPart{
Height: height, // This tells peer that this part applies to us.
Round: round, // This tells peer that this part applies to us.
Part: *pp,
},
Message: MustMsgToProto(msg),
})
}
@@ -556,11 +553,11 @@ func sendProposalAndParts(
cs.mtx.Unlock()
peer.Send(p2p.Envelope{
ChannelID: VoteChannel,
Message: &tmcons.Vote{prevote.ToProto()},
Message: MustMsgToProto(&VoteMessage{prevote}),
})
peer.Send(p2p.Envelope{
ChannelID: VoteChannel,
Message: &tmcons.Vote{precommit.ToProto()},
Message: MustMsgToProto(&VoteMessage{precommit}),
})
}

View File

@@ -7,7 +7,6 @@ import (
"github.com/tendermint/tendermint/libs/log"
tmrand "github.com/tendermint/tendermint/libs/rand"
"github.com/tendermint/tendermint/p2p"
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
"github.com/tendermint/tendermint/types"
)
@@ -96,7 +95,7 @@ func invalidDoPrevoteFunc(t *testing.T, height int64, round int32, cs *State, sw
for _, peer := range peers {
cs.Logger.Info("Sending bad vote", "block", blockHash, "peer", peer)
peer.Send(p2p.Envelope{
Message: &tmcons.Vote{precommit.ToProto()},
Message: MustMsgToProto(&VoteMessage{precommit}),
ChannelID: VoteChannel,
})
}

View File

@@ -4,6 +4,8 @@ import (
"errors"
"fmt"
"github.com/cosmos/gogoproto/proto"
cstypes "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/libs/bits"
tmmath "github.com/tendermint/tendermint/libs/math"
@@ -14,8 +16,6 @@ import (
)
// MsgToProto takes a consensus message type and returns the proto defined consensus message.
//
// TODO: This needs to be removed, but WALToProto depends on this.
func MsgToProto(msg Message) (*tmcons.Message, error) {
if msg == nil {
return nil, errors.New("consensus: message is nil")
@@ -143,6 +143,14 @@ func MsgToProto(msg Message) (*tmcons.Message, error) {
return &pb, nil
}
func MustMsgToProto(msg Message) *tmcons.Message {
m, err := MsgToProto(msg)
if err != nil {
panic(err)
}
return m
}
// MsgFromProto takes a consensus proto message and returns the native go type
func MsgFromProto(msg *tmcons.Message) (Message, error) {
if msg == nil {
@@ -260,6 +268,20 @@ func MsgFromProto(msg *tmcons.Message) (Message, error) {
return pb, nil
}
// MustEncode takes the reactors msg, makes it proto and marshals it
// this mimics `MustMarshalBinaryBare` in that is panics on error
func MustEncode(msg Message) []byte {
pb, err := MsgToProto(msg)
if err != nil {
panic(err)
}
enc, err := proto.Marshal(pb)
if err != nil {
panic(err)
}
return enc
}
// WALToProto takes a WAL message and return a proto walMessage and error
func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) {
var pb tmcons.WALMessage

View File

@@ -7,6 +7,8 @@ import (
"sync"
"time"
"github.com/cosmos/gogoproto/proto"
cstypes "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/libs/bits"
tmevents "github.com/tendermint/tendermint/libs/events"
@@ -231,16 +233,6 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
return
}
if w, ok := e.Message.(p2p.Wrapper); ok {
var err error
e.Message, err = w.Wrap()
if err != nil {
conR.Logger.Error("Error wrapping message", "src", e.Src, "chId", e.ChannelID, "err", err)
conR.Switch.StopPeerForError(e.Src, err)
return
}
}
msg, err := MsgFromProto(e.Message.(*tmcons.Message))
if err != nil {
conR.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err)
@@ -304,18 +296,15 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
default:
panic("Bad VoteSetBitsMessage field Type. Forgot to add a check in ValidateBasic?")
}
eMsg := &tmcons.VoteSetBits{
Height: msg.Height,
Round: msg.Round,
Type: msg.Type,
BlockID: msg.BlockID.ToProto(),
}
if votes := ourVotes.ToProto(); votes != nil {
eMsg.Votes = *votes
}
e.Src.TrySend(p2p.Envelope{
ChannelID: VoteSetBitsChannel,
Message: eMsg,
Message: MustMsgToProto(&VoteSetBitsMessage{
Height: msg.Height,
Round: msg.Round,
Type: msg.Type,
BlockID: msg.BlockID,
Votes: ourVotes,
}),
})
default:
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
@@ -450,28 +439,28 @@ func (conR *Reactor) broadcastNewRoundStepMessage(rs *cstypes.RoundState) {
nrsMsg := makeRoundStepMessage(rs)
conR.Switch.NewBroadcast(p2p.Envelope{
ChannelID: StateChannel,
Message: nrsMsg,
Message: MustMsgToProto(nrsMsg),
})
}
func (conR *Reactor) broadcastNewValidBlockMessage(rs *cstypes.RoundState) {
psh := rs.ProposalBlockParts.Header()
csMsg := &tmcons.NewValidBlock{
csMsg := &NewValidBlockMessage{
Height: rs.Height,
Round: rs.Round,
BlockPartSetHeader: psh.ToProto(),
BlockParts: rs.ProposalBlockParts.BitArray().ToProto(),
BlockPartSetHeader: rs.ProposalBlockParts.Header(),
BlockParts: rs.ProposalBlockParts.BitArray(),
IsCommit: rs.Step == cstypes.RoundStepCommit,
}
MsgToProto(csMsg)
conR.Switch.NewBroadcast(p2p.Envelope{
ChannelID: StateChannel,
Message: csMsg,
Message: MustMsgToProto(csMsg),
})
}
// Broadcasts HasVoteMessage to peers that care.
func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
msg := &tmcons.HasVote{
msg := &HasVoteMessage{
Height: vote.Height,
Round: vote.Round,
Type: vote.Type,
@@ -479,7 +468,7 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
}
conR.Switch.NewBroadcast(p2p.Envelope{
ChannelID: StateChannel,
Message: msg,
Message: MustMsgToProto(msg),
})
/*
// TODO: Make this broadcast more selective.
@@ -505,11 +494,11 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
*/
}
func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *tmcons.NewRoundStep) {
nrsMsg = &tmcons.NewRoundStep{
func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *NewRoundStepMessage) {
nrsMsg = &NewRoundStepMessage{
Height: rs.Height,
Round: rs.Round,
Step: uint32(rs.Step),
Step: rs.Step,
SecondsSinceStartTime: int64(time.Since(rs.StartTime).Seconds()),
LastCommitRound: rs.LastCommit.GetRound(),
}
@@ -521,7 +510,7 @@ func (conR *Reactor) sendNewRoundStepMessage(peer p2p.Peer) {
nrsMsg := makeRoundStepMessage(rs)
peer.Send(p2p.Envelope{
ChannelID: StateChannel,
Message: nrsMsg,
Message: MustMsgToProto(nrsMsg),
})
}
@@ -561,18 +550,15 @@ OUTER_LOOP:
if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartSetHeader) {
if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
part := rs.ProposalBlockParts.GetPart(index)
parts, err := part.ToProto()
if err != nil {
panic(err)
msg := &BlockPartMessage{
Height: rs.Height, // This tells peer that this part applies to us.
Round: rs.Round, // This tells peer that this part applies to us.
Part: part,
}
logger.Debug("Sending block part", "height", prs.Height, "round", prs.Round)
if peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: &tmcons.BlockPart{
Height: rs.Height, // This tells peer that this part applies to us.
Round: rs.Round, // This tells peer that this part applies to us.
Part: *parts,
},
Message: MustMsgToProto(msg),
}) {
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
}
@@ -619,10 +605,11 @@ OUTER_LOOP:
if rs.Proposal != nil && !prs.Proposal {
// Proposal: share the proposal metadata with peer.
{
msg := &ProposalMessage{Proposal: rs.Proposal}
logger.Debug("Sending proposal", "height", prs.Height, "round", prs.Round)
if peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: &tmcons.Proposal{Proposal: *rs.Proposal.ToProto()},
Message: MustMsgToProto(msg),
}) {
// NOTE[ZM]: A peer might have received different proposal msg so this Proposal msg will be rejected!
ps.SetHasProposal(rs.Proposal)
@@ -633,14 +620,15 @@ OUTER_LOOP:
// rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round,
// so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
if 0 <= rs.Proposal.POLRound {
msg := &ProposalPOLMessage{
Height: rs.Height,
ProposalPOLRound: rs.Proposal.POLRound,
ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
}
logger.Debug("Sending POL", "height", prs.Height, "round", prs.Round)
peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: &tmcons.ProposalPOL{
Height: rs.Height,
ProposalPolRound: rs.Proposal.POLRound,
ProposalPol: *rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray().ToProto(),
},
Message: MustMsgToProto(msg),
})
}
continue OUTER_LOOP
@@ -678,19 +666,15 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt
return
}
// Send the part
logger.Debug("Sending block part for catchup", "round", prs.Round, "index", index)
pp, err := part.ToProto()
if err != nil {
logger.Error("Could not convert part to proto", "index", index, "error", err)
return
msg := &BlockPartMessage{
Height: prs.Height, // Not our height, so it doesn't matter.
Round: prs.Round, // Not our height, so it doesn't matter.
Part: part,
}
logger.Debug("Sending block part for catchup", "round", prs.Round, "index", index)
if peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: &tmcons.BlockPart{
Height: prs.Height, // Not our height, so it doesn't matter.
Round: prs.Round, // Not our height, so it doesn't matter.
Part: *pp,
},
Message: MustMsgToProto(msg),
}) {
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
} else {
@@ -853,12 +837,12 @@ OUTER_LOOP:
peer.TrySend(p2p.Envelope{
ChannelID: StateChannel,
Message: &tmcons.VoteSetMaj23{
Message: MustMsgToProto(&VoteSetMaj23Message{
Height: prs.Height,
Round: prs.Round,
Type: tmproto.PrevoteType,
BlockID: maj23.ToProto(),
},
BlockID: maj23,
}),
})
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
@@ -871,14 +855,15 @@ OUTER_LOOP:
prs := ps.GetRoundState()
if rs.Height == prs.Height {
if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok {
peer.TrySend(p2p.Envelope{
ChannelID: StateChannel,
Message: &tmcons.VoteSetMaj23{
Message: MustMsgToProto(&VoteSetMaj23Message{
Height: prs.Height,
Round: prs.Round,
Type: tmproto.PrecommitType,
BlockID: maj23.ToProto(),
},
BlockID: maj23,
}),
})
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
@@ -894,12 +879,12 @@ OUTER_LOOP:
peer.TrySend(p2p.Envelope{
ChannelID: StateChannel,
Message: &tmcons.VoteSetMaj23{
Message: MustMsgToProto(&VoteSetMaj23Message{
Height: prs.Height,
Round: prs.ProposalPOLRound,
Type: tmproto.PrevoteType,
BlockID: maj23.ToProto(),
},
BlockID: maj23,
}),
})
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
@@ -917,12 +902,12 @@ OUTER_LOOP:
if commit := conR.conS.LoadCommit(prs.Height); commit != nil {
peer.TrySend(p2p.Envelope{
ChannelID: StateChannel,
Message: &tmcons.VoteSetMaj23{
Message: MustMsgToProto(&VoteSetMaj23Message{
Height: prs.Height,
Round: commit.Round,
Type: tmproto.PrecommitType,
BlockID: commit.BlockID.ToProto(),
},
BlockID: commit.BlockID,
}),
})
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
@@ -1137,12 +1122,11 @@ func (ps *PeerState) SetHasProposalBlockPart(height int64, round int32, index in
// Returns true if vote was sent.
func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool {
if vote, ok := ps.PickVoteToSend(votes); ok {
msg := &VoteMessage{vote}
ps.logger.Debug("Sending vote message", "ps", ps, "vote", vote)
if ps.peer.Send(p2p.Envelope{
ChannelID: VoteChannel,
Message: &tmcons.Vote{
Vote: vote.ToProto(),
},
Message: MustMsgToProto(msg),
}) {
ps.SetHasVote(vote)
return true
@@ -1509,6 +1493,15 @@ func init() {
tmjson.RegisterType(&VoteSetBitsMessage{}, "tendermint/VoteSetBits")
}
func decodeMsg(bz []byte) (msg Message, err error) {
pb := &tmcons.Message{}
if err = proto.Unmarshal(bz, pb); err != nil {
return msg, err
}
return MsgFromProto(pb)
}
//-------------------------------------
// NewRoundStepMessage is sent for every step taken in the ConsensusState.

View File

@@ -33,7 +33,6 @@ import (
mempoolv1 "github.com/tendermint/tendermint/mempool/v1"
"github.com/tendermint/tendermint/p2p"
p2pmock "github.com/tendermint/tendermint/p2p/mock"
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
sm "github.com/tendermint/tendermint/state"
statemocks "github.com/tendermint/tendermint/state/mocks"
@@ -266,18 +265,15 @@ func TestReactorReceiveDoesNotPanicIfAddPeerHasntBeenCalledYet(t *testing.T) {
var (
reactor = reactors[0]
peer = p2pmock.NewPeer(nil)
msg = MustEncode(&HasVoteMessage{Height: 1,
Round: 1, Index: 1, Type: tmproto.PrevoteType})
)
reactor.InitPeer(peer)
// simulate switch calling Receive before AddPeer
assert.NotPanics(t, func() {
reactor.Receive(p2p.Envelope{
ChannelID: StateChannel,
Src: peer,
Message: &tmcons.HasVote{Height: 1,
Round: 1, Index: 1, Type: tmproto.PrevoteType},
})
reactor.Receive(StateChannel, peer, msg)
reactor.AddPeer(peer)
})
}
@@ -292,17 +288,20 @@ func TestReactorReceivePanicsIfInitPeerHasntBeenCalledYet(t *testing.T) {
var (
reactor = reactors[0]
peer = p2pmock.NewPeer(nil)
msg = MustEncode(&HasVoteMessage{Height: 1,
Round: 1, Index: 1, Type: tmproto.PrevoteType})
)
// we should call InitPeer here
// simulate switch calling Receive before AddPeer
assert.Panics(t, func() {
reactor.Receive(p2p.Envelope{
reactor.Receive(StateChannel, peer, msg)
reactor.NewReceive(p2p.Envelope{
ChannelID: StateChannel,
Src: peer,
Message: &tmcons.HasVote{Height: 1,
Round: 1, Index: 1, Type: tmproto.PrevoteType},
Message: MustMsgToProto(&HasVoteMessage{Height: 1,
Round: 1, Index: 1, Type: tmproto.PrevoteType}),
})
})
}

View File

@@ -214,6 +214,24 @@ type PeerState interface {
GetHeight() int64
}
// encodemsg takes a array of evidence
// returns the byte encoding of the List Message
func encodeMsg(evis []types.Evidence) ([]byte, error) {
evi := make([]tmproto.Evidence, len(evis))
for i := 0; i < len(evis); i++ {
ev, err := types.EvidenceToProto(evis[i])
if err != nil {
return nil, err
}
evi[i] = *ev
}
epl := tmproto.EvidenceList{
Evidence: evi,
}
return epl.Marshal()
}
// encodemsg takes a array of evidence
// returns the byte encoding of the List Message
func evidenceListToProto(evis []types.Evidence) (*tmproto.EvidenceList, error) {
@@ -231,6 +249,31 @@ func evidenceListToProto(evis []types.Evidence) (*tmproto.EvidenceList, error) {
return &epl, nil
}
// decodemsg takes an array of bytes
// returns an array of evidence
func decodeMsg(bz []byte) (evis []types.Evidence, err error) {
lm := tmproto.EvidenceList{}
if err := lm.Unmarshal(bz); err != nil {
return nil, err
}
evis = make([]types.Evidence, len(lm.Evidence))
for i := 0; i < len(lm.Evidence); i++ {
ev, err := types.EvidenceFromProto(&lm.Evidence[i])
if err != nil {
return nil, err
}
evis[i] = ev
}
for i, ev := range evis {
if err := ev.ValidateBasic(); err != nil {
return nil, fmt.Errorf("invalid evidence (#%d): %v", i, err)
}
}
return evis, nil
}
func evidenceListFromProto(m proto.Message) ([]types.Evidence, error) {
lm := m.(*tmproto.EvidenceList)

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"time"
"github.com/cosmos/gogoproto/proto"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/clist"
"github.com/tendermint/tendermint/libs/log"
@@ -156,33 +157,27 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// Receive implements Reactor.
// It adds any received transactions to the mempool.
func (memR *Reactor) Receive(e p2p.Envelope) {
memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
switch msg := e.Message.(type) {
case *protomem.Txs:
protoTxs := msg.GetTxs()
if len(protoTxs) == 0 {
memR.Logger.Error("received tmpty txs from peer", "src", e.Src)
return
}
txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(e.Src)}
if e.Src != nil {
txInfo.SenderP2PID = e.Src.ID()
}
var err error
for _, tx := range protoTxs {
ntx := types.Tx(tx)
err = memR.mempool.CheckTx(ntx, nil, txInfo)
if errors.Is(err, mempool.ErrTxInCache) {
memR.Logger.Debug("Tx already exists in cache", "tx", ntx.String())
} else if err != nil {
memR.Logger.Info("Could not check tx", "tx", ntx.String(), "err", err)
}
}
default:
memR.Logger.Error("unknown message type", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
msg, err := msgFromProto(e.Message)
if err != nil {
memR.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err)
memR.Switch.StopPeerForError(e.Src, err)
return
}
memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", msg)
txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(e.Src)}
if e.Src != nil {
txInfo.SenderP2PID = e.Src.ID()
}
for _, tx := range msg.Txs {
err = memR.mempool.CheckTx(tx, nil, txInfo)
if errors.Is(err, mempool.ErrTxInCache) {
memR.Logger.Debug("Tx already exists in cache", "tx", tx.String())
} else if err != nil {
memR.Logger.Info("Could not check tx", "tx", tx.String(), "err", err)
}
}
// broadcasting happens from go routines per peer
}
@@ -267,6 +262,39 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
}
}
func decodeMsg(bz []byte) (TxsMessage, error) {
msg := protomem.Message{}
err := msg.Unmarshal(bz)
if err != nil {
return TxsMessage{}, err
}
return msgFromProto(&msg)
}
func msgFromProto(m proto.Message) (TxsMessage, error) {
msg := m.(*protomem.Message)
var message TxsMessage
if i, ok := msg.Sum.(*protomem.Message_Txs); ok {
txs := i.Txs.GetTxs()
if len(txs) == 0 {
return message, errors.New("empty TxsMessage")
}
decoded := make([]types.Tx, len(txs))
for j, tx := range txs {
decoded[j] = types.Tx(tx)
}
message = TxsMessage{
Txs: decoded,
}
return message, nil
}
return message, fmt.Errorf("msg type: %T is not supported", msg)
}
// TxsMessage is a Message containing transactions.
type TxsMessage struct {
Txs []types.Tx

View File

@@ -22,7 +22,6 @@ import (
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/p2p/mock"
memproto "github.com/tendermint/tendermint/proto/tendermint/mempool"
protomem "github.com/tendermint/tendermint/proto/tendermint/mempool"
"github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types"
)
@@ -280,12 +279,7 @@ func TestDontExhaustMaxActiveIDs(t *testing.T) {
for i := 0; i < mempool.MaxActiveIDs+1; i++ {
peer := mock.NewPeer(nil)
reactor.Receive(p2p.Envelope{
ChannelID: mempool.MempoolChannel,
Src: peer,
Message: &protomem.Txs{
Txs: [][]byte{{0x01, 0x02, 0x03}},
}})
reactor.Receive(mempool.MempoolChannel, peer, []byte{0x1, 0x2, 0x3})
reactor.AddPeer(peer)
}
}

View File

@@ -5,6 +5,8 @@ import (
"fmt"
"time"
"github.com/cosmos/gogoproto/proto"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/clist"
"github.com/tendermint/tendermint/libs/log"
@@ -155,34 +157,26 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// Receive implements Reactor.
// It adds any received transactions to the mempool.
func (memR *Reactor) Receive(e p2p.Envelope) {
memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
switch msg := e.Message.(type) {
case *protomem.Txs:
protoTxs := msg.GetTxs()
if len(protoTxs) == 0 {
memR.Logger.Error("received tmpty txs from peer", "src", e.Src)
return
}
txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(e.Src)}
if e.Src != nil {
txInfo.SenderP2PID = e.Src.ID()
}
var err error
for _, tx := range protoTxs {
ntx := types.Tx(tx)
err = memR.mempool.CheckTx(ntx, nil, txInfo)
if errors.Is(err, mempool.ErrTxInCache) {
memR.Logger.Debug("Tx already exists in cache", "tx", ntx.String())
} else if err != nil {
memR.Logger.Info("Could not check tx", "tx", ntx.String(), "err", err)
}
}
default:
memR.Logger.Error("unknown message type", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
msg, err := protoToMsg(e.Message)
if err != nil {
memR.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err)
memR.Switch.StopPeerForError(e.Src, err)
return
}
memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", msg)
txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(e.Src)}
if e.Src != nil {
txInfo.SenderP2PID = e.Src.ID()
}
for _, tx := range msg.Txs {
err = memR.mempool.CheckTx(tx, nil, txInfo)
if err == mempool.ErrTxInCache {
memR.Logger.Debug("Tx already exists in cache", "tx", tx.String())
} else if err != nil {
memR.Logger.Info("Could not check tx", "tx", tx.String(), "err", err)
}
}
// broadcasting happens from go routines per peer
}
@@ -273,6 +267,42 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
//-----------------------------------------------------------------------------
// Messages
func decodeMsg(bz []byte) (TxsMessage, error) {
msg := protomem.Message{}
err := msg.Unmarshal(bz)
if err != nil {
return TxsMessage{}, err
}
return protoToMsg(&msg)
}
func protoToMsg(m proto.Message) (TxsMessage, error) {
msg := m.(*protomem.Message)
var message TxsMessage
if i, ok := msg.Sum.(*protomem.Message_Txs); ok {
txs := i.Txs.GetTxs()
if len(txs) == 0 {
return message, errors.New("empty TxsMessage")
}
decoded := make([]types.Tx, len(txs))
for j, tx := range txs {
decoded[j] = types.Tx(tx)
}
message = TxsMessage{
Txs: decoded,
}
return message, nil
}
return message, fmt.Errorf("msg type: %T is not supported", msg)
}
//-------------------------------------
// TxsMessage is a Message containing transactions.
type TxsMessage struct {
Txs []types.Tx

View File

@@ -19,7 +19,7 @@ func NewReactor() *Reactor {
return r
}
func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return r.Channels }
func (r *Reactor) AddPeer(peer p2p.Peer) {}
func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {}
func (r *Reactor) Receive(e p2p.Envelope) {}
func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return r.Channels }
func (r *Reactor) AddPeer(peer p2p.Peer) {}
func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {}
func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {}

View File

@@ -261,15 +261,7 @@ func (p *peer) Send(e Envelope) bool {
} else if !p.hasChannel(e.ChannelID) {
return false
}
msg := e.Message
if w, ok := msg.(Wrapper); ok {
var err error
msg, err = w.Wrap()
if err != nil {
panic(err)
}
}
msgBytes, err := proto.Marshal(msg)
msgBytes, err := proto.Marshal(e.Message)
if err != nil {
panic(err) // Q: should this panic or error?
}
@@ -292,15 +284,7 @@ func (p *peer) TrySend(e Envelope) bool {
} else if !p.hasChannel(e.ChannelID) {
return false
}
msg := e.Message
if w, ok := msg.(Wrapper); ok {
var err error
msg, err = w.Wrap()
if err != nil {
panic(err)
}
}
msgBytes, err := proto.Marshal(msg)
msgBytes, err := proto.Marshal(e.Message)
if err != nil {
panic(err)
}
@@ -436,13 +420,6 @@ func createMConnection(
"peer_id", string(p.ID()),
"chID", fmt.Sprintf("%#x", chID),
}
if w, ok := msg.(Unwrapper); ok {
msg, err = w.Unwrap()
if err != nil {
// TODO(williambanfield) add error log line.
return
}
}
p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes)))
p.metrics.MessageReceiveBytesTotal.With("message_type", "tmp").Add(float64(len(msgBytes)))
reactor.Receive(Envelope{

View File

@@ -6,6 +6,8 @@ import (
"sync"
"time"
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/libs/cmap"
tmmath "github.com/tendermint/tendermint/libs/math"
tmrand "github.com/tendermint/tendermint/libs/rand"
@@ -236,9 +238,15 @@ func (r *Reactor) logErrAddrBook(err error) {
// Receive implements Reactor by handling incoming PEX messages.
func (r *Reactor) Receive(e p2p.Envelope) {
r.Logger.Debug("Received message", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
msg, err := msgFromProto(e.Message)
if err != nil {
r.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err)
r.Switch.StopPeerForError(e.Src, err)
return
}
r.Logger.Debug("Received message", "src", e.Src, "chId", e.ChannelID, "msg", msg)
switch msg := e.Message.(type) {
switch msg := msg.(type) {
case *tmp2p.PexRequest:
// NOTE: this is a prime candidate for amplification attacks,
@@ -343,7 +351,7 @@ func (r *Reactor) RequestAddrs(p Peer) {
r.requestsSent.Set(id, struct{}{})
p.Send(p2p.Envelope{
ChannelID: PexChannel,
Message: &tmp2p.PexRequest{},
Message: mustMsgToWrappedProto(&tmp2p.PexRequest{}),
})
}
@@ -404,7 +412,7 @@ func (r *Reactor) ReceiveAddrs(addrs []*p2p.NetAddress, src Peer) error {
func (r *Reactor) SendAddrs(p Peer, netAddrs []*p2p.NetAddress) {
e := p2p.Envelope{
ChannelID: PexChannel,
Message: &tmp2p.PexAddrs{Addrs: p2p.NetAddressesToProto(netAddrs)},
Message: mustMsgToWrappedProto(&tmp2p.PexAddrs{Addrs: p2p.NetAddressesToProto(netAddrs)}),
}
p.Send(e)
}
@@ -763,3 +771,51 @@ func markAddrInBookBasedOnErr(addr *p2p.NetAddress, book AddrBook, err error) {
book.MarkAttempt(addr)
}
}
//-----------------------------------------------------------------------------
// Messages
// mustEncode proto encodes a tmp2p.Message
func mustEncode(pb proto.Message) []byte {
msg := mustMsgToWrappedProto(pb)
bz, err := proto.Marshal(msg)
if err != nil {
panic(fmt.Errorf("unable to marshal %T: %w", pb, err))
}
return bz
}
func mustMsgToWrappedProto(pb proto.Message) proto.Message {
msg := tmp2p.Message{}
switch pb := pb.(type) {
case *tmp2p.PexRequest:
msg.Sum = &tmp2p.Message_PexRequest{PexRequest: pb}
case *tmp2p.PexAddrs:
msg.Sum = &tmp2p.Message_PexAddrs{PexAddrs: pb}
default:
panic(fmt.Sprintf("Unknown message type %T", pb))
}
return &msg
}
func decodeMsg(bz []byte) (proto.Message, error) {
pb := &tmp2p.Message{}
err := pb.Unmarshal(bz)
if err != nil {
return nil, err
}
return msgFromProto(pb)
}
func msgFromProto(m proto.Message) (proto.Message, error) {
pb := m.(*tmp2p.Message)
switch msg := pb.Sum.(type) {
case *tmp2p.Message_PexRequest:
return msg.PexRequest, nil
case *tmp2p.Message_PexAddrs:
return msg.PexAddrs, nil
default:
return nil, fmt.Errorf("unknown message: %T", msg)
}
}

View File

@@ -132,10 +132,12 @@ func TestPEXReactorReceive(t *testing.T) {
size := book.Size()
msg := &tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg})
r.Receive(PexChannel, peer, mustEncode(msg))
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: mustMsgToWrappedProto(msg)})
assert.Equal(t, size+1, book.Size())
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
r.Receive(PexChannel, peer, mustEncode(&tmp2p.PexRequest{})) // should not panic.
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: mustMsgToWrappedProto(&tmp2p.PexRequest{})})
}
func TestPEXReactorRequestMessageAbuse(t *testing.T) {
@@ -154,19 +156,23 @@ func TestPEXReactorRequestMessageAbuse(t *testing.T) {
require.True(t, book.HasAddress(peerAddr))
id := string(peer.ID())
msg := mustEncode(&tmp2p.PexRequest{})
// first time creates the entry
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
r.Receive(PexChannel, peer, msg)
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: mustMsgToWrappedProto(&tmp2p.PexRequest{})})
assert.True(t, r.lastReceivedRequests.Has(id))
assert.True(t, sw.Peers().Has(peer.ID()))
// next time sets the last time value
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
r.Receive(PexChannel, peer, msg)
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: mustMsgToWrappedProto(&tmp2p.PexRequest{})})
assert.True(t, r.lastReceivedRequests.Has(id))
assert.True(t, sw.Peers().Has(peer.ID()))
// third time is too many too soon - peer is removed
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
r.Receive(PexChannel, peer, msg)
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: mustMsgToWrappedProto(&tmp2p.PexRequest{})})
assert.False(t, r.lastReceivedRequests.Has(id))
assert.False(t, sw.Peers().Has(peer.ID()))
assert.True(t, book.IsBanned(peerAddr))
@@ -193,12 +199,14 @@ func TestPEXReactorAddrsMessageAbuse(t *testing.T) {
msg := &tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}
// receive some addrs. should clear the request
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg})
r.Receive(PexChannel, peer, mustEncode(msg))
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: mustMsgToWrappedProto(msg)})
assert.False(t, r.requestsSent.Has(id))
assert.True(t, sw.Peers().Has(peer.ID()))
// receiving more unsolicited addrs causes a disconnect and ban
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg})
r.Receive(PexChannel, peer, mustEncode(msg))
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: mustMsgToWrappedProto(msg)})
assert.False(t, sw.Peers().Has(peer.ID()))
assert.True(t, book.IsBanned(peer.SocketAddr()))
}
@@ -484,12 +492,8 @@ func TestPEXReactorDoesNotAddPrivatePeersToAddrBook(t *testing.T) {
pexR.RequestAddrs(peer)
size := book.Size()
msg := &tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}
pexR.Receive(p2p.Envelope{
ChannelID: PexChannel,
Src: peer,
Message: msg,
})
msg := mustEncode(&tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}})
pexR.Receive(PexChannel, peer, msg)
assert.Equal(t, size, book.Size())
pexR.AddPeer(peer)
@@ -697,10 +701,7 @@ func TestPexVectors(t *testing.T) {
for _, tc := range testCases {
tc := tc
w, err := tc.msg.(p2p.Wrapper).Wrap()
require.NoError(t, err)
bz, err := proto.Marshal(w)
require.NoError(t, err)
bz := mustEncode(tc.msg)
require.Equal(t, tc.expBytes, hex.EncodeToString(bz), tc.testName)
}

View File

@@ -265,7 +265,39 @@ func (sw *Switch) OnStop() {
// closed once msg bytes are sent to all peers (or time out).
//
// NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
func (sw *Switch) Broadcast(e Envelope) chan bool {
/*
func (sw *Switch) Broadcast(chID byte, msgBytes []byte) chan bool {
sw.Logger.Debug("Broadcast", "channel", chID, "msgBytes", log.NewLazySprintf("%X", msgBytes))
peers := sw.peers.List()
var wg sync.WaitGroup
wg.Add(len(peers))
successChan := make(chan bool, len(peers))
for _, peer := range peers {
go func(p Peer) {
defer wg.Done()
success := p.Send(chID, msgBytes)
successChan <- success
}(peer)
}
go func() {
wg.Wait()
close(successChan)
}()
return successChan
}
*/
// NewBroadcast runs a go routine for each attempted send, which will block trying
// to send for defaultSendTimeoutSeconds. Returns a channel which receives
// success values for each attempted send (false if times out). Channel will be
// closed once msg bytes are sent to all peers (or time out).
//
// NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
func (sw *Switch) NewBroadcast(e Envelope) chan bool {
sw.Logger.Debug("Broadcast", "channel", e.ChannelID)
peers := sw.peers.List()

View File

@@ -39,8 +39,9 @@ func init() {
}
type PeerMessage struct {
Contents proto.Message
Counter int
PeerID ID
Bytes []byte
Counter int
}
type TestReactor struct {
@@ -72,12 +73,12 @@ func (tr *TestReactor) AddPeer(peer Peer) {}
func (tr *TestReactor) RemovePeer(peer Peer, reason interface{}) {}
func (tr *TestReactor) Receive(e Envelope) {
func (tr *TestReactor) Receive(chID byte, peer Peer, msgBytes []byte) {
if tr.logMessages {
tr.mtx.Lock()
defer tr.mtx.Unlock()
fmt.Printf("Received: %X, %X\n", e.ChannelID, e.Message)
tr.msgsReceived[e.ChannelID] = append(tr.msgsReceived[e.ChannelID], PeerMessage{Contents: e.Message, Counter: tr.msgsCounter})
// fmt.Printf("Received: %X, %X\n", chID, msgBytes)
tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.ID(), msgBytes, tr.msgsCounter})
tr.msgsCounter++
}
}
@@ -105,12 +106,12 @@ func initSwitchFunc(i int, sw *Switch) *Switch {
// Make two reactors of two channels each
sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
{ID: byte(0x00), Priority: 10, MessageType: &p2pproto.Message{}},
{ID: byte(0x01), Priority: 10, MessageType: &p2pproto.Message{}},
{ID: byte(0x00), Priority: 10},
{ID: byte(0x01), Priority: 10},
}, true))
sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
{ID: byte(0x02), Priority: 10, MessageType: &p2pproto.Message{}},
{ID: byte(0x03), Priority: 10, MessageType: &p2pproto.Message{}},
{ID: byte(0x02), Priority: 10},
{ID: byte(0x03), Priority: 10},
}, true))
return sw
@@ -137,47 +138,66 @@ func TestSwitches(t *testing.T) {
}
// Lets send some messages
ch0Msg := &p2pproto.PexAddrs{
Addrs: []p2p.NetAddress{
{
ID: "1",
ch0Msg := &p2pproto.Message{
Sum: &p2pproto.Message_PexAddrs{
PexAddrs: &p2pproto.PexAddrs{
Addrs: []p2p.NetAddress{
{
ID: "0",
},
},
},
},
}
ch1Msg := &p2pproto.PexAddrs{
Addrs: []p2p.NetAddress{
{
ID: "1",
ch1Msg := &p2pproto.Message{
Sum: &p2pproto.Message_PexAddrs{
PexAddrs: &p2pproto.PexAddrs{
Addrs: []p2p.NetAddress{
{
ID: "1",
},
},
},
},
}
ch2Msg := &p2pproto.PexAddrs{
Addrs: []p2p.NetAddress{
{
ID: "2",
ch2Msg := &p2pproto.Message{
Sum: &p2pproto.Message_PexAddrs{
PexAddrs: &p2pproto.PexAddrs{
Addrs: []p2p.NetAddress{
{
ID: "2",
},
},
},
},
}
s1.Broadcast(Envelope{ChannelID: byte(0x00), Message: ch0Msg})
s1.Broadcast(Envelope{ChannelID: byte(0x01), Message: ch1Msg})
s1.Broadcast(Envelope{ChannelID: byte(0x02), Message: ch2Msg})
s1.NewBroadcast(Envelope{ChannelID: byte(0x00), Message: ch0Msg})
s1.NewBroadcast(Envelope{ChannelID: byte(0x01), Message: ch1Msg})
s1.NewBroadcast(Envelope{ChannelID: byte(0x02), Message: ch2Msg})
msgBytes, err := proto.Marshal(ch0Msg)
require.NoError(t, err)
assertMsgReceivedWithTimeout(t,
ch0Msg,
msgBytes,
byte(0x00),
s2.Reactor("foo").(*TestReactor), 200*time.Millisecond, 5*time.Second)
s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
msgBytes, err = proto.Marshal(ch1Msg)
require.NoError(t, err)
assertMsgReceivedWithTimeout(t,
ch1Msg,
msgBytes,
byte(0x01),
s2.Reactor("foo").(*TestReactor), 200*time.Millisecond, 5*time.Second)
s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
msgBytes, err = proto.Marshal(ch2Msg)
require.NoError(t, err)
assertMsgReceivedWithTimeout(t,
ch2Msg,
msgBytes,
byte(0x02),
s2.Reactor("bar").(*TestReactor), 200*time.Millisecond, 5*time.Second)
s2.Reactor("bar").(*TestReactor), 10*time.Millisecond, 5*time.Second)
}
func assertMsgReceivedWithTimeout(
t *testing.T,
msg proto.Message,
msgBytes []byte,
channel byte,
reactor *TestReactor,
checkPeriod,
@@ -188,13 +208,9 @@ func assertMsgReceivedWithTimeout(
select {
case <-ticker.C:
msgs := reactor.getMsgs(channel)
expectedBytes, err := proto.Marshal(msgs[0].Contents)
require.NoError(t, err)
gotBytes, err := proto.Marshal(msg)
require.NoError(t, err)
if len(msgs) > 0 {
if !bytes.Equal(expectedBytes, gotBytes) {
t.Fatalf("Unexpected message bytes. Wanted: %X, Got: %X", msg, msgs[0].Counter)
if !bytes.Equal(msgs[0].Bytes, msgBytes) {
t.Fatalf("Unexpected message bytes. Wanted: %X, Got: %X", msgBytes, msgs[0].Bytes)
}
return
}
@@ -849,7 +865,7 @@ func BenchmarkSwitchBroadcast(b *testing.B) {
// Send random message from foo channel to another
for i := 0; i < b.N; i++ {
chID := byte(i % 4)
successChan := s1.Broadcast(Envelope{ChannelID: chID})
successChan := s1.NewBroadcast(Envelope{ChannelID: chID})
for s := range successChan {
if s {
numSuccess++

View File

@@ -3,7 +3,6 @@ package p2p
import (
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/p2p/conn"
tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
)
type ChannelDescriptor = conn.ChannelDescriptor
@@ -15,26 +14,3 @@ type Envelope struct {
Message proto.Message // message payload
ChannelID byte
}
// Wrapper is a Protobuf message that can contain a variety of inner messages
// (e.g. via oneof fields). If a Channel's message type implements Wrapper, the
// Router will automatically wrap outbound messages and unwrap inbound messages,
// such that reactors do not have to do this themselves.
type Unwrapper interface {
proto.Message
// Unwrap will unwrap the inner message contained in this message.
Unwrap() (proto.Message, error)
}
type Wrapper interface {
proto.Message
// Wrap will take the underlying message and wrap it in its wrapper type.
Wrap() (proto.Message, error)
}
var (
_ Wrapper = &tmp2p.PexRequest{}
_ Wrapper = &tmp2p.PexAddrs{}
)

View File

@@ -1,73 +0,0 @@
package blocksync
import (
"fmt"
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/p2p"
)
var _ p2p.Wrapper = &StatusRequest{}
var _ p2p.Wrapper = &StatusResponse{}
var _ p2p.Wrapper = &NoBlockResponse{}
var _ p2p.Wrapper = &BlockResponse{}
var _ p2p.Wrapper = &BlockRequest{}
const (
BlockResponseMessagePrefixSize = 4
BlockResponseMessageFieldKeySize = 1
)
func (m *BlockRequest) Wrap() (proto.Message, error) {
bm := &Message{}
bm.Sum = &Message_BlockRequest{BlockRequest: m}
return bm, nil
}
func (m *BlockResponse) Wrap() (proto.Message, error) {
bm := &Message{}
bm.Sum = &Message_BlockResponse{BlockResponse: m}
return bm, nil
}
func (m *NoBlockResponse) Wrap() (proto.Message, error) {
bm := &Message{}
bm.Sum = &Message_NoBlockResponse{NoBlockResponse: m}
return bm, nil
}
func (m *StatusRequest) Wrap() (proto.Message, error) {
bm := &Message{}
bm.Sum = &Message_StatusRequest{StatusRequest: m}
return bm, nil
}
func (m *StatusResponse) Wrap() (proto.Message, error) {
bm := &Message{}
bm.Sum = &Message_StatusResponse{StatusResponse: m}
return bm, nil
}
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped blockchain
// message.
func (m *Message) Unwrap() (proto.Message, error) {
switch msg := m.Sum.(type) {
case *Message_BlockRequest:
return m.GetBlockRequest(), nil
case *Message_BlockResponse:
return m.GetBlockResponse(), nil
case *Message_NoBlockResponse:
return m.GetNoBlockResponse(), nil
case *Message_StatusRequest:
return m.GetStatusRequest(), nil
case *Message_StatusResponse:
return m.GetStatusResponse(), nil
default:
return nil, fmt.Errorf("unknown message: %T", msg)
}
}

View File

@@ -1,109 +0,0 @@
package consensus
import (
"fmt"
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/p2p"
)
var _ p2p.Wrapper = &VoteSetBits{}
var _ p2p.Wrapper = &VoteSetMaj23{}
var _ p2p.Wrapper = &Vote{}
var _ p2p.Wrapper = &ProposalPOL{}
var _ p2p.Wrapper = &Proposal{}
var _ p2p.Wrapper = &NewValidBlock{}
var _ p2p.Wrapper = &NewRoundStep{}
var _ p2p.Wrapper = &HasVote{}
var _ p2p.Wrapper = &BlockPart{}
func (m *VoteSetBits) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_VoteSetBits{VoteSetBits: m}
return cm, nil
}
func (m *VoteSetMaj23) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_VoteSetMaj23{VoteSetMaj23: m}
return cm, nil
}
func (m *HasVote) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_HasVote{HasVote: m}
return cm, nil
}
func (m *Vote) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_Vote{Vote: m}
return cm, nil
}
func (m *BlockPart) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_BlockPart{BlockPart: m}
return cm, nil
}
func (m *ProposalPOL) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_ProposalPol{ProposalPol: m}
return cm, nil
}
func (m *Proposal) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_Proposal{Proposal: m}
return cm, nil
}
func (m *NewValidBlock) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_NewValidBlock{NewValidBlock: m}
return cm, nil
}
func (m *NewRoundStep) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_NewRoundStep{NewRoundStep: m}
return cm, nil
}
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped consensus
// proto message.
func (m *Message) Unwrap() (proto.Message, error) {
switch msg := m.Sum.(type) {
case *Message_NewRoundStep:
return m.GetNewRoundStep(), nil
case *Message_NewValidBlock:
return m.GetNewValidBlock(), nil
case *Message_Proposal:
return m.GetProposal(), nil
case *Message_ProposalPol:
return m.GetProposalPol(), nil
case *Message_BlockPart:
return m.GetBlockPart(), nil
case *Message_Vote:
return m.GetVote(), nil
case *Message_HasVote:
return m.GetHasVote(), nil
case *Message_VoteSetMaj23:
return m.GetVoteSetMaj23(), nil
case *Message_VoteSetBits:
return m.GetVoteSetBits(), nil
default:
return nil, fmt.Errorf("unknown message: %T", msg)
}
}

View File

@@ -1,30 +0,0 @@
package mempool
import (
"fmt"
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/p2p"
)
var _ p2p.Wrapper = &Txs{}
var _ p2p.Unwrapper = &Message{}
// Wrap implements the p2p Wrapper interface and wraps a mempool message.
func (m *Txs) Wrap() (proto.Message, error) {
return &Message{
Sum: &Message_Txs{Txs: m},
}, nil
}
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped mempool
// message.
func (m *Message) Unwrap() (proto.Message, error) {
switch msg := m.Sum.(type) {
case *Message_Txs:
return m.GetTxs(), nil
default:
return nil, fmt.Errorf("unknown message: %T", msg)
}
}

View File

@@ -1,32 +0,0 @@
package p2p
import (
"fmt"
"github.com/cosmos/gogoproto/proto"
)
func (m *PexAddrs) Wrap() (proto.Message, error) {
pm := &Message{}
pm.Sum = &Message_PexAddrs{PexAddrs: m}
return pm, nil
}
func (m *PexRequest) Wrap() (proto.Message, error) {
pm := &Message{}
pm.Sum = &Message_PexRequest{PexRequest: m}
return pm, nil
}
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped PEX
// message.
func (m *Message) Unwrap() (proto.Message, error) {
switch msg := m.Sum.(type) {
case *Message_PexRequest:
return msg.PexRequest, nil
case *Message_PexAddrs:
return msg.PexAddrs, nil
default:
return nil, fmt.Errorf("unknown pex message: %T", msg)
}
}

View File

@@ -1,58 +0,0 @@
package statesync
import (
"fmt"
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/p2p"
)
var _ p2p.Wrapper = &ChunkRequest{}
var _ p2p.Wrapper = &ChunkResponse{}
var _ p2p.Wrapper = &SnapshotsRequest{}
var _ p2p.Wrapper = &SnapshotsResponse{}
func (m *SnapshotsResponse) Wrap() (proto.Message, error) {
sm := &Message{}
sm.Sum = &Message_SnapshotsResponse{SnapshotsResponse: m}
return sm, nil
}
func (m *SnapshotsRequest) Wrap() (proto.Message, error) {
sm := &Message{}
sm.Sum = &Message_SnapshotsRequest{SnapshotsRequest: m}
return sm, nil
}
func (m *ChunkResponse) Wrap() (proto.Message, error) {
sm := &Message{}
sm.Sum = &Message_ChunkResponse{ChunkResponse: m}
return sm, nil
}
func (m *ChunkRequest) Wrap() (proto.Message, error) {
sm := &Message{}
sm.Sum = &Message_ChunkRequest{ChunkRequest: m}
return sm, nil
}
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped state sync
// proto message.
func (m *Message) Unwrap() (proto.Message, error) {
switch msg := m.Sum.(type) {
case *Message_ChunkRequest:
return m.GetChunkRequest(), nil
case *Message_ChunkResponse:
return m.GetChunkResponse(), nil
case *Message_SnapshotsRequest:
return m.GetSnapshotsRequest(), nil
case *Message_SnapshotsResponse:
return m.GetSnapshotsResponse(), nil
default:
return nil, fmt.Errorf("unknown message: %T", msg)
}
}

View File

@@ -16,6 +16,58 @@ const (
chunkMsgSize = int(16e6)
)
// mustEncodeMsg encodes a Protobuf message, panicing on error.
func mustEncodeMsg(pb proto.Message) []byte {
msg := mustWrapToProto(pb)
bz, err := proto.Marshal(msg)
if err != nil {
panic(fmt.Errorf("unable to marshal %T: %w", pb, err))
}
return bz
}
func mustWrapToProto(pb proto.Message) proto.Message {
msg := ssproto.Message{}
switch pb := pb.(type) {
case *ssproto.ChunkRequest:
msg.Sum = &ssproto.Message_ChunkRequest{ChunkRequest: pb}
case *ssproto.ChunkResponse:
msg.Sum = &ssproto.Message_ChunkResponse{ChunkResponse: pb}
case *ssproto.SnapshotsRequest:
msg.Sum = &ssproto.Message_SnapshotsRequest{SnapshotsRequest: pb}
case *ssproto.SnapshotsResponse:
msg.Sum = &ssproto.Message_SnapshotsResponse{SnapshotsResponse: pb}
default:
panic(fmt.Errorf("unknown message type %T", pb))
}
return &msg
}
// decodeMsg decodes a Protobuf message.
func decodeMsg(bz []byte) (proto.Message, error) {
pb := &ssproto.Message{}
err := proto.Unmarshal(bz, pb)
if err != nil {
return nil, err
}
return msgFromProto(pb)
}
func msgFromProto(pb *ssproto.Message) (proto.Message, error) {
switch msg := pb.Sum.(type) {
case *ssproto.Message_ChunkRequest:
return msg.ChunkRequest, nil
case *ssproto.Message_ChunkResponse:
return msg.ChunkResponse, nil
case *ssproto.Message_SnapshotsRequest:
return msg.SnapshotsRequest, nil
case *ssproto.Message_SnapshotsResponse:
return msg.SnapshotsResponse, nil
default:
return nil, fmt.Errorf("unknown message type %T", msg)
}
}
// validateMsg validates a message.
func validateMsg(pb proto.Message) error {
if pb == nil {

View File

@@ -7,7 +7,6 @@ import (
"github.com/cosmos/gogoproto/proto"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/p2p"
ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
)
@@ -100,10 +99,8 @@ func TestStateSyncVectors(t *testing.T) {
for _, tc := range testCases {
tc := tc
w, err := tc.msg.(p2p.Wrapper).Wrap()
require.NoError(t, err)
bz, err := proto.Marshal(w)
require.NoError(t, err)
bz := mustEncodeMsg(tc.msg)
require.Equal(t, tc.expBytes, hex.EncodeToString(bz), tc.testName)
}

View File

@@ -107,16 +107,22 @@ func (r *Reactor) Receive(e p2p.Envelope) {
return
}
err := validateMsg(e.Message)
msg, err := msgFromProto(e.Message.(*ssproto.Message))
if err != nil {
r.Logger.Error("Invalid message", "peer", e.Src, "msg", e.Message, "err", err)
r.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err)
r.Switch.StopPeerForError(e.Src, err)
return
}
err = validateMsg(msg)
if err != nil {
r.Logger.Error("Invalid message", "peer", e.Src, "msg", msg, "err", err)
r.Switch.StopPeerForError(e.Src, err)
return
}
switch e.ChannelID {
case SnapshotChannel:
switch msg := e.Message.(type) {
switch msg := msg.(type) {
case *ssproto.SnapshotsRequest:
snapshots, err := r.recentSnapshots(recentSnapshots)
if err != nil {
@@ -128,13 +134,13 @@ func (r *Reactor) Receive(e p2p.Envelope) {
"format", snapshot.Format, "peer", e.Src.ID())
e.Src.Send(p2p.Envelope{
ChannelID: e.ChannelID,
Message: &ssproto.SnapshotsResponse{
Message: mustWrapToProto(&ssproto.SnapshotsResponse{
Height: snapshot.Height,
Format: snapshot.Format,
Chunks: snapshot.Chunks,
Hash: snapshot.Hash,
Metadata: snapshot.Metadata,
},
}),
})
}
@@ -165,7 +171,7 @@ func (r *Reactor) Receive(e p2p.Envelope) {
}
case ChunkChannel:
switch msg := e.Message.(type) {
switch msg := msg.(type) {
case *ssproto.ChunkRequest:
r.Logger.Debug("Received chunk request", "height", msg.Height, "format", msg.Format,
"chunk", msg.Index, "peer", e.Src.ID())
@@ -183,13 +189,13 @@ func (r *Reactor) Receive(e p2p.Envelope) {
"chunk", msg.Index, "peer", e.Src.ID())
e.Src.Send(p2p.Envelope{
ChannelID: ChunkChannel,
Message: &ssproto.ChunkResponse{
Message: mustWrapToProto(&ssproto.ChunkResponse{
Height: msg.Height,
Format: msg.Format,
Index: msg.Index,
Chunk: resp.Chunk,
Missing: resp.Chunk == nil,
},
}),
})
case *ssproto.ChunkResponse:
@@ -274,7 +280,7 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration)
r.Switch.NewBroadcast(p2p.Envelope{
ChannelID: SnapshotChannel,
Message: &ssproto.SnapshotsRequest{},
Message: mustWrapToProto(&ssproto.SnapshotsRequest{}),
})
}

View File

@@ -65,7 +65,7 @@ func TestReactor_Receive_ChunkRequest(t *testing.T) {
require.NoError(t, err)
err = proto.Unmarshal(bz, e.Message)
require.NoError(t, err)
response = e.Message.(*ssproto.ChunkResponse)
response = e.Message.(*ssproto.Message).GetChunkResponse()
}).Return(true)
}
@@ -80,10 +80,10 @@ func TestReactor_Receive_ChunkRequest(t *testing.T) {
}
})
r.Receive(p2p.Envelope{
r.NewReceive(p2p.Envelope{
ChannelID: ChunkChannel,
Src: peer,
Message: tc.request,
Message: mustWrapToProto(tc.request),
})
time.Sleep(100 * time.Millisecond)
assert.Equal(t, tc.expectResponse, response)
@@ -155,7 +155,7 @@ func TestReactor_Receive_SnapshotsRequest(t *testing.T) {
require.NoError(t, err)
err = proto.Unmarshal(bz, e.Message)
require.NoError(t, err)
responses = append(responses, e.Message.(*ssproto.SnapshotsResponse))
responses = append(responses, e.Message.(*ssproto.Message).GetSnapshotsResponse())
}).Return(true)
}
@@ -170,11 +170,12 @@ func TestReactor_Receive_SnapshotsRequest(t *testing.T) {
}
})
r.Receive(p2p.Envelope{
r.NewReceive(p2p.Envelope{
ChannelID: SnapshotChannel,
Src: peer,
Message: &ssproto.SnapshotsRequest{},
Message: mustWrapToProto(&ssproto.SnapshotsRequest{}),
})
r.Receive(SnapshotChannel, peer, mustEncodeMsg(&ssproto.SnapshotsRequest{}))
time.Sleep(100 * time.Millisecond)
assert.Equal(t, tc.expectResponses, responses)

View File

@@ -128,7 +128,7 @@ func (s *syncer) AddPeer(peer p2p.Peer) {
s.logger.Debug("Requesting snapshots from peer", "peer", peer.ID())
e := p2p.Envelope{
ChannelID: SnapshotChannel,
Message: &ssproto.SnapshotsRequest{},
Message: mustWrapToProto(&ssproto.SnapshotsRequest{}),
}
peer.Send(e)
}
@@ -473,11 +473,11 @@ func (s *syncer) requestChunk(snapshot *snapshot, chunk uint32) {
"format", snapshot.Format, "chunk", chunk, "peer", peer.ID())
peer.Send(p2p.Envelope{
ChannelID: ChunkChannel,
Message: &ssproto.ChunkRequest{
Message: mustWrapToProto(&ssproto.ChunkRequest{
Height: snapshot.Height,
Format: snapshot.Format,
Index: chunk,
},
}),
})
}

View File

@@ -100,10 +100,7 @@ func TestSyncer_SyncAny(t *testing.T) {
peerA.On("ID").Return(p2p.ID("a"))
peerA.On("Send", mock.MatchedBy(func(i interface{}) bool {
e, ok := i.(p2p.Envelope)
if !ok {
return false
}
req, ok := e.Message.(*ssproto.SnapshotsRequest)
req := e.Message.(*ssproto.Message).GetSnapshotsRequest()
return ok && e.ChannelID == SnapshotChannel && req != nil
})).Return(true)
syncer.AddPeer(peerA)
@@ -113,10 +110,7 @@ func TestSyncer_SyncAny(t *testing.T) {
peerB.On("ID").Return(p2p.ID("b"))
peerB.On("Send", mock.MatchedBy(func(i interface{}) bool {
e, ok := i.(p2p.Envelope)
if !ok {
return false
}
req, ok := e.Message.(*ssproto.SnapshotsRequest)
req := e.Message.(*ssproto.Message).GetSnapshotsRequest()
return ok && e.ChannelID == SnapshotChannel && req != nil
})).Return(true)
syncer.AddPeer(peerB)
@@ -163,7 +157,7 @@ func TestSyncer_SyncAny(t *testing.T) {
onChunkRequest := func(args mock.Arguments) {
e, ok := args[0].(p2p.Envelope)
require.True(t, ok)
msg := e.Message.(*ssproto.ChunkRequest)
msg := e.Message.(*ssproto.Message).GetChunkRequest()
require.EqualValues(t, 1, msg.Height)
require.EqualValues(t, 1, msg.Format)
require.LessOrEqual(t, msg.Index, uint32(len(chunks)))