diff --git a/consensus/reactor.go b/consensus/reactor.go index b91915206..6bc831439 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -387,6 +387,163 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { } } +func (conR *Reactor) NewReceive(e p2p.Envelope) { + if !conR.IsRunning() { + conR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID) + return + } + + if err := e.Message.ValidateBasic(); err != nil { + conR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err) + conR.Switch.StopPeerForError(src, err) + return + } + + conR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg) + + // Get peer states + ps, ok := src.Get(types.PeerStateKey).(*PeerState) + if !ok { + panic(fmt.Sprintf("Peer %v has no state", src)) + } + + switch chID { + case StateChannel: + switch msg := msg.(type) { + case *NewRoundStepMessage: + conR.conS.mtx.Lock() + initialHeight := conR.conS.state.InitialHeight + conR.conS.mtx.Unlock() + if err = msg.ValidateHeight(initialHeight); err != nil { + conR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err) + conR.Switch.StopPeerForError(src, err) + return + } + ps.ApplyNewRoundStepMessage(msg) + case *NewValidBlockMessage: + ps.ApplyNewValidBlockMessage(msg) + case *HasVoteMessage: + ps.ApplyHasVoteMessage(msg) + case *VoteSetMaj23Message: + cs := conR.conS + cs.mtx.Lock() + height, votes := cs.Height, cs.Votes + cs.mtx.Unlock() + if height != msg.Height { + return + } + // Peer claims to have a maj23 for some BlockID at H,R,S, + err := votes.SetPeerMaj23(msg.Round, msg.Type, ps.peer.ID(), msg.BlockID) + if err != nil { + conR.Switch.StopPeerForError(src, err) + return + } + // Respond with a VoteSetBitsMessage showing which votes we have. + // (and consequently shows which we don't have) + var ourVotes *bits.BitArray + switch msg.Type { + case tmproto.PrevoteType: + ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID) + case tmproto.PrecommitType: + ourVotes = votes.Precommits(msg.Round).BitArrayByBlockID(msg.BlockID) + default: + panic("Bad VoteSetBitsMessage field Type. Forgot to add a check in ValidateBasic?") + } + p, err := MsgToProto(&VoteSetBitsMessage{ + Height: msg.Height, + Round: msg.Round, + Type: msg.Type, + BlockID: msg.BlockID, + Votes: ourVotes, + }) + if err != nil { + panic(err) + } + src.TrySend(p2p.Envelope{ + ChannelID: VoteSetBitsChannel, + Message: p, + }) + default: + conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) + } + + case DataChannel: + if conR.WaitSync() { + conR.Logger.Info("Ignoring message received during sync", "msg", msg) + return + } + switch msg := msg.(type) { + case *ProposalMessage: + ps.SetHasProposal(msg.Proposal) + conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()} + case *ProposalPOLMessage: + ps.ApplyProposalPOLMessage(msg) + case *BlockPartMessage: + ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index)) + conR.Metrics.BlockParts.With("peer_id", string(src.ID())).Add(1) + conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()} + default: + conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) + } + + case VoteChannel: + if conR.WaitSync() { + conR.Logger.Info("Ignoring message received during sync", "msg", msg) + return + } + switch msg := msg.(type) { + case *VoteMessage: + cs := conR.conS + cs.mtx.RLock() + height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size() + cs.mtx.RUnlock() + ps.EnsureVoteBitArrays(height, valSize) + ps.EnsureVoteBitArrays(height-1, lastCommitSize) + ps.SetHasVote(msg.Vote) + + cs.peerMsgQueue <- msgInfo{msg, src.ID()} + + default: + // don't punish (leave room for soft upgrades) + conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) + } + + case VoteSetBitsChannel: + if conR.WaitSync() { + conR.Logger.Info("Ignoring message received during sync", "msg", msg) + return + } + switch msg := msg.(type) { + case *VoteSetBitsMessage: + cs := conR.conS + cs.mtx.Lock() + height, votes := cs.Height, cs.Votes + cs.mtx.Unlock() + + if height == msg.Height { + var ourVotes *bits.BitArray + switch msg.Type { + case tmproto.PrevoteType: + ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID) + case tmproto.PrecommitType: + ourVotes = votes.Precommits(msg.Round).BitArrayByBlockID(msg.BlockID) + default: + panic("Bad VoteSetBitsMessage field Type. Forgot to add a check in ValidateBasic?") + } + ps.ApplyVoteSetBitsMessage(msg, ourVotes) + } else { + ps.ApplyVoteSetBitsMessage(msg, nil) + } + default: + // don't punish (leave room for soft upgrades) + conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) + } + + default: + conR.Logger.Error(fmt.Sprintf("Unknown chId %X", chID)) + } +} + // SetEventBus sets event bus. func (conR *Reactor) SetEventBus(b *types.EventBus) { conR.eventBus = b diff --git a/p2p/base_reactor.go b/p2p/base_reactor.go index 86b0d980a..14025b590 100644 --- a/p2p/base_reactor.go +++ b/p2p/base_reactor.go @@ -67,5 +67,6 @@ func (br *BaseReactor) SetSwitch(sw *Switch) { func (*BaseReactor) GetChannels() []*conn.ChannelDescriptor { return nil } func (*BaseReactor) AddPeer(peer Peer) {} func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {} +func (*BaseReactor) NewReceive(e p2p.Envelope) {} func (*BaseReactor) Receive(chID byte, peer Peer, msgBytes []byte) {} func (*BaseReactor) InitPeer(peer Peer) Peer { return peer } diff --git a/p2p/types.go b/p2p/types.go index e38fbe366..e76388ff4 100644 --- a/p2p/types.go +++ b/p2p/types.go @@ -9,6 +9,8 @@ type ChannelDescriptor = conn.ChannelDescriptor type ConnectionStatus = conn.ConnectionStatus type Envelope struct { + // Src is set when the message was sent by a remote peer. + Src Peer ChannelID byte Message proto.Message }