p2p: re-implement the the wrapper abstraction to de-duplicate logic

This commit is contained in:
William Banfield
2022-10-20 15:19:32 -04:00
parent cbe179cc09
commit 0ea99d089b
27 changed files with 580 additions and 534 deletions

View File

@@ -19,55 +19,6 @@ const (
BlockResponseMessageFieldKeySize
)
func wrapMsg(pb proto.Message) (proto.Message, error) {
msg := bcproto.Message{}
switch pb := pb.(type) {
case *bcproto.BlockRequest:
msg.Sum = &bcproto.Message_BlockRequest{BlockRequest: pb}
case *bcproto.BlockResponse:
msg.Sum = &bcproto.Message_BlockResponse{BlockResponse: pb}
case *bcproto.NoBlockResponse:
msg.Sum = &bcproto.Message_NoBlockResponse{NoBlockResponse: pb}
case *bcproto.StatusRequest:
msg.Sum = &bcproto.Message_StatusRequest{StatusRequest: pb}
case *bcproto.StatusResponse:
msg.Sum = &bcproto.Message_StatusResponse{StatusResponse: pb}
default:
return nil, fmt.Errorf("unknown message type %T", pb)
}
return &msg, nil
}
// DecodeMsg decodes a Protobuf message.
func DecodeMsg(bz []byte) (proto.Message, error) {
pb := &bcproto.Message{}
err := proto.Unmarshal(bz, pb)
if err != nil {
return nil, err
}
return UnwrapMessage(pb)
}
func UnwrapMessage(pb *bcproto.Message) (proto.Message, error) {
switch msg := pb.Sum.(type) {
case *bcproto.Message_BlockRequest:
return msg.BlockRequest, nil
case *bcproto.Message_BlockResponse:
return msg.BlockResponse, nil
case *bcproto.Message_NoBlockResponse:
return msg.NoBlockResponse, nil
case *bcproto.Message_StatusRequest:
return msg.StatusRequest, nil
case *bcproto.Message_StatusResponse:
return msg.StatusResponse, nil
default:
return nil, fmt.Errorf("unknown message type %T", msg)
}
}
// ValidateMsg validates a message.
func ValidateMsg(pb proto.Message) error {
if pb == nil {

View File

@@ -150,18 +150,12 @@ func (bcR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
// AddPeer implements Reactor by sending our state to peer.
func (bcR *Reactor) AddPeer(peer p2p.Peer) {
msg, err := wrapMsg(&bcproto.StatusResponse{
Base: bcR.store.Base(),
Height: bcR.store.Height(),
})
if err != nil {
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
return
}
peer.Send(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: msg,
Message: &bcproto.StatusResponse{
Base: bcR.store.Base(),
Height: bcR.store.Height(),
},
})
// it's OK if send fails. will try later in poolRoutine
@@ -187,50 +181,30 @@ func (bcR *Reactor) respondToPeer(msg *bcproto.BlockRequest,
return false
}
wm, err := wrapMsg(&bcproto.BlockResponse{Block: bl})
if err != nil {
bcR.Logger.Error("could not convert msg to proto message", "err", err)
return false
}
return src.TrySend(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: wm,
Message: &bcproto.BlockResponse{Block: bl},
})
}
bcR.Logger.Info("Peer asking for a block we don't have", "src", src, "height", msg.Height)
wm, err := wrapMsg(&bcproto.NoBlockResponse{Height: msg.Height})
if err != nil {
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
return false
}
return src.TrySend(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: wm,
Message: &bcproto.NoBlockResponse{Height: msg.Height},
})
}
// Receive implements Reactor by handling 4 types of messages (look below).
func (bcR *Reactor) Receive(e p2p.Envelope) {
msg, err := UnwrapMessage(e.Message.(*bcproto.Message))
if err != nil {
bcR.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err)
if err := ValidateMsg(e.Message); err != nil {
bcR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", e.Message, "err", err)
bcR.Switch.StopPeerForError(e.Src, err)
return
}
if err = ValidateMsg(msg); err != nil {
bcR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", msg, "err", err)
bcR.Switch.StopPeerForError(e.Src, err)
return
}
bcR.Logger.Debug("Receive", "e.Src", e.Src, "chID", e.ChannelID, "msg", e.Message)
bcR.Logger.Debug("Receive", "e.Src", e.Src, "chID", e.ChannelID, "msg", msg)
switch msg := msg.(type) {
switch msg := e.Message.(type) {
case *bcproto.BlockRequest:
bcR.respondToPeer(msg, e.Src)
case *bcproto.BlockResponse:
@@ -242,18 +216,12 @@ func (bcR *Reactor) Receive(e p2p.Envelope) {
bcR.pool.AddBlock(e.Src.ID(), bi, msg.Block.Size())
case *bcproto.StatusRequest:
// Send peer our state.
wm, err := wrapMsg(&bcproto.StatusResponse{
Height: bcR.store.Height(),
Base: bcR.store.Base(),
})
if err != nil {
bcR.Logger.Error("could not convert msg to proto message", "err", err)
return
}
e.Src.TrySend(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: wm,
Message: &bcproto.StatusResponse{
Height: bcR.store.Height(),
Base: bcR.store.Base(),
},
})
case *bcproto.StatusResponse:
// Got a peer status. Unverified.
@@ -300,14 +268,9 @@ func (bcR *Reactor) poolRoutine(stateSynced bool) {
if peer == nil {
continue
}
wm, err := wrapMsg(&bcproto.BlockRequest{Height: request.Height})
if err != nil {
bcR.Logger.Error("could not convert msg to proto", "err", err)
continue
}
queued := peer.TrySend(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: wm,
Message: &bcproto.BlockRequest{Height: request.Height},
})
if !queued {
bcR.Logger.Debug("Send queue is full, drop block request", "peer", peer.ID(), "height", request.Height)
@@ -447,15 +410,9 @@ FOR_LOOP:
// BroadcastStatusRequest broadcasts `BlockStore` base and height.
func (bcR *Reactor) BroadcastStatusRequest() error {
wm, err := wrapMsg(&bcproto.StatusRequest{})
if err != nil {
bcR.Logger.Error("could not convert msg to proto message", "err", err)
return fmt.Errorf("could not convert msg to proto message: %w", err)
}
bcR.Switch.Broadcast(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: wm,
Message: &bcproto.StatusRequest{},
})
return nil
}

View File

@@ -26,6 +26,7 @@ import (
mempoolv0 "github.com/tendermint/tendermint/mempool/v0"
mempoolv1 "github.com/tendermint/tendermint/mempool/v1"
"github.com/tendermint/tendermint/p2p"
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/store"
@@ -166,13 +167,13 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
if i < len(peerList)/2 {
bcs.Logger.Info("Signed and pushed vote", "vote", prevote1, "peer", peer)
peer.Send(p2p.Envelope{
Message: MustMsgToProto(&VoteMessage{prevote1}),
Message: &tmcons.Vote{prevote1.ToProto()},
ChannelID: VoteChannel,
})
} else {
bcs.Logger.Info("Signed and pushed vote", "vote", prevote2, "peer", peer)
peer.Send(p2p.Envelope{
Message: MustMsgToProto(&VoteMessage{prevote2}),
Message: &tmcons.Vote{prevote2.ToProto()},
ChannelID: VoteChannel,
})
}
@@ -526,23 +527,25 @@ func sendProposalAndParts(
parts *types.PartSet,
) {
// proposal
msg := &ProposalMessage{Proposal: proposal}
peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: MustMsgToProto(msg),
Message: &tmcons.Proposal{Proposal: *proposal.ToProto()},
})
// parts
for i := 0; i < int(parts.Total()); i++ {
part := parts.GetPart(i)
msg := &BlockPartMessage{
Height: height, // This tells peer that this part applies to us.
Round: round, // This tells peer that this part applies to us.
Part: part,
pp, err := part.ToProto()
if err != nil {
panic(err) // TODO: wbanfield better error handling
}
peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: MustMsgToProto(msg),
Message: &tmcons.BlockPart{
Height: height, // This tells peer that this part applies to us.
Round: round, // This tells peer that this part applies to us.
Part: *pp,
},
})
}
@@ -553,11 +556,11 @@ func sendProposalAndParts(
cs.mtx.Unlock()
peer.Send(p2p.Envelope{
ChannelID: VoteChannel,
Message: MustMsgToProto(&VoteMessage{prevote}),
Message: &tmcons.Vote{prevote.ToProto()},
})
peer.Send(p2p.Envelope{
ChannelID: VoteChannel,
Message: MustMsgToProto(&VoteMessage{precommit}),
Message: &tmcons.Vote{precommit.ToProto()},
})
}

View File

@@ -7,6 +7,7 @@ import (
"github.com/tendermint/tendermint/libs/log"
tmrand "github.com/tendermint/tendermint/libs/rand"
"github.com/tendermint/tendermint/p2p"
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
"github.com/tendermint/tendermint/types"
)
@@ -95,7 +96,7 @@ func invalidDoPrevoteFunc(t *testing.T, height int64, round int32, cs *State, sw
for _, peer := range peers {
cs.Logger.Info("Sending bad vote", "block", blockHash, "peer", peer)
peer.Send(p2p.Envelope{
Message: MustMsgToProto(&VoteMessage{precommit}),
Message: &tmcons.Vote{precommit.ToProto()},
ChannelID: VoteChannel,
})
}

View File

@@ -4,8 +4,6 @@ import (
"errors"
"fmt"
"github.com/cosmos/gogoproto/proto"
cstypes "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/libs/bits"
tmmath "github.com/tendermint/tendermint/libs/math"
@@ -16,6 +14,8 @@ import (
)
// MsgToProto takes a consensus message type and returns the proto defined consensus message.
//
// TODO: This needs to be removed, but WALToProto depends on this.
func MsgToProto(msg Message) (*tmcons.Message, error) {
if msg == nil {
return nil, errors.New("consensus: message is nil")
@@ -143,14 +143,6 @@ func MsgToProto(msg Message) (*tmcons.Message, error) {
return &pb, nil
}
func MustMsgToProto(msg Message) *tmcons.Message {
m, err := MsgToProto(msg)
if err != nil {
panic(err)
}
return m
}
// MsgFromProto takes a consensus proto message and returns the native go type
func MsgFromProto(msg *tmcons.Message) (Message, error) {
if msg == nil {
@@ -268,20 +260,6 @@ func MsgFromProto(msg *tmcons.Message) (Message, error) {
return pb, nil
}
// MustEncode takes the reactors msg, makes it proto and marshals it
// this mimics `MustMarshalBinaryBare` in that is panics on error
func MustEncode(msg Message) []byte {
pb, err := MsgToProto(msg)
if err != nil {
panic(err)
}
enc, err := proto.Marshal(pb)
if err != nil {
panic(err)
}
return enc
}
// WALToProto takes a WAL message and return a proto walMessage and error
func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) {
var pb tmcons.WALMessage

View File

@@ -7,8 +7,6 @@ import (
"sync"
"time"
"github.com/cosmos/gogoproto/proto"
cstypes "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/libs/bits"
tmevents "github.com/tendermint/tendermint/libs/events"
@@ -233,6 +231,16 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
return
}
if w, ok := e.Message.(p2p.Wrapper); ok {
var err error
e.Message, err = w.Wrap()
if err != nil {
conR.Logger.Error("Error wrapping message", "src", e.Src, "chId", e.ChannelID, "err", err)
conR.Switch.StopPeerForError(e.Src, err)
return
}
}
msg, err := MsgFromProto(e.Message.(*tmcons.Message))
if err != nil {
conR.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err)
@@ -296,15 +304,18 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
default:
panic("Bad VoteSetBitsMessage field Type. Forgot to add a check in ValidateBasic?")
}
eMsg := &tmcons.VoteSetBits{
Height: msg.Height,
Round: msg.Round,
Type: msg.Type,
BlockID: msg.BlockID.ToProto(),
}
if votes := ourVotes.ToProto(); votes != nil {
eMsg.Votes = *votes
}
e.Src.TrySend(p2p.Envelope{
ChannelID: VoteSetBitsChannel,
Message: MustMsgToProto(&VoteSetBitsMessage{
Height: msg.Height,
Round: msg.Round,
Type: msg.Type,
BlockID: msg.BlockID,
Votes: ourVotes,
}),
Message: eMsg,
})
default:
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
@@ -439,27 +450,28 @@ func (conR *Reactor) broadcastNewRoundStepMessage(rs *cstypes.RoundState) {
nrsMsg := makeRoundStepMessage(rs)
conR.Switch.Broadcast(p2p.Envelope{
ChannelID: StateChannel,
Message: MustMsgToProto(nrsMsg),
Message: nrsMsg,
})
}
func (conR *Reactor) broadcastNewValidBlockMessage(rs *cstypes.RoundState) {
csMsg := &NewValidBlockMessage{
psh := rs.ProposalBlockParts.Header()
csMsg := &tmcons.NewValidBlock{
Height: rs.Height,
Round: rs.Round,
BlockPartSetHeader: rs.ProposalBlockParts.Header(),
BlockParts: rs.ProposalBlockParts.BitArray(),
BlockPartSetHeader: psh.ToProto(),
BlockParts: rs.ProposalBlockParts.BitArray().ToProto(),
IsCommit: rs.Step == cstypes.RoundStepCommit,
}
conR.Switch.Broadcast(p2p.Envelope{
ChannelID: StateChannel,
Message: MustMsgToProto(csMsg),
Message: csMsg,
})
}
// Broadcasts HasVoteMessage to peers that care.
func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
msg := &HasVoteMessage{
msg := &tmcons.HasVote{
Height: vote.Height,
Round: vote.Round,
Type: vote.Type,
@@ -467,7 +479,7 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
}
conR.Switch.Broadcast(p2p.Envelope{
ChannelID: StateChannel,
Message: MustMsgToProto(msg),
Message: msg,
})
/*
// TODO: Make this broadcast more selective.
@@ -493,11 +505,11 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
*/
}
func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *NewRoundStepMessage) {
nrsMsg = &NewRoundStepMessage{
func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *tmcons.NewRoundStep) {
nrsMsg = &tmcons.NewRoundStep{
Height: rs.Height,
Round: rs.Round,
Step: rs.Step,
Step: uint32(rs.Step),
SecondsSinceStartTime: int64(time.Since(rs.StartTime).Seconds()),
LastCommitRound: rs.LastCommit.GetRound(),
}
@@ -509,7 +521,7 @@ func (conR *Reactor) sendNewRoundStepMessage(peer p2p.Peer) {
nrsMsg := makeRoundStepMessage(rs)
peer.Send(p2p.Envelope{
ChannelID: StateChannel,
Message: MustMsgToProto(nrsMsg),
Message: nrsMsg,
})
}
@@ -549,15 +561,18 @@ OUTER_LOOP:
if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartSetHeader) {
if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
part := rs.ProposalBlockParts.GetPart(index)
msg := &BlockPartMessage{
Height: rs.Height, // This tells peer that this part applies to us.
Round: rs.Round, // This tells peer that this part applies to us.
Part: part,
parts, err := part.ToProto()
if err != nil {
panic(err)
}
logger.Debug("Sending block part", "height", prs.Height, "round", prs.Round)
if peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: MustMsgToProto(msg),
Message: &tmcons.BlockPart{
Height: rs.Height, // This tells peer that this part applies to us.
Round: rs.Round, // This tells peer that this part applies to us.
Part: *parts,
},
}) {
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
}
@@ -604,11 +619,10 @@ OUTER_LOOP:
if rs.Proposal != nil && !prs.Proposal {
// Proposal: share the proposal metadata with peer.
{
msg := &ProposalMessage{Proposal: rs.Proposal}
logger.Debug("Sending proposal", "height", prs.Height, "round", prs.Round)
if peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: MustMsgToProto(msg),
Message: &tmcons.Proposal{Proposal: *rs.Proposal.ToProto()},
}) {
// NOTE[ZM]: A peer might have received different proposal msg so this Proposal msg will be rejected!
ps.SetHasProposal(rs.Proposal)
@@ -619,15 +633,14 @@ OUTER_LOOP:
// rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round,
// so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
if 0 <= rs.Proposal.POLRound {
msg := &ProposalPOLMessage{
Height: rs.Height,
ProposalPOLRound: rs.Proposal.POLRound,
ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
}
logger.Debug("Sending POL", "height", prs.Height, "round", prs.Round)
peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: MustMsgToProto(msg),
Message: &tmcons.ProposalPOL{
Height: rs.Height,
ProposalPolRound: rs.Proposal.POLRound,
ProposalPol: *rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray().ToProto(),
},
})
}
continue OUTER_LOOP
@@ -665,15 +678,19 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt
return
}
// Send the part
msg := &BlockPartMessage{
Height: prs.Height, // Not our height, so it doesn't matter.
Round: prs.Round, // Not our height, so it doesn't matter.
Part: part,
}
logger.Debug("Sending block part for catchup", "round", prs.Round, "index", index)
pp, err := part.ToProto()
if err != nil {
logger.Error("Could not convert part to proto", "index", index, "error", err)
return
}
if peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: MustMsgToProto(msg),
Message: &tmcons.BlockPart{
Height: prs.Height, // Not our height, so it doesn't matter.
Round: prs.Round, // Not our height, so it doesn't matter.
Part: *pp,
},
}) {
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
} else {
@@ -836,12 +853,12 @@ OUTER_LOOP:
peer.TrySend(p2p.Envelope{
ChannelID: StateChannel,
Message: MustMsgToProto(&VoteSetMaj23Message{
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
Round: prs.Round,
Type: tmproto.PrevoteType,
BlockID: maj23,
}),
BlockID: maj23.ToProto(),
},
})
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
@@ -854,15 +871,14 @@ OUTER_LOOP:
prs := ps.GetRoundState()
if rs.Height == prs.Height {
if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok {
peer.TrySend(p2p.Envelope{
ChannelID: StateChannel,
Message: MustMsgToProto(&VoteSetMaj23Message{
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
Round: prs.Round,
Type: tmproto.PrecommitType,
BlockID: maj23,
}),
BlockID: maj23.ToProto(),
},
})
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
@@ -878,12 +894,12 @@ OUTER_LOOP:
peer.TrySend(p2p.Envelope{
ChannelID: StateChannel,
Message: MustMsgToProto(&VoteSetMaj23Message{
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
Round: prs.ProposalPOLRound,
Type: tmproto.PrevoteType,
BlockID: maj23,
}),
BlockID: maj23.ToProto(),
},
})
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
@@ -901,12 +917,12 @@ OUTER_LOOP:
if commit := conR.conS.LoadCommit(prs.Height); commit != nil {
peer.TrySend(p2p.Envelope{
ChannelID: StateChannel,
Message: MustMsgToProto(&VoteSetMaj23Message{
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
Round: commit.Round,
Type: tmproto.PrecommitType,
BlockID: commit.BlockID,
}),
BlockID: commit.BlockID.ToProto(),
},
})
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
@@ -1121,11 +1137,12 @@ func (ps *PeerState) SetHasProposalBlockPart(height int64, round int32, index in
// Returns true if vote was sent.
func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool {
if vote, ok := ps.PickVoteToSend(votes); ok {
msg := &VoteMessage{vote}
ps.logger.Debug("Sending vote message", "ps", ps, "vote", vote)
if ps.peer.Send(p2p.Envelope{
ChannelID: VoteChannel,
Message: MustMsgToProto(msg),
Message: &tmcons.Vote{
Vote: vote.ToProto(),
},
}) {
ps.SetHasVote(vote)
return true
@@ -1492,15 +1509,6 @@ func init() {
tmjson.RegisterType(&VoteSetBitsMessage{}, "tendermint/VoteSetBits")
}
func decodeMsg(bz []byte) (msg Message, err error) {
pb := &tmcons.Message{}
if err = proto.Unmarshal(bz, pb); err != nil {
return msg, err
}
return MsgFromProto(pb)
}
//-------------------------------------
// NewRoundStepMessage is sent for every step taken in the ConsensusState.

View File

@@ -33,6 +33,7 @@ import (
mempoolv1 "github.com/tendermint/tendermint/mempool/v1"
"github.com/tendermint/tendermint/p2p"
p2pmock "github.com/tendermint/tendermint/p2p/mock"
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
sm "github.com/tendermint/tendermint/state"
statemocks "github.com/tendermint/tendermint/state/mocks"
@@ -265,15 +266,18 @@ func TestReactorReceiveDoesNotPanicIfAddPeerHasntBeenCalledYet(t *testing.T) {
var (
reactor = reactors[0]
peer = p2pmock.NewPeer(nil)
msg = MustEncode(&HasVoteMessage{Height: 1,
Round: 1, Index: 1, Type: tmproto.PrevoteType})
)
reactor.InitPeer(peer)
// simulate switch calling Receive before AddPeer
assert.NotPanics(t, func() {
reactor.Receive(StateChannel, peer, msg)
reactor.Receive(p2p.Envelope{
ChannelID: StateChannel,
Src: peer,
Message: &tmcons.HasVote{Height: 1,
Round: 1, Index: 1, Type: tmproto.PrevoteType},
})
reactor.AddPeer(peer)
})
}
@@ -288,20 +292,17 @@ func TestReactorReceivePanicsIfInitPeerHasntBeenCalledYet(t *testing.T) {
var (
reactor = reactors[0]
peer = p2pmock.NewPeer(nil)
msg = MustEncode(&HasVoteMessage{Height: 1,
Round: 1, Index: 1, Type: tmproto.PrevoteType})
)
// we should call InitPeer here
// simulate switch calling Receive before AddPeer
assert.Panics(t, func() {
reactor.Receive(StateChannel, peer, msg)
reactor.NewReceive(p2p.Envelope{
reactor.Receive(p2p.Envelope{
ChannelID: StateChannel,
Src: peer,
Message: MustMsgToProto(&HasVoteMessage{Height: 1,
Round: 1, Index: 1, Type: tmproto.PrevoteType}),
Message: &tmcons.HasVote{Height: 1,
Round: 1, Index: 1, Type: tmproto.PrevoteType},
})
})
}

View File

@@ -214,24 +214,6 @@ type PeerState interface {
GetHeight() int64
}
// encodemsg takes a array of evidence
// returns the byte encoding of the List Message
func encodeMsg(evis []types.Evidence) ([]byte, error) {
evi := make([]tmproto.Evidence, len(evis))
for i := 0; i < len(evis); i++ {
ev, err := types.EvidenceToProto(evis[i])
if err != nil {
return nil, err
}
evi[i] = *ev
}
epl := tmproto.EvidenceList{
Evidence: evi,
}
return epl.Marshal()
}
// encodemsg takes a array of evidence
// returns the byte encoding of the List Message
func evidenceListToProto(evis []types.Evidence) (*tmproto.EvidenceList, error) {
@@ -249,31 +231,6 @@ func evidenceListToProto(evis []types.Evidence) (*tmproto.EvidenceList, error) {
return &epl, nil
}
// decodemsg takes an array of bytes
// returns an array of evidence
func decodeMsg(bz []byte) (evis []types.Evidence, err error) {
lm := tmproto.EvidenceList{}
if err := lm.Unmarshal(bz); err != nil {
return nil, err
}
evis = make([]types.Evidence, len(lm.Evidence))
for i := 0; i < len(lm.Evidence); i++ {
ev, err := types.EvidenceFromProto(&lm.Evidence[i])
if err != nil {
return nil, err
}
evis[i] = ev
}
for i, ev := range evis {
if err := ev.ValidateBasic(); err != nil {
return nil, fmt.Errorf("invalid evidence (#%d): %v", i, err)
}
}
return evis, nil
}
func evidenceListFromProto(m proto.Message) ([]types.Evidence, error) {
lm := m.(*tmproto.EvidenceList)

View File

@@ -5,7 +5,6 @@ import (
"fmt"
"time"
"github.com/cosmos/gogoproto/proto"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/clist"
"github.com/tendermint/tendermint/libs/log"
@@ -157,26 +156,32 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// Receive implements Reactor.
// It adds any received transactions to the mempool.
func (memR *Reactor) Receive(e p2p.Envelope) {
msg, err := msgFromProto(e.Message)
if err != nil {
memR.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err)
memR.Switch.StopPeerForError(e.Src, err)
return
}
memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", msg)
txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(e.Src)}
if e.Src != nil {
txInfo.SenderP2PID = e.Src.ID()
}
for _, tx := range msg.Txs {
err = memR.mempool.CheckTx(tx, nil, txInfo)
if errors.Is(err, mempool.ErrTxInCache) {
memR.Logger.Debug("Tx already exists in cache", "tx", tx.String())
} else if err != nil {
memR.Logger.Info("Could not check tx", "tx", tx.String(), "err", err)
memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
switch msg := e.Message.(type) {
case *protomem.Txs:
protoTxs := msg.GetTxs()
if len(protoTxs) == 0 {
memR.Logger.Error("received tmpty txs from peer", "src", e.Src)
return
}
txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(e.Src)}
if e.Src != nil {
txInfo.SenderP2PID = e.Src.ID()
}
var err error
for _, tx := range protoTxs {
ntx := types.Tx(tx)
err = memR.mempool.CheckTx(ntx, nil, txInfo)
if errors.Is(err, mempool.ErrTxInCache) {
memR.Logger.Debug("Tx already exists in cache", "tx", ntx.String())
} else if err != nil {
memR.Logger.Info("Could not check tx", "tx", ntx.String(), "err", err)
}
}
default:
memR.Logger.Error("unknown message type", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
return
}
// broadcasting happens from go routines per peer
@@ -262,39 +267,6 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
}
}
func decodeMsg(bz []byte) (TxsMessage, error) {
msg := protomem.Message{}
err := msg.Unmarshal(bz)
if err != nil {
return TxsMessage{}, err
}
return msgFromProto(&msg)
}
func msgFromProto(m proto.Message) (TxsMessage, error) {
msg := m.(*protomem.Message)
var message TxsMessage
if i, ok := msg.Sum.(*protomem.Message_Txs); ok {
txs := i.Txs.GetTxs()
if len(txs) == 0 {
return message, errors.New("empty TxsMessage")
}
decoded := make([]types.Tx, len(txs))
for j, tx := range txs {
decoded[j] = types.Tx(tx)
}
message = TxsMessage{
Txs: decoded,
}
return message, nil
}
return message, fmt.Errorf("msg type: %T is not supported", msg)
}
// TxsMessage is a Message containing transactions.
type TxsMessage struct {
Txs []types.Tx

View File

@@ -22,6 +22,7 @@ import (
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/p2p/mock"
memproto "github.com/tendermint/tendermint/proto/tendermint/mempool"
protomem "github.com/tendermint/tendermint/proto/tendermint/mempool"
"github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types"
)
@@ -279,7 +280,12 @@ func TestDontExhaustMaxActiveIDs(t *testing.T) {
for i := 0; i < mempool.MaxActiveIDs+1; i++ {
peer := mock.NewPeer(nil)
reactor.Receive(mempool.MempoolChannel, peer, []byte{0x1, 0x2, 0x3})
reactor.Receive(p2p.Envelope{
ChannelID: mempool.MempoolChannel,
Src: peer,
Message: &protomem.Txs{
Txs: [][]byte{{0x01, 0x02, 0x03}},
}})
reactor.AddPeer(peer)
}
}

View File

@@ -5,8 +5,6 @@ import (
"fmt"
"time"
"github.com/cosmos/gogoproto/proto"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/clist"
"github.com/tendermint/tendermint/libs/log"
@@ -157,26 +155,34 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// Receive implements Reactor.
// It adds any received transactions to the mempool.
func (memR *Reactor) Receive(e p2p.Envelope) {
msg, err := protoToMsg(e.Message)
if err != nil {
memR.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err)
memR.Switch.StopPeerForError(e.Src, err)
memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
switch msg := e.Message.(type) {
case *protomem.Txs:
protoTxs := msg.GetTxs()
if len(protoTxs) == 0 {
memR.Logger.Error("received tmpty txs from peer", "src", e.Src)
return
}
txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(e.Src)}
if e.Src != nil {
txInfo.SenderP2PID = e.Src.ID()
}
var err error
for _, tx := range protoTxs {
ntx := types.Tx(tx)
err = memR.mempool.CheckTx(ntx, nil, txInfo)
if errors.Is(err, mempool.ErrTxInCache) {
memR.Logger.Debug("Tx already exists in cache", "tx", ntx.String())
} else if err != nil {
memR.Logger.Info("Could not check tx", "tx", ntx.String(), "err", err)
}
}
default:
memR.Logger.Error("unknown message type", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
return
}
memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", msg)
txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(e.Src)}
if e.Src != nil {
txInfo.SenderP2PID = e.Src.ID()
}
for _, tx := range msg.Txs {
err = memR.mempool.CheckTx(tx, nil, txInfo)
if err == mempool.ErrTxInCache {
memR.Logger.Debug("Tx already exists in cache", "tx", tx.String())
} else if err != nil {
memR.Logger.Info("Could not check tx", "tx", tx.String(), "err", err)
}
}
// broadcasting happens from go routines per peer
}
@@ -267,42 +273,6 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
//-----------------------------------------------------------------------------
// Messages
func decodeMsg(bz []byte) (TxsMessage, error) {
msg := protomem.Message{}
err := msg.Unmarshal(bz)
if err != nil {
return TxsMessage{}, err
}
return protoToMsg(&msg)
}
func protoToMsg(m proto.Message) (TxsMessage, error) {
msg := m.(*protomem.Message)
var message TxsMessage
if i, ok := msg.Sum.(*protomem.Message_Txs); ok {
txs := i.Txs.GetTxs()
if len(txs) == 0 {
return message, errors.New("empty TxsMessage")
}
decoded := make([]types.Tx, len(txs))
for j, tx := range txs {
decoded[j] = types.Tx(tx)
}
message = TxsMessage{
Txs: decoded,
}
return message, nil
}
return message, fmt.Errorf("msg type: %T is not supported", msg)
}
//-------------------------------------
// TxsMessage is a Message containing transactions.
type TxsMessage struct {
Txs []types.Tx

View File

@@ -19,7 +19,7 @@ func NewReactor() *Reactor {
return r
}
func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return r.Channels }
func (r *Reactor) AddPeer(peer p2p.Peer) {}
func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {}
func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {}
func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return r.Channels }
func (r *Reactor) AddPeer(peer p2p.Peer) {}
func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {}
func (r *Reactor) Receive(e p2p.Envelope) {}

View File

@@ -261,7 +261,15 @@ func (p *peer) Send(e Envelope) bool {
} else if !p.hasChannel(e.ChannelID) {
return false
}
msgBytes, err := proto.Marshal(e.Message)
msg := e.Message
if w, ok := msg.(Wrapper); ok {
var err error
msg, err = w.Wrap()
if err != nil {
panic(err)
}
}
msgBytes, err := proto.Marshal(msg)
if err != nil {
panic(err) // Q: should this panic or error?
}
@@ -284,7 +292,15 @@ func (p *peer) TrySend(e Envelope) bool {
} else if !p.hasChannel(e.ChannelID) {
return false
}
msgBytes, err := proto.Marshal(e.Message)
msg := e.Message
if w, ok := msg.(Wrapper); ok {
var err error
msg, err = w.Wrap()
if err != nil {
panic(err)
}
}
msgBytes, err := proto.Marshal(msg)
if err != nil {
panic(err)
}
@@ -420,6 +436,13 @@ func createMConnection(
"peer_id", string(p.ID()),
"chID", fmt.Sprintf("%#x", chID),
}
if w, ok := msg.(Unwrapper); ok {
msg, err = w.Unwrap()
if err != nil {
// TODO(williambanfield) add error log line.
return
}
}
p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes)))
p.metrics.MessageReceiveBytesTotal.With("message_type", "tmp").Add(float64(len(msgBytes)))
reactor.Receive(Envelope{

View File

@@ -6,8 +6,6 @@ import (
"sync"
"time"
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/libs/cmap"
tmmath "github.com/tendermint/tendermint/libs/math"
tmrand "github.com/tendermint/tendermint/libs/rand"
@@ -238,15 +236,9 @@ func (r *Reactor) logErrAddrBook(err error) {
// Receive implements Reactor by handling incoming PEX messages.
func (r *Reactor) Receive(e p2p.Envelope) {
msg, err := msgFromProto(e.Message)
if err != nil {
r.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err)
r.Switch.StopPeerForError(e.Src, err)
return
}
r.Logger.Debug("Received message", "src", e.Src, "chId", e.ChannelID, "msg", msg)
r.Logger.Debug("Received message", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
switch msg := msg.(type) {
switch msg := e.Message.(type) {
case *tmp2p.PexRequest:
// NOTE: this is a prime candidate for amplification attacks,
@@ -351,7 +343,7 @@ func (r *Reactor) RequestAddrs(p Peer) {
r.requestsSent.Set(id, struct{}{})
p.Send(p2p.Envelope{
ChannelID: PexChannel,
Message: mustMsgToWrappedProto(&tmp2p.PexRequest{}),
Message: &tmp2p.PexRequest{},
})
}
@@ -412,7 +404,7 @@ func (r *Reactor) ReceiveAddrs(addrs []*p2p.NetAddress, src Peer) error {
func (r *Reactor) SendAddrs(p Peer, netAddrs []*p2p.NetAddress) {
e := p2p.Envelope{
ChannelID: PexChannel,
Message: mustMsgToWrappedProto(&tmp2p.PexAddrs{Addrs: p2p.NetAddressesToProto(netAddrs)}),
Message: &tmp2p.PexAddrs{Addrs: p2p.NetAddressesToProto(netAddrs)},
}
p.Send(e)
}
@@ -771,51 +763,3 @@ func markAddrInBookBasedOnErr(addr *p2p.NetAddress, book AddrBook, err error) {
book.MarkAttempt(addr)
}
}
//-----------------------------------------------------------------------------
// Messages
// mustEncode proto encodes a tmp2p.Message
func mustEncode(pb proto.Message) []byte {
msg := mustMsgToWrappedProto(pb)
bz, err := proto.Marshal(msg)
if err != nil {
panic(fmt.Errorf("unable to marshal %T: %w", pb, err))
}
return bz
}
func mustMsgToWrappedProto(pb proto.Message) proto.Message {
msg := tmp2p.Message{}
switch pb := pb.(type) {
case *tmp2p.PexRequest:
msg.Sum = &tmp2p.Message_PexRequest{PexRequest: pb}
case *tmp2p.PexAddrs:
msg.Sum = &tmp2p.Message_PexAddrs{PexAddrs: pb}
default:
panic(fmt.Sprintf("Unknown message type %T", pb))
}
return &msg
}
func decodeMsg(bz []byte) (proto.Message, error) {
pb := &tmp2p.Message{}
err := pb.Unmarshal(bz)
if err != nil {
return nil, err
}
return msgFromProto(pb)
}
func msgFromProto(m proto.Message) (proto.Message, error) {
pb := m.(*tmp2p.Message)
switch msg := pb.Sum.(type) {
case *tmp2p.Message_PexRequest:
return msg.PexRequest, nil
case *tmp2p.Message_PexAddrs:
return msg.PexAddrs, nil
default:
return nil, fmt.Errorf("unknown message: %T", msg)
}
}

View File

@@ -132,12 +132,10 @@ func TestPEXReactorReceive(t *testing.T) {
size := book.Size()
msg := &tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}
r.Receive(PexChannel, peer, mustEncode(msg))
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: mustMsgToWrappedProto(msg)})
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg})
assert.Equal(t, size+1, book.Size())
r.Receive(PexChannel, peer, mustEncode(&tmp2p.PexRequest{})) // should not panic.
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: mustMsgToWrappedProto(&tmp2p.PexRequest{})})
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
}
func TestPEXReactorRequestMessageAbuse(t *testing.T) {
@@ -156,23 +154,19 @@ func TestPEXReactorRequestMessageAbuse(t *testing.T) {
require.True(t, book.HasAddress(peerAddr))
id := string(peer.ID())
msg := mustEncode(&tmp2p.PexRequest{})
// first time creates the entry
r.Receive(PexChannel, peer, msg)
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: mustMsgToWrappedProto(&tmp2p.PexRequest{})})
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
assert.True(t, r.lastReceivedRequests.Has(id))
assert.True(t, sw.Peers().Has(peer.ID()))
// next time sets the last time value
r.Receive(PexChannel, peer, msg)
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: mustMsgToWrappedProto(&tmp2p.PexRequest{})})
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
assert.True(t, r.lastReceivedRequests.Has(id))
assert.True(t, sw.Peers().Has(peer.ID()))
// third time is too many too soon - peer is removed
r.Receive(PexChannel, peer, msg)
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: mustMsgToWrappedProto(&tmp2p.PexRequest{})})
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
assert.False(t, r.lastReceivedRequests.Has(id))
assert.False(t, sw.Peers().Has(peer.ID()))
assert.True(t, book.IsBanned(peerAddr))
@@ -199,14 +193,12 @@ func TestPEXReactorAddrsMessageAbuse(t *testing.T) {
msg := &tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}
// receive some addrs. should clear the request
r.Receive(PexChannel, peer, mustEncode(msg))
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: mustMsgToWrappedProto(msg)})
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg})
assert.False(t, r.requestsSent.Has(id))
assert.True(t, sw.Peers().Has(peer.ID()))
// receiving more unsolicited addrs causes a disconnect and ban
r.Receive(PexChannel, peer, mustEncode(msg))
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: mustMsgToWrappedProto(msg)})
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg})
assert.False(t, sw.Peers().Has(peer.ID()))
assert.True(t, book.IsBanned(peer.SocketAddr()))
}
@@ -492,8 +484,12 @@ func TestPEXReactorDoesNotAddPrivatePeersToAddrBook(t *testing.T) {
pexR.RequestAddrs(peer)
size := book.Size()
msg := mustEncode(&tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}})
pexR.Receive(PexChannel, peer, msg)
msg := &tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}
pexR.Receive(p2p.Envelope{
ChannelID: PexChannel,
Src: peer,
Message: msg,
})
assert.Equal(t, size, book.Size())
pexR.AddPeer(peer)
@@ -701,7 +697,10 @@ func TestPexVectors(t *testing.T) {
for _, tc := range testCases {
tc := tc
bz := mustEncode(tc.msg)
w, err := tc.msg.(p2p.Wrapper).Wrap()
require.NoError(t, err)
bz, err := proto.Marshal(w)
require.NoError(t, err)
require.Equal(t, tc.expBytes, hex.EncodeToString(bz), tc.testName)
}

View File

@@ -3,6 +3,7 @@ package p2p
import (
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/p2p/conn"
tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
)
type ChannelDescriptor = conn.ChannelDescriptor
@@ -14,3 +15,26 @@ type Envelope struct {
Message proto.Message // message payload
ChannelID byte
}
// Wrapper is a Protobuf message that can contain a variety of inner messages
// (e.g. via oneof fields). If a Channel's message type implements Wrapper, the
// Router will automatically wrap outbound messages and unwrap inbound messages,
// such that reactors do not have to do this themselves.
type Unwrapper interface {
proto.Message
// Unwrap will unwrap the inner message contained in this message.
Unwrap() (proto.Message, error)
}
type Wrapper interface {
proto.Message
// Wrap will take the underlying message and wrap it in its wrapper type.
Wrap() (proto.Message, error)
}
var (
_ Wrapper = &tmp2p.PexRequest{}
_ Wrapper = &tmp2p.PexAddrs{}
)

View File

@@ -0,0 +1,73 @@
package blocksync
import (
"fmt"
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/p2p"
)
var _ p2p.Wrapper = &StatusRequest{}
var _ p2p.Wrapper = &StatusResponse{}
var _ p2p.Wrapper = &NoBlockResponse{}
var _ p2p.Wrapper = &BlockResponse{}
var _ p2p.Wrapper = &BlockRequest{}
const (
BlockResponseMessagePrefixSize = 4
BlockResponseMessageFieldKeySize = 1
)
func (m *BlockRequest) Wrap() (proto.Message, error) {
bm := &Message{}
bm.Sum = &Message_BlockRequest{BlockRequest: m}
return bm, nil
}
func (m *BlockResponse) Wrap() (proto.Message, error) {
bm := &Message{}
bm.Sum = &Message_BlockResponse{BlockResponse: m}
return bm, nil
}
func (m *NoBlockResponse) Wrap() (proto.Message, error) {
bm := &Message{}
bm.Sum = &Message_NoBlockResponse{NoBlockResponse: m}
return bm, nil
}
func (m *StatusRequest) Wrap() (proto.Message, error) {
bm := &Message{}
bm.Sum = &Message_StatusRequest{StatusRequest: m}
return bm, nil
}
func (m *StatusResponse) Wrap() (proto.Message, error) {
bm := &Message{}
bm.Sum = &Message_StatusResponse{StatusResponse: m}
return bm, nil
}
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped blockchain
// message.
func (m *Message) Unwrap() (proto.Message, error) {
switch msg := m.Sum.(type) {
case *Message_BlockRequest:
return m.GetBlockRequest(), nil
case *Message_BlockResponse:
return m.GetBlockResponse(), nil
case *Message_NoBlockResponse:
return m.GetNoBlockResponse(), nil
case *Message_StatusRequest:
return m.GetStatusRequest(), nil
case *Message_StatusResponse:
return m.GetStatusResponse(), nil
default:
return nil, fmt.Errorf("unknown message: %T", msg)
}
}

View File

@@ -0,0 +1,109 @@
package consensus
import (
"fmt"
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/p2p"
)
var _ p2p.Wrapper = &VoteSetBits{}
var _ p2p.Wrapper = &VoteSetMaj23{}
var _ p2p.Wrapper = &Vote{}
var _ p2p.Wrapper = &ProposalPOL{}
var _ p2p.Wrapper = &Proposal{}
var _ p2p.Wrapper = &NewValidBlock{}
var _ p2p.Wrapper = &NewRoundStep{}
var _ p2p.Wrapper = &HasVote{}
var _ p2p.Wrapper = &BlockPart{}
func (m *VoteSetBits) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_VoteSetBits{VoteSetBits: m}
return cm, nil
}
func (m *VoteSetMaj23) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_VoteSetMaj23{VoteSetMaj23: m}
return cm, nil
}
func (m *HasVote) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_HasVote{HasVote: m}
return cm, nil
}
func (m *Vote) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_Vote{Vote: m}
return cm, nil
}
func (m *BlockPart) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_BlockPart{BlockPart: m}
return cm, nil
}
func (m *ProposalPOL) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_ProposalPol{ProposalPol: m}
return cm, nil
}
func (m *Proposal) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_Proposal{Proposal: m}
return cm, nil
}
func (m *NewValidBlock) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_NewValidBlock{NewValidBlock: m}
return cm, nil
}
func (m *NewRoundStep) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_NewRoundStep{NewRoundStep: m}
return cm, nil
}
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped consensus
// proto message.
func (m *Message) Unwrap() (proto.Message, error) {
switch msg := m.Sum.(type) {
case *Message_NewRoundStep:
return m.GetNewRoundStep(), nil
case *Message_NewValidBlock:
return m.GetNewValidBlock(), nil
case *Message_Proposal:
return m.GetProposal(), nil
case *Message_ProposalPol:
return m.GetProposalPol(), nil
case *Message_BlockPart:
return m.GetBlockPart(), nil
case *Message_Vote:
return m.GetVote(), nil
case *Message_HasVote:
return m.GetHasVote(), nil
case *Message_VoteSetMaj23:
return m.GetVoteSetMaj23(), nil
case *Message_VoteSetBits:
return m.GetVoteSetBits(), nil
default:
return nil, fmt.Errorf("unknown message: %T", msg)
}
}

