remove old Sends

This commit is contained in:
William Banfield
2022-10-19 13:10:11 -04:00
parent 73d0f8116a
commit 64c18a4406
2 changed files with 6 additions and 46 deletions

View File

@@ -36,8 +36,8 @@ type Peer interface {
Status() tmconn.ConnectionStatus
SocketAddr() *NetAddress // actual address of the socket
NewSend(Envelope) bool
NewTrySend(Envelope) bool
Send(Envelope) bool
TrySend(Envelope) bool
Set(string, interface{})
Get(string) interface{}
@@ -249,9 +249,9 @@ func (p *peer) Status() tmconn.ConnectionStatus {
return p.mconn.Status()
}
// NewSend msg bytes to the channel identified by chID byte. Returns false if the
// Send msg bytes to the channel identified by chID byte. Returns false if the
// send queue is full after timeout, specified by MConnection.
func (p *peer) NewSend(e Envelope) bool {
func (p *peer) Send(e Envelope) bool {
if !p.IsRunning() {
// see Switch#Broadcast, where we fetch the list of peers and loop over
// them - while we're looping, one peer may be removed and stopped.
@@ -274,30 +274,9 @@ func (p *peer) NewSend(e Envelope) bool {
return res
}
// Send msg bytes to the channel identified by chID byte. Returns false if the
// send queue is full after timeout, specified by MConnection.
func (p *peer) Send(chID byte, msgBytes []byte) bool {
if !p.IsRunning() {
// see Switch#Broadcast, where we fetch the list of peers and loop over
// them - while we're looping, one peer may be removed and stopped.
return false
} else if !p.hasChannel(chID) {
return false
}
res := p.mconn.Send(chID, msgBytes)
if res {
labels := []string{
"peer_id", string(p.ID()),
"chID", fmt.Sprintf("%#x", chID),
}
p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes)))
}
return res
}
// NewSend msg bytes to the channel identified by chID byte. Returns false if the
// send queue is full after timeout, specified by MConnection.
func (p *peer) NewTrySend(e Envelope) bool {
func (p *peer) TrySend(e Envelope) bool {
if !p.IsRunning() {
// see Switch#Broadcast, where we fetch the list of peers and loop over
// them - while we're looping, one peer may be removed and stopped.
@@ -320,25 +299,6 @@ func (p *peer) NewTrySend(e Envelope) bool {
return res
}
// TrySend msg bytes to the channel identified by chID byte. Immediately returns
// false if the send queue is full.
func (p *peer) TrySend(chID byte, msgBytes []byte) bool {
if !p.IsRunning() {
return false
} else if !p.hasChannel(chID) {
return false
}
res := p.mconn.TrySend(chID, msgBytes)
if res {
labels := []string{
"peer_id", string(p.ID()),
"chID", fmt.Sprintf("%#x", chID),
}
p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes)))
}
return res
}
// Get the data for a given key.
func (p *peer) Get(key string) interface{} {
return p.Data.Get(key)

View File

@@ -303,7 +303,7 @@ func (sw *Switch) NewBroadcast(e Envelope) chan bool {
for _, peer := range peers {
go func(p Peer) {
defer wg.Done()
success := p.NewSend(e)
success := p.Send(e)
successChan <- success
}(peer)
}