The PeerPendingSendBytes metric describes itself as 'pending' but never actually decrements when messages are removed from the queue. This change fixes that by decrementing the metric when the data is removed from the queue.
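In short, every increment of the PeerPendingSendBytes gauge on enqueue is now paired with an equal decrement once the envelope leaves the queue, whether it is popped for sending or dropped to make room. A minimal, self-contained sketch of that pairing (the gauge type here is a hypothetical stand-in for the metrics gauge behind Metrics.PeerPendingSendBytes, used only to illustrate the add/subtract pattern):

package main

import "fmt"

// gauge is a hypothetical stand-in for the metrics gauge; only Add is needed
// for this illustration.
type gauge struct{ v float64 }

func (g *gauge) Add(delta float64) { g.v += delta }

func main() {
	pendingSendBytes := &gauge{}
	msgSize := 512.0

	// Enqueue: the envelope's size is counted as pending.
	pendingSendBytes.Add(msgSize)
	fmt.Println("pending after enqueue:", pendingSendBytes.v) // 512

	// Dequeue or drop: the same size is subtracted, so the gauge reflects
	// only what is still waiting in the queue.
	pendingSendBytes.Add(-msgSize)
	fmt.Println("pending after dequeue:", pendingSendBytes.v) // 0
}

The same pairing appears in process() below: the gauge is increased in the enqueue path and decreased both when a queued envelope is dropped to make room and when one is popped and sent on dequeueCh.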
package p2p

import (
	"container/heap"
	"sort"
	"strconv"
	"time"

	"github.com/gogo/protobuf/proto"
	tmsync "github.com/tendermint/tendermint/internal/libs/sync"
	"github.com/tendermint/tendermint/libs/log"
)

// pqEnvelope defines a wrapper around an Envelope with priority to be inserted
// into a priority queue used for Envelope scheduling.
type pqEnvelope struct {
	envelope  Envelope
	priority  uint
	size      uint
	timestamp time.Time

	index int
}

// priorityQueue defines a priority queue implementation backed by a slice of
// pqEnvelopes.
type priorityQueue []*pqEnvelope

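// get returns the pqEnvelope at index i without removing it. The scheduler
// uses it to inspect queued entries when scanning for lower-priority
// Envelopes to drop.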
func (pq priorityQueue) get(i int) *pqEnvelope { return pq[i] }
func (pq priorityQueue) Len() int              { return len(pq) }

func (pq priorityQueue) Less(i, j int) bool {
	// if both elements have the same priority, prioritize based on most recent
	if pq[i].priority == pq[j].priority {
		return pq[i].timestamp.After(pq[j].timestamp)
	}

	// otherwise, pick the pqEnvelope with the higher priority
	return pq[i].priority > pq[j].priority
}

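// Swap exchanges the elements at i and j and keeps their index fields in
// sync, which is what allows heap.Remove to drop an arbitrary entry by index.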
func (pq priorityQueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
	pq[i].index = i
	pq[j].index = j
}

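// Push appends x to the queue and records its position in the index field.
// It is invoked by the container/heap package and should not be called
// directly.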
func (pq *priorityQueue) Push(x interface{}) {
	n := len(*pq)
	pqEnv := x.(*pqEnvelope)
	pqEnv.index = n
	*pq = append(*pq, pqEnv)
}

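// Pop removes and returns the last element, clearing its slot and marking its
// index as -1. It is invoked by the container/heap package (heap.Pop and
// heap.Remove).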
func (pq *priorityQueue) Pop() interface{} {
	old := *pq
	n := len(old)
	pqEnv := old[n-1]
	old[n-1] = nil
	pqEnv.index = -1
	*pq = old[:n-1]
	return pqEnv
}

// Assert the priority queue scheduler implements the queue interface at
// compile-time.
var _ queue = (*pqScheduler)(nil)

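// pqScheduler is a priority-queue-backed implementation of the queue
// interface. It buffers outbound Envelopes up to a fixed byte capacity,
// scheduling them by channel priority and dropping lower-priority Envelopes
// when that capacity would be exceeded.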
type pqScheduler struct {
	logger       log.Logger
	metrics      *Metrics
	size         uint
	sizes        map[uint]uint // cumulative priority sizes
	pq           *priorityQueue
	chDescs      []ChannelDescriptor
	capacity     uint
	chPriorities map[ChannelID]uint

	enqueueCh chan Envelope
	dequeueCh chan Envelope
	closer    *tmsync.Closer
	done      *tmsync.Closer
}

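// newPQScheduler returns a pqScheduler for the given channel descriptors,
// using the provided enqueue/dequeue channel buffer sizes and total byte
// capacity.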
func newPQScheduler(
	logger log.Logger,
	m *Metrics,
	chDescs []ChannelDescriptor,
	enqueueBuf, dequeueBuf, capacity uint,
) *pqScheduler {

	// copy each ChannelDescriptor and sort them by ascending channel priority
	chDescsCopy := make([]ChannelDescriptor, len(chDescs))
	copy(chDescsCopy, chDescs)
	sort.Slice(chDescsCopy, func(i, j int) bool { return chDescsCopy[i].Priority < chDescsCopy[j].Priority })

	var (
		chPriorities = make(map[ChannelID]uint)
		sizes        = make(map[uint]uint)
	)

	for _, chDesc := range chDescsCopy {
		chID := ChannelID(chDesc.ID)
		chPriorities[chID] = uint(chDesc.Priority)
		sizes[uint(chDesc.Priority)] = 0
	}

	pq := make(priorityQueue, 0)
	heap.Init(&pq)

	return &pqScheduler{
		logger:       logger.With("router", "scheduler"),
		metrics:      m,
		chDescs:      chDescsCopy,
		capacity:     capacity,
		chPriorities: chPriorities,
		pq:           &pq,
		sizes:        sizes,
		enqueueCh:    make(chan Envelope, enqueueBuf),
		dequeueCh:    make(chan Envelope, dequeueBuf),
		closer:       tmsync.NewCloser(),
		done:         tmsync.NewCloser(),
	}
}

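// enqueue returns the send-only channel used to submit Envelopes to the
// scheduler.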
func (s *pqScheduler) enqueue() chan<- Envelope {
	return s.enqueueCh
}

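// dequeue returns the receive-only channel on which scheduled Envelopes are
// delivered.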
func (s *pqScheduler) dequeue() <-chan Envelope {
	return s.dequeueCh
}

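// close signals the scheduler to stop and blocks until the processing
// goroutine has exited.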
func (s *pqScheduler) close() {
	s.closer.Close()
	<-s.done.Done()
}

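// closed returns a channel that is closed once the scheduler has been
// signaled to stop.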
func (s *pqScheduler) closed() <-chan struct{} {
	return s.closer.Done()
}

// start starts a non-blocking process that runs the priority queue scheduler.
func (s *pqScheduler) start() {
	go s.process()
}

// process starts a blocking loop in which we listen for Envelopes to enqueue.
// If there is sufficient capacity, the incoming Envelope is enqueued into the
// priority queue; otherwise, we attempt to make room for it by dropping lower
// priority elements from the priority queue. If there isn't sufficient
// capacity at lower priorities for the incoming Envelope, it is dropped.
//
// After we attempt to enqueue the incoming Envelope, if the priority queue is
// non-empty, we pop the top Envelope and send it on the dequeueCh.
func (s *pqScheduler) process() {
	defer s.done.Close()

	for {
		select {
		case e := <-s.enqueueCh:
			chIDStr := strconv.Itoa(int(e.channelID))
			pqEnv := &pqEnvelope{
				envelope:  e,
				size:      uint(proto.Size(e.Message)),
				priority:  s.chPriorities[e.channelID],
				timestamp: time.Now().UTC(),
			}

			// enqueue

			// Check if we have sufficient capacity to simply enqueue the incoming
			// Envelope.
			if s.size+pqEnv.size <= s.capacity {
				s.metrics.PeerPendingSendBytes.With("peer_id", string(pqEnv.envelope.To)).Add(float64(pqEnv.size))
				// enqueue the incoming Envelope
				s.push(pqEnv)
			} else {
				// There is not sufficient capacity to simply enqueue the incoming
				// Envelope, so we attempt to make room for it by dropping lower
				// priority Envelopes; failing that, the incoming Envelope is dropped.

				// The cumulative size of all enqueued envelopes at the incoming
				// envelope's priority or lower.
				total := s.sizes[pqEnv.priority]

				if total >= pqEnv.size {
					// There is room for the incoming Envelope, so we drop as many lower
					// priority Envelopes as we need to.
					var (
						canEnqueue bool
						tmpSize    = s.size
						i          = s.pq.Len() - 1
					)

					// Drop lower priority Envelopes until sufficient capacity exists for
					// the incoming Envelope.
					for i >= 0 && !canEnqueue {
						pqEnvTmp := s.pq.get(i)

						if pqEnvTmp.priority < pqEnv.priority {
							if tmpSize+pqEnv.size <= s.capacity {
								canEnqueue = true
							} else {
								pqEnvTmpChIDStr := strconv.Itoa(int(pqEnvTmp.envelope.channelID))
								s.metrics.PeerQueueDroppedMsgs.With("ch_id", pqEnvTmpChIDStr).Add(1)
								s.logger.Debug(
									"dropped envelope",
									"ch_id", pqEnvTmpChIDStr,
									"priority", pqEnvTmp.priority,
									"msg_size", pqEnvTmp.size,
									"capacity", s.capacity,
								)

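								// The dropped Envelope's bytes are no longer pending for
								// this peer, so decrement PeerPendingSendBytes by its size.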
								s.metrics.PeerPendingSendBytes.With("peer_id", string(pqEnvTmp.envelope.To)).Add(float64(-pqEnvTmp.size))

								// dequeue/drop from the priority queue
								heap.Remove(s.pq, pqEnvTmp.index)

								// update the size tracker
								tmpSize -= pqEnvTmp.size

								// start from the end again
								i = s.pq.Len() - 1
							}
						} else {
							i--
						}
					}

					// enqueue the incoming Envelope
					s.push(pqEnv)
				} else {
					// There is not sufficient capacity to drop lower priority Envelopes,
					// so we drop the incoming Envelope.
					s.metrics.PeerQueueDroppedMsgs.With("ch_id", chIDStr).Add(1)
					s.logger.Debug(
						"dropped envelope",
						"ch_id", chIDStr,
						"priority", pqEnv.priority,
						"msg_size", pqEnv.size,
						"capacity", s.capacity,
					)
				}
			}

			// dequeue

			for s.pq.Len() > 0 {
				pqEnv = heap.Pop(s.pq).(*pqEnvelope)
				s.size -= pqEnv.size

				// deduct the Envelope size from all the relevant cumulative sizes
				for i := 0; i < len(s.chDescs) && pqEnv.priority <= uint(s.chDescs[i].Priority); i++ {
					s.sizes[uint(s.chDescs[i].Priority)] -= pqEnv.size
				}

				s.metrics.PeerSendBytesTotal.With(
					"chID", chIDStr,
					"peer_id", string(pqEnv.envelope.To)).Add(float64(pqEnv.size))
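				// The Envelope is leaving the queue, so subtract its size from the
				// pending-send gauge that was incremented when it was enqueued.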
				s.metrics.PeerPendingSendBytes.With(
					"peer_id", string(pqEnv.envelope.To)).Add(float64(-pqEnv.size))
				select {
				case s.dequeueCh <- pqEnv.envelope:
				case <-s.closer.Done():
					return
				}
			}

		case <-s.closer.Done():
			return
		}
	}
}

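// push inserts pqEnv into the priority queue and updates the scheduler's size
// bookkeeping and the PeerQueueMsgSize metric.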
func (s *pqScheduler) push(pqEnv *pqEnvelope) {
	chIDStr := strconv.Itoa(int(pqEnv.envelope.channelID))

	// enqueue the incoming Envelope
	heap.Push(s.pq, pqEnv)
	s.size += pqEnv.size
	s.metrics.PeerQueueMsgSize.With("ch_id", chIDStr).Add(float64(pqEnv.size))

	// Update the cumulative sizes by adding the Envelope's size to every
	// priority less than or equal to it.
	for i := 0; i < len(s.chDescs) && pqEnv.priority <= uint(s.chDescs[i].Priority); i++ {
		s.sizes[uint(s.chDescs[i].Priority)] += pqEnv.size
	}
}