View File

@@ -0,0 +1,30 @@
package mempool
import (
"fmt"
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/p2p"
)
var _ p2p.Wrapper = &Txs{}
var _ p2p.Unwrapper = &Message{}
// Wrap implements the p2p Wrapper interface and wraps a mempool message.
func (m *Txs) Wrap() (proto.Message, error) {
return &Message{
Sum: &Message_Txs{Txs: m},
}, nil
}
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped mempool
// message.
func (m *Message) Unwrap() (proto.Message, error) {
switch msg := m.Sum.(type) {
case *Message_Txs:
return m.GetTxs(), nil
default:
return nil, fmt.Errorf("unknown message: %T", msg)
}
}

View File

@@ -0,0 +1,32 @@
package p2p
import (
"fmt"
"github.com/cosmos/gogoproto/proto"
)
func (m *PexAddrs) Wrap() (proto.Message, error) {
pm := &Message{}
pm.Sum = &Message_PexAddrs{PexAddrs: m}
return pm, nil
}
func (m *PexRequest) Wrap() (proto.Message, error) {
pm := &Message{}
pm.Sum = &Message_PexRequest{PexRequest: m}
return pm, nil
}
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped PEX
// message.
func (m *Message) Unwrap() (proto.Message, error) {
switch msg := m.Sum.(type) {
case *Message_PexRequest:
return msg.PexRequest, nil
case *Message_PexAddrs:
return msg.PexAddrs, nil
default:
return nil, fmt.Errorf("unknown pex message: %T", msg)
}
}

