improve NewReceive on cons reactor

This commit is contained in:
William Banfield
2022-10-19 20:52:20 -04:00
parent 7b41c03409
commit 1053a49b4e
2 changed files with 27 additions and 20 deletions

View File

@@ -393,21 +393,28 @@ func (conR *Reactor) NewReceive(e p2p.Envelope) {
return
}
if err := e.Message.ValidateBasic(); err != nil {
conR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
conR.Switch.StopPeerForError(src, err)
msg, err := MsgFromProto(e.Message.(*tmcons.Message))
if err != nil {
conR.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err)
conR.Switch.StopPeerForError(e.Src, err)
return
}
conR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
// Get peer states
ps, ok := src.Get(types.PeerStateKey).(*PeerState)
if !ok {
panic(fmt.Sprintf("Peer %v has no state", src))
if err = msg.ValidateBasic(); err != nil {
conR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", e.Message, "err", err)
conR.Switch.StopPeerForError(e.Src, err)
return
}
switch chID {
conR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", msg)
// Get peer states
ps, ok := e.Src.Get(types.PeerStateKey).(*PeerState)
if !ok {
panic(fmt.Sprintf("Peer %v has no state", e.Src))
}
switch e.ChannelID {
case StateChannel:
switch msg := msg.(type) {
case *NewRoundStepMessage:
@@ -415,8 +422,8 @@ func (conR *Reactor) NewReceive(e p2p.Envelope) {
initialHeight := conR.conS.state.InitialHeight
conR.conS.mtx.Unlock()
if err = msg.ValidateHeight(initialHeight); err != nil {
conR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
conR.Switch.StopPeerForError(src, err)
conR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", msg, "err", err)
conR.Switch.StopPeerForError(e.Src, err)
return
}
ps.ApplyNewRoundStepMessage(msg)
@@ -435,7 +442,7 @@ func (conR *Reactor) NewReceive(e p2p.Envelope) {
// Peer claims to have a maj23 for some BlockID at H,R,S,
err := votes.SetPeerMaj23(msg.Round, msg.Type, ps.peer.ID(), msg.BlockID)
if err != nil {
conR.Switch.StopPeerForError(src, err)
conR.Switch.StopPeerForError(e.Src, err)
return
}
// Respond with a VoteSetBitsMessage showing which votes we have.
@@ -459,7 +466,7 @@ func (conR *Reactor) NewReceive(e p2p.Envelope) {
if err != nil {
panic(err)
}
src.TrySend(p2p.Envelope{
e.Src.TrySend(p2p.Envelope{
ChannelID: VoteSetBitsChannel,
Message: p,
})
@@ -475,13 +482,13 @@ func (conR *Reactor) NewReceive(e p2p.Envelope) {
switch msg := msg.(type) {
case *ProposalMessage:
ps.SetHasProposal(msg.Proposal)
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
conR.conS.peerMsgQueue <- msgInfo{msg, e.Src.ID()}
case *ProposalPOLMessage:
ps.ApplyProposalPOLMessage(msg)
case *BlockPartMessage:
ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index))
conR.Metrics.BlockParts.With("peer_id", string(src.ID())).Add(1)
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
conR.Metrics.BlockParts.With("peer_id", string(e.Src.ID())).Add(1)
conR.conS.peerMsgQueue <- msgInfo{msg, e.Src.ID()}
default:
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
@@ -501,7 +508,7 @@ func (conR *Reactor) NewReceive(e p2p.Envelope) {
ps.EnsureVoteBitArrays(height-1, lastCommitSize)
ps.SetHasVote(msg.Vote)
cs.peerMsgQueue <- msgInfo{msg, src.ID()}
cs.peerMsgQueue <- msgInfo{msg, e.Src.ID()}
default:
// don't punish (leave room for soft upgrades)
@@ -540,7 +547,7 @@ func (conR *Reactor) NewReceive(e p2p.Envelope) {
}
default:
conR.Logger.Error(fmt.Sprintf("Unknown chId %X", chID))
conR.Logger.Error(fmt.Sprintf("Unknown chId %X", e.ChannelID))
}
}

View File

@@ -69,6 +69,6 @@ func (br *BaseReactor) SetSwitch(sw *Switch) {
func (*BaseReactor) GetChannels() []*conn.ChannelDescriptor { return nil }
func (*BaseReactor) AddPeer(peer Peer) {}
func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {}
func (*BaseReactor) NewReceive(e p2p.Envelope) {}
func (*BaseReactor) NewReceive(e Envelope) {}
func (*BaseReactor) Receive(chID byte, peer Peer, msgBytes []byte) {}
func (*BaseReactor) InitPeer(peer Peer) Peer { return peer }