Move the libs/pubsub package to internal scope (#7451)

No API changes, merely changes the import path.
This commit is contained in:
M. J. Fromberger
2021-12-15 07:09:32 -08:00
committed by GitHub
parent f3278e8b68
commit 82738eb016
50 changed files with 55 additions and 54 deletions

View File

@@ -1,34 +0,0 @@
package pubsub_test
import (
"context"
"testing"
"github.com/stretchr/testify/require"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/pubsub"
"github.com/tendermint/tendermint/libs/pubsub/query"
)
func TestExample(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s := newTestServer(ctx, t, log.TestingLogger())
sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
ClientID: "example-client",
Query: query.MustCompile(`abci.account.name='John'`),
}))
events := []abci.Event{
{
Type: "abci.account",
Attributes: []abci.EventAttribute{{Key: "name", Value: "John"}},
},
}
require.NoError(t, s.PublishWithEvents(ctx, "Tombstone", events))
sub.mustReceive(ctx, "Tombstone")
}

View File

@@ -1,464 +0,0 @@
// Package pubsub implements an event dispatching server with a single publisher
// and multiple subscriber clients. Multiple goroutines can safely publish to a
// single Server instance.
//
// Clients register subscriptions with a query to select which messages they
// wish to receive. When messages are published, they are broadcast to all
// clients whose subscription query matches that message. Queries are
// constructed using the github.com/tendermint/tendermint/libs/pubsub/query
// package.
//
// Example:
//
// q, err := query.New(`account.name='John'`)
// if err != nil {
// return err
// }
// sub, err := pubsub.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
// ClientID: "johns-transactions",
// Query: q,
// })
// if err != nil {
// return err
// }
//
// for {
// next, err := sub.Next(ctx)
// if err == pubsub.ErrTerminated {
// return err // terminated by publisher
// } else if err != nil {
// return err // timed out, client unsubscribed, etc.
// }
// process(next)
// }
//
package pubsub
import (
"context"
"errors"
"fmt"
"sync"
"github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
)
var (
// ErrSubscriptionNotFound is returned when a client tries to unsubscribe
// from not existing subscription.
ErrSubscriptionNotFound = errors.New("subscription not found")
// ErrAlreadySubscribed is returned when a client tries to subscribe twice or
// more using the same query.
ErrAlreadySubscribed = errors.New("already subscribed")
// ErrServerStopped is returned when attempting to publish or subscribe to a
// server that has been stopped.
ErrServerStopped = errors.New("pubsub server is stopped")
)
// Query defines an interface for a query to be used for subscribing. A query
// matches against a map of events. Each key in this map is a composite of the
// even type and an attribute key (e.g. "{eventType}.{eventAttrKey}") and the
// values are the event values that are contained under that relationship. This
// allows event types to repeat themselves with the same set of keys and
// different values.
type Query interface {
Matches(events []types.Event) (bool, error)
String() string
}
// SubscribeArgs are the parameters to create a new subscription.
type SubscribeArgs struct {
ClientID string // Client ID
Query Query // filter query for events (required)
Limit int // subscription queue capacity limit (0 means 1)
Quota int // subscription queue soft quota (0 uses Limit)
}
// UnsubscribeArgs are the parameters to remove a subscription.
// The subscriber ID must be populated, and at least one of the client ID or
// the registered query.
type UnsubscribeArgs struct {
Subscriber string // subscriber ID chosen by the client (required)
ID string // subscription ID (assigned by the server)
Query Query // the query registered with the subscription
}
// Validate returns nil if args are valid to identify a subscription to remove.
// Otherwise, it reports an error.
func (args UnsubscribeArgs) Validate() error {
if args.Subscriber == "" {
return errors.New("must specify a subscriber")
}
if args.ID == "" && args.Query == nil {
return fmt.Errorf("subscription is not fully defined [subscriber=%q]", args.Subscriber)
}
return nil
}
// Server allows clients to subscribe/unsubscribe for messages, publishing
// messages with or without events, and manages internal state.
type Server struct {
service.BaseService
logger log.Logger
queue chan item
done <-chan struct{} // closed when server should exit
pubs sync.RWMutex // excl: shutdown; shared: active publisher
exited chan struct{} // server exited
// All subscriptions currently known.
// Lock exclusive to add, remove, or cancel subscriptions.
// Lock shared to look up or publish to subscriptions.
subs struct {
sync.RWMutex
index *subIndex
// This function is called synchronously with each message published
// before it is delivered to any other subscriber. This allows an index
// to be persisted before any subscribers see the messages.
observe func(Message) error
}
// TODO(creachadair): Rework the options so that this does not need to live
// as a field. It is not otherwise needed.
queueCap int
}
// Option sets a parameter for the server.
type Option func(*Server)
// NewServer returns a new server. See the commentary on the Option functions
// for a detailed description of how to configure buffering. If no options are
// provided, the resulting server's queue is unbuffered.
func NewServer(logger log.Logger, options ...Option) *Server {
s := &Server{logger: logger}
s.BaseService = *service.NewBaseService(logger, "PubSub", s)
for _, opt := range options {
opt(s)
}
// The queue receives items to be published.
s.queue = make(chan item, s.queueCap)
// The index tracks subscriptions by ID and query terms.
s.subs.index = newSubIndex()
return s
}
// BufferCapacity allows you to specify capacity for publisher's queue. This
// is the number of messages that can be published without blocking. If no
// buffer is specified, publishing is synchronous with delivery. This function
// will panic if cap < 0.
func BufferCapacity(cap int) Option {
if cap < 0 {
panic("negative buffer capacity")
}
return func(s *Server) { s.queueCap = cap }
}
// BufferCapacity returns capacity of the publication queue.
func (s *Server) BufferCapacity() int { return cap(s.queue) }
// Subscribe creates a subscription for the given client ID and query.
// If len(capacities) > 0, its first value is used as the queue capacity.
//
// Deprecated: Use SubscribeWithArgs. This method will be removed in v0.36.
func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, capacities ...int) (*Subscription, error) {
args := SubscribeArgs{
ClientID: clientID,
Query: query,
Limit: 1,
}
if len(capacities) > 0 {
args.Limit = capacities[0]
if len(capacities) > 1 {
args.Quota = capacities[1]
}
// bounds are checked below
}
return s.SubscribeWithArgs(ctx, args)
}
// Observe registers an observer function that will be called synchronously
// with each published message matching any of the given queries, prior to it
// being forwarded to any subscriber. If no queries are specified, all
// messages will be observed. An error is reported if an observer is already
// registered.
func (s *Server) Observe(ctx context.Context, observe func(Message) error, queries ...Query) error {
s.subs.Lock()
defer s.subs.Unlock()
if observe == nil {
return errors.New("observe callback is nil")
} else if s.subs.observe != nil {
return errors.New("an observer is already registered")
}
// Compile the message filter.
var matches func(Message) bool
if len(queries) == 0 {
matches = func(Message) bool { return true }
} else {
matches = func(msg Message) bool {
for _, q := range queries {
match, err := q.Matches(msg.events)
if err == nil && match {
return true
}
}
return false
}
}
s.subs.observe = func(msg Message) error {
if matches(msg) {
return observe(msg)
}
return nil // nothing to do for this message
}
return nil
}
// SubscribeWithArgs creates a subscription for the given arguments. It is an
// error if the query is nil, a subscription already exists for the specified
// client ID and query, or if the capacity arguments are invalid.
func (s *Server) SubscribeWithArgs(ctx context.Context, args SubscribeArgs) (*Subscription, error) {
if args.Query == nil {
return nil, errors.New("query is nil")
}
s.subs.Lock()
defer s.subs.Unlock()
if s.subs.index == nil {
return nil, ErrServerStopped
} else if s.subs.index.contains(args.ClientID, args.Query.String()) {
return nil, ErrAlreadySubscribed
}
if args.Limit == 0 {
args.Limit = 1
}
sub, err := newSubscription(args.Quota, args.Limit)
if err != nil {
return nil, err
}
s.subs.index.add(&subInfo{
clientID: args.ClientID,
query: args.Query,
subID: sub.id,
sub: sub,
})
return sub, nil
}
// Unsubscribe removes the subscription for the given client and/or query. It
// returns ErrSubscriptionNotFound if no such subscription exists.
func (s *Server) Unsubscribe(ctx context.Context, args UnsubscribeArgs) error {
if err := args.Validate(); err != nil {
return err
}
s.subs.Lock()
defer s.subs.Unlock()
if s.subs.index == nil {
return ErrServerStopped
}
// TODO(creachadair): Do we need to support unsubscription for an "empty"
// query? I believe that case is not possible by the Query grammar, but we
// should make sure.
//
// Revisit this logic once we are able to remove indexing by query.
var evict subInfoSet
if args.Subscriber != "" {
evict = s.subs.index.findClientID(args.Subscriber)
if args.Query != nil {
evict = evict.withQuery(args.Query.String())
}
} else {
evict = s.subs.index.findQuery(args.Query.String())
}
if len(evict) == 0 {
return ErrSubscriptionNotFound
}
s.removeSubs(evict, ErrUnsubscribed)
return nil
}
// UnsubscribeAll removes all subscriptions for the given client ID.
// It returns ErrSubscriptionNotFound if no subscriptions exist for that client.
func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error {
s.subs.Lock()
defer s.subs.Unlock()
evict := s.subs.index.findClientID(clientID)
if len(evict) == 0 {
return ErrSubscriptionNotFound
}
s.removeSubs(evict, ErrUnsubscribed)
return nil
}
// NumClients returns the number of clients.
func (s *Server) NumClients() int {
s.subs.RLock()
defer s.subs.RUnlock()
return len(s.subs.index.byClient)
}
// NumClientSubscriptions returns the number of subscriptions the client has.
func (s *Server) NumClientSubscriptions(clientID string) int {
s.subs.RLock()
defer s.subs.RUnlock()
return len(s.subs.index.findClientID(clientID))
}
// Publish publishes the given message. An error will be returned to the caller
// if the context is canceled.
func (s *Server) Publish(ctx context.Context, msg interface{}) error {
return s.publish(ctx, msg, []types.Event{})
}
// PublishWithEvents publishes the given message with the set of events. The set
// is matched with clients queries. If there is a match, the message is sent to
// the client.
func (s *Server) PublishWithEvents(ctx context.Context, msg interface{}, events []types.Event) error {
return s.publish(ctx, msg, events)
}
// OnStop implements part of the Service interface. It is a no-op.
func (s *Server) OnStop() {}
// Wait implements Service.Wait by blocking until the server has exited, then
// yielding to the base service wait.
func (s *Server) Wait() { <-s.exited; s.BaseService.Wait() }
// OnStart implements Service.OnStart by starting the server.
func (s *Server) OnStart(ctx context.Context) error { s.run(ctx); return nil }
// OnReset implements Service.OnReset. It has no effect for this service.
func (s *Server) OnReset() error { return nil }
func (s *Server) publish(ctx context.Context, data interface{}, events []types.Event) error {
s.pubs.RLock()
defer s.pubs.RUnlock()
select {
case <-s.done:
return ErrServerStopped
case <-ctx.Done():
return ctx.Err()
case s.queue <- item{
Data: data,
Events: events,
}:
return nil
}
}
func (s *Server) run(ctx context.Context) {
// The server runs until ctx is canceled.
s.done = ctx.Done()
queue := s.queue
// Shutdown monitor: When the context ends, wait for any active publish
// calls to exit, then close the queue to signal the sender to exit.
go func() {
<-ctx.Done()
s.pubs.Lock()
defer s.pubs.Unlock()
close(s.queue)
s.queue = nil
}()
s.exited = make(chan struct{})
go func() {
defer close(s.exited)
// Sender: Service the queue and forward messages to subscribers.
for it := range queue {
if err := s.send(it.Data, it.Events); err != nil {
s.logger.Error("Error sending event", "err", err)
}
}
// Terminate all subscribers before exit.
s.subs.Lock()
defer s.subs.Unlock()
for si := range s.subs.index.all {
si.sub.stop(ErrTerminated)
}
s.subs.index = nil
}()
}
// removeSubs cancels and removes all the subscriptions in evict with the given
// error. The caller must hold the s.subs lock.
func (s *Server) removeSubs(evict subInfoSet, reason error) {
for si := range evict {
si.sub.stop(reason)
}
s.subs.index.removeAll(evict)
}
// send delivers the given message to all matching subscribers. An error in
// query matching stops transmission and is returned.
func (s *Server) send(data interface{}, events []types.Event) error {
// At exit, evict any subscriptions that were too slow.
evict := make(subInfoSet)
defer func() {
if len(evict) != 0 {
s.subs.Lock()
defer s.subs.Unlock()
s.removeSubs(evict, ErrTerminated)
}
}()
// N.B. Order is important here. We must acquire and defer the lock release
// AFTER deferring the eviction cleanup: The cleanup must happen after the
// reader lock has released, or it will deadlock.
s.subs.RLock()
defer s.subs.RUnlock()
// If an observer is defined, give it control of the message before
// attempting to deliver it to any matching subscribers. If the observer
// fails, the message will not be forwarded.
if s.subs.observe != nil {
err := s.subs.observe(Message{
data: data,
events: events,
})
if err != nil {
return fmt.Errorf("observer failed on message: %w", err)
}
}
for si := range s.subs.index.all {
match, err := si.query.Matches(events)
if err != nil {
return fmt.Errorf("match failed against query: %w", err)
// TODO(creachadair): Should we evict this subscription?
} else if !match {
continue
}
// Publish the events to the subscriber's queue. If this fails, e.g.,
// because the queue is over capacity or out of quota, evict the
// subscription from the index.
if err := si.sub.publish(Message{
subID: si.sub.id,
data: data,
events: events,
}); err != nil {
evict.add(si)
}
}
return nil
}