View File

@@ -0,0 +1,58 @@
package statesync
import (
"fmt"
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/p2p"
)
var _ p2p.Wrapper = &ChunkRequest{}
var _ p2p.Wrapper = &ChunkResponse{}
var _ p2p.Wrapper = &SnapshotsRequest{}
var _ p2p.Wrapper = &SnapshotsResponse{}
func (m *SnapshotsResponse) Wrap() (proto.Message, error) {
sm := &Message{}
sm.Sum = &Message_SnapshotsResponse{SnapshotsResponse: m}
return sm, nil
}
func (m *SnapshotsRequest) Wrap() (proto.Message, error) {
sm := &Message{}
sm.Sum = &Message_SnapshotsRequest{SnapshotsRequest: m}
return sm, nil
}
func (m *ChunkResponse) Wrap() (proto.Message, error) {
sm := &Message{}
sm.Sum = &Message_ChunkResponse{ChunkResponse: m}
return sm, nil
}
func (m *ChunkRequest) Wrap() (proto.Message, error) {
sm := &Message{}
sm.Sum = &Message_ChunkRequest{ChunkRequest: m}
return sm, nil
}
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped state sync
// proto message.
func (m *Message) Unwrap() (proto.Message, error) {
switch msg := m.Sum.(type) {
case *Message_ChunkRequest:
return m.GetChunkRequest(), nil
case *Message_ChunkResponse:
return m.GetChunkResponse(), nil
case *Message_SnapshotsRequest:
return m.GetSnapshotsRequest(), nil
case *Message_SnapshotsResponse:
return m.GetSnapshotsResponse(), nil
default:
return nil, fmt.Errorf("unknown message: %T", msg)
}
}

