p2p: pqueue proposal

This commit is contained in:
William Banfield
2022-06-29 18:16:27 -04:00
parent 978f754ad3
commit 15f2436c28

View File

@@ -156,6 +156,7 @@ func (s *pqScheduler) start() {
func (s *pqScheduler) process() {
defer s.done.Close()
LOOP:
for {
select {
case e := <-s.enqueueCh:
@@ -247,21 +248,24 @@ func (s *pqScheduler) process() {
for s.pq.Len() > 0 {
pqEnv = heap.Pop(s.pq).(*pqEnvelope)
s.size -= pqEnv.size
// deduct the Envelope size from all the relevant cumulative sizes
for i := 0; i < len(s.chDescs) && pqEnv.priority <= uint(s.chDescs[i].Priority); i++ {
s.sizes[uint(s.chDescs[i].Priority)] -= pqEnv.size
}
s.metrics.PeerSendBytesTotal.With(
"chID", chIDStr,
"peer_id", string(pqEnv.envelope.To),
"message_type", s.metrics.ValueToMetricLabel(pqEnv.envelope.Message)).Add(float64(pqEnv.size))
select {
case s.dequeueCh <- pqEnv.envelope:
s.size -= pqEnv.size
// deduct the Envelope size from all the relevant cumulative sizes
for i := 0; i < len(s.chDescs) && pqEnv.priority <= uint(s.chDescs[i].Priority); i++ {
s.sizes[uint(s.chDescs[i].Priority)] -= pqEnv.size
}
s.metrics.PeerSendBytesTotal.With(
"chID", chIDStr,
"peer_id", string(pqEnv.envelope.To),
"message_type", s.metrics.ValueToMetricLabel(pqEnv.envelope.Message)).Add(float64(pqEnv.size))
case <-s.closer.Done():
return
default:
heap.Push(s.pq, pqEnv)
continue LOOP
}
}