diff --git a/internal/p2p/pqueue.go b/internal/p2p/pqueue.go index d5051325f..cc6e5b8a3 100644 --- a/internal/p2p/pqueue.go +++ b/internal/p2p/pqueue.go @@ -156,6 +156,7 @@ func (s *pqScheduler) start() { func (s *pqScheduler) process() { defer s.done.Close() +LOOP: for { select { case e := <-s.enqueueCh: @@ -247,21 +248,24 @@ func (s *pqScheduler) process() { for s.pq.Len() > 0 { pqEnv = heap.Pop(s.pq).(*pqEnvelope) - s.size -= pqEnv.size - - // deduct the Envelope size from all the relevant cumulative sizes - for i := 0; i < len(s.chDescs) && pqEnv.priority <= uint(s.chDescs[i].Priority); i++ { - s.sizes[uint(s.chDescs[i].Priority)] -= pqEnv.size - } - - s.metrics.PeerSendBytesTotal.With( - "chID", chIDStr, - "peer_id", string(pqEnv.envelope.To), - "message_type", s.metrics.ValueToMetricLabel(pqEnv.envelope.Message)).Add(float64(pqEnv.size)) select { case s.dequeueCh <- pqEnv.envelope: + s.size -= pqEnv.size + + // deduct the Envelope size from all the relevant cumulative sizes + for i := 0; i < len(s.chDescs) && pqEnv.priority <= uint(s.chDescs[i].Priority); i++ { + s.sizes[uint(s.chDescs[i].Priority)] -= pqEnv.size + } + + s.metrics.PeerSendBytesTotal.With( + "chID", chIDStr, + "peer_id", string(pqEnv.envelope.To), + "message_type", s.metrics.ValueToMetricLabel(pqEnv.envelope.Message)).Add(float64(pqEnv.size)) case <-s.closer.Done(): return + default: + heap.Push(s.pq, pqEnv) + continue LOOP } }