remove old receive funcs

This commit is contained in:
William Banfield
2022-10-20 13:47:54 -04:00
parent 922ba6ab67
commit e18106eeda
7 changed files with 0 additions and 444 deletions

View File

@@ -596,9 +596,6 @@ func (br *ByzantineReactor) AddPeer(peer p2p.Peer) {
func (br *ByzantineReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
br.reactor.RemovePeer(peer, reason)
}
func (br *ByzantineReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
br.reactor.Receive(chID, peer, msgBytes)
}
func (br *ByzantineReactor) NewReceive(e p2p.Envelope) {
br.reactor.NewReceive(e)
}

View File

@@ -227,167 +227,6 @@ func (conR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// Peer state updates can happen in parallel, but processing of
// proposals, block parts, and votes are ordered by the receiveRoutine
// NOTE: blocks on consensus state for proposals, block parts, and votes
func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
return //disable and rely on the NewReceive
if !conR.IsRunning() {
conR.Logger.Debug("Receive", "src", src, "chId", chID, "bytes", msgBytes)
return
}
msg, err := decodeMsg(msgBytes)
if err != nil {
conR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
conR.Switch.StopPeerForError(src, err)
return
}
if err = msg.ValidateBasic(); err != nil {
conR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
conR.Switch.StopPeerForError(src, err)
return
}
conR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
// Get peer states
ps, ok := src.Get(types.PeerStateKey).(*PeerState)
if !ok {
panic(fmt.Sprintf("Peer %v has no state", src))
}
switch chID {
case StateChannel:
switch msg := msg.(type) {
case *NewRoundStepMessage:
conR.conS.mtx.Lock()
initialHeight := conR.conS.state.InitialHeight
conR.conS.mtx.Unlock()
if err = msg.ValidateHeight(initialHeight); err != nil {
conR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
conR.Switch.StopPeerForError(src, err)
return
}
ps.ApplyNewRoundStepMessage(msg)
case *NewValidBlockMessage:
ps.ApplyNewValidBlockMessage(msg)
case *HasVoteMessage:
ps.ApplyHasVoteMessage(msg)
case *VoteSetMaj23Message:
cs := conR.conS
cs.mtx.Lock()
height, votes := cs.Height, cs.Votes
cs.mtx.Unlock()
if height != msg.Height {
return
}
// Peer claims to have a maj23 for some BlockID at H,R,S,
err := votes.SetPeerMaj23(msg.Round, msg.Type, ps.peer.ID(), msg.BlockID)
if err != nil {
conR.Switch.StopPeerForError(src, err)
return
}
// Respond with a VoteSetBitsMessage showing which votes we have.
// (and consequently shows which we don't have)
var ourVotes *bits.BitArray
switch msg.Type {
case tmproto.PrevoteType:
ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID)
case tmproto.PrecommitType:
ourVotes = votes.Precommits(msg.Round).BitArrayByBlockID(msg.BlockID)
default:
panic("Bad VoteSetBitsMessage field Type. Forgot to add a check in ValidateBasic?")
}
src.TrySend(p2p.Envelope{
ChannelID: VoteSetBitsChannel,
Message: MustMsgToProto(&VoteSetBitsMessage{
Height: msg.Height,
Round: msg.Round,
Type: msg.Type,
BlockID: msg.BlockID,
Votes: ourVotes,
}),
})
default:
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
case DataChannel:
if conR.WaitSync() {
conR.Logger.Info("Ignoring message received during sync", "msg", msg)
return
}
switch msg := msg.(type) {
case *ProposalMessage:
ps.SetHasProposal(msg.Proposal)
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
case *ProposalPOLMessage:
ps.ApplyProposalPOLMessage(msg)
case *BlockPartMessage:
ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index))
conR.Metrics.BlockParts.With("peer_id", string(src.ID())).Add(1)
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
default:
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
case VoteChannel:
if conR.WaitSync() {
conR.Logger.Info("Ignoring message received during sync", "msg", msg)
return
}
switch msg := msg.(type) {
case *VoteMessage:
cs := conR.conS
cs.mtx.RLock()
height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size()
cs.mtx.RUnlock()
ps.EnsureVoteBitArrays(height, valSize)
ps.EnsureVoteBitArrays(height-1, lastCommitSize)
ps.SetHasVote(msg.Vote)
cs.peerMsgQueue <- msgInfo{msg, src.ID()}
default:
// don't punish (leave room for soft upgrades)
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
case VoteSetBitsChannel:
if conR.WaitSync() {
conR.Logger.Info("Ignoring message received during sync", "msg", msg)
return
}
switch msg := msg.(type) {
case *VoteSetBitsMessage:
cs := conR.conS
cs.mtx.Lock()
height, votes := cs.Height, cs.Votes
cs.mtx.Unlock()
if height == msg.Height {
var ourVotes *bits.BitArray
switch msg.Type {
case tmproto.PrevoteType:
ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID)
case tmproto.PrecommitType:
ourVotes = votes.Precommits(msg.Round).BitArrayByBlockID(msg.BlockID)
default:
panic("Bad VoteSetBitsMessage field Type. Forgot to add a check in ValidateBasic?")
}
ps.ApplyVoteSetBitsMessage(msg, ourVotes)
} else {
ps.ApplyVoteSetBitsMessage(msg, nil)
}
default:
// don't punish (leave room for soft upgrades)
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
default:
conR.Logger.Error(fmt.Sprintf("Unknown chId %X", chID))
}
}
func (conR *Reactor) NewReceive(e p2p.Envelope) {
if !conR.IsRunning() {
conR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID)

View File

@@ -66,33 +66,6 @@ func (evR *Reactor) AddPeer(peer p2p.Peer) {
go evR.broadcastEvidenceRoutine(peer)
}
// Receive implements Reactor.
// It adds any received evidence to the evpool.
func (evR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
return
evis, err := decodeMsg(msgBytes)
if err != nil {
evR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
evR.Switch.StopPeerForError(src, err)
return
}
for _, ev := range evis {
err := evR.evpool.AddEvidence(ev)
switch err.(type) {
case *types.ErrInvalidEvidence:
evR.Logger.Error(err.Error())
// punish peer
evR.Switch.StopPeerForError(src, err)
return
case nil:
default:
// continue to the next piece of evidence
evR.Logger.Error("Evidence has not been added", "evidence", evis, "err", err)
}
}
}
// Receive implements Reactor.
// It adds any received evidence to the evpool.
func (evR *Reactor) NewReceive(e p2p.Envelope) {

View File

@@ -156,32 +156,6 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// Receive implements Reactor.
// It adds any received transactions to the mempool.
func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
msg, err := decodeMsg(msgBytes)
if err != nil {
memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
memR.Switch.StopPeerForError(src, err)
return
}
memR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(src)}
if src != nil {
txInfo.SenderP2PID = src.ID()
}
for _, tx := range msg.Txs {
err = memR.mempool.CheckTx(tx, nil, txInfo)
if errors.Is(err, mempool.ErrTxInCache) {
memR.Logger.Debug("Tx already exists in cache", "tx", tx.String())
} else if err != nil {
memR.Logger.Info("Could not check tx", "tx", tx.String(), "err", err)
}
}
// broadcasting happens from go routines per peer
}
func (memR *Reactor) NewReceive(e p2p.Envelope) {
msg, err := msgFromProto(e.Message)
if err != nil {

View File

@@ -154,33 +154,6 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// broadcast routine checks if peer is gone and returns
}
// Receive implements Reactor.
// It adds any received transactions to the mempool.
func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
return
msg, err := decodeMsg(msgBytes)
if err != nil {
memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
memR.Switch.StopPeerForError(src, err)
return
}
memR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(src)}
if src != nil {
txInfo.SenderP2PID = src.ID()
}
for _, tx := range msg.Txs {
err = memR.mempool.CheckTx(tx, nil, txInfo)
if err == mempool.ErrTxInCache {
memR.Logger.Debug("Tx already exists in cache", "tx", tx.String())
} else if err != nil {
memR.Logger.Info("Could not check tx", "tx", tx.String(), "err", err)
}
}
// broadcasting happens from go routines per peer
}
// Receive implements Reactor.
// It adds any received transactions to the mempool.
func (memR *Reactor) NewReceive(e p2p.Envelope) {

View File

@@ -237,78 +237,7 @@ func (r *Reactor) logErrAddrBook(err error) {
}
// Receive implements Reactor by handling incoming PEX messages.
func (r *Reactor) Receive(chID byte, src Peer, msgBytes []byte) {
return
msg, err := decodeMsg(msgBytes)
if err != nil {
r.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
r.Switch.StopPeerForError(src, err)
return
}
r.Logger.Debug("Received message", "src", src, "chId", chID, "msg", msg)
switch msg := msg.(type) {
case *tmp2p.PexRequest:
// NOTE: this is a prime candidate for amplification attacks,
// so it's important we
// 1) restrict how frequently peers can request
// 2) limit the output size
// If we're a seed and this is an inbound peer,
// respond once and disconnect.
if r.config.SeedMode && !src.IsOutbound() {
id := string(src.ID())
v := r.lastReceivedRequests.Get(id)
if v != nil {
// FlushStop/StopPeer are already
// running in a go-routine.
return
}
r.lastReceivedRequests.Set(id, time.Now())
// Send addrs and disconnect
r.SendAddrs(src, r.book.GetSelectionWithBias(biasToSelectNewPeers))
go func() {
// In a go-routine so it doesn't block .Receive.
src.FlushStop()
r.Switch.StopPeerGracefully(src)
}()
} else {
// Check we're not receiving requests too frequently.
if err := r.receiveRequest(src); err != nil {
r.Switch.StopPeerForError(src, err)
r.book.MarkBad(src.SocketAddr(), defaultBanTime)
return
}
r.SendAddrs(src, r.book.GetSelection())
}
case *tmp2p.PexAddrs:
// If we asked for addresses, add them to the book
addrs, err := p2p.NetAddressesFromProto(msg.Addrs)
if err != nil {
r.Switch.StopPeerForError(src, err)
r.book.MarkBad(src.SocketAddr(), defaultBanTime)
return
}
err = r.ReceiveAddrs(addrs, src)
if err != nil {
r.Switch.StopPeerForError(src, err)
if err == ErrUnsolicitedList {
r.book.MarkBad(src.SocketAddr(), defaultBanTime)
}
return
}
default:
r.Logger.Error(fmt.Sprintf("Unknown message type %T", msg))
}
}
func (r *Reactor) NewReceive(e p2p.Envelope) {
// return
msg, err := msgFromProto(e.Message)
if err != nil {
r.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err)

View File

@@ -101,135 +101,6 @@ func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
}
}
// Receive implements p2p.Reactor.
func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
return
if !r.IsRunning() {
return
}
msg, err := decodeMsg(msgBytes)
if err != nil {
r.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
r.Switch.StopPeerForError(src, err)
return
}
err = validateMsg(msg)
if err != nil {
r.Logger.Error("Invalid message", "peer", src, "msg", msg, "err", err)
r.Switch.StopPeerForError(src, err)
return
}
switch chID {
case SnapshotChannel:
switch msg := msg.(type) {
case *ssproto.SnapshotsRequest:
snapshots, err := r.recentSnapshots(recentSnapshots)
if err != nil {
r.Logger.Error("Failed to fetch snapshots", "err", err)
return
}
for _, snapshot := range snapshots {
r.Logger.Debug("Advertising snapshot", "height", snapshot.Height,
"format", snapshot.Format, "peer", src.ID())
src.Send(p2p.Envelope{
ChannelID: chID,
Message: mustWrapToProto(&ssproto.SnapshotsResponse{
Height: snapshot.Height,
Format: snapshot.Format,
Chunks: snapshot.Chunks,
Hash: snapshot.Hash,
Metadata: snapshot.Metadata,
}),
})
}
case *ssproto.SnapshotsResponse:
r.mtx.RLock()
defer r.mtx.RUnlock()
if r.syncer == nil {
r.Logger.Debug("Received unexpected snapshot, no state sync in progress")
return
}
r.Logger.Debug("Received snapshot", "height", msg.Height, "format", msg.Format, "peer", src.ID())
_, err := r.syncer.AddSnapshot(src, &snapshot{
Height: msg.Height,
Format: msg.Format,
Chunks: msg.Chunks,
Hash: msg.Hash,
Metadata: msg.Metadata,
})
// TODO: We may want to consider punishing the peer for certain errors
if err != nil {
r.Logger.Error("Failed to add snapshot", "height", msg.Height, "format", msg.Format,
"peer", src.ID(), "err", err)
return
}
default:
r.Logger.Error("Received unknown message %T", msg)
}
case ChunkChannel:
switch msg := msg.(type) {
case *ssproto.ChunkRequest:
r.Logger.Debug("Received chunk request", "height", msg.Height, "format", msg.Format,
"chunk", msg.Index, "peer", src.ID())
resp, err := r.conn.LoadSnapshotChunkSync(abci.RequestLoadSnapshotChunk{
Height: msg.Height,
Format: msg.Format,
Chunk: msg.Index,
})
if err != nil {
r.Logger.Error("Failed to load chunk", "height", msg.Height, "format", msg.Format,
"chunk", msg.Index, "err", err)
return
}
r.Logger.Debug("Sending chunk", "height", msg.Height, "format", msg.Format,
"chunk", msg.Index, "peer", src.ID())
src.Send(p2p.Envelope{
ChannelID: ChunkChannel,
Message: mustWrapToProto(&ssproto.ChunkResponse{
Height: msg.Height,
Format: msg.Format,
Index: msg.Index,
Chunk: resp.Chunk,
Missing: resp.Chunk == nil,
}),
})
case *ssproto.ChunkResponse:
r.mtx.RLock()
defer r.mtx.RUnlock()
if r.syncer == nil {
r.Logger.Debug("Received unexpected chunk, no state sync in progress", "peer", src.ID())
return
}
r.Logger.Debug("Received chunk, adding to sync", "height", msg.Height, "format", msg.Format,
"chunk", msg.Index, "peer", src.ID())
_, err := r.syncer.AddChunk(&chunk{
Height: msg.Height,
Format: msg.Format,
Index: msg.Index,
Chunk: msg.Chunk,
Sender: src.ID(),
})
if err != nil {
r.Logger.Error("Failed to add chunk", "height", msg.Height, "format", msg.Format,
"chunk", msg.Index, "err", err)
return
}
default:
r.Logger.Error("Received unknown message %T", msg)
}
default:
r.Logger.Error("Received message on invalid channel %x", chID)
}
}
// NewReceive implements p2p.Reactor.
func (r *Reactor) NewReceive(e p2p.Envelope) {
if !r.IsRunning() {