Files
tendermint/rpc/client/httpclient.go
Anton Kaliaev fb8b00f1d8 lite2: light client with weak subjectivity (#3989)
Refs #1771

ADR: https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-044-lite-client-with-weak-subjectivity.md

## Commits:

* add Verifier and VerifyCommitTrusting

* add two more checks

make trustLevel an option

* float32 for trustLevel

* check newHeader time

* started writing lite Client

* unify Verify methods

* ensure h2.Header.bfttime < h1.Header.bfttime + tp

* move trust checks into Verify function

* add more comments

* more docs

* started writing tests

* unbonding period failures

* tests are green

* export ErrNewHeaderTooFarIntoFuture

* make golangci happy

* test for non-adjusted headers

* more precision

* providers and stores

* VerifyHeader and VerifyHeaderAtHeight funcs

* fix compile errors

* remove lastVerifiedHeight, persist new trusted header

* sequential verification

* remove TrustedStore option

* started writing tests for light client

* cover basic cases for linear verification

* bisection tests PASS

* rename BisectingVerification to SkippingVerification

* refactor the code

* add TrustedHeader method

* consolidate sequential verification tests

* consolidate skipping verification tests

* rename trustedVals to trustedNextVals

* start writing docs

* ValidateTrustLevel func and ErrOldHeaderExpired error

* AutoClient and example tests

* fix errors

* update doc

* remove ErrNewHeaderTooFarIntoFuture

This check is unnecessary given existing a) ErrOldHeaderExpired b)
h2.Time > now checks.

* return an error if we're at more recent height

* add comments

* add LastSignedHeaderHeight method to Store

I think it's fine if Store tracks last height

* copy over proxy from old lite package

* make TrustedHeader return latest if height=0

* modify LastSignedHeaderHeight to return an error if no headers exist

* copy over proxy impl

* refactor proxy and start http lite client

* Tx and BlockchainInfo methods

* Block method

* commit method

* code compiles again

* lite client compiles

* extract updateLiteClientIfNeededTo func

* move final parts

* add placeholder for tests

* force usage of lite http client in proxy

* comment out query tests for now

* explicitly mention tp: trusting period

* verify nextVals in VerifyHeader

* refactor bisection

* move the NextValidatorsHash check into updateTrustedHeaderAndVals

+ update the comment

* add ConsensusParams method to RPC client

* add ConsensusParams to rpc/mock/client

* change trustLevel type to a new cmn.Fraction type

+ update SkippingVerification comment

* stress out trustLevel is only used for non-adjusted headers

* fixes after Fede's review

Co-authored-by: Federico Kunze <31522760+fedekunze@users.noreply.github.com>

* compare newHeader with a header from an alternative provider

* save pivot header

Refs https://github.com/tendermint/tendermint/pull/3989#discussion_r349122824

* check header can still be trusted in TrustedHeader

Refs https://github.com/tendermint/tendermint/pull/3989#discussion_r349101424

* lite: update Validators and Block endpoints

- Block no longer contains BlockMeta
- Validators now accept two additional params: page and perPage

* make linter happy
2019-11-25 19:07:40 +04:00

551 lines
16 KiB
Go

