pubsub: Refactor Event Subscription (#6634)

This commit is contained in:
Aleksandr Bezobchuk
2021-07-01 11:17:48 -04:00
committed by GitHub
parent b0a413eb17
commit 414130aee1
19 changed files with 428 additions and 226 deletions

View File

@@ -32,7 +32,7 @@ type Eventable interface {
//
// FireEvent fires an event with the given name and data.
type Fireable interface {
FireEvent(event string, data EventData)
FireEvent(eventValue string, data EventData)
}
// EventSwitch is the interface for synchronous pubsub, where listeners
@@ -46,7 +46,7 @@ type EventSwitch interface {
service.Service
Fireable
AddListenerForEvent(listenerID, event string, cb EventCallback) error
AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error
RemoveListenerForEvent(event string, listenerID string)
RemoveListener(listenerID string)
}
@@ -74,27 +74,29 @@ func (evsw *eventSwitch) OnStart() error {
func (evsw *eventSwitch) OnStop() {}
func (evsw *eventSwitch) AddListenerForEvent(listenerID, event string, cb EventCallback) error {
func (evsw *eventSwitch) AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error {
// Get/Create eventCell and listener.
evsw.mtx.Lock()
eventCell := evsw.eventCells[event]
eventCell := evsw.eventCells[eventValue]
if eventCell == nil {
eventCell = newEventCell()
evsw.eventCells[event] = eventCell
evsw.eventCells[eventValue] = eventCell
}
listener := evsw.listeners[listenerID]
if listener == nil {
listener = newEventListener(listenerID)
evsw.listeners[listenerID] = listener
}
evsw.mtx.Unlock()
// Add event and listener.
if err := listener.AddEvent(event); err != nil {
if err := listener.AddEvent(eventValue); err != nil {
return err
}
eventCell.AddListener(listenerID, cb)
eventCell.AddListener(listenerID, cb)
return nil
}

View File

@@ -6,8 +6,8 @@ import (
"github.com/stretchr/testify/require"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/pubsub"
"github.com/tendermint/tendermint/libs/pubsub/query"
)
@@ -15,8 +15,9 @@ import (
func TestExample(t *testing.T) {
s := pubsub.NewServer()
s.SetLogger(log.TestingLogger())
err := s.Start()
require.NoError(t, err)
require.NoError(t, s.Start())
t.Cleanup(func() {
if err := s.Stop(); err != nil {
t.Error(err)
@@ -24,9 +25,18 @@ func TestExample(t *testing.T) {
})
ctx := context.Background()
subscription, err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name='John'"))
require.NoError(t, err)
err = s.PublishWithEvents(ctx, "Tombstone", map[string][]string{"abci.account.name": {"John"}})
events := []abci.Event{
{
Type: "abci.account",
Attributes: []abci.EventAttribute{{Key: "name", Value: "John"}},
},
}
err = s.PublishWithEvents(ctx, "Tombstone", events)
require.NoError(t, err)
assertReceive(t, "Tombstone", subscription.Out())
}

View File

@@ -39,6 +39,7 @@ import (
"errors"
"fmt"
"github.com/tendermint/tendermint/abci/types"
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/libs/service"
@@ -70,7 +71,7 @@ var (
// allows event types to repeat themselves with the same set of keys and
// different values.
type Query interface {
Matches(events map[string][]string) (bool, error)
Matches(events []types.Event) (bool, error)
String() string
}
@@ -102,7 +103,7 @@ type cmd struct {
// publish
msg interface{}
events map[string][]string
events []types.Event
}
// Server allows clients to subscribe/unsubscribe for messages, publishing
@@ -314,13 +315,13 @@ func (s *Server) NumClientSubscriptions(clientID string) int {
// Publish publishes the given message. An error will be returned to the caller
// if the context is canceled.
func (s *Server) Publish(ctx context.Context, msg interface{}) error {
return s.PublishWithEvents(ctx, msg, make(map[string][]string))
return s.PublishWithEvents(ctx, msg, []types.Event{})
}
// PublishWithEvents publishes the given message with the set of events. The set
// is matched with clients queries. If there is a match, the message is sent to
// the client.
func (s *Server) PublishWithEvents(ctx context.Context, msg interface{}, events map[string][]string) error {
func (s *Server) PublishWithEvents(ctx context.Context, msg interface{}, events []types.Event) error {
select {
case s.cmds <- cmd{op: pub, msg: msg, events: events}:
return nil
@@ -473,7 +474,7 @@ func (state *state) removeAll(reason error) {
}
}
func (state *state) send(msg interface{}, events map[string][]string) error {
func (state *state) send(msg interface{}, events []types.Event) error {
for qStr, clientSubscriptions := range state.subscriptions {
if sub, ok := clientSubscriptions[qStr]; ok && sub.id == qStr {
continue

View File

@@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/pubsub"
@@ -35,8 +36,8 @@ func TestSubscribe(t *testing.T) {
subscription, err := s.Subscribe(ctx, clientID, query.Empty{})
require.NoError(t, err)
assert.Equal(t, 1, s.NumClients())
assert.Equal(t, 1, s.NumClientSubscriptions(clientID))
require.Equal(t, 1, s.NumClients())
require.Equal(t, 1, s.NumClientSubscriptions(clientID))
err = s.Publish(ctx, "Ka-Zar")
require.NoError(t, err)
@@ -47,13 +48,13 @@ func TestSubscribe(t *testing.T) {
defer close(published)
err := s.Publish(ctx, "Quicksilver")
assert.NoError(t, err)
require.NoError(t, err)
err = s.Publish(ctx, "Asylum")
assert.NoError(t, err)
require.NoError(t, err)
err = s.Publish(ctx, "Ivan")
assert.NoError(t, err)
require.NoError(t, err)
}()
select {
@@ -77,11 +78,11 @@ func TestSubscribeWithCapacity(t *testing.T) {
})
ctx := context.Background()
assert.Panics(t, func() {
require.Panics(t, func() {
_, err = s.Subscribe(ctx, clientID, query.Empty{}, -1)
require.NoError(t, err)
})
assert.Panics(t, func() {
require.Panics(t, func() {
_, err = s.Subscribe(ctx, clientID, query.Empty{}, 0)
require.NoError(t, err)
})
@@ -112,10 +113,10 @@ func TestSubscribeUnbuffered(t *testing.T) {
defer close(published)
err := s.Publish(ctx, "Ultron")
assert.NoError(t, err)
require.NoError(t, err)
err = s.Publish(ctx, "Darkhawk")
assert.NoError(t, err)
require.NoError(t, err)
}()
select {
@@ -152,8 +153,8 @@ func TestSlowClientIsRemovedWithErrOutOfCapacity(t *testing.T) {
func TestDifferentClients(t *testing.T) {
s := pubsub.NewServer()
s.SetLogger(log.TestingLogger())
err := s.Start()
require.NoError(t, err)
require.NoError(t, s.Start())
t.Cleanup(func() {
if err := s.Stop(); err != nil {
t.Error(err)
@@ -161,10 +162,18 @@ func TestDifferentClients(t *testing.T) {
})
ctx := context.Background()
subscription1, err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'"))
require.NoError(t, err)
err = s.PublishWithEvents(ctx, "Iceman", map[string][]string{"tm.events.type": {"NewBlock"}})
require.NoError(t, err)
events := []abci.Event{
{
Type: "tm.events",
Attributes: []abci.EventAttribute{{Key: "type", Value: "NewBlock"}},
},
}
require.NoError(t, s.PublishWithEvents(ctx, "Iceman", events))
assertReceive(t, "Iceman", subscription1.Out())
subscription2, err := s.Subscribe(
@@ -173,12 +182,19 @@ func TestDifferentClients(t *testing.T) {
query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'"),
)
require.NoError(t, err)
err = s.PublishWithEvents(
ctx,
"Ultimo",
map[string][]string{"tm.events.type": {"NewBlock"}, "abci.account.name": {"Igor"}},
)
require.NoError(t, err)
events = []abci.Event{
{
Type: "tm.events",
Attributes: []abci.EventAttribute{{Key: "type", Value: "NewBlock"}},
},
{
Type: "abci.account",
Attributes: []abci.EventAttribute{{Key: "name", Value: "Igor"}},
},
}
require.NoError(t, s.PublishWithEvents(ctx, "Ultimo", events))
assertReceive(t, "Ultimo", subscription1.Out())
assertReceive(t, "Ultimo", subscription2.Out())
@@ -188,16 +204,25 @@ func TestDifferentClients(t *testing.T) {
query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"),
)
require.NoError(t, err)
err = s.PublishWithEvents(ctx, "Valeria Richards", map[string][]string{"tm.events.type": {"NewRoundStep"}})
require.NoError(t, err)
assert.Zero(t, len(subscription3.Out()))
events = []abci.Event{
{
Type: "tm.events",
Attributes: []abci.EventAttribute{{Key: "type", Value: "NewRoundStep"}},
},
}
require.NoError(t, s.PublishWithEvents(ctx, "Valeria Richards", events))
require.Zero(t, len(subscription3.Out()))
}
func TestSubscribeDuplicateKeys(t *testing.T) {
ctx := context.Background()
s := pubsub.NewServer()
s.SetLogger(log.TestingLogger())
require.NoError(t, s.Start())
t.Cleanup(func() {
if err := s.Stop(); err != nil {
t.Error(err)
@@ -230,15 +255,26 @@ func TestSubscribeDuplicateKeys(t *testing.T) {
sub, err := s.Subscribe(ctx, fmt.Sprintf("client-%d", i), query.MustParse(tc.query))
require.NoError(t, err)
err = s.PublishWithEvents(
ctx,
"Iceman",
map[string][]string{
"transfer.sender": {"foo", "bar", "baz"},
"withdraw.rewards": {"1", "17", "22"},
events := []abci.Event{
{
Type: "transfer",
Attributes: []abci.EventAttribute{
{Key: "sender", Value: "foo"},
{Key: "sender", Value: "bar"},
{Key: "sender", Value: "baz"},
},
},
)
require.NoError(t, err)
{
Type: "withdraw",
Attributes: []abci.EventAttribute{
{Key: "rewards", Value: "1"},
{Key: "rewards", Value: "17"},
{Key: "rewards", Value: "22"},
},
},
}
require.NoError(t, s.PublishWithEvents(ctx, "Iceman", events))
if tc.expected != nil {
assertReceive(t, tc.expected, sub.Out())
@@ -264,16 +300,22 @@ func TestClientSubscribesTwice(t *testing.T) {
subscription1, err := s.Subscribe(ctx, clientID, q)
require.NoError(t, err)
err = s.PublishWithEvents(ctx, "Goblin Queen", map[string][]string{"tm.events.type": {"NewBlock"}})
require.NoError(t, err)
events := []abci.Event{
{
Type: "tm.events",
Attributes: []abci.EventAttribute{{Key: "type", Value: "NewBlock"}},
},
}
require.NoError(t, s.PublishWithEvents(ctx, "Goblin Queen", events))
assertReceive(t, "Goblin Queen", subscription1.Out())
subscription2, err := s.Subscribe(ctx, clientID, q)
require.Error(t, err)
require.Nil(t, subscription2)
err = s.PublishWithEvents(ctx, "Spider-Man", map[string][]string{"tm.events.type": {"NewBlock"}})
require.NoError(t, err)
require.NoError(t, s.PublishWithEvents(ctx, "Spider-Man", events))
assertReceive(t, "Spider-Man", subscription1.Out())
}
@@ -298,7 +340,7 @@ func TestUnsubscribe(t *testing.T) {
err = s.Publish(ctx, "Nick Fury")
require.NoError(t, err)
assert.Zero(t, len(subscription.Out()), "Should not receive anything after Unsubscribe")
require.Zero(t, len(subscription.Out()), "Should not receive anything after Unsubscribe")
assertCanceled(t, subscription, pubsub.ErrUnsubscribed)
}
@@ -325,9 +367,9 @@ func TestClientUnsubscribesTwice(t *testing.T) {
err = s.Unsubscribe(ctx, pubsub.UnsubscribeArgs{
Subscriber: clientID,
Query: query.MustParse("tm.events.type='NewBlock'")})
assert.Equal(t, pubsub.ErrSubscriptionNotFound, err)
require.Equal(t, pubsub.ErrSubscriptionNotFound, err)
err = s.UnsubscribeAll(ctx, clientID)
assert.Equal(t, pubsub.ErrSubscriptionNotFound, err)
require.Equal(t, pubsub.ErrSubscriptionNotFound, err)
}
func TestResubscribe(t *testing.T) {
@@ -376,8 +418,8 @@ func TestUnsubscribeAll(t *testing.T) {
err = s.Publish(ctx, "Nick Fury")
require.NoError(t, err)
assert.Zero(t, len(subscription1.Out()), "Should not receive anything after UnsubscribeAll")
assert.Zero(t, len(subscription2.Out()), "Should not receive anything after UnsubscribeAll")
require.Zero(t, len(subscription1.Out()), "Should not receive anything after UnsubscribeAll")
require.Zero(t, len(subscription2.Out()), "Should not receive anything after UnsubscribeAll")
assertCanceled(t, subscription1, pubsub.ErrUnsubscribed)
assertCanceled(t, subscription2, pubsub.ErrUnsubscribed)
@@ -387,7 +429,7 @@ func TestBufferCapacity(t *testing.T) {
s := pubsub.NewServer(pubsub.BufferCapacity(2))
s.SetLogger(log.TestingLogger())
assert.Equal(t, 2, s.BufferCapacity())
require.Equal(t, 2, s.BufferCapacity())
ctx := context.Background()
err := s.Publish(ctx, "Nighthawk")
@@ -397,9 +439,10 @@ func TestBufferCapacity(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
err = s.Publish(ctx, "Ironclad")
if assert.Error(t, err) {
assert.Equal(t, context.DeadlineExceeded, err)
require.Equal(t, context.DeadlineExceeded, err)
}
}
@@ -447,12 +490,18 @@ func benchmarkNClients(n int, b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err = s.PublishWithEvents(
ctx,
"Gamora",
map[string][]string{"abci.Account.Owner": {"Ivan"}, "abci.Invoices.Number": {string(rune(i))}},
)
require.NoError(b, err)
events := []abci.Event{
{
Type: "abci.Account",
Attributes: []abci.EventAttribute{{Key: "Owner", Value: "Ivan"}},
},
{
Type: "abci.Invoices",
Attributes: []abci.EventAttribute{{Key: "Number", Value: string(rune(i))}},
},
}
require.NoError(b, s.PublishWithEvents(ctx, "Gamora", events))
}
}
@@ -487,10 +536,20 @@ func benchmarkNClientsOneQuery(n int, b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err = s.PublishWithEvents(ctx, "Gamora", map[string][]string{"abci.Account.Owner": {"Ivan"},
"abci.Invoices.Number": {"1"}})
require.NoError(b, err)
events := []abci.Event{
{
Type: "abci.Account",
Attributes: []abci.EventAttribute{{Key: "Owner", Value: "Ivan"}},
},
{
Type: "abci.Invoices",
Attributes: []abci.EventAttribute{{Key: "Number", Value: "1"}},
},
}
require.NoError(b, s.PublishWithEvents(ctx, "Gamora", events))
}
}
@@ -499,7 +558,7 @@ func benchmarkNClientsOneQuery(n int, b *testing.B) {
func assertReceive(t *testing.T, expected interface{}, ch <-chan pubsub.Message, msgAndArgs ...interface{}) {
select {
case actual := <-ch:
assert.Equal(t, expected, actual.Data(), msgAndArgs...)
require.Equal(t, expected, actual.Data(), msgAndArgs...)
case <-time.After(1 * time.Second):
t.Errorf("expected to receive %v from the channel, got nothing after 1s", expected)
debug.PrintStack()
@@ -508,6 +567,6 @@ func assertReceive(t *testing.T, expected interface{}, ch <-chan pubsub.Message,
func assertCanceled(t *testing.T, subscription *pubsub.Subscription, err error) {
_, ok := <-subscription.Canceled()
assert.False(t, ok)
assert.Equal(t, err, subscription.Err())
require.False(t, ok)
require.Equal(t, err, subscription.Err())
}

View File

@@ -1,11 +1,15 @@
package query
import (
"github.com/tendermint/tendermint/abci/types"
)
// Empty query matches any set of events.
type Empty struct {
}
// Matches always returns true.
func (Empty) Matches(tags map[string][]string) (bool, error) {
func (Empty) Matches(events []types.Event) (bool, error) {
return true, nil
}

View File

@@ -3,8 +3,8 @@ package query_test
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/pubsub/query"
)
@@ -12,17 +12,44 @@ func TestEmptyQueryMatchesAnything(t *testing.T) {
q := query.Empty{}
testCases := []struct {
query map[string][]string
events []abci.Event
}{
{map[string][]string{}},
{map[string][]string{"Asher": {"Roth"}}},
{map[string][]string{"Route": {"66"}}},
{map[string][]string{"Route": {"66"}, "Billy": {"Blue"}}},
{
[]abci.Event{},
},
{
[]abci.Event{
{
Type: "Asher",
Attributes: []abci.EventAttribute{{Key: "Roth"}},
},
},
},
{
[]abci.Event{
{
Type: "Route",
Attributes: []abci.EventAttribute{{Key: "66"}},
},
},
},
{
[]abci.Event{
{
Type: "Route",
Attributes: []abci.EventAttribute{{Key: "66"}},
},
{
Type: "Billy",
Attributes: []abci.EventAttribute{{Key: "Blue"}},
},
},
},
}
for _, tc := range testCases {
match, err := q.Matches(tc.query)
assert.Nil(t, err)
assert.True(t, match)
match, err := q.Matches(tc.events)
require.Nil(t, err)
require.True(t, match)
}
}

View File

@@ -15,6 +15,8 @@ import (
"strconv"
"strings"
"time"
"github.com/tendermint/tendermint/abci/types"
)
var (
@@ -198,11 +200,13 @@ func (q *Query) Conditions() ([]Condition, error) {
//
// For example, query "name=John" matches events = {"name": ["John", "Eric"]}.
// More examples could be found in parser_test.go and query_test.go.
func (q *Query) Matches(events map[string][]string) (bool, error) {
if len(events) == 0 {
func (q *Query) Matches(rawEvents []types.Event) (bool, error) {
if len(rawEvents) == 0 {
return false, nil
}
events := flattenEvents(rawEvents)
var (
eventAttr string
op Operator
@@ -500,3 +504,24 @@ func matchValue(value string, op Operator, operand reflect.Value) (bool, error)
return false, nil
}
func flattenEvents(events []types.Event) map[string][]string {
flattened := make(map[string][]string)
for _, event := range events {
if len(event.Type) == 0 {
continue
}
for _, attr := range event.Attributes {
if len(attr.Key) == 0 {
continue
}
compositeEvent := fmt.Sprintf("%s.%s", event.Type, attr.Key)
flattened[compositeEvent] = append(flattened[compositeEvent], attr.Value)
}
}
return flattened
}

View File

@@ -2,15 +2,38 @@ package query_test
import (
"fmt"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/pubsub/query"
)
func expandEvents(flattenedEvents map[string][]string) []abci.Event {
events := make([]abci.Event, len(flattenedEvents))
for composite, values := range flattenedEvents {
tokens := strings.Split(composite, ".")
attrs := make([]abci.EventAttribute, len(values))
for i, v := range values {
attrs[i] = abci.EventAttribute{
Key: tokens[len(tokens)-1],
Value: v,
}
}
events = append(events, abci.Event{
Type: strings.Join(tokens[:len(tokens)-1], "."),
Attributes: attrs,
})
}
return events
}
func TestMatches(t *testing.T) {
var (
txDate = "2017-01-01"
@@ -159,21 +182,23 @@ func TestMatches(t *testing.T) {
}
require.NotNil(t, q, "Query '%s' should not be nil", tc.s)
rawEvents := expandEvents(tc.events)
if tc.matches {
match, err := q.Matches(tc.events)
assert.Nil(t, err, "Query '%s' should not error on match %v", tc.s, tc.events)
assert.True(t, match, "Query '%s' should match %v", tc.s, tc.events)
match, err := q.Matches(rawEvents)
require.Nil(t, err, "Query '%s' should not error on match %v", tc.s, tc.events)
require.True(t, match, "Query '%s' should match %v", tc.s, tc.events)
} else {
match, err := q.Matches(tc.events)
assert.Equal(t, tc.matchErr, err != nil, "Unexpected error for query '%s' match %v", tc.s, tc.events)
assert.False(t, match, "Query '%s' should not match %v", tc.s, tc.events)
match, err := q.Matches(rawEvents)
require.Equal(t, tc.matchErr, err != nil, "Unexpected error for query '%s' match %v", tc.s, tc.events)
require.False(t, match, "Query '%s' should not match %v", tc.s, tc.events)
}
}
}
func TestMustParse(t *testing.T) {
assert.Panics(t, func() { query.MustParse("=") })
assert.NotPanics(t, func() { query.MustParse("tm.events.type='NewBlock'") })
require.Panics(t, func() { query.MustParse("=") })
require.NotPanics(t, func() { query.MustParse("tm.events.type='NewBlock'") })
}
func TestConditions(t *testing.T) {
@@ -217,6 +242,6 @@ func TestConditions(t *testing.T) {
c, err := q.Conditions()
require.NoError(t, err)
assert.Equal(t, tc.conditions, c)
require.Equal(t, tc.conditions, c)
}
}

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"github.com/google/uuid"
"github.com/tendermint/tendermint/abci/types"
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
)
@@ -89,10 +90,10 @@ func (s *Subscription) cancel(err error) {
type Message struct {
subID string
data interface{}
events map[string][]string
events []types.Event
}
func NewMessage(subID string, data interface{}, events map[string][]string) Message {
func NewMessage(subID string, data interface{}, events []types.Event) Message {
return Message{
subID: subID,
data: data,
@@ -108,4 +109,4 @@ func (msg Message) SubscriptionID() string { return msg.subID }
func (msg Message) Data() interface{} { return msg.data }
// Events returns events, which matched the client's query.
func (msg Message) Events() map[string][]string { return msg.events }
func (msg Message) Events() []types.Event { return msg.events }