Compare commits

...

3 Commits

Author SHA1 Message Date
William Banfield
90dd875c11 fix metric 2021-10-13 16:01:29 -04:00
William Banfield
e80807a07e only store when pqueue is added to 2021-10-13 15:50:43 -04:00
William Banfield
c9521c9e2d p2p: fix priority queue bytes pending metric 2021-10-13 15:46:30 -04:00

View File

@@ -167,13 +167,12 @@ func (s *pqScheduler) process() {
timestamp: time.Now().UTC(),
}
s.metrics.PeerPendingSendBytes.With("peer_id", string(pqEnv.envelope.To)).Add(float64(pqEnv.size))
// enqueue
// Check if we have sufficient capacity to simply enqueue the incoming
// Envelope.
if s.size+pqEnv.size <= s.capacity {
s.metrics.PeerPendingSendBytes.With("peer_id", string(pqEnv.envelope.To)).Add(float64(pqEnv.size))
// enqueue the incoming Envelope
s.push(pqEnv)
} else {
@@ -213,6 +212,8 @@ func (s *pqScheduler) process() {
"capacity", s.capacity,
)
s.metrics.PeerPendingSendBytes.With("peer_id", string(pqEnvTmp.envelope.To)).Add(float64(-pqEnvTmp.size))
// dequeue/drop from the priority queue
heap.Remove(s.pq, pqEnvTmp.index)
@@ -257,6 +258,8 @@ func (s *pqScheduler) process() {
s.metrics.PeerSendBytesTotal.With(
"chID", chIDStr,
"peer_id", string(pqEnv.envelope.To)).Add(float64(pqEnv.size))
s.metrics.PeerPendingSendBytes.With(
"peer_id", string(pqEnv.envelope.To)).Add(float64(-pqEnv.size))
select {
case s.dequeueCh <- pqEnv.envelope:
case <-s.closer.Done():