package client
import (
"context"
"net/http"
"strings"
"sync"
"time"
"github.com/pkg/errors"
amino "github.com/tendermint/go-amino"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpcclient "github.com/tendermint/tendermint/rpc/lib/client"
"github.com/tendermint/tendermint/types"
)
/*
HTTP is a Client implementation that communicates with a Tendermint node over
JSON RPC and WebSockets.
This is the main implementation you probably want to use in production code.
There are other implementations when calling the Tendermint node in-process
(Local), or when you want to mock out the server for test code (mock).
You can subscribe for any event published by Tendermint using Subscribe method.
Note delivery is best-effort. If you don't read events fast enough or network is
slow, Tendermint might cancel the subscription. The client will attempt to
resubscribe (you don't need to do anything). It will keep trying every second
indefinitely until successful.
Request batching is available for JSON RPC requests over HTTP, which conforms to
the JSON RPC specification (https://www.jsonrpc.org/specification#batch). See
the example for more details.
*/
type HTTP struct {
remote string
rpc *rpcclient.JSONRPCClient
*baseRPCClient
*WSEvents
}
// BatchHTTP provides the same interface as `HTTP`, but allows for batching of
// requests (as per https://www.jsonrpc.org/specification#batch). Do not
// instantiate directly - rather use the HTTP.NewBatch() method to create an
// instance of this struct.
//
// Batching of HTTP requests is thread-safe in the sense that multiple
// goroutines can each create their own batches and send them using the same
// HTTP client. Multiple goroutines could also enqueue transactions in a single
// batch, but ordering of transactions in the batch cannot be guaranteed in such
// an example.
type BatchHTTP struct {
rpcBatch *rpcclient.JSONRPCRequestBatch
*baseRPCClient
}
// rpcClient is an internal interface to which our RPC clients (batch and
// non-batch) must conform. Acts as an additional code-level sanity check to
// make sure the implementations stay coherent.
type rpcClient interface {
ABCIClient
HistoryClient
NetworkClient
SignClient
StatusClient
}
// baseRPCClient implements the basic RPC method logic without the actual
// underlying RPC call functionality, which is provided by `caller`.
type baseRPCClient struct {
caller rpcclient.JSONRPCCaller
}
var _ rpcClient = (*HTTP)(nil)
var _ rpcClient = (*BatchHTTP)(nil)
var _ rpcClient = (*baseRPCClient)(nil)
//-----------------------------------------------------------------------------
// HTTP
// NewHTTP takes a remote endpoint in the form <protocol>://<host>:<port> and
// the websocket path (which always seems to be "/websocket")
// The function panics if the provided remote is invalid.<Paste>
func NewHTTP(remote, wsEndpoint string) *HTTP {
httpClient := rpcclient.DefaultHTTPClient(remote)
return NewHTTPWithClient(remote, wsEndpoint, httpClient)
}
// NewHTTPWithClient allows for setting a custom http client. See NewHTTP
// The function panics if the provided client is nil or remote is invalid.
func NewHTTPWithClient(remote, wsEndpoint string, client *http.Client) *HTTP {
if client == nil {
panic("nil http.Client provided")
}
rc := rpcclient.NewJSONRPCClientWithHTTPClient(remote, client)
cdc := rc.Codec()
ctypes.RegisterAmino(cdc)
rc.SetCodec(cdc)
return &HTTP{
rpc: rc,
remote: remote,
baseRPCClient: &baseRPCClient{caller: rc},
WSEvents: newWSEvents(cdc, remote, wsEndpoint),
}
}
var _ Client = (*HTTP)(nil)
func (c *HTTP) SetLogger(l log.Logger) {
c.WSEvents.SetLogger(l)
}
// NewBatch creates a new batch client for this HTTP client.
func (c *HTTP) NewBatch() *BatchHTTP {
rpcBatch := c.rpc.NewRequestBatch()
return &BatchHTTP{
rpcBatch: rpcBatch,
baseRPCClient: &baseRPCClient{
caller: rpcBatch,
},
}
}
//-----------------------------------------------------------------------------
// BatchHTTP
// Send is a convenience function for an HTTP batch that will trigger the
// compilation of the batched requests and send them off using the client as a
// single request. On success, this returns a list of the deserialized results
// from each request in the sent batch.
func (b *BatchHTTP) Send() ([]interface{}, error) {
return b.rpcBatch.Send()
}
// Clear will empty out this batch of requests and return the number of requests
// that were cleared out.
func (b *BatchHTTP) Clear() int {
return b.rpcBatch.Clear()
}
// Count returns the number of enqueued requests waiting to be sent.
func (b *BatchHTTP) Count() int {
return b.rpcBatch.Count()
}
//-----------------------------------------------------------------------------
// baseRPCClient
func (c *baseRPCClient) Status() (*ctypes.ResultStatus, error) {
result := new(ctypes.ResultStatus)
_, err := c.caller.Call("status", map[string]interface{}{}, result)
if err != nil {
return nil, errors.Wrap(err, "Status")
}
return result, nil
}
func (c *baseRPCClient) ABCIInfo() (*ctypes.ResultABCIInfo, error) {
result := new(ctypes.ResultABCIInfo)
_, err := c.caller.Call("abci_info", map[string]interface{}{}, result)
if err != nil {
return nil, errors.Wrap(err, "ABCIInfo")
}
return result, nil
}
func (c *baseRPCClient) ABCIQuery(path string, data cmn.HexBytes) (*ctypes.ResultABCIQuery, error) {
return c.ABCIQueryWithOptions(path, data, DefaultABCIQueryOptions)
}
func (c *baseRPCClient) ABCIQueryWithOptions(
path string,
data cmn.HexBytes,
opts ABCIQueryOptions) (*ctypes.ResultABCIQuery, error) {
result := new(ctypes.ResultABCIQuery)
_, err := c.caller.Call("abci_query",
map[string]interface{}{"path": path, "data": data, "height": opts.Height, "prove": opts.Prove},
result)
if err != nil {
return nil, errors.Wrap(err, "ABCIQuery")
}
return result, nil
}
func (c *baseRPCClient) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
result := new(ctypes.ResultBroadcastTxCommit)
_, err := c.caller.Call("broadcast_tx_commit", map[string]interface{}{"tx": tx}, result)
if err != nil {
return nil, errors.Wrap(err, "broadcast_tx_commit")
}
return result, nil
}
func (c *baseRPCClient) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
return c.broadcastTX("broadcast_tx_async", tx)
}
func (c *baseRPCClient) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
return c.broadcastTX("broadcast_tx_sync", tx)
}
func (c *baseRPCClient) broadcastTX(route string, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
result := new(ctypes.ResultBroadcastTx)
_, err := c.caller.Call(route, map[string]interface{}{"tx": tx}, result)
if err != nil {
return nil, errors.Wrap(err, route)
}
return result, nil
}
func (c *baseRPCClient) UnconfirmedTxs(limit int) (*ctypes.ResultUnconfirmedTxs, error) {
result := new(ctypes.ResultUnconfirmedTxs)
_, err := c.caller.Call("unconfirmed_txs", map[string]interface{}{"limit": limit}, result)
if err != nil {
return nil, errors.Wrap(err, "unconfirmed_txs")
}
return result, nil
}
func (c *baseRPCClient) NumUnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) {
result := new(ctypes.ResultUnconfirmedTxs)
_, err := c.caller.Call("num_unconfirmed_txs", map[string]interface{}{}, result)
if err != nil {
return nil, errors.Wrap(err, "num_unconfirmed_txs")
}
return result, nil
}
func (c *baseRPCClient) NetInfo() (*ctypes.ResultNetInfo, error) {
result := new(ctypes.ResultNetInfo)
_, err := c.caller.Call("net_info", map[string]interface{}{}, result)
if err != nil {
return nil, errors.Wrap(err, "NetInfo")
}
return result, nil
}
func (c *baseRPCClient) DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) {
result := new(ctypes.ResultDumpConsensusState)
_, err := c.caller.Call("dump_consensus_state", map[string]interface{}{}, result)
if err != nil {
return nil, errors.Wrap(err, "DumpConsensusState")
}
return result, nil
}
func (c *baseRPCClient) ConsensusState() (*ctypes.ResultConsensusState, error) {
result := new(ctypes.ResultConsensusState)
_, err := c.caller.Call("consensus_state", map[string]interface{}{}, result)
if err != nil {
return nil, errors.Wrap(err, "ConsensusState")
}
return result, nil
}
func (c *baseRPCClient) ConsensusParams(height *int64) (*ctypes.ResultConsensusParams, error) {
result := new(ctypes.ResultConsensusParams)
_, err := c.caller.Call("consensus_params", map[string]interface{}{"height": height}, result)
if err != nil {
return nil, errors.Wrap(err, "ConsensusParams")
}
return result, nil
}
func (c *baseRPCClient) Health() (*ctypes.ResultHealth, error) {
result := new(ctypes.ResultHealth)
_, err := c.caller.Call("health", map[string]interface{}{}, result)
if err != nil {
return nil, errors.Wrap(err, "Health")
}
return result, nil
}
func (c *baseRPCClient) BlockchainInfo(minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) {
result := new(ctypes.ResultBlockchainInfo)
_, err := c.caller.Call("blockchain",
map[string]interface{}{"minHeight": minHeight, "maxHeight": maxHeight},
result)
if err != nil {
return nil, errors.Wrap(err, "BlockchainInfo")
}
return result, nil
}
func (c *baseRPCClient) Genesis() (*ctypes.ResultGenesis, error) {
result := new(ctypes.ResultGenesis)
_, err := c.caller.Call("genesis", map[string]interface{}{}, result)
if err != nil {
return nil, errors.Wrap(err, "Genesis")
}
return result, nil
}
func (c *baseRPCClient) Block(height *int64) (*ctypes.ResultBlock, error) {
result := new(ctypes.ResultBlock)
_, err := c.caller.Call("block", map[string]interface{}{"height": height}, result)
if err != nil {
return nil, errors.Wrap(err, "Block")
}
return result, nil
}
func (c *baseRPCClient) BlockResults(height *int64) (*ctypes.ResultBlockResults, error) {
result := new(ctypes.ResultBlockResults)
_, err := c.caller.Call("block_results", map[string]interface{}{"height": height}, result)
if err != nil {
return nil, errors.Wrap(err, "Block Result")
}
return result, nil
}
func (c *baseRPCClient) Commit(height *int64) (*ctypes.ResultCommit, error) {
result := new(ctypes.ResultCommit)
_, err := c.caller.Call("commit", map[string]interface{}{"height": height}, result)
if err != nil {
return nil, errors.Wrap(err, "Commit")
}
return result, nil
}
func (c *baseRPCClient) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) {
result := new(ctypes.ResultTx)
params := map[string]interface{}{
"hash": hash,
"prove": prove,
}
_, err := c.caller.Call("tx", params, result)
if err != nil {
return nil, errors.Wrap(err, "Tx")
}
return result, nil
}
func (c *baseRPCClient) TxSearch(query string, prove bool, page, perPage int) (*ctypes.ResultTxSearch, error) {
result := new(ctypes.ResultTxSearch)
params := map[string]interface{}{
"query": query,
"prove": prove,
"page": page,
"per_page": perPage,
}
_, err := c.caller.Call("tx_search", params, result)
if err != nil {
return nil, errors.Wrap(err, "TxSearch")
}
return result, nil
}
func (c *baseRPCClient) Validators(height *int64, page, perPage int) (*ctypes.ResultValidators, error) {
result := new(ctypes.ResultValidators)
_, err := c.caller.Call("validators", map[string]interface{}{
"height": height,
"page": page,
"per_page": perPage,
}, result)
if err != nil {
return nil, errors.Wrap(err, "Validators")
}
return result, nil
}
func (c *baseRPCClient) BroadcastEvidence(ev types.Evidence) (*ctypes.ResultBroadcastEvidence, error) {
result := new(ctypes.ResultBroadcastEvidence)
_, err := c.caller.Call("broadcast_evidence", map[string]interface{}{"evidence": ev}, result)
if err != nil {
return nil, errors.Wrap(err, "BroadcastEvidence")
}
return result, nil
}
//-----------------------------------------------------------------------------
// WSEvents
type WSEvents struct {
cmn.BaseService
cdc *amino.Codec
remote string
endpoint string
ws *rpcclient.WSClient
mtx sync.RWMutex
// query -> chan
subscriptions map[string]chan ctypes.ResultEvent
}
func newWSEvents(cdc *amino.Codec, remote, endpoint string) *WSEvents {
wsEvents := &WSEvents{
cdc: cdc,
endpoint: endpoint,
remote: remote,
subscriptions: make(map[string]chan ctypes.ResultEvent),
}
wsEvents.BaseService = *cmn.NewBaseService(nil, "WSEvents", wsEvents)
return wsEvents
}
// OnStart implements cmn.Service by starting WSClient and event loop.
func (w *WSEvents) OnStart() error {
w.ws = rpcclient.NewWSClient(w.remote, w.endpoint, rpcclient.OnReconnect(func() {
// resubscribe immediately
w.redoSubscriptionsAfter(0 * time.Second)
}))
w.ws.SetCodec(w.cdc)
w.ws.SetLogger(w.Logger)
err := w.ws.Start()
if err != nil {
return err
}
go w.eventListener()
return nil
}
// OnStop implements cmn.Service by stopping WSClient.
func (w *WSEvents) OnStop() {
_ = w.ws.Stop()
}
// Subscribe implements EventsClient by using WSClient to subscribe given
// subscriber to query. By default, returns a channel with cap=1. Error is
// returned if it fails to subscribe.
// Channel is never closed to prevent clients from seeing an erroneus event.
func (w *WSEvents) Subscribe(ctx context.Context, subscriber, query string,
outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) {
if err := w.ws.Subscribe(ctx, query); err != nil {
return nil, err
}
outCap := 1
if len(outCapacity) > 0 {
outCap = outCapacity[0]
}
outc := make(chan ctypes.ResultEvent, outCap)
w.mtx.Lock()
// subscriber param is ignored because Tendermint will override it with
// remote IP anyway.
w.subscriptions[query] = outc
w.mtx.Unlock()
return outc, nil
}
// Unsubscribe implements EventsClient by using WSClient to unsubscribe given
// subscriber from query.
func (w *WSEvents) Unsubscribe(ctx context.Context, subscriber, query string) error {
if err := w.ws.Unsubscribe(ctx, query); err != nil {
return err
}
w.mtx.Lock()
_, ok := w.subscriptions[query]
if ok {
delete(w.subscriptions, query)
}
w.mtx.Unlock()
return nil
}
// UnsubscribeAll implements EventsClient by using WSClient to unsubscribe
// given subscriber from all the queries.
func (w *WSEvents) UnsubscribeAll(ctx context.Context, subscriber string) error {
if err := w.ws.UnsubscribeAll(ctx); err != nil {
return err
}
w.mtx.Lock()
w.subscriptions = make(map[string]chan ctypes.ResultEvent)
w.mtx.Unlock()
return nil
}
// After being reconnected, it is necessary to redo subscription to server
// otherwise no data will be automatically received.
func (w *WSEvents) redoSubscriptionsAfter(d time.Duration) {
time.Sleep(d)
w.mtx.RLock()
defer w.mtx.RUnlock()
for q := range w.subscriptions {
err := w.ws.Subscribe(context.Background(), q)
if err != nil {
w.Logger.Error("Failed to resubscribe", "err", err)
}
}
}
func isErrAlreadySubscribed(err error) bool {
return strings.Contains(err.Error(), tmpubsub.ErrAlreadySubscribed.Error())
}
func (w *WSEvents) eventListener() {
for {
select {
case resp, ok := <-w.ws.ResponsesCh:
if !ok {
return
}
if resp.Error != nil {
w.Logger.Error("WS error", "err", resp.Error.Error())
// Error can be ErrAlreadySubscribed or max client (subscriptions per
// client) reached or Tendermint exited.
// We can ignore ErrAlreadySubscribed, but need to retry in other
// cases.
if !isErrAlreadySubscribed(resp.Error) {
// Resubscribe after 1 second to give Tendermint time to restart (if
// crashed).
w.redoSubscriptionsAfter(1 * time.Second)
}
continue
}
result := new(ctypes.ResultEvent)
err := w.cdc.UnmarshalJSON(resp.Result, result)
if err != nil {
w.Logger.Error("failed to unmarshal response", "err", err)
continue
}
w.mtx.RLock()
if out, ok := w.subscriptions[result.Query]; ok {
if cap(out) == 0 {
out <- *result
} else {
select {
case out <- *result:
default:
w.Logger.Error("wanted to publish ResultEvent, but out channel is full", "result", result, "query", result.Query)
}
}
}
w.mtx.RUnlock()
case <-w.Quit():
return
}
}
}