mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-10 15:07:24 +00:00
Rework the internal plumbing of the server. This change does not modify the
exported interfaces or semantics of the package, and all the existing tests
still pass.
The main changes here are to:
- Simplify the interface for subscription indexing with a typed index rather
than a single nested map.
- Ensure orderly shutdown of channels, so that there is no longer a dynamic
race with concurrent publishers & subscribers at shutdown.
- Remove a layer of indirection between publishers and subscribers. This mainly
helps legibility.
- Remove order dependencies between registration and delivery.
- Add documentation comments where they seemed helpful, and clarified the
existing comments where it was practical.
Although performance was not a primary goal of this change, the simplifications
did very slightly reduce memory use and increase throughput on the existing
benchmarks, though the delta is not statistically significant.
BENCHMARK BEFORE AFTER SPEEDUP (%) B/op (B) B/op (A)
Benchmark10Clients-12 5947 5566 6.4 2017 1942
Benchmark100Clients-12 6111 5762 5.7 1992 1910
Benchmark1000Clients-12 6983 6344 9.2 2046 1959
111 lines
3.1 KiB
Go
111 lines
3.1 KiB
Go
package pubsub
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/tendermint/tendermint/abci/types"
|
|
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
|
|
)
|
|
|
|
var (
|
|
// ErrUnsubscribed is returned by Err when a client unsubscribes.
|
|
ErrUnsubscribed = errors.New("client unsubscribed")
|
|
|
|
// ErrOutOfCapacity is returned by Err when a client is not pulling messages
|
|
// fast enough. Note the client's subscription will be terminated.
|
|
ErrOutOfCapacity = errors.New("client is not pulling messages fast enough")
|
|
)
|
|
|
|
// A Subscription represents a client subscription for a particular query and
|
|
// consists of three things:
|
|
// 1) channel onto which messages and events are published
|
|
// 2) channel which is closed if a client is too slow or choose to unsubscribe
|
|
// 3) err indicating the reason for (2)
|
|
type Subscription struct {
|
|
id string
|
|
out chan Message
|
|
|
|
canceled chan struct{}
|
|
mtx tmsync.RWMutex
|
|
err error
|
|
}
|
|
|
|
// NewSubscription returns a new subscription with the given outCapacity.
|
|
func NewSubscription(outCapacity int) *Subscription {
|
|
return &Subscription{
|
|
id: uuid.NewString(),
|
|
out: make(chan Message, outCapacity),
|
|
canceled: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// Out returns a channel onto which messages and events are published.
|
|
// Unsubscribe/UnsubscribeAll does not close the channel to avoid clients from
|
|
// receiving a nil message.
|
|
func (s *Subscription) Out() <-chan Message { return s.out }
|
|
|
|
func (s *Subscription) ID() string { return s.id }
|
|
|
|
// Canceled returns a channel that's closed when the subscription is
|
|
// terminated and supposed to be used in a select statement.
|
|
func (s *Subscription) Canceled() <-chan struct{} {
|
|
return s.canceled
|
|
}
|
|
|
|
// Err returns nil if the channel returned by Canceled is not yet closed.
|
|
// If the channel is closed, Err returns a non-nil error explaining why:
|
|
// - ErrUnsubscribed if the subscriber choose to unsubscribe,
|
|
// - ErrOutOfCapacity if the subscriber is not pulling messages fast enough
|
|
// and the channel returned by Out became full,
|
|
// After Err returns a non-nil error, successive calls to Err return the same
|
|
// error.
|
|
func (s *Subscription) Err() error {
|
|
s.mtx.RLock()
|
|
defer s.mtx.RUnlock()
|
|
return s.err
|
|
}
|
|
|
|
func (s *Subscription) cancel(err error) {
|
|
s.mtx.Lock()
|
|
defer s.mtx.Unlock()
|
|
defer func() {
|
|
perr := recover()
|
|
if err == nil && perr != nil {
|
|
err = fmt.Errorf("problem closing subscription: %v", perr)
|
|
}
|
|
}()
|
|
|
|
if s.err == nil && err != nil {
|
|
s.err = err
|
|
}
|
|
|
|
close(s.canceled)
|
|
}
|
|
|
|
// Message glues data and events together.
|
|
type Message struct {
|
|
subID string
|
|
data interface{}
|
|
events []types.Event
|
|
}
|
|
|
|
func NewMessage(subID string, data interface{}, events []types.Event) Message {
|
|
return Message{
|
|
subID: subID,
|
|
data: data,
|
|
events: events,
|
|
}
|
|
}
|
|
|
|
// SubscriptionID returns the unique identifier for the subscription
|
|
// that produced this message.
|
|
func (msg Message) SubscriptionID() string { return msg.subID }
|
|
|
|
// Data returns an original data published.
|
|
func (msg Message) Data() interface{} { return msg.data }
|
|
|
|
// Events returns events, which matched the client's query.
|
|
func (msg Message) Events() []types.Event { return msg.events }
|