From 079c7af00740d87605c9cf6a3cb04fd080b2b508 Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Tue, 25 Jan 2022 11:16:48 -0800 Subject: [PATCH] pubsub: use concrete queries instead of an interface (#7686) Remove the pubsub.Query interface and instead use the concrete query type. Nothing uses any other implementation but pubsub/query. * query: remove the error from the Matches method * Update all usage. --- internal/consensus/state_test.go | 3 +- internal/eventbus/event_bus.go | 5 ++-- internal/eventbus/event_bus_test.go | 4 +-- internal/pubsub/pubsub.go | 46 ++++++++--------------------- internal/pubsub/pubsub_test.go | 4 --- internal/pubsub/query/bench_test.go | 5 +--- internal/pubsub/query/query.go | 25 +++++++--------- internal/pubsub/query/query_test.go | 14 ++------- internal/pubsub/subindex.go | 7 +++-- types/events.go | 5 ++-- 10 files changed, 39 insertions(+), 79 deletions(-) diff --git a/internal/consensus/state_test.go b/internal/consensus/state_test.go index 32517d90f..c1b7eaf9c 100644 --- a/internal/consensus/state_test.go +++ b/internal/consensus/state_test.go @@ -15,6 +15,7 @@ import ( cstypes "github.com/tendermint/tendermint/internal/consensus/types" "github.com/tendermint/tendermint/internal/eventbus" tmpubsub "github.com/tendermint/tendermint/internal/pubsub" + tmquery "github.com/tendermint/tendermint/internal/pubsub/query" "github.com/tendermint/tendermint/libs/log" tmrand "github.com/tendermint/tendermint/libs/rand" tmproto "github.com/tendermint/tendermint/proto/tendermint/types" @@ -2076,7 +2077,7 @@ func subscribe( ctx context.Context, t *testing.T, eventBus *eventbus.EventBus, - q tmpubsub.Query, + q *tmquery.Query, ) <-chan tmpubsub.Message { t.Helper() sub, err := eventBus.SubscribeWithArgs(ctx, tmpubsub.SubscribeArgs{ diff --git a/internal/eventbus/event_bus.go b/internal/eventbus/event_bus.go index 58f357165..f93b231d2 100644 --- a/internal/eventbus/event_bus.go +++ b/internal/eventbus/event_bus.go @@ -7,6 +7,7 @@ import ( abci "github.com/tendermint/tendermint/abci/types" tmpubsub "github.com/tendermint/tendermint/internal/pubsub" + tmquery "github.com/tendermint/tendermint/internal/pubsub/query" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/service" "github.com/tendermint/tendermint/types" @@ -51,7 +52,7 @@ func (b *EventBus) NumClientSubscriptions(clientID string) int { // Deprecated: Use SubscribeWithArgs instead. func (b *EventBus) Subscribe(ctx context.Context, - clientID string, query tmpubsub.Query, capacities ...int) (Subscription, error) { + clientID string, query *tmquery.Query, capacities ...int) (Subscription, error) { return b.pubsub.Subscribe(ctx, clientID, query, capacities...) } @@ -68,7 +69,7 @@ func (b *EventBus) UnsubscribeAll(ctx context.Context, subscriber string) error return b.pubsub.UnsubscribeAll(ctx, subscriber) } -func (b *EventBus) Observe(ctx context.Context, observe func(tmpubsub.Message) error, queries ...tmpubsub.Query) error { +func (b *EventBus) Observe(ctx context.Context, observe func(tmpubsub.Message) error, queries ...*tmquery.Query) error { return b.pubsub.Observe(ctx, observe, queries...) } diff --git a/internal/eventbus/event_bus_test.go b/internal/eventbus/event_bus_test.go index 6569d54fd..dd711564e 100644 --- a/internal/eventbus/event_bus_test.go +++ b/internal/eventbus/event_bus_test.go @@ -500,7 +500,7 @@ func randEventValue() string { return events[mrand.Intn(len(events))] } -var queries = []tmpubsub.Query{ +var queries = []*tmquery.Query{ types.EventQueryNewBlock, types.EventQueryNewBlockHeader, types.EventQueryNewRound, @@ -517,6 +517,6 @@ var queries = []tmpubsub.Query{ types.EventQueryStateSyncStatus, } -func randQuery() tmpubsub.Query { +func randQuery() *tmquery.Query { return queries[mrand.Intn(len(queries))] } diff --git a/internal/pubsub/pubsub.go b/internal/pubsub/pubsub.go index aebc9c9e3..53c631de4 100644 --- a/internal/pubsub/pubsub.go +++ b/internal/pubsub/pubsub.go @@ -41,6 +41,7 @@ import ( "sync" "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/internal/pubsub/query" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/service" ) @@ -59,32 +60,21 @@ var ( ErrServerStopped = errors.New("pubsub server is stopped") ) -// Query defines an interface for a query to be used for subscribing. A query -// matches against a map of events. Each key in this map is a composite of the -// even type and an attribute key (e.g. "{eventType}.{eventAttrKey}") and the -// values are the event values that are contained under that relationship. This -// allows event types to repeat themselves with the same set of keys and -// different values. -type Query interface { - Matches(events []types.Event) (bool, error) - String() string -} - // SubscribeArgs are the parameters to create a new subscription. type SubscribeArgs struct { - ClientID string // Client ID - Query Query // filter query for events (required) - Limit int // subscription queue capacity limit (0 means 1) - Quota int // subscription queue soft quota (0 uses Limit) + ClientID string // Client ID + Query *query.Query // filter query for events (required) + Limit int // subscription queue capacity limit (0 means 1) + Quota int // subscription queue soft quota (0 uses Limit) } // UnsubscribeArgs are the parameters to remove a subscription. // The subscriber ID must be populated, and at least one of the client ID or // the registered query. type UnsubscribeArgs struct { - Subscriber string // subscriber ID chosen by the client (required) - ID string // subscription ID (assigned by the server) - Query Query // the query registered with the subscription + Subscriber string // subscriber ID chosen by the client (required) + ID string // subscription ID (assigned by the server) + Query *query.Query // the query registered with the subscription } // Validate returns nil if args are valid to identify a subscription to remove. @@ -93,10 +83,6 @@ func (args UnsubscribeArgs) Validate() error { if args.Subscriber == "" { return errors.New("must specify a subscriber") } - if args.ID == "" && args.Query == nil { - return fmt.Errorf("subscription is not fully defined [subscriber=%q]", args.Subscriber) - } - return nil } @@ -170,7 +156,7 @@ func (s *Server) BufferCapacity() int { return cap(s.queue) } // If len(capacities) > 0, its first value is used as the queue capacity. // // Deprecated: Use SubscribeWithArgs. This method will be removed in v0.36. -func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, capacities ...int) (*Subscription, error) { +func (s *Server) Subscribe(ctx context.Context, clientID string, query *query.Query, capacities ...int) (*Subscription, error) { args := SubscribeArgs{ ClientID: clientID, Query: query, @@ -191,7 +177,7 @@ func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, ca // being forwarded to any subscriber. If no queries are specified, all // messages will be observed. An error is reported if an observer is already // registered. -func (s *Server) Observe(ctx context.Context, observe func(Message) error, queries ...Query) error { +func (s *Server) Observe(ctx context.Context, observe func(Message) error, queries ...*query.Query) error { s.subs.Lock() defer s.subs.Unlock() if observe == nil { @@ -207,8 +193,7 @@ func (s *Server) Observe(ctx context.Context, observe func(Message) error, queri } else { matches = func(msg Message) bool { for _, q := range queries { - match, err := q.Matches(msg.events) - if err == nil && match { + if q.Matches(msg.events) { return true } } @@ -229,9 +214,6 @@ func (s *Server) Observe(ctx context.Context, observe func(Message) error, queri // error if the query is nil, a subscription already exists for the specified // client ID and query, or if the capacity arguments are invalid. func (s *Server) SubscribeWithArgs(ctx context.Context, args SubscribeArgs) (*Subscription, error) { - if args.Query == nil { - return nil, errors.New("query is nil") - } s.subs.Lock() defer s.subs.Unlock() @@ -440,11 +422,7 @@ func (s *Server) send(data interface{}, events []types.Event) error { } for si := range s.subs.index.all { - match, err := si.query.Matches(events) - if err != nil { - return fmt.Errorf("match failed against query: %w", err) - // TODO(creachadair): Should we evict this subscription? - } else if !match { + if !si.query.Matches(events) { continue } diff --git a/internal/pubsub/pubsub_test.go b/internal/pubsub/pubsub_test.go index c4da551d0..c11ba4cf5 100644 --- a/internal/pubsub/pubsub_test.go +++ b/internal/pubsub/pubsub_test.go @@ -119,10 +119,6 @@ func TestSubscribeErrors(t *testing.T) { logger := log.TestingLogger() s := newTestServer(ctx, t, logger) - t.Run("EmptyQueryErr", func(t *testing.T) { - _, err := s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{ClientID: clientID}) - require.Error(t, err) - }) t.Run("NegativeLimitErr", func(t *testing.T) { _, err := s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{ ClientID: clientID, diff --git a/internal/pubsub/query/bench_test.go b/internal/pubsub/query/bench_test.go index 28f5184ab..0916e9c8a 100644 --- a/internal/pubsub/query/bench_test.go +++ b/internal/pubsub/query/bench_test.go @@ -48,10 +48,7 @@ func BenchmarkMatchCustom(b *testing.B) { } b.ResetTimer() for i := 0; i < b.N; i++ { - ok, err := q.Matches(testEvents) - if err != nil { - b.Fatal(err) - } else if !ok { + if !q.Matches(testEvents) { b.Error("no match") } } diff --git a/internal/pubsub/query/query.go b/internal/pubsub/query/query.go index ce70238a5..23510a75d 100644 --- a/internal/pubsub/query/query.go +++ b/internal/pubsub/query/query.go @@ -67,13 +67,18 @@ func Compile(ast syntax.Query) (*Query, error) { return &Query{ast: ast, conds: conds}, nil } -// Matches satisfies part of the pubsub.Query interface. This implementation -// never reports an error. A nil *Query matches all events. -func (q *Query) Matches(events []types.Event) (bool, error) { +// Matches reports whether q matches the given events. If q == nil, the query +// matches any non-empty collection of events. +func (q *Query) Matches(events []types.Event) bool { if q == nil { - return true, nil + return true } - return q.matchesEvents(events), nil + for _, cond := range q.conds { + if !cond.matchesAny(events) { + return false + } + } + return len(events) != 0 } // String matches part of the pubsub.Query interface. @@ -92,16 +97,6 @@ func (q *Query) Syntax() syntax.Query { return q.ast } -// matchesEvents reports whether all the conditions match the given events. -func (q *Query) matchesEvents(events []types.Event) bool { - for _, cond := range q.conds { - if !cond.matchesAny(events) { - return false - } - } - return len(events) != 0 -} - // A condition is a compiled match condition. A condition matches an event if // the event has the designated type, contains an attribute with the given // name, and the match function returns true for the attribute value. diff --git a/internal/pubsub/query/query_test.go b/internal/pubsub/query/query_test.go index d79600e27..fc5fd82f0 100644 --- a/internal/pubsub/query/query_test.go +++ b/internal/pubsub/query/query_test.go @@ -7,13 +7,10 @@ import ( "time" "github.com/tendermint/tendermint/abci/types" - "github.com/tendermint/tendermint/internal/pubsub" "github.com/tendermint/tendermint/internal/pubsub/query" "github.com/tendermint/tendermint/internal/pubsub/query/syntax" ) -var _ pubsub.Query = (*query.Query)(nil) - // Example events from the OpenAPI documentation: // https://github.com/tendermint/tendermint/blob/master/rpc/openapi/openapi.yaml // @@ -210,11 +207,7 @@ func TestCompiledMatches(t *testing.T) { t.Fatalf("NewCompiled %#q: unexpected error: %v", tc.s, err) } - got, err := c.Matches(tc.events) - if err != nil { - t.Errorf("Query: %#q\nInput: %+v\nMatches: got error %v", - tc.s, tc.events, err) - } + got := c.Matches(tc.events) if got != tc.matches { t.Errorf("Query: %#q\nInput: %+v\nMatches: got %v, want %v", tc.s, tc.events, got, tc.matches) @@ -231,10 +224,7 @@ func TestAllMatchesAll(t *testing.T) { `Rilly|Blue=`, ) for i := 0; i < len(events); i++ { - match, err := query.All.Matches(events[:i]) - if err != nil { - t.Errorf("Matches failed: %w", err) - } else if !match { + if !query.All.Matches(events[:i]) { t.Errorf("Did not match on %+v ", events[:i]) } } diff --git a/internal/pubsub/subindex.go b/internal/pubsub/subindex.go index 48dccf72d..c9bb1ae0e 100644 --- a/internal/pubsub/subindex.go +++ b/internal/pubsub/subindex.go @@ -1,6 +1,9 @@ package pubsub -import "github.com/tendermint/tendermint/abci/types" +import ( + "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/internal/pubsub/query" +) // An item to be published to subscribers. type item struct { @@ -11,7 +14,7 @@ type item struct { // A subInfo value records a single subscription. type subInfo struct { clientID string // chosen by the client - query Query // chosen by the client + query *query.Query // chosen by the client subID string // assigned at registration sub *Subscription // receives published events } diff --git a/types/events.go b/types/events.go index 7a4d7d543..68139fec6 100644 --- a/types/events.go +++ b/types/events.go @@ -7,7 +7,6 @@ import ( abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/internal/jsontypes" - tmpubsub "github.com/tendermint/tendermint/internal/pubsub" tmquery "github.com/tendermint/tendermint/internal/pubsub/query" ) @@ -266,11 +265,11 @@ var ( EventQueryStateSyncStatus = QueryForEvent(EventStateSyncStatusValue) ) -func EventQueryTxFor(tx Tx) tmpubsub.Query { +func EventQueryTxFor(tx Tx) *tmquery.Query { return tmquery.MustCompile(fmt.Sprintf("%s='%s' AND %s='%X'", EventTypeKey, EventTxValue, TxHashKey, tx.Hash())) } -func QueryForEvent(eventValue string) tmpubsub.Query { +func QueryForEvent(eventValue string) *tmquery.Query { return tmquery.MustCompile(fmt.Sprintf("%s='%s'", EventTypeKey, eventValue)) }