mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-09 22:47:24 +00:00
committed by
Anton Kaliaev
parent
6d91c1faf4
commit
15aa8d82b3
@@ -123,6 +123,7 @@ program](https://hackerone.com/tendermint).
|
||||
- `consensus_validator_last_signed_height`: track at which height the validator last signed
|
||||
- `consensus_validator_missed_blocks`: total amount of missed blocks for a validator
|
||||
as gauges in prometheus for validator specific metrics
|
||||
- [rpc/lib] [\#4248](https://github.com/tendermint/tendermint/issues/4248) RPC client basic authentication support (@greg-szabo)
|
||||
|
||||
### IMPROVEMENTS:
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
@@ -26,6 +27,57 @@ const (
|
||||
|
||||
//-------------------------------------------------------------
|
||||
|
||||
// Parsed URL structure
|
||||
type parsedURL struct {
|
||||
url.URL
|
||||
}
|
||||
|
||||
// Parse URL and set defaults
|
||||
func newParsedURL(remoteAddr string) (*parsedURL, error) {
|
||||
u, err := url.Parse(remoteAddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// default to tcp if nothing specified
|
||||
if u.Scheme == "" {
|
||||
u.Scheme = protoTCP
|
||||
}
|
||||
|
||||
return &parsedURL{*u}, nil
|
||||
}
|
||||
|
||||
// Change protocol to HTTP for unknown protocols and TCP protocol - useful for RPC connections
|
||||
func (u *parsedURL) SetDefaultSchemeHTTP() {
|
||||
// protocol to use for http operations, to support both http and https
|
||||
switch u.Scheme {
|
||||
case protoHTTP, protoHTTPS, protoWS, protoWSS:
|
||||
// known protocols not changed
|
||||
default:
|
||||
// default to http for unknown protocols (ex. tcp)
|
||||
u.Scheme = protoHTTP
|
||||
}
|
||||
}
|
||||
|
||||
// Get full address without the protocol - useful for Dialer connections
|
||||
func (u parsedURL) GetHostWithPath() string {
|
||||
// Remove protocol, userinfo and # fragment, assume opaque is empty
|
||||
return u.Host + u.EscapedPath()
|
||||
}
|
||||
|
||||
// Get a trimmed address - useful for WS connections
|
||||
func (u parsedURL) GetTrimmedHostWithPath() string {
|
||||
// replace / with . for http requests (kvstore domain)
|
||||
return strings.Replace(u.GetHostWithPath(), "/", ".", -1)
|
||||
}
|
||||
|
||||
// Get a trimmed address with protocol - useful as address in RPC connections
|
||||
func (u parsedURL) GetTrimmedURL() string {
|
||||
return u.Scheme + "://" + u.GetTrimmedHostWithPath()
|
||||
}
|
||||
|
||||
//-------------------------------------------------------------
|
||||
|
||||
// HTTPClient is a common interface for JSON-RPC HTTP clients.
|
||||
type HTTPClient interface {
|
||||
// Call calls the given method with the params and returns a result.
|
||||
@@ -51,9 +103,12 @@ type JSONRPCCaller interface {
|
||||
//
|
||||
// JSONRPCClient is safe for concurrent use by multiple goroutines.
|
||||
type JSONRPCClient struct {
|
||||
address string
|
||||
client *http.Client
|
||||
cdc *amino.Codec
|
||||
address string
|
||||
username string
|
||||
password string
|
||||
|
||||
client *http.Client
|
||||
cdc *amino.Codec
|
||||
|
||||
mtx sync.Mutex
|
||||
nextReqID int
|
||||
@@ -84,15 +139,23 @@ func NewJSONRPCClientWithHTTPClient(remote string, client *http.Client) (*JSONRP
|
||||
panic("nil http.Client provided")
|
||||
}
|
||||
|
||||
clientAddress, err := toClientAddress(remote)
|
||||
parsedURL, err := newParsedURL(remote)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid remote %s: %s", remote, err)
|
||||
}
|
||||
|
||||
parsedURL.SetDefaultSchemeHTTP()
|
||||
|
||||
address := parsedURL.GetTrimmedURL()
|
||||
username := parsedURL.User.Username()
|
||||
password, _ := parsedURL.User.Password()
|
||||
|
||||
rpcClient := &JSONRPCClient{
|
||||
address: clientAddress,
|
||||
client: client,
|
||||
cdc: amino.NewCodec(),
|
||||
address: address,
|
||||
username: username,
|
||||
password: password,
|
||||
client: client,
|
||||
cdc: amino.NewCodec(),
|
||||
}
|
||||
|
||||
return rpcClient, nil
|
||||
@@ -114,7 +177,15 @@ func (c *JSONRPCClient) Call(method string, params map[string]interface{}, resul
|
||||
}
|
||||
|
||||
requestBuf := bytes.NewBuffer(requestBytes)
|
||||
httpResponse, err := c.client.Post(c.address, "text/json", requestBuf)
|
||||
httpRequest, err := http.NewRequest(http.MethodPost, c.address, requestBuf)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "Request failed")
|
||||
}
|
||||
httpRequest.Header.Set("Content-Type", "text/json")
|
||||
if c.username != "" || c.password != "" {
|
||||
httpRequest.SetBasicAuth(c.username, c.password)
|
||||
}
|
||||
httpResponse, err := c.client.Do(httpRequest)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "Post failed")
|
||||
}
|
||||
@@ -153,7 +224,15 @@ func (c *JSONRPCClient) sendBatch(requests []*jsonRPCBufferedRequest) ([]interfa
|
||||
return nil, errors.Wrap(err, "failed to marshal requests")
|
||||
}
|
||||
|
||||
httpResponse, err := c.client.Post(c.address, "text/json", bytes.NewBuffer(requestBytes))
|
||||
httpRequest, err := http.NewRequest(http.MethodPost, c.address, bytes.NewBuffer(requestBytes))
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "Request failed")
|
||||
}
|
||||
httpRequest.Header.Set("Content-Type", "text/json")
|
||||
if c.username != "" || c.password != "" {
|
||||
httpRequest.SetBasicAuth(c.username, c.password)
|
||||
}
|
||||
httpResponse, err := c.client.Do(httpRequest)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "Post failed")
|
||||
}
|
||||
@@ -256,62 +335,14 @@ func (b *JSONRPCRequestBatch) Call(
|
||||
|
||||
//-------------------------------------------------------------
|
||||
|
||||
// protocol - client's protocol (for example, "http", "https", "wss", "ws", "tcp")
|
||||
// trimmedS - rest of the address (for example, "192.0.2.1:25", "[2001:db8::1]:80") with "/" replaced with "."
|
||||
func toClientAddrAndParse(remoteAddr string) (network string, trimmedS string, err error) {
|
||||
protocol, address, err := parseRemoteAddr(remoteAddr)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
|
||||
// protocol to use for http operations, to support both http and https
|
||||
var clientProtocol string
|
||||
// default to http for unknown protocols (ex. tcp)
|
||||
switch protocol {
|
||||
case protoHTTP, protoHTTPS, protoWS, protoWSS:
|
||||
clientProtocol = protocol
|
||||
default:
|
||||
clientProtocol = protoHTTP
|
||||
}
|
||||
|
||||
// replace / with . for http requests (kvstore domain)
|
||||
trimmedAddress := strings.Replace(address, "/", ".", -1)
|
||||
return clientProtocol, trimmedAddress, nil
|
||||
}
|
||||
|
||||
func toClientAddress(remoteAddr string) (string, error) {
|
||||
clientProtocol, trimmedAddress, err := toClientAddrAndParse(remoteAddr)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return clientProtocol + "://" + trimmedAddress, nil
|
||||
}
|
||||
|
||||
// network - name of the network (for example, "tcp", "unix")
|
||||
// s - rest of the address (for example, "192.0.2.1:25", "[2001:db8::1]:80")
|
||||
// TODO: Deprecate support for IP:PORT or /path/to/socket
|
||||
func parseRemoteAddr(remoteAddr string) (network string, s string, err error) {
|
||||
parts := strings.SplitN(remoteAddr, "://", 2)
|
||||
var protocol, address string
|
||||
switch len(parts) {
|
||||
case 1:
|
||||
// default to tcp if nothing specified
|
||||
protocol, address = protoTCP, remoteAddr
|
||||
case 2:
|
||||
protocol, address = parts[0], parts[1]
|
||||
default:
|
||||
return "", "", fmt.Errorf("invalid addr: %s", remoteAddr)
|
||||
}
|
||||
|
||||
return protocol, address, nil
|
||||
}
|
||||
|
||||
func makeHTTPDialer(remoteAddr string) (func(string, string) (net.Conn, error), error) {
|
||||
protocol, address, err := parseRemoteAddr(remoteAddr)
|
||||
u, err := newParsedURL(remoteAddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
protocol := u.Scheme
|
||||
|
||||
// accept http(s) as an alias for tcp
|
||||
switch protocol {
|
||||
case protoHTTP, protoHTTPS:
|
||||
@@ -319,7 +350,7 @@ func makeHTTPDialer(remoteAddr string) (func(string, string) (net.Conn, error),
|
||||
}
|
||||
|
||||
dialFn := func(proto, addr string) (net.Conn, error) {
|
||||
return net.Dial(protocol, address)
|
||||
return net.Dial(protocol, u.GetHostWithPath())
|
||||
}
|
||||
|
||||
return dialFn, nil
|
||||
|
||||
@@ -7,15 +7,15 @@ import (
|
||||
)
|
||||
|
||||
func TestHTTPClientMakeHTTPDialer(t *testing.T) {
|
||||
remote := []string{"https://foo-bar.com:80", "http://foo-bar.net:80"}
|
||||
remote := []string{"https://foo-bar.com:80", "http://foo-bar.net:80", "https://user:pass@foo-bar.net:80"}
|
||||
|
||||
for _, f := range remote {
|
||||
protocol, address, err := parseRemoteAddr(f)
|
||||
u, err := newParsedURL(f)
|
||||
require.NoError(t, err)
|
||||
dialFn, err := makeHTTPDialer(f)
|
||||
require.Nil(t, err)
|
||||
|
||||
addr, err := dialFn(protocol, address)
|
||||
addr, err := dialFn(u.Scheme, u.GetHostWithPath())
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, addr)
|
||||
}
|
||||
|
||||
@@ -35,7 +35,7 @@ var _ HTTPClient = (*URIClient)(nil)
|
||||
// An error is returned on invalid remote.
|
||||
// The function panics when remote is nil.
|
||||
func NewURIClient(remote string) (*URIClient, error) {
|
||||
clientAddress, err := toClientAddress(remote)
|
||||
parsedURL, err := newParsedURL(remote)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -45,8 +45,10 @@ func NewURIClient(remote string) (*URIClient, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
parsedURL.SetDefaultSchemeHTTP()
|
||||
|
||||
uriClient := &URIClient{
|
||||
address: clientAddress,
|
||||
address: parsedURL.GetTrimmedURL(),
|
||||
client: httpClient,
|
||||
cdc: amino.NewCodec(),
|
||||
}
|
||||
|
||||
@@ -86,13 +86,13 @@ type WSClient struct { // nolint: maligned
|
||||
// pong wait time. The endpoint argument must begin with a `/`.
|
||||
// An error is returned on invalid remote. The function panics when remote is nil.
|
||||
func NewWSClient(remoteAddr, endpoint string, options ...func(*WSClient)) (*WSClient, error) {
|
||||
protocol, addr, err := toClientAddrAndParse(remoteAddr)
|
||||
parsedURL, err := newParsedURL(remoteAddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// default to ws protocol, unless wss is explicitly specified
|
||||
if protocol != "wss" {
|
||||
protocol = "ws"
|
||||
if parsedURL.Scheme != protoWSS {
|
||||
parsedURL.Scheme = protoWS
|
||||
}
|
||||
|
||||
dialFn, err := makeHTTPDialer(remoteAddr)
|
||||
@@ -102,7 +102,7 @@ func NewWSClient(remoteAddr, endpoint string, options ...func(*WSClient)) (*WSCl
|
||||
|
||||
c := &WSClient{
|
||||
cdc: amino.NewCodec(),
|
||||
Address: addr,
|
||||
Address: parsedURL.GetTrimmedHostWithPath(),
|
||||
Dialer: dialFn,
|
||||
Endpoint: endpoint,
|
||||
PingPongLatencyTimer: metrics.NewTimer(),
|
||||
@@ -111,7 +111,7 @@ func NewWSClient(remoteAddr, endpoint string, options ...func(*WSClient)) (*WSCl
|
||||
readWait: defaultReadWait,
|
||||
writeWait: defaultWriteWait,
|
||||
pingPeriod: defaultPingPeriod,
|
||||
protocol: protocol,
|
||||
protocol: parsedURL.Scheme,
|
||||
|
||||
// sentIDs: make(map[types.JSONRPCIntID]bool),
|
||||
}
|
||||
|
||||
@@ -70,7 +70,7 @@ func TestWSClientReconnectsAfterReadFailure(t *testing.T) {
|
||||
s := httptest.NewServer(h)
|
||||
defer s.Close()
|
||||
|
||||
c := startClient(t, s.Listener.Addr().String())
|
||||
c := startClient(t, "//"+s.Listener.Addr().String())
|
||||
defer c.Stop()
|
||||
|
||||
wg.Add(1)
|
||||
@@ -102,7 +102,7 @@ func TestWSClientReconnectsAfterWriteFailure(t *testing.T) {
|
||||
h := &myHandler{}
|
||||
s := httptest.NewServer(h)
|
||||
|
||||
c := startClient(t, s.Listener.Addr().String())
|
||||
c := startClient(t, "//"+s.Listener.Addr().String())
|
||||
defer c.Stop()
|
||||
|
||||
wg.Add(2)
|
||||
@@ -130,7 +130,7 @@ func TestWSClientReconnectFailure(t *testing.T) {
|
||||
h := &myHandler{}
|
||||
s := httptest.NewServer(h)
|
||||
|
||||
c := startClient(t, s.Listener.Addr().String())
|
||||
c := startClient(t, "//"+s.Listener.Addr().String())
|
||||
defer c.Stop()
|
||||
|
||||
go func() {
|
||||
@@ -179,7 +179,7 @@ func TestWSClientReconnectFailure(t *testing.T) {
|
||||
func TestNotBlockingOnStop(t *testing.T) {
|
||||
timeout := 2 * time.Second
|
||||
s := httptest.NewServer(&myHandler{})
|
||||
c := startClient(t, s.Listener.Addr().String())
|
||||
c := startClient(t, "//"+s.Listener.Addr().String())
|
||||
c.Call(context.Background(), "a", make(map[string]interface{}))
|
||||
// Let the readRoutine get around to blocking
|
||||
time.Sleep(time.Second)
|
||||
|
||||
Reference in New Issue
Block a user