Files
tendermint/rpc/core/events.go
Erik Grinaker ba3a2dde37 rpc: replace Amino with new JSON encoder (#4968)
Migrates the `rpc` package to use new JSON encoder in #4955. Branched off of that PR.

Tests pass, but I haven't done any manual testing beyond that. This should be handled as part of broader 0.34 testing.
2020-06-08 12:04:05 +00:00

105 lines
3.1 KiB
Go

package core
import (
"context"
"fmt"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpctypes "github.com/tendermint/tendermint/rpc/jsonrpc/types"
)
// subBufferSize is the buffer size handed to the event bus for each new
// subscription. Buffering on the Tendermint (server) side tolerates some
// slowness in clients.
const subBufferSize = 100
// Subscribe registers the calling WebSocket client for events matching query.
// More: https://docs.tendermint.com/master/rpc/#/Websocket/subscribe
//
// The request is refused when the number of event bus clients has reached
// Config.MaxSubscriptionClients, when the caller already holds
// Config.MaxSubscriptionsPerClient subscriptions, or when query does not
// parse. On success an empty ResultSubscribe is returned and a goroutine is
// started that pushes matching events to the client's WebSocket connection
// until the subscription is cancelled.
func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, error) {
	remote := ctx.RemoteAddr()

	// Enforce the configured limits before touching the event bus.
	if env.EventBus.NumClients() >= env.Config.MaxSubscriptionClients {
		return nil, fmt.Errorf("max_subscription_clients %d reached", env.Config.MaxSubscriptionClients)
	}
	if env.EventBus.NumClientSubscriptions(remote) >= env.Config.MaxSubscriptionsPerClient {
		return nil, fmt.Errorf("max_subscriptions_per_client %d reached", env.Config.MaxSubscriptionsPerClient)
	}

	env.Logger.Info("Subscribe to query", "remote", remote, "query", query)

	parsed, err := tmquery.New(query)
	if err != nil {
		return nil, fmt.Errorf("failed to parse query: %w", err)
	}

	// Bound only the subscribe call itself by SubscribeTimeout; the context is
	// released as soon as this function returns.
	timeoutCtx, release := context.WithTimeout(ctx.Context(), SubscribeTimeout)
	defer release()

	sub, err := env.EventBus.Subscribe(timeoutCtx, remote, parsed, subBufferSize)
	if err != nil {
		return nil, err
	}

	// Snapshot the JSON-RPC request ID now, since it may change later; every
	// response pushed for this subscription reuses it.
	requestID := ctx.JSONReq.ID

	go func() {
		for {
			select {
			case msg := <-sub.Out():
				event := &ctypes.ResultEvent{Query: query, Data: msg.Data(), Events: msg.Events()}
				ctx.WSConn.TryWriteRPCResponse(rpctypes.NewRPCSuccessResponse(requestID, event))

			case <-sub.Cancelled():
				cause := sub.Err()
				if cause == tmpubsub.ErrUnsubscribed {
					// NOTE(review): presumably a client-initiated unsubscribe,
					// so no error is reported back — confirm against pubsub.
					return
				}

				// A nil error on a cancelled subscription is reported as a
				// node shutdown.
				reason := "Tendermint exited"
				if cause != nil {
					reason = cause.Error()
				}
				ctx.WSConn.TryWriteRPCResponse(
					rpctypes.RPCServerError(
						requestID,
						fmt.Errorf("subscription was cancelled (reason: %s)", reason),
					))
				return
			}
		}
	}()

	return &ctypes.ResultSubscribe{}, nil
}
// Unsubscribe removes the calling client's subscription for the given query.
// More: https://docs.tendermint.com/master/rpc/#/Websocket/unsubscribe
//
// It returns an error when query does not parse or when the event bus rejects
// the unsubscribe request; otherwise an empty ResultUnsubscribe.
func Unsubscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultUnsubscribe, error) {
	remote := ctx.RemoteAddr()
	env.Logger.Info("Unsubscribe from query", "remote", remote, "query", query)

	parsed, parseErr := tmquery.New(query)
	if parseErr != nil {
		return nil, fmt.Errorf("failed to parse query: %w", parseErr)
	}

	// The event bus call uses a background context rather than the request's.
	if err := env.EventBus.Unsubscribe(context.Background(), remote, parsed); err != nil {
		return nil, err
	}
	return &ctypes.ResultUnsubscribe{}, nil
}
// UnsubscribeAll removes every subscription held by the calling client.
// More: https://docs.tendermint.com/master/rpc/#/Websocket/unsubscribe_all
//
// Any error from the event bus is returned unchanged; otherwise an empty
// ResultUnsubscribe is returned.
func UnsubscribeAll(ctx *rpctypes.Context) (*ctypes.ResultUnsubscribe, error) {
	remote := ctx.RemoteAddr()
	env.Logger.Info("Unsubscribe from all", "remote", remote)

	// The event bus call uses a background context rather than the request's.
	if err := env.EventBus.UnsubscribeAll(context.Background(), remote); err != nil {
		return nil, err
	}
	return &ctypes.ResultUnsubscribe{}, nil
}