View File

@@ -1,467 +0,0 @@
package pubsub_test
import (
"context"
"errors"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/require"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/pubsub"
"github.com/tendermint/tendermint/libs/pubsub/query"
)
const (
clientID = "test-client"
)
func TestSubscribeWithArgs(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.TestingLogger()
s := newTestServer(ctx, t, logger)
t.Run("DefaultLimit", func(t *testing.T) {
sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
ClientID: clientID,
Query: query.All,
}))
require.Equal(t, 1, s.NumClients())
require.Equal(t, 1, s.NumClientSubscriptions(clientID))
require.NoError(t, s.Publish(ctx, "Ka-Zar"))
sub.mustReceive(ctx, "Ka-Zar")
})
t.Run("PositiveLimit", func(t *testing.T) {
sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
ClientID: clientID + "-2",
Query: query.All,
Limit: 10,
}))
require.NoError(t, s.Publish(ctx, "Aggamon"))
sub.mustReceive(ctx, "Aggamon")
})
}
func TestObserver(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.TestingLogger()
s := newTestServer(ctx, t, logger)
done := make(chan struct{})
var got interface{}
require.NoError(t, s.Observe(ctx, func(msg pubsub.Message) error {
defer close(done)
got = msg.Data()
return nil
}))
const input = "Lions and tigers and bears, oh my!"
require.NoError(t, s.Publish(ctx, input))
<-done
require.Equal(t, got, input)
}
func TestObserverErrors(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.TestingLogger()
s := newTestServer(ctx, t, logger)
require.Error(t, s.Observe(ctx, nil, query.All))
require.NoError(t, s.Observe(ctx, func(pubsub.Message) error { return nil }))
require.Error(t, s.Observe(ctx, func(pubsub.Message) error { return nil }, query.All))
}
func TestPublishDoesNotBlock(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.TestingLogger()
s := newTestServer(ctx, t, logger)
sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
ClientID: clientID,
Query: query.All,
}))
published := make(chan struct{})
go func() {
defer close(published)
require.NoError(t, s.Publish(ctx, "Quicksilver"))
require.NoError(t, s.Publish(ctx, "Asylum"))
require.NoError(t, s.Publish(ctx, "Ivan"))
}()
select {
case <-published:
sub.mustReceive(ctx, "Quicksilver")
sub.mustFail(ctx, pubsub.ErrTerminated)
case <-time.After(3 * time.Second):
t.Fatal("Publishing should not have blocked")
}
}
func TestSubscribeErrors(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.TestingLogger()
s := newTestServer(ctx, t, logger)
t.Run("EmptyQueryErr", func(t *testing.T) {
_, err := s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{ClientID: clientID})
require.Error(t, err)
})
t.Run("NegativeLimitErr", func(t *testing.T) {
_, err := s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
ClientID: clientID,
Query: query.All,
Limit: -5,
})
require.Error(t, err)
})
}
func TestSlowSubscriber(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.TestingLogger()
s := newTestServer(ctx, t, logger)
sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
ClientID: clientID,
Query: query.All,
}))
require.NoError(t, s.Publish(ctx, "Fat Cobra"))
require.NoError(t, s.Publish(ctx, "Viper"))
require.NoError(t, s.Publish(ctx, "Black Panther"))
// We had capacity for one item, so we should get that item, but after that
// the subscription should have been terminated by the publisher.
sub.mustReceive(ctx, "Fat Cobra")
sub.mustFail(ctx, pubsub.ErrTerminated)
}
func TestDifferentClients(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.TestingLogger()
s := newTestServer(ctx, t, logger)
sub1 := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
ClientID: "client-1",
Query: query.MustCompile(`tm.events.type='NewBlock'`),
}))
events := []abci.Event{{
Type: "tm.events",
Attributes: []abci.EventAttribute{{Key: "type", Value: "NewBlock"}},
}}
require.NoError(t, s.PublishWithEvents(ctx, "Iceman", events))
sub1.mustReceive(ctx, "Iceman")
sub2 := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
ClientID: "client-2",
Query: query.MustCompile(`tm.events.type='NewBlock' AND abci.account.name='Igor'`),
}))
events = []abci.Event{
{
Type: "tm.events",
Attributes: []abci.EventAttribute{{Key: "type", Value: "NewBlock"}},
},
{
Type: "abci.account",
Attributes: []abci.EventAttribute{{Key: "name", Value: "Igor"}},
},
}
require.NoError(t, s.PublishWithEvents(ctx, "Ultimo", events))
sub1.mustReceive(ctx, "Ultimo")
sub2.mustReceive(ctx, "Ultimo")
sub3 := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
ClientID: "client-3",
Query: query.MustCompile(
`tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10`),
}))
events = []abci.Event{{
Type: "tm.events",
Attributes: []abci.EventAttribute{{Key: "type", Value: "NewRoundStep"}},
}}
require.NoError(t, s.PublishWithEvents(ctx, "Valeria Richards", events))
sub3.mustTimeOut(ctx, 100*time.Millisecond)
}
func TestSubscribeDuplicateKeys(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.TestingLogger()
s := newTestServer(ctx, t, logger)
testCases := []struct {
query string
expected interface{}
}{
{`withdraw.rewards='17'`, "Iceman"},
{`withdraw.rewards='22'`, "Iceman"},
{`withdraw.rewards='1' AND withdraw.rewards='22'`, "Iceman"},
{`withdraw.rewards='100'`, nil},
}
for i, tc := range testCases {
id := fmt.Sprintf("client-%d", i)
q := query.MustCompile(tc.query)
t.Run(id, func(t *testing.T) {
sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
ClientID: id,
Query: q,
}))
events := []abci.Event{
{
Type: "transfer",
Attributes: []abci.EventAttribute{
{Key: "sender", Value: "foo"},
{Key: "sender", Value: "bar"},
{Key: "sender", Value: "baz"},
},
},
{
Type: "withdraw",
Attributes: []abci.EventAttribute{
{Key: "rewards", Value: "1"},
{Key: "rewards", Value: "17"},
{Key: "rewards", Value: "22"},
},
},
}
require.NoError(t, s.PublishWithEvents(ctx, "Iceman", events))
if tc.expected != nil {
sub.mustReceive(ctx, tc.expected)
} else {
sub.mustTimeOut(ctx, 100*time.Millisecond)
}
})
}
}
func TestClientSubscribesTwice(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.TestingLogger()
s := newTestServer(ctx, t, logger)
q := query.MustCompile(`tm.events.type='NewBlock'`)
events := []abci.Event{{
Type: "tm.events",
Attributes: []abci.EventAttribute{{Key: "type", Value: "NewBlock"}},
}}
sub1 := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
ClientID: clientID,
Query: q,
}))
require.NoError(t, s.PublishWithEvents(ctx, "Goblin Queen", events))
sub1.mustReceive(ctx, "Goblin Queen")
// Subscribing a second time with the same client ID and query fails.
{
sub2, err := s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
ClientID: clientID,
Query: q,
})
require.Error(t, err)
require.Nil(t, sub2)
}
// The attempt to re-subscribe does not disrupt the existing sub.
require.NoError(t, s.PublishWithEvents(ctx, "Spider-Man", events))
sub1.mustReceive(ctx, "Spider-Man")
}
func TestUnsubscribe(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.TestingLogger()
s := newTestServer(ctx, t, logger)
sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
ClientID: clientID,
Query: query.MustCompile(`tm.events.type='NewBlock'`),
}))
// Removing the subscription we just made should succeed.
require.NoError(t, s.Unsubscribe(ctx, pubsub.UnsubscribeArgs{
Subscriber: clientID,
Query: query.MustCompile(`tm.events.type='NewBlock'`),
}))
// Publishing should still work.
require.NoError(t, s.Publish(ctx, "Nick Fury"))
// The unsubscribed subscriber should report as such.
sub.mustFail(ctx, pubsub.ErrUnsubscribed)
}
func TestClientUnsubscribesTwice(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.TestingLogger()
s := newTestServer(ctx, t, logger)
newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
ClientID: clientID,
Query: query.MustCompile(`tm.events.type='NewBlock'`),
}))
require.NoError(t, s.Unsubscribe(ctx, pubsub.UnsubscribeArgs{
Subscriber: clientID,
Query: query.MustCompile(`tm.events.type='NewBlock'`),
}))
require.ErrorIs(t, s.Unsubscribe(ctx, pubsub.UnsubscribeArgs{
Subscriber: clientID,
Query: query.MustCompile(`tm.events.type='NewBlock'`),
}), pubsub.ErrSubscriptionNotFound)
require.ErrorIs(t, s.UnsubscribeAll(ctx, clientID), pubsub.ErrSubscriptionNotFound)
}
func TestResubscribe(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.TestingLogger()
s := newTestServer(ctx, t, logger)
args := pubsub.SubscribeArgs{
ClientID: clientID,
Query: query.All,
}
newTestSub(t).must(s.SubscribeWithArgs(ctx, args))
require.NoError(t, s.Unsubscribe(ctx, pubsub.UnsubscribeArgs{
Subscriber: clientID,
Query: query.All,
}))
sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, args))
require.NoError(t, s.Publish(ctx, "Cable"))
sub.mustReceive(ctx, "Cable")
}
func TestUnsubscribeAll(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.TestingLogger()
s := newTestServer(ctx, t, logger)
sub1 := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
ClientID: clientID,
Query: query.MustCompile(`tm.events.type='NewBlock'`),
}))
sub2 := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
ClientID: clientID,
Query: query.MustCompile(`tm.events.type='NewBlockHeader'`),
}))
require.NoError(t, s.UnsubscribeAll(ctx, clientID))
require.NoError(t, s.Publish(ctx, "Nick Fury"))
sub1.mustFail(ctx, pubsub.ErrUnsubscribed)
sub2.mustFail(ctx, pubsub.ErrUnsubscribed)
}
func TestBufferCapacity(t *testing.T) {
logger := log.TestingLogger()
s := pubsub.NewServer(logger, pubsub.BufferCapacity(2))
require.Equal(t, 2, s.BufferCapacity())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
require.NoError(t, s.Publish(ctx, "Nighthawk"))
require.NoError(t, s.Publish(ctx, "Sage"))
ctx, cancel = context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel()
require.ErrorIs(t, s.Publish(ctx, "Ironclad"), context.DeadlineExceeded)
}
func newTestServer(ctx context.Context, t testing.TB, logger log.Logger) *pubsub.Server {
t.Helper()
s := pubsub.NewServer(logger)
require.NoError(t, s.Start(ctx))
t.Cleanup(s.Wait)
return s
}
type testSub struct {
t testing.TB
*pubsub.Subscription
}
func newTestSub(t testing.TB) *testSub { return &testSub{t: t} }
func (s *testSub) must(sub *pubsub.Subscription, err error) *testSub {
s.t.Helper()
require.NoError(s.t, err)
require.NotNil(s.t, sub)
s.Subscription = sub
return s
}
func (s *testSub) mustReceive(ctx context.Context, want interface{}) {
s.t.Helper()
got, err := s.Next(ctx)
require.NoError(s.t, err)
require.Equal(s.t, want, got.Data())
}
func (s *testSub) mustTimeOut(ctx context.Context, dur time.Duration) {
s.t.Helper()
tctx, cancel := context.WithTimeout(ctx, dur)
defer cancel()
got, err := s.Next(tctx)
if !errors.Is(err, context.DeadlineExceeded) {
s.t.Errorf("Next: got (%+v, %v), want %v", got, err, context.DeadlineExceeded)
}
}
func (s *testSub) mustFail(ctx context.Context, want error) {
s.t.Helper()
got, err := s.Next(ctx)
if err == nil && want != nil {
s.t.Fatalf("Next: got (%+v, %v), want error %v", got, err, want)
}
require.ErrorIs(s.t, err, want)
}

