From 59ec3d91e4f60e5f540feab6bbd0cd107abf8eca Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Fri, 4 Sep 2020 10:58:47 +0400 Subject: [PATCH] rpc/jsonrpc/server: ws server optimizations (#5312) * docs: goleveldb is much more stable now Refs https://github.com/syndtr/goleveldb/issues/226#issuecomment-682495490 * rpc/core/events: make sure WS client receives every event previously, if the write buffer was full, the response would've been lost without any trace (log msg, etc.) * rpc/jsonrpc/server: set defaultWSWriteChanCapacity to 1 Refs #3905 Closes #3829 setting write buffer capacity to 1 makes transactions count per block more stable and also reduces the pauses length by 20s. before: https://github.com/tendermint/tendermint/issues/3905#issuecomment-681854328 net.Read - 20s after: net.Read - 0.66s * rpc/jsonrpc/server: buffer writes and avoid io.ReadAll during read --- docs/tendermint-core/running-in-production.md | 5 +- rpc/client/event_test.go | 82 ++++++++++--------- rpc/client/main_test.go | 2 + rpc/core/events.go | 30 ++++--- rpc/jsonrpc/server/ws_handler.go | 79 +++++++++++++----- rpc/jsonrpc/types/types.go | 10 +-- 6 files changed, 133 insertions(+), 75 deletions(-) diff --git a/docs/tendermint-core/running-in-production.md b/docs/tendermint-core/running-in-production.md index 1170fe257..41f40641e 100644 --- a/docs/tendermint-core/running-in-production.md +++ b/docs/tendermint-core/running-in-production.md @@ -7,7 +7,10 @@ order: 4 ## Database By default, Tendermint uses the `syndtr/goleveldb` package for its in-process -key-value database. +key-value database. If you want maximal performance, it may be best to install +the real C-implementation of LevelDB and compile Tendermint to use that using +`make build TENDERMINT_BUILD_OPTIONS=cleveldb`. See the [install +instructions](../introduction/install.md) for details. Tendermint keeps multiple distinct databases in the `$TMROOT/data`: diff --git a/rpc/client/event_test.go b/rpc/client/event_test.go index ae04e34c2..2d668c409 100644 --- a/rpc/client/event_test.go +++ b/rpc/client/event_test.go @@ -28,7 +28,7 @@ func MakeTxKV() ([]byte, []byte, []byte) { func TestHeaderEvents(t *testing.T) { for i, c := range GetClients() { - i, c := i, c // capture params + i, c := i, c t.Run(reflect.TypeOf(c).String(), func(t *testing.T) { // start for this test it if it wasn't already running if !c.IsRunning() { @@ -48,35 +48,39 @@ func TestHeaderEvents(t *testing.T) { } } +// subscribe to new blocks and make sure height increments by 1 func TestBlockEvents(t *testing.T) { - for i, c := range GetClients() { - i, c := i, c // capture params + for _, c := range GetClients() { + c := c t.Run(reflect.TypeOf(c).String(), func(t *testing.T) { // start for this test it if it wasn't already running if !c.IsRunning() { // if so, then we start it, listen, and stop it. err := c.Start() - require.Nil(t, err, "%d: %+v", i, err) + require.Nil(t, err) defer c.Stop() } - // listen for a new block; ensure height increases by 1 + const subscriber = "TestBlockEvents" + + eventCh, err := c.Subscribe(context.Background(), subscriber, types.QueryForEvent(types.EventNewBlock).String()) + require.NoError(t, err) + defer c.UnsubscribeAll(context.Background(), subscriber) + var firstBlockHeight int64 - for j := 0; j < 3; j++ { - evtTyp := types.EventNewBlock - evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout) - require.Nil(t, err, "%d: %+v", j, err) - blockEvent, ok := evt.(types.EventDataNewBlock) - require.True(t, ok, "%d: %#v", j, evt) + for i := int64(0); i < 3; i++ { + event := <-eventCh + blockEvent, ok := event.Data.(types.EventDataNewBlock) + require.True(t, ok) block := blockEvent.Block - if j == 0 { + + if firstBlockHeight == 0 { firstBlockHeight = block.Header.Height - continue } - require.Equal(t, block.Header.Height, firstBlockHeight+int64(j)) + require.Equal(t, firstBlockHeight+i, block.Header.Height) } }) } @@ -86,48 +90,48 @@ func TestTxEventsSentWithBroadcastTxAsync(t *testing.T) { testTxEventsSent(t, "a func TestTxEventsSentWithBroadcastTxSync(t *testing.T) { testTxEventsSent(t, "sync") } func testTxEventsSent(t *testing.T, broadcastMethod string) { - for i, c := range GetClients() { - i, c := i, c // capture params + for _, c := range GetClients() { + c := c t.Run(reflect.TypeOf(c).String(), func(t *testing.T) { // start for this test it if it wasn't already running if !c.IsRunning() { // if so, then we start it, listen, and stop it. err := c.Start() - require.Nil(t, err, "%d: %+v", i, err) + require.Nil(t, err) defer c.Stop() } - // wait for the client subscription to get set up - time.Sleep(100 * time.Millisecond) - // make the tx _, _, tx := MakeTxKV() - evtTyp := types.EventTx // send - var ( - txres *ctypes.ResultBroadcastTx - err error - ) - switch broadcastMethod { - case "async": - txres, err = c.BroadcastTxAsync(tx) - case "sync": - txres, err = c.BroadcastTxSync(tx) - default: - panic(fmt.Sprintf("Unknown broadcastMethod %s", broadcastMethod)) - } - - require.NoError(t, err) - require.Equal(t, txres.Code, abci.CodeTypeOK) + go func() { + var ( + txres *ctypes.ResultBroadcastTx + err error + ) + switch broadcastMethod { + case "async": + txres, err = c.BroadcastTxAsync(tx) + case "sync": + txres, err = c.BroadcastTxSync(tx) + default: + panic(fmt.Sprintf("Unknown broadcastMethod %s", broadcastMethod)) + } + if assert.NoError(t, err) { + assert.Equal(t, txres.Code, abci.CodeTypeOK) + } + }() // and wait for confirmation - evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout) - require.Nil(t, err, "%d: %+v", i, err) + evt, err := client.WaitForOneEvent(c, types.EventTx, waitForEventTimeout) + require.Nil(t, err) + // and make sure it has the proper info txe, ok := evt.(types.EventDataTx) - require.True(t, ok, "%d: %#v", i, evt) + require.True(t, ok) + // make sure this is the proper tx require.EqualValues(t, tx, txe.Tx) require.True(t, txe.Result.IsOK()) diff --git a/rpc/client/main_test.go b/rpc/client/main_test.go index d600b32f8..c97311c81 100644 --- a/rpc/client/main_test.go +++ b/rpc/client/main_test.go @@ -18,6 +18,7 @@ func TestMain(m *testing.M) { if err != nil { panic(err) } + app := kvstore.NewPersistentKVStoreApplication(dir) node = rpctest.StartTendermint(app) @@ -25,5 +26,6 @@ func TestMain(m *testing.M) { // and shut down proper at the end rpctest.StopTendermint(node) + _ = os.RemoveAll(dir) os.Exit(code) } diff --git a/rpc/core/events.go b/rpc/core/events.go index 33a53e42d..5e6b3db57 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -3,6 +3,7 @@ package core import ( "context" "fmt" + "time" tmpubsub "github.com/tendermint/tendermint/libs/pubsub" tmquery "github.com/tendermint/tendermint/libs/pubsub/query" @@ -47,12 +48,16 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er for { select { case msg := <-sub.Out(): - resultEvent := &ctypes.ResultEvent{Query: query, Data: msg.Data(), Events: msg.Events()} - ctx.WSConn.TryWriteRPCResponse( - rpctypes.NewRPCSuccessResponse( - subscriptionID, - resultEvent, - )) + var ( + resultEvent = &ctypes.ResultEvent{Query: query, Data: msg.Data(), Events: msg.Events()} + resp = rpctypes.NewRPCSuccessResponse(subscriptionID, resultEvent) + ) + writeCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := ctx.WSConn.WriteRPCResponse(writeCtx, resp); err != nil { + env.Logger.Info("Can't write response (slow client)", + "to", addr, "subscriptionID", subscriptionID, "err", err) + } case <-sub.Cancelled(): if sub.Err() != tmpubsub.ErrUnsubscribed { var reason string @@ -61,11 +66,14 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er } else { reason = sub.Err().Error() } - ctx.WSConn.TryWriteRPCResponse( - rpctypes.RPCServerError( - subscriptionID, - fmt.Errorf("subscription was cancelled (reason: %s)", reason), - )) + var ( + err = fmt.Errorf("subscription was cancelled (reason: %s)", reason) + resp = rpctypes.RPCServerError(subscriptionID, err) + ) + if ok := ctx.WSConn.TryWriteRPCResponse(resp); !ok { + env.Logger.Info("Can't write response (slow client)", + "to", addr, "subscriptionID", subscriptionID, "err", err) + } } return } diff --git a/rpc/jsonrpc/server/ws_handler.go b/rpc/jsonrpc/server/ws_handler.go index 19b4c412d..84ca16720 100644 --- a/rpc/jsonrpc/server/ws_handler.go +++ b/rpc/jsonrpc/server/ws_handler.go @@ -3,6 +3,7 @@ package server import ( "context" "encoding/json" + "errors" "fmt" "net/http" "reflect" @@ -21,12 +22,16 @@ import ( /////////////////////////////////////////////////////////////////////////////// const ( - defaultWSWriteChanCapacity = 1000 + defaultWSWriteChanCapacity = 100 defaultWSWriteWait = 10 * time.Second defaultWSReadWait = 30 * time.Second defaultWSPingPeriod = (defaultWSReadWait * 9) / 10 ) +var ( + newline = []byte{'\n'} +) + // WebsocketManager provides a WS handler for incoming connections and passes a // map of functions along with any additional params to new connections. // NOTE: The websocket path is defined externally, e.g. in node/node.go @@ -249,17 +254,22 @@ func (wsc *wsConnection) GetRemoteAddr() string { return wsc.remoteAddr } -// WriteRPCResponse pushes a response to the writeChan, and blocks until it is accepted. +// WriteRPCResponse pushes a response to the writeChan, and blocks until it is +// accepted. // It implements WSRPCConnection. It is Goroutine-safe. -func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) { +func (wsc *wsConnection) WriteRPCResponse(ctx context.Context, resp types.RPCResponse) error { select { case <-wsc.Quit(): - return + return errors.New("connection was stopped") + case <-ctx.Done(): + return ctx.Err() case wsc.writeChan <- resp: + return nil } } -// TryWriteRPCResponse attempts to push a response to the writeChan, but does not block. +// TryWriteRPCResponse attempts to push a response to the writeChan, but does +// not block. // It implements WSRPCConnection. It is Goroutine-safe func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool { select { @@ -284,6 +294,9 @@ func (wsc *wsConnection) Context() context.Context { // Read from the socket and subscribe to or unsubscribe from events func (wsc *wsConnection) readRoutine() { + // readRoutine will block until response is written or WS connection is closed + writeCtx := context.Background() + defer func() { if r := recover(); r != nil { err, ok := r.(error) @@ -291,7 +304,7 @@ func (wsc *wsConnection) readRoutine() { err = fmt.Errorf("WSJSONRPC: %v", r) } wsc.Logger.Error("Panic in WSJSONRPC handler", "err", err, "stack", string(debug.Stack())) - wsc.WriteRPCResponse(types.RPCInternalError(types.JSONRPCIntID(-1), err)) + wsc.WriteRPCResponse(writeCtx, types.RPCInternalError(types.JSONRPCIntID(-1), err)) go wsc.readRoutine() } }() @@ -309,8 +322,8 @@ func (wsc *wsConnection) readRoutine() { if err := wsc.baseConn.SetReadDeadline(time.Now().Add(wsc.readWait)); err != nil { wsc.Logger.Error("failed to set read deadline", "err", err) } - var in []byte - _, in, err := wsc.baseConn.ReadMessage() + + _, r, err := wsc.baseConn.NextReader() if err != nil { if websocket.IsCloseError(err, websocket.CloseNormalClosure) { wsc.Logger.Info("Client closed the connection") @@ -322,10 +335,11 @@ func (wsc *wsConnection) readRoutine() { return } + dec := json.NewDecoder(r) var request types.RPCRequest - err = json.Unmarshal(in, &request) + err = dec.Decode(&request) if err != nil { - wsc.WriteRPCResponse(types.RPCParseError(fmt.Errorf("error unmarshaling request: %w", err))) + wsc.WriteRPCResponse(writeCtx, types.RPCParseError(fmt.Errorf("error unmarshaling request: %w", err))) continue } @@ -342,7 +356,7 @@ func (wsc *wsConnection) readRoutine() { // Now, fetch the RPCFunc and execute it. rpcFunc := wsc.funcMap[request.Method] if rpcFunc == nil { - wsc.WriteRPCResponse(types.RPCMethodNotFoundError(request.ID)) + wsc.WriteRPCResponse(writeCtx, types.RPCMethodNotFoundError(request.ID)) continue } @@ -351,7 +365,7 @@ func (wsc *wsConnection) readRoutine() { if len(request.Params) > 0 { fnArgs, err := jsonParamsToArgs(rpcFunc, request.Params) if err != nil { - wsc.WriteRPCResponse( + wsc.WriteRPCResponse(writeCtx, types.RPCInternalError(request.ID, fmt.Errorf("error converting json params to arguments: %w", err)), ) continue @@ -366,11 +380,11 @@ func (wsc *wsConnection) readRoutine() { result, err := unreflectResult(returns) if err != nil { - wsc.WriteRPCResponse(types.RPCInternalError(request.ID, err)) + wsc.WriteRPCResponse(writeCtx, types.RPCInternalError(request.ID, err)) continue } - wsc.WriteRPCResponse(types.NewRPCSuccessResponse(request.ID, result)) + wsc.WriteRPCResponse(writeCtx, types.NewRPCSuccessResponse(request.ID, result)) } } } @@ -378,9 +392,7 @@ func (wsc *wsConnection) readRoutine() { // receives on a write channel and writes out on the socket func (wsc *wsConnection) writeRoutine() { pingTicker := time.NewTicker(wsc.pingPeriod) - defer func() { - pingTicker.Stop() - }() + defer pingTicker.Stop() // https://github.com/gorilla/websocket/issues/97 pongs := make(chan string, 1) @@ -410,11 +422,40 @@ func (wsc *wsConnection) writeRoutine() { return } case msg := <-wsc.writeChan: + if err := wsc.baseConn.SetWriteDeadline(time.Now().Add(wsc.writeWait)); err != nil { + wsc.Logger.Error("Failed to set write deadline", "err", err) + return + } + jsonBytes, err := json.MarshalIndent(msg, "", " ") if err != nil { wsc.Logger.Error("Failed to marshal RPCResponse to JSON", "err", err) - } else if err = wsc.writeMessageWithDeadline(websocket.TextMessage, jsonBytes); err != nil { - wsc.Logger.Error("Failed to write response", "msg", msg, "err", err) + continue + } + + w, err := wsc.baseConn.NextWriter(websocket.TextMessage) + if err != nil { + wsc.Logger.Error("Can't get NextWriter", "err", err) + return + } + w.Write(jsonBytes) + + // Add queued messages to the current websocket message. + n := len(wsc.writeChan) + for i := 0; i < n; i++ { + w.Write(newline) + + msg = <-wsc.writeChan + jsonBytes, err = json.MarshalIndent(msg, "", " ") + if err != nil { + wsc.Logger.Error("Failed to marshal RPCResponse to JSON", "err", err) + continue + } + w.Write(jsonBytes) + } + + if err := w.Close(); err != nil { + wsc.Logger.Error("Can't close NextWriter", "err", err) return } } diff --git a/rpc/jsonrpc/types/types.go b/rpc/jsonrpc/types/types.go index 1dae8fbdb..54d17155c 100644 --- a/rpc/jsonrpc/types/types.go +++ b/rpc/jsonrpc/types/types.go @@ -205,7 +205,7 @@ func NewRPCErrorResponse(id jsonrpcid, code int, msg string, data string) RPCRes func (resp RPCResponse) String() string { if resp.Error == nil { - return fmt.Sprintf("RPCResponse{%s %v}", resp.ID, resp.Result) + return fmt.Sprintf("RPCResponse{%s %X}", resp.ID, resp.Result) } return fmt.Sprintf("RPCResponse{%s %v}", resp.ID, resp.Error) } @@ -246,10 +246,10 @@ func RPCServerError(id jsonrpcid, err error) RPCResponse { type WSRPCConnection interface { // GetRemoteAddr returns a remote address of the connection. GetRemoteAddr() string - // WriteRPCResponse writes the resp onto connection (BLOCKING). - WriteRPCResponse(resp RPCResponse) - // TryWriteRPCResponse tries to write the resp onto connection (NON-BLOCKING). - TryWriteRPCResponse(resp RPCResponse) bool + // WriteRPCResponse writes the response onto connection (BLOCKING). + WriteRPCResponse(context.Context, RPCResponse) error + // TryWriteRPCResponse tries to write the response onto connection (NON-BLOCKING). + TryWriteRPCResponse(RPCResponse) bool // Context returns the connection's context. Context() context.Context }