diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index 92ac8889e..ae7d2d03d 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -26,6 +26,7 @@ import ( mempoolv0 "github.com/tendermint/tendermint/mempool/v0" mempoolv1 "github.com/tendermint/tendermint/mempool/v1" "github.com/tendermint/tendermint/p2p" + tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus" tmproto "github.com/tendermint/tendermint/proto/tendermint/types" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/store" @@ -166,13 +167,13 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { if i < len(peerList)/2 { bcs.Logger.Info("Signed and pushed vote", "vote", prevote1, "peer", peer) peer.Send(p2p.Envelope{ - Message: MustMsgToProto(&VoteMessage{prevote1}), + Message: &tmcons.Vote{prevote1.ToProto()}, ChannelID: VoteChannel, }) } else { bcs.Logger.Info("Signed and pushed vote", "vote", prevote2, "peer", peer) peer.Send(p2p.Envelope{ - Message: MustMsgToProto(&VoteMessage{prevote2}), + Message: &tmcons.Vote{prevote2.ToProto()}, ChannelID: VoteChannel, }) } @@ -526,23 +527,25 @@ func sendProposalAndParts( parts *types.PartSet, ) { // proposal - msg := &ProposalMessage{Proposal: proposal} peer.Send(p2p.Envelope{ ChannelID: DataChannel, - Message: MustMsgToProto(msg), + Message: &tmcons.Proposal{Proposal: *proposal.ToProto()}, }) // parts for i := 0; i < int(parts.Total()); i++ { part := parts.GetPart(i) - msg := &BlockPartMessage{ - Height: height, // This tells peer that this part applies to us. - Round: round, // This tells peer that this part applies to us. - Part: part, + pp, err := part.ToProto() + if err != nil { + panic(err) // TODO: wbanfield better error handling } peer.Send(p2p.Envelope{ ChannelID: DataChannel, - Message: MustMsgToProto(msg), + Message: &tmcons.BlockPart{ + Height: height, // This tells peer that this part applies to us. + Round: round, // This tells peer that this part applies to us. + Part: *pp, + }, }) } @@ -553,11 +556,11 @@ func sendProposalAndParts( cs.mtx.Unlock() peer.Send(p2p.Envelope{ ChannelID: VoteChannel, - Message: MustMsgToProto(&VoteMessage{prevote}), + Message: &tmcons.Vote{prevote.ToProto()}, }) peer.Send(p2p.Envelope{ ChannelID: VoteChannel, - Message: MustMsgToProto(&VoteMessage{precommit}), + Message: &tmcons.Vote{precommit.ToProto()}, }) } diff --git a/consensus/invalid_test.go b/consensus/invalid_test.go index 8fef4e8b2..9f2b5ceba 100644 --- a/consensus/invalid_test.go +++ b/consensus/invalid_test.go @@ -7,6 +7,7 @@ import ( "github.com/tendermint/tendermint/libs/log" tmrand "github.com/tendermint/tendermint/libs/rand" "github.com/tendermint/tendermint/p2p" + tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus" tmproto "github.com/tendermint/tendermint/proto/tendermint/types" "github.com/tendermint/tendermint/types" ) @@ -95,7 +96,7 @@ func invalidDoPrevoteFunc(t *testing.T, height int64, round int32, cs *State, sw for _, peer := range peers { cs.Logger.Info("Sending bad vote", "block", blockHash, "peer", peer) peer.Send(p2p.Envelope{ - Message: MustMsgToProto(&VoteMessage{precommit}), + Message: &tmcons.Vote{precommit.ToProto()}, ChannelID: VoteChannel, }) } diff --git a/consensus/reactor.go b/consensus/reactor.go index d2b1dad6b..34823cee0 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -296,15 +296,18 @@ func (conR *Reactor) Receive(e p2p.Envelope) { default: panic("Bad VoteSetBitsMessage field Type. Forgot to add a check in ValidateBasic?") } + eMsg := &tmcons.VoteSetBits{ + Height: msg.Height, + Round: msg.Round, + Type: msg.Type, + BlockID: msg.BlockID.ToProto(), + } + if votes := ourVotes.ToProto(); votes != nil { + eMsg.Votes = *votes + } e.Src.TrySend(p2p.Envelope{ ChannelID: VoteSetBitsChannel, - Message: MustMsgToProto(&VoteSetBitsMessage{ - Height: msg.Height, - Round: msg.Round, - Type: msg.Type, - BlockID: msg.BlockID, - Votes: ourVotes, - }), + Message: eMsg, }) default: conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) @@ -439,28 +442,28 @@ func (conR *Reactor) broadcastNewRoundStepMessage(rs *cstypes.RoundState) { nrsMsg := makeRoundStepMessage(rs) conR.Switch.NewBroadcast(p2p.Envelope{ ChannelID: StateChannel, - Message: MustMsgToProto(nrsMsg), + Message: nrsMsg, }) } func (conR *Reactor) broadcastNewValidBlockMessage(rs *cstypes.RoundState) { - csMsg := &NewValidBlockMessage{ + psh := rs.ProposalBlockParts.Header() + csMsg := &tmcons.NewValidBlock{ Height: rs.Height, Round: rs.Round, - BlockPartSetHeader: rs.ProposalBlockParts.Header(), - BlockParts: rs.ProposalBlockParts.BitArray(), + BlockPartSetHeader: psh.ToProto(), + BlockParts: rs.ProposalBlockParts.BitArray().ToProto(), IsCommit: rs.Step == cstypes.RoundStepCommit, } - MsgToProto(csMsg) conR.Switch.NewBroadcast(p2p.Envelope{ ChannelID: StateChannel, - Message: MustMsgToProto(csMsg), + Message: csMsg, }) } // Broadcasts HasVoteMessage to peers that care. func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) { - msg := &HasVoteMessage{ + msg := &tmcons.HasVote{ Height: vote.Height, Round: vote.Round, Type: vote.Type, @@ -468,7 +471,7 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) { } conR.Switch.NewBroadcast(p2p.Envelope{ ChannelID: StateChannel, - Message: MustMsgToProto(msg), + Message: msg, }) /* // TODO: Make this broadcast more selective. @@ -494,11 +497,11 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) { */ } -func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *NewRoundStepMessage) { - nrsMsg = &NewRoundStepMessage{ +func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *tmcons.NewRoundStep) { + nrsMsg = &tmcons.NewRoundStep{ Height: rs.Height, Round: rs.Round, - Step: rs.Step, + Step: uint32(rs.Step), SecondsSinceStartTime: int64(time.Since(rs.StartTime).Seconds()), LastCommitRound: rs.LastCommit.GetRound(), } @@ -510,7 +513,7 @@ func (conR *Reactor) sendNewRoundStepMessage(peer p2p.Peer) { nrsMsg := makeRoundStepMessage(rs) peer.Send(p2p.Envelope{ ChannelID: StateChannel, - Message: MustMsgToProto(nrsMsg), + Message: nrsMsg, }) } @@ -550,15 +553,18 @@ OUTER_LOOP: if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartSetHeader) { if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok { part := rs.ProposalBlockParts.GetPart(index) - msg := &BlockPartMessage{ - Height: rs.Height, // This tells peer that this part applies to us. - Round: rs.Round, // This tells peer that this part applies to us. - Part: part, + parts, err := part.ToProto() + if err != nil { + panic(err) } logger.Debug("Sending block part", "height", prs.Height, "round", prs.Round) if peer.Send(p2p.Envelope{ ChannelID: DataChannel, - Message: MustMsgToProto(msg), + Message: &tmcons.BlockPart{ + Height: rs.Height, // This tells peer that this part applies to us. + Round: rs.Round, // This tells peer that this part applies to us. + Part: *parts, + }, }) { ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) } @@ -605,11 +611,10 @@ OUTER_LOOP: if rs.Proposal != nil && !prs.Proposal { // Proposal: share the proposal metadata with peer. { - msg := &ProposalMessage{Proposal: rs.Proposal} logger.Debug("Sending proposal", "height", prs.Height, "round", prs.Round) if peer.Send(p2p.Envelope{ ChannelID: DataChannel, - Message: MustMsgToProto(msg), + Message: &tmcons.Proposal{Proposal: *rs.Proposal.ToProto()}, }) { // NOTE[ZM]: A peer might have received different proposal msg so this Proposal msg will be rejected! ps.SetHasProposal(rs.Proposal) @@ -620,15 +625,14 @@ OUTER_LOOP: // rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round, // so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound). if 0 <= rs.Proposal.POLRound { - msg := &ProposalPOLMessage{ - Height: rs.Height, - ProposalPOLRound: rs.Proposal.POLRound, - ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(), - } logger.Debug("Sending POL", "height", prs.Height, "round", prs.Round) peer.Send(p2p.Envelope{ ChannelID: DataChannel, - Message: MustMsgToProto(msg), + Message: &tmcons.ProposalPOL{ + Height: rs.Height, + ProposalPolRound: rs.Proposal.POLRound, + ProposalPol: *rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray().ToProto(), + }, }) } continue OUTER_LOOP @@ -666,15 +670,19 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt return } // Send the part - msg := &BlockPartMessage{ - Height: prs.Height, // Not our height, so it doesn't matter. - Round: prs.Round, // Not our height, so it doesn't matter. - Part: part, - } logger.Debug("Sending block part for catchup", "round", prs.Round, "index", index) + pp, err := part.ToProto() + if err != nil { + logger.Error("Could not convert part to proto", "index", index, "error", err) + return + } if peer.Send(p2p.Envelope{ ChannelID: DataChannel, - Message: MustMsgToProto(msg), + Message: &tmcons.BlockPart{ + Height: prs.Height, // Not our height, so it doesn't matter. + Round: prs.Round, // Not our height, so it doesn't matter. + Part: *pp, + }, }) { ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) } else { @@ -837,12 +845,12 @@ OUTER_LOOP: peer.TrySend(p2p.Envelope{ ChannelID: StateChannel, - Message: MustMsgToProto(&VoteSetMaj23Message{ + Message: &tmcons.VoteSetMaj23{ Height: prs.Height, Round: prs.Round, Type: tmproto.PrevoteType, - BlockID: maj23, - }), + BlockID: maj23.ToProto(), + }, }) time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration) } @@ -855,15 +863,14 @@ OUTER_LOOP: prs := ps.GetRoundState() if rs.Height == prs.Height { if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok { - peer.TrySend(p2p.Envelope{ ChannelID: StateChannel, - Message: MustMsgToProto(&VoteSetMaj23Message{ + Message: &tmcons.VoteSetMaj23{ Height: prs.Height, Round: prs.Round, Type: tmproto.PrecommitType, - BlockID: maj23, - }), + BlockID: maj23.ToProto(), + }, }) time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration) } @@ -879,12 +886,12 @@ OUTER_LOOP: peer.TrySend(p2p.Envelope{ ChannelID: StateChannel, - Message: MustMsgToProto(&VoteSetMaj23Message{ + Message: &tmcons.VoteSetMaj23{ Height: prs.Height, Round: prs.ProposalPOLRound, Type: tmproto.PrevoteType, - BlockID: maj23, - }), + BlockID: maj23.ToProto(), + }, }) time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration) } @@ -902,12 +909,12 @@ OUTER_LOOP: if commit := conR.conS.LoadCommit(prs.Height); commit != nil { peer.TrySend(p2p.Envelope{ ChannelID: StateChannel, - Message: MustMsgToProto(&VoteSetMaj23Message{ + Message: &tmcons.VoteSetMaj23{ Height: prs.Height, Round: commit.Round, Type: tmproto.PrecommitType, - BlockID: commit.BlockID, - }), + BlockID: commit.BlockID.ToProto(), + }, }) time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration) } @@ -1122,11 +1129,12 @@ func (ps *PeerState) SetHasProposalBlockPart(height int64, round int32, index in // Returns true if vote was sent. func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool { if vote, ok := ps.PickVoteToSend(votes); ok { - msg := &VoteMessage{vote} ps.logger.Debug("Sending vote message", "ps", ps, "vote", vote) if ps.peer.Send(p2p.Envelope{ ChannelID: VoteChannel, - Message: MustMsgToProto(msg), + Message: &tmcons.Vote{ + Vote: vote.ToProto(), + }, }) { ps.SetHasVote(vote) return true diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index 3d52e95fd..f0cdc5f9d 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -33,6 +33,7 @@ import ( mempoolv1 "github.com/tendermint/tendermint/mempool/v1" "github.com/tendermint/tendermint/p2p" p2pmock "github.com/tendermint/tendermint/p2p/mock" + tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus" tmproto "github.com/tendermint/tendermint/proto/tendermint/types" sm "github.com/tendermint/tendermint/state" statemocks "github.com/tendermint/tendermint/state/mocks" @@ -265,15 +266,18 @@ func TestReactorReceiveDoesNotPanicIfAddPeerHasntBeenCalledYet(t *testing.T) { var ( reactor = reactors[0] peer = p2pmock.NewPeer(nil) - msg = MustEncode(&HasVoteMessage{Height: 1, - Round: 1, Index: 1, Type: tmproto.PrevoteType}) ) reactor.InitPeer(peer) // simulate switch calling Receive before AddPeer assert.NotPanics(t, func() { - reactor.Receive(StateChannel, peer, msg) + reactor.Receive(p2p.Envelope{ + ChannelID: StateChannel, + Src: peer, + Message: &tmcons.HasVote{Height: 1, + Round: 1, Index: 1, Type: tmproto.PrevoteType}, + }) reactor.AddPeer(peer) }) } @@ -288,20 +292,17 @@ func TestReactorReceivePanicsIfInitPeerHasntBeenCalledYet(t *testing.T) { var ( reactor = reactors[0] peer = p2pmock.NewPeer(nil) - msg = MustEncode(&HasVoteMessage{Height: 1, - Round: 1, Index: 1, Type: tmproto.PrevoteType}) ) // we should call InitPeer here // simulate switch calling Receive before AddPeer assert.Panics(t, func() { - reactor.Receive(StateChannel, peer, msg) - reactor.NewReceive(p2p.Envelope{ + reactor.Receive(p2p.Envelope{ ChannelID: StateChannel, Src: peer, - Message: MustMsgToProto(&HasVoteMessage{Height: 1, - Round: 1, Index: 1, Type: tmproto.PrevoteType}), + Message: &tmcons.HasVote{Height: 1, + Round: 1, Index: 1, Type: tmproto.PrevoteType}, }) }) } diff --git a/p2p/mock/reactor.go b/p2p/mock/reactor.go index 0389a7d19..5e61c3e0b 100644 --- a/p2p/mock/reactor.go +++ b/p2p/mock/reactor.go @@ -19,7 +19,7 @@ func NewReactor() *Reactor { return r } -func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return r.Channels } -func (r *Reactor) AddPeer(peer p2p.Peer) {} -func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {} -func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {} +func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return r.Channels } +func (r *Reactor) AddPeer(peer p2p.Peer) {} +func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {} +func (r *Reactor) Receive(e p2p.Envelope) {}