View File

@@ -1,58 +0,0 @@
package query_test
import (
"testing"
"github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/pubsub/query"
)
const testQuery = `tm.events.type='NewBlock' AND abci.account.name='Igor'`
var testEvents = []types.Event{
{
Type: "tm.events",
Attributes: []types.EventAttribute{{
Key: "index",
Value: "25",
}, {
Key: "type",
Value: "NewBlock",
}},
},
{
Type: "abci.account",
Attributes: []types.EventAttribute{{
Key: "name",
Value: "Anya",
}, {
Key: "name",
Value: "Igor",
}},
},
}
func BenchmarkParseCustom(b *testing.B) {
for i := 0; i < b.N; i++ {
_, err := query.New(testQuery)
if err != nil {
b.Fatal(err)
}
}
}
func BenchmarkMatchCustom(b *testing.B) {
q, err := query.New(testQuery)
if err != nil {
b.Fatal(err)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
ok, err := q.Matches(testEvents)
if err != nil {
b.Fatal(err)
} else if !ok {
b.Error("no match")
}
}
}

View File

@@ -1,327 +0,0 @@
// Package query implements the custom query format used to filter event
// subscriptions in Tendermint.
//
// Query expressions describe properties of events and their attributes, using
// strings like:
//
// abci.invoice.number = 22 AND abci.invoice.owner = 'Ivan'
//
// Query expressions can handle attribute values encoding numbers, strings,
// dates, and timestamps. The complete query grammar is described in the
// query/syntax package.
//
package query
import (
"fmt"
"regexp"
"strconv"
"strings"
"time"
"github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/pubsub/query/syntax"
)
// All is a query that matches all events.
var All *Query
// A Query is the compiled form of a query.
type Query struct {
ast syntax.Query
conds []condition
}
// New parses and compiles the query expression into an executable query.
func New(query string) (*Query, error) {
ast, err := syntax.Parse(query)
if err != nil {
return nil, err
}
return Compile(ast)
}
// MustCompile compiles the query expression into an executable query.
// In case of error, MustCompile will panic.
//
// This is intended for use in program initialization; use query.New if you
// need to check errors.
func MustCompile(query string) *Query {
q, err := New(query)
if err != nil {
panic(err)
}
return q
}
// Compile compiles the given query AST so it can be used to match events.
func Compile(ast syntax.Query) (*Query, error) {
conds := make([]condition, len(ast))
for i, q := range ast {
cond, err := compileCondition(q)
if err != nil {
return nil, fmt.Errorf("compile %s: %w", q, err)
}
conds[i] = cond
}
return &Query{ast: ast, conds: conds}, nil
}
// Matches satisfies part of the pubsub.Query interface. This implementation
// never reports an error. A nil *Query matches all events.
func (q *Query) Matches(events []types.Event) (bool, error) {
if q == nil {
return true, nil
}
return q.matchesEvents(events), nil
}
// String matches part of the pubsub.Query interface.
func (q *Query) String() string {
if q == nil {
return "<empty>"
}
return q.ast.String()
}
// Syntax returns the syntax tree representation of q.
func (q *Query) Syntax() syntax.Query {
if q == nil {
return nil
}
return q.ast
}
// matchesEvents reports whether all the conditions match the given events.
func (q *Query) matchesEvents(events []types.Event) bool {
for _, cond := range q.conds {
if !cond.matchesAny(events) {
return false
}
}
return len(events) != 0
}
// A condition is a compiled match condition. A condition matches an event if
// the event has the designated type, contains an attribute with the given
// name, and the match function returns true for the attribute value.
type condition struct {
tag string // e.g., "tx.hash"
match func(s string) bool
}
// findAttr returns a slice of attribute values from event matching the
// condition tag, and reports whether the event type strictly equals the
// condition tag.
func (c condition) findAttr(event types.Event) ([]string, bool) {
if !strings.HasPrefix(c.tag, event.Type) {
return nil, false // type does not match tag
} else if len(c.tag) == len(event.Type) {
return nil, true // type == tag
}
var vals []string
for _, attr := range event.Attributes {
fullName := event.Type + "." + attr.Key
if fullName == c.tag {
vals = append(vals, attr.Value)
}
}
return vals, false
}
// matchesAny reports whether c matches at least one of the given events.
func (c condition) matchesAny(events []types.Event) bool {
for _, event := range events {
if c.matchesEvent(event) {
return true
}
}
return false
}
// matchesEvent reports whether c matches the given event.
func (c condition) matchesEvent(event types.Event) bool {
vs, tagEqualsType := c.findAttr(event)
if len(vs) == 0 {
// As a special case, a condition tag that exactly matches the event type
// is matched against an empty string. This allows existence checks to
// work for type-only queries.
if tagEqualsType {
return c.match("")
}
return false
}
// At this point, we have candidate values.
for _, v := range vs {
if c.match(v) {
return true
}
}
return false
}
func compileCondition(cond syntax.Condition) (condition, error) {
out := condition{tag: cond.Tag}
// Handle existence checks separately to simplify the logic below for
// comparisons that take arguments.
if cond.Op == syntax.TExists {
out.match = func(string) bool { return true }
return out, nil
}
// All the other operators require an argument.
if cond.Arg == nil {
return condition{}, fmt.Errorf("missing argument for %v", cond.Op)
}
// Precompile the argument value matcher.
argType := cond.Arg.Type
var argValue interface{}
switch argType {
case syntax.TString:
argValue = cond.Arg.Value()
case syntax.TNumber:
argValue = cond.Arg.Number()
case syntax.TTime, syntax.TDate:
argValue = cond.Arg.Time()
default:
return condition{}, fmt.Errorf("unknown argument type %v", argType)
}
mcons := opTypeMap[cond.Op][argType]
if mcons == nil {
return condition{}, fmt.Errorf("invalid op/arg combination (%v, %v)", cond.Op, argType)
}
out.match = mcons(argValue)
return out, nil
}
// TODO(creachadair): The existing implementation allows anything number shaped
// to be treated as a number. This preserves the parts of that behavior we had
// tests for, but we should probably get rid of that.
var extractNum = regexp.MustCompile(`^\d+(\.\d+)?`)
func parseNumber(s string) (float64, error) {
return strconv.ParseFloat(extractNum.FindString(s), 64)
}
// A map of operator ⇒ argtype ⇒ match-constructor.
// An entry does not exist if the combination is not valid.
//
// Disable the dupl lint for this map. The result isn't even correct.
//nolint:dupl
var opTypeMap = map[syntax.Token]map[syntax.Token]func(interface{}) func(string) bool{
syntax.TContains: {
syntax.TString: func(v interface{}) func(string) bool {
return func(s string) bool {
return strings.Contains(s, v.(string))
}
},
},
syntax.TEq: {
syntax.TString: func(v interface{}) func(string) bool {
return func(s string) bool { return s == v.(string) }
},
syntax.TNumber: func(v interface{}) func(string) bool {
return func(s string) bool {
w, err := parseNumber(s)
return err == nil && w == v.(float64)
}
},
syntax.TDate: func(v interface{}) func(string) bool {
return func(s string) bool {
ts, err := syntax.ParseDate(s)
return err == nil && ts.Equal(v.(time.Time))
}
},
syntax.TTime: func(v interface{}) func(string) bool {
return func(s string) bool {
ts, err := syntax.ParseTime(s)
return err == nil && ts.Equal(v.(time.Time))
}
},
},
syntax.TLt: {
syntax.TNumber: func(v interface{}) func(string) bool {
return func(s string) bool {
w, err := parseNumber(s)
return err == nil && w < v.(float64)
}
},
syntax.TDate: func(v interface{}) func(string) bool {
return func(s string) bool {
ts, err := syntax.ParseDate(s)
return err == nil && ts.Before(v.(time.Time))
}
},
syntax.TTime: func(v interface{}) func(string) bool {
return func(s string) bool {
ts, err := syntax.ParseTime(s)
return err == nil && ts.Before(v.(time.Time))
}
},
},
syntax.TLeq: {
syntax.TNumber: func(v interface{}) func(string) bool {
return func(s string) bool {
w, err := parseNumber(s)
return err == nil && w <= v.(float64)
}
},
syntax.TDate: func(v interface{}) func(string) bool {
return func(s string) bool {
ts, err := syntax.ParseDate(s)
return err == nil && !ts.After(v.(time.Time))
}
},
syntax.TTime: func(v interface{}) func(string) bool {
return func(s string) bool {
ts, err := syntax.ParseTime(s)
return err == nil && !ts.After(v.(time.Time))
}
},
},
syntax.TGt: {
syntax.TNumber: func(v interface{}) func(string) bool {
return func(s string) bool {
w, err := parseNumber(s)
return err == nil && w > v.(float64)
}
},
syntax.TDate: func(v interface{}) func(string) bool {
return func(s string) bool {
ts, err := syntax.ParseDate(s)
return err == nil && ts.After(v.(time.Time))
}
},
syntax.TTime: func(v interface{}) func(string) bool {
return func(s string) bool {
ts, err := syntax.ParseTime(s)
return err == nil && ts.After(v.(time.Time))
}
},
},
syntax.TGeq: {
syntax.TNumber: func(v interface{}) func(string) bool {
return func(s string) bool {
w, err := parseNumber(s)
return err == nil && w >= v.(float64)
}
},
syntax.TDate: func(v interface{}) func(string) bool {
return func(s string) bool {
ts, err := syntax.ParseDate(s)
return err == nil && !ts.Before(v.(time.Time))
}
},
syntax.TTime: func(v interface{}) func(string) bool {
return func(s string) bool {
ts, err := syntax.ParseTime(s)
return err == nil && !ts.Before(v.(time.Time))
}
},
},
}

