mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-05 21:14:53 +00:00
rpc: reformat method signatures and use a context (#8377)
I was digging around over here, and thought it'd be good to cleanup/standardize the line formating on a few of these methods. Also found a few cases where we could use contexts better so did a little bit of cleanup there too!
This commit is contained in:
@@ -189,6 +189,5 @@ type MissedItemsError struct {
|
||||
|
||||
// Error satisfies the error interface.
|
||||
func (e *MissedItemsError) Error() string {
|
||||
return fmt.Sprintf("missed events matching %q between %q and %q",
|
||||
e.Query, e.NewestSeen, e.OldestPresent)
|
||||
return fmt.Sprintf("missed events matching %q between %q and %q", e.Query, e.NewestSeen, e.OldestPresent)
|
||||
}
|
||||
|
||||
@@ -207,19 +207,11 @@ func (c *baseRPCClient) ABCIInfo(ctx context.Context) (*coretypes.ResultABCIInfo
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (c *baseRPCClient) ABCIQuery(
|
||||
ctx context.Context,
|
||||
path string,
|
||||
data bytes.HexBytes,
|
||||
) (*coretypes.ResultABCIQuery, error) {
|
||||
func (c *baseRPCClient) ABCIQuery(ctx context.Context, path string, data bytes.HexBytes) (*coretypes.ResultABCIQuery, error) {
|
||||
return c.ABCIQueryWithOptions(ctx, path, data, rpcclient.DefaultABCIQueryOptions)
|
||||
}
|
||||
|
||||
func (c *baseRPCClient) ABCIQueryWithOptions(
|
||||
ctx context.Context,
|
||||
path string,
|
||||
data bytes.HexBytes,
|
||||
opts rpcclient.ABCIQueryOptions) (*coretypes.ResultABCIQuery, error) {
|
||||
func (c *baseRPCClient) ABCIQueryWithOptions(ctx context.Context, path string, data bytes.HexBytes, opts rpcclient.ABCIQueryOptions) (*coretypes.ResultABCIQuery, error) {
|
||||
result := new(coretypes.ResultABCIQuery)
|
||||
if err := c.caller.Call(ctx, "abci_query", abciQueryArgs{
|
||||
Path: path,
|
||||
@@ -232,10 +224,7 @@ func (c *baseRPCClient) ABCIQueryWithOptions(
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (c *baseRPCClient) BroadcastTxCommit(
|
||||
ctx context.Context,
|
||||
tx types.Tx,
|
||||
) (*coretypes.ResultBroadcastTxCommit, error) {
|
||||
func (c *baseRPCClient) BroadcastTxCommit(ctx context.Context, tx types.Tx) (*coretypes.ResultBroadcastTxCommit, error) {
|
||||
result := new(coretypes.ResultBroadcastTxCommit)
|
||||
if err := c.caller.Call(ctx, "broadcast_tx_commit", txArgs{Tx: tx}, result); err != nil {
|
||||
return nil, err
|
||||
@@ -243,25 +232,15 @@ func (c *baseRPCClient) BroadcastTxCommit(
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (c *baseRPCClient) BroadcastTxAsync(
|
||||
ctx context.Context,
|
||||
tx types.Tx,
|
||||
) (*coretypes.ResultBroadcastTx, error) {
|
||||
func (c *baseRPCClient) BroadcastTxAsync(ctx context.Context, tx types.Tx) (*coretypes.ResultBroadcastTx, error) {
|
||||
return c.broadcastTX(ctx, "broadcast_tx_async", tx)
|
||||
}
|
||||
|
||||
func (c *baseRPCClient) BroadcastTxSync(
|
||||
ctx context.Context,
|
||||
tx types.Tx,
|
||||
) (*coretypes.ResultBroadcastTx, error) {
|
||||
func (c *baseRPCClient) BroadcastTxSync(ctx context.Context, tx types.Tx) (*coretypes.ResultBroadcastTx, error) {
|
||||
return c.broadcastTX(ctx, "broadcast_tx_sync", tx)
|
||||
}
|
||||
|
||||
func (c *baseRPCClient) broadcastTX(
|
||||
ctx context.Context,
|
||||
route string,
|
||||
tx types.Tx,
|
||||
) (*coretypes.ResultBroadcastTx, error) {
|
||||
func (c *baseRPCClient) broadcastTX(ctx context.Context, route string, tx types.Tx) (*coretypes.ResultBroadcastTx, error) {
|
||||
result := new(coretypes.ResultBroadcastTx)
|
||||
if err := c.caller.Call(ctx, route, txArgs{Tx: tx}, result); err != nil {
|
||||
return nil, err
|
||||
@@ -269,11 +248,7 @@ func (c *baseRPCClient) broadcastTX(
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (c *baseRPCClient) UnconfirmedTxs(
|
||||
ctx context.Context,
|
||||
page *int,
|
||||
perPage *int,
|
||||
) (*coretypes.ResultUnconfirmedTxs, error) {
|
||||
func (c *baseRPCClient) UnconfirmedTxs(ctx context.Context, page *int, perPage *int) (*coretypes.ResultUnconfirmedTxs, error) {
|
||||
result := new(coretypes.ResultUnconfirmedTxs)
|
||||
|
||||
if err := c.caller.Call(ctx, "unconfirmed_txs", unconfirmedArgs{Page: page, PerPage: perPage}, result); err != nil {
|
||||
@@ -329,10 +304,7 @@ func (c *baseRPCClient) ConsensusState(ctx context.Context) (*coretypes.ResultCo
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (c *baseRPCClient) ConsensusParams(
|
||||
ctx context.Context,
|
||||
height *int64,
|
||||
) (*coretypes.ResultConsensusParams, error) {
|
||||
func (c *baseRPCClient) ConsensusParams(ctx context.Context, height *int64) (*coretypes.ResultConsensusParams, error) {
|
||||
result := new(coretypes.ResultConsensusParams)
|
||||
if err := c.caller.Call(ctx, "consensus_params", heightArgs{Height: height}, result); err != nil {
|
||||
return nil, err
|
||||
@@ -356,11 +328,7 @@ func (c *baseRPCClient) Health(ctx context.Context) (*coretypes.ResultHealth, er
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (c *baseRPCClient) BlockchainInfo(
|
||||
ctx context.Context,
|
||||
minHeight,
|
||||
maxHeight int64,
|
||||
) (*coretypes.ResultBlockchainInfo, error) {
|
||||
func (c *baseRPCClient) BlockchainInfo(ctx context.Context, minHeight, maxHeight int64) (*coretypes.ResultBlockchainInfo, error) {
|
||||
result := new(coretypes.ResultBlockchainInfo)
|
||||
if err := c.caller.Call(ctx, "blockchain", blockchainInfoArgs{
|
||||
MinHeight: minHeight,
|
||||
@@ -403,10 +371,7 @@ func (c *baseRPCClient) BlockByHash(ctx context.Context, hash bytes.HexBytes) (*
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (c *baseRPCClient) BlockResults(
|
||||
ctx context.Context,
|
||||
height *int64,
|
||||
) (*coretypes.ResultBlockResults, error) {
|
||||
func (c *baseRPCClient) BlockResults(ctx context.Context, height *int64) (*coretypes.ResultBlockResults, error) {
|
||||
result := new(coretypes.ResultBlockResults)
|
||||
if err := c.caller.Call(ctx, "block_results", heightArgs{Height: height}, result); err != nil {
|
||||
return nil, err
|
||||
@@ -446,14 +411,7 @@ func (c *baseRPCClient) Tx(ctx context.Context, hash bytes.HexBytes, prove bool)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (c *baseRPCClient) TxSearch(
|
||||
ctx context.Context,
|
||||
query string,
|
||||
prove bool,
|
||||
page,
|
||||
perPage *int,
|
||||
orderBy string,
|
||||
) (*coretypes.ResultTxSearch, error) {
|
||||
func (c *baseRPCClient) TxSearch(ctx context.Context, query string, prove bool, page, perPage *int, orderBy string) (*coretypes.ResultTxSearch, error) {
|
||||
result := new(coretypes.ResultTxSearch)
|
||||
if err := c.caller.Call(ctx, "tx_search", searchArgs{
|
||||
Query: query,
|
||||
@@ -468,12 +426,7 @@ func (c *baseRPCClient) TxSearch(
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (c *baseRPCClient) BlockSearch(
|
||||
ctx context.Context,
|
||||
query string,
|
||||
page, perPage *int,
|
||||
orderBy string,
|
||||
) (*coretypes.ResultBlockSearch, error) {
|
||||
func (c *baseRPCClient) BlockSearch(ctx context.Context, query string, page, perPage *int, orderBy string) (*coretypes.ResultBlockSearch, error) {
|
||||
result := new(coretypes.ResultBlockSearch)
|
||||
if err := c.caller.Call(ctx, "block_search", searchArgs{
|
||||
Query: query,
|
||||
@@ -487,12 +440,7 @@ func (c *baseRPCClient) BlockSearch(
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (c *baseRPCClient) Validators(
|
||||
ctx context.Context,
|
||||
height *int64,
|
||||
page,
|
||||
perPage *int,
|
||||
) (*coretypes.ResultValidators, error) {
|
||||
func (c *baseRPCClient) Validators(ctx context.Context, height *int64, page, perPage *int) (*coretypes.ResultValidators, error) {
|
||||
result := new(coretypes.ResultValidators)
|
||||
if err := c.caller.Call(ctx, "validators", validatorArgs{
|
||||
Height: height,
|
||||
@@ -504,10 +452,7 @@ func (c *baseRPCClient) Validators(
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (c *baseRPCClient) BroadcastEvidence(
|
||||
ctx context.Context,
|
||||
ev types.Evidence,
|
||||
) (*coretypes.ResultBroadcastEvidence, error) {
|
||||
func (c *baseRPCClient) BroadcastEvidence(ctx context.Context, ev types.Evidence) (*coretypes.ResultBroadcastEvidence, error) {
|
||||
result := new(coretypes.ResultBroadcastEvidence)
|
||||
if err := c.caller.Call(ctx, "broadcast_evidence", evidenceArgs{
|
||||
Evidence: coretypes.Evidence{Value: ev},
|
||||
|
||||
@@ -107,6 +107,7 @@ func (w *wsEvents) Unsubscribe(ctx context.Context, subscriber, query string) er
|
||||
}
|
||||
|
||||
w.mtx.Lock()
|
||||
defer w.mtx.Unlock()
|
||||
info, ok := w.subscriptions[query]
|
||||
if ok {
|
||||
if info.id != "" {
|
||||
@@ -114,7 +115,6 @@ func (w *wsEvents) Unsubscribe(ctx context.Context, subscriber, query string) er
|
||||
}
|
||||
delete(w.subscriptions, info.query)
|
||||
}
|
||||
w.mtx.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -129,8 +129,8 @@ func (w *wsEvents) UnsubscribeAll(ctx context.Context, subscriber string) error
|
||||
}
|
||||
|
||||
w.mtx.Lock()
|
||||
defer w.mtx.Unlock()
|
||||
w.subscriptions = make(map[string]*wsSubscription)
|
||||
w.mtx.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -78,11 +78,7 @@ func (c *Local) ABCIQuery(ctx context.Context, path string, data bytes.HexBytes)
|
||||
return c.ABCIQueryWithOptions(ctx, path, data, rpcclient.DefaultABCIQueryOptions)
|
||||
}
|
||||
|
||||
func (c *Local) ABCIQueryWithOptions(
|
||||
ctx context.Context,
|
||||
path string,
|
||||
data bytes.HexBytes,
|
||||
opts rpcclient.ABCIQueryOptions) (*coretypes.ResultABCIQuery, error) {
|
||||
func (c *Local) ABCIQueryWithOptions(ctx context.Context, path string, data bytes.HexBytes, opts rpcclient.ABCIQueryOptions) (*coretypes.ResultABCIQuery, error) {
|
||||
return c.env.ABCIQuery(ctx, path, data, opts.Height, opts.Prove)
|
||||
}
|
||||
|
||||
@@ -189,23 +185,11 @@ func (c *Local) Tx(ctx context.Context, hash bytes.HexBytes, prove bool) (*coret
|
||||
return c.env.Tx(ctx, hash, prove)
|
||||
}
|
||||
|
||||
func (c *Local) TxSearch(
|
||||
ctx context.Context,
|
||||
queryString string,
|
||||
prove bool,
|
||||
page,
|
||||
perPage *int,
|
||||
orderBy string,
|
||||
) (*coretypes.ResultTxSearch, error) {
|
||||
func (c *Local) TxSearch(ctx context.Context, queryString string, prove bool, page, perPage *int, orderBy string) (*coretypes.ResultTxSearch, error) {
|
||||
return c.env.TxSearch(ctx, queryString, prove, page, perPage, orderBy)
|
||||
}
|
||||
|
||||
func (c *Local) BlockSearch(
|
||||
ctx context.Context,
|
||||
queryString string,
|
||||
page, perPage *int,
|
||||
orderBy string,
|
||||
) (*coretypes.ResultBlockSearch, error) {
|
||||
func (c *Local) BlockSearch(ctx context.Context, queryString string, page, perPage *int, orderBy string) (*coretypes.ResultBlockSearch, error) {
|
||||
return c.env.BlockSearch(ctx, queryString, page, perPage, orderBy)
|
||||
}
|
||||
|
||||
@@ -213,11 +197,7 @@ func (c *Local) BroadcastEvidence(ctx context.Context, ev types.Evidence) (*core
|
||||
return c.env.BroadcastEvidence(ctx, coretypes.Evidence{Value: ev})
|
||||
}
|
||||
|
||||
func (c *Local) Subscribe(
|
||||
ctx context.Context,
|
||||
subscriber,
|
||||
queryString string,
|
||||
capacity ...int) (out <-chan coretypes.ResultEvent, err error) {
|
||||
func (c *Local) Subscribe(ctx context.Context, subscriber, queryString string, capacity ...int) (<-chan coretypes.ResultEvent, error) {
|
||||
q, err := query.New(queryString)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse query: %w", err)
|
||||
@@ -251,12 +231,7 @@ func (c *Local) Subscribe(
|
||||
return outc, nil
|
||||
}
|
||||
|
||||
func (c *Local) eventsRoutine(
|
||||
ctx context.Context,
|
||||
sub eventbus.Subscription,
|
||||
subArgs pubsub.SubscribeArgs,
|
||||
outc chan<- coretypes.ResultEvent,
|
||||
) {
|
||||
func (c *Local) eventsRoutine(ctx context.Context, sub eventbus.Subscription, subArgs pubsub.SubscribeArgs, outc chan<- coretypes.ResultEvent) {
|
||||
qstr := subArgs.Query.String()
|
||||
for {
|
||||
msg, err := sub.Next(ctx)
|
||||
@@ -271,17 +246,24 @@ func (c *Local) eventsRoutine(
|
||||
}
|
||||
continue
|
||||
}
|
||||
outc <- coretypes.ResultEvent{
|
||||
select {
|
||||
case outc <- coretypes.ResultEvent{
|
||||
SubscriptionID: msg.SubscriptionID(),
|
||||
Query: qstr,
|
||||
Data: msg.Data(),
|
||||
Events: msg.Events(),
|
||||
}:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Try to resubscribe with exponential backoff.
|
||||
func (c *Local) resubscribe(ctx context.Context, subArgs pubsub.SubscribeArgs) eventbus.Subscription {
|
||||
timer := time.NewTimer(0)
|
||||
defer timer.Stop()
|
||||
|
||||
attempts := 0
|
||||
for {
|
||||
if !c.IsRunning() {
|
||||
@@ -294,7 +276,13 @@ func (c *Local) resubscribe(ctx context.Context, subArgs pubsub.SubscribeArgs) e
|
||||
}
|
||||
|
||||
attempts++
|
||||
time.Sleep((10 << uint(attempts)) * time.Millisecond) // 10ms -> 20ms -> 40ms
|
||||
timer.Reset((10 << uint(attempts)) * time.Millisecond) // 10ms -> 20ms -> 40ms
|
||||
select {
|
||||
case <-timer.C:
|
||||
continue
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -46,13 +46,7 @@ func DefaultConfig() *Config {
|
||||
|
||||
// Serve creates a http.Server and calls Serve with the given listener. It
|
||||
// wraps handler to recover panics and limit the request body size.
|
||||
func Serve(
|
||||
ctx context.Context,
|
||||
listener net.Listener,
|
||||
handler http.Handler,
|
||||
logger log.Logger,
|
||||
config *Config,
|
||||
) error {
|
||||
func Serve(ctx context.Context, listener net.Listener, handler http.Handler, logger log.Logger, config *Config) error {
|
||||
logger.Info(fmt.Sprintf("Starting RPC HTTP server on %s", listener.Addr()))
|
||||
h := recoverAndLogHandler(MaxBytesHandler(handler, config.MaxBodyBytes), logger)
|
||||
s := &http.Server{
|
||||
@@ -83,19 +77,14 @@ func Serve(
|
||||
// Serve creates a http.Server and calls ServeTLS with the given listener,
|
||||
// certFile and keyFile. It wraps handler to recover panics and limit the
|
||||
// request body size.
|
||||
func ServeTLS(
|
||||
ctx context.Context,
|
||||
listener net.Listener,
|
||||
handler http.Handler,
|
||||
certFile, keyFile string,
|
||||
logger log.Logger,
|
||||
config *Config,
|
||||
) error {
|
||||
logger.Info(fmt.Sprintf("Starting RPC HTTPS server on %s (cert: %q, key: %q)",
|
||||
listener.Addr(), certFile, keyFile))
|
||||
h := recoverAndLogHandler(MaxBytesHandler(handler, config.MaxBodyBytes), logger)
|
||||
func ServeTLS(ctx context.Context, listener net.Listener, handler http.Handler, certFile, keyFile string, logger log.Logger, config *Config) error {
|
||||
logger.Info("Starting RPC HTTPS server",
|
||||
"listenterAddr", listener.Addr(),
|
||||
"certFile", certFile,
|
||||
"keyFile", keyFile)
|
||||
|
||||
s := &http.Server{
|
||||
Handler: h,
|
||||
Handler: recoverAndLogHandler(MaxBytesHandler(handler, config.MaxBodyBytes), logger),
|
||||
ReadTimeout: config.ReadTimeout,
|
||||
WriteTimeout: config.WriteTimeout,
|
||||
MaxHeaderBytes: config.MaxHeaderBytes,
|
||||
|
||||
@@ -36,11 +36,7 @@ type WebsocketManager struct {
|
||||
|
||||
// NewWebsocketManager returns a new WebsocketManager that passes a map of
|
||||
// functions, connection options and logger to new WS connections.
|
||||
func NewWebsocketManager(
|
||||
logger log.Logger,
|
||||
funcMap map[string]*RPCFunc,
|
||||
wsConnOptions ...func(*wsConnection),
|
||||
) *WebsocketManager {
|
||||
func NewWebsocketManager(logger log.Logger, funcMap map[string]*RPCFunc, wsConnOptions ...func(*wsConnection)) *WebsocketManager {
|
||||
return &WebsocketManager{
|
||||
funcMap: funcMap,
|
||||
Upgrader: websocket.Upgrader{
|
||||
@@ -137,12 +133,7 @@ type wsConnection struct {
|
||||
// description of how to configure ping period and pong wait time. NOTE: if the
|
||||
// write buffer is full, pongs may be dropped, which may cause clients to
|
||||
// disconnect. see https://github.com/gorilla/websocket/issues/97
|
||||
func newWSConnection(
|
||||
baseConn *websocket.Conn,
|
||||
funcMap map[string]*RPCFunc,
|
||||
logger log.Logger,
|
||||
options ...func(*wsConnection),
|
||||
) *wsConnection {
|
||||
func newWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, logger log.Logger, options ...func(*wsConnection)) *wsConnection {
|
||||
wsc := &wsConnection{
|
||||
Logger: logger,
|
||||
remoteAddr: baseConn.RemoteAddr().String(),
|
||||
|
||||
Reference in New Issue
Block a user