From d1a16e8ff0393b80d45eb7299cac314570e48f8c Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Thu, 7 Jul 2022 12:13:52 -0400 Subject: [PATCH] p2p: simpler priority queue (#8929) --- internal/p2p/channel.go | 4 ++ internal/p2p/pqueue.go | 14 +++-- internal/p2p/router.go | 13 ++++- internal/p2p/rqueue.go | 112 ++++++++++++++++++++++++++++++++++++ internal/p2p/rqueue_test.go | 47 +++++++++++++++ 5 files changed, 183 insertions(+), 7 deletions(-) create mode 100644 internal/p2p/rqueue.go create mode 100644 internal/p2p/rqueue_test.go diff --git a/internal/p2p/channel.go b/internal/p2p/channel.go index d6763543a..e33e7faa7 100644 --- a/internal/p2p/channel.go +++ b/internal/p2p/channel.go @@ -19,6 +19,10 @@ type Envelope struct { ChannelID ChannelID } +func (e Envelope) IsZero() bool { + return e.From == "" && e.To == "" && e.Message == nil +} + // Wrapper is a Protobuf message that can contain a variety of inner messages // (e.g. via oneof fields). If a Channel's message type implements Wrapper, the // Router will automatically wrap outbound messages and unwrap inbound messages, diff --git a/internal/p2p/pqueue.go b/internal/p2p/pqueue.go index 268daa8de..3cd1c897a 100644 --- a/internal/p2p/pqueue.go +++ b/internal/p2p/pqueue.go @@ -31,8 +31,16 @@ func (pq priorityQueue) get(i int) *pqEnvelope { return pq[i] } func (pq priorityQueue) Len() int { return len(pq) } func (pq priorityQueue) Less(i, j int) bool { - // if both elements have the same priority, prioritize based on most recent + // if both elements have the same priority, prioritize based + // on most recent and largest if pq[i].priority == pq[j].priority { + diff := pq[i].timestamp.Sub(pq[j].timestamp) + if diff < 0 { + diff *= -1 + } + if diff < 10*time.Millisecond { + return pq[i].size > pq[j].size + } return pq[i].timestamp.After(pq[j].timestamp) } @@ -272,12 +280,10 @@ func (s *pqScheduler) process(ctx context.Context) { } func (s *pqScheduler) push(pqEnv *pqEnvelope) { - chIDStr := strconv.Itoa(int(pqEnv.envelope.ChannelID)) - // enqueue the incoming Envelope heap.Push(s.pq, pqEnv) s.size += pqEnv.size - s.metrics.PeerQueueMsgSize.With("ch_id", chIDStr).Add(float64(pqEnv.size)) + s.metrics.PeerQueueMsgSize.With("ch_id", strconv.Itoa(int(pqEnv.envelope.ChannelID))).Add(float64(pqEnv.size)) // Update the cumulative sizes by adding the Envelope's size to every // priority less than or equal to it. diff --git a/internal/p2p/router.go b/internal/p2p/router.go index 8b7db9a03..0e55049c1 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -68,8 +68,9 @@ type RouterOptions struct { } const ( - queueTypeFifo = "fifo" - queueTypePriority = "priority" + queueTypeFifo = "fifo" + queueTypePriority = "priority" + queueTypeSimplePriority = "simple-priority" ) // Validate validates router options. @@ -77,7 +78,7 @@ func (o *RouterOptions) Validate() error { switch o.QueueType { case "": o.QueueType = queueTypeFifo - case queueTypeFifo, queueTypePriority: + case queueTypeFifo, queueTypePriority, queueTypeSimplePriority: // pass default: return fmt.Errorf("queue type %q is not supported", o.QueueType) @@ -227,6 +228,9 @@ func (r *Router) createQueueFactory(ctx context.Context) (func(int) queue, error return q }, nil + case queueTypeSimplePriority: + return func(size int) queue { return newSimplePriorityQueue(ctx, size, r.chDescs) }, nil + default: return nil, fmt.Errorf("cannot construct queue of type %q", r.options.QueueType) } @@ -304,6 +308,9 @@ func (r *Router) routeChannel( for { select { case envelope := <-outCh: + if envelope.IsZero() { + continue + } // Mark the envelope with the channel ID to allow sendPeer() to pass // it on to Transport.SendMessage(). envelope.ChannelID = chID diff --git a/internal/p2p/rqueue.go b/internal/p2p/rqueue.go new file mode 100644 index 000000000..8d6406864 --- /dev/null +++ b/internal/p2p/rqueue.go @@ -0,0 +1,112 @@ +package p2p + +import ( + "container/heap" + "context" + "sort" + "time" + + "github.com/gogo/protobuf/proto" +) + +type simpleQueue struct { + input chan Envelope + output chan Envelope + closeFn func() + closeCh <-chan struct{} + + maxSize int + chDescs []*ChannelDescriptor +} + +func newSimplePriorityQueue(ctx context.Context, size int, chDescs []*ChannelDescriptor) *simpleQueue { + if size%2 != 0 { + size++ + } + + ctx, cancel := context.WithCancel(ctx) + q := &simpleQueue{ + input: make(chan Envelope, size*2), + output: make(chan Envelope, size/2), + maxSize: size * size, + closeCh: ctx.Done(), + closeFn: cancel, + } + + go q.run(ctx) + return q +} + +func (q *simpleQueue) enqueue() chan<- Envelope { return q.input } +func (q *simpleQueue) dequeue() <-chan Envelope { return q.output } +func (q *simpleQueue) close() { q.closeFn() } +func (q *simpleQueue) closed() <-chan struct{} { return q.closeCh } + +func (q *simpleQueue) run(ctx context.Context) { + defer q.closeFn() + + var chPriorities = make(map[ChannelID]uint, len(q.chDescs)) + for _, chDesc := range q.chDescs { + chID := chDesc.ID + chPriorities[chID] = uint(chDesc.Priority) + } + + pq := make(priorityQueue, 0, q.maxSize) + heap.Init(&pq) + ticker := time.NewTicker(10 * time.Millisecond) + // must have a buffer of exactly one because both sides of + // this channel are used in this loop, and simply signals adds + // to the heap + signal := make(chan struct{}, 1) + for { + select { + case <-ctx.Done(): + return + case <-q.closeCh: + return + case e := <-q.input: + // enqueue the incoming Envelope + heap.Push(&pq, &pqEnvelope{ + envelope: e, + size: uint(proto.Size(e.Message)), + priority: chPriorities[e.ChannelID], + timestamp: time.Now().UTC(), + }) + + select { + case signal <- struct{}{}: + default: + if len(pq) > q.maxSize { + sort.Sort(pq) + pq = pq[:q.maxSize] + } + } + + case <-ticker.C: + if len(pq) > q.maxSize { + sort.Sort(pq) + pq = pq[:q.maxSize] + } + if len(pq) > 0 { + select { + case signal <- struct{}{}: + default: + } + } + case <-signal: + SEND: + for len(pq) > 0 { + select { + case <-ctx.Done(): + return + case <-q.closeCh: + return + case q.output <- heap.Pop(&pq).(*pqEnvelope).envelope: + continue SEND + default: + break SEND + } + } + } + } +} diff --git a/internal/p2p/rqueue_test.go b/internal/p2p/rqueue_test.go new file mode 100644 index 000000000..43c4066e5 --- /dev/null +++ b/internal/p2p/rqueue_test.go @@ -0,0 +1,47 @@ +package p2p + +import ( + "context" + "testing" + "time" +) + +func TestSimpleQueue(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // set up a small queue with very small buffers so we can + // watch it shed load, then send a bunch of messages to the + // queue, most of which we'll watch it drop. + sq := newSimplePriorityQueue(ctx, 1, nil) + for i := 0; i < 100; i++ { + sq.enqueue() <- Envelope{From: "merlin"} + } + + seen := 0 + +RETRY: + for seen <= 2 { + select { + case e := <-sq.dequeue(): + if e.From != "merlin" { + continue + } + seen++ + case <-time.After(10 * time.Millisecond): + break RETRY + } + } + // if we don't see any messages, then it's just broken. + if seen == 0 { + t.Errorf("seen %d messages, should have seen more than one", seen) + } + // ensure that load shedding happens: there can be at most 3 + // messages that we get out of this, one that was buffered + // plus 2 that were under the cap, everything else gets + // dropped. + if seen > 3 { + t.Errorf("saw %d messages, should have seen 5 or fewer", seen) + } + +}