View File

@@ -16,58 +16,6 @@ const (
chunkMsgSize = int(16e6)
)
// mustEncodeMsg encodes a Protobuf message, panicing on error.
func mustEncodeMsg(pb proto.Message) []byte {
msg := mustWrapToProto(pb)
bz, err := proto.Marshal(msg)
if err != nil {
panic(fmt.Errorf("unable to marshal %T: %w", pb, err))
}
return bz
}
func mustWrapToProto(pb proto.Message) proto.Message {
msg := ssproto.Message{}
switch pb := pb.(type) {
case *ssproto.ChunkRequest:
msg.Sum = &ssproto.Message_ChunkRequest{ChunkRequest: pb}
case *ssproto.ChunkResponse:
msg.Sum = &ssproto.Message_ChunkResponse{ChunkResponse: pb}
case *ssproto.SnapshotsRequest:
msg.Sum = &ssproto.Message_SnapshotsRequest{SnapshotsRequest: pb}
case *ssproto.SnapshotsResponse:
msg.Sum = &ssproto.Message_SnapshotsResponse{SnapshotsResponse: pb}
default:
panic(fmt.Errorf("unknown message type %T", pb))
}
return &msg
}
// decodeMsg decodes a Protobuf message.
func decodeMsg(bz []byte) (proto.Message, error) {
pb := &ssproto.Message{}
err := proto.Unmarshal(bz, pb)
if err != nil {
return nil, err
}
return msgFromProto(pb)
}
func msgFromProto(pb *ssproto.Message) (proto.Message, error) {
switch msg := pb.Sum.(type) {
case *ssproto.Message_ChunkRequest:
return msg.ChunkRequest, nil
case *ssproto.Message_ChunkResponse:
return msg.ChunkResponse, nil
case *ssproto.Message_SnapshotsRequest:
return msg.SnapshotsRequest, nil
case *ssproto.Message_SnapshotsResponse:
return msg.SnapshotsResponse, nil
default:
return nil, fmt.Errorf("unknown message type %T", msg)
}
}
// validateMsg validates a message.
func validateMsg(pb proto.Message) error {
if pb == nil {

View File

@@ -7,6 +7,7 @@ import (
"github.com/cosmos/gogoproto/proto"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/p2p"
ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
)
@@ -99,8 +100,10 @@ func TestStateSyncVectors(t *testing.T) {
for _, tc := range testCases {
tc := tc
bz := mustEncodeMsg(tc.msg)
w, err := tc.msg.(p2p.Wrapper).Wrap()
require.NoError(t, err)
bz, err := proto.Marshal(w)
require.NoError(t, err)
require.Equal(t, tc.expBytes, hex.EncodeToString(bz), tc.testName)
}

View File

@@ -107,22 +107,16 @@ func (r *Reactor) Receive(e p2p.Envelope) {
return
}
msg, err := msgFromProto(e.Message.(*ssproto.Message))
err := validateMsg(e.Message)
if err != nil {
r.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err)
r.Switch.StopPeerForError(e.Src, err)
return
}
err = validateMsg(msg)
if err != nil {
r.Logger.Error("Invalid message", "peer", e.Src, "msg", msg, "err", err)
r.Logger.Error("Invalid message", "peer", e.Src, "msg", e.Message, "err", err)
r.Switch.StopPeerForError(e.Src, err)
return
}
switch e.ChannelID {
case SnapshotChannel:
switch msg := msg.(type) {
switch msg := e.Message.(type) {
case *ssproto.SnapshotsRequest:
snapshots, err := r.recentSnapshots(recentSnapshots)
if err != nil {
@@ -134,13 +128,13 @@ func (r *Reactor) Receive(e p2p.Envelope) {
"format", snapshot.Format, "peer", e.Src.ID())
e.Src.Send(p2p.Envelope{
ChannelID: e.ChannelID,
Message: mustWrapToProto(&ssproto.SnapshotsResponse{
Message: &ssproto.SnapshotsResponse{
Height: snapshot.Height,
Format: snapshot.Format,
Chunks: snapshot.Chunks,
Hash: snapshot.Hash,
Metadata: snapshot.Metadata,
}),
},
})
}
@@ -171,7 +165,7 @@ func (r *Reactor) Receive(e p2p.Envelope) {
}
case ChunkChannel:
switch msg := msg.(type) {
switch msg := e.Message.(type) {
case *ssproto.ChunkRequest:
r.Logger.Debug("Received chunk request", "height", msg.Height, "format", msg.Format,
"chunk", msg.Index, "peer", e.Src.ID())
@@ -189,13 +183,13 @@ func (r *Reactor) Receive(e p2p.Envelope) {
"chunk", msg.Index, "peer", e.Src.ID())
e.Src.Send(p2p.Envelope{
ChannelID: ChunkChannel,
Message: mustWrapToProto(&ssproto.ChunkResponse{
Message: &ssproto.ChunkResponse{
Height: msg.Height,
Format: msg.Format,
Index: msg.Index,
Chunk: resp.Chunk,
Missing: resp.Chunk == nil,
}),
},
})
case *ssproto.ChunkResponse:
@@ -280,7 +274,7 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration)
r.Switch.Broadcast(p2p.Envelope{
ChannelID: SnapshotChannel,
Message: mustWrapToProto(&ssproto.SnapshotsRequest{}),
Message: &ssproto.SnapshotsRequest{},
})
}

View File

@@ -65,7 +65,7 @@ func TestReactor_Receive_ChunkRequest(t *testing.T) {
require.NoError(t, err)
err = proto.Unmarshal(bz, e.Message)
require.NoError(t, err)
response = e.Message.(*ssproto.Message).GetChunkResponse()
response = e.Message.(*ssproto.ChunkResponse)
}).Return(true)
}
@@ -80,10 +80,10 @@ func TestReactor_Receive_ChunkRequest(t *testing.T) {
}
})
r.NewReceive(p2p.Envelope{
r.Receive(p2p.Envelope{
ChannelID: ChunkChannel,
Src: peer,
Message: mustWrapToProto(tc.request),
Message: tc.request,
})
time.Sleep(100 * time.Millisecond)
assert.Equal(t, tc.expectResponse, response)
@@ -155,7 +155,7 @@ func TestReactor_Receive_SnapshotsRequest(t *testing.T) {
require.NoError(t, err)
err = proto.Unmarshal(bz, e.Message)
require.NoError(t, err)
responses = append(responses, e.Message.(*ssproto.Message).GetSnapshotsResponse())
responses = append(responses, e.Message.(*ssproto.SnapshotsResponse))
}).Return(true)
}
@@ -170,12 +170,11 @@ func TestReactor_Receive_SnapshotsRequest(t *testing.T) {
}
})
r.NewReceive(p2p.Envelope{
r.Receive(p2p.Envelope{
ChannelID: SnapshotChannel,
Src: peer,
Message: mustWrapToProto(&ssproto.SnapshotsRequest{}),
Message: &ssproto.SnapshotsRequest{},
})
r.Receive(SnapshotChannel, peer, mustEncodeMsg(&ssproto.SnapshotsRequest{}))
time.Sleep(100 * time.Millisecond)
assert.Equal(t, tc.expectResponses, responses)

View File

@@ -128,7 +128,7 @@ func (s *syncer) AddPeer(peer p2p.Peer) {
s.logger.Debug("Requesting snapshots from peer", "peer", peer.ID())
e := p2p.Envelope{
ChannelID: SnapshotChannel,
Message: mustWrapToProto(&ssproto.SnapshotsRequest{}),
Message: &ssproto.SnapshotsRequest{},
}
peer.Send(e)
}
@@ -473,11 +473,11 @@ func (s *syncer) requestChunk(snapshot *snapshot, chunk uint32) {
"format", snapshot.Format, "chunk", chunk, "peer", peer.ID())
peer.Send(p2p.Envelope{
ChannelID: ChunkChannel,
Message: mustWrapToProto(&ssproto.ChunkRequest{
Message: &ssproto.ChunkRequest{
Height: snapshot.Height,
Format: snapshot.Format,
Index: chunk,
}),
},
})
}

View File

@@ -100,7 +100,10 @@ func TestSyncer_SyncAny(t *testing.T) {
peerA.On("ID").Return(p2p.ID("a"))
peerA.On("Send", mock.MatchedBy(func(i interface{}) bool {
e, ok := i.(p2p.Envelope)
req := e.Message.(*ssproto.Message).GetSnapshotsRequest()
if !ok {
return false
}
req, ok := e.Message.(*ssproto.SnapshotsRequest)
return ok && e.ChannelID == SnapshotChannel && req != nil
})).Return(true)
syncer.AddPeer(peerA)
@@ -110,7 +113,10 @@ func TestSyncer_SyncAny(t *testing.T) {
peerB.On("ID").Return(p2p.ID("b"))
peerB.On("Send", mock.MatchedBy(func(i interface{}) bool {
e, ok := i.(p2p.Envelope)
req := e.Message.(*ssproto.Message).GetSnapshotsRequest()
if !ok {
return false
}
req, ok := e.Message.(*ssproto.SnapshotsRequest)
return ok && e.ChannelID == SnapshotChannel && req != nil
})).Return(true)
syncer.AddPeer(peerB)
@@ -157,7 +163,7 @@ func TestSyncer_SyncAny(t *testing.T) {
onChunkRequest := func(args mock.Arguments) {
e, ok := args[0].(p2p.Envelope)
require.True(t, ok)
msg := e.Message.(*ssproto.Message).GetChunkRequest()
msg := e.Message.(*ssproto.ChunkRequest)
require.EqualValues(t, 1, msg.Height)
require.EqualValues(t, 1, msg.Format)
require.LessOrEqual(t, msg.Index, uint32(len(chunks)))