p2p: ressurrect the p2p envelope and use to calculate message metric

This commit is contained in:
William Banfield
2022-10-18 17:09:46 -04:00
parent b42c439776
commit 4d78096843
32 changed files with 725 additions and 339 deletions

View File

@@ -165,10 +165,16 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
for i, peer := range peerList {
if i < len(peerList)/2 {
bcs.Logger.Info("Signed and pushed vote", "vote", prevote1, "peer", peer)
peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote1}))
peer.Send(p2p.Envelope{
Message: MustMsgToProto(&VoteMessage{prevote1}),
ChannelID: VoteChannel,
})
} else {
bcs.Logger.Info("Signed and pushed vote", "vote", prevote2, "peer", peer)
peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote2}))
peer.Send(p2p.Envelope{
Message: MustMsgToProto(&VoteMessage{prevote2}),
ChannelID: VoteChannel,
})
}
}
} else {
@@ -521,7 +527,10 @@ func sendProposalAndParts(
) {
// proposal
msg := &ProposalMessage{Proposal: proposal}
peer.Send(DataChannel, MustEncode(msg))
peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: MustMsgToProto(msg),
})
// parts
for i := 0; i < int(parts.Total()); i++ {
@@ -531,7 +540,10 @@ func sendProposalAndParts(
Round: round, // This tells peer that this part applies to us.
Part: part,
}
peer.Send(DataChannel, MustEncode(msg))
peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: MustMsgToProto(msg),
})
}
// votes
@@ -539,9 +551,14 @@ func sendProposalAndParts(
prevote, _ := cs.signVote(tmproto.PrevoteType, blockHash, parts.Header())
precommit, _ := cs.signVote(tmproto.PrecommitType, blockHash, parts.Header())
cs.mtx.Unlock()
peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote}))
peer.Send(VoteChannel, MustEncode(&VoteMessage{precommit}))
peer.Send(p2p.Envelope{
ChannelID: VoteChannel,
Message: MustMsgToProto(&VoteMessage{prevote}),
})
peer.Send(p2p.Envelope{
ChannelID: VoteChannel,
Message: MustMsgToProto(&VoteMessage{precommit}),
})
}
//----------------------------------------
@@ -579,7 +596,7 @@ func (br *ByzantineReactor) AddPeer(peer p2p.Peer) {
func (br *ByzantineReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
br.reactor.RemovePeer(peer, reason)
}
func (br *ByzantineReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
br.reactor.Receive(chID, peer, msgBytes)
func (br *ByzantineReactor) Receive(e p2p.Envelope) {
br.reactor.Receive(e)
}
func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer }

View File

@@ -94,7 +94,10 @@ func invalidDoPrevoteFunc(t *testing.T, height int64, round int32, cs *State, sw
peers := sw.Peers().List()
for _, peer := range peers {
cs.Logger.Info("Sending bad vote", "block", blockHash, "peer", peer)
peer.Send(VoteChannel, MustEncode(&VoteMessage{precommit}))
peer.Send(p2p.Envelope{
Message: MustMsgToProto(&VoteMessage{precommit}),
ChannelID: VoteChannel,
})
}
}()
}

View File

