diff --git a/consensus/invalid_test.go b/consensus/invalid_test.go index 64c0abcfd..8fef4e8b2 100644 --- a/consensus/invalid_test.go +++ b/consensus/invalid_test.go @@ -94,12 +94,10 @@ func invalidDoPrevoteFunc(t *testing.T, height int64, round int32, cs *State, sw peers := sw.Peers().List() for _, peer := range peers { cs.Logger.Info("Sending bad vote", "block", blockHash, "peer", peer) - p := MustConvertMsgToProto(&VoteMessage{precommit}) - e := p2p.Envelope{ - Message: p, + peer.Send(p2p.Envelope{ + Message: MustMsgToProto(&VoteMessage{precommit}), ChannelID: VoteChannel, - } - peer.Send(e) + }) } }() } diff --git a/consensus/msgs.go b/consensus/msgs.go index 2cde90411..6aaacba83 100644 --- a/consensus/msgs.go +++ b/consensus/msgs.go @@ -15,10 +15,10 @@ import ( "github.com/tendermint/tendermint/types" ) -// MustConvertMsgToProto takes a consensus message type and returns the proto defined consensus message -func MustConvertMsgToProto(msg Message) *tmcons.Message { +// MsgToProto takes a consensus message type and returns the proto defined consensus message. +func MsgToProto(msg Message) (*tmcons.Message, error) { if msg == nil { - panic(errors.New("consensus: message is nil")) + return nil, errors.New("consensus: message is nil") } var pb tmcons.Message @@ -72,7 +72,7 @@ func MustConvertMsgToProto(msg Message) *tmcons.Message { case *BlockPartMessage: parts, err := msg.Part.ToProto() if err != nil { - panic(fmt.Errorf("msg to proto error: %w", err)) + return nil, fmt.Errorf("msg to proto error: %w", err) } pb = tmcons.Message{ Sum: &tmcons.Message_BlockPart{ @@ -137,10 +137,18 @@ func MustConvertMsgToProto(msg Message) *tmcons.Message { } default: - panic(fmt.Errorf("consensus: message not recognized: %T", msg)) + return nil, fmt.Errorf("consensus: message not recognized: %T", msg) } - return &pb + return &pb, nil +} + +func MustMsgToProto(msg Message) *tmcons.Message { + m, err := MsgToProto(msg) + if err != nil { + panic(err) + } + return m } // MsgFromProto takes a consensus proto message and returns the native go type @@ -263,7 +271,10 @@ func MsgFromProto(msg *tmcons.Message) (Message, error) { // MustEncode takes the reactors msg, makes it proto and marshals it // this mimics `MustMarshalBinaryBare` in that is panics on error func MustEncode(msg Message) []byte { - pb := MustConvertMsgToProto(msg) + pb, err := MsgToProto(msg) + if err != nil { + panic(err) + } enc, err := proto.Marshal(pb) if err != nil { panic(err) @@ -287,7 +298,10 @@ func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) { }, } case msgInfo: - consMsg := MustConvertMsgToProto(msg.Msg) + consMsg, err := MsgToProto(msg.Msg) + if err != nil { + return nil, err + } pb = tmcons.WALMessage{ Sum: &tmcons.WALMessage_MsgInfo{ MsgInfo: &tmcons.MsgInfo{ diff --git a/consensus/msgs_test.go b/consensus/msgs_test.go index 532264abb..7690c3364 100644 --- a/consensus/msgs_test.go +++ b/consensus/msgs_test.go @@ -189,7 +189,11 @@ func TestMsgToProto(t *testing.T) { for _, tt := range testsCases { tt := tt t.Run(tt.testName, func(t *testing.T) { - pb := MustConvertMsgToProto(tt.msg) + pb, err := MsgToProto(tt.msg) + if tt.wantErr == true { + assert.Equal(t, err != nil, tt.wantErr) + return + } assert.EqualValues(t, tt.want, pb, tt.testName) msg, err := MsgFromProto(pb) diff --git a/consensus/reactor.go b/consensus/reactor.go index baf475fcd..3030676cc 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -297,16 +297,15 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { default: panic("Bad VoteSetBitsMessage field Type. Forgot to add a check in ValidateBasic?") } - p := MustConvertMsgToProto(&VoteSetBitsMessage{ - Height: msg.Height, - Round: msg.Round, - Type: msg.Type, - BlockID: msg.BlockID, - Votes: ourVotes, - }) src.TrySend(p2p.Envelope{ ChannelID: VoteSetBitsChannel, - Message: p, + Message: MustMsgToProto(&VoteSetBitsMessage{ + Height: msg.Height, + Round: msg.Round, + Type: msg.Type, + BlockID: msg.BlockID, + Votes: ourVotes, + }), }) default: conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) @@ -458,16 +457,15 @@ func (conR *Reactor) NewReceive(e p2p.Envelope) { default: panic("Bad VoteSetBitsMessage field Type. Forgot to add a check in ValidateBasic?") } - p := MustConvertMsgToProto(&VoteSetBitsMessage{ - Height: msg.Height, - Round: msg.Round, - Type: msg.Type, - BlockID: msg.BlockID, - Votes: ourVotes, - }) e.Src.TrySend(p2p.Envelope{ ChannelID: VoteSetBitsChannel, - Message: p, + Message: MustMsgToProto(&VoteSetBitsMessage{ + Height: msg.Height, + Round: msg.Round, + Type: msg.Type, + BlockID: msg.BlockID, + Votes: ourVotes, + }), }) default: conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) @@ -600,12 +598,10 @@ func (conR *Reactor) unsubscribeFromBroadcastEvents() { func (conR *Reactor) broadcastNewRoundStepMessage(rs *cstypes.RoundState) { nrsMsg := makeRoundStepMessage(rs) - p := MustConvertMsgToProto(nrsMsg) - e := p2p.Envelope{ + conR.Switch.NewBroadcast(p2p.Envelope{ ChannelID: StateChannel, - Message: p, - } - conR.Switch.NewBroadcast(e) + Message: MustMsgToProto(nrsMsg), + }) } func (conR *Reactor) broadcastNewValidBlockMessage(rs *cstypes.RoundState) { @@ -616,12 +612,11 @@ func (conR *Reactor) broadcastNewValidBlockMessage(rs *cstypes.RoundState) { BlockParts: rs.ProposalBlockParts.BitArray(), IsCommit: rs.Step == cstypes.RoundStepCommit, } - p := MustConvertMsgToProto(csMsg) - e := p2p.Envelope{ + MsgToProto(csMsg) + conR.Switch.NewBroadcast(p2p.Envelope{ ChannelID: StateChannel, - Message: p, - } - conR.Switch.NewBroadcast(e) + Message: MustMsgToProto(csMsg), + }) } // Broadcasts HasVoteMessage to peers that care. @@ -632,12 +627,10 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) { Type: vote.Type, Index: vote.ValidatorIndex, } - p := MustConvertMsgToProto(msg) - e := p2p.Envelope{ + conR.Switch.NewBroadcast(p2p.Envelope{ ChannelID: StateChannel, - Message: p, - } - conR.Switch.NewBroadcast(e) + Message: MustMsgToProto(msg), + }) /* // TODO: Make this broadcast more selective. for _, peer := range conR.Switch.Peers().List() { @@ -676,12 +669,10 @@ func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *NewRoundStepMessage) func (conR *Reactor) sendNewRoundStepMessage(peer p2p.Peer) { rs := conR.getRoundState() nrsMsg := makeRoundStepMessage(rs) - p := MustConvertMsgToProto(nrsMsg) - e := p2p.Envelope{ + peer.Send(p2p.Envelope{ ChannelID: StateChannel, - Message: p, - } - peer.Send(e) + Message: MustMsgToProto(nrsMsg), + }) } func (conR *Reactor) updateRoundStateRoutine() { @@ -726,12 +717,10 @@ OUTER_LOOP: Part: part, } logger.Debug("Sending block part", "height", prs.Height, "round", prs.Round) - p := MustConvertMsgToProto(msg) - e := p2p.Envelope{ + if peer.Send(p2p.Envelope{ ChannelID: DataChannel, - Message: p, - } - if peer.Send(e) { + Message: MustMsgToProto(msg), + }) { ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) } continue OUTER_LOOP @@ -779,12 +768,10 @@ OUTER_LOOP: { msg := &ProposalMessage{Proposal: rs.Proposal} logger.Debug("Sending proposal", "height", prs.Height, "round", prs.Round) - p := MustConvertMsgToProto(msg) - e := p2p.Envelope{ + if peer.Send(p2p.Envelope{ ChannelID: DataChannel, - Message: p, - } - if peer.Send(e) { + Message: MustMsgToProto(msg), + }) { // NOTE[ZM]: A peer might have received different proposal msg so this Proposal msg will be rejected! ps.SetHasProposal(rs.Proposal) } @@ -800,12 +787,10 @@ OUTER_LOOP: ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(), } logger.Debug("Sending POL", "height", prs.Height, "round", prs.Round) - p := MustConvertMsgToProto(msg) - e := p2p.Envelope{ + peer.Send(p2p.Envelope{ ChannelID: DataChannel, - Message: p, - } - peer.Send(e) + Message: MustMsgToProto(msg), + }) } continue OUTER_LOOP } @@ -848,12 +833,10 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt Part: part, } logger.Debug("Sending block part for catchup", "round", prs.Round, "index", index) - p := MustConvertMsgToProto(msg) - e := p2p.Envelope{ + if peer.Send(p2p.Envelope{ ChannelID: DataChannel, - Message: p, - } - if peer.Send(e) { + Message: MustMsgToProto(msg), + }) { ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) } else { logger.Debug("Sending block part for catchup failed") @@ -1012,17 +995,16 @@ OUTER_LOOP: prs := ps.GetRoundState() if rs.Height == prs.Height { if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok { - p := MustConvertMsgToProto(&VoteSetMaj23Message{ - Height: prs.Height, - Round: prs.Round, - Type: tmproto.PrevoteType, - BlockID: maj23, - }) - e := p2p.Envelope{ + + peer.TrySend(p2p.Envelope{ ChannelID: StateChannel, - Message: p, - } - peer.TrySend(e) + Message: MustMsgToProto(&VoteSetMaj23Message{ + Height: prs.Height, + Round: prs.Round, + Type: tmproto.PrevoteType, + BlockID: maj23, + }), + }) time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration) } } @@ -1034,17 +1016,16 @@ OUTER_LOOP: prs := ps.GetRoundState() if rs.Height == prs.Height { if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok { - p := MustConvertMsgToProto(&VoteSetMaj23Message{ - Height: prs.Height, - Round: prs.Round, - Type: tmproto.PrecommitType, - BlockID: maj23, - }) - e := p2p.Envelope{ + + peer.TrySend(p2p.Envelope{ ChannelID: StateChannel, - Message: p, - } - peer.TrySend(e) + Message: MustMsgToProto(&VoteSetMaj23Message{ + Height: prs.Height, + Round: prs.Round, + Type: tmproto.PrecommitType, + BlockID: maj23, + }), + }) time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration) } } @@ -1056,18 +1037,16 @@ OUTER_LOOP: prs := ps.GetRoundState() if rs.Height == prs.Height && prs.ProposalPOLRound >= 0 { if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok { - p := MustConvertMsgToProto(&VoteSetMaj23Message{ - Height: prs.Height, - Round: prs.ProposalPOLRound, - Type: tmproto.PrevoteType, - BlockID: maj23, - }) - e := p2p.Envelope{ + peer.TrySend(p2p.Envelope{ ChannelID: StateChannel, - Message: p, - } - peer.TrySend(e) + Message: MustMsgToProto(&VoteSetMaj23Message{ + Height: prs.Height, + Round: prs.ProposalPOLRound, + Type: tmproto.PrevoteType, + BlockID: maj23, + }), + }) time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration) } } @@ -1082,17 +1061,15 @@ OUTER_LOOP: if prs.CatchupCommitRound != -1 && prs.Height > 0 && prs.Height <= conR.conS.blockStore.Height() && prs.Height >= conR.conS.blockStore.Base() { if commit := conR.conS.LoadCommit(prs.Height); commit != nil { - p := MustConvertMsgToProto(&VoteSetMaj23Message{ - Height: prs.Height, - Round: commit.Round, - Type: tmproto.PrecommitType, - BlockID: commit.BlockID, - }) - e := p2p.Envelope{ + peer.TrySend(p2p.Envelope{ ChannelID: StateChannel, - Message: p, - } - peer.TrySend(e) + Message: MustMsgToProto(&VoteSetMaj23Message{ + Height: prs.Height, + Round: commit.Round, + Type: tmproto.PrecommitType, + BlockID: commit.BlockID, + }), + }) time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration) } } @@ -1308,12 +1285,10 @@ func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool { if vote, ok := ps.PickVoteToSend(votes); ok { msg := &VoteMessage{vote} ps.logger.Debug("Sending vote message", "ps", ps, "vote", vote) - p := MustConvertMsgToProto(msg) - e := p2p.Envelope{ + if ps.peer.Send(p2p.Envelope{ ChannelID: VoteChannel, - Message: p, - } - if ps.peer.Send(e) { + Message: MustMsgToProto(msg), + }) { ps.SetHasVote(vote) return true }