From 12e3419f2bd3f52f0b54828b2b01ee207cd5ff77 Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Tue, 9 Nov 2021 15:35:45 -0500 Subject: [PATCH] rpc: Add experimental config params to allow for subscription buffer size control (tm v0.34.x) (#7230) A workaround for #6729. Add parameters to control buffer sizes for event subscription RPC clients. On some networks, buffering causes clients to be dropped and/or events to be lost. For additional context, see the discussion on #7188. - Add experimental_subscription_buffer_size config parameter - Add experimental_websocket_write_buffer_size config parameter - Add experimental_close_on_slow_client config parameter Co-authored-by: M. J. Fromberger --- CHANGELOG_PENDING.md | 4 ++- config/config.go | 40 ++++++++++++++++++++++++++++ config/toml.go | 27 +++++++++++++++++++ evidence/mocks/block_store.go | 1 + libs/pubsub/subscription.go | 2 +- node/node.go | 1 + proxy/mocks/app_conn_consensus.go | 1 + proxy/mocks/app_conn_mempool.go | 1 + proxy/mocks/client_creator.go | 1 + rpc/core/events.go | 24 ++++++++++++----- state/indexer/block/kv/kv_test.go | 3 ++- state/indexer/block/kv/util.go | 1 + state/indexer/sink/psql/psql.go | 1 + state/indexer/sink/psql/psql_test.go | 1 + state/mocks/evidence_pool.go | 1 + state/mocks/store.go | 1 + statesync/mocks/state_provider.go | 1 + 17 files changed, 101 insertions(+), 10 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 09f0402d4..02d3bc76f 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -2,7 +2,7 @@ ## v0.34.15 -Special thanks to external contributors on this release: +Special thanks to external contributors on this release: @thanethomson Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermint). @@ -10,6 +10,8 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi - CLI/RPC/Config + - [config] \#7230 rpc: Add experimental config params to allow for subscription buffer size control (@thanethomson). + - Apps - P2P Protocol diff --git a/config/config.go b/config/config.go index e1b72a1e9..5162d2d4f 100644 --- a/config/config.go +++ b/config/config.go @@ -52,6 +52,9 @@ var ( defaultNodeKeyPath = filepath.Join(defaultConfigDir, defaultNodeKeyName) defaultAddrBookPath = filepath.Join(defaultConfigDir, defaultAddrBookName) + + minSubscriptionBufferSize = 100 + defaultSubscriptionBufferSize = 200 ) // Config defines the top level configuration for a Tendermint node @@ -342,6 +345,29 @@ type RPCConfig struct { // to the estimated maximum number of broadcast_tx_commit calls per block. MaxSubscriptionsPerClient int `mapstructure:"max_subscriptions_per_client"` + // The number of events that can be buffered per subscription before + // returning `ErrOutOfCapacity`. + SubscriptionBufferSize int `mapstructure:"experimental_subscription_buffer_size"` + + // The maximum number of responses that can be buffered per WebSocket + // client. If clients cannot read from the WebSocket endpoint fast enough, + // they will be disconnected, so increasing this parameter may reduce the + // chances of them being disconnected (but will cause the node to use more + // memory). + // + // Must be at least the same as `SubscriptionBufferSize`, otherwise + // connections may be dropped unnecessarily. + WebSocketWriteBufferSize int `mapstructure:"experimental_websocket_write_buffer_size"` + + // If a WebSocket client cannot read fast enough, at present we may + // silently drop events instead of generating an error or disconnecting the + // client. + // + // Enabling this parameter will cause the WebSocket connection to be closed + // instead if it cannot read fast enough, allowing for greater + // predictability in subscription behaviour. + CloseOnSlowClient bool `mapstructure:"experimental_close_on_slow_client"` + // How long to wait for a tx to be committed during /broadcast_tx_commit // WARNING: Using a value larger than 10s will result in increasing the // global HTTP write timeout, which applies to all connections and endpoints. @@ -391,7 +417,9 @@ func DefaultRPCConfig() *RPCConfig { MaxSubscriptionClients: 100, MaxSubscriptionsPerClient: 5, + SubscriptionBufferSize: defaultSubscriptionBufferSize, TimeoutBroadcastTxCommit: 10 * time.Second, + WebSocketWriteBufferSize: defaultSubscriptionBufferSize, MaxBodyBytes: int64(1000000), // 1MB MaxHeaderBytes: 1 << 20, // same as the net/http default @@ -425,6 +453,18 @@ func (cfg *RPCConfig) ValidateBasic() error { if cfg.MaxSubscriptionsPerClient < 0 { return errors.New("max_subscriptions_per_client can't be negative") } + if cfg.SubscriptionBufferSize < minSubscriptionBufferSize { + return fmt.Errorf( + "experimental_subscription_buffer_size must be >= %d", + minSubscriptionBufferSize, + ) + } + if cfg.WebSocketWriteBufferSize < cfg.SubscriptionBufferSize { + return fmt.Errorf( + "experimental_websocket_write_buffer_size must be >= experimental_subscription_buffer_size (%d)", + cfg.SubscriptionBufferSize, + ) + } if cfg.TimeoutBroadcastTxCommit < 0 { return errors.New("timeout_broadcast_tx_commit can't be negative") } diff --git a/config/toml.go b/config/toml.go index fef452c4c..36fa00b7d 100644 --- a/config/toml.go +++ b/config/toml.go @@ -206,6 +206,33 @@ max_subscription_clients = {{ .RPC.MaxSubscriptionClients }} # the estimated # maximum number of broadcast_tx_commit calls per block. max_subscriptions_per_client = {{ .RPC.MaxSubscriptionsPerClient }} +# Experimental parameter to specify the maximum number of events a node will +# buffer, per subscription, before returning an error and closing the +# subscription. Must be set to at least 100, but higher values will accommodate +# higher event throughput rates (and will use more memory). +experimental_subscription_buffer_size = {{ .RPC.SubscriptionBufferSize }} + +# Experimental parameter to specify the maximum number of RPC responses that +# can be buffered per WebSocket client. If clients cannot read from the +# WebSocket endpoint fast enough, they will be disconnected, so increasing this +# parameter may reduce the chances of them being disconnected (but will cause +# the node to use more memory). +# +# Must be at least the same as "experimental_subscription_buffer_size", +# otherwise connections could be dropped unnecessarily. This value should +# ideally be somewhat higher than "experimental_subscription_buffer_size" to +# accommodate non-subscription-related RPC responses. +experimental_websocket_write_buffer_size = {{ .RPC.WebSocketWriteBufferSize }} + +# If a WebSocket client cannot read fast enough, at present we may +# silently drop events instead of generating an error or disconnecting the +# client. +# +# Enabling this experimental parameter will cause the WebSocket connection to +# be closed instead if it cannot read fast enough, allowing for greater +# predictability in subscription behaviour. +experimental_close_on_slow_client = {{ .RPC.CloseOnSlowClient }} + # How long to wait for a tx to be committed during /broadcast_tx_commit. # WARNING: Using a value larger than 10s will result in increasing the # global HTTP write timeout, which applies to all connections and endpoints. diff --git a/evidence/mocks/block_store.go b/evidence/mocks/block_store.go index e6205939a..d145518c6 100644 --- a/evidence/mocks/block_store.go +++ b/evidence/mocks/block_store.go @@ -4,6 +4,7 @@ package mocks import ( mock "github.com/stretchr/testify/mock" + types "github.com/tendermint/tendermint/types" ) diff --git a/libs/pubsub/subscription.go b/libs/pubsub/subscription.go index 1c60c55d5..8f90e177a 100644 --- a/libs/pubsub/subscription.go +++ b/libs/pubsub/subscription.go @@ -12,7 +12,7 @@ var ( // ErrOutOfCapacity is returned by Err when a client is not pulling messages // fast enough. Note the client's subscription will be terminated. - ErrOutOfCapacity = errors.New("client is not pulling messages fast enough") + ErrOutOfCapacity = errors.New("internal subscription event buffer is out of capacity") ) // A Subscription represents a client subscription for a particular query and diff --git a/node/node.go b/node/node.go index 3c950c2e3..7b9c482ae 100644 --- a/node/node.go +++ b/node/node.go @@ -1090,6 +1090,7 @@ func (n *Node) startRPC() ([]net.Listener, error) { } }), rpcserver.ReadLimit(config.MaxBodyBytes), + rpcserver.WriteChanCapacity(n.config.RPC.WebSocketWriteBufferSize), ) wm.SetLogger(wmLogger) mux.HandleFunc("/websocket", wm.WebsocketHandler) diff --git a/proxy/mocks/app_conn_consensus.go b/proxy/mocks/app_conn_consensus.go index f3546c11d..343a4f540 100644 --- a/proxy/mocks/app_conn_consensus.go +++ b/proxy/mocks/app_conn_consensus.go @@ -4,6 +4,7 @@ package mocks import ( mock "github.com/stretchr/testify/mock" + abcicli "github.com/tendermint/tendermint/abci/client" types "github.com/tendermint/tendermint/abci/types" diff --git a/proxy/mocks/app_conn_mempool.go b/proxy/mocks/app_conn_mempool.go index a7d3ca307..a6bef210d 100644 --- a/proxy/mocks/app_conn_mempool.go +++ b/proxy/mocks/app_conn_mempool.go @@ -4,6 +4,7 @@ package mocks import ( mock "github.com/stretchr/testify/mock" + abcicli "github.com/tendermint/tendermint/abci/client" types "github.com/tendermint/tendermint/abci/types" diff --git a/proxy/mocks/client_creator.go b/proxy/mocks/client_creator.go index 499313d17..e4b924ab5 100644 --- a/proxy/mocks/client_creator.go +++ b/proxy/mocks/client_creator.go @@ -4,6 +4,7 @@ package mocks import ( mock "github.com/stretchr/testify/mock" + abcicli "github.com/tendermint/tendermint/abci/client" ) diff --git a/rpc/core/events.go b/rpc/core/events.go index 5e6b3db57..69cdd4fd5 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -2,6 +2,7 @@ package core import ( "context" + "errors" "fmt" "time" @@ -11,11 +12,6 @@ import ( rpctypes "github.com/tendermint/tendermint/rpc/jsonrpc/types" ) -const ( - // Buffer on the Tendermint (server) side to allow some slowness in clients. - subBufferSize = 100 -) - // Subscribe for events via WebSocket. // More: https://docs.tendermint.com/master/rpc/#/Websocket/subscribe func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, error) { @@ -37,11 +33,13 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er subCtx, cancel := context.WithTimeout(ctx.Context(), SubscribeTimeout) defer cancel() - sub, err := env.EventBus.Subscribe(subCtx, addr, q, subBufferSize) + sub, err := env.EventBus.Subscribe(subCtx, addr, q, env.Config.SubscriptionBufferSize) if err != nil { return nil, err } + closeIfSlow := env.Config.CloseOnSlowClient + // Capture the current ID, since it can change in the future. subscriptionID := ctx.JSONReq.ID go func() { @@ -57,6 +55,18 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er if err := ctx.WSConn.WriteRPCResponse(writeCtx, resp); err != nil { env.Logger.Info("Can't write response (slow client)", "to", addr, "subscriptionID", subscriptionID, "err", err) + + if closeIfSlow { + var ( + err = errors.New("subscription was cancelled (reason: slow client)") + resp = rpctypes.RPCServerError(subscriptionID, err) + ) + if !ctx.WSConn.TryWriteRPCResponse(resp) { + env.Logger.Info("Can't write response (slow client)", + "to", addr, "subscriptionID", subscriptionID, "err", err) + } + return + } } case <-sub.Cancelled(): if sub.Err() != tmpubsub.ErrUnsubscribed { @@ -70,7 +80,7 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er err = fmt.Errorf("subscription was cancelled (reason: %s)", reason) resp = rpctypes.RPCServerError(subscriptionID, err) ) - if ok := ctx.WSConn.TryWriteRPCResponse(resp); !ok { + if !ctx.WSConn.TryWriteRPCResponse(resp) { env.Logger.Info("Can't write response (slow client)", "to", addr, "subscriptionID", subscriptionID, "err", err) } diff --git a/state/indexer/block/kv/kv_test.go b/state/indexer/block/kv/kv_test.go index eacf51c42..249eb3e5a 100644 --- a/state/indexer/block/kv/kv_test.go +++ b/state/indexer/block/kv/kv_test.go @@ -6,11 +6,12 @@ import ( "testing" "github.com/stretchr/testify/require" + db "github.com/tendermint/tm-db" + abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/pubsub/query" blockidxkv "github.com/tendermint/tendermint/state/indexer/block/kv" "github.com/tendermint/tendermint/types" - db "github.com/tendermint/tm-db" ) func TestBlockIndexer(t *testing.T) { diff --git a/state/indexer/block/kv/util.go b/state/indexer/block/kv/util.go index c0b88018e..05d6fc45c 100644 --- a/state/indexer/block/kv/util.go +++ b/state/indexer/block/kv/util.go @@ -6,6 +6,7 @@ import ( "strconv" "github.com/google/orderedcode" + "github.com/tendermint/tendermint/libs/pubsub/query" "github.com/tendermint/tendermint/types" ) diff --git a/state/indexer/sink/psql/psql.go b/state/indexer/sink/psql/psql.go index fb9e3190d..3552591d4 100644 --- a/state/indexer/sink/psql/psql.go +++ b/state/indexer/sink/psql/psql.go @@ -10,6 +10,7 @@ import ( "time" "github.com/gogo/protobuf/proto" + abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/pubsub/query" "github.com/tendermint/tendermint/types" diff --git a/state/indexer/sink/psql/psql_test.go b/state/indexer/sink/psql/psql_test.go index 875a72a57..07f4cdf1e 100644 --- a/state/indexer/sink/psql/psql_test.go +++ b/state/indexer/sink/psql/psql_test.go @@ -18,6 +18,7 @@ import ( "github.com/ory/dockertest/docker" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/types" diff --git a/state/mocks/evidence_pool.go b/state/mocks/evidence_pool.go index 9d6091cde..f8dc4f47b 100644 --- a/state/mocks/evidence_pool.go +++ b/state/mocks/evidence_pool.go @@ -4,6 +4,7 @@ package mocks import ( mock "github.com/stretchr/testify/mock" + state "github.com/tendermint/tendermint/state" types "github.com/tendermint/tendermint/types" diff --git a/state/mocks/store.go b/state/mocks/store.go index 91525b223..48b5e7483 100644 --- a/state/mocks/store.go +++ b/state/mocks/store.go @@ -4,6 +4,7 @@ package mocks import ( mock "github.com/stretchr/testify/mock" + state "github.com/tendermint/tendermint/state" tendermintstate "github.com/tendermint/tendermint/proto/tendermint/state" diff --git a/statesync/mocks/state_provider.go b/statesync/mocks/state_provider.go index 47dbb86d2..42ff6946d 100644 --- a/statesync/mocks/state_provider.go +++ b/statesync/mocks/state_provider.go @@ -6,6 +6,7 @@ import ( context "context" mock "github.com/stretchr/testify/mock" + state "github.com/tendermint/tendermint/state" types "github.com/tendermint/tendermint/types"