From 648393b37c9992a17b3f03f4da1dfdc2dde29aba Mon Sep 17 00:00:00 2001 From: William Banfield Date: Tue, 1 Nov 2022 15:43:32 -0400 Subject: [PATCH] remove old receives --- blocksync/reactor.go | 19 -------- consensus/byzantine_test.go | 3 -- consensus/reactor.go | 18 ------- evidence/reactor.go | 13 ----- mempool/v0/reactor.go | 19 -------- mempool/v1/reactor.go | 19 -------- p2p/base_reactor.go | 31 ++---------- p2p/peer.go | 95 +++++-------------------------------- p2p/pex/pex_reactor.go | 19 -------- p2p/switch.go | 34 ------------- p2p/switch_test.go | 18 ------- statesync/reactor.go | 20 -------- 12 files changed, 18 insertions(+), 290 deletions(-) diff --git a/blocksync/reactor.go b/blocksync/reactor.go index cb45e4b65..fb0301286 100644 --- a/blocksync/reactor.go +++ b/blocksync/reactor.go @@ -5,8 +5,6 @@ import ( "reflect" "time" - "github.com/cosmos/gogoproto/proto" - "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/p2p" bcproto "github.com/tendermint/tendermint/proto/tendermint/blocksync" @@ -235,23 +233,6 @@ func (bcR *Reactor) ReceiveEnvelope(e p2p.Envelope) { } } -func (bcR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { - var msg *bcproto.Message - err := proto.Unmarshal(msgBytes, msg) - if err != nil { - panic(err) - } - uw, err := msg.Unwrap() - if err != nil { - panic(err) - } - bcR.ReceiveEnvelope(p2p.Envelope{ - ChannelID: chID, - Src: peer, - Message: uw, - }) -} - // Handle messages from the poolReactor telling the reactor what to do. // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down! func (bcR *Reactor) poolRoutine(stateSynced bool) { diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index cd405914b..1e8b58f59 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -602,7 +602,4 @@ func (br *ByzantineReactor) RemovePeer(peer p2p.Peer, reason interface{}) { func (br *ByzantineReactor) ReceiveEnvelope(e p2p.Envelope) { br.reactor.ReceiveEnvelope(e) } -func (br *ByzantineReactor) Receive(chID byte, p p2p.Peer, m []byte) { - br.reactor.Receive(chID, p, m) -} func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer } diff --git a/consensus/reactor.go b/consensus/reactor.go index 90344f121..40c4f8889 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -7,7 +7,6 @@ import ( "sync" "time" - "github.com/gogo/protobuf/proto" cstypes "github.com/tendermint/tendermint/consensus/types" "github.com/tendermint/tendermint/libs/bits" tmevents "github.com/tendermint/tendermint/libs/events" @@ -388,23 +387,6 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) { } } -func (conR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { - var msg *tmcons.Message - err := proto.Unmarshal(msgBytes, msg) - if err != nil { - panic(err) - } - uw, err := msg.Unwrap() - if err != nil { - panic(err) - } - conR.ReceiveEnvelope(p2p.Envelope{ - ChannelID: chID, - Src: peer, - Message: uw, - }) -} - // SetEventBus sets event bus. func (conR *Reactor) SetEventBus(b *types.EventBus) { conR.eventBus = b diff --git a/evidence/reactor.go b/evidence/reactor.go index 9ff6e785a..d8ebeac50 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -92,19 +92,6 @@ func (evR *Reactor) ReceiveEnvelope(e p2p.Envelope) { } } -func (evR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { - var msg *tmproto.EvidenceList - err := proto.Unmarshal(msgBytes, msg) - if err != nil { - panic(err) - } - evR.ReceiveEnvelope(p2p.Envelope{ - ChannelID: chID, - Src: peer, - Message: msg, - }) -} - // SetEventBus implements events.Eventable. func (evR *Reactor) SetEventBus(b *types.EventBus) { evR.eventBus = b diff --git a/mempool/v0/reactor.go b/mempool/v0/reactor.go index 21a25fd81..2fcd8a2a2 100644 --- a/mempool/v0/reactor.go +++ b/mempool/v0/reactor.go @@ -5,8 +5,6 @@ import ( "fmt" "time" - "github.com/cosmos/gogoproto/proto" - cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/libs/clist" "github.com/tendermint/tendermint/libs/log" @@ -190,23 +188,6 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) { // broadcasting happens from go routines per peer } -func (memR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { - var msg *protomem.Message - err := proto.Unmarshal(msgBytes, msg) - if err != nil { - panic(err) - } - uw, err := msg.Unwrap() - if err != nil { - panic(err) - } - memR.ReceiveEnvelope(p2p.Envelope{ - ChannelID: chID, - Src: peer, - Message: uw, - }) -} - // PeerState describes the state of a peer. type PeerState interface { GetHeight() int64 diff --git a/mempool/v1/reactor.go b/mempool/v1/reactor.go index 533584fa3..c18e37dbe 100644 --- a/mempool/v1/reactor.go +++ b/mempool/v1/reactor.go @@ -5,8 +5,6 @@ import ( "fmt" "time" - "github.com/cosmos/gogoproto/proto" - cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/libs/clist" "github.com/tendermint/tendermint/libs/log" @@ -189,23 +187,6 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) { // broadcasting happens from go routines per peer } -func (memR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { - var msg *protomem.Message - err := proto.Unmarshal(msgBytes, msg) - if err != nil { - panic(err) - } - uw, err := msg.Unwrap() - if err != nil { - panic(err) - } - memR.ReceiveEnvelope(p2p.Envelope{ - ChannelID: chID, - Src: peer, - Message: uw, - }) -} - // PeerState describes the state of a peer. type PeerState interface { GetHeight() int64 diff --git a/p2p/base_reactor.go b/p2p/base_reactor.go index daabab316..aed772297 100644 --- a/p2p/base_reactor.go +++ b/p2p/base_reactor.go @@ -38,28 +38,8 @@ type Reactor interface { // or other reason). RemovePeer(peer Peer, reason interface{}) - // Receive is called by the switch when msgBytes is received from the peer. - // - // NOTE reactor can not keep msgBytes around after Receive completes without - // copying. - // - // CONTRACT: msgBytes are not nil. - // - // Only one of Receive or ReceiveEnvelope are called per message. If ReceiveEnvelope - // is implemented, it will be used, otherwise the switch will fallback to - // using Receive. - // Deprecated: Reactors looking to receive data from a peer should implement ReceiveEnvelope. - // Receive will be deprecated in favor of ReceiveEnvelope in v0.38. - Receive(chID byte, peer Peer, msgBytes []byte) -} - -type ReceiveEnveloper interface { // ReceiveEnvelope is called by the switch when an envelope is received from any connected // peer on any of the channels registered by the reactor. - // - // Only one of Receive or ReceiveEnvelope are called per message. If ReceiveEnvelope - // is implemented, it will be used, otherwise the switch will fallback to - // using Receive. Receive will be replaced by ReceiveEnvelope in a future version ReceiveEnvelope(Envelope) } @@ -80,9 +60,8 @@ func NewBaseReactor(name string, impl Reactor) *BaseReactor { func (br *BaseReactor) SetSwitch(sw *Switch) { br.Switch = sw } -func (*BaseReactor) GetChannels() []*conn.ChannelDescriptor { return nil } -func (*BaseReactor) AddPeer(peer Peer) {} -func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {} -func (*BaseReactor) ReceiveEnvelope(e Envelope) {} -func (*BaseReactor) Receive(chID byte, peer Peer, msgBytes []byte) {} -func (*BaseReactor) InitPeer(peer Peer) Peer { return peer } +func (*BaseReactor) GetChannels() []*conn.ChannelDescriptor { return nil } +func (*BaseReactor) AddPeer(peer Peer) {} +func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {} +func (*BaseReactor) ReceiveEnvelope(e Envelope) {} +func (*BaseReactor) InitPeer(peer Peer) Peer { return peer } diff --git a/p2p/peer.go b/p2p/peer.go index 82af52703..6d9019bc8 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -40,14 +40,6 @@ type Peer interface { SendEnvelope(Envelope) bool TrySendEnvelope(Envelope) bool - // Deprecated: entities looking to act as peers should implement SendEnvelope instead. - // Send will be removed in v0.38. - Send(byte, []byte) bool - - // Deprecated: entities looking to act as peers should implement TrySendEnvelope instead. - // TrySend will be removed in v0.38. - TrySend(byte, []byte) bool - Set(string, interface{}) Get(string) interface{} } @@ -260,64 +252,22 @@ func (p *peer) Status() tmconn.ConnectionStatus { // SendEnvelope sends the message in the envelope on the channel specified by the // envelope. Returns false if the connection times out trying to place the message // onto its internal queue. -// Using SendEnvelope allows for tracking the message bytes sent and received by message type -// as a metric which Send cannot support. func (p *peer) SendEnvelope(e Envelope) bool { - if !p.IsRunning() { - return false - } else if !p.hasChannel(e.ChannelID) { - return false - } - msg := e.Message - metricLabelValue := p.mlc.ValueToMetricLabel(msg) - if w, ok := msg.(Wrapper); ok { - msg = w.Wrap() - } - msgBytes, err := proto.Marshal(msg) - if err != nil { - p.Logger.Error("marshaling message to send", "error", err) - return false - } - res := p.Send(e.ChannelID, msgBytes) - if res { - p.metrics.MessageSendBytesTotal.With("message_type", metricLabelValue).Add(float64(len(msgBytes))) - } - return res -} - -// Send msg bytes to the channel identified by chID byte. Returns false if the -// send queue is full after timeout, specified by MConnection. -// SendEnvelope replaces Send which will be deprecated in a future release. -func (p *peer) Send(chID byte, msgBytes []byte) bool { - if !p.IsRunning() { - return false - } else if !p.hasChannel(chID) { - return false - } - res := p.mconn.Send(chID, msgBytes) - if res { - labels := []string{ - "peer_id", string(p.ID()), - "chID", fmt.Sprintf("%#x", chID), - } - p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes))) - } - return res + return p.send(e.ChannelID, e.Message, p.mconn.Send) } // TrySendEnvelope attempts to sends the message in the envelope on the channel specified by the // envelope. Returns false immediately if the connection's internal queue is full -// Using TrySendEnvelope allows for tracking the message bytes sent and received by message type -// as a metric which TrySend cannot support. func (p *peer) TrySendEnvelope(e Envelope) bool { + return p.send(e.ChannelID, e.Message, p.mconn.TrySend) +} + +func (p *peer) send(chID byte, msg proto.Message, sendFunc func(byte, []byte) bool) bool { if !p.IsRunning() { - // see Switch#Broadcast, where we fetch the list of peers and loop over - // them - while we're looping, one peer may be removed and stopped. return false - } else if !p.hasChannel(e.ChannelID) { + } else if !p.hasChannel(chID) { return false } - msg := e.Message metricLabelValue := p.mlc.ValueToMetricLabel(msg) if w, ok := msg.(Wrapper); ok { msg = w.Wrap() @@ -327,29 +277,14 @@ func (p *peer) TrySendEnvelope(e Envelope) bool { p.Logger.Error("marshaling message to send", "error", err) return false } - res := p.TrySend(e.ChannelID, msgBytes) - if res { - p.metrics.MessageSendBytesTotal.With("message_type", metricLabelValue).Add(float64(len(msgBytes))) - } - return res -} - -// TrySend msg bytes to the channel identified by chID byte. Immediately returns -// false if the send queue is full. -// TrySendEnvelope replaces TrySend which will be deprecated in a future release. -func (p *peer) TrySend(chID byte, msgBytes []byte) bool { - if !p.IsRunning() { - return false - } else if !p.hasChannel(chID) { - return false - } - res := p.mconn.TrySend(chID, msgBytes) + res := sendFunc(chID, msgBytes) if res { labels := []string{ "peer_id", string(p.ID()), "chID", fmt.Sprintf("%#x", chID), } p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes))) + p.metrics.MessageSendBytesTotal.With("message_type", metricLabelValue).Add(float64(len(msgBytes))) } return res } @@ -474,15 +409,11 @@ func createMConnection( } p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes))) p.metrics.MessageReceiveBytesTotal.With("message_type", p.mlc.ValueToMetricLabel(msg)).Add(float64(len(msgBytes))) - if nr, ok := reactor.(ReceiveEnveloper); ok { - nr.ReceiveEnvelope(Envelope{ - ChannelID: chID, - Src: p, - Message: msg, - }) - } else { - reactor.Receive(chID, p, msgBytes) - } + reactor.ReceiveEnvelope(Envelope{ + ChannelID: chID, + Src: p, + Message: msg, + }) } onError := func(r interface{}) { diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index e0763e29c..f1feb0eaa 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -6,8 +6,6 @@ import ( "sync" "time" - "github.com/cosmos/gogoproto/proto" - "github.com/tendermint/tendermint/libs/cmap" tmmath "github.com/tendermint/tendermint/libs/math" tmrand "github.com/tendermint/tendermint/libs/rand" @@ -300,23 +298,6 @@ func (r *Reactor) ReceiveEnvelope(e p2p.Envelope) { } } -func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { - var msg *tmp2p.Message - err := proto.Unmarshal(msgBytes, msg) - if err != nil { - panic(err) - } - um, err := msg.Unwrap() - if err != nil { - panic(err) - } - r.ReceiveEnvelope(p2p.Envelope{ - ChannelID: chID, - Src: peer, - Message: um, - }) -} - // enforces a minimum amount of time between requests func (r *Reactor) receiveRequest(src Peer) error { id := string(src.ID()) diff --git a/p2p/switch.go b/p2p/switch.go index cf2cb8eb0..8f3d7ae2c 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -293,40 +293,6 @@ func (sw *Switch) BroadcastEnvelope(e Envelope) chan bool { return successChan } -// Broadcast runs a go routine for each attempted send, which will block trying -// to send for defaultSendTimeoutSeconds. Returns a channel which receives -// success values for each attempted send (false if times out). Channel will be -// closed once msg bytes are sent to all peers (or time out). -// Broadcasts sends to the peers using the Send method. -// -// NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved. -// -// Deprecated: code looking to broadcast data to all peers should use BroadcastEnvelope. -// Broadcast will be removed in 0.38. -func (sw *Switch) Broadcast(chID byte, msgBytes []byte) chan bool { - sw.Logger.Debug("Broadcast", "channel", chID) - - peers := sw.peers.List() - var wg sync.WaitGroup - wg.Add(len(peers)) - successChan := make(chan bool, len(peers)) - - for _, peer := range peers { - go func(p Peer) { - defer wg.Done() - success := p.Send(chID, msgBytes) - successChan <- success - }(peer) - } - - go func() { - wg.Wait() - close(successChan) - }() - - return successChan -} - // NumPeers returns the count of outbound/inbound and outbound-dialing peers. // unconditional peers are not counted here. func (sw *Switch) NumPeers() (outbound, inbound, dialing int) { diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 9943437da..2a610ed07 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -81,24 +81,6 @@ func (tr *TestReactor) ReceiveEnvelope(e Envelope) { } } -func (tr *TestReactor) Receive(chID byte, peer Peer, msgBytes []byte) { - var msg *p2pproto.Message - err := proto.Unmarshal(msgBytes, msg) - if err != nil { - panic(err) - } - um, err := msg.Unwrap() - if err != nil { - panic(err) - } - - tr.ReceiveEnvelope(Envelope{ - ChannelID: chID, - Src: peer, - Message: um, - }) -} - func (tr *TestReactor) getMsgs(chID byte) []PeerMessage { tr.mtx.Lock() defer tr.mtx.Unlock() diff --git a/statesync/reactor.go b/statesync/reactor.go index 467ec795d..10cf7bd9e 100644 --- a/statesync/reactor.go +++ b/statesync/reactor.go @@ -5,8 +5,6 @@ import ( "sort" "time" - "github.com/cosmos/gogoproto/proto" - abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/config" tmsync "github.com/tendermint/tendermint/libs/sync" @@ -225,24 +223,6 @@ func (r *Reactor) ReceiveEnvelope(e p2p.Envelope) { } } -func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { - var msg *ssproto.Message - err := proto.Unmarshal(msgBytes, msg) - if err != nil { - panic(err) - } - um, err := msg.Unwrap() - if err != nil { - panic(err) - } - - r.ReceiveEnvelope(p2p.Envelope{ - ChannelID: chID, - Src: peer, - Message: um, - }) -} - // recentSnapshots fetches the n most recent snapshots from the app func (r *Reactor) recentSnapshots(n uint32) ([]*snapshot, error) { resp, err := r.conn.ListSnapshotsSync(abci.RequestListSnapshots{})