p2p: simpler priority queue (#8929)

This commit is contained in:
Sam Kleinman
2022-07-07 12:13:52 -04:00
committed by GitHub
parent 27c523dccb
commit d1a16e8ff0
5 changed files with 183 additions and 7 deletions

View File

@@ -19,6 +19,10 @@ type Envelope struct {
ChannelID ChannelID
}
func (e Envelope) IsZero() bool {
return e.From == "" && e.To == "" && e.Message == nil
}
// Wrapper is a Protobuf message that can contain a variety of inner messages
// (e.g. via oneof fields). If a Channel's message type implements Wrapper, the
// Router will automatically wrap outbound messages and unwrap inbound messages,

View File

@@ -31,8 +31,16 @@ func (pq priorityQueue) get(i int) *pqEnvelope { return pq[i] }
func (pq priorityQueue) Len() int { return len(pq) }
func (pq priorityQueue) Less(i, j int) bool {
// if both elements have the same priority, prioritize based on most recent
// if both elements have the same priority, prioritize based
// on most recent and largest
if pq[i].priority == pq[j].priority {
diff := pq[i].timestamp.Sub(pq[j].timestamp)
if diff < 0 {
diff *= -1
}
if diff < 10*time.Millisecond {
return pq[i].size > pq[j].size
}
return pq[i].timestamp.After(pq[j].timestamp)
}
@@ -272,12 +280,10 @@ func (s *pqScheduler) process(ctx context.Context) {
}
func (s *pqScheduler) push(pqEnv *pqEnvelope) {
chIDStr := strconv.Itoa(int(pqEnv.envelope.ChannelID))
// enqueue the incoming Envelope
heap.Push(s.pq, pqEnv)
s.size += pqEnv.size
s.metrics.PeerQueueMsgSize.With("ch_id", chIDStr).Add(float64(pqEnv.size))
s.metrics.PeerQueueMsgSize.With("ch_id", strconv.Itoa(int(pqEnv.envelope.ChannelID))).Add(float64(pqEnv.size))
// Update the cumulative sizes by adding the Envelope's size to every
// priority less than or equal to it.

View File

@@ -68,8 +68,9 @@ type RouterOptions struct {
}
const (
queueTypeFifo = "fifo"
queueTypePriority = "priority"
queueTypeFifo = "fifo"
queueTypePriority = "priority"
queueTypeSimplePriority = "simple-priority"
)
// Validate validates router options.
@@ -77,7 +78,7 @@ func (o *RouterOptions) Validate() error {
switch o.QueueType {
case "":
o.QueueType = queueTypeFifo
case queueTypeFifo, queueTypePriority:
case queueTypeFifo, queueTypePriority, queueTypeSimplePriority:
// pass
default:
return fmt.Errorf("queue type %q is not supported", o.QueueType)
@@ -227,6 +228,9 @@ func (r *Router) createQueueFactory(ctx context.Context) (func(int) queue, error
return q
}, nil
case queueTypeSimplePriority:
return func(size int) queue { return newSimplePriorityQueue(ctx, size, r.chDescs) }, nil
default:
return nil, fmt.Errorf("cannot construct queue of type %q", r.options.QueueType)
}
@@ -304,6 +308,9 @@ func (r *Router) routeChannel(
for {
select {
case envelope := <-outCh:
if envelope.IsZero() {
continue
}
// Mark the envelope with the channel ID to allow sendPeer() to pass
// it on to Transport.SendMessage().
envelope.ChannelID = chID

112
internal/p2p/rqueue.go Normal file
View File

@@ -0,0 +1,112 @@
package p2p
import (
"container/heap"
"context"
"sort"
"time"
"github.com/gogo/protobuf/proto"
)
type simpleQueue struct {
input chan Envelope
output chan Envelope
closeFn func()
closeCh <-chan struct{}
maxSize int
chDescs []*ChannelDescriptor
}
func newSimplePriorityQueue(ctx context.Context, size int, chDescs []*ChannelDescriptor) *simpleQueue {
if size%2 != 0 {
size++
}
ctx, cancel := context.WithCancel(ctx)
q := &simpleQueue{
input: make(chan Envelope, size*2),
output: make(chan Envelope, size/2),
maxSize: size * size,
closeCh: ctx.Done(),
closeFn: cancel,
}
go q.run(ctx)
return q
}
func (q *simpleQueue) enqueue() chan<- Envelope { return q.input }
func (q *simpleQueue) dequeue() <-chan Envelope { return q.output }
func (q *simpleQueue) close() { q.closeFn() }
func (q *simpleQueue) closed() <-chan struct{} { return q.closeCh }
func (q *simpleQueue) run(ctx context.Context) {
defer q.closeFn()
var chPriorities = make(map[ChannelID]uint, len(q.chDescs))
for _, chDesc := range q.chDescs {
chID := chDesc.ID
chPriorities[chID] = uint(chDesc.Priority)
}
pq := make(priorityQueue, 0, q.maxSize)
heap.Init(&pq)
ticker := time.NewTicker(10 * time.Millisecond)
// must have a buffer of exactly one because both sides of
// this channel are used in this loop, and simply signals adds
// to the heap
signal := make(chan struct{}, 1)
for {
select {
case <-ctx.Done():
return
case <-q.closeCh:
return
case e := <-q.input:
// enqueue the incoming Envelope
heap.Push(&pq, &pqEnvelope{
envelope: e,
size: uint(proto.Size(e.Message)),
priority: chPriorities[e.ChannelID],
timestamp: time.Now().UTC(),
})
select {
case signal <- struct{}{}:
default:
if len(pq) > q.maxSize {
sort.Sort(pq)
pq = pq[:q.maxSize]
}
}
case <-ticker.C:
if len(pq) > q.maxSize {
sort.Sort(pq)
pq = pq[:q.maxSize]
}
if len(pq) > 0 {
select {
case signal <- struct{}{}:
default:
}
}
case <-signal:
SEND:
for len(pq) > 0 {
select {
case <-ctx.Done():
return
case <-q.closeCh:
return
case q.output <- heap.Pop(&pq).(*pqEnvelope).envelope:
continue SEND
default:
break SEND
}
}
}
}
}

View File

@@ -0,0 +1,47 @@
package p2p
import (
"context"
"testing"
"time"
)
func TestSimpleQueue(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// set up a small queue with very small buffers so we can
// watch it shed load, then send a bunch of messages to the
// queue, most of which we'll watch it drop.
sq := newSimplePriorityQueue(ctx, 1, nil)
for i := 0; i < 100; i++ {
sq.enqueue() <- Envelope{From: "merlin"}
}
seen := 0
RETRY:
for seen <= 2 {
select {
case e := <-sq.dequeue():
if e.From != "merlin" {
continue
}
seen++
case <-time.After(10 * time.Millisecond):
break RETRY
}
}
// if we don't see any messages, then it's just broken.
if seen == 0 {
t.Errorf("seen %d messages, should have seen more than one", seen)
}
// ensure that load shedding happens: there can be at most 3
// messages that we get out of this, one that was buffered
// plus 2 that were under the cap, everything else gets
// dropped.
if seen > 3 {
t.Errorf("saw %d messages, should have seen 5 or fewer", seen)
}
}