View File

@@ -1,275 +0,0 @@
package query_test
import (
"fmt"
"strings"
"testing"
"time"
"github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/pubsub"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/libs/pubsub/query/syntax"
)
var _ pubsub.Query = (*query.Query)(nil)
// Example events from the OpenAPI documentation:
// https://github.com/tendermint/tendermint/blob/master/rpc/openapi/openapi.yaml
//
// Redactions:
//
// - Add an explicit "tm" event for the built-in attributes.
// - Remove Index fields (not relevant to tests).
// - Add explicit balance values (to use in tests).
//
var apiEvents = []types.Event{
{
Type: "tm",
Attributes: []types.EventAttribute{
{Key: "event", Value: "Tx"},
{Key: "hash", Value: "XYZ"},
{Key: "height", Value: "5"},
},
},
{
Type: "rewards.withdraw",
Attributes: []types.EventAttribute{
{Key: "address", Value: "AddrA"},
{Key: "source", Value: "SrcX"},
{Key: "amount", Value: "100"},
{Key: "balance", Value: "1500"},
},
},
{
Type: "rewards.withdraw",
Attributes: []types.EventAttribute{
{Key: "address", Value: "AddrB"},
{Key: "source", Value: "SrcY"},
{Key: "amount", Value: "45"},
{Key: "balance", Value: "999"},
},
},
{
Type: "transfer",
Attributes: []types.EventAttribute{
{Key: "sender", Value: "AddrC"},
{Key: "recipient", Value: "AddrD"},
{Key: "amount", Value: "160"},
},
},
}
func TestCompiledMatches(t *testing.T) {
var (
txDate = "2017-01-01"
txTime = "2018-05-03T14:45:00Z"
)
testCases := []struct {
s string
events []types.Event
matches bool
}{
{`tm.events.type='NewBlock'`,
newTestEvents(`tm|events.type=NewBlock`),
true},
{`tx.gas > 7`,
newTestEvents(`tx|gas=8`),
true},
{`transfer.amount > 7`,
newTestEvents(`transfer|amount=8stake`),
true},
{`transfer.amount > 7`,
newTestEvents(`transfer|amount=8.045`),
true},
{`transfer.amount > 7.043`,
newTestEvents(`transfer|amount=8.045stake`),
true},
{`transfer.amount > 8.045`,
newTestEvents(`transfer|amount=8.045stake`),
false},
{`tx.gas > 7 AND tx.gas < 9`,
newTestEvents(`tx|gas=8`),
true},
{`body.weight >= 3.5`,
newTestEvents(`body|weight=3.5`),
true},
{`account.balance < 1000.0`,
newTestEvents(`account|balance=900`),
true},
{`apples.kg <= 4`,
newTestEvents(`apples|kg=4.0`),
true},
{`body.weight >= 4.5`,
newTestEvents(`body|weight=4.5`),
true},
{`oranges.kg < 4 AND watermellons.kg > 10`,
newTestEvents(`oranges|kg=3`, `watermellons|kg=12`),
true},
{`peaches.kg < 4`,
newTestEvents(`peaches|kg=5`),
false},
{`tx.date > DATE 2017-01-01`,
newTestEvents(`tx|date=` + time.Now().Format(syntax.DateFormat)),
true},
{`tx.date = DATE 2017-01-01`,
newTestEvents(`tx|date=` + txDate),
true},
{`tx.date = DATE 2018-01-01`,
newTestEvents(`tx|date=` + txDate),
false},
{`tx.time >= TIME 2013-05-03T14:45:00Z`,
newTestEvents(`tx|time=` + time.Now().Format(syntax.TimeFormat)),
true},
{`tx.time = TIME 2013-05-03T14:45:00Z`,
newTestEvents(`tx|time=` + txTime),
false},
{`abci.owner.name CONTAINS 'Igor'`,
newTestEvents(`abci|owner.name=Igor|owner.name=Ivan`),
true},
{`abci.owner.name CONTAINS 'Igor'`,
newTestEvents(`abci|owner.name=Pavel|owner.name=Ivan`),
false},
{`abci.owner.name = 'Igor'`,
newTestEvents(`abci|owner.name=Igor|owner.name=Ivan`),
true},
{`abci.owner.name = 'Ivan'`,
newTestEvents(`abci|owner.name=Igor|owner.name=Ivan`),
true},
{`abci.owner.name = 'Ivan' AND abci.owner.name = 'Igor'`,
newTestEvents(`abci|owner.name=Igor|owner.name=Ivan`),
true},
{`abci.owner.name = 'Ivan' AND abci.owner.name = 'John'`,
newTestEvents(`abci|owner.name=Igor|owner.name=Ivan`),
false},
{`tm.events.type='NewBlock'`,
newTestEvents(`tm|events.type=NewBlock`, `app|name=fuzzed`),
true},
{`app.name = 'fuzzed'`,
newTestEvents(`tm|events.type=NewBlock`, `app|name=fuzzed`),
true},
{`tm.events.type='NewBlock' AND app.name = 'fuzzed'`,
newTestEvents(`tm|events.type=NewBlock`, `app|name=fuzzed`),
true},
{`tm.events.type='NewHeader' AND app.name = 'fuzzed'`,
newTestEvents(`tm|events.type=NewBlock`, `app|name=fuzzed`),
false},
{`slash EXISTS`,
newTestEvents(`slash|reason=missing_signature|power=6000`),
true},
{`slash EXISTS`,
newTestEvents(`transfer|recipient=cosmos1gu6y2a0ffteesyeyeesk23082c6998xyzmt9mz|sender=cosmos1crje20aj4gxdtyct7z3knxqry2jqt2fuaey6u5`),
false},
{`slash.reason EXISTS AND slash.power > 1000`,
newTestEvents(`slash|reason=missing_signature|power=6000`),
true},
{`slash.reason EXISTS AND slash.power > 1000`,
newTestEvents(`slash|reason=missing_signature|power=500`),
false},
{`slash.reason EXISTS`,
newTestEvents(`transfer|recipient=cosmos1gu6y2a0ffteesyeyeesk23082c6998xyzmt9mz|sender=cosmos1crje20aj4gxdtyct7z3knxqry2jqt2fuaey6u5`),
false},
// Test cases based on the OpenAPI examples.
{`tm.event = 'Tx' AND rewards.withdraw.address = 'AddrA'`,
apiEvents, true},
{`tm.event = 'Tx' AND rewards.withdraw.address = 'AddrA' AND rewards.withdraw.source = 'SrcY'`,
apiEvents, true},
{`tm.event = 'Tx' AND transfer.sender = 'AddrA'`,
apiEvents, false},
{`tm.event = 'Tx' AND transfer.sender = 'AddrC'`,
apiEvents, true},
{`tm.event = 'Tx' AND transfer.sender = 'AddrZ'`,
apiEvents, false},
{`tm.event = 'Tx' AND rewards.withdraw.address = 'AddrZ'`,
apiEvents, false},
{`tm.event = 'Tx' AND rewards.withdraw.source = 'W'`,
apiEvents, false},
}
// NOTE: The original implementation allowed arbitrary prefix matches on
// attribute tags, e.g., "sl" would match "slash".
//
// That is weird and probably wrong: "foo.ba" should not match "foo.bar",
// or there is no way to distinguish the case where there were two values
// for "foo.bar" or one value each for "foo.ba" and "foo.bar".
//
// Apart from a single test case, I could not find any attested usage of
// this implementation detail. It isn't documented in the OpenAPI docs and
// is not shown in any of the example inputs.
//
// On that basis, I removed that test case. This implementation still does
// correctly handle variable type/attribute splits ("x", "y.z" / "x.y", "z")
// since that was required by the original "flattened" event representation.
for i, tc := range testCases {
t.Run(fmt.Sprintf("%02d", i+1), func(t *testing.T) {
c, err := query.New(tc.s)
if err != nil {
t.Fatalf("NewCompiled %#q: unexpected error: %v", tc.s, err)
}
got, err := c.Matches(tc.events)
if err != nil {
t.Errorf("Query: %#q\nInput: %+v\nMatches: got error %v",
tc.s, tc.events, err)
}
if got != tc.matches {
t.Errorf("Query: %#q\nInput: %+v\nMatches: got %v, want %v",
tc.s, tc.events, got, tc.matches)
}
})
}
}
func TestAllMatchesAll(t *testing.T) {
events := newTestEvents(
``,
`Asher|Roth=`,
`Route|66=`,
`Rilly|Blue=`,
)
for i := 0; i < len(events); i++ {
match, err := query.All.Matches(events[:i])
if err != nil {
t.Errorf("Matches failed: %v", err)
} else if !match {
t.Errorf("Did not match on %+v ", events[:i])
}
}
}
// newTestEvent constructs an Event message from a template string.
// The format is "type|attr1=val1|attr2=val2|...".
func newTestEvent(s string) types.Event {
var event types.Event
parts := strings.Split(s, "|")
event.Type = parts[0]
if len(parts) == 1 {
return event // type only, no attributes
}
for _, kv := range parts[1:] {
key, val := splitKV(kv)
event.Attributes = append(event.Attributes, types.EventAttribute{
Key: key,
Value: val,
})
}
return event
}
// newTestEvents constructs a slice of Event messages by applying newTestEvent
// to each element of ss.
func newTestEvents(ss ...string) []types.Event {
events := make([]types.Event, len(ss))
for i, s := range ss {
events[i] = newTestEvent(s)
}
return events
}
func splitKV(s string) (key, value string) {
kv := strings.SplitN(s, "=", 2)
return kv[0], kv[1]
}

