diff --git a/consensus/reactor.go b/consensus/reactor.go index 6bc831439..586a20ff4 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -393,21 +393,28 @@ func (conR *Reactor) NewReceive(e p2p.Envelope) { return } - if err := e.Message.ValidateBasic(); err != nil { - conR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err) - conR.Switch.StopPeerForError(src, err) + msg, err := MsgFromProto(e.Message.(*tmcons.Message)) + if err != nil { + conR.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err) + conR.Switch.StopPeerForError(e.Src, err) return } - conR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg) - - // Get peer states - ps, ok := src.Get(types.PeerStateKey).(*PeerState) - if !ok { - panic(fmt.Sprintf("Peer %v has no state", src)) + if err = msg.ValidateBasic(); err != nil { + conR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", e.Message, "err", err) + conR.Switch.StopPeerForError(e.Src, err) + return } - switch chID { + conR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", msg) + + // Get peer states + ps, ok := e.Src.Get(types.PeerStateKey).(*PeerState) + if !ok { + panic(fmt.Sprintf("Peer %v has no state", e.Src)) + } + + switch e.ChannelID { case StateChannel: switch msg := msg.(type) { case *NewRoundStepMessage: @@ -415,8 +422,8 @@ func (conR *Reactor) NewReceive(e p2p.Envelope) { initialHeight := conR.conS.state.InitialHeight conR.conS.mtx.Unlock() if err = msg.ValidateHeight(initialHeight); err != nil { - conR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err) - conR.Switch.StopPeerForError(src, err) + conR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", msg, "err", err) + conR.Switch.StopPeerForError(e.Src, err) return } ps.ApplyNewRoundStepMessage(msg) @@ -435,7 +442,7 @@ func (conR *Reactor) NewReceive(e p2p.Envelope) { // Peer claims to have a maj23 for some BlockID at H,R,S, err := votes.SetPeerMaj23(msg.Round, msg.Type, ps.peer.ID(), msg.BlockID) if err != nil { - conR.Switch.StopPeerForError(src, err) + conR.Switch.StopPeerForError(e.Src, err) return } // Respond with a VoteSetBitsMessage showing which votes we have. @@ -459,7 +466,7 @@ func (conR *Reactor) NewReceive(e p2p.Envelope) { if err != nil { panic(err) } - src.TrySend(p2p.Envelope{ + e.Src.TrySend(p2p.Envelope{ ChannelID: VoteSetBitsChannel, Message: p, }) @@ -475,13 +482,13 @@ func (conR *Reactor) NewReceive(e p2p.Envelope) { switch msg := msg.(type) { case *ProposalMessage: ps.SetHasProposal(msg.Proposal) - conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()} + conR.conS.peerMsgQueue <- msgInfo{msg, e.Src.ID()} case *ProposalPOLMessage: ps.ApplyProposalPOLMessage(msg) case *BlockPartMessage: ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index)) - conR.Metrics.BlockParts.With("peer_id", string(src.ID())).Add(1) - conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()} + conR.Metrics.BlockParts.With("peer_id", string(e.Src.ID())).Add(1) + conR.conS.peerMsgQueue <- msgInfo{msg, e.Src.ID()} default: conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) } @@ -501,7 +508,7 @@ func (conR *Reactor) NewReceive(e p2p.Envelope) { ps.EnsureVoteBitArrays(height-1, lastCommitSize) ps.SetHasVote(msg.Vote) - cs.peerMsgQueue <- msgInfo{msg, src.ID()} + cs.peerMsgQueue <- msgInfo{msg, e.Src.ID()} default: // don't punish (leave room for soft upgrades) @@ -540,7 +547,7 @@ func (conR *Reactor) NewReceive(e p2p.Envelope) { } default: - conR.Logger.Error(fmt.Sprintf("Unknown chId %X", chID)) + conR.Logger.Error(fmt.Sprintf("Unknown chId %X", e.ChannelID)) } } diff --git a/p2p/base_reactor.go b/p2p/base_reactor.go index cac9054f2..762b948f9 100644 --- a/p2p/base_reactor.go +++ b/p2p/base_reactor.go @@ -69,6 +69,6 @@ func (br *BaseReactor) SetSwitch(sw *Switch) { func (*BaseReactor) GetChannels() []*conn.ChannelDescriptor { return nil } func (*BaseReactor) AddPeer(peer Peer) {} func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {} -func (*BaseReactor) NewReceive(e p2p.Envelope) {} +func (*BaseReactor) NewReceive(e Envelope) {} func (*BaseReactor) Receive(chID byte, peer Peer, msgBytes []byte) {} func (*BaseReactor) InitPeer(peer Peer) Peer { return peer }