From 738457a63f4c5d0639b30d73c9cefcf6184c2d83 Mon Sep 17 00:00:00 2001 From: William Banfield Date: Fri, 1 Jul 2022 18:40:34 -0400 Subject: [PATCH] p2p: make queue read and write simultaneously --- internal/p2p/pqueue.go | 48 +++++++++++++++++++++++++----------------- 1 file changed, 29 insertions(+), 19 deletions(-) diff --git a/internal/p2p/pqueue.go b/internal/p2p/pqueue.go index 268daa8de..b8adc44d8 100644 --- a/internal/p2p/pqueue.go +++ b/internal/p2p/pqueue.go @@ -151,7 +151,9 @@ func (s *pqScheduler) closed() <-chan struct{} { return s.done } // non-empty, we pop the top Envelope and send it on the dequeueCh. func (s *pqScheduler) process(ctx context.Context) { defer close(s.done) + dequeueReady := make(chan struct{}, 1) +LOOP: for { select { case e := <-s.enqueueCh: @@ -239,28 +241,36 @@ func (s *pqScheduler) process(ctx context.Context) { ) } } - - // dequeue - + select { + case dequeueReady <- struct{}{}: + default: + } + case <-dequeueReady: for s.pq.Len() > 0 { - pqEnv = heap.Pop(s.pq).(*pqEnvelope) - s.size -= pqEnv.size - - // deduct the Envelope size from all the relevant cumulative sizes - for i := 0; i < len(s.chDescs) && pqEnv.priority <= uint(s.chDescs[i].Priority); i++ { - s.sizes[uint(s.chDescs[i].Priority)] -= pqEnv.size - } - - s.metrics.PeerSendBytesTotal.With( - "chID", chIDStr, - "peer_id", string(pqEnv.envelope.To), - "message_type", s.lc.ValueToMetricLabel(pqEnv.envelope.Message)).Add(float64(pqEnv.size)) - s.metrics.PeerPendingSendBytes.With( - "peer_id", string(pqEnv.envelope.To)).Add(float64(-pqEnv.size)) + pqEnv := heap.Pop(s.pq).(*pqEnvelope) select { case s.dequeueCh <- pqEnv.envelope: - case <-s.closeCh: - return + s.size -= pqEnv.size + + // deduct the Envelope size from all the relevant cumulative sizes + for i := 0; i < len(s.chDescs) && pqEnv.priority <= uint(s.chDescs[i].Priority); i++ { + s.sizes[uint(s.chDescs[i].Priority)] -= pqEnv.size + } + + chIDStr := strconv.Itoa(int(pqEnv.envelope.ChannelID)) + s.metrics.PeerSendBytesTotal.With( + "chID", chIDStr, + "peer_id", string(pqEnv.envelope.To), + "message_type", s.lc.ValueToMetricLabel(pqEnv.envelope.Message)).Add(float64(pqEnv.size)) + s.metrics.PeerPendingSendBytes.With( + "peer_id", string(pqEnv.envelope.To)).Add(float64(-pqEnv.size)) + default: + heap.Push(s.pq, pqEnv) + select { + case dequeueReady <- struct{}{}: + default: + } + continue LOOP } } case <-ctx.Done():