From e18106eedae6735ff2d8be42aa63a8ba381aaf25 Mon Sep 17 00:00:00 2001 From: William Banfield Date: Thu, 20 Oct 2022 13:47:54 -0400 Subject: [PATCH] remove old receive funcs --- consensus/byzantine_test.go | 3 - consensus/reactor.go | 161 ------------------------------------ evidence/reactor.go | 27 ------ mempool/v0/reactor.go | 26 ------ mempool/v1/reactor.go | 27 ------ p2p/pex/pex_reactor.go | 71 ---------------- statesync/reactor.go | 129 ----------------------------- 7 files changed, 444 deletions(-) diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index e29057dd1..06d31eeba 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -596,9 +596,6 @@ func (br *ByzantineReactor) AddPeer(peer p2p.Peer) { func (br *ByzantineReactor) RemovePeer(peer p2p.Peer, reason interface{}) { br.reactor.RemovePeer(peer, reason) } -func (br *ByzantineReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { - br.reactor.Receive(chID, peer, msgBytes) -} func (br *ByzantineReactor) NewReceive(e p2p.Envelope) { br.reactor.NewReceive(e) } diff --git a/consensus/reactor.go b/consensus/reactor.go index 3030676cc..c81d8001b 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -227,167 +227,6 @@ func (conR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) { // Peer state updates can happen in parallel, but processing of // proposals, block parts, and votes are ordered by the receiveRoutine // NOTE: blocks on consensus state for proposals, block parts, and votes -func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { - return //disable and rely on the NewReceive - if !conR.IsRunning() { - conR.Logger.Debug("Receive", "src", src, "chId", chID, "bytes", msgBytes) - return - } - - msg, err := decodeMsg(msgBytes) - if err != nil { - conR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err) - conR.Switch.StopPeerForError(src, err) - return - } - - if err = msg.ValidateBasic(); err != nil { - conR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err) - conR.Switch.StopPeerForError(src, err) - return - } - - conR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg) - - // Get peer states - ps, ok := src.Get(types.PeerStateKey).(*PeerState) - if !ok { - panic(fmt.Sprintf("Peer %v has no state", src)) - } - - switch chID { - case StateChannel: - switch msg := msg.(type) { - case *NewRoundStepMessage: - conR.conS.mtx.Lock() - initialHeight := conR.conS.state.InitialHeight - conR.conS.mtx.Unlock() - if err = msg.ValidateHeight(initialHeight); err != nil { - conR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err) - conR.Switch.StopPeerForError(src, err) - return - } - ps.ApplyNewRoundStepMessage(msg) - case *NewValidBlockMessage: - ps.ApplyNewValidBlockMessage(msg) - case *HasVoteMessage: - ps.ApplyHasVoteMessage(msg) - case *VoteSetMaj23Message: - cs := conR.conS - cs.mtx.Lock() - height, votes := cs.Height, cs.Votes - cs.mtx.Unlock() - if height != msg.Height { - return - } - // Peer claims to have a maj23 for some BlockID at H,R,S, - err := votes.SetPeerMaj23(msg.Round, msg.Type, ps.peer.ID(), msg.BlockID) - if err != nil { - conR.Switch.StopPeerForError(src, err) - return - } - // Respond with a VoteSetBitsMessage showing which votes we have. - // (and consequently shows which we don't have) - var ourVotes *bits.BitArray - switch msg.Type { - case tmproto.PrevoteType: - ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID) - case tmproto.PrecommitType: - ourVotes = votes.Precommits(msg.Round).BitArrayByBlockID(msg.BlockID) - default: - panic("Bad VoteSetBitsMessage field Type. Forgot to add a check in ValidateBasic?") - } - src.TrySend(p2p.Envelope{ - ChannelID: VoteSetBitsChannel, - Message: MustMsgToProto(&VoteSetBitsMessage{ - Height: msg.Height, - Round: msg.Round, - Type: msg.Type, - BlockID: msg.BlockID, - Votes: ourVotes, - }), - }) - default: - conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) - } - - case DataChannel: - if conR.WaitSync() { - conR.Logger.Info("Ignoring message received during sync", "msg", msg) - return - } - switch msg := msg.(type) { - case *ProposalMessage: - ps.SetHasProposal(msg.Proposal) - conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()} - case *ProposalPOLMessage: - ps.ApplyProposalPOLMessage(msg) - case *BlockPartMessage: - ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index)) - conR.Metrics.BlockParts.With("peer_id", string(src.ID())).Add(1) - conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()} - default: - conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) - } - - case VoteChannel: - if conR.WaitSync() { - conR.Logger.Info("Ignoring message received during sync", "msg", msg) - return - } - switch msg := msg.(type) { - case *VoteMessage: - cs := conR.conS - cs.mtx.RLock() - height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size() - cs.mtx.RUnlock() - ps.EnsureVoteBitArrays(height, valSize) - ps.EnsureVoteBitArrays(height-1, lastCommitSize) - ps.SetHasVote(msg.Vote) - - cs.peerMsgQueue <- msgInfo{msg, src.ID()} - - default: - // don't punish (leave room for soft upgrades) - conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) - } - - case VoteSetBitsChannel: - if conR.WaitSync() { - conR.Logger.Info("Ignoring message received during sync", "msg", msg) - return - } - switch msg := msg.(type) { - case *VoteSetBitsMessage: - cs := conR.conS - cs.mtx.Lock() - height, votes := cs.Height, cs.Votes - cs.mtx.Unlock() - - if height == msg.Height { - var ourVotes *bits.BitArray - switch msg.Type { - case tmproto.PrevoteType: - ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID) - case tmproto.PrecommitType: - ourVotes = votes.Precommits(msg.Round).BitArrayByBlockID(msg.BlockID) - default: - panic("Bad VoteSetBitsMessage field Type. Forgot to add a check in ValidateBasic?") - } - ps.ApplyVoteSetBitsMessage(msg, ourVotes) - } else { - ps.ApplyVoteSetBitsMessage(msg, nil) - } - default: - // don't punish (leave room for soft upgrades) - conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) - } - - default: - conR.Logger.Error(fmt.Sprintf("Unknown chId %X", chID)) - } -} - func (conR *Reactor) NewReceive(e p2p.Envelope) { if !conR.IsRunning() { conR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID) diff --git a/evidence/reactor.go b/evidence/reactor.go index 654847c26..adf8bcfe9 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -66,33 +66,6 @@ func (evR *Reactor) AddPeer(peer p2p.Peer) { go evR.broadcastEvidenceRoutine(peer) } -// Receive implements Reactor. -// It adds any received evidence to the evpool. -func (evR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { - return - evis, err := decodeMsg(msgBytes) - if err != nil { - evR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err) - evR.Switch.StopPeerForError(src, err) - return - } - - for _, ev := range evis { - err := evR.evpool.AddEvidence(ev) - switch err.(type) { - case *types.ErrInvalidEvidence: - evR.Logger.Error(err.Error()) - // punish peer - evR.Switch.StopPeerForError(src, err) - return - case nil: - default: - // continue to the next piece of evidence - evR.Logger.Error("Evidence has not been added", "evidence", evis, "err", err) - } - } -} - // Receive implements Reactor. // It adds any received evidence to the evpool. func (evR *Reactor) NewReceive(e p2p.Envelope) { diff --git a/mempool/v0/reactor.go b/mempool/v0/reactor.go index 6adefbe9e..9583ffb5f 100644 --- a/mempool/v0/reactor.go +++ b/mempool/v0/reactor.go @@ -156,32 +156,6 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) { // Receive implements Reactor. // It adds any received transactions to the mempool. -func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { - msg, err := decodeMsg(msgBytes) - if err != nil { - memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err) - memR.Switch.StopPeerForError(src, err) - return - } - memR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg) - - txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(src)} - if src != nil { - txInfo.SenderP2PID = src.ID() - } - - for _, tx := range msg.Txs { - err = memR.mempool.CheckTx(tx, nil, txInfo) - if errors.Is(err, mempool.ErrTxInCache) { - memR.Logger.Debug("Tx already exists in cache", "tx", tx.String()) - } else if err != nil { - memR.Logger.Info("Could not check tx", "tx", tx.String(), "err", err) - } - } - - // broadcasting happens from go routines per peer -} - func (memR *Reactor) NewReceive(e p2p.Envelope) { msg, err := msgFromProto(e.Message) if err != nil { diff --git a/mempool/v1/reactor.go b/mempool/v1/reactor.go index 95dccda23..0be1ded37 100644 --- a/mempool/v1/reactor.go +++ b/mempool/v1/reactor.go @@ -154,33 +154,6 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) { // broadcast routine checks if peer is gone and returns } -// Receive implements Reactor. -// It adds any received transactions to the mempool. -func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { - return - msg, err := decodeMsg(msgBytes) - if err != nil { - memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err) - memR.Switch.StopPeerForError(src, err) - return - } - memR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg) - - txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(src)} - if src != nil { - txInfo.SenderP2PID = src.ID() - } - for _, tx := range msg.Txs { - err = memR.mempool.CheckTx(tx, nil, txInfo) - if err == mempool.ErrTxInCache { - memR.Logger.Debug("Tx already exists in cache", "tx", tx.String()) - } else if err != nil { - memR.Logger.Info("Could not check tx", "tx", tx.String(), "err", err) - } - } - // broadcasting happens from go routines per peer -} - // Receive implements Reactor. // It adds any received transactions to the mempool. func (memR *Reactor) NewReceive(e p2p.Envelope) { diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index 4855528c9..e8f26b433 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -237,78 +237,7 @@ func (r *Reactor) logErrAddrBook(err error) { } // Receive implements Reactor by handling incoming PEX messages. -func (r *Reactor) Receive(chID byte, src Peer, msgBytes []byte) { - return - msg, err := decodeMsg(msgBytes) - if err != nil { - r.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err) - r.Switch.StopPeerForError(src, err) - return - } - r.Logger.Debug("Received message", "src", src, "chId", chID, "msg", msg) - - switch msg := msg.(type) { - case *tmp2p.PexRequest: - - // NOTE: this is a prime candidate for amplification attacks, - // so it's important we - // 1) restrict how frequently peers can request - // 2) limit the output size - - // If we're a seed and this is an inbound peer, - // respond once and disconnect. - if r.config.SeedMode && !src.IsOutbound() { - id := string(src.ID()) - v := r.lastReceivedRequests.Get(id) - if v != nil { - // FlushStop/StopPeer are already - // running in a go-routine. - return - } - r.lastReceivedRequests.Set(id, time.Now()) - - // Send addrs and disconnect - r.SendAddrs(src, r.book.GetSelectionWithBias(biasToSelectNewPeers)) - go func() { - // In a go-routine so it doesn't block .Receive. - src.FlushStop() - r.Switch.StopPeerGracefully(src) - }() - - } else { - // Check we're not receiving requests too frequently. - if err := r.receiveRequest(src); err != nil { - r.Switch.StopPeerForError(src, err) - r.book.MarkBad(src.SocketAddr(), defaultBanTime) - return - } - r.SendAddrs(src, r.book.GetSelection()) - } - - case *tmp2p.PexAddrs: - // If we asked for addresses, add them to the book - addrs, err := p2p.NetAddressesFromProto(msg.Addrs) - if err != nil { - r.Switch.StopPeerForError(src, err) - r.book.MarkBad(src.SocketAddr(), defaultBanTime) - return - } - err = r.ReceiveAddrs(addrs, src) - if err != nil { - r.Switch.StopPeerForError(src, err) - if err == ErrUnsolicitedList { - r.book.MarkBad(src.SocketAddr(), defaultBanTime) - } - return - } - - default: - r.Logger.Error(fmt.Sprintf("Unknown message type %T", msg)) - } -} - func (r *Reactor) NewReceive(e p2p.Envelope) { - // return msg, err := msgFromProto(e.Message) if err != nil { r.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err) diff --git a/statesync/reactor.go b/statesync/reactor.go index bd7706652..55cbb789b 100644 --- a/statesync/reactor.go +++ b/statesync/reactor.go @@ -101,135 +101,6 @@ func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) { } } -// Receive implements p2p.Reactor. -func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { - return - if !r.IsRunning() { - return - } - - msg, err := decodeMsg(msgBytes) - if err != nil { - r.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err) - r.Switch.StopPeerForError(src, err) - return - } - err = validateMsg(msg) - if err != nil { - r.Logger.Error("Invalid message", "peer", src, "msg", msg, "err", err) - r.Switch.StopPeerForError(src, err) - return - } - - switch chID { - case SnapshotChannel: - switch msg := msg.(type) { - case *ssproto.SnapshotsRequest: - snapshots, err := r.recentSnapshots(recentSnapshots) - if err != nil { - r.Logger.Error("Failed to fetch snapshots", "err", err) - return - } - for _, snapshot := range snapshots { - r.Logger.Debug("Advertising snapshot", "height", snapshot.Height, - "format", snapshot.Format, "peer", src.ID()) - src.Send(p2p.Envelope{ - ChannelID: chID, - Message: mustWrapToProto(&ssproto.SnapshotsResponse{ - Height: snapshot.Height, - Format: snapshot.Format, - Chunks: snapshot.Chunks, - Hash: snapshot.Hash, - Metadata: snapshot.Metadata, - }), - }) - } - - case *ssproto.SnapshotsResponse: - r.mtx.RLock() - defer r.mtx.RUnlock() - if r.syncer == nil { - r.Logger.Debug("Received unexpected snapshot, no state sync in progress") - return - } - r.Logger.Debug("Received snapshot", "height", msg.Height, "format", msg.Format, "peer", src.ID()) - _, err := r.syncer.AddSnapshot(src, &snapshot{ - Height: msg.Height, - Format: msg.Format, - Chunks: msg.Chunks, - Hash: msg.Hash, - Metadata: msg.Metadata, - }) - // TODO: We may want to consider punishing the peer for certain errors - if err != nil { - r.Logger.Error("Failed to add snapshot", "height", msg.Height, "format", msg.Format, - "peer", src.ID(), "err", err) - return - } - - default: - r.Logger.Error("Received unknown message %T", msg) - } - - case ChunkChannel: - switch msg := msg.(type) { - case *ssproto.ChunkRequest: - r.Logger.Debug("Received chunk request", "height", msg.Height, "format", msg.Format, - "chunk", msg.Index, "peer", src.ID()) - resp, err := r.conn.LoadSnapshotChunkSync(abci.RequestLoadSnapshotChunk{ - Height: msg.Height, - Format: msg.Format, - Chunk: msg.Index, - }) - if err != nil { - r.Logger.Error("Failed to load chunk", "height", msg.Height, "format", msg.Format, - "chunk", msg.Index, "err", err) - return - } - r.Logger.Debug("Sending chunk", "height", msg.Height, "format", msg.Format, - "chunk", msg.Index, "peer", src.ID()) - src.Send(p2p.Envelope{ - ChannelID: ChunkChannel, - Message: mustWrapToProto(&ssproto.ChunkResponse{ - Height: msg.Height, - Format: msg.Format, - Index: msg.Index, - Chunk: resp.Chunk, - Missing: resp.Chunk == nil, - }), - }) - - case *ssproto.ChunkResponse: - r.mtx.RLock() - defer r.mtx.RUnlock() - if r.syncer == nil { - r.Logger.Debug("Received unexpected chunk, no state sync in progress", "peer", src.ID()) - return - } - r.Logger.Debug("Received chunk, adding to sync", "height", msg.Height, "format", msg.Format, - "chunk", msg.Index, "peer", src.ID()) - _, err := r.syncer.AddChunk(&chunk{ - Height: msg.Height, - Format: msg.Format, - Index: msg.Index, - Chunk: msg.Chunk, - Sender: src.ID(), - }) - if err != nil { - r.Logger.Error("Failed to add chunk", "height", msg.Height, "format", msg.Format, - "chunk", msg.Index, "err", err) - return - } - - default: - r.Logger.Error("Received unknown message %T", msg) - } - - default: - r.Logger.Error("Received message on invalid channel %x", chID) - } -} - // NewReceive implements p2p.Reactor. func (r *Reactor) NewReceive(e p2p.Envelope) { if !r.IsRunning() {