remove old receives

This commit is contained in:
William Banfield
2022-11-01 15:43:32 -04:00
parent 9be4f462fa
commit 648393b37c
12 changed files with 18 additions and 290 deletions

View File

@@ -5,8 +5,6 @@ import (
"reflect"
"time"
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
bcproto "github.com/tendermint/tendermint/proto/tendermint/blocksync"
@@ -235,23 +233,6 @@ func (bcR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
}
}
func (bcR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
var msg *bcproto.Message
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
panic(err)
}
uw, err := msg.Unwrap()
if err != nil {
panic(err)
}
bcR.ReceiveEnvelope(p2p.Envelope{
ChannelID: chID,
Src: peer,
Message: uw,
})
}
// Handle messages from the poolReactor telling the reactor what to do.
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
func (bcR *Reactor) poolRoutine(stateSynced bool) {

View File

@@ -602,7 +602,4 @@ func (br *ByzantineReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
func (br *ByzantineReactor) ReceiveEnvelope(e p2p.Envelope) {
br.reactor.ReceiveEnvelope(e)
}
func (br *ByzantineReactor) Receive(chID byte, p p2p.Peer, m []byte) {
br.reactor.Receive(chID, p, m)
}
func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer }

View File

@@ -7,7 +7,6 @@ import (
"sync"
"time"
"github.com/gogo/protobuf/proto"
cstypes "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/libs/bits"
tmevents "github.com/tendermint/tendermint/libs/events"
@@ -388,23 +387,6 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
}
}
func (conR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
var msg *tmcons.Message
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
panic(err)
}
uw, err := msg.Unwrap()
if err != nil {
panic(err)
}
conR.ReceiveEnvelope(p2p.Envelope{
ChannelID: chID,
Src: peer,
Message: uw,
})
}
// SetEventBus sets event bus.
func (conR *Reactor) SetEventBus(b *types.EventBus) {
conR.eventBus = b

View File

@@ -92,19 +92,6 @@ func (evR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
}
}
func (evR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
var msg *tmproto.EvidenceList
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
panic(err)
}
evR.ReceiveEnvelope(p2p.Envelope{
ChannelID: chID,
Src: peer,
Message: msg,
})
}
// SetEventBus implements events.Eventable.
func (evR *Reactor) SetEventBus(b *types.EventBus) {
evR.eventBus = b

View File

@@ -5,8 +5,6 @@ import (
"fmt"
"time"
"github.com/cosmos/gogoproto/proto"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/clist"
"github.com/tendermint/tendermint/libs/log"
@@ -190,23 +188,6 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
// broadcasting happens from go routines per peer
}
func (memR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
var msg *protomem.Message
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
panic(err)
}
uw, err := msg.Unwrap()
if err != nil {
panic(err)
}
memR.ReceiveEnvelope(p2p.Envelope{
ChannelID: chID,
Src: peer,
Message: uw,
})
}
// PeerState describes the state of a peer.
type PeerState interface {
GetHeight() int64

View File

@@ -5,8 +5,6 @@ import (
"fmt"
"time"
"github.com/cosmos/gogoproto/proto"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/clist"
"github.com/tendermint/tendermint/libs/log"
@@ -189,23 +187,6 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
// broadcasting happens from go routines per peer
}
func (memR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
var msg *protomem.Message
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
panic(err)
}
uw, err := msg.Unwrap()
if err != nil {
panic(err)
}
memR.ReceiveEnvelope(p2p.Envelope{
ChannelID: chID,
Src: peer,
Message: uw,
})
}
// PeerState describes the state of a peer.
type PeerState interface {
GetHeight() int64

View File

@@ -38,28 +38,8 @@ type Reactor interface {
// or other reason).
RemovePeer(peer Peer, reason interface{})
// Receive is called by the switch when msgBytes is received from the peer.
//
// NOTE reactor can not keep msgBytes around after Receive completes without
// copying.
//
// CONTRACT: msgBytes are not nil.
//
// Only one of Receive or ReceiveEnvelope are called per message. If ReceiveEnvelope
// is implemented, it will be used, otherwise the switch will fallback to
// using Receive.
// Deprecated: Reactors looking to receive data from a peer should implement ReceiveEnvelope.
// Receive will be deprecated in favor of ReceiveEnvelope in v0.38.
Receive(chID byte, peer Peer, msgBytes []byte)
}
type ReceiveEnveloper interface {
// ReceiveEnvelope is called by the switch when an envelope is received from any connected
// peer on any of the channels registered by the reactor.
//
// Only one of Receive or ReceiveEnvelope are called per message. If ReceiveEnvelope
// is implemented, it will be used, otherwise the switch will fallback to
// using Receive. Receive will be replaced by ReceiveEnvelope in a future version
ReceiveEnvelope(Envelope)
}
@@ -80,9 +60,8 @@ func NewBaseReactor(name string, impl Reactor) *BaseReactor {
func (br *BaseReactor) SetSwitch(sw *Switch) {
br.Switch = sw
}
func (*BaseReactor) GetChannels() []*conn.ChannelDescriptor { return nil }
func (*BaseReactor) AddPeer(peer Peer) {}
func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {}
func (*BaseReactor) ReceiveEnvelope(e Envelope) {}
func (*BaseReactor) Receive(chID byte, peer Peer, msgBytes []byte) {}
func (*BaseReactor) InitPeer(peer Peer) Peer { return peer }
func (*BaseReactor) GetChannels() []*conn.ChannelDescriptor { return nil }
func (*BaseReactor) AddPeer(peer Peer) {}
func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {}
func (*BaseReactor) ReceiveEnvelope(e Envelope) {}
func (*BaseReactor) InitPeer(peer Peer) Peer { return peer }

View File

@@ -40,14 +40,6 @@ type Peer interface {
SendEnvelope(Envelope) bool
TrySendEnvelope(Envelope) bool
// Deprecated: entities looking to act as peers should implement SendEnvelope instead.
// Send will be removed in v0.38.
Send(byte, []byte) bool
// Deprecated: entities looking to act as peers should implement TrySendEnvelope instead.
// TrySend will be removed in v0.38.
TrySend(byte, []byte) bool
Set(string, interface{})
Get(string) interface{}
}
@@ -260,64 +252,22 @@ func (p *peer) Status() tmconn.ConnectionStatus {
// SendEnvelope sends the message in the envelope on the channel specified by the
// envelope. Returns false if the connection times out trying to place the message
// onto its internal queue.
// Using SendEnvelope allows for tracking the message bytes sent and received by message type
// as a metric which Send cannot support.
func (p *peer) SendEnvelope(e Envelope) bool {
if !p.IsRunning() {
return false
} else if !p.hasChannel(e.ChannelID) {
return false
}
msg := e.Message
metricLabelValue := p.mlc.ValueToMetricLabel(msg)
if w, ok := msg.(Wrapper); ok {
msg = w.Wrap()
}
msgBytes, err := proto.Marshal(msg)
if err != nil {
p.Logger.Error("marshaling message to send", "error", err)
return false
}
res := p.Send(e.ChannelID, msgBytes)
if res {
p.metrics.MessageSendBytesTotal.With("message_type", metricLabelValue).Add(float64(len(msgBytes)))
}
return res
}
// Send msg bytes to the channel identified by chID byte. Returns false if the
// send queue is full after timeout, specified by MConnection.
// SendEnvelope replaces Send which will be deprecated in a future release.
func (p *peer) Send(chID byte, msgBytes []byte) bool {
if !p.IsRunning() {
return false
} else if !p.hasChannel(chID) {
return false
}
res := p.mconn.Send(chID, msgBytes)
if res {
labels := []string{
"peer_id", string(p.ID()),
"chID", fmt.Sprintf("%#x", chID),
}
p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes)))
}
return res
return p.send(e.ChannelID, e.Message, p.mconn.Send)
}
// TrySendEnvelope attempts to sends the message in the envelope on the channel specified by the
// envelope. Returns false immediately if the connection's internal queue is full
// Using TrySendEnvelope allows for tracking the message bytes sent and received by message type
// as a metric which TrySend cannot support.
func (p *peer) TrySendEnvelope(e Envelope) bool {
return p.send(e.ChannelID, e.Message, p.mconn.TrySend)
}
func (p *peer) send(chID byte, msg proto.Message, sendFunc func(byte, []byte) bool) bool {
if !p.IsRunning() {
// see Switch#Broadcast, where we fetch the list of peers and loop over
// them - while we're looping, one peer may be removed and stopped.
return false
} else if !p.hasChannel(e.ChannelID) {
} else if !p.hasChannel(chID) {
return false
}
msg := e.Message
metricLabelValue := p.mlc.ValueToMetricLabel(msg)
if w, ok := msg.(Wrapper); ok {
msg = w.Wrap()
@@ -327,29 +277,14 @@ func (p *peer) TrySendEnvelope(e Envelope) bool {
p.Logger.Error("marshaling message to send", "error", err)
return false
}
res := p.TrySend(e.ChannelID, msgBytes)
if res {
p.metrics.MessageSendBytesTotal.With("message_type", metricLabelValue).Add(float64(len(msgBytes)))
}
return res
}
// TrySend msg bytes to the channel identified by chID byte. Immediately returns
// false if the send queue is full.
// TrySendEnvelope replaces TrySend which will be deprecated in a future release.
func (p *peer) TrySend(chID byte, msgBytes []byte) bool {
if !p.IsRunning() {
return false
} else if !p.hasChannel(chID) {
return false
}
res := p.mconn.TrySend(chID, msgBytes)
res := sendFunc(chID, msgBytes)
if res {
labels := []string{
"peer_id", string(p.ID()),
"chID", fmt.Sprintf("%#x", chID),
}
p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes)))
p.metrics.MessageSendBytesTotal.With("message_type", metricLabelValue).Add(float64(len(msgBytes)))
}
return res
}
@@ -474,15 +409,11 @@ func createMConnection(
}
p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes)))
p.metrics.MessageReceiveBytesTotal.With("message_type", p.mlc.ValueToMetricLabel(msg)).Add(float64(len(msgBytes)))
if nr, ok := reactor.(ReceiveEnveloper); ok {
nr.ReceiveEnvelope(Envelope{
ChannelID: chID,
Src: p,
Message: msg,
})
} else {
reactor.Receive(chID, p, msgBytes)
}
reactor.ReceiveEnvelope(Envelope{
ChannelID: chID,
Src: p,
Message: msg,
})
}
onError := func(r interface{}) {

View File

@@ -6,8 +6,6 @@ import (
"sync"
"time"
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/libs/cmap"
tmmath "github.com/tendermint/tendermint/libs/math"
tmrand "github.com/tendermint/tendermint/libs/rand"
@@ -300,23 +298,6 @@ func (r *Reactor) ReceiveEnvelope(e p2p.Envelope) {
}
}
func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
var msg *tmp2p.Message
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
panic(err)
}
um, err := msg.Unwrap()
if err != nil {
panic(err)
}
r.ReceiveEnvelope(p2p.Envelope{
ChannelID: chID,
Src: peer,
Message: um,
})
}
// enforces a minimum amount of time between requests
func (r *Reactor) receiveRequest(src Peer) error {
id := string(src.ID())

View File

@@ -293,40 +293,6 @@ func (sw *Switch) BroadcastEnvelope(e Envelope) chan bool {
return successChan
}
// Broadcast runs a go routine for each attempted send, which will block trying
// to send for defaultSendTimeoutSeconds. Returns a channel which receives
// success values for each attempted send (false if times out). Channel will be
// closed once msg bytes are sent to all peers (or time out).
// Broadcasts sends to the peers using the Send method.
//
// NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
//
// Deprecated: code looking to broadcast data to all peers should use BroadcastEnvelope.
// Broadcast will be removed in 0.38.
func (sw *Switch) Broadcast(chID byte, msgBytes []byte) chan bool {
sw.Logger.Debug("Broadcast", "channel", chID)
peers := sw.peers.List()
var wg sync.WaitGroup
wg.Add(len(peers))
successChan := make(chan bool, len(peers))
for _, peer := range peers {
go func(p Peer) {
defer wg.Done()
success := p.Send(chID, msgBytes)
successChan <- success
}(peer)
}
go func() {
wg.Wait()
close(successChan)
}()
return successChan
}
// NumPeers returns the count of outbound/inbound and outbound-dialing peers.
// unconditional peers are not counted here.
func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {

View File

@@ -81,24 +81,6 @@ func (tr *TestReactor) ReceiveEnvelope(e Envelope) {
}
}
func (tr *TestReactor) Receive(chID byte, peer Peer, msgBytes []byte) {
var msg *p2pproto.Message
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
panic(err)
}
um, err := msg.Unwrap()
if err != nil {
panic(err)
}
tr.ReceiveEnvelope(Envelope{
ChannelID: chID,
Src: peer,
Message: um,
})
}
func (tr *TestReactor) getMsgs(chID byte) []PeerMessage {
tr.mtx.Lock()
defer tr.mtx.Unlock()

View File

@@ -5,8 +5,6 @@ import (
"sort"
"time"
"github.com/cosmos/gogoproto/proto"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/config"
tmsync "github.com/tendermint/tendermint/libs/sync"
@@ -225,24 +223,6 @@ func (r *Reactor) ReceiveEnvelope(e p2p.Envelope) {
}
}
func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
var msg *ssproto.Message
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
panic(err)
}
um, err := msg.Unwrap()
if err != nil {
panic(err)
}
r.ReceiveEnvelope(p2p.Envelope{
ChannelID: chID,
Src: peer,
Message: um,
})
}
// recentSnapshots fetches the n most recent snapshots from the app
func (r *Reactor) recentSnapshots(n uint32) ([]*snapshot, error) {
resp, err := r.conn.ListSnapshotsSync(abci.RequestListSnapshots{})