mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-14 16:52:49 +00:00
Compare commits
7 Commits
finalizeBl
...
marko/4698
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3d2f98996c | ||
|
|
7906da2f54 | ||
|
|
893c2fcbb4 | ||
|
|
2df2ce0db9 | ||
|
|
0fe4ebfbc1 | ||
|
|
8b59eb6b46 | ||
|
|
4d9f573bf3 |
@@ -3,22 +3,23 @@ package main
|
||||
import (
|
||||
"flag"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto/ed25519"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
tmnet "github.com/tendermint/tendermint/libs/net"
|
||||
tmos "github.com/tendermint/tendermint/libs/os"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
|
||||
"github.com/tendermint/tendermint/privval"
|
||||
)
|
||||
|
||||
func main() {
|
||||
var (
|
||||
addr = flag.String("addr", ":26659", "Address of client to connect to")
|
||||
addr = flag.String("addr", "tcp://127.0.0.1:26659", "Address of client to connect to")
|
||||
chainID = flag.String("chain-id", "mychain", "chain id")
|
||||
privValKeyPath = flag.String("priv-key", "", "priv val key file path")
|
||||
privValStatePath = flag.String("priv-state", "", "priv val state file path")
|
||||
withCert = flag.String("cert", "", "absolutepath to server certificate")
|
||||
withKey = flag.String("key", "", "absolutepath to server key")
|
||||
|
||||
logger = log.NewTMLogger(
|
||||
log.NewSyncWriter(os.Stdout),
|
||||
@@ -36,21 +37,18 @@ func main() {
|
||||
|
||||
pv := privval.LoadFilePV(*privValKeyPath, *privValStatePath)
|
||||
|
||||
var dialer privval.SocketDialer
|
||||
protocol, address := tmnet.ProtocolAndAddress(*addr)
|
||||
switch protocol {
|
||||
case "unix":
|
||||
dialer = privval.DialUnixFn(address)
|
||||
case "tcp":
|
||||
connTimeout := 3 * time.Second // TODO
|
||||
dialer = privval.DialTCPFn(address, connTimeout, ed25519.GenPrivKey())
|
||||
default:
|
||||
logger.Error("Unknown protocol", "protocol", protocol)
|
||||
os.Exit(1)
|
||||
opts := []grpc.ServerOption{}
|
||||
if *withCert != "" && *withKey != "" {
|
||||
creds, err := credentials.NewServerTLSFromFile(*withCert, *withKey)
|
||||
if err != nil {
|
||||
logger.Error("Could not load TLS keys:", "err", err)
|
||||
}
|
||||
opts = append(opts, grpc.Creds(creds))
|
||||
} else {
|
||||
logger.Error("You are using an insecure gRPC connection! Provide a certificate and key to connect securely")
|
||||
}
|
||||
|
||||
sd := privval.NewSignerDialerEndpoint(logger, dialer)
|
||||
ss := privval.NewSignerServer(sd, *chainID, pv)
|
||||
ss := privval.NewSignerServer(*addr, *chainID, pv, logger, opts)
|
||||
|
||||
err := ss.Start()
|
||||
if err != nil {
|
||||
|
||||
@@ -46,6 +46,8 @@ var (
|
||||
defaultGenesisJSONPath = filepath.Join(defaultConfigDir, defaultGenesisJSONName)
|
||||
defaultPrivValKeyPath = filepath.Join(defaultConfigDir, defaultPrivValKeyName)
|
||||
defaultPrivValStatePath = filepath.Join(defaultDataDir, defaultPrivValStateName)
|
||||
// if a certificate is not provided the privval connection with a remote signer will be insecure
|
||||
defaultPrivValClientCertificate = ""
|
||||
|
||||
defaultNodeKeyPath = filepath.Join(defaultConfigDir, defaultNodeKeyName)
|
||||
defaultAddrBookPath = filepath.Join(defaultConfigDir, defaultAddrBookName)
|
||||
@@ -201,6 +203,10 @@ type BaseConfig struct { //nolint: maligned
|
||||
// connections from an external PrivValidator process
|
||||
PrivValidatorListenAddr string `mapstructure:"priv_validator_laddr"`
|
||||
|
||||
// Path to client certificate file for secure private validator connection.
|
||||
// If a remote validator address is provided but no certificate, the connection will be insecure
|
||||
PrivValidatorClientCertificate string `mapstructure:"priv_validator_client_certificate"`
|
||||
|
||||
// A JSON file containing the private key to use for p2p authenticated encryption
|
||||
NodeKey string `mapstructure:"node_key_file"`
|
||||
|
||||
@@ -218,20 +224,21 @@ type BaseConfig struct { //nolint: maligned
|
||||
// DefaultBaseConfig returns a default base configuration for a Tendermint node
|
||||
func DefaultBaseConfig() BaseConfig {
|
||||
return BaseConfig{
|
||||
Genesis: defaultGenesisJSONPath,
|
||||
PrivValidatorKey: defaultPrivValKeyPath,
|
||||
PrivValidatorState: defaultPrivValStatePath,
|
||||
NodeKey: defaultNodeKeyPath,
|
||||
Moniker: defaultMoniker,
|
||||
ProxyApp: "tcp://127.0.0.1:26658",
|
||||
ABCI: "socket",
|
||||
LogLevel: DefaultPackageLogLevels(),
|
||||
LogFormat: LogFormatPlain,
|
||||
ProfListenAddress: "",
|
||||
FastSyncMode: true,
|
||||
FilterPeers: false,
|
||||
DBBackend: "goleveldb",
|
||||
DBPath: "data",
|
||||
Genesis: defaultGenesisJSONPath,
|
||||
PrivValidatorKey: defaultPrivValKeyPath,
|
||||
PrivValidatorState: defaultPrivValStatePath,
|
||||
PrivValidatorClientCertificate: defaultPrivValClientCertificate,
|
||||
NodeKey: defaultNodeKeyPath,
|
||||
Moniker: defaultMoniker,
|
||||
ProxyApp: "tcp://127.0.0.1:26658",
|
||||
ABCI: "socket",
|
||||
LogLevel: DefaultPackageLogLevels(),
|
||||
LogFormat: LogFormatPlain,
|
||||
ProfListenAddress: "",
|
||||
FastSyncMode: true,
|
||||
FilterPeers: false,
|
||||
DBBackend: "goleveldb",
|
||||
DBPath: "data",
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -130,6 +130,10 @@ priv_validator_state_file = "{{ js .BaseConfig.PrivValidatorState }}"
|
||||
# connections from an external PrivValidator process
|
||||
priv_validator_laddr = "{{ .BaseConfig.PrivValidatorListenAddr }}"
|
||||
|
||||
# Path to client certificate file for secure private validator connection.
|
||||
# If a remote validator address is provided but no certificate, the connection will be insecure
|
||||
priv_validator_client_certificate = "{{ js .BaseConfig.PrivValidatorClientCertificate }}"
|
||||
|
||||
# Path to the JSON file containing the private key to use for node authentication in the p2p protocol
|
||||
node_key_file = "{{ js .BaseConfig.NodeKey }}"
|
||||
|
||||
|
||||
2
go.mod
2
go.mod
@@ -13,12 +13,14 @@ require (
|
||||
github.com/gogo/protobuf v1.3.1
|
||||
github.com/golang/protobuf v1.4.0
|
||||
github.com/gorilla/websocket v1.4.2
|
||||
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4
|
||||
github.com/gtank/merlin v0.1.1
|
||||
github.com/libp2p/go-buffer-pool v0.0.2
|
||||
github.com/magiconair/properties v1.8.1
|
||||
github.com/minio/highwayhash v1.0.0
|
||||
github.com/pkg/errors v0.9.1
|
||||
github.com/prometheus/client_golang v1.6.0
|
||||
github.com/prometheus/common v0.9.1
|
||||
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0
|
||||
github.com/rs/cors v1.7.0
|
||||
github.com/snikch/goodman v0.0.0-20171125024755-10e37e294daa
|
||||
|
||||
5
go.sum
5
go.sum
@@ -28,8 +28,10 @@ github.com/aead/siphash v1.0.1 h1:FwHfE/T45KPKYuuSAKyyvE+oPWcaQ+CUmFW0bPlM+kg=
|
||||
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
|
||||
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=
|
||||
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
|
||||
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM=
|
||||
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
|
||||
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
||||
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 h1:Hs82Z41s6SdL1CELW+XaDYmOH4hkBN4/N9og/AsOv7E=
|
||||
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
||||
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
|
||||
github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
|
||||
@@ -199,6 +201,7 @@ github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoA
|
||||
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
|
||||
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
|
||||
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 h1:z53tR0945TRRQO/fLEVPI6SMv7ZflF0TEaTAoU7tOzg=
|
||||
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
|
||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
|
||||
github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
|
||||
@@ -399,6 +402,7 @@ github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0
|
||||
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
|
||||
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
|
||||
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
|
||||
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
|
||||
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
|
||||
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
|
||||
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
|
||||
@@ -650,6 +654,7 @@ google.golang.org/protobuf v1.21.0 h1:qdOKuR/EIArgaWNjetjgTzgVTAZ+S/WXVrq9HW9zim
|
||||
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
|
||||
google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
|
||||
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
|
||||
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
|
||||
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
|
||||
36
node/node.go
36
node/node.go
@@ -8,7 +8,6 @@ import (
|
||||
"net"
|
||||
"net/http"
|
||||
_ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
@@ -662,7 +661,7 @@ func NewNode(config *cfg.Config,
|
||||
// external signing process.
|
||||
if config.PrivValidatorListenAddr != "" {
|
||||
// FIXME: we should start services inside OnStart
|
||||
privValidator, err = createAndStartPrivValidatorSocketClient(config.PrivValidatorListenAddr, logger)
|
||||
privValidator, err = createAndStartPrivValidatorSocketClient(config.PrivValidatorListenAddr, config.PrivValidatorClientCertificate, logger)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error with private validator socket client: %w", err)
|
||||
}
|
||||
@@ -1312,14 +1311,13 @@ func saveGenesisDoc(db dbm.DB, genDoc *types.GenesisDoc) {
|
||||
|
||||
func createAndStartPrivValidatorSocketClient(
|
||||
listenAddr string,
|
||||
cert string,
|
||||
logger log.Logger,
|
||||
) (types.PrivValidator, error) {
|
||||
pve, err := privval.NewSignerListener(listenAddr, logger)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to start private validator: %w", err)
|
||||
}
|
||||
|
||||
pvsc, err := privval.NewSignerClient(pve)
|
||||
dialOptions := ConstructDialOptions(cert)
|
||||
|
||||
pvsc, err := privval.NewSignerClient(listenAddr, dialOptions, logger)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to start private validator: %w", err)
|
||||
}
|
||||
@@ -1334,28 +1332,6 @@ func createAndStartPrivValidatorSocketClient(
|
||||
retries = 50 // 50 * 100ms = 5s total
|
||||
timeout = 100 * time.Millisecond
|
||||
)
|
||||
pvscWithRetries := privval.NewRetrySignerClient(pvsc, retries, timeout)
|
||||
|
||||
return pvscWithRetries, nil
|
||||
}
|
||||
|
||||
// splitAndTrimEmpty slices s into all subslices separated by sep and returns a
|
||||
// slice of the string s with all leading and trailing Unicode code points
|
||||
// contained in cutset removed. If sep is empty, SplitAndTrim splits after each
|
||||
// UTF-8 sequence. First part is equivalent to strings.SplitN with a count of
|
||||
// -1. also filter out empty strings, only return non-empty strings.
|
||||
func splitAndTrimEmpty(s, sep, cutset string) []string {
|
||||
if s == "" {
|
||||
return []string{}
|
||||
}
|
||||
|
||||
spl := strings.Split(s, sep)
|
||||
nonEmptyStrings := make([]string, 0, len(spl))
|
||||
for i := 0; i < len(spl); i++ {
|
||||
element := strings.Trim(spl[i], cutset)
|
||||
if element != "" {
|
||||
nonEmptyStrings = append(nonEmptyStrings, element)
|
||||
}
|
||||
}
|
||||
return nonEmptyStrings
|
||||
return pvsc, nil
|
||||
}
|
||||
|
||||
@@ -73,25 +73,6 @@ func TestNodeStartStop(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSplitAndTrimEmpty(t *testing.T) {
|
||||
testCases := []struct {
|
||||
s string
|
||||
sep string
|
||||
cutset string
|
||||
expected []string
|
||||
}{
|
||||
{"a,b,c", ",", " ", []string{"a", "b", "c"}},
|
||||
{" a , b , c ", ",", " ", []string{"a", "b", "c"}},
|
||||
{" a, b, c ", ",", " ", []string{"a", "b", "c"}},
|
||||
{" a, ", ",", " ", []string{"a"}},
|
||||
{" ", ",", " ", []string{}},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
assert.Equal(t, tc.expected, splitAndTrimEmpty(tc.s, tc.sep, tc.cutset), "%s", tc.s)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNodeDelayedStart(t *testing.T) {
|
||||
config := cfg.ResetTestRoot("node_delayed_start_test")
|
||||
defer os.RemoveAll(config.RootDir)
|
||||
|
||||
78
node/utils.go
Normal file
78
node/utils.go
Normal file
@@ -0,0 +1,78 @@
|
||||
package node
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
|
||||
"github.com/prometheus/common/log"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
)
|
||||
|
||||
// splitAndTrimEmpty slices s into all subslices separated by sep and returns a
|
||||
// slice of the string s with all leading and trailing Unicode code points
|
||||
// contained in cutset removed. If sep is empty, SplitAndTrim splits after each
|
||||
// UTF-8 sequence. First part is equivalent to strings.SplitN with a count of
|
||||
// -1. also filter out empty strings, only return non-empty strings.
|
||||
func splitAndTrimEmpty(s, sep, cutset string) []string {
|
||||
if s == "" {
|
||||
return []string{}
|
||||
}
|
||||
|
||||
spl := strings.Split(s, sep)
|
||||
nonEmptyStrings := make([]string, 0, len(spl))
|
||||
for i := 0; i < len(spl); i++ {
|
||||
element := strings.Trim(spl[i], cutset)
|
||||
if element != "" {
|
||||
nonEmptyStrings = append(nonEmptyStrings, element)
|
||||
}
|
||||
}
|
||||
return nonEmptyStrings
|
||||
}
|
||||
|
||||
// ConstructDialOptions constructs a list of grpc dial options
|
||||
func ConstructDialOptions(
|
||||
withCert string,
|
||||
extraOpts ...grpc.DialOption,
|
||||
) []grpc.DialOption {
|
||||
var transportSecurity grpc.DialOption
|
||||
if withCert != "" {
|
||||
creds, err := credentials.NewClientTLSFromFile(withCert, "")
|
||||
if err != nil {
|
||||
log.Errorf("Could not get valid credentials: %v", err)
|
||||
return nil
|
||||
}
|
||||
transportSecurity = grpc.WithTransportCredentials(creds)
|
||||
} else {
|
||||
transportSecurity = grpc.WithInsecure()
|
||||
log.Warn("You are using an insecure gRPC connection! Please provide a certificate and key to use a secure connection.")
|
||||
}
|
||||
|
||||
const (
|
||||
retries = 50 // 50 * 100ms = 5s total
|
||||
timeout = 100 * time.Millisecond
|
||||
maxCallRecvMsgSize = 10 << 20 // Default 10Mb
|
||||
)
|
||||
|
||||
opts := []grpc_retry.CallOption{
|
||||
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(timeout)),
|
||||
}
|
||||
|
||||
dialOpts := []grpc.DialOption{
|
||||
transportSecurity,
|
||||
grpc.WithDefaultCallOptions(
|
||||
grpc.MaxCallRecvMsgSize(maxCallRecvMsgSize),
|
||||
grpc_retry.WithMax(retries),
|
||||
),
|
||||
grpc.WithUnaryInterceptor(
|
||||
grpc_retry.UnaryClientInterceptor(opts...),
|
||||
),
|
||||
}
|
||||
|
||||
for _, opt := range extraOpts {
|
||||
dialOpts = append(dialOpts, opt)
|
||||
}
|
||||
|
||||
return dialOpts
|
||||
}
|
||||
26
node/utils_test.go
Normal file
26
node/utils_test.go
Normal file
@@ -0,0 +1,26 @@
|
||||
package node
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestSplitAndTrimEmpty(t *testing.T) {
|
||||
testCases := []struct {
|
||||
s string
|
||||
sep string
|
||||
cutset string
|
||||
expected []string
|
||||
}{
|
||||
{"a,b,c", ",", " ", []string{"a", "b", "c"}},
|
||||
{" a , b , c ", ",", " ", []string{"a", "b", "c"}},
|
||||
{" a, b, c ", ",", " ", []string{"a", "b", "c"}},
|
||||
{" a, ", ",", " ", []string{"a"}},
|
||||
{" ", ",", " ", []string{}},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
assert.Equal(t, tc.expected, splitAndTrimEmpty(tc.s, tc.sep, tc.cutset), "%s", tc.s)
|
||||
}
|
||||
}
|
||||
111
privval/client.go
Normal file
111
privval/client.go
Normal file
@@ -0,0 +1,111 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
grpc "google.golang.org/grpc"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
cryptoenc "github.com/tendermint/tendermint/crypto/encoding"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
privvalproto "github.com/tendermint/tendermint/proto/privval"
|
||||
tmproto "github.com/tendermint/tendermint/proto/types"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// SignerClient implements PrivValidator.
|
||||
// Handles remote validator connections that provide signing services
|
||||
type SignerClient struct {
|
||||
ctx context.Context
|
||||
privValidator privvalproto.PrivValidatorAPIClient
|
||||
conn *grpc.ClientConn
|
||||
logger log.Logger
|
||||
}
|
||||
|
||||
var _ types.PrivValidator = (*SignerClient)(nil)
|
||||
|
||||
// NewSignerClient returns an instance of SignerClient.
|
||||
// it will start the endpoint (if not already started)
|
||||
func NewSignerClient(target string,
|
||||
opts []grpc.DialOption, log log.Logger) (*SignerClient, error) {
|
||||
if target == "" {
|
||||
return nil, fmt.Errorf("target connection parameter missing. endpoint %s", target)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
conn, err := grpc.DialContext(ctx, target, opts...)
|
||||
if err != nil {
|
||||
log.Error("unable to connect to client.", "target", target, "err", err)
|
||||
}
|
||||
|
||||
sc := &SignerClient{
|
||||
ctx: ctx,
|
||||
privValidator: privvalproto.NewPrivValidatorAPIClient(conn), // Create the Private Validator Client
|
||||
logger: log,
|
||||
}
|
||||
|
||||
return sc, nil
|
||||
}
|
||||
|
||||
// Close closes the underlying connection
|
||||
func (sc *SignerClient) Close() error {
|
||||
sc.logger.Info("Stopping service")
|
||||
if sc.conn != nil {
|
||||
return sc.conn.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
//--------------------------------------------------------
|
||||
// Implement PrivValidator
|
||||
|
||||
// GetPubKey retrieves a public key from a remote signer
|
||||
// returns an error if client is not able to provide the key
|
||||
func (sc *SignerClient) GetPubKey() (crypto.PubKey, error) {
|
||||
resp, err := sc.privValidator.GetPubKey(sc.ctx, &privvalproto.PubKeyRequest{})
|
||||
if err != nil {
|
||||
errStatus, _ := status.FromError(err)
|
||||
sc.logger.Error("SignerClient::GetPubKey", "err", errStatus.Message())
|
||||
return nil, fmt.Errorf("send GetPubKey request: %w", errStatus.Err())
|
||||
}
|
||||
|
||||
pk, err := cryptoenc.PubKeyFromProto(*resp.PubKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return pk, nil
|
||||
}
|
||||
|
||||
// SignVote requests a remote signer to sign a vote
|
||||
func (sc *SignerClient) SignVote(chainID string, vote *tmproto.Vote) error {
|
||||
resp, err := sc.privValidator.SignVote(sc.ctx, &privvalproto.SignVoteRequest{ChainId: chainID, Vote: vote})
|
||||
if err != nil {
|
||||
errStatus, _ := status.FromError(err)
|
||||
sc.logger.Error("Client SignVote", "err", errStatus.Message())
|
||||
return fmt.Errorf("send SignVote request: %w", errStatus.Err())
|
||||
}
|
||||
|
||||
*vote = *resp.Vote
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// SignProposal requests a remote signer to sign a proposal
|
||||
func (sc *SignerClient) SignProposal(chainID string, proposal *tmproto.Proposal) error {
|
||||
resp, err := sc.privValidator.SignProposal(
|
||||
sc.ctx, &privvalproto.SignProposalRequest{ChainId: chainID, Proposal: proposal})
|
||||
|
||||
if err != nil {
|
||||
errStatus, _ := status.FromError(err)
|
||||
sc.logger.Error("SignerClient::SignProposal", "err", errStatus.Message())
|
||||
return fmt.Errorf("send SignProposal request: %w", errStatus.Err())
|
||||
}
|
||||
|
||||
*proposal = *resp.Proposal
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -1,40 +0,0 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
|
||||
privvalproto "github.com/tendermint/tendermint/proto/privval"
|
||||
)
|
||||
|
||||
// TODO: Add ChainIDRequest
|
||||
|
||||
func mustWrapMsg(pb proto.Message) privvalproto.Message {
|
||||
msg := privvalproto.Message{}
|
||||
|
||||
switch pb := pb.(type) {
|
||||
case *privvalproto.Message:
|
||||
msg = *pb
|
||||
case *privvalproto.PubKeyRequest:
|
||||
msg.Sum = &privvalproto.Message_PubKeyRequest{PubKeyRequest: pb}
|
||||
case *privvalproto.PubKeyResponse:
|
||||
msg.Sum = &privvalproto.Message_PubKeyResponse{PubKeyResponse: pb}
|
||||
case *privvalproto.SignVoteRequest:
|
||||
msg.Sum = &privvalproto.Message_SignVoteRequest{SignVoteRequest: pb}
|
||||
case *privvalproto.SignedVoteResponse:
|
||||
msg.Sum = &privvalproto.Message_SignedVoteResponse{SignedVoteResponse: pb}
|
||||
case *privvalproto.SignedProposalResponse:
|
||||
msg.Sum = &privvalproto.Message_SignedProposalResponse{SignedProposalResponse: pb}
|
||||
case *privvalproto.SignProposalRequest:
|
||||
msg.Sum = &privvalproto.Message_SignProposalRequest{SignProposalRequest: pb}
|
||||
case *privvalproto.PingRequest:
|
||||
msg.Sum = &privvalproto.Message_PingRequest{}
|
||||
case *privvalproto.PingResponse:
|
||||
msg.Sum = &privvalproto.Message_PingResponse{}
|
||||
default:
|
||||
panic(fmt.Errorf("unknown message type %T", msg))
|
||||
}
|
||||
|
||||
return msg
|
||||
}
|
||||
@@ -1,84 +0,0 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
tmproto "github.com/tendermint/tendermint/proto/types"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// RetrySignerClient wraps SignerClient adding retry for each operation (except
|
||||
// Ping) w/ a timeout.
|
||||
type RetrySignerClient struct {
|
||||
next *SignerClient
|
||||
retries int
|
||||
timeout time.Duration
|
||||
}
|
||||
|
||||
// NewRetrySignerClient returns RetrySignerClient. If +retries+ is 0, the
|
||||
// client will be retrying each operation indefinitely.
|
||||
func NewRetrySignerClient(sc *SignerClient, retries int, timeout time.Duration) *RetrySignerClient {
|
||||
return &RetrySignerClient{sc, retries, timeout}
|
||||
}
|
||||
|
||||
var _ types.PrivValidator = (*RetrySignerClient)(nil)
|
||||
|
||||
func (sc *RetrySignerClient) Close() error {
|
||||
return sc.next.Close()
|
||||
}
|
||||
|
||||
func (sc *RetrySignerClient) IsConnected() bool {
|
||||
return sc.next.IsConnected()
|
||||
}
|
||||
|
||||
func (sc *RetrySignerClient) WaitForConnection(maxWait time.Duration) error {
|
||||
return sc.next.WaitForConnection(maxWait)
|
||||
}
|
||||
|
||||
//--------------------------------------------------------
|
||||
// Implement PrivValidator
|
||||
|
||||
func (sc *RetrySignerClient) Ping() error {
|
||||
return sc.next.Ping()
|
||||
}
|
||||
|
||||
func (sc *RetrySignerClient) GetPubKey() (crypto.PubKey, error) {
|
||||
var (
|
||||
pk crypto.PubKey
|
||||
err error
|
||||
)
|
||||
for i := 0; i < sc.retries || sc.retries == 0; i++ {
|
||||
pk, err = sc.next.GetPubKey()
|
||||
if err == nil {
|
||||
return pk, nil
|
||||
}
|
||||
time.Sleep(sc.timeout)
|
||||
}
|
||||
return nil, fmt.Errorf("exhausted all attempts to get pubkey: %w", err)
|
||||
}
|
||||
|
||||
func (sc *RetrySignerClient) SignVote(chainID string, vote *tmproto.Vote) error {
|
||||
var err error
|
||||
for i := 0; i < sc.retries || sc.retries == 0; i++ {
|
||||
err = sc.next.SignVote(chainID, vote)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
time.Sleep(sc.timeout)
|
||||
}
|
||||
return fmt.Errorf("exhausted all attempts to sign vote: %w", err)
|
||||
}
|
||||
|
||||
func (sc *RetrySignerClient) SignProposal(chainID string, proposal *tmproto.Proposal) error {
|
||||
var err error
|
||||
for i := 0; i < sc.retries || sc.retries == 0; i++ {
|
||||
err = sc.next.SignProposal(chainID, proposal)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
time.Sleep(sc.timeout)
|
||||
}
|
||||
return fmt.Errorf("exhausted all attempts to sign proposal: %w", err)
|
||||
}
|
||||
114
privval/server.go
Normal file
114
privval/server.go
Normal file
@@ -0,0 +1,114 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
context "context"
|
||||
"net"
|
||||
|
||||
grpc "google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
cryptoenc "github.com/tendermint/tendermint/crypto/encoding"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
tmnet "github.com/tendermint/tendermint/libs/net"
|
||||
"github.com/tendermint/tendermint/libs/service"
|
||||
privvalproto "github.com/tendermint/tendermint/proto/privval"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
type SignerServer struct {
|
||||
service.BaseService
|
||||
Logger log.Logger
|
||||
|
||||
target string
|
||||
ChainID string
|
||||
PrivVal types.PrivValidator
|
||||
Opts []grpc.ServerOption
|
||||
Srv *grpc.Server
|
||||
}
|
||||
|
||||
func NewSignerServer(target string, chainID string, privVal types.PrivValidator, log log.Logger, opts []grpc.ServerOption) *SignerServer {
|
||||
return &SignerServer{
|
||||
Logger: log,
|
||||
|
||||
target: target,
|
||||
ChainID: chainID,
|
||||
Opts: opts,
|
||||
PrivVal: privVal,
|
||||
}
|
||||
}
|
||||
|
||||
// OnStart implements service.Service.
|
||||
func (ss *SignerServer) OnStart() error {
|
||||
protocol, address := tmnet.ProtocolAndAddress(ss.target)
|
||||
lis, err := net.Listen(protocol, address)
|
||||
if err != nil {
|
||||
ss.Logger.Error("failed to listen: ", "err", err)
|
||||
}
|
||||
s := grpc.NewServer(ss.Opts...)
|
||||
ss.Srv = s
|
||||
|
||||
privvalproto.RegisterPrivValidatorAPIServer(ss.Srv, &SignerServer{})
|
||||
|
||||
if err := ss.Srv.Serve(lis); err != nil {
|
||||
ss.Logger.Error("failed to serve:", "err", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// OnStop implements service.Service.
|
||||
func (ss *SignerServer) OnStop() {
|
||||
ss.Logger.Debug("SignerServer: OnStop calling Close")
|
||||
ss.Srv.GracefulStop()
|
||||
}
|
||||
|
||||
var _ privvalproto.PrivValidatorAPIServer = (*SignerServer)(nil)
|
||||
|
||||
// PubKey receives a request for the pubkey
|
||||
// returns the pubkey on success and error on failure
|
||||
func (ss *SignerServer) GetPubKey(ctx context.Context, req *privvalproto.PubKeyRequest) (
|
||||
*privvalproto.PubKeyResponse, error) {
|
||||
var pubKey crypto.PubKey
|
||||
|
||||
pubKey, err := ss.PrivVal.GetPubKey()
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.NotFound, "error getting pubkey: %v", err)
|
||||
}
|
||||
|
||||
pk, err := cryptoenc.PubKeyToProto(pubKey)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.InvalidArgument, "error transistioning pubkey to proto: %v", err)
|
||||
}
|
||||
|
||||
return &privvalproto.PubKeyResponse{PubKey: &pk}, nil
|
||||
}
|
||||
|
||||
// SignVote receives a vote sign requests, attempts to sign it
|
||||
// returns SignedVoteResponse on success and error on failure
|
||||
func (ss *SignerServer) SignVote(ctx context.Context, req *privvalproto.SignVoteRequest) (
|
||||
*privvalproto.SignedVoteResponse, error) {
|
||||
vote := req.Vote
|
||||
|
||||
err := ss.PrivVal.SignVote(req.ChainId, vote)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.InvalidArgument, "error signing vote: %v", err)
|
||||
}
|
||||
|
||||
return &privvalproto.SignedVoteResponse{Vote: vote}, nil
|
||||
}
|
||||
|
||||
// SignProposal receives a proposal sign requests, attempts to sign it
|
||||
// returns SignedProposalResponse on success and error on failure
|
||||
func (ss *SignerServer) SignProposal(ctx context.Context, req *privvalproto.SignProposalRequest) (
|
||||
*privvalproto.SignedProposalResponse, error) {
|
||||
proposal := req.Proposal
|
||||
|
||||
err := ss.PrivVal.SignProposal(req.ChainId, proposal)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.InvalidArgument, "error signing proposal: %v", err)
|
||||
}
|
||||
|
||||
return &privvalproto.SignedProposalResponse{Proposal: proposal}, nil
|
||||
}
|
||||
1
privval/server_test.go
Normal file
1
privval/server_test.go
Normal file
@@ -0,0 +1 @@
|
||||
package privval
|
||||
@@ -1,144 +0,0 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
cryptoenc "github.com/tendermint/tendermint/crypto/encoding"
|
||||
privvalproto "github.com/tendermint/tendermint/proto/privval"
|
||||
tmproto "github.com/tendermint/tendermint/proto/types"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// SignerClient implements PrivValidator.
|
||||
// Handles remote validator connections that provide signing services
|
||||
type SignerClient struct {
|
||||
endpoint *SignerListenerEndpoint
|
||||
}
|
||||
|
||||
var _ types.PrivValidator = (*SignerClient)(nil)
|
||||
|
||||
// NewSignerClient returns an instance of SignerClient.
|
||||
// it will start the endpoint (if not already started)
|
||||
func NewSignerClient(endpoint *SignerListenerEndpoint) (*SignerClient, error) {
|
||||
if !endpoint.IsRunning() {
|
||||
if err := endpoint.Start(); err != nil {
|
||||
return nil, fmt.Errorf("failed to start listener endpoint: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return &SignerClient{endpoint: endpoint}, nil
|
||||
}
|
||||
|
||||
// Close closes the underlying connection
|
||||
func (sc *SignerClient) Close() error {
|
||||
return sc.endpoint.Close()
|
||||
}
|
||||
|
||||
// IsConnected indicates with the signer is connected to a remote signing service
|
||||
func (sc *SignerClient) IsConnected() bool {
|
||||
return sc.endpoint.IsConnected()
|
||||
}
|
||||
|
||||
// WaitForConnection waits maxWait for a connection or returns a timeout error
|
||||
func (sc *SignerClient) WaitForConnection(maxWait time.Duration) error {
|
||||
return sc.endpoint.WaitForConnection(maxWait)
|
||||
}
|
||||
|
||||
//--------------------------------------------------------
|
||||
// Implement PrivValidator
|
||||
|
||||
// Ping sends a ping request to the remote signer
|
||||
func (sc *SignerClient) Ping() error {
|
||||
response, err := sc.endpoint.SendRequest(mustWrapMsg(&privvalproto.PingRequest{}))
|
||||
|
||||
if err != nil {
|
||||
sc.endpoint.Logger.Error("SignerClient::Ping", "err", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
pb := response.GetPingResponse()
|
||||
if pb == nil {
|
||||
sc.endpoint.Logger.Error("SignerClient::Ping", "err", "response != PingResponse")
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetPubKey retrieves a public key from a remote signer
|
||||
// returns an error if client is not able to provide the key
|
||||
func (sc *SignerClient) GetPubKey() (crypto.PubKey, error) {
|
||||
response, err := sc.endpoint.SendRequest(mustWrapMsg(&privvalproto.PubKeyRequest{}))
|
||||
if err != nil {
|
||||
sc.endpoint.Logger.Error("SignerClient::GetPubKey", "err", err)
|
||||
return nil, fmt.Errorf("send: %w", err)
|
||||
}
|
||||
|
||||
pubKeyResp := response.GetPubKeyResponse()
|
||||
if pubKeyResp == nil {
|
||||
sc.endpoint.Logger.Error("SignerClient::GetPubKey", "err", "response != PubKeyResponse")
|
||||
return nil, fmt.Errorf("unexpected response type %T", response)
|
||||
}
|
||||
|
||||
if pubKeyResp.Error != nil {
|
||||
sc.endpoint.Logger.Error("failed to get private validator's public key", "err", pubKeyResp.Error)
|
||||
return nil, fmt.Errorf("remote error: %w", errors.New(pubKeyResp.Error.Description))
|
||||
}
|
||||
|
||||
pk, err := cryptoenc.PubKeyFromProto(*pubKeyResp.PubKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return pk, nil
|
||||
}
|
||||
|
||||
// SignVote requests a remote signer to sign a vote
|
||||
func (sc *SignerClient) SignVote(chainID string, vote *tmproto.Vote) error {
|
||||
|
||||
response, err := sc.endpoint.SendRequest(mustWrapMsg(&privvalproto.SignVoteRequest{Vote: vote}))
|
||||
if err != nil {
|
||||
sc.endpoint.Logger.Error("SignerClient::SignVote", "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
resp := response.GetSignedVoteResponse()
|
||||
if resp == nil {
|
||||
sc.endpoint.Logger.Error("SignerClient::GetPubKey", "err", "response != SignedVoteResponse")
|
||||
return ErrUnexpectedResponse
|
||||
}
|
||||
|
||||
if resp.Error != nil {
|
||||
return &RemoteSignerError{Code: int(resp.Error.Code), Description: resp.Error.Description}
|
||||
}
|
||||
|
||||
*vote = *resp.Vote
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// SignProposal requests a remote signer to sign a proposal
|
||||
func (sc *SignerClient) SignProposal(chainID string, proposal *tmproto.Proposal) error {
|
||||
|
||||
response, err := sc.endpoint.SendRequest(mustWrapMsg(&privvalproto.SignProposalRequest{Proposal: *proposal}))
|
||||
if err != nil {
|
||||
sc.endpoint.Logger.Error("SignerClient::SignProposal", "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
resp := response.GetSignedProposalResponse()
|
||||
if resp == nil {
|
||||
sc.endpoint.Logger.Error("SignerClient::SignProposal", "err", "response != SignedProposalResponse")
|
||||
return ErrUnexpectedResponse
|
||||
}
|
||||
if resp.Error != nil {
|
||||
return &RemoteSignerError{Code: int(resp.Error.Code), Description: resp.Error.Description}
|
||||
}
|
||||
|
||||
*proposal = *resp.Proposal
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -1,357 +0,0 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
"github.com/tendermint/tendermint/crypto/tmhash"
|
||||
tmrand "github.com/tendermint/tendermint/libs/rand"
|
||||
privvalproto "github.com/tendermint/tendermint/proto/privval"
|
||||
tmproto "github.com/tendermint/tendermint/proto/types"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
type signerTestCase struct {
|
||||
chainID string
|
||||
mockPV types.PrivValidator
|
||||
signerClient *SignerClient
|
||||
signerServer *SignerServer
|
||||
}
|
||||
|
||||
func getSignerTestCases(t *testing.T) []signerTestCase {
|
||||
testCases := make([]signerTestCase, 0)
|
||||
|
||||
// Get test cases for each possible dialer (DialTCP / DialUnix / etc)
|
||||
for _, dtc := range getDialerTestCases(t) {
|
||||
chainID := tmrand.Str(12)
|
||||
mockPV := types.NewMockPV()
|
||||
|
||||
// get a pair of signer listener, signer dialer endpoints
|
||||
sl, sd := getMockEndpoints(t, dtc.addr, dtc.dialer)
|
||||
sc, err := NewSignerClient(sl)
|
||||
require.NoError(t, err)
|
||||
ss := NewSignerServer(sd, chainID, mockPV)
|
||||
|
||||
err = ss.Start()
|
||||
require.NoError(t, err)
|
||||
|
||||
tc := signerTestCase{
|
||||
chainID: chainID,
|
||||
mockPV: mockPV,
|
||||
signerClient: sc,
|
||||
signerServer: ss,
|
||||
}
|
||||
|
||||
testCases = append(testCases, tc)
|
||||
}
|
||||
|
||||
return testCases
|
||||
}
|
||||
|
||||
func TestSignerClose(t *testing.T) {
|
||||
for _, tc := range getSignerTestCases(t) {
|
||||
err := tc.signerClient.Close()
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = tc.signerServer.Stop()
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSignerPing(t *testing.T) {
|
||||
for _, tc := range getSignerTestCases(t) {
|
||||
defer tc.signerServer.Stop()
|
||||
defer tc.signerClient.Close()
|
||||
|
||||
err := tc.signerClient.Ping()
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSignerGetPubKey(t *testing.T) {
|
||||
for _, tc := range getSignerTestCases(t) {
|
||||
defer tc.signerServer.Stop()
|
||||
defer tc.signerClient.Close()
|
||||
|
||||
pubKey, err := tc.signerClient.GetPubKey()
|
||||
require.NoError(t, err)
|
||||
expectedPubKey, err := tc.mockPV.GetPubKey()
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, expectedPubKey, pubKey)
|
||||
|
||||
pubKey, err = tc.signerClient.GetPubKey()
|
||||
require.NoError(t, err)
|
||||
expectedpk, err := tc.mockPV.GetPubKey()
|
||||
require.NoError(t, err)
|
||||
expectedAddr := expectedpk.Address()
|
||||
|
||||
assert.Equal(t, expectedAddr, pubKey.Address())
|
||||
}
|
||||
}
|
||||
|
||||
func TestSignerProposal(t *testing.T) {
|
||||
for _, tc := range getSignerTestCases(t) {
|
||||
ts := time.Now()
|
||||
hash := tmrand.Bytes(tmhash.Size)
|
||||
have := &types.Proposal{
|
||||
Type: tmproto.ProposalType,
|
||||
Height: 1,
|
||||
Round: 2,
|
||||
POLRound: 2,
|
||||
BlockID: types.BlockID{Hash: hash, PartsHeader: types.PartSetHeader{Hash: hash, Total: 2}},
|
||||
Timestamp: ts,
|
||||
}
|
||||
want := &types.Proposal{
|
||||
Type: tmproto.ProposalType,
|
||||
Height: 1,
|
||||
Round: 2,
|
||||
POLRound: 2,
|
||||
BlockID: types.BlockID{Hash: hash, PartsHeader: types.PartSetHeader{Hash: hash, Total: 2}},
|
||||
Timestamp: ts,
|
||||
}
|
||||
|
||||
defer tc.signerServer.Stop()
|
||||
defer tc.signerClient.Close()
|
||||
|
||||
require.NoError(t, tc.mockPV.SignProposal(tc.chainID, want.ToProto()))
|
||||
require.NoError(t, tc.signerClient.SignProposal(tc.chainID, have.ToProto()))
|
||||
|
||||
assert.Equal(t, want.Signature, have.Signature)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSignerVote(t *testing.T) {
|
||||
for _, tc := range getSignerTestCases(t) {
|
||||
ts := time.Now()
|
||||
hash := tmrand.Bytes(tmhash.Size)
|
||||
valAddr := tmrand.Bytes(crypto.AddressSize)
|
||||
want := &types.Vote{
|
||||
Type: tmproto.PrecommitType,
|
||||
Height: 1,
|
||||
Round: 2,
|
||||
BlockID: types.BlockID{Hash: hash, PartsHeader: types.PartSetHeader{Hash: hash, Total: 2}},
|
||||
Timestamp: ts,
|
||||
ValidatorAddress: valAddr,
|
||||
ValidatorIndex: 1,
|
||||
}
|
||||
|
||||
have := &types.Vote{
|
||||
Type: tmproto.PrecommitType,
|
||||
Height: 1,
|
||||
Round: 2,
|
||||
BlockID: types.BlockID{Hash: hash, PartsHeader: types.PartSetHeader{Hash: hash, Total: 2}},
|
||||
Timestamp: ts,
|
||||
ValidatorAddress: valAddr,
|
||||
ValidatorIndex: 1,
|
||||
}
|
||||
|
||||
defer tc.signerServer.Stop()
|
||||
defer tc.signerClient.Close()
|
||||
|
||||
require.NoError(t, tc.mockPV.SignVote(tc.chainID, want.ToProto()))
|
||||
require.NoError(t, tc.signerClient.SignVote(tc.chainID, have.ToProto()))
|
||||
|
||||
assert.Equal(t, want.Signature, have.Signature)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSignerVoteResetDeadline(t *testing.T) {
|
||||
for _, tc := range getSignerTestCases(t) {
|
||||
ts := time.Now()
|
||||
hash := tmrand.Bytes(tmhash.Size)
|
||||
valAddr := tmrand.Bytes(crypto.AddressSize)
|
||||
want := &types.Vote{
|
||||
Type: tmproto.PrecommitType,
|
||||
Height: 1,
|
||||
Round: 2,
|
||||
BlockID: types.BlockID{Hash: hash, PartsHeader: types.PartSetHeader{Hash: hash, Total: 2}},
|
||||
Timestamp: ts,
|
||||
ValidatorAddress: valAddr,
|
||||
ValidatorIndex: 1,
|
||||
}
|
||||
|
||||
have := &types.Vote{
|
||||
Type: tmproto.PrecommitType,
|
||||
Height: 1,
|
||||
Round: 2,
|
||||
BlockID: types.BlockID{Hash: hash, PartsHeader: types.PartSetHeader{Hash: hash, Total: 2}},
|
||||
Timestamp: ts,
|
||||
ValidatorAddress: valAddr,
|
||||
ValidatorIndex: 1,
|
||||
}
|
||||
|
||||
defer tc.signerServer.Stop()
|
||||
defer tc.signerClient.Close()
|
||||
|
||||
time.Sleep(testTimeoutReadWrite2o3)
|
||||
|
||||
require.NoError(t, tc.mockPV.SignVote(tc.chainID, want.ToProto()))
|
||||
require.NoError(t, tc.signerClient.SignVote(tc.chainID, have.ToProto()))
|
||||
assert.Equal(t, want.Signature, have.Signature)
|
||||
|
||||
// TODO(jleni): Clarify what is actually being tested
|
||||
|
||||
// This would exceed the deadline if it was not extended by the previous message
|
||||
time.Sleep(testTimeoutReadWrite2o3)
|
||||
|
||||
require.NoError(t, tc.mockPV.SignVote(tc.chainID, want.ToProto()))
|
||||
require.NoError(t, tc.signerClient.SignVote(tc.chainID, have.ToProto()))
|
||||
assert.Equal(t, want.Signature, have.Signature)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSignerVoteKeepAlive(t *testing.T) {
|
||||
for _, tc := range getSignerTestCases(t) {
|
||||
ts := time.Now()
|
||||
hash := tmrand.Bytes(tmhash.Size)
|
||||
valAddr := tmrand.Bytes(crypto.AddressSize)
|
||||
want := &types.Vote{
|
||||
Type: tmproto.PrecommitType,
|
||||
Height: 1,
|
||||
Round: 2,
|
||||
BlockID: types.BlockID{Hash: hash, PartsHeader: types.PartSetHeader{Hash: hash, Total: 2}},
|
||||
Timestamp: ts,
|
||||
ValidatorAddress: valAddr,
|
||||
ValidatorIndex: 1,
|
||||
}
|
||||
|
||||
have := &types.Vote{
|
||||
Type: tmproto.PrecommitType,
|
||||
Height: 1,
|
||||
Round: 2,
|
||||
BlockID: types.BlockID{Hash: hash, PartsHeader: types.PartSetHeader{Hash: hash, Total: 2}},
|
||||
Timestamp: ts,
|
||||
ValidatorAddress: valAddr,
|
||||
ValidatorIndex: 1,
|
||||
}
|
||||
|
||||
defer tc.signerServer.Stop()
|
||||
defer tc.signerClient.Close()
|
||||
|
||||
// Check that even if the client does not request a
|
||||
// signature for a long time. The service is still available
|
||||
|
||||
// in this particular case, we use the dialer logger to ensure that
|
||||
// test messages are properly interleaved in the test logs
|
||||
tc.signerServer.Logger.Debug("TEST: Forced Wait -------------------------------------------------")
|
||||
time.Sleep(testTimeoutReadWrite * 3)
|
||||
tc.signerServer.Logger.Debug("TEST: Forced Wait DONE---------------------------------------------")
|
||||
|
||||
require.NoError(t, tc.mockPV.SignVote(tc.chainID, want.ToProto()))
|
||||
require.NoError(t, tc.signerClient.SignVote(tc.chainID, have.ToProto()))
|
||||
|
||||
assert.Equal(t, want.Signature, have.Signature)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSignerSignProposalErrors(t *testing.T) {
|
||||
for _, tc := range getSignerTestCases(t) {
|
||||
// Replace service with a mock that always fails
|
||||
tc.signerServer.privVal = types.NewErroringMockPV()
|
||||
tc.mockPV = types.NewErroringMockPV()
|
||||
|
||||
defer tc.signerServer.Stop()
|
||||
defer tc.signerClient.Close()
|
||||
|
||||
ts := time.Now()
|
||||
hash := tmrand.Bytes(tmhash.Size)
|
||||
proposal := &types.Proposal{
|
||||
Type: tmproto.ProposalType,
|
||||
Height: 1,
|
||||
Round: 2,
|
||||
POLRound: 2,
|
||||
BlockID: types.BlockID{Hash: hash, PartsHeader: types.PartSetHeader{Hash: hash, Total: 2}},
|
||||
Timestamp: ts,
|
||||
Signature: []byte("signature"),
|
||||
}
|
||||
|
||||
err := tc.signerClient.SignProposal(tc.chainID, proposal.ToProto())
|
||||
require.Equal(t, err.(*RemoteSignerError).Description, types.ErroringMockPVErr.Error())
|
||||
|
||||
err = tc.mockPV.SignProposal(tc.chainID, proposal.ToProto())
|
||||
require.Error(t, err)
|
||||
|
||||
err = tc.signerClient.SignProposal(tc.chainID, proposal.ToProto())
|
||||
require.Error(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSignerSignVoteErrors(t *testing.T) {
|
||||
for _, tc := range getSignerTestCases(t) {
|
||||
ts := time.Now()
|
||||
hash := tmrand.Bytes(tmhash.Size)
|
||||
valAddr := tmrand.Bytes(crypto.AddressSize)
|
||||
vote := &types.Vote{
|
||||
Type: tmproto.PrecommitType,
|
||||
Height: 1,
|
||||
Round: 2,
|
||||
BlockID: types.BlockID{Hash: hash, PartsHeader: types.PartSetHeader{Hash: hash, Total: 2}},
|
||||
Timestamp: ts,
|
||||
ValidatorAddress: valAddr,
|
||||
ValidatorIndex: 1,
|
||||
Signature: []byte("signature"),
|
||||
}
|
||||
|
||||
// Replace signer service privval with one that always fails
|
||||
tc.signerServer.privVal = types.NewErroringMockPV()
|
||||
tc.mockPV = types.NewErroringMockPV()
|
||||
|
||||
defer tc.signerServer.Stop()
|
||||
defer tc.signerClient.Close()
|
||||
|
||||
err := tc.signerClient.SignVote(tc.chainID, vote.ToProto())
|
||||
require.Equal(t, err.(*RemoteSignerError).Description, types.ErroringMockPVErr.Error())
|
||||
|
||||
err = tc.mockPV.SignVote(tc.chainID, vote.ToProto())
|
||||
require.Error(t, err)
|
||||
|
||||
err = tc.signerClient.SignVote(tc.chainID, vote.ToProto())
|
||||
require.Error(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
func brokenHandler(privVal types.PrivValidator, request privvalproto.Message,
|
||||
chainID string) (privvalproto.Message, error) {
|
||||
var res privvalproto.Message
|
||||
var err error
|
||||
|
||||
switch r := request.Sum.(type) {
|
||||
// This is broken and will answer most requests with a pubkey response
|
||||
case *privvalproto.Message_PubKeyRequest:
|
||||
res = mustWrapMsg(&privvalproto.PubKeyResponse{PubKey: nil, Error: nil})
|
||||
case *privvalproto.Message_SignVoteRequest:
|
||||
res = mustWrapMsg(&privvalproto.PubKeyResponse{PubKey: nil, Error: nil})
|
||||
case *privvalproto.Message_SignProposalRequest:
|
||||
res = mustWrapMsg(&privvalproto.PubKeyResponse{PubKey: nil, Error: nil})
|
||||
case *privvalproto.Message_PingRequest:
|
||||
err, res = nil, mustWrapMsg(&privvalproto.PingResponse{})
|
||||
default:
|
||||
err = fmt.Errorf("unknown msg: %v", r)
|
||||
}
|
||||
|
||||
return res, err
|
||||
}
|
||||
|
||||
func TestSignerUnexpectedResponse(t *testing.T) {
|
||||
for _, tc := range getSignerTestCases(t) {
|
||||
tc.signerServer.privVal = types.NewMockPV()
|
||||
tc.mockPV = types.NewMockPV()
|
||||
|
||||
tc.signerServer.SetRequestHandler(brokenHandler)
|
||||
|
||||
defer tc.signerServer.Stop()
|
||||
defer tc.signerClient.Close()
|
||||
|
||||
ts := time.Now()
|
||||
want := &types.Vote{Timestamp: ts, Type: tmproto.PrecommitType}
|
||||
|
||||
e := tc.signerClient.SignVote(tc.chainID, want.ToProto())
|
||||
assert.EqualError(t, e, "received unexpected response")
|
||||
}
|
||||
}
|
||||
@@ -1,89 +0,0 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
"github.com/tendermint/tendermint/libs/service"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultMaxDialRetries = 10
|
||||
defaultRetryWaitMilliseconds = 100
|
||||
)
|
||||
|
||||
// SignerServiceEndpointOption sets an optional parameter on the SignerDialerEndpoint.
|
||||
type SignerServiceEndpointOption func(*SignerDialerEndpoint)
|
||||
|
||||
// SignerDialerEndpointTimeoutReadWrite sets the read and write timeout for connections
|
||||
// from external signing processes.
|
||||
func SignerDialerEndpointTimeoutReadWrite(timeout time.Duration) SignerServiceEndpointOption {
|
||||
return func(ss *SignerDialerEndpoint) { ss.timeoutReadWrite = timeout }
|
||||
}
|
||||
|
||||
// SignerDialerEndpointConnRetries sets the amount of attempted retries to acceptNewConnection.
|
||||
func SignerDialerEndpointConnRetries(retries int) SignerServiceEndpointOption {
|
||||
return func(ss *SignerDialerEndpoint) { ss.maxConnRetries = retries }
|
||||
}
|
||||
|
||||
// SignerDialerEndpointRetryWaitInterval sets the retry wait interval to a custom value
|
||||
func SignerDialerEndpointRetryWaitInterval(interval time.Duration) SignerServiceEndpointOption {
|
||||
return func(ss *SignerDialerEndpoint) { ss.retryWait = interval }
|
||||
}
|
||||
|
||||
// SignerDialerEndpoint dials using its dialer and responds to any
|
||||
// signature requests using its privVal.
|
||||
type SignerDialerEndpoint struct {
|
||||
signerEndpoint
|
||||
|
||||
dialer SocketDialer
|
||||
|
||||
retryWait time.Duration
|
||||
maxConnRetries int
|
||||
}
|
||||
|
||||
// NewSignerDialerEndpoint returns a SignerDialerEndpoint that will dial using the given
|
||||
// dialer and respond to any signature requests over the connection
|
||||
// using the given privVal.
|
||||
func NewSignerDialerEndpoint(
|
||||
logger log.Logger,
|
||||
dialer SocketDialer,
|
||||
) *SignerDialerEndpoint {
|
||||
|
||||
sd := &SignerDialerEndpoint{
|
||||
dialer: dialer,
|
||||
retryWait: defaultRetryWaitMilliseconds * time.Millisecond,
|
||||
maxConnRetries: defaultMaxDialRetries,
|
||||
}
|
||||
|
||||
sd.BaseService = *service.NewBaseService(logger, "SignerDialerEndpoint", sd)
|
||||
sd.signerEndpoint.timeoutReadWrite = defaultTimeoutReadWriteSeconds * time.Second
|
||||
|
||||
return sd
|
||||
}
|
||||
|
||||
func (sd *SignerDialerEndpoint) ensureConnection() error {
|
||||
if sd.IsConnected() {
|
||||
return nil
|
||||
}
|
||||
|
||||
retries := 0
|
||||
for retries < sd.maxConnRetries {
|
||||
conn, err := sd.dialer()
|
||||
|
||||
if err != nil {
|
||||
retries++
|
||||
sd.Logger.Debug("SignerDialer: Reconnection failed", "retries", retries, "max", sd.maxConnRetries, "err", err)
|
||||
// Wait between retries
|
||||
time.Sleep(sd.retryWait)
|
||||
} else {
|
||||
sd.SetConnection(conn)
|
||||
sd.Logger.Debug("SignerDialer: Connection Ready")
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
sd.Logger.Debug("SignerDialer: Max retries exceeded", "retries", retries, "max", sd.maxConnRetries)
|
||||
|
||||
return ErrNoConnection
|
||||
}
|
||||
@@ -1,156 +0,0 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/libs/protoio"
|
||||
"github.com/tendermint/tendermint/libs/service"
|
||||
privvalproto "github.com/tendermint/tendermint/proto/privval"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultTimeoutReadWriteSeconds = 3
|
||||
)
|
||||
|
||||
type signerEndpoint struct {
|
||||
service.BaseService
|
||||
|
||||
connMtx sync.Mutex
|
||||
conn net.Conn
|
||||
|
||||
timeoutReadWrite time.Duration
|
||||
}
|
||||
|
||||
// Close closes the underlying net.Conn.
|
||||
func (se *signerEndpoint) Close() error {
|
||||
se.DropConnection()
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsConnected indicates if there is an active connection
|
||||
func (se *signerEndpoint) IsConnected() bool {
|
||||
se.connMtx.Lock()
|
||||
defer se.connMtx.Unlock()
|
||||
return se.isConnected()
|
||||
}
|
||||
|
||||
// TryGetConnection retrieves a connection if it is already available
|
||||
func (se *signerEndpoint) GetAvailableConnection(connectionAvailableCh chan net.Conn) bool {
|
||||
se.connMtx.Lock()
|
||||
defer se.connMtx.Unlock()
|
||||
|
||||
// Is there a connection ready?
|
||||
select {
|
||||
case se.conn = <-connectionAvailableCh:
|
||||
return true
|
||||
default:
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// TryGetConnection retrieves a connection if it is already available
|
||||
func (se *signerEndpoint) WaitConnection(connectionAvailableCh chan net.Conn, maxWait time.Duration) error {
|
||||
se.connMtx.Lock()
|
||||
defer se.connMtx.Unlock()
|
||||
|
||||
select {
|
||||
case se.conn = <-connectionAvailableCh:
|
||||
case <-time.After(maxWait):
|
||||
return ErrConnectionTimeout
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetConnection replaces the current connection object
|
||||
func (se *signerEndpoint) SetConnection(newConnection net.Conn) {
|
||||
se.connMtx.Lock()
|
||||
defer se.connMtx.Unlock()
|
||||
se.conn = newConnection
|
||||
}
|
||||
|
||||
// IsConnected indicates if there is an active connection
|
||||
func (se *signerEndpoint) DropConnection() {
|
||||
se.connMtx.Lock()
|
||||
defer se.connMtx.Unlock()
|
||||
se.dropConnection()
|
||||
}
|
||||
|
||||
// ReadMessage reads a message from the endpoint
|
||||
func (se *signerEndpoint) ReadMessage() (msg privvalproto.Message, err error) {
|
||||
se.connMtx.Lock()
|
||||
defer se.connMtx.Unlock()
|
||||
|
||||
if !se.isConnected() {
|
||||
return msg, fmt.Errorf("endpoint is not connected: %w", ErrNoConnection)
|
||||
}
|
||||
// Reset read deadline
|
||||
deadline := time.Now().Add(se.timeoutReadWrite)
|
||||
|
||||
err = se.conn.SetReadDeadline(deadline)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
const maxRemoteSignerMsgSize = 1024 * 10
|
||||
protoReader := protoio.NewDelimitedReader(se.conn, maxRemoteSignerMsgSize)
|
||||
err = protoReader.ReadMsg(&msg)
|
||||
if _, ok := err.(timeoutError); ok {
|
||||
if err != nil {
|
||||
err = fmt.Errorf("%v: %w", err, ErrReadTimeout)
|
||||
} else {
|
||||
err = fmt.Errorf("empty error: %w", ErrReadTimeout)
|
||||
}
|
||||
|
||||
se.Logger.Debug("Dropping [read]", "obj", se)
|
||||
se.dropConnection()
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// WriteMessage writes a message from the endpoint
|
||||
func (se *signerEndpoint) WriteMessage(msg privvalproto.Message) (err error) {
|
||||
se.connMtx.Lock()
|
||||
defer se.connMtx.Unlock()
|
||||
|
||||
if !se.isConnected() {
|
||||
return fmt.Errorf("endpoint is not connected: %w", ErrNoConnection)
|
||||
}
|
||||
|
||||
protoWriter := protoio.NewDelimitedWriter(se.conn)
|
||||
|
||||
// Reset read deadline
|
||||
deadline := time.Now().Add(se.timeoutReadWrite)
|
||||
err = se.conn.SetWriteDeadline(deadline)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
_, err = protoWriter.WriteMsg(&msg)
|
||||
if _, ok := err.(timeoutError); ok {
|
||||
if err != nil {
|
||||
err = fmt.Errorf("%v: %w", err, ErrWriteTimeout)
|
||||
} else {
|
||||
err = fmt.Errorf("empty error: %w", ErrWriteTimeout)
|
||||
}
|
||||
se.dropConnection()
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (se *signerEndpoint) isConnected() bool {
|
||||
return se.conn != nil
|
||||
}
|
||||
|
||||
func (se *signerEndpoint) dropConnection() {
|
||||
if se.conn != nil {
|
||||
if err := se.conn.Close(); err != nil {
|
||||
se.Logger.Error("signerEndpoint::dropConnection", "err", err)
|
||||
}
|
||||
se.conn = nil
|
||||
}
|
||||
}
|
||||
@@ -1,199 +0,0 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
"github.com/tendermint/tendermint/libs/service"
|
||||
privvalproto "github.com/tendermint/tendermint/proto/privval"
|
||||
)
|
||||
|
||||
// SignerValidatorEndpointOption sets an optional parameter on the SocketVal.
|
||||
type SignerValidatorEndpointOption func(*SignerListenerEndpoint)
|
||||
|
||||
// SignerListenerEndpoint listens for an external process to dial in
|
||||
// and keeps the connection alive by dropping and reconnecting
|
||||
type SignerListenerEndpoint struct {
|
||||
signerEndpoint
|
||||
|
||||
listener net.Listener
|
||||
connectRequestCh chan struct{}
|
||||
connectionAvailableCh chan net.Conn
|
||||
|
||||
timeoutAccept time.Duration
|
||||
pingTimer *time.Ticker
|
||||
|
||||
instanceMtx sync.Mutex // Ensures instance public methods access, i.e. SendRequest
|
||||
}
|
||||
|
||||
// NewSignerListenerEndpoint returns an instance of SignerListenerEndpoint.
|
||||
func NewSignerListenerEndpoint(
|
||||
logger log.Logger,
|
||||
listener net.Listener,
|
||||
) *SignerListenerEndpoint {
|
||||
sc := &SignerListenerEndpoint{
|
||||
listener: listener,
|
||||
timeoutAccept: defaultTimeoutAcceptSeconds * time.Second,
|
||||
}
|
||||
|
||||
sc.BaseService = *service.NewBaseService(logger, "SignerListenerEndpoint", sc)
|
||||
sc.signerEndpoint.timeoutReadWrite = defaultTimeoutReadWriteSeconds * time.Second
|
||||
return sc
|
||||
}
|
||||
|
||||
// OnStart implements service.Service.
|
||||
func (sl *SignerListenerEndpoint) OnStart() error {
|
||||
sl.connectRequestCh = make(chan struct{})
|
||||
sl.connectionAvailableCh = make(chan net.Conn)
|
||||
|
||||
sl.pingTimer = time.NewTicker(defaultPingPeriodMilliseconds * time.Millisecond)
|
||||
|
||||
go sl.serviceLoop()
|
||||
go sl.pingLoop()
|
||||
|
||||
sl.connectRequestCh <- struct{}{}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// OnStop implements service.Service
|
||||
func (sl *SignerListenerEndpoint) OnStop() {
|
||||
sl.instanceMtx.Lock()
|
||||
defer sl.instanceMtx.Unlock()
|
||||
_ = sl.Close()
|
||||
|
||||
// Stop listening
|
||||
if sl.listener != nil {
|
||||
if err := sl.listener.Close(); err != nil {
|
||||
sl.Logger.Error("Closing Listener", "err", err)
|
||||
sl.listener = nil
|
||||
}
|
||||
}
|
||||
|
||||
sl.pingTimer.Stop()
|
||||
}
|
||||
|
||||
// WaitForConnection waits maxWait for a connection or returns a timeout error
|
||||
func (sl *SignerListenerEndpoint) WaitForConnection(maxWait time.Duration) error {
|
||||
sl.instanceMtx.Lock()
|
||||
defer sl.instanceMtx.Unlock()
|
||||
return sl.ensureConnection(maxWait)
|
||||
}
|
||||
|
||||
// SendRequest ensures there is a connection, sends a request and waits for a response
|
||||
func (sl *SignerListenerEndpoint) SendRequest(request privvalproto.Message) (*privvalproto.Message, error) {
|
||||
sl.instanceMtx.Lock()
|
||||
defer sl.instanceMtx.Unlock()
|
||||
|
||||
err := sl.ensureConnection(sl.timeoutAccept)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = sl.WriteMessage(request)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res, err := sl.ReadMessage()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &res, nil
|
||||
}
|
||||
|
||||
func (sl *SignerListenerEndpoint) ensureConnection(maxWait time.Duration) error {
|
||||
if sl.IsConnected() {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Is there a connection ready? then use it
|
||||
if sl.GetAvailableConnection(sl.connectionAvailableCh) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// block until connected or timeout
|
||||
sl.triggerConnect()
|
||||
err := sl.WaitConnection(sl.connectionAvailableCh, maxWait)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sl *SignerListenerEndpoint) acceptNewConnection() (net.Conn, error) {
|
||||
if !sl.IsRunning() || sl.listener == nil {
|
||||
return nil, fmt.Errorf("endpoint is closing")
|
||||
}
|
||||
|
||||
// wait for a new conn
|
||||
sl.Logger.Info("SignerListener: Listening for new connection")
|
||||
conn, err := sl.listener.Accept()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
func (sl *SignerListenerEndpoint) triggerConnect() {
|
||||
select {
|
||||
case sl.connectRequestCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (sl *SignerListenerEndpoint) triggerReconnect() {
|
||||
sl.DropConnection()
|
||||
sl.triggerConnect()
|
||||
}
|
||||
|
||||
func (sl *SignerListenerEndpoint) serviceLoop() {
|
||||
for {
|
||||
select {
|
||||
case <-sl.connectRequestCh:
|
||||
{
|
||||
conn, err := sl.acceptNewConnection()
|
||||
if err == nil {
|
||||
sl.Logger.Info("SignerListener: Connected")
|
||||
|
||||
// We have a good connection, wait for someone that needs one otherwise cancellation
|
||||
select {
|
||||
case sl.connectionAvailableCh <- conn:
|
||||
case <-sl.Quit():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case sl.connectRequestCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
case <-sl.Quit():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (sl *SignerListenerEndpoint) pingLoop() {
|
||||
for {
|
||||
select {
|
||||
case <-sl.pingTimer.C:
|
||||
{
|
||||
_, err := sl.SendRequest(mustWrapMsg(&privvalproto.PingRequest{}))
|
||||
if err != nil {
|
||||
sl.Logger.Error("SignerListener: Ping timeout")
|
||||
sl.triggerReconnect()
|
||||
}
|
||||
}
|
||||
case <-sl.Quit():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,199 +0,0 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto/ed25519"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
tmnet "github.com/tendermint/tendermint/libs/net"
|
||||
tmrand "github.com/tendermint/tendermint/libs/rand"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
var (
|
||||
testTimeoutAccept = defaultTimeoutAcceptSeconds * time.Second
|
||||
|
||||
testTimeoutReadWrite = 100 * time.Millisecond
|
||||
testTimeoutReadWrite2o3 = 60 * time.Millisecond // 2/3 of the other one
|
||||
)
|
||||
|
||||
type dialerTestCase struct {
|
||||
addr string
|
||||
dialer SocketDialer
|
||||
}
|
||||
|
||||
// TestSignerRemoteRetryTCPOnly will test connection retry attempts over TCP. We
|
||||
// don't need this for Unix sockets because the OS instantly knows the state of
|
||||
// both ends of the socket connection. This basically causes the
|
||||
// SignerDialerEndpoint.dialer() call inside SignerDialerEndpoint.acceptNewConnection() to return
|
||||
// successfully immediately, putting an instant stop to any retry attempts.
|
||||
func TestSignerRemoteRetryTCPOnly(t *testing.T) {
|
||||
var (
|
||||
attemptCh = make(chan int)
|
||||
retries = 10
|
||||
)
|
||||
|
||||
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Continuously Accept connection and close {attempts} times
|
||||
go func(ln net.Listener, attemptCh chan<- int) {
|
||||
attempts := 0
|
||||
for {
|
||||
conn, err := ln.Accept()
|
||||
require.NoError(t, err)
|
||||
|
||||
err = conn.Close()
|
||||
require.NoError(t, err)
|
||||
|
||||
attempts++
|
||||
|
||||
if attempts == retries {
|
||||
attemptCh <- attempts
|
||||
break
|
||||
}
|
||||
}
|
||||
}(ln, attemptCh)
|
||||
|
||||
dialerEndpoint := NewSignerDialerEndpoint(
|
||||
log.TestingLogger(),
|
||||
DialTCPFn(ln.Addr().String(), testTimeoutReadWrite, ed25519.GenPrivKey()),
|
||||
)
|
||||
SignerDialerEndpointTimeoutReadWrite(time.Millisecond)(dialerEndpoint)
|
||||
SignerDialerEndpointConnRetries(retries)(dialerEndpoint)
|
||||
|
||||
chainID := tmrand.Str(12)
|
||||
mockPV := types.NewMockPV()
|
||||
signerServer := NewSignerServer(dialerEndpoint, chainID, mockPV)
|
||||
|
||||
err = signerServer.Start()
|
||||
require.NoError(t, err)
|
||||
defer signerServer.Stop()
|
||||
|
||||
select {
|
||||
case attempts := <-attemptCh:
|
||||
assert.Equal(t, retries, attempts)
|
||||
case <-time.After(1500 * time.Millisecond):
|
||||
t.Error("expected remote to observe connection attempts")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRetryConnToRemoteSigner(t *testing.T) {
|
||||
for _, tc := range getDialerTestCases(t) {
|
||||
var (
|
||||
logger = log.TestingLogger()
|
||||
chainID = tmrand.Str(12)
|
||||
mockPV = types.NewMockPV()
|
||||
endpointIsOpenCh = make(chan struct{})
|
||||
thisConnTimeout = testTimeoutReadWrite
|
||||
listenerEndpoint = newSignerListenerEndpoint(logger, tc.addr, thisConnTimeout)
|
||||
)
|
||||
|
||||
dialerEndpoint := NewSignerDialerEndpoint(
|
||||
logger,
|
||||
tc.dialer,
|
||||
)
|
||||
SignerDialerEndpointTimeoutReadWrite(testTimeoutReadWrite)(dialerEndpoint)
|
||||
SignerDialerEndpointConnRetries(10)(dialerEndpoint)
|
||||
|
||||
signerServer := NewSignerServer(dialerEndpoint, chainID, mockPV)
|
||||
|
||||
startListenerEndpointAsync(t, listenerEndpoint, endpointIsOpenCh)
|
||||
defer listenerEndpoint.Stop()
|
||||
|
||||
require.NoError(t, signerServer.Start())
|
||||
assert.True(t, signerServer.IsRunning())
|
||||
<-endpointIsOpenCh
|
||||
signerServer.Stop()
|
||||
|
||||
dialerEndpoint2 := NewSignerDialerEndpoint(
|
||||
logger,
|
||||
tc.dialer,
|
||||
)
|
||||
signerServer2 := NewSignerServer(dialerEndpoint2, chainID, mockPV)
|
||||
|
||||
// let some pings pass
|
||||
require.NoError(t, signerServer2.Start())
|
||||
assert.True(t, signerServer2.IsRunning())
|
||||
defer signerServer2.Stop()
|
||||
|
||||
// give the client some time to re-establish the conn to the remote signer
|
||||
// should see sth like this in the logs:
|
||||
//
|
||||
// E[10016-01-10|17:12:46.128] Ping err="remote signer timed out"
|
||||
// I[10016-01-10|17:16:42.447] Re-created connection to remote signer impl=SocketVal
|
||||
time.Sleep(testTimeoutReadWrite * 2)
|
||||
}
|
||||
}
|
||||
|
||||
///////////////////////////////////
|
||||
|
||||
func newSignerListenerEndpoint(logger log.Logger, addr string, timeoutReadWrite time.Duration) *SignerListenerEndpoint {
|
||||
proto, address := tmnet.ProtocolAndAddress(addr)
|
||||
|
||||
ln, err := net.Listen(proto, address)
|
||||
logger.Info("SignerListener: Listening", "proto", proto, "address", address)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
var listener net.Listener
|
||||
|
||||
if proto == "unix" {
|
||||
unixLn := NewUnixListener(ln)
|
||||
UnixListenerTimeoutAccept(testTimeoutAccept)(unixLn)
|
||||
UnixListenerTimeoutReadWrite(timeoutReadWrite)(unixLn)
|
||||
listener = unixLn
|
||||
} else {
|
||||
tcpLn := NewTCPListener(ln, ed25519.GenPrivKey())
|
||||
TCPListenerTimeoutAccept(testTimeoutAccept)(tcpLn)
|
||||
TCPListenerTimeoutReadWrite(timeoutReadWrite)(tcpLn)
|
||||
listener = tcpLn
|
||||
}
|
||||
|
||||
return NewSignerListenerEndpoint(logger, listener)
|
||||
}
|
||||
|
||||
func startListenerEndpointAsync(t *testing.T, sle *SignerListenerEndpoint, endpointIsOpenCh chan struct{}) {
|
||||
go func(sle *SignerListenerEndpoint) {
|
||||
require.NoError(t, sle.Start())
|
||||
assert.True(t, sle.IsRunning())
|
||||
close(endpointIsOpenCh)
|
||||
}(sle)
|
||||
}
|
||||
|
||||
func getMockEndpoints(
|
||||
t *testing.T,
|
||||
addr string,
|
||||
socketDialer SocketDialer,
|
||||
) (*SignerListenerEndpoint, *SignerDialerEndpoint) {
|
||||
|
||||
var (
|
||||
logger = log.TestingLogger()
|
||||
endpointIsOpenCh = make(chan struct{})
|
||||
|
||||
dialerEndpoint = NewSignerDialerEndpoint(
|
||||
logger,
|
||||
socketDialer,
|
||||
)
|
||||
|
||||
listenerEndpoint = newSignerListenerEndpoint(logger, addr, testTimeoutReadWrite)
|
||||
)
|
||||
|
||||
SignerDialerEndpointTimeoutReadWrite(testTimeoutReadWrite)(dialerEndpoint)
|
||||
SignerDialerEndpointConnRetries(1e6)(dialerEndpoint)
|
||||
|
||||
startListenerEndpointAsync(t, listenerEndpoint, endpointIsOpenCh)
|
||||
|
||||
require.NoError(t, dialerEndpoint.Start())
|
||||
assert.True(t, dialerEndpoint.IsRunning())
|
||||
|
||||
<-endpointIsOpenCh
|
||||
|
||||
return listenerEndpoint, dialerEndpoint
|
||||
}
|
||||
@@ -1,67 +0,0 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
cryptoenc "github.com/tendermint/tendermint/crypto/encoding"
|
||||
privvalproto "github.com/tendermint/tendermint/proto/privval"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
func DefaultValidationRequestHandler(
|
||||
privVal types.PrivValidator,
|
||||
req privvalproto.Message,
|
||||
chainID string,
|
||||
) (privvalproto.Message, error) {
|
||||
var (
|
||||
res privvalproto.Message
|
||||
err error
|
||||
)
|
||||
|
||||
switch r := req.Sum.(type) {
|
||||
case *privvalproto.Message_PubKeyRequest:
|
||||
var pubKey crypto.PubKey
|
||||
pubKey, err = privVal.GetPubKey()
|
||||
pk, err := cryptoenc.PubKeyToProto(pubKey)
|
||||
if err != nil {
|
||||
return res, err
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
res = mustWrapMsg(&privvalproto.PubKeyResponse{
|
||||
PubKey: nil, Error: &privvalproto.RemoteSignerError{Code: 0, Description: err.Error()}})
|
||||
} else {
|
||||
res = mustWrapMsg(&privvalproto.PubKeyResponse{PubKey: &pk, Error: nil})
|
||||
}
|
||||
|
||||
case *privvalproto.Message_SignVoteRequest:
|
||||
vote := r.SignVoteRequest.Vote
|
||||
|
||||
err = privVal.SignVote(chainID, vote)
|
||||
if err != nil {
|
||||
res = mustWrapMsg(&privvalproto.SignedVoteResponse{
|
||||
Vote: nil, Error: &privvalproto.RemoteSignerError{Code: 0, Description: err.Error()}})
|
||||
} else {
|
||||
res = mustWrapMsg(&privvalproto.SignedVoteResponse{Vote: vote, Error: nil})
|
||||
}
|
||||
|
||||
case *privvalproto.Message_SignProposalRequest:
|
||||
proposal := r.SignProposalRequest.Proposal
|
||||
|
||||
err = privVal.SignProposal(chainID, &proposal)
|
||||
if err != nil {
|
||||
res = mustWrapMsg(&privvalproto.SignedProposalResponse{
|
||||
Proposal: nil, Error: &privvalproto.RemoteSignerError{Code: 0, Description: err.Error()}})
|
||||
} else {
|
||||
res = mustWrapMsg(&privvalproto.SignedProposalResponse{Proposal: &proposal, Error: nil})
|
||||
}
|
||||
case *privvalproto.Message_PingRequest:
|
||||
err, res = nil, mustWrapMsg(&privvalproto.PingResponse{})
|
||||
|
||||
default:
|
||||
err = fmt.Errorf("unknown msg: %v", r)
|
||||
}
|
||||
|
||||
return res, err
|
||||
}
|
||||
@@ -1,106 +0,0 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/tendermint/tendermint/libs/service"
|
||||
privvalproto "github.com/tendermint/tendermint/proto/privval"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// ValidationRequestHandlerFunc handles different remoteSigner requests
|
||||
type ValidationRequestHandlerFunc func(
|
||||
privVal types.PrivValidator,
|
||||
requestMessage privvalproto.Message,
|
||||
chainID string) (privvalproto.Message, error)
|
||||
|
||||
type SignerServer struct {
|
||||
service.BaseService
|
||||
|
||||
endpoint *SignerDialerEndpoint
|
||||
chainID string
|
||||
privVal types.PrivValidator
|
||||
|
||||
handlerMtx sync.Mutex
|
||||
validationRequestHandler ValidationRequestHandlerFunc
|
||||
}
|
||||
|
||||
func NewSignerServer(endpoint *SignerDialerEndpoint, chainID string, privVal types.PrivValidator) *SignerServer {
|
||||
ss := &SignerServer{
|
||||
endpoint: endpoint,
|
||||
chainID: chainID,
|
||||
privVal: privVal,
|
||||
validationRequestHandler: DefaultValidationRequestHandler,
|
||||
}
|
||||
|
||||
ss.BaseService = *service.NewBaseService(endpoint.Logger, "SignerServer", ss)
|
||||
|
||||
return ss
|
||||
}
|
||||
|
||||
// OnStart implements service.Service.
|
||||
func (ss *SignerServer) OnStart() error {
|
||||
go ss.serviceLoop()
|
||||
return nil
|
||||
}
|
||||
|
||||
// OnStop implements service.Service.
|
||||
func (ss *SignerServer) OnStop() {
|
||||
ss.endpoint.Logger.Debug("SignerServer: OnStop calling Close")
|
||||
_ = ss.endpoint.Close()
|
||||
}
|
||||
|
||||
// SetRequestHandler override the default function that is used to service requests
|
||||
func (ss *SignerServer) SetRequestHandler(validationRequestHandler ValidationRequestHandlerFunc) {
|
||||
ss.handlerMtx.Lock()
|
||||
defer ss.handlerMtx.Unlock()
|
||||
ss.validationRequestHandler = validationRequestHandler
|
||||
}
|
||||
|
||||
func (ss *SignerServer) servicePendingRequest() {
|
||||
if !ss.IsRunning() {
|
||||
return // Ignore error from closing.
|
||||
}
|
||||
|
||||
req, err := ss.endpoint.ReadMessage()
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
ss.Logger.Error("SignerServer: HandleMessage", "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
var res privvalproto.Message
|
||||
{
|
||||
// limit the scope of the lock
|
||||
ss.handlerMtx.Lock()
|
||||
defer ss.handlerMtx.Unlock()
|
||||
res, err = ss.validationRequestHandler(ss.privVal, req, ss.chainID)
|
||||
if err != nil {
|
||||
// only log the error; we'll reply with an error in res
|
||||
ss.Logger.Error("SignerServer: handleMessage", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
err = ss.endpoint.WriteMessage(res)
|
||||
if err != nil {
|
||||
ss.Logger.Error("SignerServer: writeMessage", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (ss *SignerServer) serviceLoop() {
|
||||
for {
|
||||
select {
|
||||
default:
|
||||
err := ss.endpoint.ensureConnection()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
ss.servicePendingRequest()
|
||||
|
||||
case <-ss.Quit():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,43 +0,0 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
tmnet "github.com/tendermint/tendermint/libs/net"
|
||||
p2pconn "github.com/tendermint/tendermint/p2p/conn"
|
||||
)
|
||||
|
||||
// Socket errors.
|
||||
var (
|
||||
ErrDialRetryMax = errors.New("dialed maximum retries")
|
||||
)
|
||||
|
||||
// SocketDialer dials a remote address and returns a net.Conn or an error.
|
||||
type SocketDialer func() (net.Conn, error)
|
||||
|
||||
// DialTCPFn dials the given tcp addr, using the given timeoutReadWrite and
|
||||
// privKey for the authenticated encryption handshake.
|
||||
func DialTCPFn(addr string, timeoutReadWrite time.Duration, privKey crypto.PrivKey) SocketDialer {
|
||||
return func() (net.Conn, error) {
|
||||
conn, err := tmnet.Connect(addr)
|
||||
if err == nil {
|
||||
deadline := time.Now().Add(timeoutReadWrite)
|
||||
err = conn.SetDeadline(deadline)
|
||||
}
|
||||
if err == nil {
|
||||
conn, err = p2pconn.MakeSecretConnection(conn, privKey)
|
||||
}
|
||||
return conn, err
|
||||
}
|
||||
}
|
||||
|
||||
// DialUnixFn dials the given unix socket.
|
||||
func DialUnixFn(addr string) SocketDialer {
|
||||
return func() (net.Conn, error) {
|
||||
unixAddr := &net.UnixAddr{Name: addr, Net: "unix"}
|
||||
return net.DialUnix("unix", nil, unixAddr)
|
||||
}
|
||||
}
|
||||
@@ -1,48 +0,0 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto/ed25519"
|
||||
)
|
||||
|
||||
func getDialerTestCases(t *testing.T) []dialerTestCase {
|
||||
tcpAddr := GetFreeLocalhostAddrPort()
|
||||
unixFilePath, err := testUnixAddr()
|
||||
require.NoError(t, err)
|
||||
unixAddr := fmt.Sprintf("unix://%s", unixFilePath)
|
||||
|
||||
return []dialerTestCase{
|
||||
{
|
||||
addr: tcpAddr,
|
||||
dialer: DialTCPFn(tcpAddr, testTimeoutReadWrite, ed25519.GenPrivKey()),
|
||||
},
|
||||
{
|
||||
addr: unixAddr,
|
||||
dialer: DialUnixFn(unixFilePath),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsConnTimeoutForFundamentalTimeouts(t *testing.T) {
|
||||
// Generate a networking timeout
|
||||
tcpAddr := GetFreeLocalhostAddrPort()
|
||||
dialer := DialTCPFn(tcpAddr, time.Millisecond, ed25519.GenPrivKey())
|
||||
_, err := dialer()
|
||||
assert.Error(t, err)
|
||||
assert.True(t, IsConnTimeout(err))
|
||||
}
|
||||
|
||||
func TestIsConnTimeoutForWrappedConnTimeouts(t *testing.T) {
|
||||
tcpAddr := GetFreeLocalhostAddrPort()
|
||||
dialer := DialTCPFn(tcpAddr, time.Millisecond, ed25519.GenPrivKey())
|
||||
_, err := dialer()
|
||||
assert.Error(t, err)
|
||||
err = fmt.Errorf("%v: %w", err, ErrConnectionTimeout)
|
||||
assert.True(t, IsConnTimeout(err))
|
||||
}
|
||||
@@ -1,191 +0,0 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto/ed25519"
|
||||
p2pconn "github.com/tendermint/tendermint/p2p/conn"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultTimeoutAcceptSeconds = 3
|
||||
defaultPingPeriodMilliseconds = 100
|
||||
)
|
||||
|
||||
// timeoutError can be used to check if an error returned from the netp package
|
||||
// was due to a timeout.
|
||||
type timeoutError interface {
|
||||
Timeout() bool
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------
|
||||
// TCP Listener
|
||||
|
||||
// TCPListenerOption sets an optional parameter on the tcpListener.
|
||||
type TCPListenerOption func(*TCPListener)
|
||||
|
||||
// TCPListenerTimeoutAccept sets the timeout for the listener.
|
||||
// A zero time value disables the timeout.
|
||||
func TCPListenerTimeoutAccept(timeout time.Duration) TCPListenerOption {
|
||||
return func(tl *TCPListener) { tl.timeoutAccept = timeout }
|
||||
}
|
||||
|
||||
// TCPListenerTimeoutReadWrite sets the read and write timeout for connections
|
||||
// from external signing processes.
|
||||
func TCPListenerTimeoutReadWrite(timeout time.Duration) TCPListenerOption {
|
||||
return func(tl *TCPListener) { tl.timeoutReadWrite = timeout }
|
||||
}
|
||||
|
||||
// tcpListener implements net.Listener.
|
||||
var _ net.Listener = (*TCPListener)(nil)
|
||||
|
||||
// TCPListener wraps a *net.TCPListener to standardise protocol timeouts
|
||||
// and potentially other tuning parameters. It also returns encrypted connections.
|
||||
type TCPListener struct {
|
||||
*net.TCPListener
|
||||
|
||||
secretConnKey ed25519.PrivKey
|
||||
|
||||
timeoutAccept time.Duration
|
||||
timeoutReadWrite time.Duration
|
||||
}
|
||||
|
||||
// NewTCPListener returns a listener that accepts authenticated encrypted connections
|
||||
// using the given secretConnKey and the default timeout values.
|
||||
func NewTCPListener(ln net.Listener, secretConnKey ed25519.PrivKey) *TCPListener {
|
||||
return &TCPListener{
|
||||
TCPListener: ln.(*net.TCPListener),
|
||||
secretConnKey: secretConnKey,
|
||||
timeoutAccept: time.Second * defaultTimeoutAcceptSeconds,
|
||||
timeoutReadWrite: time.Second * defaultTimeoutReadWriteSeconds,
|
||||
}
|
||||
}
|
||||
|
||||
// Accept implements net.Listener.
|
||||
func (ln *TCPListener) Accept() (net.Conn, error) {
|
||||
deadline := time.Now().Add(ln.timeoutAccept)
|
||||
err := ln.SetDeadline(deadline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tc, err := ln.AcceptTCP()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Wrap the conn in our timeout and encryption wrappers
|
||||
timeoutConn := newTimeoutConn(tc, ln.timeoutReadWrite)
|
||||
secretConn, err := p2pconn.MakeSecretConnection(timeoutConn, ln.secretConnKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return secretConn, nil
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------
|
||||
// Unix Listener
|
||||
|
||||
// unixListener implements net.Listener.
|
||||
var _ net.Listener = (*UnixListener)(nil)
|
||||
|
||||
type UnixListenerOption func(*UnixListener)
|
||||
|
||||
// UnixListenerTimeoutAccept sets the timeout for the listener.
|
||||
// A zero time value disables the timeout.
|
||||
func UnixListenerTimeoutAccept(timeout time.Duration) UnixListenerOption {
|
||||
return func(ul *UnixListener) { ul.timeoutAccept = timeout }
|
||||
}
|
||||
|
||||
// UnixListenerTimeoutReadWrite sets the read and write timeout for connections
|
||||
// from external signing processes.
|
||||
func UnixListenerTimeoutReadWrite(timeout time.Duration) UnixListenerOption {
|
||||
return func(ul *UnixListener) { ul.timeoutReadWrite = timeout }
|
||||
}
|
||||
|
||||
// UnixListener wraps a *net.UnixListener to standardise protocol timeouts
|
||||
// and potentially other tuning parameters. It returns unencrypted connections.
|
||||
type UnixListener struct {
|
||||
*net.UnixListener
|
||||
|
||||
timeoutAccept time.Duration
|
||||
timeoutReadWrite time.Duration
|
||||
}
|
||||
|
||||
// NewUnixListener returns a listener that accepts unencrypted connections
|
||||
// using the default timeout values.
|
||||
func NewUnixListener(ln net.Listener) *UnixListener {
|
||||
return &UnixListener{
|
||||
UnixListener: ln.(*net.UnixListener),
|
||||
timeoutAccept: time.Second * defaultTimeoutAcceptSeconds,
|
||||
timeoutReadWrite: time.Second * defaultTimeoutReadWriteSeconds,
|
||||
}
|
||||
}
|
||||
|
||||
// Accept implements net.Listener.
|
||||
func (ln *UnixListener) Accept() (net.Conn, error) {
|
||||
deadline := time.Now().Add(ln.timeoutAccept)
|
||||
err := ln.SetDeadline(deadline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tc, err := ln.AcceptUnix()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Wrap the conn in our timeout wrapper
|
||||
conn := newTimeoutConn(tc, ln.timeoutReadWrite)
|
||||
|
||||
// TODO: wrap in something that authenticates
|
||||
// with a MAC - https://github.com/tendermint/tendermint/issues/3099
|
||||
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------
|
||||
// Connection
|
||||
|
||||
// timeoutConn implements net.Conn.
|
||||
var _ net.Conn = (*timeoutConn)(nil)
|
||||
|
||||
// timeoutConn wraps a net.Conn to standardise protocol timeouts / deadline resets.
|
||||
type timeoutConn struct {
|
||||
net.Conn
|
||||
timeout time.Duration
|
||||
}
|
||||
|
||||
// newTimeoutConn returns an instance of timeoutConn.
|
||||
func newTimeoutConn(conn net.Conn, timeout time.Duration) *timeoutConn {
|
||||
return &timeoutConn{
|
||||
conn,
|
||||
timeout,
|
||||
}
|
||||
}
|
||||
|
||||
// Read implements net.Conn.
|
||||
func (c timeoutConn) Read(b []byte) (n int, err error) {
|
||||
// Reset deadline
|
||||
deadline := time.Now().Add(c.timeout)
|
||||
err = c.Conn.SetReadDeadline(deadline)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return c.Conn.Read(b)
|
||||
}
|
||||
|
||||
// Write implements net.Conn.
|
||||
func (c timeoutConn) Write(b []byte) (n int, err error) {
|
||||
// Reset deadline
|
||||
deadline := time.Now().Add(c.timeout)
|
||||
err = c.Conn.SetWriteDeadline(deadline)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return c.Conn.Write(b)
|
||||
}
|
||||
@@ -1,137 +0,0 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto/ed25519"
|
||||
)
|
||||
|
||||
//-------------------------------------------
|
||||
// helper funcs
|
||||
|
||||
func newPrivKey() ed25519.PrivKey {
|
||||
return ed25519.GenPrivKey()
|
||||
}
|
||||
|
||||
//-------------------------------------------
|
||||
// tests
|
||||
|
||||
type listenerTestCase struct {
|
||||
description string // For test reporting purposes.
|
||||
listener net.Listener
|
||||
dialer SocketDialer
|
||||
}
|
||||
|
||||
// testUnixAddr will attempt to obtain a platform-independent temporary file
|
||||
// name for a Unix socket
|
||||
func testUnixAddr() (string, error) {
|
||||
f, err := ioutil.TempFile("", "tendermint-privval-test-*")
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
addr := f.Name()
|
||||
f.Close()
|
||||
os.Remove(addr)
|
||||
return addr, nil
|
||||
}
|
||||
|
||||
func tcpListenerTestCase(t *testing.T, timeoutAccept, timeoutReadWrite time.Duration) listenerTestCase {
|
||||
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
tcpLn := NewTCPListener(ln, newPrivKey())
|
||||
TCPListenerTimeoutAccept(timeoutAccept)(tcpLn)
|
||||
TCPListenerTimeoutReadWrite(timeoutReadWrite)(tcpLn)
|
||||
return listenerTestCase{
|
||||
description: "TCP",
|
||||
listener: tcpLn,
|
||||
dialer: DialTCPFn(ln.Addr().String(), testTimeoutReadWrite, newPrivKey()),
|
||||
}
|
||||
}
|
||||
|
||||
func unixListenerTestCase(t *testing.T, timeoutAccept, timeoutReadWrite time.Duration) listenerTestCase {
|
||||
addr, err := testUnixAddr()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
ln, err := net.Listen("unix", addr)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
unixLn := NewUnixListener(ln)
|
||||
UnixListenerTimeoutAccept(timeoutAccept)(unixLn)
|
||||
UnixListenerTimeoutReadWrite(timeoutReadWrite)(unixLn)
|
||||
return listenerTestCase{
|
||||
description: "Unix",
|
||||
listener: unixLn,
|
||||
dialer: DialUnixFn(addr),
|
||||
}
|
||||
}
|
||||
|
||||
func listenerTestCases(t *testing.T, timeoutAccept, timeoutReadWrite time.Duration) []listenerTestCase {
|
||||
return []listenerTestCase{
|
||||
tcpListenerTestCase(t, timeoutAccept, timeoutReadWrite),
|
||||
unixListenerTestCase(t, timeoutAccept, timeoutReadWrite),
|
||||
}
|
||||
}
|
||||
|
||||
func TestListenerTimeoutAccept(t *testing.T) {
|
||||
for _, tc := range listenerTestCases(t, time.Millisecond, time.Second) {
|
||||
_, err := tc.listener.Accept()
|
||||
opErr, ok := err.(*net.OpError)
|
||||
if !ok {
|
||||
t.Fatalf("for %s listener, have %v, want *net.OpError", tc.description, err)
|
||||
}
|
||||
|
||||
if have, want := opErr.Op, "accept"; have != want {
|
||||
t.Errorf("for %s listener, have %v, want %v", tc.description, have, want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestListenerTimeoutReadWrite(t *testing.T) {
|
||||
const (
|
||||
// This needs to be long enough s.t. the Accept will definitely succeed:
|
||||
timeoutAccept = time.Second
|
||||
// This can be really short but in the TCP case, the accept can
|
||||
// also trigger a timeoutReadWrite. Hence, we need to give it some time.
|
||||
// Note: this controls how long this test actually runs.
|
||||
timeoutReadWrite = 10 * time.Millisecond
|
||||
)
|
||||
for _, tc := range listenerTestCases(t, timeoutAccept, timeoutReadWrite) {
|
||||
go func(dialer SocketDialer) {
|
||||
_, err := dialer()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}(tc.dialer)
|
||||
|
||||
c, err := tc.listener.Accept()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// this will timeout because we don't write anything:
|
||||
msg := make([]byte, 200)
|
||||
_, err = c.Read(msg)
|
||||
opErr, ok := err.(*net.OpError)
|
||||
if !ok {
|
||||
t.Fatalf("for %s listener, have %v, want *net.OpError", tc.description, err)
|
||||
}
|
||||
|
||||
if have, want := opErr.Op, "read"; have != want {
|
||||
t.Errorf("for %s listener, have %v, want %v", tc.description, have, want)
|
||||
}
|
||||
|
||||
if !opErr.Timeout() {
|
||||
t.Errorf("for %s listener, got unexpected error: have %v, want Timeout error", tc.description, opErr)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,62 +0,0 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto/ed25519"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
tmnet "github.com/tendermint/tendermint/libs/net"
|
||||
)
|
||||
|
||||
// IsConnTimeout returns a boolean indicating whether the error is known to
|
||||
// report that a connection timeout occurred. This detects both fundamental
|
||||
// network timeouts, as well as ErrConnTimeout errors.
|
||||
func IsConnTimeout(err error) bool {
|
||||
_, ok := errors.Unwrap(err).(timeoutError)
|
||||
switch {
|
||||
case errors.As(err, &EndpointTimeoutError{}):
|
||||
return true
|
||||
case ok:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// NewSignerListener creates a new SignerListenerEndpoint using the corresponding listen address
|
||||
func NewSignerListener(listenAddr string, logger log.Logger) (*SignerListenerEndpoint, error) {
|
||||
var listener net.Listener
|
||||
|
||||
protocol, address := tmnet.ProtocolAndAddress(listenAddr)
|
||||
ln, err := net.Listen(protocol, address)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
switch protocol {
|
||||
case "unix":
|
||||
listener = NewUnixListener(ln)
|
||||
case "tcp":
|
||||
// TODO: persist this key so external signer can actually authenticate us
|
||||
listener = NewTCPListener(ln, ed25519.GenPrivKey())
|
||||
default:
|
||||
return nil, fmt.Errorf(
|
||||
"wrong listen address: expected either 'tcp' or 'unix' protocols, got %s",
|
||||
protocol,
|
||||
)
|
||||
}
|
||||
|
||||
pve := NewSignerListenerEndpoint(logger.With("module", "privval"), listener)
|
||||
|
||||
return pve, nil
|
||||
}
|
||||
|
||||
// GetFreeLocalhostAddrPort returns a free localhost:port address
|
||||
func GetFreeLocalhostAddrPort() string {
|
||||
port, err := tmnet.GetFreePort()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return fmt.Sprintf("127.0.0.1:%d", port)
|
||||
}
|
||||
@@ -1,14 +0,0 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestIsConnTimeoutForNonTimeoutErrors(t *testing.T) {
|
||||
assert.False(t, IsConnTimeout(fmt.Errorf("max retries exceeded: %w", ErrDialRetryMax)))
|
||||
assert.False(t, IsConnTimeout(errors.New("completely irrelevant error")))
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
@@ -3,52 +3,39 @@ package tendermint.proto.privval;
|
||||
|
||||
option go_package = "github.com/tendermint/tendermint/proto/privval";
|
||||
|
||||
import "third_party/proto/gogoproto/gogo.proto";
|
||||
import "proto/crypto/keys/types.proto";
|
||||
import "proto/types/types.proto";
|
||||
|
||||
message RemoteSignerError {
|
||||
int32 code = 1;
|
||||
string description = 2;
|
||||
}
|
||||
|
||||
// PubKeyRequest requests the consensus public key from the remote signer.
|
||||
message PubKeyRequest {}
|
||||
|
||||
// PubKeyResponse is a response message containing the public key.
|
||||
message PubKeyResponse {
|
||||
tendermint.proto.crypto.keys.PublicKey pub_key = 1;
|
||||
RemoteSignerError error = 2;
|
||||
}
|
||||
|
||||
// SignVoteRequest is a request to sign a vote
|
||||
message SignVoteRequest {
|
||||
tendermint.proto.types.Vote vote = 1;
|
||||
string chain_id = 1;
|
||||
tendermint.proto.types.Vote vote = 2;
|
||||
}
|
||||
|
||||
// SignedVoteResponse is a response containing a signed vote or an error
|
||||
message SignedVoteResponse {
|
||||
tendermint.proto.types.Vote vote = 1;
|
||||
RemoteSignerError error = 2;
|
||||
tendermint.proto.types.Vote vote = 1;
|
||||
}
|
||||
|
||||
// SignProposalRequest is a request to sign a proposal
|
||||
message SignProposalRequest {
|
||||
tendermint.proto.types.Proposal proposal = 1 [(gogoproto.nullable) = false];
|
||||
string chain_id = 1;
|
||||
tendermint.proto.types.Proposal proposal = 2;
|
||||
}
|
||||
|
||||
// SignedProposalResponse is response containing a signed proposal or an error
|
||||
message SignedProposalResponse {
|
||||
tendermint.proto.types.Proposal proposal = 1;
|
||||
RemoteSignerError error = 2;
|
||||
}
|
||||
|
||||
// PingRequest is a request to confirm that the connection is alive.
|
||||
message PingRequest {}
|
||||
|
||||
// PingResponse is a response to confirm that the connection is alive.
|
||||
message PingResponse {}
|
||||
|
||||
message Message {
|
||||
oneof sum {
|
||||
PubKeyRequest pub_key_request = 1;
|
||||
@@ -57,7 +44,5 @@ message Message {
|
||||
SignedVoteResponse signed_vote_response = 4;
|
||||
SignProposalRequest sign_proposal_request = 5;
|
||||
SignedProposalResponse signed_proposal_response = 6;
|
||||
PingRequest ping_request = 7;
|
||||
PingResponse ping_response = 8;
|
||||
}
|
||||
}
|
||||
|
||||
199
proto/privval/service.pb.go
Normal file
199
proto/privval/service.pb.go
Normal file
@@ -0,0 +1,199 @@
|
||||
// Code generated by protoc-gen-gogo. DO NOT EDIT.
|
||||
// source: proto/privval/service.proto
|
||||
|
||||
package privval
|
||||
|
||||
import (
|
||||
context "context"
|
||||
fmt "fmt"
|
||||
proto "github.com/gogo/protobuf/proto"
|
||||
grpc "google.golang.org/grpc"
|
||||
codes "google.golang.org/grpc/codes"
|
||||
status "google.golang.org/grpc/status"
|
||||
math "math"
|
||||
)
|
||||
|
||||
// Reference imports to suppress errors if they are not otherwise used.
|
||||
var _ = proto.Marshal
|
||||
var _ = fmt.Errorf
|
||||
var _ = math.Inf
|
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the proto package it is being compiled against.
|
||||
// A compilation error at this line likely means your copy of the
|
||||
// proto package needs to be updated.
|
||||
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
|
||||
|
||||
func init() { proto.RegisterFile("proto/privval/service.proto", fileDescriptor_1e25bbbdec23c036) }
|
||||
|
||||
var fileDescriptor_1e25bbbdec23c036 = []byte{
|
||||
// 247 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x2e, 0x28, 0xca, 0x2f,
|
||||
0xc9, 0xd7, 0x2f, 0x28, 0xca, 0x2c, 0x2b, 0x4b, 0xcc, 0xd1, 0x2f, 0x4e, 0x2d, 0x2a, 0xcb, 0x4c,
|
||||
0x4e, 0xd5, 0x03, 0x8b, 0x0a, 0x49, 0x94, 0xa4, 0xe6, 0xa5, 0xa4, 0x16, 0xe5, 0x66, 0xe6, 0x95,
|
||||
0x40, 0x44, 0xf4, 0xa0, 0xea, 0xa4, 0x24, 0x50, 0xb5, 0xe5, 0x16, 0xa7, 0x17, 0x43, 0x54, 0x18,
|
||||
0x9d, 0x60, 0xe2, 0x12, 0x08, 0x28, 0xca, 0x2c, 0x0b, 0x4b, 0xcc, 0xc9, 0x4c, 0x49, 0x2c, 0xc9,
|
||||
0x2f, 0x72, 0x0c, 0xf0, 0x14, 0x8a, 0xe3, 0xe2, 0x74, 0x4f, 0x2d, 0x09, 0x28, 0x4d, 0xf2, 0x4e,
|
||||
0xad, 0x14, 0x52, 0xd7, 0xc3, 0x65, 0xac, 0x1e, 0x44, 0x45, 0x50, 0x6a, 0x61, 0x69, 0x6a, 0x71,
|
||||
0x89, 0x94, 0x06, 0x61, 0x85, 0xc5, 0x05, 0xf9, 0x79, 0xc5, 0xa9, 0x42, 0xc9, 0x5c, 0x1c, 0xc1,
|
||||
0x99, 0xe9, 0x79, 0x61, 0xf9, 0x25, 0xa9, 0x42, 0x9a, 0xb8, 0x75, 0xc1, 0xd4, 0xc0, 0x2c, 0xd0,
|
||||
0xc1, 0xaf, 0x34, 0x35, 0x05, 0xa2, 0x18, 0x6a, 0x49, 0x3e, 0x17, 0x0f, 0x48, 0x34, 0xa0, 0x28,
|
||||
0xbf, 0x20, 0xbf, 0x38, 0x31, 0x47, 0x48, 0x17, 0xbf, 0x6e, 0x98, 0x3a, 0x98, 0x65, 0x06, 0x84,
|
||||
0x2c, 0x43, 0x68, 0x80, 0x58, 0xe8, 0xe4, 0x71, 0xe2, 0x91, 0x1c, 0xe3, 0x85, 0x47, 0x72, 0x8c,
|
||||
0x0f, 0x1e, 0xc9, 0x31, 0x4e, 0x78, 0x2c, 0xc7, 0x70, 0xe1, 0xb1, 0x1c, 0xc3, 0x8d, 0xc7, 0x72,
|
||||
0x0c, 0x51, 0x7a, 0xe9, 0x99, 0x25, 0x19, 0xa5, 0x49, 0x7a, 0xc9, 0xf9, 0xb9, 0xfa, 0x08, 0x53,
|
||||
0x91, 0x99, 0x28, 0xf1, 0x93, 0xc4, 0x06, 0xe6, 0x1a, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff, 0x9a,
|
||||
0xcb, 0x00, 0x84, 0xee, 0x01, 0x00, 0x00,
|
||||
}
|
||||
|
||||
// Reference imports to suppress errors if they are not otherwise used.
|
||||
var _ context.Context
|
||||
var _ grpc.ClientConn
|
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the grpc package it is being compiled against.
|
||||
const _ = grpc.SupportPackageIsVersion4
|
||||
|
||||
// PrivValidatorAPIClient is the client API for PrivValidatorAPI service.
|
||||
//
|
||||
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
|
||||
type PrivValidatorAPIClient interface {
|
||||
GetPubKey(ctx context.Context, in *PubKeyRequest, opts ...grpc.CallOption) (*PubKeyResponse, error)
|
||||
SignVote(ctx context.Context, in *SignVoteRequest, opts ...grpc.CallOption) (*SignedVoteResponse, error)
|
||||
SignProposal(ctx context.Context, in *SignProposalRequest, opts ...grpc.CallOption) (*SignedProposalResponse, error)
|
||||
}
|
||||
|
||||
type privValidatorAPIClient struct {
|
||||
cc *grpc.ClientConn
|
||||
}
|
||||
|
||||
func NewPrivValidatorAPIClient(cc *grpc.ClientConn) PrivValidatorAPIClient {
|
||||
return &privValidatorAPIClient{cc}
|
||||
}
|
||||
|
||||
func (c *privValidatorAPIClient) GetPubKey(ctx context.Context, in *PubKeyRequest, opts ...grpc.CallOption) (*PubKeyResponse, error) {
|
||||
out := new(PubKeyResponse)
|
||||
err := c.cc.Invoke(ctx, "/tendermint.proto.privval.PrivValidatorAPI/GetPubKey", in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *privValidatorAPIClient) SignVote(ctx context.Context, in *SignVoteRequest, opts ...grpc.CallOption) (*SignedVoteResponse, error) {
|
||||
out := new(SignedVoteResponse)
|
||||
err := c.cc.Invoke(ctx, "/tendermint.proto.privval.PrivValidatorAPI/SignVote", in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *privValidatorAPIClient) SignProposal(ctx context.Context, in *SignProposalRequest, opts ...grpc.CallOption) (*SignedProposalResponse, error) {
|
||||
out := new(SignedProposalResponse)
|
||||
err := c.cc.Invoke(ctx, "/tendermint.proto.privval.PrivValidatorAPI/SignProposal", in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// PrivValidatorAPIServer is the server API for PrivValidatorAPI service.
|
||||
type PrivValidatorAPIServer interface {
|
||||
GetPubKey(context.Context, *PubKeyRequest) (*PubKeyResponse, error)
|
||||
SignVote(context.Context, *SignVoteRequest) (*SignedVoteResponse, error)
|
||||
SignProposal(context.Context, *SignProposalRequest) (*SignedProposalResponse, error)
|
||||
}
|
||||
|
||||
// UnimplementedPrivValidatorAPIServer can be embedded to have forward compatible implementations.
|
||||
type UnimplementedPrivValidatorAPIServer struct {
|
||||
}
|
||||
|
||||
func (*UnimplementedPrivValidatorAPIServer) GetPubKey(ctx context.Context, req *PubKeyRequest) (*PubKeyResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method GetPubKey not implemented")
|
||||
}
|
||||
func (*UnimplementedPrivValidatorAPIServer) SignVote(ctx context.Context, req *SignVoteRequest) (*SignedVoteResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method SignVote not implemented")
|
||||
}
|
||||
func (*UnimplementedPrivValidatorAPIServer) SignProposal(ctx context.Context, req *SignProposalRequest) (*SignedProposalResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method SignProposal not implemented")
|
||||
}
|
||||
|
||||
func RegisterPrivValidatorAPIServer(s *grpc.Server, srv PrivValidatorAPIServer) {
|
||||
s.RegisterService(&_PrivValidatorAPI_serviceDesc, srv)
|
||||
}
|
||||
|
||||
func _PrivValidatorAPI_GetPubKey_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(PubKeyRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(PrivValidatorAPIServer).GetPubKey(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: "/tendermint.proto.privval.PrivValidatorAPI/GetPubKey",
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(PrivValidatorAPIServer).GetPubKey(ctx, req.(*PubKeyRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _PrivValidatorAPI_SignVote_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(SignVoteRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(PrivValidatorAPIServer).SignVote(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: "/tendermint.proto.privval.PrivValidatorAPI/SignVote",
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(PrivValidatorAPIServer).SignVote(ctx, req.(*SignVoteRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _PrivValidatorAPI_SignProposal_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(SignProposalRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(PrivValidatorAPIServer).SignProposal(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: "/tendermint.proto.privval.PrivValidatorAPI/SignProposal",
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(PrivValidatorAPIServer).SignProposal(ctx, req.(*SignProposalRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
var _PrivValidatorAPI_serviceDesc = grpc.ServiceDesc{
|
||||
ServiceName: "tendermint.proto.privval.PrivValidatorAPI",
|
||||
HandlerType: (*PrivValidatorAPIServer)(nil),
|
||||
Methods: []grpc.MethodDesc{
|
||||
{
|
||||
MethodName: "GetPubKey",
|
||||
Handler: _PrivValidatorAPI_GetPubKey_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "SignVote",
|
||||
Handler: _PrivValidatorAPI_SignVote_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "SignProposal",
|
||||
Handler: _PrivValidatorAPI_SignProposal_Handler,
|
||||
},
|
||||
},
|
||||
Streams: []grpc.StreamDesc{},
|
||||
Metadata: "proto/privval/service.proto",
|
||||
}
|
||||
14
proto/privval/service.proto
Normal file
14
proto/privval/service.proto
Normal file
@@ -0,0 +1,14 @@
|
||||
syntax = "proto3";
|
||||
package tendermint.proto.privval;
|
||||
option go_package = "github.com/tendermint/tendermint/proto/privval";
|
||||
|
||||
import "proto/privval/msgs.proto";
|
||||
|
||||
//----------------------------------------
|
||||
// Service Definition
|
||||
|
||||
service PrivValidatorAPI {
|
||||
rpc GetPubKey(tendermint.proto.privval.PubKeyRequest) returns (tendermint.proto.privval.PubKeyResponse);
|
||||
rpc SignVote(tendermint.proto.privval.SignVoteRequest) returns (tendermint.proto.privval.SignedVoteResponse);
|
||||
rpc SignProposal(tendermint.proto.privval.SignProposalRequest) returns (tendermint.proto.privval.SignedProposalResponse);
|
||||
}
|
||||
@@ -3,20 +3,18 @@ package internal
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"os/signal"
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto/tmhash"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto/ed25519"
|
||||
"github.com/tendermint/tendermint/privval"
|
||||
"github.com/tendermint/tendermint/state"
|
||||
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
tmnet "github.com/tendermint/tendermint/libs/net"
|
||||
tmos "github.com/tendermint/tendermint/libs/os"
|
||||
tmproto "github.com/tendermint/tendermint/proto/types"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
@@ -104,12 +102,7 @@ func NewTestHarness(logger log.Logger, cfg TestHarnessConfig) (*TestHarness, err
|
||||
}
|
||||
logger.Info("Loaded genesis file", "chainID", st.ChainID)
|
||||
|
||||
spv, err := newTestHarnessListener(logger, cfg)
|
||||
if err != nil {
|
||||
return nil, newTestHarnessError(ErrFailedToCreateListener, err, "")
|
||||
}
|
||||
|
||||
signerClient, err := privval.NewSignerClient(spv)
|
||||
signerClient, err := privval.NewSignerClient(cfg.BindAddr, []grpc.DialOption{}, logger)
|
||||
if err != nil {
|
||||
return nil, newTestHarnessError(ErrFailedToCreateListener, err, "")
|
||||
}
|
||||
@@ -147,22 +140,22 @@ func (th *TestHarness) Run() {
|
||||
for acceptRetries := th.acceptRetries; acceptRetries > 0; acceptRetries-- {
|
||||
th.logger.Info("Attempting to accept incoming connection", "acceptRetries", acceptRetries)
|
||||
|
||||
if err := th.signerClient.WaitForConnection(10 * time.Millisecond); err != nil {
|
||||
// if it wasn't a timeout error
|
||||
if _, ok := err.(timeoutError); !ok {
|
||||
th.logger.Error("Failed to start listener", "err", err)
|
||||
th.Shutdown(newTestHarnessError(ErrFailedToStartListener, err, ""))
|
||||
// we need the return statements in case this is being run
|
||||
// from a unit test - otherwise this function will just die
|
||||
// when os.Exit is called
|
||||
return
|
||||
}
|
||||
startErr = err
|
||||
} else {
|
||||
th.logger.Info("Accepted external connection")
|
||||
accepted = true
|
||||
break
|
||||
}
|
||||
// if err := th.signerClient.WaitForConnection(10 * time.Millisecond); err != nil {
|
||||
// // if it wasn't a timeout error
|
||||
// if _, ok := err.(timeoutError); !ok {
|
||||
// th.logger.Error("Failed to start listener", "err", err)
|
||||
// th.Shutdown(newTestHarnessError(ErrFailedToStartListener, err, ""))
|
||||
// // we need the return statements in case this is being run
|
||||
// // from a unit test - otherwise this function will just die
|
||||
// // when os.Exit is called
|
||||
// return
|
||||
// }
|
||||
// startErr = err
|
||||
// } else {
|
||||
th.logger.Info("Accepted external connection")
|
||||
accepted = true
|
||||
break
|
||||
// }
|
||||
}
|
||||
if !accepted {
|
||||
th.logger.Error("Maximum accept retries reached", "acceptRetries", th.acceptRetries)
|
||||
@@ -343,43 +336,6 @@ func (th *TestHarness) Shutdown(err error) {
|
||||
}
|
||||
}
|
||||
|
||||
// newTestHarnessListener creates our client instance which we will use for testing.
|
||||
func newTestHarnessListener(logger log.Logger, cfg TestHarnessConfig) (*privval.SignerListenerEndpoint, error) {
|
||||
proto, addr := tmnet.ProtocolAndAddress(cfg.BindAddr)
|
||||
if proto == "unix" {
|
||||
// make sure the socket doesn't exist - if so, try to delete it
|
||||
if tmos.FileExists(addr) {
|
||||
if err := os.Remove(addr); err != nil {
|
||||
logger.Error("Failed to remove existing Unix domain socket", "addr", addr)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
ln, err := net.Listen(proto, addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
logger.Info("Listening", "proto", proto, "addr", addr)
|
||||
var svln net.Listener
|
||||
switch proto {
|
||||
case "unix":
|
||||
unixLn := privval.NewUnixListener(ln)
|
||||
privval.UnixListenerTimeoutAccept(cfg.AcceptDeadline)(unixLn)
|
||||
privval.UnixListenerTimeoutReadWrite(cfg.ConnDeadline)(unixLn)
|
||||
svln = unixLn
|
||||
case "tcp":
|
||||
tcpLn := privval.NewTCPListener(ln, cfg.SecretConnKey)
|
||||
privval.TCPListenerTimeoutAccept(cfg.AcceptDeadline)(tcpLn)
|
||||
privval.TCPListenerTimeoutReadWrite(cfg.ConnDeadline)(tcpLn)
|
||||
logger.Info("Resolved TCP address for listener", "addr", tcpLn.Addr())
|
||||
svln = tcpLn
|
||||
default:
|
||||
logger.Error("Unsupported protocol (must be unix:// or tcp://)", "proto", proto)
|
||||
return nil, newTestHarnessError(ErrInvalidParameters, nil, fmt.Sprintf("Unsupported protocol: %s", proto))
|
||||
}
|
||||
return privval.NewSignerListenerEndpoint(logger, svln), nil
|
||||
}
|
||||
|
||||
func newTestHarnessError(code int, err error, info string) *TestHarnessError {
|
||||
return &TestHarnessError{
|
||||
Code: code,
|
||||
|
||||
@@ -9,10 +9,12 @@ import (
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
"github.com/tendermint/tendermint/crypto/ed25519"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
tmnet "github.com/tendermint/tendermint/libs/net"
|
||||
"github.com/tendermint/tendermint/privval"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
@@ -132,17 +134,7 @@ func newMockSignerServer(
|
||||
breakVoteSigning bool,
|
||||
) *privval.SignerServer {
|
||||
mockPV := types.NewMockPVWithParams(privKey, breakProposalSigning, breakVoteSigning)
|
||||
|
||||
dialerEndpoint := privval.NewSignerDialerEndpoint(
|
||||
th.logger,
|
||||
privval.DialTCPFn(
|
||||
th.addr,
|
||||
time.Duration(defaultConnDeadline)*time.Millisecond,
|
||||
ed25519.GenPrivKey(),
|
||||
),
|
||||
)
|
||||
|
||||
return privval.NewSignerServer(dialerEndpoint, th.chainID, mockPV)
|
||||
return privval.NewSignerServer(th.addr, th.chainID, mockPV, th.logger, []grpc.ServerOption{})
|
||||
}
|
||||
|
||||
// For running relatively standard tests.
|
||||
@@ -169,7 +161,7 @@ func harnessTest(t *testing.T, signerServerMaker func(th *TestHarness) *privval.
|
||||
|
||||
func makeConfig(t *testing.T, acceptDeadline, acceptRetries int) TestHarnessConfig {
|
||||
return TestHarnessConfig{
|
||||
BindAddr: privval.GetFreeLocalhostAddrPort(),
|
||||
BindAddr: GetFreeLocalhostAddrPort(),
|
||||
KeyFile: makeTempFile("tm-testharness-keyfile", keyFileContents),
|
||||
StateFile: makeTempFile("tm-testharness-statefile", stateFileContents),
|
||||
GenesisFile: makeTempFile("tm-testharness-genesisfile", genesisFileContents),
|
||||
@@ -201,3 +193,12 @@ func makeTempFile(name, content string) string {
|
||||
}
|
||||
return tempFile.Name()
|
||||
}
|
||||
|
||||
// GetFreeLocalhostAddrPort returns a free localhost:port address
|
||||
func GetFreeLocalhostAddrPort() string {
|
||||
port, err := tmnet.GetFreePort()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return fmt.Sprintf("127.0.0.1:%d", port)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user