mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-05 04:55:18 +00:00
rpc/jsonrpc/server: ws server optimizations (#5312)
* docs: goleveldb is much more stable now Refs https://github.com/syndtr/goleveldb/issues/226#issuecomment-682495490 * rpc/core/events: make sure WS client receives every event previously, if the write buffer was full, the response would've been lost without any trace (log msg, etc.) * rpc/jsonrpc/server: set defaultWSWriteChanCapacity to 1 Refs #3905 Closes #3829 setting write buffer capacity to 1 makes transactions count per block more stable and also reduces the pauses length by 20s. before: https://github.com/tendermint/tendermint/issues/3905#issuecomment-681854328 net.Read - 20s after: net.Read - 0.66s * rpc/jsonrpc/server: buffer writes and avoid io.ReadAll during read
This commit is contained in:
@@ -28,7 +28,7 @@ func MakeTxKV() ([]byte, []byte, []byte) {
|
||||
|
||||
func TestHeaderEvents(t *testing.T) {
|
||||
for i, c := range GetClients() {
|
||||
i, c := i, c // capture params
|
||||
i, c := i, c
|
||||
t.Run(reflect.TypeOf(c).String(), func(t *testing.T) {
|
||||
// start for this test it if it wasn't already running
|
||||
if !c.IsRunning() {
|
||||
@@ -48,35 +48,39 @@ func TestHeaderEvents(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// subscribe to new blocks and make sure height increments by 1
|
||||
func TestBlockEvents(t *testing.T) {
|
||||
for i, c := range GetClients() {
|
||||
i, c := i, c // capture params
|
||||
for _, c := range GetClients() {
|
||||
c := c
|
||||
t.Run(reflect.TypeOf(c).String(), func(t *testing.T) {
|
||||
|
||||
// start for this test it if it wasn't already running
|
||||
if !c.IsRunning() {
|
||||
// if so, then we start it, listen, and stop it.
|
||||
err := c.Start()
|
||||
require.Nil(t, err, "%d: %+v", i, err)
|
||||
require.Nil(t, err)
|
||||
defer c.Stop()
|
||||
}
|
||||
|
||||
// listen for a new block; ensure height increases by 1
|
||||
const subscriber = "TestBlockEvents"
|
||||
|
||||
eventCh, err := c.Subscribe(context.Background(), subscriber, types.QueryForEvent(types.EventNewBlock).String())
|
||||
require.NoError(t, err)
|
||||
defer c.UnsubscribeAll(context.Background(), subscriber)
|
||||
|
||||
var firstBlockHeight int64
|
||||
for j := 0; j < 3; j++ {
|
||||
evtTyp := types.EventNewBlock
|
||||
evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout)
|
||||
require.Nil(t, err, "%d: %+v", j, err)
|
||||
blockEvent, ok := evt.(types.EventDataNewBlock)
|
||||
require.True(t, ok, "%d: %#v", j, evt)
|
||||
for i := int64(0); i < 3; i++ {
|
||||
event := <-eventCh
|
||||
blockEvent, ok := event.Data.(types.EventDataNewBlock)
|
||||
require.True(t, ok)
|
||||
|
||||
block := blockEvent.Block
|
||||
if j == 0 {
|
||||
|
||||
if firstBlockHeight == 0 {
|
||||
firstBlockHeight = block.Header.Height
|
||||
continue
|
||||
}
|
||||
|
||||
require.Equal(t, block.Header.Height, firstBlockHeight+int64(j))
|
||||
require.Equal(t, firstBlockHeight+i, block.Header.Height)
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -86,48 +90,48 @@ func TestTxEventsSentWithBroadcastTxAsync(t *testing.T) { testTxEventsSent(t, "a
|
||||
func TestTxEventsSentWithBroadcastTxSync(t *testing.T) { testTxEventsSent(t, "sync") }
|
||||
|
||||
func testTxEventsSent(t *testing.T, broadcastMethod string) {
|
||||
for i, c := range GetClients() {
|
||||
i, c := i, c // capture params
|
||||
for _, c := range GetClients() {
|
||||
c := c
|
||||
t.Run(reflect.TypeOf(c).String(), func(t *testing.T) {
|
||||
|
||||
// start for this test it if it wasn't already running
|
||||
if !c.IsRunning() {
|
||||
// if so, then we start it, listen, and stop it.
|
||||
err := c.Start()
|
||||
require.Nil(t, err, "%d: %+v", i, err)
|
||||
require.Nil(t, err)
|
||||
defer c.Stop()
|
||||
}
|
||||
|
||||
// wait for the client subscription to get set up
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// make the tx
|
||||
_, _, tx := MakeTxKV()
|
||||
evtTyp := types.EventTx
|
||||
|
||||
// send
|
||||
var (
|
||||
txres *ctypes.ResultBroadcastTx
|
||||
err error
|
||||
)
|
||||
switch broadcastMethod {
|
||||
case "async":
|
||||
txres, err = c.BroadcastTxAsync(tx)
|
||||
case "sync":
|
||||
txres, err = c.BroadcastTxSync(tx)
|
||||
default:
|
||||
panic(fmt.Sprintf("Unknown broadcastMethod %s", broadcastMethod))
|
||||
}
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, txres.Code, abci.CodeTypeOK)
|
||||
go func() {
|
||||
var (
|
||||
txres *ctypes.ResultBroadcastTx
|
||||
err error
|
||||
)
|
||||
switch broadcastMethod {
|
||||
case "async":
|
||||
txres, err = c.BroadcastTxAsync(tx)
|
||||
case "sync":
|
||||
txres, err = c.BroadcastTxSync(tx)
|
||||
default:
|
||||
panic(fmt.Sprintf("Unknown broadcastMethod %s", broadcastMethod))
|
||||
}
|
||||
if assert.NoError(t, err) {
|
||||
assert.Equal(t, txres.Code, abci.CodeTypeOK)
|
||||
}
|
||||
}()
|
||||
|
||||
// and wait for confirmation
|
||||
evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout)
|
||||
require.Nil(t, err, "%d: %+v", i, err)
|
||||
evt, err := client.WaitForOneEvent(c, types.EventTx, waitForEventTimeout)
|
||||
require.Nil(t, err)
|
||||
|
||||
// and make sure it has the proper info
|
||||
txe, ok := evt.(types.EventDataTx)
|
||||
require.True(t, ok, "%d: %#v", i, evt)
|
||||
require.True(t, ok)
|
||||
|
||||
// make sure this is the proper tx
|
||||
require.EqualValues(t, tx, txe.Tx)
|
||||
require.True(t, txe.Result.IsOK())
|
||||
|
||||
@@ -18,6 +18,7 @@ func TestMain(m *testing.M) {
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
app := kvstore.NewPersistentKVStoreApplication(dir)
|
||||
node = rpctest.StartTendermint(app)
|
||||
|
||||
@@ -25,5 +26,6 @@ func TestMain(m *testing.M) {
|
||||
|
||||
// and shut down proper at the end
|
||||
rpctest.StopTendermint(node)
|
||||
_ = os.RemoveAll(dir)
|
||||
os.Exit(code)
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package core
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
|
||||
tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
|
||||
@@ -47,12 +48,16 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er
|
||||
for {
|
||||
select {
|
||||
case msg := <-sub.Out():
|
||||
resultEvent := &ctypes.ResultEvent{Query: query, Data: msg.Data(), Events: msg.Events()}
|
||||
ctx.WSConn.TryWriteRPCResponse(
|
||||
rpctypes.NewRPCSuccessResponse(
|
||||
subscriptionID,
|
||||
resultEvent,
|
||||
))
|
||||
var (
|
||||
resultEvent = &ctypes.ResultEvent{Query: query, Data: msg.Data(), Events: msg.Events()}
|
||||
resp = rpctypes.NewRPCSuccessResponse(subscriptionID, resultEvent)
|
||||
)
|
||||
writeCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
if err := ctx.WSConn.WriteRPCResponse(writeCtx, resp); err != nil {
|
||||
env.Logger.Info("Can't write response (slow client)",
|
||||
"to", addr, "subscriptionID", subscriptionID, "err", err)
|
||||
}
|
||||
case <-sub.Cancelled():
|
||||
if sub.Err() != tmpubsub.ErrUnsubscribed {
|
||||
var reason string
|
||||
@@ -61,11 +66,14 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er
|
||||
} else {
|
||||
reason = sub.Err().Error()
|
||||
}
|
||||
ctx.WSConn.TryWriteRPCResponse(
|
||||
rpctypes.RPCServerError(
|
||||
subscriptionID,
|
||||
fmt.Errorf("subscription was cancelled (reason: %s)", reason),
|
||||
))
|
||||
var (
|
||||
err = fmt.Errorf("subscription was cancelled (reason: %s)", reason)
|
||||
resp = rpctypes.RPCServerError(subscriptionID, err)
|
||||
)
|
||||
if ok := ctx.WSConn.TryWriteRPCResponse(resp); !ok {
|
||||
env.Logger.Info("Can't write response (slow client)",
|
||||
"to", addr, "subscriptionID", subscriptionID, "err", err)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package server
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"reflect"
|
||||
@@ -21,12 +22,16 @@ import (
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
const (
|
||||
defaultWSWriteChanCapacity = 1000
|
||||
defaultWSWriteChanCapacity = 100
|
||||
defaultWSWriteWait = 10 * time.Second
|
||||
defaultWSReadWait = 30 * time.Second
|
||||
defaultWSPingPeriod = (defaultWSReadWait * 9) / 10
|
||||
)
|
||||
|
||||
var (
|
||||
newline = []byte{'\n'}
|
||||
)
|
||||
|
||||
// WebsocketManager provides a WS handler for incoming connections and passes a
|
||||
// map of functions along with any additional params to new connections.
|
||||
// NOTE: The websocket path is defined externally, e.g. in node/node.go
|
||||
@@ -249,17 +254,22 @@ func (wsc *wsConnection) GetRemoteAddr() string {
|
||||
return wsc.remoteAddr
|
||||
}
|
||||
|
||||
// WriteRPCResponse pushes a response to the writeChan, and blocks until it is accepted.
|
||||
// WriteRPCResponse pushes a response to the writeChan, and blocks until it is
|
||||
// accepted.
|
||||
// It implements WSRPCConnection. It is Goroutine-safe.
|
||||
func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) {
|
||||
func (wsc *wsConnection) WriteRPCResponse(ctx context.Context, resp types.RPCResponse) error {
|
||||
select {
|
||||
case <-wsc.Quit():
|
||||
return
|
||||
return errors.New("connection was stopped")
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case wsc.writeChan <- resp:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// TryWriteRPCResponse attempts to push a response to the writeChan, but does not block.
|
||||
// TryWriteRPCResponse attempts to push a response to the writeChan, but does
|
||||
// not block.
|
||||
// It implements WSRPCConnection. It is Goroutine-safe
|
||||
func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool {
|
||||
select {
|
||||
@@ -284,6 +294,9 @@ func (wsc *wsConnection) Context() context.Context {
|
||||
|
||||
// Read from the socket and subscribe to or unsubscribe from events
|
||||
func (wsc *wsConnection) readRoutine() {
|
||||
// readRoutine will block until response is written or WS connection is closed
|
||||
writeCtx := context.Background()
|
||||
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
err, ok := r.(error)
|
||||
@@ -291,7 +304,7 @@ func (wsc *wsConnection) readRoutine() {
|
||||
err = fmt.Errorf("WSJSONRPC: %v", r)
|
||||
}
|
||||
wsc.Logger.Error("Panic in WSJSONRPC handler", "err", err, "stack", string(debug.Stack()))
|
||||
wsc.WriteRPCResponse(types.RPCInternalError(types.JSONRPCIntID(-1), err))
|
||||
wsc.WriteRPCResponse(writeCtx, types.RPCInternalError(types.JSONRPCIntID(-1), err))
|
||||
go wsc.readRoutine()
|
||||
}
|
||||
}()
|
||||
@@ -309,8 +322,8 @@ func (wsc *wsConnection) readRoutine() {
|
||||
if err := wsc.baseConn.SetReadDeadline(time.Now().Add(wsc.readWait)); err != nil {
|
||||
wsc.Logger.Error("failed to set read deadline", "err", err)
|
||||
}
|
||||
var in []byte
|
||||
_, in, err := wsc.baseConn.ReadMessage()
|
||||
|
||||
_, r, err := wsc.baseConn.NextReader()
|
||||
if err != nil {
|
||||
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
|
||||
wsc.Logger.Info("Client closed the connection")
|
||||
@@ -322,10 +335,11 @@ func (wsc *wsConnection) readRoutine() {
|
||||
return
|
||||
}
|
||||
|
||||
dec := json.NewDecoder(r)
|
||||
var request types.RPCRequest
|
||||
err = json.Unmarshal(in, &request)
|
||||
err = dec.Decode(&request)
|
||||
if err != nil {
|
||||
wsc.WriteRPCResponse(types.RPCParseError(fmt.Errorf("error unmarshaling request: %w", err)))
|
||||
wsc.WriteRPCResponse(writeCtx, types.RPCParseError(fmt.Errorf("error unmarshaling request: %w", err)))
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -342,7 +356,7 @@ func (wsc *wsConnection) readRoutine() {
|
||||
// Now, fetch the RPCFunc and execute it.
|
||||
rpcFunc := wsc.funcMap[request.Method]
|
||||
if rpcFunc == nil {
|
||||
wsc.WriteRPCResponse(types.RPCMethodNotFoundError(request.ID))
|
||||
wsc.WriteRPCResponse(writeCtx, types.RPCMethodNotFoundError(request.ID))
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -351,7 +365,7 @@ func (wsc *wsConnection) readRoutine() {
|
||||
if len(request.Params) > 0 {
|
||||
fnArgs, err := jsonParamsToArgs(rpcFunc, request.Params)
|
||||
if err != nil {
|
||||
wsc.WriteRPCResponse(
|
||||
wsc.WriteRPCResponse(writeCtx,
|
||||
types.RPCInternalError(request.ID, fmt.Errorf("error converting json params to arguments: %w", err)),
|
||||
)
|
||||
continue
|
||||
@@ -366,11 +380,11 @@ func (wsc *wsConnection) readRoutine() {
|
||||
|
||||
result, err := unreflectResult(returns)
|
||||
if err != nil {
|
||||
wsc.WriteRPCResponse(types.RPCInternalError(request.ID, err))
|
||||
wsc.WriteRPCResponse(writeCtx, types.RPCInternalError(request.ID, err))
|
||||
continue
|
||||
}
|
||||
|
||||
wsc.WriteRPCResponse(types.NewRPCSuccessResponse(request.ID, result))
|
||||
wsc.WriteRPCResponse(writeCtx, types.NewRPCSuccessResponse(request.ID, result))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -378,9 +392,7 @@ func (wsc *wsConnection) readRoutine() {
|
||||
// receives on a write channel and writes out on the socket
|
||||
func (wsc *wsConnection) writeRoutine() {
|
||||
pingTicker := time.NewTicker(wsc.pingPeriod)
|
||||
defer func() {
|
||||
pingTicker.Stop()
|
||||
}()
|
||||
defer pingTicker.Stop()
|
||||
|
||||
// https://github.com/gorilla/websocket/issues/97
|
||||
pongs := make(chan string, 1)
|
||||
@@ -410,11 +422,40 @@ func (wsc *wsConnection) writeRoutine() {
|
||||
return
|
||||
}
|
||||
case msg := <-wsc.writeChan:
|
||||
if err := wsc.baseConn.SetWriteDeadline(time.Now().Add(wsc.writeWait)); err != nil {
|
||||
wsc.Logger.Error("Failed to set write deadline", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
jsonBytes, err := json.MarshalIndent(msg, "", " ")
|
||||
if err != nil {
|
||||
wsc.Logger.Error("Failed to marshal RPCResponse to JSON", "err", err)
|
||||
} else if err = wsc.writeMessageWithDeadline(websocket.TextMessage, jsonBytes); err != nil {
|
||||
wsc.Logger.Error("Failed to write response", "msg", msg, "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
w, err := wsc.baseConn.NextWriter(websocket.TextMessage)
|
||||
if err != nil {
|
||||
wsc.Logger.Error("Can't get NextWriter", "err", err)
|
||||
return
|
||||
}
|
||||
w.Write(jsonBytes)
|
||||
|
||||
// Add queued messages to the current websocket message.
|
||||
n := len(wsc.writeChan)
|
||||
for i := 0; i < n; i++ {
|
||||
w.Write(newline)
|
||||
|
||||
msg = <-wsc.writeChan
|
||||
jsonBytes, err = json.MarshalIndent(msg, "", " ")
|
||||
if err != nil {
|
||||
wsc.Logger.Error("Failed to marshal RPCResponse to JSON", "err", err)
|
||||
continue
|
||||
}
|
||||
w.Write(jsonBytes)
|
||||
}
|
||||
|
||||
if err := w.Close(); err != nil {
|
||||
wsc.Logger.Error("Can't close NextWriter", "err", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -205,7 +205,7 @@ func NewRPCErrorResponse(id jsonrpcid, code int, msg string, data string) RPCRes
|
||||
|
||||
func (resp RPCResponse) String() string {
|
||||
if resp.Error == nil {
|
||||
return fmt.Sprintf("RPCResponse{%s %v}", resp.ID, resp.Result)
|
||||
return fmt.Sprintf("RPCResponse{%s %X}", resp.ID, resp.Result)
|
||||
}
|
||||
return fmt.Sprintf("RPCResponse{%s %v}", resp.ID, resp.Error)
|
||||
}
|
||||
@@ -246,10 +246,10 @@ func RPCServerError(id jsonrpcid, err error) RPCResponse {
|
||||
type WSRPCConnection interface {
|
||||
// GetRemoteAddr returns a remote address of the connection.
|
||||
GetRemoteAddr() string
|
||||
// WriteRPCResponse writes the resp onto connection (BLOCKING).
|
||||
WriteRPCResponse(resp RPCResponse)
|
||||
// TryWriteRPCResponse tries to write the resp onto connection (NON-BLOCKING).
|
||||
TryWriteRPCResponse(resp RPCResponse) bool
|
||||
// WriteRPCResponse writes the response onto connection (BLOCKING).
|
||||
WriteRPCResponse(context.Context, RPCResponse) error
|
||||
// TryWriteRPCResponse tries to write the response onto connection (NON-BLOCKING).
|
||||
TryWriteRPCResponse(RPCResponse) bool
|
||||
// Context returns the connection's context.
|
||||
Context() context.Context
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user