diff --git a/internal/p2p/pqueue.go b/internal/p2p/pqueue.go index d5051325f..dc63b319e 100644 --- a/internal/p2p/pqueue.go +++ b/internal/p2p/pqueue.go @@ -29,8 +29,16 @@ func (pq priorityQueue) get(i int) *pqEnvelope { return pq[i] } func (pq priorityQueue) Len() int { return len(pq) } func (pq priorityQueue) Less(i, j int) bool { - // if both elements have the same priority, prioritize based on most recent + // if both elements have the same priority, prioritize based + // on most recent and largest if pq[i].priority == pq[j].priority { + diff := pq[i].timestamp.Sub(pq[j].timestamp) + if diff < 0 { + diff *= -1 + } + if diff < 10*time.Millisecond { + return pq[i].size > pq[j].size + } return pq[i].timestamp.After(pq[j].timestamp) } @@ -272,12 +280,10 @@ func (s *pqScheduler) process() { } func (s *pqScheduler) push(pqEnv *pqEnvelope) { - chIDStr := strconv.Itoa(int(pqEnv.envelope.channelID)) - // enqueue the incoming Envelope heap.Push(s.pq, pqEnv) s.size += pqEnv.size - s.metrics.PeerQueueMsgSize.With("ch_id", chIDStr).Add(float64(pqEnv.size)) + s.metrics.PeerQueueMsgSize.With("ch_id", strconv.Itoa(int(pqEnv.envelope.channelID))).Add(float64(pqEnv.size)) // Update the cumulative sizes by adding the Envelope's size to every // priority less than or equal to it. diff --git a/internal/p2p/router.go b/internal/p2p/router.go index 7247642ca..56558a80f 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -40,6 +40,10 @@ type Envelope struct { channelID ChannelID } +func (e Envelope) IsZero() bool { + return e.From == "" && e.To == "" && e.Message == nil +} + // PeerError is a peer error reported via Channel.Error. // // FIXME: This currently just disconnects the peer, which is too simplistic. @@ -166,9 +170,10 @@ type RouterOptions struct { } const ( - queueTypeFifo = "fifo" - queueTypePriority = "priority" - queueTypeWDRR = "wdrr" + queueTypeFifo = "fifo" + queueTypePriority = "priority" + queueTypeWDRR = "wdrr" + queueTypeSimplePriority = "simple-priority" ) // Validate validates router options. @@ -176,8 +181,8 @@ func (o *RouterOptions) Validate() error { switch o.QueueType { case "": o.QueueType = queueTypeFifo - case queueTypeFifo, queueTypeWDRR, queueTypePriority: - // passI me + case queueTypeFifo, queueTypeWDRR, queueTypePriority, queueTypeSimplePriority: + // pass default: return fmt.Errorf("queue type %q is not supported", o.QueueType) } @@ -354,6 +359,9 @@ func (r *Router) createQueueFactory() (func(int) queue, error) { return q }, nil + case queueTypeSimplePriority: + return func(size int) queue { return newSimplePriorityQueue(r.stopCtx(), size, r.chDescs) }, nil + default: return nil, fmt.Errorf("cannot construct queue of type %q", r.options.QueueType) } @@ -420,6 +428,9 @@ func (r *Router) routeChannel( for { select { case envelope := <-outCh: + if envelope.IsZero() { + continue + } // Mark the envelope with the channel ID to allow sendPeer() to pass // it on to Transport.SendMessage(). envelope.channelID = chID diff --git a/internal/p2p/rqueue.go b/internal/p2p/rqueue.go new file mode 100644 index 000000000..02826bfe9 --- /dev/null +++ b/internal/p2p/rqueue.go @@ -0,0 +1,112 @@ +package p2p + +import ( + "container/heap" + "context" + "sort" + "time" + + "github.com/gogo/protobuf/proto" +) + +type simpleQueue struct { + input chan Envelope + output chan Envelope + closeFn func() + closeCh <-chan struct{} + + maxSize int + chDescs []ChannelDescriptor +} + +func newSimplePriorityQueue(ctx context.Context, size int, chDescs []ChannelDescriptor) *simpleQueue { + if size%2 != 0 { + size++ + } + + ctx, cancel := context.WithCancel(ctx) + q := &simpleQueue{ + input: make(chan Envelope, size*2), + output: make(chan Envelope, size/2), + maxSize: size * size, + closeCh: ctx.Done(), + closeFn: cancel, + } + + go q.run(ctx) + return q +} + +func (q *simpleQueue) enqueue() chan<- Envelope { return q.input } +func (q *simpleQueue) dequeue() <-chan Envelope { return q.output } +func (q *simpleQueue) close() { q.closeFn() } +func (q *simpleQueue) closed() <-chan struct{} { return q.closeCh } + +func (q *simpleQueue) run(ctx context.Context) { + defer q.closeFn() + + var chPriorities = make(map[ChannelID]uint, len(q.chDescs)) + for _, chDesc := range q.chDescs { + chID := ChannelID(chDesc.ID) + chPriorities[chID] = uint(chDesc.Priority) + } + + pq := make(priorityQueue, 0, q.maxSize) + heap.Init(&pq) + ticker := time.NewTicker(10 * time.Millisecond) + // must have a buffer of exactly one because both sides of + // this channel are used in this loop, and simply signals adds + // to the heap + signal := make(chan struct{}, 1) + for { + select { + case <-ctx.Done(): + return + case <-q.closeCh: + return + case e := <-q.input: + // enqueue the incoming Envelope + heap.Push(&pq, &pqEnvelope{ + envelope: e, + size: uint(proto.Size(e.Message)), + priority: chPriorities[e.channelID], + timestamp: time.Now().UTC(), + }) + + select { + case signal <- struct{}{}: + default: + if len(pq) > q.maxSize { + sort.Sort(pq) + pq = pq[:q.maxSize] + } + } + + case <-ticker.C: + if len(pq) > q.maxSize { + sort.Sort(pq) + pq = pq[:q.maxSize] + } + if len(pq) > 0 { + select { + case signal <- struct{}{}: + default: + } + } + case <-signal: + SEND: + for len(pq) > 0 { + select { + case <-ctx.Done(): + return + case <-q.closeCh: + return + case q.output <- heap.Pop(&pq).(*pqEnvelope).envelope: + continue SEND + default: + break SEND + } + } + } + } +} diff --git a/internal/p2p/rqueue_test.go b/internal/p2p/rqueue_test.go new file mode 100644 index 000000000..43c4066e5 --- /dev/null +++ b/internal/p2p/rqueue_test.go @@ -0,0 +1,47 @@ +package p2p + +import ( + "context" + "testing" + "time" +) + +func TestSimpleQueue(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // set up a small queue with very small buffers so we can + // watch it shed load, then send a bunch of messages to the + // queue, most of which we'll watch it drop. + sq := newSimplePriorityQueue(ctx, 1, nil) + for i := 0; i < 100; i++ { + sq.enqueue() <- Envelope{From: "merlin"} + } + + seen := 0 + +RETRY: + for seen <= 2 { + select { + case e := <-sq.dequeue(): + if e.From != "merlin" { + continue + } + seen++ + case <-time.After(10 * time.Millisecond): + break RETRY + } + } + // if we don't see any messages, then it's just broken. + if seen == 0 { + t.Errorf("seen %d messages, should have seen more than one", seen) + } + // ensure that load shedding happens: there can be at most 3 + // messages that we get out of this, one that was buffered + // plus 2 that were under the cap, everything else gets + // dropped. + if seen > 3 { + t.Errorf("saw %d messages, should have seen 5 or fewer", seen) + } + +}