View File

@@ -1,34 +0,0 @@
// Package syntax defines a scanner and parser for the Tendermint event filter
// query language. A query selects events by their types and attribute values.
//
// Grammar
//
// The grammar of the query language is defined by the following EBNF:
//
// query = conditions EOF
// conditions = condition {"AND" condition}
// condition = tag comparison
// comparison = equal / order / contains / "EXISTS"
// equal = "=" (date / number / time / value)
// order = cmp (date / number / time)
// contains = "CONTAINS" value
// cmp = "<" / "<=" / ">" / ">="
//
// The lexical terms are defined here using RE2 regular expression notation:
//
// // The name of an event attribute (type.value)
// tag = #'\w+(\.\w+)*'
//
// // A datestamp (YYYY-MM-DD)
// date = #'DATE \d{4}-\d{2}-\d{2}'
//
// // A number with optional fractional parts (0, 10, 3.25)
// number = #'\d+(\.\d+)?'
//
// // An RFC3339 timestamp (2021-11-23T22:04:19-09:00)
// time = #'TIME \d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}([-+]\d{2}:\d{2}|Z)'
//
// // A quoted literal string value ('a b c')
// value = #'\'[^\']*\''
//
package syntax

View File

@@ -1,213 +0,0 @@
package syntax
import (
"fmt"
"io"
"math"
"strconv"
"strings"
"time"
)
// Parse parses the specified query string. It is shorthand for constructing a
// parser for s and calling its Parse method.
func Parse(s string) (Query, error) {
return NewParser(strings.NewReader(s)).Parse()
}
// Query is the root of the parse tree for a query. A query is the conjunction
// of one or more conditions.
type Query []Condition
func (q Query) String() string {
ss := make([]string, len(q))
for i, cond := range q {
ss[i] = cond.String()
}
return strings.Join(ss, " AND ")
}
// A Condition is a single conditional expression, consisting of a tag, a
// comparison operator, and an optional argument. The type of the argument
// depends on the operator.
type Condition struct {
Tag string
Op Token
Arg *Arg
opText string
}
func (c Condition) String() string {
s := c.Tag + " " + c.opText
if c.Arg != nil {
return s + " " + c.Arg.String()
}
return s
}
// An Arg is the argument of a comparison operator.
type Arg struct {
Type Token
text string
}
func (a *Arg) String() string {
if a == nil {
return ""
}
switch a.Type {
case TString:
return "'" + a.text + "'"
case TTime:
return "TIME " + a.text
case TDate:
return "DATE " + a.text
default:
return a.text
}
}
// Number returns the value of the argument text as a number, or a NaN if the
// text does not encode a valid number value.
func (a *Arg) Number() float64 {
if a == nil {
return -1
}
v, err := strconv.ParseFloat(a.text, 64)
if err == nil && v >= 0 {
return v
}
return math.NaN()
}
// Time returns the value of the argument text as a time, or the zero value if
// the text does not encode a timestamp or datestamp.
func (a *Arg) Time() time.Time {
var ts time.Time
if a == nil {
return ts
}
var err error
switch a.Type {
case TDate:
ts, err = ParseDate(a.text)
case TTime:
ts, err = ParseTime(a.text)
}
if err == nil {
return ts
}
return time.Time{}
}
// Value returns the value of the argument text as a string, or "".
func (a *Arg) Value() string {
if a == nil {
return ""
}
return a.text
}
// Parser is a query expression parser. The grammar for query expressions is
// defined in the syntax package documentation.
type Parser struct {
scanner *Scanner
}
// NewParser constructs a new parser that reads the input from r.
func NewParser(r io.Reader) *Parser {
return &Parser{scanner: NewScanner(r)}
}
// Parse parses the complete input and returns the resulting query.
func (p *Parser) Parse() (Query, error) {
cond, err := p.parseCond()
if err != nil {
return nil, err
}
conds := []Condition{cond}
for p.scanner.Next() != io.EOF {
if tok := p.scanner.Token(); tok != TAnd {
return nil, fmt.Errorf("offset %d: got %v, want %v", p.scanner.Pos(), tok, TAnd)
}
cond, err := p.parseCond()
if err != nil {
return nil, err
}
conds = append(conds, cond)
}
return conds, nil
}
// parseCond parses a conditional expression: tag OP value.
func (p *Parser) parseCond() (Condition, error) {
var cond Condition
if err := p.require(TTag); err != nil {
return cond, err
}
cond.Tag = p.scanner.Text()
if err := p.require(TLeq, TGeq, TLt, TGt, TEq, TContains, TExists); err != nil {
return cond, err
}
cond.Op = p.scanner.Token()
cond.opText = p.scanner.Text()
var err error
switch cond.Op {
case TLeq, TGeq, TLt, TGt:
err = p.require(TNumber, TTime, TDate)
case TEq:
err = p.require(TNumber, TTime, TDate, TString)
case TContains:
err = p.require(TString)
case TExists:
// no argument
return cond, nil
default:
return cond, fmt.Errorf("offset %d: unexpected operator %v", p.scanner.Pos(), cond.Op)
}
if err != nil {
return cond, err
}
cond.Arg = &Arg{Type: p.scanner.Token(), text: p.scanner.Text()}
return cond, nil
}
// require advances the scanner and requires that the resulting token is one of
// the specified token types.
func (p *Parser) require(tokens ...Token) error {
if err := p.scanner.Next(); err != nil {
return fmt.Errorf("offset %d: %w", p.scanner.Pos(), err)
}
got := p.scanner.Token()
for _, tok := range tokens {
if tok == got {
return nil
}
}
return fmt.Errorf("offset %d: got %v, wanted %s", p.scanner.Pos(), got, tokLabel(tokens))
}
// tokLabel makes a human-readable summary string for the given token types.
func tokLabel(tokens []Token) string {
if len(tokens) == 1 {
return tokens[0].String()
}
last := len(tokens) - 1
ss := make([]string, len(tokens)-1)
for i, tok := range tokens[:last] {
ss[i] = tok.String()
}
return strings.Join(ss, ", ") + " or " + tokens[last].String()
}
// ParseDate parses s as a date string in the format used by DATE values.
func ParseDate(s string) (time.Time, error) {
return time.Parse("2006-01-02", s)
}
// ParseTime parses s as a timestamp in the format used by TIME values.
func ParseTime(s string) (time.Time, error) {
return time.Parse(time.RFC3339, s)
}

