Files
tendermint/internal/p2p/pqueue.go
William Banfield ff7b0e638e p2p: fix priority queue bytes pending calculation (#7120)
This metric describes itself as 'pending' but never actual decrements when the messages are removed from the queue.

This change fixes that by decrementing the metric when the data is removed from the queue.
2021-10-13 21:18:44 +00:00

290 lines
7.8 KiB
Go

package p2p
import (
"container/heap"
"sort"
"strconv"
"time"
"github.com/gogo/protobuf/proto"
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
"github.com/tendermint/tendermint/libs/log"
)
// pqEnvelope defines a wrapper around an Envelope with priority to be inserted
// into a priority queue used for Envelope scheduling.
type pqEnvelope struct {
envelope Envelope
priority uint
size uint
timestamp time.Time
index int
}
// priorityQueue defines a type alias for a priority queue implementation.
type priorityQueue []*pqEnvelope
func (pq priorityQueue) get(i int) *pqEnvelope { return pq[i] }
func (pq priorityQueue) Len() int { return len(pq) }
func (pq priorityQueue) Less(i, j int) bool {
// if both elements have the same priority, prioritize based on most recent
if pq[i].priority == pq[j].priority {
return pq[i].timestamp.After(pq[j].timestamp)
}
// otherwise, pick the pqEnvelope with the higher priority
return pq[i].priority > pq[j].priority
}
func (pq priorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
func (pq *priorityQueue) Push(x interface{}) {
n := len(*pq)
pqEnv := x.(*pqEnvelope)
pqEnv.index = n
*pq = append(*pq, pqEnv)
}
func (pq *priorityQueue) Pop() interface{} {
old := *pq
n := len(old)
pqEnv := old[n-1]
old[n-1] = nil
pqEnv.index = -1
*pq = old[:n-1]
return pqEnv
}
// Assert the priority queue scheduler implements the queue interface at
// compile-time.
var _ queue = (*pqScheduler)(nil)
type pqScheduler struct {
logger log.Logger
metrics *Metrics
size uint
sizes map[uint]uint // cumulative priority sizes
pq *priorityQueue
chDescs []ChannelDescriptor
capacity uint
chPriorities map[ChannelID]uint
enqueueCh chan Envelope
dequeueCh chan Envelope
closer *tmsync.Closer
done *tmsync.Closer
}
func newPQScheduler(
logger log.Logger,
m *Metrics,
chDescs []ChannelDescriptor,
enqueueBuf, dequeueBuf, capacity uint,
) *pqScheduler {
// copy each ChannelDescriptor and sort them by ascending channel priority
chDescsCopy := make([]ChannelDescriptor, len(chDescs))
copy(chDescsCopy, chDescs)
sort.Slice(chDescsCopy, func(i, j int) bool { return chDescsCopy[i].Priority < chDescsCopy[j].Priority })
var (
chPriorities = make(map[ChannelID]uint)
sizes = make(map[uint]uint)
)
for _, chDesc := range chDescsCopy {
chID := ChannelID(chDesc.ID)
chPriorities[chID] = uint(chDesc.Priority)
sizes[uint(chDesc.Priority)] = 0
}
pq := make(priorityQueue, 0)
heap.Init(&pq)
return &pqScheduler{
logger: logger.With("router", "scheduler"),
metrics: m,
chDescs: chDescsCopy,
capacity: capacity,
chPriorities: chPriorities,
pq: &pq,
sizes: sizes,
enqueueCh: make(chan Envelope, enqueueBuf),
dequeueCh: make(chan Envelope, dequeueBuf),
closer: tmsync.NewCloser(),
done: tmsync.NewCloser(),
}
}
func (s *pqScheduler) enqueue() chan<- Envelope {
return s.enqueueCh
}
func (s *pqScheduler) dequeue() <-chan Envelope {
return s.dequeueCh
}
func (s *pqScheduler) close() {
s.closer.Close()
<-s.done.Done()
}
func (s *pqScheduler) closed() <-chan struct{} {
return s.closer.Done()
}
// start starts non-blocking process that starts the priority queue scheduler.
func (s *pqScheduler) start() {
go s.process()
}
// process starts a block process where we listen for Envelopes to enqueue. If
// there is sufficient capacity, it will be enqueued into the priority queue,
// otherwise, we attempt to dequeue enough elements from the priority queue to
// make room for the incoming Envelope by dropping lower priority elements. If
// there isn't sufficient capacity at lower priorities for the incoming Envelope,
// it is dropped.
//
// After we attempt to enqueue the incoming Envelope, if the priority queue is
// non-empty, we pop the top Envelope and send it on the dequeueCh.
func (s *pqScheduler) process() {
defer s.done.Close()
for {
select {
case e := <-s.enqueueCh:
chIDStr := strconv.Itoa(int(e.channelID))
pqEnv := &pqEnvelope{
envelope: e,
size: uint(proto.Size(e.Message)),
priority: s.chPriorities[e.channelID],
timestamp: time.Now().UTC(),
}
// enqueue
// Check if we have sufficient capacity to simply enqueue the incoming
// Envelope.
if s.size+pqEnv.size <= s.capacity {
s.metrics.PeerPendingSendBytes.With("peer_id", string(pqEnv.envelope.To)).Add(float64(pqEnv.size))
// enqueue the incoming Envelope
s.push(pqEnv)
} else {
// There is not sufficient capacity to simply enqueue the incoming
// Envelope. So we have to attempt to make room for it by dropping lower
// priority Envelopes or drop the incoming Envelope otherwise.
// The cumulative size of all enqueue envelopes at the incoming envelope's
// priority or lower.
total := s.sizes[pqEnv.priority]
if total >= pqEnv.size {
// There is room for the incoming Envelope, so we drop as many lower
// priority Envelopes as we need to.
var (
canEnqueue bool
tmpSize = s.size
i = s.pq.Len() - 1
)
// Drop lower priority Envelopes until sufficient capacity exists for
// the incoming Envelope
for i >= 0 && !canEnqueue {
pqEnvTmp := s.pq.get(i)
if pqEnvTmp.priority < pqEnv.priority {
if tmpSize+pqEnv.size <= s.capacity {
canEnqueue = true
} else {
pqEnvTmpChIDStr := strconv.Itoa(int(pqEnvTmp.envelope.channelID))
s.metrics.PeerQueueDroppedMsgs.With("ch_id", pqEnvTmpChIDStr).Add(1)
s.logger.Debug(
"dropped envelope",
"ch_id", pqEnvTmpChIDStr,
"priority", pqEnvTmp.priority,
"msg_size", pqEnvTmp.size,
"capacity", s.capacity,
)
s.metrics.PeerPendingSendBytes.With("peer_id", string(pqEnvTmp.envelope.To)).Add(float64(-pqEnvTmp.size))
// dequeue/drop from the priority queue
heap.Remove(s.pq, pqEnvTmp.index)
// update the size tracker
tmpSize -= pqEnvTmp.size
// start from the end again
i = s.pq.Len() - 1
}
} else {
i--
}
}
// enqueue the incoming Envelope
s.push(pqEnv)
} else {
// There is not sufficient capacity to drop lower priority Envelopes,
// so we drop the incoming Envelope.
s.metrics.PeerQueueDroppedMsgs.With("ch_id", chIDStr).Add(1)
s.logger.Debug(
"dropped envelope",
"ch_id", chIDStr,
"priority", pqEnv.priority,
"msg_size", pqEnv.size,
"capacity", s.capacity,
)
}
}
// dequeue
for s.pq.Len() > 0 {
pqEnv = heap.Pop(s.pq).(*pqEnvelope)
s.size -= pqEnv.size
// deduct the Envelope size from all the relevant cumulative sizes
for i := 0; i < len(s.chDescs) && pqEnv.priority <= uint(s.chDescs[i].Priority); i++ {
s.sizes[uint(s.chDescs[i].Priority)] -= pqEnv.size
}
s.metrics.PeerSendBytesTotal.With(
"chID", chIDStr,
"peer_id", string(pqEnv.envelope.To)).Add(float64(pqEnv.size))
s.metrics.PeerPendingSendBytes.With(
"peer_id", string(pqEnv.envelope.To)).Add(float64(-pqEnv.size))
select {
case s.dequeueCh <- pqEnv.envelope:
case <-s.closer.Done():
return
}
}
case <-s.closer.Done():
return
}
}
}
func (s *pqScheduler) push(pqEnv *pqEnvelope) {
chIDStr := strconv.Itoa(int(pqEnv.envelope.channelID))
// enqueue the incoming Envelope
heap.Push(s.pq, pqEnv)
s.size += pqEnv.size
s.metrics.PeerQueueMsgSize.With("ch_id", chIDStr).Add(float64(pqEnv.size))
// Update the cumulative sizes by adding the Envelope's size to every
// priority less than or equal to it.
for i := 0; i < len(s.chDescs) && pqEnv.priority <= uint(s.chDescs[i].Priority); i++ {
s.sizes[uint(s.chDescs[i].Priority)] += pqEnv.size
}
}