mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-15 17:22:50 +00:00
Compare commits
51 Commits
wb/impleme
...
wb/envelop
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ce62eeb3d9 | ||
|
|
4d47ec1753 | ||
|
|
f8d37520e9 | ||
|
|
d0bfcef698 | ||
|
|
e18106eeda | ||
|
|
922ba6ab67 | ||
|
|
28aeb39bab | ||
|
|
4dc961ec0c | ||
|
|
68fae0c53d | ||
|
|
934594f231 | ||
|
|
934bf233c9 | ||
|
|
78fbb5db08 | ||
|
|
7e5f932402 | ||
|
|
b1145cf769 | ||
|
|
eda5595a11 | ||
|
|
7fb6cef26a | ||
|
|
5dea81f2ac | ||
|
|
ef17093bb4 | ||
|
|
41df563982 | ||
|
|
eef513acf8 | ||
|
|
5e513fbb4f | ||
|
|
d677121ae0 | ||
|
|
d2fb6835c8 | ||
|
|
ae266b2da6 | ||
|
|
3e9223ba79 | ||
|
|
d2fe1d3b36 | ||
|
|
396e3d80ac | ||
|
|
1919727904 | ||
|
|
0dcec4b545 | ||
|
|
4daee74d9c | ||
|
|
1053a49b4e | ||
|
|
7b41c03409 | ||
|
|
e20d28429d | ||
|
|
14510c4eb0 | ||
|
|
c529a902eb | ||
|
|
adbcd0c450 | ||
|
|
8644f69aa4 | ||
|
|
fd234903b6 | ||
|
|
cfe2ca68da | ||
|
|
2786e3db55 | ||
|
|
7e107c45fb | ||
|
|
64c18a4406 | ||
|
|
73d0f8116a | ||
|
|
01db5dc019 | ||
|
|
5795877dbc | ||
|
|
7f9e609720 | ||
|
|
833e4a15d4 | ||
|
|
8989fb4a79 | ||
|
|
6c81ecb1b5 | ||
|
|
069f460e16 | ||
|
|
c77898aad9 |
@@ -19,6 +19,55 @@ const (
|
||||
BlockResponseMessageFieldKeySize
|
||||
)
|
||||
|
||||
func wrapMsg(pb proto.Message) (proto.Message, error) {
|
||||
msg := bcproto.Message{}
|
||||
|
||||
switch pb := pb.(type) {
|
||||
case *bcproto.BlockRequest:
|
||||
msg.Sum = &bcproto.Message_BlockRequest{BlockRequest: pb}
|
||||
case *bcproto.BlockResponse:
|
||||
msg.Sum = &bcproto.Message_BlockResponse{BlockResponse: pb}
|
||||
case *bcproto.NoBlockResponse:
|
||||
msg.Sum = &bcproto.Message_NoBlockResponse{NoBlockResponse: pb}
|
||||
case *bcproto.StatusRequest:
|
||||
msg.Sum = &bcproto.Message_StatusRequest{StatusRequest: pb}
|
||||
case *bcproto.StatusResponse:
|
||||
msg.Sum = &bcproto.Message_StatusResponse{StatusResponse: pb}
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown message type %T", pb)
|
||||
}
|
||||
|
||||
return &msg, nil
|
||||
}
|
||||
|
||||
// DecodeMsg decodes a Protobuf message.
|
||||
func DecodeMsg(bz []byte) (proto.Message, error) {
|
||||
pb := &bcproto.Message{}
|
||||
|
||||
err := proto.Unmarshal(bz, pb)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return UnwrapMessage(pb)
|
||||
}
|
||||
|
||||
func UnwrapMessage(pb *bcproto.Message) (proto.Message, error) {
|
||||
switch msg := pb.Sum.(type) {
|
||||
case *bcproto.Message_BlockRequest:
|
||||
return msg.BlockRequest, nil
|
||||
case *bcproto.Message_BlockResponse:
|
||||
return msg.BlockResponse, nil
|
||||
case *bcproto.Message_NoBlockResponse:
|
||||
return msg.NoBlockResponse, nil
|
||||
case *bcproto.Message_StatusRequest:
|
||||
return msg.StatusRequest, nil
|
||||
case *bcproto.Message_StatusResponse:
|
||||
return msg.StatusResponse, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown message type %T", msg)
|
||||
}
|
||||
}
|
||||
|
||||
// ValidateMsg validates a message.
|
||||
func ValidateMsg(pb proto.Message) error {
|
||||
if pb == nil {
|
||||
|
||||
@@ -150,12 +150,18 @@ func (bcR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
|
||||
|
||||
// AddPeer implements Reactor by sending our state to peer.
|
||||
func (bcR *Reactor) AddPeer(peer p2p.Peer) {
|
||||
msg, err := wrapMsg(&bcproto.StatusResponse{
|
||||
Base: bcR.store.Base(),
|
||||
Height: bcR.store.Height(),
|
||||
})
|
||||
if err != nil {
|
||||
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
peer.Send(p2p.Envelope{
|
||||
ChannelID: BlocksyncChannel,
|
||||
Message: &bcproto.StatusResponse{
|
||||
Base: bcR.store.Base(),
|
||||
Height: bcR.store.Height(),
|
||||
},
|
||||
Message: msg,
|
||||
})
|
||||
// it's OK if send fails. will try later in poolRoutine
|
||||
|
||||
@@ -181,30 +187,50 @@ func (bcR *Reactor) respondToPeer(msg *bcproto.BlockRequest,
|
||||
return false
|
||||
}
|
||||
|
||||
wm, err := wrapMsg(&bcproto.BlockResponse{Block: bl})
|
||||
if err != nil {
|
||||
bcR.Logger.Error("could not convert msg to proto message", "err", err)
|
||||
return false
|
||||
}
|
||||
|
||||
return src.TrySend(p2p.Envelope{
|
||||
ChannelID: BlocksyncChannel,
|
||||
Message: &bcproto.BlockResponse{Block: bl},
|
||||
Message: wm,
|
||||
})
|
||||
}
|
||||
|
||||
bcR.Logger.Info("Peer asking for a block we don't have", "src", src, "height", msg.Height)
|
||||
|
||||
wm, err := wrapMsg(&bcproto.NoBlockResponse{Height: msg.Height})
|
||||
if err != nil {
|
||||
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
|
||||
return false
|
||||
}
|
||||
|
||||
return src.TrySend(p2p.Envelope{
|
||||
ChannelID: BlocksyncChannel,
|
||||
Message: &bcproto.NoBlockResponse{Height: msg.Height},
|
||||
Message: wm,
|
||||
})
|
||||
}
|
||||
|
||||
// Receive implements Reactor by handling 4 types of messages (look below).
|
||||
func (bcR *Reactor) Receive(e p2p.Envelope) {
|
||||
if err := ValidateMsg(e.Message); err != nil {
|
||||
bcR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", e.Message, "err", err)
|
||||
msg, err := UnwrapMessage(e.Message.(*bcproto.Message))
|
||||
if err != nil {
|
||||
bcR.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err)
|
||||
bcR.Switch.StopPeerForError(e.Src, err)
|
||||
return
|
||||
}
|
||||
|
||||
bcR.Logger.Debug("Receive", "e.Src", e.Src, "chID", e.ChannelID, "msg", e.Message)
|
||||
if err = ValidateMsg(msg); err != nil {
|
||||
bcR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", msg, "err", err)
|
||||
bcR.Switch.StopPeerForError(e.Src, err)
|
||||
return
|
||||
}
|
||||
|
||||
switch msg := e.Message.(type) {
|
||||
bcR.Logger.Debug("Receive", "e.Src", e.Src, "chID", e.ChannelID, "msg", msg)
|
||||
|
||||
switch msg := msg.(type) {
|
||||
case *bcproto.BlockRequest:
|
||||
bcR.respondToPeer(msg, e.Src)
|
||||
case *bcproto.BlockResponse:
|
||||
@@ -216,12 +242,18 @@ func (bcR *Reactor) Receive(e p2p.Envelope) {
|
||||
bcR.pool.AddBlock(e.Src.ID(), bi, msg.Block.Size())
|
||||
case *bcproto.StatusRequest:
|
||||
// Send peer our state.
|
||||
wm, err := wrapMsg(&bcproto.StatusResponse{
|
||||
Height: bcR.store.Height(),
|
||||
Base: bcR.store.Base(),
|
||||
})
|
||||
if err != nil {
|
||||
bcR.Logger.Error("could not convert msg to proto message", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
e.Src.TrySend(p2p.Envelope{
|
||||
ChannelID: BlocksyncChannel,
|
||||
Message: &bcproto.StatusResponse{
|
||||
Height: bcR.store.Height(),
|
||||
Base: bcR.store.Base(),
|
||||
},
|
||||
Message: wm,
|
||||
})
|
||||
case *bcproto.StatusResponse:
|
||||
// Got a peer status. Unverified.
|
||||
@@ -268,9 +300,14 @@ func (bcR *Reactor) poolRoutine(stateSynced bool) {
|
||||
if peer == nil {
|
||||
continue
|
||||
}
|
||||
wm, err := wrapMsg(&bcproto.BlockRequest{Height: request.Height})
|
||||
if err != nil {
|
||||
bcR.Logger.Error("could not convert msg to proto", "err", err)
|
||||
continue
|
||||
}
|
||||
queued := peer.TrySend(p2p.Envelope{
|
||||
ChannelID: BlocksyncChannel,
|
||||
Message: &bcproto.BlockRequest{Height: request.Height},
|
||||
Message: wm,
|
||||
})
|
||||
if !queued {
|
||||
bcR.Logger.Debug("Send queue is full, drop block request", "peer", peer.ID(), "height", request.Height)
|
||||
@@ -410,9 +447,15 @@ FOR_LOOP:
|
||||
|
||||
// BroadcastStatusRequest broadcasts `BlockStore` base and height.
|
||||
func (bcR *Reactor) BroadcastStatusRequest() error {
|
||||
wm, err := wrapMsg(&bcproto.StatusRequest{})
|
||||
if err != nil {
|
||||
bcR.Logger.Error("could not convert msg to proto message", "err", err)
|
||||
return fmt.Errorf("could not convert msg to proto message: %w", err)
|
||||
}
|
||||
bcR.Switch.NewBroadcast(p2p.Envelope{
|
||||
ChannelID: BlocksyncChannel,
|
||||
Message: &bcproto.StatusRequest{},
|
||||
Message: wm,
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -26,7 +26,6 @@ import (
|
||||
mempoolv0 "github.com/tendermint/tendermint/mempool/v0"
|
||||
mempoolv1 "github.com/tendermint/tendermint/mempool/v1"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
|
||||
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
|
||||
sm "github.com/tendermint/tendermint/state"
|
||||
"github.com/tendermint/tendermint/store"
|
||||
@@ -167,13 +166,13 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
|
||||
if i < len(peerList)/2 {
|
||||
bcs.Logger.Info("Signed and pushed vote", "vote", prevote1, "peer", peer)
|
||||
peer.Send(p2p.Envelope{
|
||||
Message: &tmcons.Vote{prevote1.ToProto()},
|
||||
Message: MustMsgToProto(&VoteMessage{prevote1}),
|
||||
ChannelID: VoteChannel,
|
||||
})
|
||||
} else {
|
||||
bcs.Logger.Info("Signed and pushed vote", "vote", prevote2, "peer", peer)
|
||||
peer.Send(p2p.Envelope{
|
||||
Message: &tmcons.Vote{prevote2.ToProto()},
|
||||
Message: MustMsgToProto(&VoteMessage{prevote2}),
|
||||
ChannelID: VoteChannel,
|
||||
})
|
||||
}
|
||||
@@ -527,25 +526,23 @@ func sendProposalAndParts(
|
||||
parts *types.PartSet,
|
||||
) {
|
||||
// proposal
|
||||
msg := &ProposalMessage{Proposal: proposal}
|
||||
peer.Send(p2p.Envelope{
|
||||
ChannelID: DataChannel,
|
||||
Message: &tmcons.Proposal{Proposal: *proposal.ToProto()},
|
||||
Message: MustMsgToProto(msg),
|
||||
})
|
||||
|
||||
// parts
|
||||
for i := 0; i < int(parts.Total()); i++ {
|
||||
part := parts.GetPart(i)
|
||||
pp, err := part.ToProto()
|
||||
if err != nil {
|
||||
panic(err) // TODO: wbanfield better error handling
|
||||
msg := &BlockPartMessage{
|
||||
Height: height, // This tells peer that this part applies to us.
|
||||
Round: round, // This tells peer that this part applies to us.
|
||||
Part: part,
|
||||
}
|
||||
peer.Send(p2p.Envelope{
|
||||
ChannelID: DataChannel,
|
||||
Message: &tmcons.BlockPart{
|
||||
Height: height, // This tells peer that this part applies to us.
|
||||
Round: round, // This tells peer that this part applies to us.
|
||||
Part: *pp,
|
||||
},
|
||||
Message: MustMsgToProto(msg),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -556,11 +553,11 @@ func sendProposalAndParts(
|
||||
cs.mtx.Unlock()
|
||||
peer.Send(p2p.Envelope{
|
||||
ChannelID: VoteChannel,
|
||||
Message: &tmcons.Vote{prevote.ToProto()},
|
||||
Message: MustMsgToProto(&VoteMessage{prevote}),
|
||||
})
|
||||
peer.Send(p2p.Envelope{
|
||||
ChannelID: VoteChannel,
|
||||
Message: &tmcons.Vote{precommit.ToProto()},
|
||||
Message: MustMsgToProto(&VoteMessage{precommit}),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
tmrand "github.com/tendermint/tendermint/libs/rand"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
|
||||
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
@@ -96,7 +95,7 @@ func invalidDoPrevoteFunc(t *testing.T, height int64, round int32, cs *State, sw
|
||||
for _, peer := range peers {
|
||||
cs.Logger.Info("Sending bad vote", "block", blockHash, "peer", peer)
|
||||
peer.Send(p2p.Envelope{
|
||||
Message: &tmcons.Vote{precommit.ToProto()},
|
||||
Message: MustMsgToProto(&VoteMessage{precommit}),
|
||||
ChannelID: VoteChannel,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -4,6 +4,8 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
|
||||
cstypes "github.com/tendermint/tendermint/consensus/types"
|
||||
"github.com/tendermint/tendermint/libs/bits"
|
||||
tmmath "github.com/tendermint/tendermint/libs/math"
|
||||
@@ -14,8 +16,6 @@ import (
|
||||
)
|
||||
|
||||
// MsgToProto takes a consensus message type and returns the proto defined consensus message.
|
||||
//
|
||||
// TODO: This needs to be removed, but WALToProto depends on this.
|
||||
func MsgToProto(msg Message) (*tmcons.Message, error) {
|
||||
if msg == nil {
|
||||
return nil, errors.New("consensus: message is nil")
|
||||
@@ -143,6 +143,14 @@ func MsgToProto(msg Message) (*tmcons.Message, error) {
|
||||
return &pb, nil
|
||||
}
|
||||
|
||||
func MustMsgToProto(msg Message) *tmcons.Message {
|
||||
m, err := MsgToProto(msg)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
// MsgFromProto takes a consensus proto message and returns the native go type
|
||||
func MsgFromProto(msg *tmcons.Message) (Message, error) {
|
||||
if msg == nil {
|
||||
@@ -260,6 +268,20 @@ func MsgFromProto(msg *tmcons.Message) (Message, error) {
|
||||
return pb, nil
|
||||
}
|
||||
|
||||
// MustEncode takes the reactors msg, makes it proto and marshals it
|
||||
// this mimics `MustMarshalBinaryBare` in that is panics on error
|
||||
func MustEncode(msg Message) []byte {
|
||||
pb, err := MsgToProto(msg)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
enc, err := proto.Marshal(pb)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return enc
|
||||
}
|
||||
|
||||
// WALToProto takes a WAL message and return a proto walMessage and error
|
||||
func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) {
|
||||
var pb tmcons.WALMessage
|
||||
|
||||
@@ -7,6 +7,8 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
|
||||
cstypes "github.com/tendermint/tendermint/consensus/types"
|
||||
"github.com/tendermint/tendermint/libs/bits"
|
||||
tmevents "github.com/tendermint/tendermint/libs/events"
|
||||
@@ -231,16 +233,6 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
|
||||
return
|
||||
}
|
||||
|
||||
if w, ok := e.Message.(p2p.Wrapper); ok {
|
||||
var err error
|
||||
e.Message, err = w.Wrap()
|
||||
if err != nil {
|
||||
conR.Logger.Error("Error wrapping message", "src", e.Src, "chId", e.ChannelID, "err", err)
|
||||
conR.Switch.StopPeerForError(e.Src, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
msg, err := MsgFromProto(e.Message.(*tmcons.Message))
|
||||
if err != nil {
|
||||
conR.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err)
|
||||
@@ -304,18 +296,15 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
|
||||
default:
|
||||
panic("Bad VoteSetBitsMessage field Type. Forgot to add a check in ValidateBasic?")
|
||||
}
|
||||
eMsg := &tmcons.VoteSetBits{
|
||||
Height: msg.Height,
|
||||
Round: msg.Round,
|
||||
Type: msg.Type,
|
||||
BlockID: msg.BlockID.ToProto(),
|
||||
}
|
||||
if votes := ourVotes.ToProto(); votes != nil {
|
||||
eMsg.Votes = *votes
|
||||
}
|
||||
e.Src.TrySend(p2p.Envelope{
|
||||
ChannelID: VoteSetBitsChannel,
|
||||
Message: eMsg,
|
||||
Message: MustMsgToProto(&VoteSetBitsMessage{
|
||||
Height: msg.Height,
|
||||
Round: msg.Round,
|
||||
Type: msg.Type,
|
||||
BlockID: msg.BlockID,
|
||||
Votes: ourVotes,
|
||||
}),
|
||||
})
|
||||
default:
|
||||
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
|
||||
@@ -450,28 +439,28 @@ func (conR *Reactor) broadcastNewRoundStepMessage(rs *cstypes.RoundState) {
|
||||
nrsMsg := makeRoundStepMessage(rs)
|
||||
conR.Switch.NewBroadcast(p2p.Envelope{
|
||||
ChannelID: StateChannel,
|
||||
Message: nrsMsg,
|
||||
Message: MustMsgToProto(nrsMsg),
|
||||
})
|
||||
}
|
||||
|
||||
func (conR *Reactor) broadcastNewValidBlockMessage(rs *cstypes.RoundState) {
|
||||
psh := rs.ProposalBlockParts.Header()
|
||||
csMsg := &tmcons.NewValidBlock{
|
||||
csMsg := &NewValidBlockMessage{
|
||||
Height: rs.Height,
|
||||
Round: rs.Round,
|
||||
BlockPartSetHeader: psh.ToProto(),
|
||||
BlockParts: rs.ProposalBlockParts.BitArray().ToProto(),
|
||||
BlockPartSetHeader: rs.ProposalBlockParts.Header(),
|
||||
BlockParts: rs.ProposalBlockParts.BitArray(),
|
||||
IsCommit: rs.Step == cstypes.RoundStepCommit,
|
||||
}
|
||||
MsgToProto(csMsg)
|
||||
conR.Switch.NewBroadcast(p2p.Envelope{
|
||||
ChannelID: StateChannel,
|
||||
Message: csMsg,
|
||||
Message: MustMsgToProto(csMsg),
|
||||
})
|
||||
}
|
||||
|
||||
// Broadcasts HasVoteMessage to peers that care.
|
||||
func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
|
||||
msg := &tmcons.HasVote{
|
||||
msg := &HasVoteMessage{
|
||||
Height: vote.Height,
|
||||
Round: vote.Round,
|
||||
Type: vote.Type,
|
||||
@@ -479,7 +468,7 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
|
||||
}
|
||||
conR.Switch.NewBroadcast(p2p.Envelope{
|
||||
ChannelID: StateChannel,
|
||||
Message: msg,
|
||||
Message: MustMsgToProto(msg),
|
||||
})
|
||||
/*
|
||||
// TODO: Make this broadcast more selective.
|
||||
@@ -505,11 +494,11 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
|
||||
*/
|
||||
}
|
||||
|
||||
func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *tmcons.NewRoundStep) {
|
||||
nrsMsg = &tmcons.NewRoundStep{
|
||||
func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *NewRoundStepMessage) {
|
||||
nrsMsg = &NewRoundStepMessage{
|
||||
Height: rs.Height,
|
||||
Round: rs.Round,
|
||||
Step: uint32(rs.Step),
|
||||
Step: rs.Step,
|
||||
SecondsSinceStartTime: int64(time.Since(rs.StartTime).Seconds()),
|
||||
LastCommitRound: rs.LastCommit.GetRound(),
|
||||
}
|
||||
@@ -521,7 +510,7 @@ func (conR *Reactor) sendNewRoundStepMessage(peer p2p.Peer) {
|
||||
nrsMsg := makeRoundStepMessage(rs)
|
||||
peer.Send(p2p.Envelope{
|
||||
ChannelID: StateChannel,
|
||||
Message: nrsMsg,
|
||||
Message: MustMsgToProto(nrsMsg),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -561,18 +550,15 @@ OUTER_LOOP:
|
||||
if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartSetHeader) {
|
||||
if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
|
||||
part := rs.ProposalBlockParts.GetPart(index)
|
||||
parts, err := part.ToProto()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
msg := &BlockPartMessage{
|
||||
Height: rs.Height, // This tells peer that this part applies to us.
|
||||
Round: rs.Round, // This tells peer that this part applies to us.
|
||||
Part: part,
|
||||
}
|
||||
logger.Debug("Sending block part", "height", prs.Height, "round", prs.Round)
|
||||
if peer.Send(p2p.Envelope{
|
||||
ChannelID: DataChannel,
|
||||
Message: &tmcons.BlockPart{
|
||||
Height: rs.Height, // This tells peer that this part applies to us.
|
||||
Round: rs.Round, // This tells peer that this part applies to us.
|
||||
Part: *parts,
|
||||
},
|
||||
Message: MustMsgToProto(msg),
|
||||
}) {
|
||||
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
|
||||
}
|
||||
@@ -619,10 +605,11 @@ OUTER_LOOP:
|
||||
if rs.Proposal != nil && !prs.Proposal {
|
||||
// Proposal: share the proposal metadata with peer.
|
||||
{
|
||||
msg := &ProposalMessage{Proposal: rs.Proposal}
|
||||
logger.Debug("Sending proposal", "height", prs.Height, "round", prs.Round)
|
||||
if peer.Send(p2p.Envelope{
|
||||
ChannelID: DataChannel,
|
||||
Message: &tmcons.Proposal{Proposal: *rs.Proposal.ToProto()},
|
||||
Message: MustMsgToProto(msg),
|
||||
}) {
|
||||
// NOTE[ZM]: A peer might have received different proposal msg so this Proposal msg will be rejected!
|
||||
ps.SetHasProposal(rs.Proposal)
|
||||
@@ -633,14 +620,15 @@ OUTER_LOOP:
|
||||
// rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round,
|
||||
// so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
|
||||
if 0 <= rs.Proposal.POLRound {
|
||||
msg := &ProposalPOLMessage{
|
||||
Height: rs.Height,
|
||||
ProposalPOLRound: rs.Proposal.POLRound,
|
||||
ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
|
||||
}
|
||||
logger.Debug("Sending POL", "height", prs.Height, "round", prs.Round)
|
||||
peer.Send(p2p.Envelope{
|
||||
ChannelID: DataChannel,
|
||||
Message: &tmcons.ProposalPOL{
|
||||
Height: rs.Height,
|
||||
ProposalPolRound: rs.Proposal.POLRound,
|
||||
ProposalPol: *rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray().ToProto(),
|
||||
},
|
||||
Message: MustMsgToProto(msg),
|
||||
})
|
||||
}
|
||||
continue OUTER_LOOP
|
||||
@@ -678,19 +666,15 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt
|
||||
return
|
||||
}
|
||||
// Send the part
|
||||
logger.Debug("Sending block part for catchup", "round", prs.Round, "index", index)
|
||||
pp, err := part.ToProto()
|
||||
if err != nil {
|
||||
logger.Error("Could not convert part to proto", "index", index, "error", err)
|
||||
return
|
||||
msg := &BlockPartMessage{
|
||||
Height: prs.Height, // Not our height, so it doesn't matter.
|
||||
Round: prs.Round, // Not our height, so it doesn't matter.
|
||||
Part: part,
|
||||
}
|
||||
logger.Debug("Sending block part for catchup", "round", prs.Round, "index", index)
|
||||
if peer.Send(p2p.Envelope{
|
||||
ChannelID: DataChannel,
|
||||
Message: &tmcons.BlockPart{
|
||||
Height: prs.Height, // Not our height, so it doesn't matter.
|
||||
Round: prs.Round, // Not our height, so it doesn't matter.
|
||||
Part: *pp,
|
||||
},
|
||||
Message: MustMsgToProto(msg),
|
||||
}) {
|
||||
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
|
||||
} else {
|
||||
@@ -853,12 +837,12 @@ OUTER_LOOP:
|
||||
|
||||
peer.TrySend(p2p.Envelope{
|
||||
ChannelID: StateChannel,
|
||||
Message: &tmcons.VoteSetMaj23{
|
||||
Message: MustMsgToProto(&VoteSetMaj23Message{
|
||||
Height: prs.Height,
|
||||
Round: prs.Round,
|
||||
Type: tmproto.PrevoteType,
|
||||
BlockID: maj23.ToProto(),
|
||||
},
|
||||
BlockID: maj23,
|
||||
}),
|
||||
})
|
||||
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
|
||||
}
|
||||
@@ -871,14 +855,15 @@ OUTER_LOOP:
|
||||
prs := ps.GetRoundState()
|
||||
if rs.Height == prs.Height {
|
||||
if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok {
|
||||
|
||||
peer.TrySend(p2p.Envelope{
|
||||
ChannelID: StateChannel,
|
||||
Message: &tmcons.VoteSetMaj23{
|
||||
Message: MustMsgToProto(&VoteSetMaj23Message{
|
||||
Height: prs.Height,
|
||||
Round: prs.Round,
|
||||
Type: tmproto.PrecommitType,
|
||||
BlockID: maj23.ToProto(),
|
||||
},
|
||||
BlockID: maj23,
|
||||
}),
|
||||
})
|
||||
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
|
||||
}
|
||||
@@ -894,12 +879,12 @@ OUTER_LOOP:
|
||||
|
||||
peer.TrySend(p2p.Envelope{
|
||||
ChannelID: StateChannel,
|
||||
Message: &tmcons.VoteSetMaj23{
|
||||
Message: MustMsgToProto(&VoteSetMaj23Message{
|
||||
Height: prs.Height,
|
||||
Round: prs.ProposalPOLRound,
|
||||
Type: tmproto.PrevoteType,
|
||||
BlockID: maj23.ToProto(),
|
||||
},
|
||||
BlockID: maj23,
|
||||
}),
|
||||
})
|
||||
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
|
||||
}
|
||||
@@ -917,12 +902,12 @@ OUTER_LOOP:
|
||||
if commit := conR.conS.LoadCommit(prs.Height); commit != nil {
|
||||
peer.TrySend(p2p.Envelope{
|
||||
ChannelID: StateChannel,
|
||||
Message: &tmcons.VoteSetMaj23{
|
||||
Message: MustMsgToProto(&VoteSetMaj23Message{
|
||||
Height: prs.Height,
|
||||
Round: commit.Round,
|
||||
Type: tmproto.PrecommitType,
|
||||
BlockID: commit.BlockID.ToProto(),
|
||||
},
|
||||
BlockID: commit.BlockID,
|
||||
}),
|
||||
})
|
||||
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
|
||||
}
|
||||
@@ -1137,12 +1122,11 @@ func (ps *PeerState) SetHasProposalBlockPart(height int64, round int32, index in
|
||||
// Returns true if vote was sent.
|
||||
func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool {
|
||||
if vote, ok := ps.PickVoteToSend(votes); ok {
|
||||
msg := &VoteMessage{vote}
|
||||
ps.logger.Debug("Sending vote message", "ps", ps, "vote", vote)
|
||||
if ps.peer.Send(p2p.Envelope{
|
||||
ChannelID: VoteChannel,
|
||||
Message: &tmcons.Vote{
|
||||
Vote: vote.ToProto(),
|
||||
},
|
||||
Message: MustMsgToProto(msg),
|
||||
}) {
|
||||
ps.SetHasVote(vote)
|
||||
return true
|
||||
@@ -1509,6 +1493,15 @@ func init() {
|
||||
tmjson.RegisterType(&VoteSetBitsMessage{}, "tendermint/VoteSetBits")
|
||||
}
|
||||
|
||||
func decodeMsg(bz []byte) (msg Message, err error) {
|
||||
pb := &tmcons.Message{}
|
||||
if err = proto.Unmarshal(bz, pb); err != nil {
|
||||
return msg, err
|
||||
}
|
||||
|
||||
return MsgFromProto(pb)
|
||||
}
|
||||
|
||||
//-------------------------------------
|
||||
|
||||
// NewRoundStepMessage is sent for every step taken in the ConsensusState.
|
||||
|
||||
@@ -33,7 +33,6 @@ import (
|
||||
mempoolv1 "github.com/tendermint/tendermint/mempool/v1"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
p2pmock "github.com/tendermint/tendermint/p2p/mock"
|
||||
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
|
||||
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
|
||||
sm "github.com/tendermint/tendermint/state"
|
||||
statemocks "github.com/tendermint/tendermint/state/mocks"
|
||||
@@ -266,18 +265,15 @@ func TestReactorReceiveDoesNotPanicIfAddPeerHasntBeenCalledYet(t *testing.T) {
|
||||
var (
|
||||
reactor = reactors[0]
|
||||
peer = p2pmock.NewPeer(nil)
|
||||
msg = MustEncode(&HasVoteMessage{Height: 1,
|
||||
Round: 1, Index: 1, Type: tmproto.PrevoteType})
|
||||
)
|
||||
|
||||
reactor.InitPeer(peer)
|
||||
|
||||
// simulate switch calling Receive before AddPeer
|
||||
assert.NotPanics(t, func() {
|
||||
reactor.Receive(p2p.Envelope{
|
||||
ChannelID: StateChannel,
|
||||
Src: peer,
|
||||
Message: &tmcons.HasVote{Height: 1,
|
||||
Round: 1, Index: 1, Type: tmproto.PrevoteType},
|
||||
})
|
||||
reactor.Receive(StateChannel, peer, msg)
|
||||
reactor.AddPeer(peer)
|
||||
})
|
||||
}
|
||||
@@ -292,17 +288,20 @@ func TestReactorReceivePanicsIfInitPeerHasntBeenCalledYet(t *testing.T) {
|
||||
var (
|
||||
reactor = reactors[0]
|
||||
peer = p2pmock.NewPeer(nil)
|
||||
msg = MustEncode(&HasVoteMessage{Height: 1,
|
||||
Round: 1, Index: 1, Type: tmproto.PrevoteType})
|
||||
)
|
||||
|
||||
// we should call InitPeer here
|
||||
|
||||
// simulate switch calling Receive before AddPeer
|
||||
assert.Panics(t, func() {
|
||||
reactor.Receive(p2p.Envelope{
|
||||
reactor.Receive(StateChannel, peer, msg)
|
||||
reactor.NewReceive(p2p.Envelope{
|
||||
ChannelID: StateChannel,
|
||||
Src: peer,
|
||||
Message: &tmcons.HasVote{Height: 1,
|
||||
Round: 1, Index: 1, Type: tmproto.PrevoteType},
|
||||
Message: MustMsgToProto(&HasVoteMessage{Height: 1,
|
||||
Round: 1, Index: 1, Type: tmproto.PrevoteType}),
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
@@ -214,6 +214,24 @@ type PeerState interface {
|
||||
GetHeight() int64
|
||||
}
|
||||
|
||||
// encodemsg takes a array of evidence
|
||||
// returns the byte encoding of the List Message
|
||||
func encodeMsg(evis []types.Evidence) ([]byte, error) {
|
||||
evi := make([]tmproto.Evidence, len(evis))
|
||||
for i := 0; i < len(evis); i++ {
|
||||
ev, err := types.EvidenceToProto(evis[i])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
evi[i] = *ev
|
||||
}
|
||||
epl := tmproto.EvidenceList{
|
||||
Evidence: evi,
|
||||
}
|
||||
|
||||
return epl.Marshal()
|
||||
}
|
||||
|
||||
// encodemsg takes a array of evidence
|
||||
// returns the byte encoding of the List Message
|
||||
func evidenceListToProto(evis []types.Evidence) (*tmproto.EvidenceList, error) {
|
||||
@@ -231,6 +249,31 @@ func evidenceListToProto(evis []types.Evidence) (*tmproto.EvidenceList, error) {
|
||||
return &epl, nil
|
||||
}
|
||||
|
||||
// decodemsg takes an array of bytes
|
||||
// returns an array of evidence
|
||||
func decodeMsg(bz []byte) (evis []types.Evidence, err error) {
|
||||
lm := tmproto.EvidenceList{}
|
||||
if err := lm.Unmarshal(bz); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
evis = make([]types.Evidence, len(lm.Evidence))
|
||||
for i := 0; i < len(lm.Evidence); i++ {
|
||||
ev, err := types.EvidenceFromProto(&lm.Evidence[i])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
evis[i] = ev
|
||||
}
|
||||
|
||||
for i, ev := range evis {
|
||||
if err := ev.ValidateBasic(); err != nil {
|
||||
return nil, fmt.Errorf("invalid evidence (#%d): %v", i, err)
|
||||
}
|
||||
}
|
||||
|
||||
return evis, nil
|
||||
}
|
||||
func evidenceListFromProto(m proto.Message) ([]types.Evidence, error) {
|
||||
lm := m.(*tmproto.EvidenceList)
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/libs/clist"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
@@ -156,33 +157,27 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
|
||||
// Receive implements Reactor.
|
||||
// It adds any received transactions to the mempool.
|
||||
func (memR *Reactor) Receive(e p2p.Envelope) {
|
||||
memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
|
||||
switch msg := e.Message.(type) {
|
||||
case *protomem.Txs:
|
||||
protoTxs := msg.GetTxs()
|
||||
if len(protoTxs) == 0 {
|
||||
memR.Logger.Error("received tmpty txs from peer", "src", e.Src)
|
||||
return
|
||||
}
|
||||
txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(e.Src)}
|
||||
if e.Src != nil {
|
||||
txInfo.SenderP2PID = e.Src.ID()
|
||||
}
|
||||
|
||||
var err error
|
||||
for _, tx := range protoTxs {
|
||||
ntx := types.Tx(tx)
|
||||
err = memR.mempool.CheckTx(ntx, nil, txInfo)
|
||||
if errors.Is(err, mempool.ErrTxInCache) {
|
||||
memR.Logger.Debug("Tx already exists in cache", "tx", ntx.String())
|
||||
} else if err != nil {
|
||||
memR.Logger.Info("Could not check tx", "tx", ntx.String(), "err", err)
|
||||
}
|
||||
}
|
||||
default:
|
||||
memR.Logger.Error("unknown message type", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
|
||||
msg, err := msgFromProto(e.Message)
|
||||
if err != nil {
|
||||
memR.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err)
|
||||
memR.Switch.StopPeerForError(e.Src, err)
|
||||
return
|
||||
}
|
||||
memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", msg)
|
||||
|
||||
txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(e.Src)}
|
||||
if e.Src != nil {
|
||||
txInfo.SenderP2PID = e.Src.ID()
|
||||
}
|
||||
|
||||
for _, tx := range msg.Txs {
|
||||
err = memR.mempool.CheckTx(tx, nil, txInfo)
|
||||
if errors.Is(err, mempool.ErrTxInCache) {
|
||||
memR.Logger.Debug("Tx already exists in cache", "tx", tx.String())
|
||||
} else if err != nil {
|
||||
memR.Logger.Info("Could not check tx", "tx", tx.String(), "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// broadcasting happens from go routines per peer
|
||||
}
|
||||
@@ -267,6 +262,39 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
|
||||
}
|
||||
}
|
||||
|
||||
func decodeMsg(bz []byte) (TxsMessage, error) {
|
||||
msg := protomem.Message{}
|
||||
err := msg.Unmarshal(bz)
|
||||
if err != nil {
|
||||
return TxsMessage{}, err
|
||||
}
|
||||
|
||||
return msgFromProto(&msg)
|
||||
}
|
||||
|
||||
func msgFromProto(m proto.Message) (TxsMessage, error) {
|
||||
msg := m.(*protomem.Message)
|
||||
var message TxsMessage
|
||||
if i, ok := msg.Sum.(*protomem.Message_Txs); ok {
|
||||
txs := i.Txs.GetTxs()
|
||||
|
||||
if len(txs) == 0 {
|
||||
return message, errors.New("empty TxsMessage")
|
||||
}
|
||||
|
||||
decoded := make([]types.Tx, len(txs))
|
||||
for j, tx := range txs {
|
||||
decoded[j] = types.Tx(tx)
|
||||
}
|
||||
|
||||
message = TxsMessage{
|
||||
Txs: decoded,
|
||||
}
|
||||
return message, nil
|
||||
}
|
||||
return message, fmt.Errorf("msg type: %T is not supported", msg)
|
||||
}
|
||||
|
||||
// TxsMessage is a Message containing transactions.
|
||||
type TxsMessage struct {
|
||||
Txs []types.Tx
|
||||
|
||||
@@ -22,7 +22,6 @@ import (
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
"github.com/tendermint/tendermint/p2p/mock"
|
||||
memproto "github.com/tendermint/tendermint/proto/tendermint/mempool"
|
||||
protomem "github.com/tendermint/tendermint/proto/tendermint/mempool"
|
||||
"github.com/tendermint/tendermint/proxy"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
@@ -280,12 +279,7 @@ func TestDontExhaustMaxActiveIDs(t *testing.T) {
|
||||
|
||||
for i := 0; i < mempool.MaxActiveIDs+1; i++ {
|
||||
peer := mock.NewPeer(nil)
|
||||
reactor.Receive(p2p.Envelope{
|
||||
ChannelID: mempool.MempoolChannel,
|
||||
Src: peer,
|
||||
Message: &protomem.Txs{
|
||||
Txs: [][]byte{{0x01, 0x02, 0x03}},
|
||||
}})
|
||||
reactor.Receive(mempool.MempoolChannel, peer, []byte{0x1, 0x2, 0x3})
|
||||
reactor.AddPeer(peer)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,6 +5,8 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/libs/clist"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
@@ -155,34 +157,26 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
|
||||
// Receive implements Reactor.
|
||||
// It adds any received transactions to the mempool.
|
||||
func (memR *Reactor) Receive(e p2p.Envelope) {
|
||||
memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
|
||||
switch msg := e.Message.(type) {
|
||||
case *protomem.Txs:
|
||||
protoTxs := msg.GetTxs()
|
||||
if len(protoTxs) == 0 {
|
||||
memR.Logger.Error("received tmpty txs from peer", "src", e.Src)
|
||||
return
|
||||
}
|
||||
txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(e.Src)}
|
||||
if e.Src != nil {
|
||||
txInfo.SenderP2PID = e.Src.ID()
|
||||
}
|
||||
|
||||
var err error
|
||||
for _, tx := range protoTxs {
|
||||
ntx := types.Tx(tx)
|
||||
err = memR.mempool.CheckTx(ntx, nil, txInfo)
|
||||
if errors.Is(err, mempool.ErrTxInCache) {
|
||||
memR.Logger.Debug("Tx already exists in cache", "tx", ntx.String())
|
||||
} else if err != nil {
|
||||
memR.Logger.Info("Could not check tx", "tx", ntx.String(), "err", err)
|
||||
}
|
||||
}
|
||||
default:
|
||||
memR.Logger.Error("unknown message type", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
|
||||
msg, err := protoToMsg(e.Message)
|
||||
if err != nil {
|
||||
memR.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err)
|
||||
memR.Switch.StopPeerForError(e.Src, err)
|
||||
return
|
||||
}
|
||||
memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", msg)
|
||||
|
||||
txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(e.Src)}
|
||||
if e.Src != nil {
|
||||
txInfo.SenderP2PID = e.Src.ID()
|
||||
}
|
||||
for _, tx := range msg.Txs {
|
||||
err = memR.mempool.CheckTx(tx, nil, txInfo)
|
||||
if err == mempool.ErrTxInCache {
|
||||
memR.Logger.Debug("Tx already exists in cache", "tx", tx.String())
|
||||
} else if err != nil {
|
||||
memR.Logger.Info("Could not check tx", "tx", tx.String(), "err", err)
|
||||
}
|
||||
}
|
||||
// broadcasting happens from go routines per peer
|
||||
}
|
||||
|
||||
@@ -273,6 +267,42 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
|
||||
//-----------------------------------------------------------------------------
|
||||
// Messages
|
||||
|
||||
func decodeMsg(bz []byte) (TxsMessage, error) {
|
||||
msg := protomem.Message{}
|
||||
err := msg.Unmarshal(bz)
|
||||
if err != nil {
|
||||
return TxsMessage{}, err
|
||||
}
|
||||
|
||||
return protoToMsg(&msg)
|
||||
}
|
||||
|
||||
func protoToMsg(m proto.Message) (TxsMessage, error) {
|
||||
msg := m.(*protomem.Message)
|
||||
var message TxsMessage
|
||||
|
||||
if i, ok := msg.Sum.(*protomem.Message_Txs); ok {
|
||||
txs := i.Txs.GetTxs()
|
||||
|
||||
if len(txs) == 0 {
|
||||
return message, errors.New("empty TxsMessage")
|
||||
}
|
||||
|
||||
decoded := make([]types.Tx, len(txs))
|
||||
for j, tx := range txs {
|
||||
decoded[j] = types.Tx(tx)
|
||||
}
|
||||
|
||||
message = TxsMessage{
|
||||
Txs: decoded,
|
||||
}
|
||||
return message, nil
|
||||
}
|
||||
return message, fmt.Errorf("msg type: %T is not supported", msg)
|
||||
}
|
||||
|
||||
//-------------------------------------
|
||||
|
||||
// TxsMessage is a Message containing transactions.
|
||||
type TxsMessage struct {
|
||||
Txs []types.Tx
|
||||
|
||||
@@ -19,7 +19,7 @@ func NewReactor() *Reactor {
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return r.Channels }
|
||||
func (r *Reactor) AddPeer(peer p2p.Peer) {}
|
||||
func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {}
|
||||
func (r *Reactor) Receive(e p2p.Envelope) {}
|
||||
func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return r.Channels }
|
||||
func (r *Reactor) AddPeer(peer p2p.Peer) {}
|
||||
func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {}
|
||||
func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {}
|
||||
|
||||
27
p2p/peer.go
27
p2p/peer.go
@@ -261,15 +261,7 @@ func (p *peer) Send(e Envelope) bool {
|
||||
} else if !p.hasChannel(e.ChannelID) {
|
||||
return false
|
||||
}
|
||||
msg := e.Message
|
||||
if w, ok := msg.(Wrapper); ok {
|
||||
var err error
|
||||
msg, err = w.Wrap()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
msgBytes, err := proto.Marshal(msg)
|
||||
msgBytes, err := proto.Marshal(e.Message)
|
||||
if err != nil {
|
||||
panic(err) // Q: should this panic or error?
|
||||
}
|
||||
@@ -292,15 +284,7 @@ func (p *peer) TrySend(e Envelope) bool {
|
||||
} else if !p.hasChannel(e.ChannelID) {
|
||||
return false
|
||||
}
|
||||
msg := e.Message
|
||||
if w, ok := msg.(Wrapper); ok {
|
||||
var err error
|
||||
msg, err = w.Wrap()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
msgBytes, err := proto.Marshal(msg)
|
||||
msgBytes, err := proto.Marshal(e.Message)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
@@ -436,13 +420,6 @@ func createMConnection(
|
||||
"peer_id", string(p.ID()),
|
||||
"chID", fmt.Sprintf("%#x", chID),
|
||||
}
|
||||
if w, ok := msg.(Unwrapper); ok {
|
||||
msg, err = w.Unwrap()
|
||||
if err != nil {
|
||||
// TODO(williambanfield) add error log line.
|
||||
return
|
||||
}
|
||||
}
|
||||
p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes)))
|
||||
p.metrics.MessageReceiveBytesTotal.With("message_type", "tmp").Add(float64(len(msgBytes)))
|
||||
reactor.Receive(Envelope{
|
||||
|
||||
@@ -6,6 +6,8 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
|
||||
"github.com/tendermint/tendermint/libs/cmap"
|
||||
tmmath "github.com/tendermint/tendermint/libs/math"
|
||||
tmrand "github.com/tendermint/tendermint/libs/rand"
|
||||
@@ -236,9 +238,15 @@ func (r *Reactor) logErrAddrBook(err error) {
|
||||
|
||||
// Receive implements Reactor by handling incoming PEX messages.
|
||||
func (r *Reactor) Receive(e p2p.Envelope) {
|
||||
r.Logger.Debug("Received message", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
|
||||
msg, err := msgFromProto(e.Message)
|
||||
if err != nil {
|
||||
r.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err)
|
||||
r.Switch.StopPeerForError(e.Src, err)
|
||||
return
|
||||
}
|
||||
r.Logger.Debug("Received message", "src", e.Src, "chId", e.ChannelID, "msg", msg)
|
||||
|
||||
switch msg := e.Message.(type) {
|
||||
switch msg := msg.(type) {
|
||||
case *tmp2p.PexRequest:
|
||||
|
||||
// NOTE: this is a prime candidate for amplification attacks,
|
||||
@@ -343,7 +351,7 @@ func (r *Reactor) RequestAddrs(p Peer) {
|
||||
r.requestsSent.Set(id, struct{}{})
|
||||
p.Send(p2p.Envelope{
|
||||
ChannelID: PexChannel,
|
||||
Message: &tmp2p.PexRequest{},
|
||||
Message: mustMsgToWrappedProto(&tmp2p.PexRequest{}),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -404,7 +412,7 @@ func (r *Reactor) ReceiveAddrs(addrs []*p2p.NetAddress, src Peer) error {
|
||||
func (r *Reactor) SendAddrs(p Peer, netAddrs []*p2p.NetAddress) {
|
||||
e := p2p.Envelope{
|
||||
ChannelID: PexChannel,
|
||||
Message: &tmp2p.PexAddrs{Addrs: p2p.NetAddressesToProto(netAddrs)},
|
||||
Message: mustMsgToWrappedProto(&tmp2p.PexAddrs{Addrs: p2p.NetAddressesToProto(netAddrs)}),
|
||||
}
|
||||
p.Send(e)
|
||||
}
|
||||
@@ -763,3 +771,51 @@ func markAddrInBookBasedOnErr(addr *p2p.NetAddress, book AddrBook, err error) {
|
||||
book.MarkAttempt(addr)
|
||||
}
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
// Messages
|
||||
|
||||
// mustEncode proto encodes a tmp2p.Message
|
||||
func mustEncode(pb proto.Message) []byte {
|
||||
msg := mustMsgToWrappedProto(pb)
|
||||
bz, err := proto.Marshal(msg)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("unable to marshal %T: %w", pb, err))
|
||||
}
|
||||
return bz
|
||||
}
|
||||
|
||||
func mustMsgToWrappedProto(pb proto.Message) proto.Message {
|
||||
msg := tmp2p.Message{}
|
||||
switch pb := pb.(type) {
|
||||
case *tmp2p.PexRequest:
|
||||
msg.Sum = &tmp2p.Message_PexRequest{PexRequest: pb}
|
||||
case *tmp2p.PexAddrs:
|
||||
msg.Sum = &tmp2p.Message_PexAddrs{PexAddrs: pb}
|
||||
default:
|
||||
panic(fmt.Sprintf("Unknown message type %T", pb))
|
||||
}
|
||||
return &msg
|
||||
}
|
||||
|
||||
func decodeMsg(bz []byte) (proto.Message, error) {
|
||||
pb := &tmp2p.Message{}
|
||||
|
||||
err := pb.Unmarshal(bz)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return msgFromProto(pb)
|
||||
}
|
||||
|
||||
func msgFromProto(m proto.Message) (proto.Message, error) {
|
||||
pb := m.(*tmp2p.Message)
|
||||
switch msg := pb.Sum.(type) {
|
||||
case *tmp2p.Message_PexRequest:
|
||||
return msg.PexRequest, nil
|
||||
case *tmp2p.Message_PexAddrs:
|
||||
return msg.PexAddrs, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown message: %T", msg)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -132,10 +132,12 @@ func TestPEXReactorReceive(t *testing.T) {
|
||||
|
||||
size := book.Size()
|
||||
msg := &tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}
|
||||
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg})
|
||||
r.Receive(PexChannel, peer, mustEncode(msg))
|
||||
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: mustMsgToWrappedProto(msg)})
|
||||
assert.Equal(t, size+1, book.Size())
|
||||
|
||||
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
|
||||
r.Receive(PexChannel, peer, mustEncode(&tmp2p.PexRequest{})) // should not panic.
|
||||
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: mustMsgToWrappedProto(&tmp2p.PexRequest{})})
|
||||
}
|
||||
|
||||
func TestPEXReactorRequestMessageAbuse(t *testing.T) {
|
||||
@@ -154,19 +156,23 @@ func TestPEXReactorRequestMessageAbuse(t *testing.T) {
|
||||
require.True(t, book.HasAddress(peerAddr))
|
||||
|
||||
id := string(peer.ID())
|
||||
msg := mustEncode(&tmp2p.PexRequest{})
|
||||
|
||||
// first time creates the entry
|
||||
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
|
||||
r.Receive(PexChannel, peer, msg)
|
||||
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: mustMsgToWrappedProto(&tmp2p.PexRequest{})})
|
||||
assert.True(t, r.lastReceivedRequests.Has(id))
|
||||
assert.True(t, sw.Peers().Has(peer.ID()))
|
||||
|
||||
// next time sets the last time value
|
||||
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
|
||||
r.Receive(PexChannel, peer, msg)
|
||||
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: mustMsgToWrappedProto(&tmp2p.PexRequest{})})
|
||||
assert.True(t, r.lastReceivedRequests.Has(id))
|
||||
assert.True(t, sw.Peers().Has(peer.ID()))
|
||||
|
||||
// third time is too many too soon - peer is removed
|
||||
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
|
||||
r.Receive(PexChannel, peer, msg)
|
||||
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: mustMsgToWrappedProto(&tmp2p.PexRequest{})})
|
||||
assert.False(t, r.lastReceivedRequests.Has(id))
|
||||
assert.False(t, sw.Peers().Has(peer.ID()))
|
||||
assert.True(t, book.IsBanned(peerAddr))
|
||||
@@ -193,12 +199,14 @@ func TestPEXReactorAddrsMessageAbuse(t *testing.T) {
|
||||
msg := &tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}
|
||||
|
||||
// receive some addrs. should clear the request
|
||||
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg})
|
||||
r.Receive(PexChannel, peer, mustEncode(msg))
|
||||
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: mustMsgToWrappedProto(msg)})
|
||||
assert.False(t, r.requestsSent.Has(id))
|
||||
assert.True(t, sw.Peers().Has(peer.ID()))
|
||||
|
||||
// receiving more unsolicited addrs causes a disconnect and ban
|
||||
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg})
|
||||
r.Receive(PexChannel, peer, mustEncode(msg))
|
||||
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: mustMsgToWrappedProto(msg)})
|
||||
assert.False(t, sw.Peers().Has(peer.ID()))
|
||||
assert.True(t, book.IsBanned(peer.SocketAddr()))
|
||||
}
|
||||
@@ -484,12 +492,8 @@ func TestPEXReactorDoesNotAddPrivatePeersToAddrBook(t *testing.T) {
|
||||
pexR.RequestAddrs(peer)
|
||||
|
||||
size := book.Size()
|
||||
msg := &tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}
|
||||
pexR.Receive(p2p.Envelope{
|
||||
ChannelID: PexChannel,
|
||||
Src: peer,
|
||||
Message: msg,
|
||||
})
|
||||
msg := mustEncode(&tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}})
|
||||
pexR.Receive(PexChannel, peer, msg)
|
||||
assert.Equal(t, size, book.Size())
|
||||
|
||||
pexR.AddPeer(peer)
|
||||
@@ -697,10 +701,7 @@ func TestPexVectors(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
tc := tc
|
||||
|
||||
w, err := tc.msg.(p2p.Wrapper).Wrap()
|
||||
require.NoError(t, err)
|
||||
bz, err := proto.Marshal(w)
|
||||
require.NoError(t, err)
|
||||
bz := mustEncode(tc.msg)
|
||||
|
||||
require.Equal(t, tc.expBytes, hex.EncodeToString(bz), tc.testName)
|
||||
}
|
||||
|
||||
@@ -265,7 +265,39 @@ func (sw *Switch) OnStop() {
|
||||
// closed once msg bytes are sent to all peers (or time out).
|
||||
//
|
||||
// NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
|
||||
func (sw *Switch) Broadcast(e Envelope) chan bool {
|
||||
/*
|
||||
func (sw *Switch) Broadcast(chID byte, msgBytes []byte) chan bool {
|
||||
sw.Logger.Debug("Broadcast", "channel", chID, "msgBytes", log.NewLazySprintf("%X", msgBytes))
|
||||
|
||||
peers := sw.peers.List()
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(peers))
|
||||
successChan := make(chan bool, len(peers))
|
||||
|
||||
for _, peer := range peers {
|
||||
go func(p Peer) {
|
||||
defer wg.Done()
|
||||
success := p.Send(chID, msgBytes)
|
||||
successChan <- success
|
||||
}(peer)
|
||||
}
|
||||
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(successChan)
|
||||
}()
|
||||
|
||||
return successChan
|
||||
}
|
||||
*/
|
||||
|
||||
// NewBroadcast runs a go routine for each attempted send, which will block trying
|
||||
// to send for defaultSendTimeoutSeconds. Returns a channel which receives
|
||||
// success values for each attempted send (false if times out). Channel will be
|
||||
// closed once msg bytes are sent to all peers (or time out).
|
||||
//
|
||||
// NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
|
||||
func (sw *Switch) NewBroadcast(e Envelope) chan bool {
|
||||
sw.Logger.Debug("Broadcast", "channel", e.ChannelID)
|
||||
|
||||
peers := sw.peers.List()
|
||||
|
||||
@@ -39,8 +39,9 @@ func init() {
|
||||
}
|
||||
|
||||
type PeerMessage struct {
|
||||
Contents proto.Message
|
||||
Counter int
|
||||
PeerID ID
|
||||
Bytes []byte
|
||||
Counter int
|
||||
}
|
||||
|
||||
type TestReactor struct {
|
||||
@@ -72,12 +73,12 @@ func (tr *TestReactor) AddPeer(peer Peer) {}
|
||||
|
||||
func (tr *TestReactor) RemovePeer(peer Peer, reason interface{}) {}
|
||||
|
||||
func (tr *TestReactor) Receive(e Envelope) {
|
||||
func (tr *TestReactor) Receive(chID byte, peer Peer, msgBytes []byte) {
|
||||
if tr.logMessages {
|
||||
tr.mtx.Lock()
|
||||
defer tr.mtx.Unlock()
|
||||
fmt.Printf("Received: %X, %X\n", e.ChannelID, e.Message)
|
||||
tr.msgsReceived[e.ChannelID] = append(tr.msgsReceived[e.ChannelID], PeerMessage{Contents: e.Message, Counter: tr.msgsCounter})
|
||||
// fmt.Printf("Received: %X, %X\n", chID, msgBytes)
|
||||
tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.ID(), msgBytes, tr.msgsCounter})
|
||||
tr.msgsCounter++
|
||||
}
|
||||
}
|
||||
@@ -105,12 +106,12 @@ func initSwitchFunc(i int, sw *Switch) *Switch {
|
||||
|
||||
// Make two reactors of two channels each
|
||||
sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
|
||||
{ID: byte(0x00), Priority: 10, MessageType: &p2pproto.Message{}},
|
||||
{ID: byte(0x01), Priority: 10, MessageType: &p2pproto.Message{}},
|
||||
{ID: byte(0x00), Priority: 10},
|
||||
{ID: byte(0x01), Priority: 10},
|
||||
}, true))
|
||||
sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
|
||||
{ID: byte(0x02), Priority: 10, MessageType: &p2pproto.Message{}},
|
||||
{ID: byte(0x03), Priority: 10, MessageType: &p2pproto.Message{}},
|
||||
{ID: byte(0x02), Priority: 10},
|
||||
{ID: byte(0x03), Priority: 10},
|
||||
}, true))
|
||||
|
||||
return sw
|
||||
@@ -137,47 +138,66 @@ func TestSwitches(t *testing.T) {
|
||||
}
|
||||
|
||||
// Lets send some messages
|
||||
ch0Msg := &p2pproto.PexAddrs{
|
||||
Addrs: []p2p.NetAddress{
|
||||
{
|
||||
ID: "1",
|
||||
ch0Msg := &p2pproto.Message{
|
||||
Sum: &p2pproto.Message_PexAddrs{
|
||||
PexAddrs: &p2pproto.PexAddrs{
|
||||
Addrs: []p2p.NetAddress{
|
||||
{
|
||||
ID: "0",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
ch1Msg := &p2pproto.PexAddrs{
|
||||
Addrs: []p2p.NetAddress{
|
||||
{
|
||||
ID: "1",
|
||||
ch1Msg := &p2pproto.Message{
|
||||
Sum: &p2pproto.Message_PexAddrs{
|
||||
PexAddrs: &p2pproto.PexAddrs{
|
||||
Addrs: []p2p.NetAddress{
|
||||
{
|
||||
ID: "1",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
ch2Msg := &p2pproto.PexAddrs{
|
||||
Addrs: []p2p.NetAddress{
|
||||
{
|
||||
ID: "2",
|
||||
ch2Msg := &p2pproto.Message{
|
||||
Sum: &p2pproto.Message_PexAddrs{
|
||||
PexAddrs: &p2pproto.PexAddrs{
|
||||
Addrs: []p2p.NetAddress{
|
||||
{
|
||||
ID: "2",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
s1.Broadcast(Envelope{ChannelID: byte(0x00), Message: ch0Msg})
|
||||
s1.Broadcast(Envelope{ChannelID: byte(0x01), Message: ch1Msg})
|
||||
s1.Broadcast(Envelope{ChannelID: byte(0x02), Message: ch2Msg})
|
||||
s1.NewBroadcast(Envelope{ChannelID: byte(0x00), Message: ch0Msg})
|
||||
s1.NewBroadcast(Envelope{ChannelID: byte(0x01), Message: ch1Msg})
|
||||
s1.NewBroadcast(Envelope{ChannelID: byte(0x02), Message: ch2Msg})
|
||||
|
||||
msgBytes, err := proto.Marshal(ch0Msg)
|
||||
require.NoError(t, err)
|
||||
assertMsgReceivedWithTimeout(t,
|
||||
ch0Msg,
|
||||
msgBytes,
|
||||
byte(0x00),
|
||||
s2.Reactor("foo").(*TestReactor), 200*time.Millisecond, 5*time.Second)
|
||||
s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
|
||||
msgBytes, err = proto.Marshal(ch1Msg)
|
||||
require.NoError(t, err)
|
||||
assertMsgReceivedWithTimeout(t,
|
||||
ch1Msg,
|
||||
msgBytes,
|
||||
byte(0x01),
|
||||
s2.Reactor("foo").(*TestReactor), 200*time.Millisecond, 5*time.Second)
|
||||
s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
|
||||
msgBytes, err = proto.Marshal(ch2Msg)
|
||||
require.NoError(t, err)
|
||||
assertMsgReceivedWithTimeout(t,
|
||||
ch2Msg,
|
||||
msgBytes,
|
||||
byte(0x02),
|
||||
s2.Reactor("bar").(*TestReactor), 200*time.Millisecond, 5*time.Second)
|
||||
s2.Reactor("bar").(*TestReactor), 10*time.Millisecond, 5*time.Second)
|
||||
}
|
||||
|
||||
func assertMsgReceivedWithTimeout(
|
||||
t *testing.T,
|
||||
msg proto.Message,
|
||||
msgBytes []byte,
|
||||
channel byte,
|
||||
reactor *TestReactor,
|
||||
checkPeriod,
|
||||
@@ -188,13 +208,9 @@ func assertMsgReceivedWithTimeout(
|
||||
select {
|
||||
case <-ticker.C:
|
||||
msgs := reactor.getMsgs(channel)
|
||||
expectedBytes, err := proto.Marshal(msgs[0].Contents)
|
||||
require.NoError(t, err)
|
||||
gotBytes, err := proto.Marshal(msg)
|
||||
require.NoError(t, err)
|
||||
if len(msgs) > 0 {
|
||||
if !bytes.Equal(expectedBytes, gotBytes) {
|
||||
t.Fatalf("Unexpected message bytes. Wanted: %X, Got: %X", msg, msgs[0].Counter)
|
||||
if !bytes.Equal(msgs[0].Bytes, msgBytes) {
|
||||
t.Fatalf("Unexpected message bytes. Wanted: %X, Got: %X", msgBytes, msgs[0].Bytes)
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -849,7 +865,7 @@ func BenchmarkSwitchBroadcast(b *testing.B) {
|
||||
// Send random message from foo channel to another
|
||||
for i := 0; i < b.N; i++ {
|
||||
chID := byte(i % 4)
|
||||
successChan := s1.Broadcast(Envelope{ChannelID: chID})
|
||||
successChan := s1.NewBroadcast(Envelope{ChannelID: chID})
|
||||
for s := range successChan {
|
||||
if s {
|
||||
numSuccess++
|
||||
|
||||
24
p2p/types.go
24
p2p/types.go
@@ -3,7 +3,6 @@ package p2p
|
||||
import (
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
"github.com/tendermint/tendermint/p2p/conn"
|
||||
tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
|
||||
)
|
||||
|
||||
type ChannelDescriptor = conn.ChannelDescriptor
|
||||
@@ -15,26 +14,3 @@ type Envelope struct {
|
||||
Message proto.Message // message payload
|
||||
ChannelID byte
|
||||
}
|
||||
|
||||
// Wrapper is a Protobuf message that can contain a variety of inner messages
|
||||
// (e.g. via oneof fields). If a Channel's message type implements Wrapper, the
|
||||
// Router will automatically wrap outbound messages and unwrap inbound messages,
|
||||
// such that reactors do not have to do this themselves.
|
||||
type Unwrapper interface {
|
||||
proto.Message
|
||||
|
||||
// Unwrap will unwrap the inner message contained in this message.
|
||||
Unwrap() (proto.Message, error)
|
||||
}
|
||||
|
||||
type Wrapper interface {
|
||||
proto.Message
|
||||
|
||||
// Wrap will take the underlying message and wrap it in its wrapper type.
|
||||
Wrap() (proto.Message, error)
|
||||
}
|
||||
|
||||
var (
|
||||
_ Wrapper = &tmp2p.PexRequest{}
|
||||
_ Wrapper = &tmp2p.PexAddrs{}
|
||||
)
|
||||
|
||||
@@ -1,73 +0,0 @@
|
||||
package blocksync
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
)
|
||||
|
||||
var _ p2p.Wrapper = &StatusRequest{}
|
||||
var _ p2p.Wrapper = &StatusResponse{}
|
||||
var _ p2p.Wrapper = &NoBlockResponse{}
|
||||
var _ p2p.Wrapper = &BlockResponse{}
|
||||
var _ p2p.Wrapper = &BlockRequest{}
|
||||
|
||||
const (
|
||||
BlockResponseMessagePrefixSize = 4
|
||||
BlockResponseMessageFieldKeySize = 1
|
||||
)
|
||||
|
||||
func (m *BlockRequest) Wrap() (proto.Message, error) {
|
||||
bm := &Message{}
|
||||
bm.Sum = &Message_BlockRequest{BlockRequest: m}
|
||||
return bm, nil
|
||||
}
|
||||
|
||||
func (m *BlockResponse) Wrap() (proto.Message, error) {
|
||||
bm := &Message{}
|
||||
bm.Sum = &Message_BlockResponse{BlockResponse: m}
|
||||
return bm, nil
|
||||
}
|
||||
|
||||
func (m *NoBlockResponse) Wrap() (proto.Message, error) {
|
||||
bm := &Message{}
|
||||
bm.Sum = &Message_NoBlockResponse{NoBlockResponse: m}
|
||||
return bm, nil
|
||||
}
|
||||
|
||||
func (m *StatusRequest) Wrap() (proto.Message, error) {
|
||||
bm := &Message{}
|
||||
bm.Sum = &Message_StatusRequest{StatusRequest: m}
|
||||
return bm, nil
|
||||
}
|
||||
|
||||
func (m *StatusResponse) Wrap() (proto.Message, error) {
|
||||
bm := &Message{}
|
||||
bm.Sum = &Message_StatusResponse{StatusResponse: m}
|
||||
return bm, nil
|
||||
}
|
||||
|
||||
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped blockchain
|
||||
// message.
|
||||
func (m *Message) Unwrap() (proto.Message, error) {
|
||||
switch msg := m.Sum.(type) {
|
||||
case *Message_BlockRequest:
|
||||
return m.GetBlockRequest(), nil
|
||||
|
||||
case *Message_BlockResponse:
|
||||
return m.GetBlockResponse(), nil
|
||||
|
||||
case *Message_NoBlockResponse:
|
||||
return m.GetNoBlockResponse(), nil
|
||||
|
||||
case *Message_StatusRequest:
|
||||
return m.GetStatusRequest(), nil
|
||||
|
||||
case *Message_StatusResponse:
|
||||
return m.GetStatusResponse(), nil
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown message: %T", msg)
|
||||
}
|
||||
}
|
||||
@@ -1,109 +0,0 @@
|
||||
package consensus
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
)
|
||||
|
||||
var _ p2p.Wrapper = &VoteSetBits{}
|
||||
var _ p2p.Wrapper = &VoteSetMaj23{}
|
||||
var _ p2p.Wrapper = &Vote{}
|
||||
var _ p2p.Wrapper = &ProposalPOL{}
|
||||
var _ p2p.Wrapper = &Proposal{}
|
||||
var _ p2p.Wrapper = &NewValidBlock{}
|
||||
var _ p2p.Wrapper = &NewRoundStep{}
|
||||
var _ p2p.Wrapper = &HasVote{}
|
||||
var _ p2p.Wrapper = &BlockPart{}
|
||||
|
||||
func (m *VoteSetBits) Wrap() (proto.Message, error) {
|
||||
cm := &Message{}
|
||||
cm.Sum = &Message_VoteSetBits{VoteSetBits: m}
|
||||
return cm, nil
|
||||
|
||||
}
|
||||
|
||||
func (m *VoteSetMaj23) Wrap() (proto.Message, error) {
|
||||
cm := &Message{}
|
||||
cm.Sum = &Message_VoteSetMaj23{VoteSetMaj23: m}
|
||||
return cm, nil
|
||||
}
|
||||
|
||||
func (m *HasVote) Wrap() (proto.Message, error) {
|
||||
cm := &Message{}
|
||||
cm.Sum = &Message_HasVote{HasVote: m}
|
||||
return cm, nil
|
||||
}
|
||||
|
||||
func (m *Vote) Wrap() (proto.Message, error) {
|
||||
cm := &Message{}
|
||||
cm.Sum = &Message_Vote{Vote: m}
|
||||
return cm, nil
|
||||
}
|
||||
|
||||
func (m *BlockPart) Wrap() (proto.Message, error) {
|
||||
cm := &Message{}
|
||||
cm.Sum = &Message_BlockPart{BlockPart: m}
|
||||
return cm, nil
|
||||
}
|
||||
|
||||
func (m *ProposalPOL) Wrap() (proto.Message, error) {
|
||||
cm := &Message{}
|
||||
cm.Sum = &Message_ProposalPol{ProposalPol: m}
|
||||
return cm, nil
|
||||
}
|
||||
|
||||
func (m *Proposal) Wrap() (proto.Message, error) {
|
||||
cm := &Message{}
|
||||
cm.Sum = &Message_Proposal{Proposal: m}
|
||||
return cm, nil
|
||||
}
|
||||
|
||||
func (m *NewValidBlock) Wrap() (proto.Message, error) {
|
||||
cm := &Message{}
|
||||
cm.Sum = &Message_NewValidBlock{NewValidBlock: m}
|
||||
return cm, nil
|
||||
}
|
||||
|
||||
func (m *NewRoundStep) Wrap() (proto.Message, error) {
|
||||
cm := &Message{}
|
||||
cm.Sum = &Message_NewRoundStep{NewRoundStep: m}
|
||||
return cm, nil
|
||||
}
|
||||
|
||||
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped consensus
|
||||
// proto message.
|
||||
func (m *Message) Unwrap() (proto.Message, error) {
|
||||
switch msg := m.Sum.(type) {
|
||||
case *Message_NewRoundStep:
|
||||
return m.GetNewRoundStep(), nil
|
||||
|
||||
case *Message_NewValidBlock:
|
||||
return m.GetNewValidBlock(), nil
|
||||
|
||||
case *Message_Proposal:
|
||||
return m.GetProposal(), nil
|
||||
|
||||
case *Message_ProposalPol:
|
||||
return m.GetProposalPol(), nil
|
||||
|
||||
case *Message_BlockPart:
|
||||
return m.GetBlockPart(), nil
|
||||
|
||||
case *Message_Vote:
|
||||
return m.GetVote(), nil
|
||||
|
||||
case *Message_HasVote:
|
||||
return m.GetHasVote(), nil
|
||||
|
||||
case *Message_VoteSetMaj23:
|
||||
return m.GetVoteSetMaj23(), nil
|
||||
|
||||
case *Message_VoteSetBits:
|
||||
return m.GetVoteSetBits(), nil
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown message: %T", msg)
|
||||
}
|
||||
}
|
||||
@@ -1,30 +0,0 @@
|
||||
package mempool
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
)
|
||||
|
||||
var _ p2p.Wrapper = &Txs{}
|
||||
var _ p2p.Unwrapper = &Message{}
|
||||
|
||||
// Wrap implements the p2p Wrapper interface and wraps a mempool message.
|
||||
func (m *Txs) Wrap() (proto.Message, error) {
|
||||
return &Message{
|
||||
Sum: &Message_Txs{Txs: m},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped mempool
|
||||
// message.
|
||||
func (m *Message) Unwrap() (proto.Message, error) {
|
||||
switch msg := m.Sum.(type) {
|
||||
case *Message_Txs:
|
||||
return m.GetTxs(), nil
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown message: %T", msg)
|
||||
}
|
||||
}
|
||||
@@ -1,32 +0,0 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
)
|
||||
|
||||
func (m *PexAddrs) Wrap() (proto.Message, error) {
|
||||
pm := &Message{}
|
||||
pm.Sum = &Message_PexAddrs{PexAddrs: m}
|
||||
return pm, nil
|
||||
}
|
||||
|
||||
func (m *PexRequest) Wrap() (proto.Message, error) {
|
||||
pm := &Message{}
|
||||
pm.Sum = &Message_PexRequest{PexRequest: m}
|
||||
return pm, nil
|
||||
}
|
||||
|
||||
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped PEX
|
||||
// message.
|
||||
func (m *Message) Unwrap() (proto.Message, error) {
|
||||
switch msg := m.Sum.(type) {
|
||||
case *Message_PexRequest:
|
||||
return msg.PexRequest, nil
|
||||
case *Message_PexAddrs:
|
||||
return msg.PexAddrs, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown pex message: %T", msg)
|
||||
}
|
||||
}
|
||||
@@ -1,58 +0,0 @@
|
||||
package statesync
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
)
|
||||
|
||||
var _ p2p.Wrapper = &ChunkRequest{}
|
||||
var _ p2p.Wrapper = &ChunkResponse{}
|
||||
var _ p2p.Wrapper = &SnapshotsRequest{}
|
||||
var _ p2p.Wrapper = &SnapshotsResponse{}
|
||||
|
||||
func (m *SnapshotsResponse) Wrap() (proto.Message, error) {
|
||||
sm := &Message{}
|
||||
sm.Sum = &Message_SnapshotsResponse{SnapshotsResponse: m}
|
||||
return sm, nil
|
||||
}
|
||||
|
||||
func (m *SnapshotsRequest) Wrap() (proto.Message, error) {
|
||||
sm := &Message{}
|
||||
sm.Sum = &Message_SnapshotsRequest{SnapshotsRequest: m}
|
||||
return sm, nil
|
||||
}
|
||||
|
||||
func (m *ChunkResponse) Wrap() (proto.Message, error) {
|
||||
sm := &Message{}
|
||||
sm.Sum = &Message_ChunkResponse{ChunkResponse: m}
|
||||
return sm, nil
|
||||
}
|
||||
|
||||
func (m *ChunkRequest) Wrap() (proto.Message, error) {
|
||||
sm := &Message{}
|
||||
sm.Sum = &Message_ChunkRequest{ChunkRequest: m}
|
||||
return sm, nil
|
||||
}
|
||||
|
||||
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped state sync
|
||||
// proto message.
|
||||
func (m *Message) Unwrap() (proto.Message, error) {
|
||||
switch msg := m.Sum.(type) {
|
||||
case *Message_ChunkRequest:
|
||||
return m.GetChunkRequest(), nil
|
||||
|
||||
case *Message_ChunkResponse:
|
||||
return m.GetChunkResponse(), nil
|
||||
|
||||
case *Message_SnapshotsRequest:
|
||||
return m.GetSnapshotsRequest(), nil
|
||||
|
||||
case *Message_SnapshotsResponse:
|
||||
return m.GetSnapshotsResponse(), nil
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown message: %T", msg)
|
||||
}
|
||||
}
|
||||
@@ -16,6 +16,58 @@ const (
|
||||
chunkMsgSize = int(16e6)
|
||||
)
|
||||
|
||||
// mustEncodeMsg encodes a Protobuf message, panicing on error.
|
||||
func mustEncodeMsg(pb proto.Message) []byte {
|
||||
msg := mustWrapToProto(pb)
|
||||
bz, err := proto.Marshal(msg)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("unable to marshal %T: %w", pb, err))
|
||||
}
|
||||
return bz
|
||||
}
|
||||
|
||||
func mustWrapToProto(pb proto.Message) proto.Message {
|
||||
msg := ssproto.Message{}
|
||||
switch pb := pb.(type) {
|
||||
case *ssproto.ChunkRequest:
|
||||
msg.Sum = &ssproto.Message_ChunkRequest{ChunkRequest: pb}
|
||||
case *ssproto.ChunkResponse:
|
||||
msg.Sum = &ssproto.Message_ChunkResponse{ChunkResponse: pb}
|
||||
case *ssproto.SnapshotsRequest:
|
||||
msg.Sum = &ssproto.Message_SnapshotsRequest{SnapshotsRequest: pb}
|
||||
case *ssproto.SnapshotsResponse:
|
||||
msg.Sum = &ssproto.Message_SnapshotsResponse{SnapshotsResponse: pb}
|
||||
default:
|
||||
panic(fmt.Errorf("unknown message type %T", pb))
|
||||
}
|
||||
return &msg
|
||||
}
|
||||
|
||||
// decodeMsg decodes a Protobuf message.
|
||||
func decodeMsg(bz []byte) (proto.Message, error) {
|
||||
pb := &ssproto.Message{}
|
||||
err := proto.Unmarshal(bz, pb)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return msgFromProto(pb)
|
||||
}
|
||||
|
||||
func msgFromProto(pb *ssproto.Message) (proto.Message, error) {
|
||||
switch msg := pb.Sum.(type) {
|
||||
case *ssproto.Message_ChunkRequest:
|
||||
return msg.ChunkRequest, nil
|
||||
case *ssproto.Message_ChunkResponse:
|
||||
return msg.ChunkResponse, nil
|
||||
case *ssproto.Message_SnapshotsRequest:
|
||||
return msg.SnapshotsRequest, nil
|
||||
case *ssproto.Message_SnapshotsResponse:
|
||||
return msg.SnapshotsResponse, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown message type %T", msg)
|
||||
}
|
||||
}
|
||||
|
||||
// validateMsg validates a message.
|
||||
func validateMsg(pb proto.Message) error {
|
||||
if pb == nil {
|
||||
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
|
||||
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
|
||||
)
|
||||
@@ -100,10 +99,8 @@ func TestStateSyncVectors(t *testing.T) {
|
||||
|
||||
for _, tc := range testCases {
|
||||
tc := tc
|
||||
w, err := tc.msg.(p2p.Wrapper).Wrap()
|
||||
require.NoError(t, err)
|
||||
bz, err := proto.Marshal(w)
|
||||
require.NoError(t, err)
|
||||
|
||||
bz := mustEncodeMsg(tc.msg)
|
||||
|
||||
require.Equal(t, tc.expBytes, hex.EncodeToString(bz), tc.testName)
|
||||
}
|
||||
|
||||
@@ -107,16 +107,22 @@ func (r *Reactor) Receive(e p2p.Envelope) {
|
||||
return
|
||||
}
|
||||
|
||||
err := validateMsg(e.Message)
|
||||
msg, err := msgFromProto(e.Message.(*ssproto.Message))
|
||||
if err != nil {
|
||||
r.Logger.Error("Invalid message", "peer", e.Src, "msg", e.Message, "err", err)
|
||||
r.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err)
|
||||
r.Switch.StopPeerForError(e.Src, err)
|
||||
return
|
||||
}
|
||||
err = validateMsg(msg)
|
||||
if err != nil {
|
||||
r.Logger.Error("Invalid message", "peer", e.Src, "msg", msg, "err", err)
|
||||
r.Switch.StopPeerForError(e.Src, err)
|
||||
return
|
||||
}
|
||||
|
||||
switch e.ChannelID {
|
||||
case SnapshotChannel:
|
||||
switch msg := e.Message.(type) {
|
||||
switch msg := msg.(type) {
|
||||
case *ssproto.SnapshotsRequest:
|
||||
snapshots, err := r.recentSnapshots(recentSnapshots)
|
||||
if err != nil {
|
||||
@@ -128,13 +134,13 @@ func (r *Reactor) Receive(e p2p.Envelope) {
|
||||
"format", snapshot.Format, "peer", e.Src.ID())
|
||||
e.Src.Send(p2p.Envelope{
|
||||
ChannelID: e.ChannelID,
|
||||
Message: &ssproto.SnapshotsResponse{
|
||||
Message: mustWrapToProto(&ssproto.SnapshotsResponse{
|
||||
Height: snapshot.Height,
|
||||
Format: snapshot.Format,
|
||||
Chunks: snapshot.Chunks,
|
||||
Hash: snapshot.Hash,
|
||||
Metadata: snapshot.Metadata,
|
||||
},
|
||||
}),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -165,7 +171,7 @@ func (r *Reactor) Receive(e p2p.Envelope) {
|
||||
}
|
||||
|
||||
case ChunkChannel:
|
||||
switch msg := e.Message.(type) {
|
||||
switch msg := msg.(type) {
|
||||
case *ssproto.ChunkRequest:
|
||||
r.Logger.Debug("Received chunk request", "height", msg.Height, "format", msg.Format,
|
||||
"chunk", msg.Index, "peer", e.Src.ID())
|
||||
@@ -183,13 +189,13 @@ func (r *Reactor) Receive(e p2p.Envelope) {
|
||||
"chunk", msg.Index, "peer", e.Src.ID())
|
||||
e.Src.Send(p2p.Envelope{
|
||||
ChannelID: ChunkChannel,
|
||||
Message: &ssproto.ChunkResponse{
|
||||
Message: mustWrapToProto(&ssproto.ChunkResponse{
|
||||
Height: msg.Height,
|
||||
Format: msg.Format,
|
||||
Index: msg.Index,
|
||||
Chunk: resp.Chunk,
|
||||
Missing: resp.Chunk == nil,
|
||||
},
|
||||
}),
|
||||
})
|
||||
|
||||
case *ssproto.ChunkResponse:
|
||||
@@ -274,7 +280,7 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration)
|
||||
|
||||
r.Switch.NewBroadcast(p2p.Envelope{
|
||||
ChannelID: SnapshotChannel,
|
||||
Message: &ssproto.SnapshotsRequest{},
|
||||
Message: mustWrapToProto(&ssproto.SnapshotsRequest{}),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -65,7 +65,7 @@ func TestReactor_Receive_ChunkRequest(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
err = proto.Unmarshal(bz, e.Message)
|
||||
require.NoError(t, err)
|
||||
response = e.Message.(*ssproto.ChunkResponse)
|
||||
response = e.Message.(*ssproto.Message).GetChunkResponse()
|
||||
}).Return(true)
|
||||
}
|
||||
|
||||
@@ -80,10 +80,10 @@ func TestReactor_Receive_ChunkRequest(t *testing.T) {
|
||||
}
|
||||
})
|
||||
|
||||
r.Receive(p2p.Envelope{
|
||||
r.NewReceive(p2p.Envelope{
|
||||
ChannelID: ChunkChannel,
|
||||
Src: peer,
|
||||
Message: tc.request,
|
||||
Message: mustWrapToProto(tc.request),
|
||||
})
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
assert.Equal(t, tc.expectResponse, response)
|
||||
@@ -155,7 +155,7 @@ func TestReactor_Receive_SnapshotsRequest(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
err = proto.Unmarshal(bz, e.Message)
|
||||
require.NoError(t, err)
|
||||
responses = append(responses, e.Message.(*ssproto.SnapshotsResponse))
|
||||
responses = append(responses, e.Message.(*ssproto.Message).GetSnapshotsResponse())
|
||||
}).Return(true)
|
||||
}
|
||||
|
||||
@@ -170,11 +170,12 @@ func TestReactor_Receive_SnapshotsRequest(t *testing.T) {
|
||||
}
|
||||
})
|
||||
|
||||
r.Receive(p2p.Envelope{
|
||||
r.NewReceive(p2p.Envelope{
|
||||
ChannelID: SnapshotChannel,
|
||||
Src: peer,
|
||||
Message: &ssproto.SnapshotsRequest{},
|
||||
Message: mustWrapToProto(&ssproto.SnapshotsRequest{}),
|
||||
})
|
||||
r.Receive(SnapshotChannel, peer, mustEncodeMsg(&ssproto.SnapshotsRequest{}))
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
assert.Equal(t, tc.expectResponses, responses)
|
||||
|
||||
|
||||
@@ -128,7 +128,7 @@ func (s *syncer) AddPeer(peer p2p.Peer) {
|
||||
s.logger.Debug("Requesting snapshots from peer", "peer", peer.ID())
|
||||
e := p2p.Envelope{
|
||||
ChannelID: SnapshotChannel,
|
||||
Message: &ssproto.SnapshotsRequest{},
|
||||
Message: mustWrapToProto(&ssproto.SnapshotsRequest{}),
|
||||
}
|
||||
peer.Send(e)
|
||||
}
|
||||
@@ -473,11 +473,11 @@ func (s *syncer) requestChunk(snapshot *snapshot, chunk uint32) {
|
||||
"format", snapshot.Format, "chunk", chunk, "peer", peer.ID())
|
||||
peer.Send(p2p.Envelope{
|
||||
ChannelID: ChunkChannel,
|
||||
Message: &ssproto.ChunkRequest{
|
||||
Message: mustWrapToProto(&ssproto.ChunkRequest{
|
||||
Height: snapshot.Height,
|
||||
Format: snapshot.Format,
|
||||
Index: chunk,
|
||||
},
|
||||
}),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -100,10 +100,7 @@ func TestSyncer_SyncAny(t *testing.T) {
|
||||
peerA.On("ID").Return(p2p.ID("a"))
|
||||
peerA.On("Send", mock.MatchedBy(func(i interface{}) bool {
|
||||
e, ok := i.(p2p.Envelope)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
req, ok := e.Message.(*ssproto.SnapshotsRequest)
|
||||
req := e.Message.(*ssproto.Message).GetSnapshotsRequest()
|
||||
return ok && e.ChannelID == SnapshotChannel && req != nil
|
||||
})).Return(true)
|
||||
syncer.AddPeer(peerA)
|
||||
@@ -113,10 +110,7 @@ func TestSyncer_SyncAny(t *testing.T) {
|
||||
peerB.On("ID").Return(p2p.ID("b"))
|
||||
peerB.On("Send", mock.MatchedBy(func(i interface{}) bool {
|
||||
e, ok := i.(p2p.Envelope)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
req, ok := e.Message.(*ssproto.SnapshotsRequest)
|
||||
req := e.Message.(*ssproto.Message).GetSnapshotsRequest()
|
||||
return ok && e.ChannelID == SnapshotChannel && req != nil
|
||||
})).Return(true)
|
||||
syncer.AddPeer(peerB)
|
||||
@@ -163,7 +157,7 @@ func TestSyncer_SyncAny(t *testing.T) {
|
||||
onChunkRequest := func(args mock.Arguments) {
|
||||
e, ok := args[0].(p2p.Envelope)
|
||||
require.True(t, ok)
|
||||
msg := e.Message.(*ssproto.ChunkRequest)
|
||||
msg := e.Message.(*ssproto.Message).GetChunkRequest()
|
||||
require.EqualValues(t, 1, msg.Height)
|
||||
require.EqualValues(t, 1, msg.Format)
|
||||
require.LessOrEqual(t, msg.Index, uint32(len(chunks)))
|
||||
|
||||
Reference in New Issue
Block a user