View File

@@ -1,312 +0,0 @@
package syntax
import (
"bufio"
"bytes"
"fmt"
"io"
"strings"
"time"
"unicode"
)
// Token is the type of a lexical token in the query grammar.
type Token byte
const (
TInvalid = iota // invalid or unknown token
TTag // field tag: x.y
TString // string value: 'foo bar'
TNumber // number: 0, 15.5, 100
TTime // timestamp: TIME yyyy-mm-ddThh:mm:ss([-+]hh:mm|Z)
TDate // datestamp: DATE yyyy-mm-dd
TAnd // operator: AND
TContains // operator: CONTAINS
TExists // operator: EXISTS
TEq // operator: =
TLt // operator: <
TLeq // operator: <=
TGt // operator: >
TGeq // operator: >=
// Do not reorder these values without updating the scanner code.
)
var tString = [...]string{
TInvalid: "invalid token",
TTag: "tag",
TString: "string",
TNumber: "number",
TTime: "timestamp",
TDate: "datestamp",
TAnd: "AND operator",
TContains: "CONTAINS operator",
TExists: "EXISTS operator",
TEq: "= operator",
TLt: "< operator",
TLeq: "<= operator",
TGt: "> operator",
TGeq: ">= operator",
}
func (t Token) String() string {
v := int(t)
if v > len(tString) {
return "unknown token type"
}
return tString[v]
}
const (
// TimeFormat is the format string used for timestamp values.
TimeFormat = time.RFC3339
// DateFormat is the format string used for datestamp values.
DateFormat = "2006-01-02"
)
// Scanner reads lexical tokens of the query language from an input stream.
// Each call to Next advances the scanner to the next token, or reports an
// error.
type Scanner struct {
r *bufio.Reader
buf bytes.Buffer
tok Token
err error
pos, last, end int
}
// NewScanner constructs a new scanner that reads from r.
func NewScanner(r io.Reader) *Scanner { return &Scanner{r: bufio.NewReader(r)} }
// Next advances s to the next token in the input, or reports an error. At the
// end of input, Next returns io.EOF.
func (s *Scanner) Next() error {
s.buf.Reset()
s.pos = s.end
s.tok = TInvalid
s.err = nil
for {
ch, err := s.rune()
if err != nil {
return s.fail(err)
}
if unicode.IsSpace(ch) {
s.pos = s.end
continue // skip whitespace
}
if '0' <= ch && ch <= '9' {
return s.scanNumber(ch)
} else if isTagRune(ch) {
return s.scanTagLike(ch)
}
switch ch {
case '\'':
return s.scanString(ch)
case '<', '>', '=':
return s.scanCompare(ch)
default:
return s.invalid(ch)
}
}
}
// Token returns the type of the current input token.
func (s *Scanner) Token() Token { return s.tok }
// Text returns the text of the current input token.
func (s *Scanner) Text() string { return s.buf.String() }
// Pos returns the start offset of the current token in the input.
func (s *Scanner) Pos() int { return s.pos }
// Err returns the last error reported by Next, if any.
func (s *Scanner) Err() error { return s.err }
// scanNumber scans for numbers with optional fractional parts.
// Examples: 0, 1, 3.14
func (s *Scanner) scanNumber(first rune) error {
s.buf.WriteRune(first)
if err := s.scanWhile(isDigit); err != nil {
return err
}
ch, err := s.rune()
if err != nil && err != io.EOF {
return err
}
if ch == '.' {
s.buf.WriteRune(ch)
if err := s.scanWhile(isDigit); err != nil {
return err
}
} else {
s.unrune()
}
s.tok = TNumber
return nil
}
func (s *Scanner) scanString(first rune) error {
// discard opening quote
for {
ch, err := s.rune()
if err != nil {
return s.fail(err)
} else if ch == first {
// discard closing quote
s.tok = TString
return nil
}
s.buf.WriteRune(ch)
}
}
func (s *Scanner) scanCompare(first rune) error {
s.buf.WriteRune(first)
switch first {
case '=':
s.tok = TEq
return nil
case '<':
s.tok = TLt
case '>':
s.tok = TGt
default:
return s.invalid(first)
}
ch, err := s.rune()
if err == io.EOF {
return nil // the assigned token is correct
} else if err != nil {
return s.fail(err)
}
if ch == '=' {
s.buf.WriteRune(ch)
s.tok++ // depends on token order
return nil
}
s.unrune()
return nil
}
func (s *Scanner) scanTagLike(first rune) error {
s.buf.WriteRune(first)
var hasSpace bool
for {
ch, err := s.rune()
if err == io.EOF {
break
} else if err != nil {
return s.fail(err)
}
if !isTagRune(ch) {
hasSpace = ch == ' ' // to check for TIME, DATE
break
}
s.buf.WriteRune(ch)
}
text := s.buf.String()
switch text {
case "TIME":
if hasSpace {
return s.scanTimestamp()
}
s.tok = TTag
case "DATE":
if hasSpace {
return s.scanDatestamp()
}
s.tok = TTag
case "AND":
s.tok = TAnd
case "EXISTS":
s.tok = TExists
case "CONTAINS":
s.tok = TContains
default:
s.tok = TTag
}
s.unrune()
return nil
}
func (s *Scanner) scanTimestamp() error {
s.buf.Reset() // discard "TIME" label
if err := s.scanWhile(isTimeRune); err != nil {
return err
}
if ts, err := time.Parse(TimeFormat, s.buf.String()); err != nil {
return s.fail(fmt.Errorf("invalid TIME value: %w", err))
} else if y := ts.Year(); y < 1900 || y > 2999 {
return s.fail(fmt.Errorf("timestamp year %d out of range", ts.Year()))
}
s.tok = TTime
return nil
}
func (s *Scanner) scanDatestamp() error {
s.buf.Reset() // discard "DATE" label
if err := s.scanWhile(isDateRune); err != nil {
return err
}
if ts, err := time.Parse(DateFormat, s.buf.String()); err != nil {
return s.fail(fmt.Errorf("invalid DATE value: %w", err))
} else if y := ts.Year(); y < 1900 || y > 2999 {
return s.fail(fmt.Errorf("datestamp year %d out of range", ts.Year()))
}
s.tok = TDate
return nil
}
func (s *Scanner) scanWhile(ok func(rune) bool) error {
for {
ch, err := s.rune()
if err == io.EOF {
return nil
} else if err != nil {
return s.fail(err)
} else if !ok(ch) {
s.unrune()
return nil
}
s.buf.WriteRune(ch)
}
}
func (s *Scanner) rune() (rune, error) {
ch, nb, err := s.r.ReadRune()
s.last = nb
s.end += nb
return ch, err
}
func (s *Scanner) unrune() {
_ = s.r.UnreadRune()
s.end -= s.last
}
func (s *Scanner) fail(err error) error {
s.err = err
return err
}
func (s *Scanner) invalid(ch rune) error {
return s.fail(fmt.Errorf("invalid input %c at offset %d", ch, s.end))
}
func isDigit(r rune) bool { return '0' <= r && r <= '9' }
func isTagRune(r rune) bool {
return r == '.' || r == '_' || unicode.IsLetter(r) || unicode.IsDigit(r)
}
func isTimeRune(r rune) bool {
return strings.ContainsRune("-T:+Z", r) || isDigit(r)
}
func isDateRune(r rune) bool { return isDigit(r) || r == '-' }

