mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-08 22:23:11 +00:00
* Update error message to correspond to changes in v0.34.x * Add buffer size and client-close config parameters Signed-off-by: Thane Thomson <connect@thanethomson.com>
This commit is contained in:
@@ -12,6 +12,8 @@ Special thanks to external contributors on this release:
|
||||
|
||||
- CLI/RPC/Config
|
||||
|
||||
- [config] \#7276 rpc: Add experimental config params to allow for subscription buffer size control (@thanethomson).
|
||||
|
||||
- Apps
|
||||
|
||||
- P2P Protocol
|
||||
|
||||
@@ -64,6 +64,9 @@ var (
|
||||
|
||||
defaultNodeKeyPath = filepath.Join(defaultConfigDir, defaultNodeKeyName)
|
||||
defaultAddrBookPath = filepath.Join(defaultConfigDir, defaultAddrBookName)
|
||||
|
||||
minSubscriptionBufferSize = 100
|
||||
defaultSubscriptionBufferSize = 200
|
||||
)
|
||||
|
||||
// Config defines the top level configuration for a Tendermint node
|
||||
@@ -496,6 +499,29 @@ type RPCConfig struct {
|
||||
// to the estimated maximum number of broadcast_tx_commit calls per block.
|
||||
MaxSubscriptionsPerClient int `mapstructure:"max-subscriptions-per-client"`
|
||||
|
||||
// The number of events that can be buffered per subscription before
|
||||
// returning `ErrOutOfCapacity`.
|
||||
SubscriptionBufferSize int `mapstructure:"experimental-subscription-buffer-size"`
|
||||
|
||||
// The maximum number of responses that can be buffered per WebSocket
|
||||
// client. If clients cannot read from the WebSocket endpoint fast enough,
|
||||
// they will be disconnected, so increasing this parameter may reduce the
|
||||
// chances of them being disconnected (but will cause the node to use more
|
||||
// memory).
|
||||
//
|
||||
// Must be at least the same as `SubscriptionBufferSize`, otherwise
|
||||
// connections may be dropped unnecessarily.
|
||||
WebSocketWriteBufferSize int `mapstructure:"experimental-websocket-write-buffer-size"`
|
||||
|
||||
// If a WebSocket client cannot read fast enough, at present we may
|
||||
// silently drop events instead of generating an error or disconnecting the
|
||||
// client.
|
||||
//
|
||||
// Enabling this parameter will cause the WebSocket connection to be closed
|
||||
// instead if it cannot read fast enough, allowing for greater
|
||||
// predictability in subscription behavior.
|
||||
CloseOnSlowClient bool `mapstructure:"experimental-close-on-slow-client"`
|
||||
|
||||
// How long to wait for a tx to be committed during /broadcast_tx_commit
|
||||
// WARNING: Using a value larger than 10s will result in increasing the
|
||||
// global HTTP write timeout, which applies to all connections and endpoints.
|
||||
@@ -545,7 +571,9 @@ func DefaultRPCConfig() *RPCConfig {
|
||||
|
||||
MaxSubscriptionClients: 100,
|
||||
MaxSubscriptionsPerClient: 5,
|
||||
SubscriptionBufferSize: defaultSubscriptionBufferSize,
|
||||
TimeoutBroadcastTxCommit: 10 * time.Second,
|
||||
WebSocketWriteBufferSize: defaultSubscriptionBufferSize,
|
||||
|
||||
MaxBodyBytes: int64(1000000), // 1MB
|
||||
MaxHeaderBytes: 1 << 20, // same as the net/http default
|
||||
@@ -579,6 +607,18 @@ func (cfg *RPCConfig) ValidateBasic() error {
|
||||
if cfg.MaxSubscriptionsPerClient < 0 {
|
||||
return errors.New("max-subscriptions-per-client can't be negative")
|
||||
}
|
||||
if cfg.SubscriptionBufferSize < minSubscriptionBufferSize {
|
||||
return fmt.Errorf(
|
||||
"experimental-subscription-buffer-size must be >= %d",
|
||||
minSubscriptionBufferSize,
|
||||
)
|
||||
}
|
||||
if cfg.WebSocketWriteBufferSize < cfg.SubscriptionBufferSize {
|
||||
return fmt.Errorf(
|
||||
"experimental-websocket-write-buffer-size must be >= experimental-subscription-buffer-size (%d)",
|
||||
cfg.SubscriptionBufferSize,
|
||||
)
|
||||
}
|
||||
if cfg.TimeoutBroadcastTxCommit < 0 {
|
||||
return errors.New("timeout-broadcast-tx-commit can't be negative")
|
||||
}
|
||||
|
||||
@@ -236,6 +236,33 @@ max-subscription-clients = {{ .RPC.MaxSubscriptionClients }}
|
||||
# the estimated # maximum number of broadcast_tx_commit calls per block.
|
||||
max-subscriptions-per-client = {{ .RPC.MaxSubscriptionsPerClient }}
|
||||
|
||||
# Experimental parameter to specify the maximum number of events a node will
|
||||
# buffer, per subscription, before returning an error and closing the
|
||||
# subscription. Must be set to at least 100, but higher values will accommodate
|
||||
# higher event throughput rates (and will use more memory).
|
||||
experimental-subscription-buffer-size = {{ .RPC.SubscriptionBufferSize }}
|
||||
|
||||
# Experimental parameter to specify the maximum number of RPC responses that
|
||||
# can be buffered per WebSocket client. If clients cannot read from the
|
||||
# WebSocket endpoint fast enough, they will be disconnected, so increasing this
|
||||
# parameter may reduce the chances of them being disconnected (but will cause
|
||||
# the node to use more memory).
|
||||
#
|
||||
# Must be at least the same as "experimental-subscription-buffer-size",
|
||||
# otherwise connections could be dropped unnecessarily. This value should
|
||||
# ideally be somewhat higher than "experimental-subscription-buffer-size" to
|
||||
# accommodate non-subscription-related RPC responses.
|
||||
experimental-websocket-write-buffer-size = {{ .RPC.WebSocketWriteBufferSize }}
|
||||
|
||||
# If a WebSocket client cannot read fast enough, at present we may
|
||||
# silently drop events instead of generating an error or disconnecting the
|
||||
# client.
|
||||
#
|
||||
# Enabling this experimental parameter will cause the WebSocket connection to
|
||||
# be closed instead if it cannot read fast enough, allowing for greater
|
||||
# predictability in subscription behavior.
|
||||
experimental-close-on-slow-client = {{ .RPC.CloseOnSlowClient }}
|
||||
|
||||
# How long to wait for a tx to be committed during /broadcast_tx_commit.
|
||||
# WARNING: Using a value larger than 10s will result in increasing the
|
||||
# global HTTP write timeout, which applies to all connections and endpoints.
|
||||
|
||||
@@ -13,9 +13,6 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
// Buffer on the Tendermint (server) side to allow some slowness in clients.
|
||||
subBufferSize = 100
|
||||
|
||||
// maxQueryLength is the maximum length of a query string that will be
|
||||
// accepted. This is just a safety check to avoid outlandish queries.
|
||||
maxQueryLength = 512
|
||||
@@ -44,11 +41,13 @@ func (env *Environment) Subscribe(ctx *rpctypes.Context, query string) (*coretyp
|
||||
subCtx, cancel := context.WithTimeout(ctx.Context(), SubscribeTimeout)
|
||||
defer cancel()
|
||||
|
||||
sub, err := env.EventBus.Subscribe(subCtx, addr, q, subBufferSize)
|
||||
sub, err := env.EventBus.Subscribe(subCtx, addr, q, env.Config.SubscriptionBufferSize)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
closeIfSlow := env.Config.CloseOnSlowClient
|
||||
|
||||
// Capture the current ID, since it can change in the future.
|
||||
subscriptionID := ctx.JSONReq.ID
|
||||
go func() {
|
||||
@@ -64,6 +63,18 @@ func (env *Environment) Subscribe(ctx *rpctypes.Context, query string) (*coretyp
|
||||
if err := ctx.WSConn.WriteRPCResponse(writeCtx, resp); err != nil {
|
||||
env.Logger.Info("Can't write response (slow client)",
|
||||
"to", addr, "subscriptionID", subscriptionID, "err", err)
|
||||
|
||||
if closeIfSlow {
|
||||
var (
|
||||
err = errors.New("subscription was canceled (reason: slow client)")
|
||||
resp = rpctypes.RPCServerError(subscriptionID, err)
|
||||
)
|
||||
if !ctx.WSConn.TryWriteRPCResponse(resp) {
|
||||
env.Logger.Info("Can't write response (slow client)",
|
||||
"to", addr, "subscriptionID", subscriptionID, "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
case <-sub.Canceled():
|
||||
if sub.Err() != tmpubsub.ErrUnsubscribed {
|
||||
|
||||
@@ -15,7 +15,7 @@ var (
|
||||
|
||||
// ErrOutOfCapacity is returned by Err when a client is not pulling messages
|
||||
// fast enough. Note the client's subscription will be terminated.
|
||||
ErrOutOfCapacity = errors.New("client is not pulling messages fast enough")
|
||||
ErrOutOfCapacity = errors.New("internal subscription event buffer is out of capacity")
|
||||
)
|
||||
|
||||
// A Subscription represents a client subscription for a particular query and
|
||||
|
||||
@@ -889,6 +889,7 @@ func (n *nodeImpl) startRPC() ([]net.Listener, error) {
|
||||
}
|
||||
}),
|
||||
rpcserver.ReadLimit(cfg.MaxBodyBytes),
|
||||
rpcserver.WriteChanCapacity(n.config.RPC.WebSocketWriteBufferSize),
|
||||
)
|
||||
wm.SetLogger(wmLogger)
|
||||
mux.HandleFunc("/websocket", wm.WebsocketHandler)
|
||||
|
||||
Reference in New Issue
Block a user