@@ -15,7 +15,7 @@ import (
"github.com/tendermint/tendermint/types"
)
// MsgToProto takes a consensus message type and returns the proto defined consensus message
// MsgToProto takes a consensus message type and returns the proto defined consensus message.
func MsgToProto(msg Message) (*tmcons.Message, error) {
if msg == nil {
return nil, errors.New("consensus: message is nil")
@@ -143,6 +143,14 @@ func MsgToProto(msg Message) (*tmcons.Message, error) {
return &pb, nil
}
func MustMsgToProto(msg Message) *tmcons.Message {
m, err := MsgToProto(msg)
if err != nil {
panic(err)
}
return m
}
// MsgFromProto takes a consensus proto message and returns the native go type
func MsgFromProto(msg *tmcons.Message) (Message, error) {
if msg == nil {

View File

@@ -148,6 +148,7 @@ func (conR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
Priority: 6,
SendQueueCapacity: 100,
RecvMessageCapacity: maxMsgSize,
MessageType: &tmcons.Message{},
},
{
ID: DataChannel, // maybe split between gossiping current block and catchup stuff
@@ -156,6 +157,7 @@ func (conR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
SendQueueCapacity: 100,
RecvBufferCapacity: 50 * 4096,
RecvMessageCapacity: maxMsgSize,
MessageType: &tmcons.Message{},
},
{
ID: VoteChannel,
@@ -163,6 +165,7 @@ func (conR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
SendQueueCapacity: 100,
RecvBufferCapacity: 100 * 100,
RecvMessageCapacity: maxMsgSize,
MessageType: &tmcons.Message{},
},
{
ID: VoteSetBitsChannel,
@@ -170,6 +173,7 @@ func (conR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
SendQueueCapacity: 2,
RecvBufferCapacity: 1024,
RecvMessageCapacity: maxMsgSize,
MessageType: &tmcons.Message{},
},
}
}
@@ -223,34 +227,34 @@ func (conR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// Peer state updates can happen in parallel, but processing of
// proposals, block parts, and votes are ordered by the receiveRoutine
// NOTE: blocks on consensus state for proposals, block parts, and votes
func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
func (conR *Reactor) Receive(e p2p.Envelope) {
if !conR.IsRunning() {
conR.Logger.Debug("Receive", "src", src, "chId", chID, "bytes", msgBytes)
conR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID)
return
}
msg, err := decodeMsg(msgBytes)
msg, err := MsgFromProto(e.Message.(*tmcons.Message))
if err != nil {
conR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
conR.Switch.StopPeerForError(src, err)
conR.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err)
conR.Switch.StopPeerForError(e.Src, err)
return
}
if err = msg.ValidateBasic(); err != nil {
conR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
conR.Switch.StopPeerForError(src, err)
conR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", e.Message, "err", err)
conR.Switch.StopPeerForError(e.Src, err)
return
}
conR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
conR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", msg)
// Get peer states
ps, ok := src.Get(types.PeerStateKey).(*PeerState)
ps, ok := e.Src.Get(types.PeerStateKey).(*PeerState)
if !ok {
panic(fmt.Sprintf("Peer %v has no state", src))
panic(fmt.Sprintf("Peer %v has no state", e.Src))
}
switch chID {
switch e.ChannelID {
case StateChannel:
switch msg := msg.(type) {
case *NewRoundStepMessage:
@@ -258,8 +262,8 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
initialHeight := conR.conS.state.InitialHeight
conR.conS.mtx.Unlock()
if err = msg.ValidateHeight(initialHeight); err != nil {
conR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
conR.Switch.StopPeerForError(src, err)
conR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", msg, "err", err)
conR.Switch.StopPeerForError(e.Src, err)
return
}
ps.ApplyNewRoundStepMessage(msg)
@@ -278,7 +282,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
// Peer claims to have a maj23 for some BlockID at H,R,S,
err := votes.SetPeerMaj23(msg.Round, msg.Type, ps.peer.ID(), msg.BlockID)
if err != nil {
conR.Switch.StopPeerForError(src, err)
conR.Switch.StopPeerForError(e.Src, err)
return
}
// Respond with a VoteSetBitsMessage showing which votes we have.
@@ -292,13 +296,16 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
default:
panic("Bad VoteSetBitsMessage field Type. Forgot to add a check in ValidateBasic?")
}
src.TrySend(VoteSetBitsChannel, MustEncode(&VoteSetBitsMessage{
Height: msg.Height,
Round: msg.Round,
Type: msg.Type,
BlockID: msg.BlockID,
Votes: ourVotes,
}))
e.Src.TrySend(p2p.Envelope{
ChannelID: VoteSetBitsChannel,
Message: MustMsgToProto(&VoteSetBitsMessage{
Height: msg.Height,
Round: msg.Round,
Type: msg.Type,
BlockID: msg.BlockID,
Votes: ourVotes,
}),
})
default:
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
@@ -311,13 +318,13 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
switch msg := msg.(type) {
case *ProposalMessage:
ps.SetHasProposal(msg.Proposal)
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
conR.conS.peerMsgQueue <- msgInfo{msg, e.Src.ID()}
case *ProposalPOLMessage:
ps.ApplyProposalPOLMessage(msg)
case *BlockPartMessage:
ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index))
conR.Metrics.BlockParts.With("peer_id", string(src.ID())).Add(1)
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
conR.Metrics.BlockParts.With("peer_id", string(e.Src.ID())).Add(1)
conR.conS.peerMsgQueue <- msgInfo{msg, e.Src.ID()}
default:
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
@@ -337,7 +344,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
ps.EnsureVoteBitArrays(height-1, lastCommitSize)
ps.SetHasVote(msg.Vote)
cs.peerMsgQueue <- msgInfo{msg, src.ID()}
cs.peerMsgQueue <- msgInfo{msg, e.Src.ID()}
default:
// don't punish (leave room for soft upgrades)
@@ -376,7 +383,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
}
default:
conR.Logger.Error(fmt.Sprintf("Unknown chId %X", chID))
conR.Logger.Error(fmt.Sprintf("Unknown chId %X", e.ChannelID))
}
}
@@ -430,7 +437,10 @@ func (conR *Reactor) unsubscribeFromBroadcastEvents() {
func (conR *Reactor) broadcastNewRoundStepMessage(rs *cstypes.RoundState) {
nrsMsg := makeRoundStepMessage(rs)
conR.Switch.Broadcast(StateChannel, MustEncode(nrsMsg))
conR.Switch.NewBroadcast(p2p.Envelope{
ChannelID: StateChannel,
Message: MustMsgToProto(nrsMsg),
})
}
func (conR *Reactor) broadcastNewValidBlockMessage(rs *cstypes.RoundState) {
@@ -441,7 +451,11 @@ func (conR *Reactor) broadcastNewValidBlockMessage(rs *cstypes.RoundState) {
BlockParts: rs.ProposalBlockParts.BitArray(),
IsCommit: rs.Step == cstypes.RoundStepCommit,
}
conR.Switch.Broadcast(StateChannel, MustEncode(csMsg))
MsgToProto(csMsg)
conR.Switch.NewBroadcast(p2p.Envelope{
ChannelID: StateChannel,
Message: MustMsgToProto(csMsg),
})
}
// Broadcasts HasVoteMessage to peers that care.
@@ -452,7 +466,10 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
Type: vote.Type,
Index: vote.ValidatorIndex,
}
conR.Switch.Broadcast(StateChannel, MustEncode(msg))
conR.Switch.NewBroadcast(p2p.Envelope{
ChannelID: StateChannel,
Message: MustMsgToProto(msg),
})
/*
// TODO: Make this broadcast more selective.
for _, peer := range conR.Switch.Peers().List() {
@@ -463,7 +480,11 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
prs := ps.GetRoundState()
if prs.Height == vote.Height {
// TODO: Also filter on round?
peer.TrySend(StateChannel, struct{ ConsensusMessage }{msg})
e := p2p.Envelope{
ChannelID: StateChannel, struct{ ConsensusMessage }{msg},
Message: p,
}
peer.TrySend(e)
} else {
// Height doesn't match
// TODO: check a field, maybe CatchupCommitRound?
@@ -487,7 +508,10 @@ func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *NewRoundStepMessage)
func (conR *Reactor) sendNewRoundStepMessage(peer p2p.Peer) {
rs := conR.getRoundState()
nrsMsg := makeRoundStepMessage(rs)
peer.Send(StateChannel, MustEncode(nrsMsg))
peer.Send(p2p.Envelope{
ChannelID: StateChannel,
Message: MustMsgToProto(nrsMsg),
})
}
func (conR *Reactor) updateRoundStateRoutine() {
@@ -532,7 +556,10 @@ OUTER_LOOP:
Part: part,
}
logger.Debug("Sending block part", "height", prs.Height, "round", prs.Round)
if peer.Send(DataChannel, MustEncode(msg)) {
if peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: MustMsgToProto(msg),
}) {
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
}
continue OUTER_LOOP
@@ -580,7 +607,10 @@ OUTER_LOOP:
{
msg := &ProposalMessage{Proposal: rs.Proposal}
logger.Debug("Sending proposal", "height", prs.Height, "round", prs.Round)
if peer.Send(DataChannel, MustEncode(msg)) {
if peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: MustMsgToProto(msg),
}) {
// NOTE[ZM]: A peer might have received different proposal msg so this Proposal msg will be rejected!
ps.SetHasProposal(rs.Proposal)
}
@@ -596,7 +626,10 @@ OUTER_LOOP:
ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
}
logger.Debug("Sending POL", "height", prs.Height, "round", prs.Round)
peer.Send(DataChannel, MustEncode(msg))
peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: MustMsgToProto(msg),
})
}
continue OUTER_LOOP
}
@@ -639,7 +672,10 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt
Part: part,
}
logger.Debug("Sending block part for catchup", "round", prs.Round, "index", index)
if peer.Send(DataChannel, MustEncode(msg)) {
if peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: MustMsgToProto(msg),
}) {
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
} else {
logger.Debug("Sending block part for catchup failed")
@@ -798,12 +834,16 @@ OUTER_LOOP:
prs := ps.GetRoundState()
if rs.Height == prs.Height {
if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok {
peer.TrySend(StateChannel, MustEncode(&VoteSetMaj23Message{
Height: prs.Height,
Round: prs.Round,
Type: tmproto.PrevoteType,
BlockID: maj23,
}))
peer.TrySend(p2p.Envelope{
ChannelID: StateChannel,
Message: MustMsgToProto(&VoteSetMaj23Message{
Height: prs.Height,
Round: prs.Round,
Type: tmproto.PrevoteType,
BlockID: maj23,
}),
})
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
}
@@ -815,12 +855,16 @@ OUTER_LOOP:
prs := ps.GetRoundState()
if rs.Height == prs.Height {
if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok {
peer.TrySend(StateChannel, MustEncode(&VoteSetMaj23Message{
Height: prs.Height,
Round: prs.Round,
Type: tmproto.PrecommitType,
BlockID: maj23,
}))
peer.TrySend(p2p.Envelope{
ChannelID: StateChannel,
Message: MustMsgToProto(&VoteSetMaj23Message{
Height: prs.Height,
Round: prs.Round,
Type: tmproto.PrecommitType,
BlockID: maj23,
}),
})
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
}
@@ -832,12 +876,16 @@ OUTER_LOOP:
prs := ps.GetRoundState()
if rs.Height == prs.Height && prs.ProposalPOLRound >= 0 {
if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok {
peer.TrySend(StateChannel, MustEncode(&VoteSetMaj23Message{
Height: prs.Height,
Round: prs.ProposalPOLRound,
Type: tmproto.PrevoteType,
BlockID: maj23,
}))
peer.TrySend(p2p.Envelope{
ChannelID: StateChannel,
Message: MustMsgToProto(&VoteSetMaj23Message{
Height: prs.Height,
Round: prs.ProposalPOLRound,
Type: tmproto.PrevoteType,
BlockID: maj23,
}),
})
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
}
@@ -852,12 +900,15 @@ OUTER_LOOP:
if prs.CatchupCommitRound != -1 && prs.Height > 0 && prs.Height <= conR.conS.blockStore.Height() &&
prs.Height >= conR.conS.blockStore.Base() {
if commit := conR.conS.LoadCommit(prs.Height); commit != nil {
peer.TrySend(StateChannel, MustEncode(&VoteSetMaj23Message{
Height: prs.Height,
Round: commit.Round,
Type: tmproto.PrecommitType,
BlockID: commit.BlockID,
}))
peer.TrySend(p2p.Envelope{
ChannelID: StateChannel,
Message: MustMsgToProto(&VoteSetMaj23Message{
Height: prs.Height,
Round: commit.Round,
Type: tmproto.PrecommitType,
BlockID: commit.BlockID,
}),
})
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
}
@@ -1073,7 +1124,10 @@ func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool {
if vote, ok := ps.PickVoteToSend(votes); ok {
msg := &VoteMessage{vote}
ps.logger.Debug("Sending vote message", "ps", ps, "vote", vote)
if ps.peer.Send(VoteChannel, MustEncode(msg)) {
if ps.peer.Send(p2p.Envelope{
ChannelID: VoteChannel,
Message: MustMsgToProto(msg),
}) {
ps.SetHasVote(vote)
return true
}

View File

@@ -297,6 +297,12 @@ func TestReactorReceivePanicsIfInitPeerHasntBeenCalledYet(t *testing.T) {
// simulate switch calling Receive before AddPeer
assert.Panics(t, func() {
reactor.Receive(StateChannel, peer, msg)
reactor.NewReceive(p2p.Envelope{
ChannelID: StateChannel,
Src: peer,
Message: MustMsgToProto(&HasVoteMessage{Height: 1,
Round: 1, Index: 1, Type: tmproto.PrevoteType}),
})
})
}