View File

@@ -1,190 +0,0 @@
package syntax_test
import (
"io"
"reflect"
"strings"
"testing"
"github.com/tendermint/tendermint/libs/pubsub/query/syntax"
)
func TestScanner(t *testing.T) {
tests := []struct {
input string
want []syntax.Token
}{
// Empty inputs
{"", nil},
{" ", nil},
{"\t\n ", nil},
// Numbers
{`0 123`, []syntax.Token{syntax.TNumber, syntax.TNumber}},
{`0.32 3.14`, []syntax.Token{syntax.TNumber, syntax.TNumber}},
// Tags
{`foo foo.bar`, []syntax.Token{syntax.TTag, syntax.TTag}},
// Strings (values)
{` '' x 'x' 'x y'`, []syntax.Token{syntax.TString, syntax.TTag, syntax.TString, syntax.TString}},
{` 'you are not your job' `, []syntax.Token{syntax.TString}},
// Comparison operators
{`< <= = > >=`, []syntax.Token{
syntax.TLt, syntax.TLeq, syntax.TEq, syntax.TGt, syntax.TGeq,
}},
// Mixed values of various kinds.
{`x AND y`, []syntax.Token{syntax.TTag, syntax.TAnd, syntax.TTag}},
{`x.y CONTAINS 'z'`, []syntax.Token{syntax.TTag, syntax.TContains, syntax.TString}},
{`foo EXISTS`, []syntax.Token{syntax.TTag, syntax.TExists}},
{`and AND`, []syntax.Token{syntax.TTag, syntax.TAnd}},
// Timestamp
{`TIME 2021-11-23T15:16:17Z`, []syntax.Token{syntax.TTime}},
// Datestamp
{`DATE 2021-11-23`, []syntax.Token{syntax.TDate}},
}
for _, test := range tests {
s := syntax.NewScanner(strings.NewReader(test.input))
var got []syntax.Token
for s.Next() == nil {
got = append(got, s.Token())
}
if err := s.Err(); err != io.EOF {
t.Errorf("Next: unexpected error: %v", err)
}
if !reflect.DeepEqual(got, test.want) {
t.Logf("Scanner input: %q", test.input)
t.Errorf("Wrong tokens:\ngot: %+v\nwant: %+v", got, test.want)
}
}
}
func TestScannerErrors(t *testing.T) {
tests := []struct {
input string
}{
{`'incomplete string`},
{`-23`},
{`&`},
{`DATE xyz-pdq`},
{`DATE xyzp-dq-zv`},
{`DATE 0000-00-00`},
{`DATE 0000-00-000`},
{`DATE 2021-01-99`},
{`TIME 2021-01-01T34:56:78Z`},
{`TIME 2021-01-99T14:56:08Z`},
{`TIME 2021-01-99T34:56:08`},
{`TIME 2021-01-99T34:56:11+3`},
}
for _, test := range tests {
s := syntax.NewScanner(strings.NewReader(test.input))
if err := s.Next(); err == nil {
t.Errorf("Next: got %v (%#q), want error", s.Token(), s.Text())
}
}
}
// These parser tests were copied from the original implementation of the query
// parser, and are preserved here as a compatibility check.
func TestParseValid(t *testing.T) {
tests := []struct {
input string
valid bool
}{
{"tm.events.type='NewBlock'", true},
{"tm.events.type = 'NewBlock'", true},
{"tm.events.name = ''", true},
{"tm.events.type='TIME'", true},
{"tm.events.type='DATE'", true},
{"tm.events.type='='", true},
{"tm.events.type='TIME", false},
{"tm.events.type=TIME'", false},
{"tm.events.type==", false},
{"tm.events.type=NewBlock", false},
{">==", false},
{"tm.events.type 'NewBlock' =", false},
{"tm.events.type>'NewBlock'", false},
{"", false},
{"=", false},
{"='NewBlock'", false},
{"tm.events.type=", false},
{"tm.events.typeNewBlock", false},
{"tm.events.type'NewBlock'", false},
{"'NewBlock'", false},
{"NewBlock", false},
{"", false},
{"tm.events.type='NewBlock' AND abci.account.name='Igor'", true},
{"tm.events.type='NewBlock' AND", false},
{"tm.events.type='NewBlock' AN", false},
{"tm.events.type='NewBlock' AN tm.events.type='NewBlockHeader'", false},
{"AND tm.events.type='NewBlock' ", false},
{"abci.account.name CONTAINS 'Igor'", true},
{"tx.date > DATE 2013-05-03", true},
{"tx.date < DATE 2013-05-03", true},
{"tx.date <= DATE 2013-05-03", true},
{"tx.date >= DATE 2013-05-03", true},
{"tx.date >= DAT 2013-05-03", false},
{"tx.date <= DATE2013-05-03", false},
{"tx.date <= DATE -05-03", false},
{"tx.date >= DATE 20130503", false},
{"tx.date >= DATE 2013+01-03", false},
// incorrect year, month, day
{"tx.date >= DATE 0013-01-03", false},
{"tx.date >= DATE 2013-31-03", false},
{"tx.date >= DATE 2013-01-83", false},
{"tx.date > TIME 2013-05-03T14:45:00+07:00", true},
{"tx.date < TIME 2013-05-03T14:45:00-02:00", true},
{"tx.date <= TIME 2013-05-03T14:45:00Z", true},
{"tx.date >= TIME 2013-05-03T14:45:00Z", true},
{"tx.date >= TIME2013-05-03T14:45:00Z", false},
{"tx.date = IME 2013-05-03T14:45:00Z", false},
{"tx.date = TIME 2013-05-:45:00Z", false},
{"tx.date >= TIME 2013-05-03T14:45:00", false},
{"tx.date >= TIME 0013-00-00T14:45:00Z", false},
{"tx.date >= TIME 2013+05=03T14:45:00Z", false},
{"account.balance=100", true},
{"account.balance >= 200", true},
{"account.balance >= -300", false},
{"account.balance >>= 400", false},
{"account.balance=33.22.1", false},
{"slashing.amount EXISTS", true},
{"slashing.amount EXISTS AND account.balance=100", true},
{"account.balance=100 AND slashing.amount EXISTS", true},
{"slashing EXISTS", true},
{"hash='136E18F7E4C348B780CF873A0BF43922E5BAFA63'", true},
{"hash=136E18F7E4C348B780CF873A0BF43922E5BAFA63", false},
}
for _, test := range tests {
q, err := syntax.Parse(test.input)
if test.valid != (err == nil) {
t.Errorf("Parse %#q: valid %v got err=%v", test.input, test.valid, err)
}
// For valid queries, check that the query round-trips.
if test.valid {
qstr := q.String()
r, err := syntax.Parse(qstr)
if err != nil {
t.Errorf("Reparse %#q failed: %v", qstr, err)
}
if rstr := r.String(); rstr != qstr {
t.Errorf("Reparse diff\nold: %#q\nnew: %#q", qstr, rstr)
}
}
}
}

