mirror of
https://github.com/tendermint/tendermint.git
synced 2026-02-05 19:40:44 +00:00
Add writeRPCResponse and writeHTTPResponse helpers, that handle the way RPC responses are written to HTTP replies. These replace the exported helpers. Visible effects: - JSON results are now marshaled without indentation. - HTTP status codes are now normalized. - Cache control headers are no longer set. Details: - When writing a response to a URL (GET) request, do not marshal the whole JSON-RPC object into the body, only encode the result or the error object. This is a user-visible change. - Do not change the HTTP status code for RPC errors. The RPC error already reports what went wrong, the HTTP status should only report problems with the HTTP transaction itself. This is a user-visible change. - Encode JSON without indentation in POST response bodies. This is mainly cosmetic but saves quite a bit of response data. Indent is still applied to GET responses to make life easier for code examples. - Remove an obsolete TODO about reporting an HTTP error on websocket upgrade. Nothing needed to change; the upgrader already reports an error. - Report an HTTP error when starting the server loop fails. - Improve logging for encoding errors. - Log less aggressively.
480 lines
14 KiB
Go
480 lines
14 KiB
Go
package server
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"net/http"
|
|
"reflect"
|
|
"runtime/debug"
|
|
"time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
|
|
"github.com/tendermint/tendermint/libs/log"
|
|
"github.com/tendermint/tendermint/rpc/client"
|
|
"github.com/tendermint/tendermint/rpc/coretypes"
|
|
rpctypes "github.com/tendermint/tendermint/rpc/jsonrpc/types"
|
|
)
|
|
|
|
// WebSocket handler
|
|
|
|
const (
|
|
defaultWSWriteChanCapacity = 100
|
|
defaultWSWriteWait = 10 * time.Second
|
|
defaultWSReadWait = 30 * time.Second
|
|
defaultWSPingPeriod = (defaultWSReadWait * 9) / 10
|
|
)
|
|
|
|
// WebsocketManager provides a WS handler for incoming connections and passes a
|
|
// map of functions along with any additional params to new connections.
|
|
// NOTE: The websocket path is defined externally, e.g. in node/node.go
|
|
type WebsocketManager struct {
|
|
websocket.Upgrader
|
|
|
|
funcMap map[string]*RPCFunc
|
|
logger log.Logger
|
|
wsConnOptions []func(*wsConnection)
|
|
}
|
|
|
|
// NewWebsocketManager returns a new WebsocketManager that passes a map of
|
|
// functions, connection options and logger to new WS connections.
|
|
func NewWebsocketManager(
|
|
logger log.Logger,
|
|
funcMap map[string]*RPCFunc,
|
|
wsConnOptions ...func(*wsConnection),
|
|
) *WebsocketManager {
|
|
return &WebsocketManager{
|
|
funcMap: funcMap,
|
|
Upgrader: websocket.Upgrader{
|
|
CheckOrigin: func(r *http.Request) bool {
|
|
// TODO ???
|
|
//
|
|
// The default behavior would be relevant to browser-based clients,
|
|
// afaik. I suppose having a pass-through is a workaround for allowing
|
|
// for more complex security schemes, shifting the burden of
|
|
// AuthN/AuthZ outside the Tendermint RPC.
|
|
// I can't think of other uses right now that would warrant a TODO
|
|
// though. The real backstory of this TODO shall remain shrouded in
|
|
// mystery
|
|
return true
|
|
},
|
|
},
|
|
logger: logger,
|
|
wsConnOptions: wsConnOptions,
|
|
}
|
|
}
|
|
|
|
// WebsocketHandler upgrades the request/response (via http.Hijack) and starts
|
|
// the wsConnection.
|
|
func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) {
|
|
wsConn, err := wm.Upgrade(w, r, nil)
|
|
if err != nil {
|
|
// The upgrader has already reported an HTTP error to the client, so we
|
|
// need only log it.
|
|
wm.logger.Error("Failed to upgrade connection", "err", err)
|
|
return
|
|
}
|
|
defer func() {
|
|
if err := wsConn.Close(); err != nil {
|
|
wm.logger.Error("Failed to close connection", "err", err)
|
|
}
|
|
}()
|
|
|
|
// register connection
|
|
logger := wm.logger.With("remote", wsConn.RemoteAddr())
|
|
conn := newWSConnection(wsConn, wm.funcMap, logger, wm.wsConnOptions...)
|
|
wm.logger.Info("New websocket connection", "remote", conn.remoteAddr)
|
|
|
|
// starting the conn is blocking
|
|
if err = conn.Start(r.Context()); err != nil {
|
|
wm.logger.Error("Failed to start connection", "err", err)
|
|
writeInternalError(w, err)
|
|
return
|
|
}
|
|
|
|
if err := conn.Stop(); err != nil {
|
|
wm.logger.Error("error while stopping connection", "error", err)
|
|
}
|
|
}
|
|
|
|
// WebSocket connection
|
|
|
|
// A single websocket connection contains listener id, underlying ws
|
|
// connection, and the event switch for subscribing to events.
|
|
//
|
|
// In case of an error, the connection is stopped.
|
|
type wsConnection struct {
|
|
*client.RunState
|
|
|
|
remoteAddr string
|
|
baseConn *websocket.Conn
|
|
// writeChan is never closed, to allow WriteRPCResponse() to fail.
|
|
writeChan chan rpctypes.RPCResponse
|
|
|
|
// chan, which is closed when/if readRoutine errors
|
|
// used to abort writeRoutine
|
|
readRoutineQuit chan struct{}
|
|
|
|
funcMap map[string]*RPCFunc
|
|
|
|
// write channel capacity
|
|
writeChanCapacity int
|
|
|
|
// each write times out after this.
|
|
writeWait time.Duration
|
|
|
|
// Connection times out if we haven't received *anything* in this long, not even pings.
|
|
readWait time.Duration
|
|
|
|
// Send pings to server with this period. Must be less than readWait, but greater than zero.
|
|
pingPeriod time.Duration
|
|
|
|
// Maximum message size.
|
|
readLimit int64
|
|
|
|
// callback which is called upon disconnect
|
|
onDisconnect func(remoteAddr string)
|
|
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
}
|
|
|
|
// NewWSConnection wraps websocket.Conn.
|
|
//
|
|
// See the commentary on the func(*wsConnection) functions for a detailed
|
|
// description of how to configure ping period and pong wait time. NOTE: if the
|
|
// write buffer is full, pongs may be dropped, which may cause clients to
|
|
// disconnect. see https://github.com/gorilla/websocket/issues/97
|
|
func newWSConnection(
|
|
baseConn *websocket.Conn,
|
|
funcMap map[string]*RPCFunc,
|
|
logger log.Logger,
|
|
options ...func(*wsConnection),
|
|
) *wsConnection {
|
|
wsc := &wsConnection{
|
|
RunState: client.NewRunState("wsConnection", logger),
|
|
remoteAddr: baseConn.RemoteAddr().String(),
|
|
baseConn: baseConn,
|
|
funcMap: funcMap,
|
|
writeWait: defaultWSWriteWait,
|
|
writeChanCapacity: defaultWSWriteChanCapacity,
|
|
readWait: defaultWSReadWait,
|
|
pingPeriod: defaultWSPingPeriod,
|
|
readRoutineQuit: make(chan struct{}),
|
|
}
|
|
for _, option := range options {
|
|
option(wsc)
|
|
}
|
|
wsc.baseConn.SetReadLimit(wsc.readLimit)
|
|
return wsc
|
|
}
|
|
|
|
// OnDisconnect sets a callback which is used upon disconnect - not
|
|
// Goroutine-safe. Nop by default.
|
|
func OnDisconnect(onDisconnect func(remoteAddr string)) func(*wsConnection) {
|
|
return func(wsc *wsConnection) {
|
|
wsc.onDisconnect = onDisconnect
|
|
}
|
|
}
|
|
|
|
// WriteWait sets the amount of time to wait before a websocket write times out.
|
|
// It should only be used in the constructor - not Goroutine-safe.
|
|
func WriteWait(writeWait time.Duration) func(*wsConnection) {
|
|
return func(wsc *wsConnection) {
|
|
wsc.writeWait = writeWait
|
|
}
|
|
}
|
|
|
|
// WriteChanCapacity sets the capacity of the websocket write channel.
|
|
// It should only be used in the constructor - not Goroutine-safe.
|
|
func WriteChanCapacity(cap int) func(*wsConnection) {
|
|
return func(wsc *wsConnection) {
|
|
wsc.writeChanCapacity = cap
|
|
}
|
|
}
|
|
|
|
// ReadWait sets the amount of time to wait before a websocket read times out.
|
|
// It should only be used in the constructor - not Goroutine-safe.
|
|
func ReadWait(readWait time.Duration) func(*wsConnection) {
|
|
return func(wsc *wsConnection) {
|
|
wsc.readWait = readWait
|
|
}
|
|
}
|
|
|
|
// PingPeriod sets the duration for sending websocket pings.
|
|
// It should only be used in the constructor - not Goroutine-safe.
|
|
func PingPeriod(pingPeriod time.Duration) func(*wsConnection) {
|
|
return func(wsc *wsConnection) {
|
|
wsc.pingPeriod = pingPeriod
|
|
}
|
|
}
|
|
|
|
// ReadLimit sets the maximum size for reading message.
|
|
// It should only be used in the constructor - not Goroutine-safe.
|
|
func ReadLimit(readLimit int64) func(*wsConnection) {
|
|
return func(wsc *wsConnection) {
|
|
wsc.readLimit = readLimit
|
|
}
|
|
}
|
|
|
|
// Start starts the client service routines and blocks until there is an error.
|
|
func (wsc *wsConnection) Start(ctx context.Context) error {
|
|
if err := wsc.RunState.Start(ctx); err != nil {
|
|
return err
|
|
}
|
|
wsc.writeChan = make(chan rpctypes.RPCResponse, wsc.writeChanCapacity)
|
|
|
|
// Read subscriptions/unsubscriptions to events
|
|
go wsc.readRoutine(ctx)
|
|
// Write responses, BLOCKING.
|
|
wsc.writeRoutine(ctx)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Stop unsubscribes the remote from all subscriptions.
|
|
func (wsc *wsConnection) Stop() error {
|
|
if err := wsc.RunState.Stop(); err != nil {
|
|
return err
|
|
}
|
|
if wsc.onDisconnect != nil {
|
|
wsc.onDisconnect(wsc.remoteAddr)
|
|
}
|
|
if wsc.ctx != nil {
|
|
wsc.cancel()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetRemoteAddr returns the remote address of the underlying connection.
|
|
// It implements WSRPCConnection
|
|
func (wsc *wsConnection) GetRemoteAddr() string {
|
|
return wsc.remoteAddr
|
|
}
|
|
|
|
// WriteRPCResponse pushes a response to the writeChan, and blocks until it is
|
|
// accepted.
|
|
// It implements WSRPCConnection. It is Goroutine-safe.
|
|
func (wsc *wsConnection) WriteRPCResponse(ctx context.Context, resp rpctypes.RPCResponse) error {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case wsc.writeChan <- resp:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// TryWriteRPCResponse attempts to push a response to the writeChan, but does
|
|
// not block.
|
|
// It implements WSRPCConnection. It is Goroutine-safe
|
|
func (wsc *wsConnection) TryWriteRPCResponse(ctx context.Context, resp rpctypes.RPCResponse) bool {
|
|
select {
|
|
case <-ctx.Done():
|
|
return false
|
|
case wsc.writeChan <- resp:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
// Context returns the connection's context.
|
|
// The context is canceled when the client's connection closes.
|
|
func (wsc *wsConnection) Context() context.Context {
|
|
if wsc.ctx != nil {
|
|
return wsc.ctx
|
|
}
|
|
wsc.ctx, wsc.cancel = context.WithCancel(context.Background())
|
|
return wsc.ctx
|
|
}
|
|
|
|
// Read from the socket and subscribe to or unsubscribe from events
|
|
func (wsc *wsConnection) readRoutine(ctx context.Context) {
|
|
// readRoutine will block until response is written or WS connection is closed
|
|
writeCtx := context.Background()
|
|
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
err, ok := r.(error)
|
|
if !ok {
|
|
err = fmt.Errorf("WSJSONRPC: %v", r)
|
|
}
|
|
wsc.Logger.Error("Panic in WSJSONRPC handler", "err", err, "stack", string(debug.Stack()))
|
|
if err := wsc.WriteRPCResponse(writeCtx, rpctypes.RPCInternalError(rpctypes.JSONRPCIntID(-1), err)); err != nil {
|
|
wsc.Logger.Error("error writing RPC response", "err", err)
|
|
}
|
|
go wsc.readRoutine(ctx)
|
|
}
|
|
}()
|
|
|
|
wsc.baseConn.SetPongHandler(func(m string) error {
|
|
return wsc.baseConn.SetReadDeadline(time.Now().Add(wsc.readWait))
|
|
})
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
// reset deadline for every type of message (control or data)
|
|
if err := wsc.baseConn.SetReadDeadline(time.Now().Add(wsc.readWait)); err != nil {
|
|
wsc.Logger.Error("failed to set read deadline", "err", err)
|
|
}
|
|
|
|
_, r, err := wsc.baseConn.NextReader()
|
|
if err != nil {
|
|
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
|
|
wsc.Logger.Info("Client closed the connection")
|
|
} else {
|
|
wsc.Logger.Error("Failed to read request", "err", err)
|
|
}
|
|
if err := wsc.Stop(); err != nil {
|
|
wsc.Logger.Error("error closing websocket connection", "err", err)
|
|
}
|
|
close(wsc.readRoutineQuit)
|
|
return
|
|
}
|
|
|
|
dec := json.NewDecoder(r)
|
|
var request rpctypes.RPCRequest
|
|
err = dec.Decode(&request)
|
|
if err != nil {
|
|
if err := wsc.WriteRPCResponse(writeCtx,
|
|
rpctypes.RPCParseError(fmt.Errorf("error unmarshaling request: %w", err))); err != nil {
|
|
wsc.Logger.Error("error writing RPC response", "err", err)
|
|
}
|
|
continue
|
|
}
|
|
|
|
// A Notification is a Request object without an "id" member.
|
|
// The Server MUST NOT reply to a Notification, including those that are within a batch request.
|
|
if request.ID == nil {
|
|
wsc.Logger.Debug(
|
|
"WSJSONRPC received a notification, skipping... (please send a non-empty ID if you want to call a method)",
|
|
"req", request,
|
|
)
|
|
continue
|
|
}
|
|
|
|
// Now, fetch the RPCFunc and execute it.
|
|
rpcFunc := wsc.funcMap[request.Method]
|
|
if rpcFunc == nil {
|
|
if err := wsc.WriteRPCResponse(writeCtx, rpctypes.RPCMethodNotFoundError(request.ID)); err != nil {
|
|
wsc.Logger.Error("error writing RPC response", "err", err)
|
|
}
|
|
continue
|
|
}
|
|
|
|
fctx := rpctypes.WithCallInfo(wsc.Context(), &rpctypes.CallInfo{
|
|
RPCRequest: &request,
|
|
WSConn: wsc,
|
|
})
|
|
args := []reflect.Value{reflect.ValueOf(fctx)}
|
|
if len(request.Params) > 0 {
|
|
fnArgs, err := jsonParamsToArgs(rpcFunc, request.Params)
|
|
if err != nil {
|
|
if err := wsc.WriteRPCResponse(writeCtx,
|
|
rpctypes.RPCInvalidParamsError(request.ID, fmt.Errorf("error converting json params to arguments: %w", err)),
|
|
); err != nil {
|
|
wsc.Logger.Error("error writing RPC response", "err", err)
|
|
}
|
|
continue
|
|
}
|
|
args = append(args, fnArgs...)
|
|
}
|
|
|
|
returns := rpcFunc.f.Call(args)
|
|
|
|
// TODO: Need to encode args/returns to string if we want to log them
|
|
wsc.Logger.Info("WSJSONRPC", "method", request.Method)
|
|
|
|
var resp rpctypes.RPCResponse
|
|
result, err := unreflectResult(returns)
|
|
switch e := err.(type) {
|
|
// if no error then return a success response
|
|
case nil:
|
|
resp = rpctypes.NewRPCSuccessResponse(request.ID, result)
|
|
|
|
// if this already of type RPC error then forward that error
|
|
case *rpctypes.RPCError:
|
|
resp = rpctypes.NewRPCErrorResponse(request.ID, e.Code, e.Message, e.Data)
|
|
|
|
default: // we need to unwrap the error and parse it accordingly
|
|
switch errors.Unwrap(err) {
|
|
// check if the error was due to an invald request
|
|
case coretypes.ErrZeroOrNegativeHeight, coretypes.ErrZeroOrNegativePerPage,
|
|
coretypes.ErrPageOutOfRange, coretypes.ErrInvalidRequest:
|
|
resp = rpctypes.RPCInvalidRequestError(request.ID, err)
|
|
|
|
// lastly default all remaining errors as internal errors
|
|
default: // includes ctypes.ErrHeightNotAvailable and ctypes.ErrHeightExceedsChainHead
|
|
resp = rpctypes.RPCInternalError(request.ID, err)
|
|
}
|
|
}
|
|
|
|
if err := wsc.WriteRPCResponse(writeCtx, resp); err != nil {
|
|
wsc.Logger.Error("error writing RPC response", "err", err)
|
|
}
|
|
|
|
}
|
|
}
|
|
}
|
|
|
|
// receives on a write channel and writes out on the socket
|
|
func (wsc *wsConnection) writeRoutine(ctx context.Context) {
|
|
pingTicker := time.NewTicker(wsc.pingPeriod)
|
|
defer pingTicker.Stop()
|
|
|
|
// https://github.com/gorilla/websocket/issues/97
|
|
pongs := make(chan string, 1)
|
|
wsc.baseConn.SetPingHandler(func(m string) error {
|
|
select {
|
|
case pongs <- m:
|
|
default:
|
|
}
|
|
return nil
|
|
})
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-wsc.readRoutineQuit: // error in readRoutine
|
|
return
|
|
case m := <-pongs:
|
|
err := wsc.writeMessageWithDeadline(websocket.PongMessage, []byte(m))
|
|
if err != nil {
|
|
wsc.Logger.Info("Failed to write pong (client may disconnect)", "err", err)
|
|
}
|
|
case <-pingTicker.C:
|
|
err := wsc.writeMessageWithDeadline(websocket.PingMessage, []byte{})
|
|
if err != nil {
|
|
wsc.Logger.Error("Failed to write ping", "err", err)
|
|
return
|
|
}
|
|
case msg := <-wsc.writeChan:
|
|
data, err := json.Marshal(msg)
|
|
if err != nil {
|
|
wsc.Logger.Error("Failed to marshal RPCResponse to JSON", "msg", msg, "err", err)
|
|
continue
|
|
}
|
|
if err = wsc.writeMessageWithDeadline(websocket.TextMessage, data); err != nil {
|
|
wsc.Logger.Error("Failed to write response", "msg", msg, "err", err)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// All writes to the websocket must (re)set the write deadline.
|
|
// If some writes don't set it while others do, they may timeout incorrectly
|
|
// (https://github.com/tendermint/tendermint/issues/553)
|
|
func (wsc *wsConnection) writeMessageWithDeadline(msgType int, msg []byte) error {
|
|
if err := wsc.baseConn.SetWriteDeadline(time.Now().Add(wsc.writeWait)); err != nil {
|
|
return err
|
|
}
|
|
return wsc.baseConn.WriteMessage(msgType, msg)
|
|
}
|