diff --git a/internal/p2p/pqueue.go b/internal/p2p/pqueue.go index e4560c7bd..b19436e15 100644 --- a/internal/p2p/pqueue.go +++ b/internal/p2p/pqueue.go @@ -167,13 +167,12 @@ func (s *pqScheduler) process() { timestamp: time.Now().UTC(), } - s.metrics.PeerPendingSendBytes.With("peer_id", string(pqEnv.envelope.To)).Add(float64(pqEnv.size)) - // enqueue // Check if we have sufficient capacity to simply enqueue the incoming // Envelope. if s.size+pqEnv.size <= s.capacity { + s.metrics.PeerPendingSendBytes.With("peer_id", string(pqEnv.envelope.To)).Add(float64(pqEnv.size)) // enqueue the incoming Envelope s.push(pqEnv) } else { @@ -213,6 +212,8 @@ func (s *pqScheduler) process() { "capacity", s.capacity, ) + s.metrics.PeerPendingSendBytes.With("peer_id", string(pqEnvTmp.envelope.To)).Add(float64(-pqEnvTmp.size)) + // dequeue/drop from the priority queue heap.Remove(s.pq, pqEnvTmp.index) @@ -257,6 +258,8 @@ func (s *pqScheduler) process() { s.metrics.PeerSendBytesTotal.With( "chID", chIDStr, "peer_id", string(pqEnv.envelope.To)).Add(float64(pqEnv.size)) + s.metrics.PeerPendingSendBytes.With( + "peer_id", string(pqEnv.envelope.To)).Add(float64(-pqEnv.size)) select { case s.dequeueCh <- pqEnv.envelope: case <-s.closer.Done():