View File

@@ -1,113 +0,0 @@
package pubsub
import "github.com/tendermint/tendermint/abci/types"
// An item to be published to subscribers.
type item struct {
Data interface{}
Events []types.Event
}
// A subInfo value records a single subscription.
type subInfo struct {
clientID string // chosen by the client
query Query // chosen by the client
subID string // assigned at registration
sub *Subscription // receives published events
}
// A subInfoSet is an unordered set of subscription info records.
type subInfoSet map[*subInfo]struct{}
func (s subInfoSet) contains(si *subInfo) bool { _, ok := s[si]; return ok }
func (s subInfoSet) add(si *subInfo) { s[si] = struct{}{} }
func (s subInfoSet) remove(si *subInfo) { delete(s, si) }
// withQuery returns the subset of s whose query string matches qs.
func (s subInfoSet) withQuery(qs string) subInfoSet {
out := make(subInfoSet)
for si := range s {
if si.query.String() == qs {
out.add(si)
}
}
return out
}
// A subIndex is an indexed collection of subscription info records.
// The index is not safe for concurrent use without external synchronization.
type subIndex struct {
all subInfoSet // all subscriptions
byClient map[string]subInfoSet // per-client subscriptions
byQuery map[string]subInfoSet // per-query subscriptions
// TODO(creachadair): We allow indexing by query to support existing use by
// the RPC service methods for event streaming. Fix up those methods not to
// require this, and then remove indexing by query.
}
// newSubIndex constructs a new, empty subscription index.
func newSubIndex() *subIndex {
return &subIndex{
all: make(subInfoSet),
byClient: make(map[string]subInfoSet),
byQuery: make(map[string]subInfoSet),
}
}
// findClients returns the set of subscriptions for the given client ID, or nil.
func (idx *subIndex) findClientID(id string) subInfoSet { return idx.byClient[id] }
// findQuery returns the set of subscriptions on the given query string, or nil.
func (idx *subIndex) findQuery(qs string) subInfoSet { return idx.byQuery[qs] }
// contains reports whether idx contains any subscription matching the given
// client ID and query pair.
func (idx *subIndex) contains(clientID, query string) bool {
csubs, qsubs := idx.byClient[clientID], idx.byQuery[query]
if len(csubs) == 0 || len(qsubs) == 0 {
return false
}
for si := range csubs {
if qsubs.contains(si) {
return true
}
}
return false
}
// add adds si to the index, replacing any previous entry with the same terms.
// It is the caller's responsibility to check for duplicates before adding.
// See also the contains method.
func (idx *subIndex) add(si *subInfo) {
idx.all.add(si)
if m := idx.byClient[si.clientID]; m == nil {
idx.byClient[si.clientID] = subInfoSet{si: struct{}{}}
} else {
m.add(si)
}
qs := si.query.String()
if m := idx.byQuery[qs]; m == nil {
idx.byQuery[qs] = subInfoSet{si: struct{}{}}
} else {
m.add(si)
}
}
// removeAll removes all the elements of s from the index.
func (idx *subIndex) removeAll(s subInfoSet) {
for si := range s {
idx.all.remove(si)
idx.byClient[si.clientID].remove(si)
if len(idx.byClient[si.clientID]) == 0 {
delete(idx.byClient, si.clientID)
}
if si.query != nil {
qs := si.query.String()
idx.byQuery[qs].remove(si)
if len(idx.byQuery[qs]) == 0 {
delete(idx.byQuery, qs)
}
}
}
}

View File

@@ -1,88 +0,0 @@
package pubsub
import (
"context"
"errors"
"github.com/google/uuid"
"github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/internal/libs/queue"
)
var (
// ErrUnsubscribed is returned by Next when the client has unsubscribed.
ErrUnsubscribed = errors.New("subscription removed by client")
// ErrTerminated is returned by Next when the subscription was terminated by
// the publisher.
ErrTerminated = errors.New("subscription terminated by publisher")
)
// A Subscription represents a client subscription for a particular query.
type Subscription struct {
id string
queue *queue.Queue // open until the subscription ends
stopErr error // after queue is closed, the reason why
}
// newSubscription returns a new subscription with the given queue capacity.
func newSubscription(quota, limit int) (*Subscription, error) {
queue, err := queue.New(queue.Options{
SoftQuota: quota,
HardLimit: limit,
})
if err != nil {
return nil, err
}
return &Subscription{
id: uuid.NewString(),
queue: queue,
}, nil
}
// Next blocks until a message is available, ctx ends, or the subscription
// ends. Next returns ErrUnsubscribed if s was unsubscribed, ErrTerminated if
// s was terminated by the publisher, or a context error if ctx ended without a
// message being available.
func (s *Subscription) Next(ctx context.Context) (Message, error) {
next, err := s.queue.Wait(ctx)
if errors.Is(err, queue.ErrQueueClosed) {
return Message{}, s.stopErr
} else if err != nil {
return Message{}, err
}
return next.(Message), nil
}
// ID returns the unique subscription identifier for s.
func (s *Subscription) ID() string { return s.id }
// publish transmits msg to the subscriber. It reports a queue error if the
// queue cannot accept any further messages.
func (s *Subscription) publish(msg Message) error { return s.queue.Add(msg) }
// stop terminates the subscription with the given error reason.
func (s *Subscription) stop(err error) {
if err == nil {
panic("nil stop error")
}
s.stopErr = err
s.queue.Close()
}
// Message glues data and events together.
type Message struct {
subID string
data interface{}
events []types.Event
}
// SubscriptionID returns the unique identifier for the subscription
// that produced this message.
func (msg Message) SubscriptionID() string { return msg.subID }
// Data returns an original data published.
func (msg Message) Data() interface{} { return msg.data }
// Events returns events, which matched the client's query.
func (msg Message) Events() []types.Event { return msg.events }