receive start

This commit is contained in:
William Banfield
2022-10-19 17:27:23 -04:00
parent 14510c4eb0
commit e20d28429d
3 changed files with 160 additions and 0 deletions

View File

@@ -387,6 +387,163 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
}
}
func (conR *Reactor) NewReceive(e p2p.Envelope) {
if !conR.IsRunning() {
conR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID)
return
}
if err := e.Message.ValidateBasic(); err != nil {
conR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
conR.Switch.StopPeerForError(src, err)
return
}
conR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
// Get peer states
ps, ok := src.Get(types.PeerStateKey).(*PeerState)
if !ok {
panic(fmt.Sprintf("Peer %v has no state", src))
}
switch chID {
case StateChannel:
switch msg := msg.(type) {
case *NewRoundStepMessage:
conR.conS.mtx.Lock()
initialHeight := conR.conS.state.InitialHeight
conR.conS.mtx.Unlock()
if err = msg.ValidateHeight(initialHeight); err != nil {
conR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
conR.Switch.StopPeerForError(src, err)
return
}
ps.ApplyNewRoundStepMessage(msg)
case *NewValidBlockMessage:
ps.ApplyNewValidBlockMessage(msg)
case *HasVoteMessage:
ps.ApplyHasVoteMessage(msg)
case *VoteSetMaj23Message:
cs := conR.conS
cs.mtx.Lock()
height, votes := cs.Height, cs.Votes
cs.mtx.Unlock()
if height != msg.Height {
return
}
// Peer claims to have a maj23 for some BlockID at H,R,S,
err := votes.SetPeerMaj23(msg.Round, msg.Type, ps.peer.ID(), msg.BlockID)
if err != nil {
conR.Switch.StopPeerForError(src, err)
return
}
// Respond with a VoteSetBitsMessage showing which votes we have.
// (and consequently shows which we don't have)
var ourVotes *bits.BitArray
switch msg.Type {
case tmproto.PrevoteType:
ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID)
case tmproto.PrecommitType:
ourVotes = votes.Precommits(msg.Round).BitArrayByBlockID(msg.BlockID)
default:
panic("Bad VoteSetBitsMessage field Type. Forgot to add a check in ValidateBasic?")
}
p, err := MsgToProto(&VoteSetBitsMessage{
Height: msg.Height,
Round: msg.Round,
Type: msg.Type,
BlockID: msg.BlockID,
Votes: ourVotes,
})
if err != nil {
panic(err)
}
src.TrySend(p2p.Envelope{
ChannelID: VoteSetBitsChannel,
Message: p,
})
default:
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
case DataChannel:
if conR.WaitSync() {
conR.Logger.Info("Ignoring message received during sync", "msg", msg)
return
}
switch msg := msg.(type) {
case *ProposalMessage:
ps.SetHasProposal(msg.Proposal)
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
case *ProposalPOLMessage:
ps.ApplyProposalPOLMessage(msg)
case *BlockPartMessage:
ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index))
conR.Metrics.BlockParts.With("peer_id", string(src.ID())).Add(1)
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
default:
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
case VoteChannel:
if conR.WaitSync() {
conR.Logger.Info("Ignoring message received during sync", "msg", msg)
return
}
switch msg := msg.(type) {
case *VoteMessage:
cs := conR.conS
cs.mtx.RLock()
height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size()
cs.mtx.RUnlock()
ps.EnsureVoteBitArrays(height, valSize)
ps.EnsureVoteBitArrays(height-1, lastCommitSize)
ps.SetHasVote(msg.Vote)
cs.peerMsgQueue <- msgInfo{msg, src.ID()}
default:
// don't punish (leave room for soft upgrades)
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
case VoteSetBitsChannel:
if conR.WaitSync() {
conR.Logger.Info("Ignoring message received during sync", "msg", msg)
return
}
switch msg := msg.(type) {
case *VoteSetBitsMessage:
cs := conR.conS
cs.mtx.Lock()
height, votes := cs.Height, cs.Votes
cs.mtx.Unlock()
if height == msg.Height {
var ourVotes *bits.BitArray
switch msg.Type {
case tmproto.PrevoteType:
ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID)
case tmproto.PrecommitType:
ourVotes = votes.Precommits(msg.Round).BitArrayByBlockID(msg.BlockID)
default:
panic("Bad VoteSetBitsMessage field Type. Forgot to add a check in ValidateBasic?")
}
ps.ApplyVoteSetBitsMessage(msg, ourVotes)
} else {
ps.ApplyVoteSetBitsMessage(msg, nil)
}
default:
// don't punish (leave room for soft upgrades)
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
default:
conR.Logger.Error(fmt.Sprintf("Unknown chId %X", chID))
}
}
// SetEventBus sets event bus.
func (conR *Reactor) SetEventBus(b *types.EventBus) {
conR.eventBus = b

View File

@@ -67,5 +67,6 @@ func (br *BaseReactor) SetSwitch(sw *Switch) {
func (*BaseReactor) GetChannels() []*conn.ChannelDescriptor { return nil }
func (*BaseReactor) AddPeer(peer Peer) {}
func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {}
func (*BaseReactor) NewReceive(e p2p.Envelope) {}
func (*BaseReactor) Receive(chID byte, peer Peer, msgBytes []byte) {}
func (*BaseReactor) InitPeer(peer Peer) Peer { return peer }

View File

@@ -9,6 +9,8 @@ type ChannelDescriptor = conn.ChannelDescriptor
type ConnectionStatus = conn.ConnectionStatus
type Envelope struct {
// Src is set when the message was sent by a remote peer.
Src Peer
ChannelID byte
Message proto.Message
}