Files
tendermint/p2p/queue.go
Erik Grinaker c61cd3fd05 p2p: add Router prototype (#5831)
Early but functional prototype of the new `p2p.Router`, see its GoDoc comment for details on how it works. Expect much of this logic to change and improve as we evolve the new P2P stack.

There is a simple test that sets up an in-memory network of four routers with reactors and passes messages between them, but otherwise no exhaustive tests since this is very much a work-in-progress.
2021-01-08 15:32:11 +00:00

60 lines
1.5 KiB
Go

package p2p
import "sync"
// queue does QoS scheduling for Envelopes, enqueueing and dequeueing according
// to some policy. Queues are used at contention points, i.e.:
//
// - Receiving inbound messages to a single channel from all peers.
// - Sending outbound messages to a single peer from all channels.
type queue interface {
// enqueue returns a channel for submitting envelopes.
enqueue() chan<- Envelope
// dequeue returns a channel ordered according to some queueing policy.
dequeue() <-chan Envelope
// close closes the queue. After this call enqueue() will block, so the
// caller must select on closed() as well to avoid blocking forever. The
// enqueue() and dequeue() channels will not be closed.
close()
// closed returns a channel that's closed when the scheduler is closed.
closed() <-chan struct{}
}
// fifoQueue is a simple unbuffered lossless queue that passes messages through
// in the order they were received, and blocks until message is received.
type fifoQueue struct {
queueCh chan Envelope
closeCh chan struct{}
closeOnce sync.Once
}
var _ queue = (*fifoQueue)(nil)
func newFIFOQueue() *fifoQueue {
return &fifoQueue{
queueCh: make(chan Envelope),
closeCh: make(chan struct{}),
}
}
func (q *fifoQueue) enqueue() chan<- Envelope {
return q.queueCh
}
func (q *fifoQueue) dequeue() <-chan Envelope {
return q.queueCh
}
func (q *fifoQueue) close() {
q.closeOnce.Do(func() {
close(q.closeCh)
})
}
func (q *fifoQueue) closed() <-chan struct{} {
return q.closeCh
}