mirror of
https://github.com/tendermint/tendermint.git
synced 2026-05-31 19:36:20 +00:00
Merge pull request #56 from tendermint/develop
Merge develop into master
This commit is contained in:
5
.gitignore
vendored
5
.gitignore
vendored
@@ -1,5 +1,6 @@
|
||||
*/vendor
|
||||
*/.glide
|
||||
.terraform
|
||||
terraform.tfstate
|
||||
terraform.tfstate.backup
|
||||
terraform.tfstate.d
|
||||
|
||||
terraform.tfstate.d
|
||||
@@ -1,4 +1,4 @@
|
||||
FROM alpine:3.5
|
||||
FROM alpine:3.6
|
||||
|
||||
WORKDIR /app
|
||||
COPY tm-bench /app/tm-bench
|
||||
|
||||
@@ -7,6 +7,6 @@ COPY Makefile /go/src/github.com/tendermint/tools/tm-bench/
|
||||
COPY glide.yaml /go/src/github.com/tendermint/tools/tm-bench/
|
||||
COPY glide.lock /go/src/github.com/tendermint/tools/tm-bench/
|
||||
|
||||
RUN make get_deps
|
||||
RUN make get_vendor_deps
|
||||
|
||||
COPY . /go/src/github.com/tendermint/tools/tm-bench
|
||||
|
||||
@@ -7,7 +7,7 @@ GOTOOLS = \
|
||||
tools:
|
||||
go get -v $(GOTOOLS)
|
||||
|
||||
get_deps: tools
|
||||
get_vendor_deps: tools
|
||||
glide install
|
||||
|
||||
build:
|
||||
@@ -44,4 +44,4 @@ clean:
|
||||
rm -f ./tm-bench
|
||||
rm -rf ./dist
|
||||
|
||||
.PHONY: tools get_deps build install test build-all dist clean build-docker
|
||||
.PHONY: tools get_vendor_deps build install test build-all dist clean build-docker
|
||||
|
||||
136
tm-bench/glide.lock
generated
136
tm-bench/glide.lock
generated
@@ -1,113 +1,84 @@
|
||||
hash: 795aa94747f3d877df3ea1ec134e9a34e1c46713dd6eb59b6fdd6a33cb698234
|
||||
updated: 2017-04-20T19:19:22.26004087-04:00
|
||||
hash: 765fd22d79f7d7123197548b3228ebf56f72be9541b64b04cde875f2d09214f8
|
||||
updated: 2017-10-06T07:40:33.279710782Z
|
||||
imports:
|
||||
- name: github.com/btcsuite/btcd
|
||||
version: 583684b21bfbde9b5fc4403916fd7c807feb0289
|
||||
version: 4803a8291c92a1d2d41041b942a9a9e37deab065
|
||||
subpackages:
|
||||
- btcec
|
||||
- name: github.com/BurntSushi/toml
|
||||
version: 99064174e013895bbd9b025c31100bd1d9b590ca
|
||||
- name: github.com/go-kit/kit
|
||||
version: b6f30a2e0632f5722fb26d8765d726335b79d3e6
|
||||
version: 4dc7be5d2d12881735283bcab7352178e190fc71
|
||||
subpackages:
|
||||
- log
|
||||
- log/level
|
||||
- log/term
|
||||
- term
|
||||
- name: github.com/go-logfmt/logfmt
|
||||
version: 390ab7935ee28ec6b286364bba9b4dd6410cb3d5
|
||||
- name: github.com/go-playground/locales
|
||||
version: 1e5f1161c6416a5ff48840eb8724a394e48cc534
|
||||
subpackages:
|
||||
- currency
|
||||
- name: github.com/go-playground/universal-translator
|
||||
version: 71201497bace774495daed26a3874fd339e0b538
|
||||
- name: github.com/go-stack/stack
|
||||
version: 100eb0c0a9c5b306ca2fb4f165df21d80ada4b82
|
||||
version: 817915b46b97fd7bb80e8ab6b69f01a53ac3eebf
|
||||
- name: github.com/golang/protobuf
|
||||
version: 69b215d01a5606c843240eab4937eab3acee6530
|
||||
version: 130e6b02ab059e7b717a096f397c5b60111cae74
|
||||
subpackages:
|
||||
- proto
|
||||
- name: github.com/golang/snappy
|
||||
version: 553a641470496b2327abcac10b36396bd98e45c9
|
||||
- ptypes
|
||||
- ptypes/any
|
||||
- ptypes/duration
|
||||
- ptypes/timestamp
|
||||
- name: github.com/gorilla/websocket
|
||||
version: 3ab3a8b8831546bd18fd182c20687ca853b2bb13
|
||||
- name: github.com/jmhodges/levigo
|
||||
version: c42d9e0ca023e2198120196f842701bb4c55d7b9
|
||||
version: 4201258b820c74ac8e6922fc9e6b52f71fe46f8d
|
||||
- name: github.com/kr/logfmt
|
||||
version: b84e30acd515aadc4b783ad4ff83aff3299bdfe0
|
||||
- name: github.com/mattn/go-colorable
|
||||
version: d898aa9fb31c91f35dd28ca75db377eff023c076
|
||||
- name: github.com/mattn/go-isatty
|
||||
version: dda3de49cbfcec471bd7a70e6cc01fcc3ff90109
|
||||
- name: github.com/pkg/errors
|
||||
version: bfd5150e4e41705ded2129ec33379de1cb90b513
|
||||
version: 2b3a18b5f0fb6b4f9190549597d3f962c02bc5eb
|
||||
- name: github.com/rcrowley/go-metrics
|
||||
version: 1f30fe9094a513ce4c700b9a54458bbb0c96996c
|
||||
- name: github.com/syndtr/goleveldb
|
||||
version: 3c5717caf1475fd25964109a0fc640bd150fce43
|
||||
subpackages:
|
||||
- leveldb
|
||||
- leveldb/cache
|
||||
- leveldb/comparer
|
||||
- leveldb/errors
|
||||
- leveldb/filter
|
||||
- leveldb/iterator
|
||||
- leveldb/journal
|
||||
- leveldb/memdb
|
||||
- leveldb/opt
|
||||
- leveldb/storage
|
||||
- leveldb/table
|
||||
- leveldb/util
|
||||
- name: github.com/tendermint/abci
|
||||
version: 56e13d87f4e3ec1ea756957d6b23caa6ebcf0998
|
||||
version: 191c4b6d176169ffc7f9972d490fa362a3b7d940
|
||||
subpackages:
|
||||
- client
|
||||
- example/dummy
|
||||
- types
|
||||
- name: github.com/tendermint/ed25519
|
||||
version: 1f52c6f8b8a5c7908aff4497c186af344b428925
|
||||
subpackages:
|
||||
- edwards25519
|
||||
- extra25519
|
||||
- name: github.com/tendermint/go-common
|
||||
version: f9e3db037330c8a8d61d3966de8473eaf01154fa
|
||||
- name: github.com/tendermint/go-config
|
||||
version: 620dcbbd7d587cf3599dedbf329b64311b0c307a
|
||||
- name: github.com/tendermint/go-crypto
|
||||
version: 0ca2c6fdb0706001ca4c4b9b80c9f428e8cf39da
|
||||
- name: github.com/tendermint/go-data
|
||||
version: e7fcc6d081ec8518912fcdc103188275f83a3ee5
|
||||
- name: github.com/tendermint/go-db
|
||||
version: 9643f60bc2578693844aacf380a7c32e4c029fee
|
||||
- name: github.com/tendermint/go-events
|
||||
version: fddee66d90305fccb6f6d84d16c34fa65ea5b7f6
|
||||
- name: github.com/tendermint/go-flowrate
|
||||
version: a20c98e61957faa93b4014fbd902f20ab9317a6a
|
||||
subpackages:
|
||||
- flowrate
|
||||
- name: github.com/tendermint/go-logger
|
||||
version: cefb3a45c0bf3c493a04e9bcd9b1540528be59f2
|
||||
- name: github.com/tendermint/go-merkle
|
||||
version: 714d4d04557fd068a7c2a1748241ce8428015a96
|
||||
- name: github.com/tendermint/go-p2p
|
||||
version: 17124989a93774833df33107fbf17157a7f8ef31
|
||||
subpackages:
|
||||
- upnp
|
||||
- name: github.com/tendermint/go-rpc
|
||||
version: 1a42f946dc6bcd88f9f58c7f2fb86f785584d793
|
||||
subpackages:
|
||||
- client
|
||||
- types
|
||||
version: 311e8c1bf00fa5868daad4f8ea56dcad539182c0
|
||||
- name: github.com/tendermint/go-wire
|
||||
version: 2f3b7aafe21c80b19b6ee3210ecb3e3d07c7a471
|
||||
- name: github.com/tendermint/log15
|
||||
version: ae0f3d6450da9eac7074b439c8e1c3cabf0d5ce6
|
||||
version: 5f88da3dbc1a72844e6dfaf274ce87f851d488eb
|
||||
subpackages:
|
||||
- term
|
||||
- data
|
||||
- name: github.com/tendermint/tendermint
|
||||
version: 083fe959e25421fca3d41298d9111167a3b47122
|
||||
version: 7682ad9a60162dd17fd6f61aeed7049a8635ac78
|
||||
subpackages:
|
||||
- config
|
||||
- p2p
|
||||
- p2p/upnp
|
||||
- rpc/core/types
|
||||
- rpc/lib/client
|
||||
- rpc/lib/types
|
||||
- types
|
||||
- name: github.com/tendermint/tmlibs
|
||||
version: 096dcb90e60aa00b748b3fe49a4b95e48ebf1e13
|
||||
subpackages:
|
||||
- common
|
||||
- events
|
||||
- flowrate
|
||||
- log
|
||||
- merkle
|
||||
- name: github.com/tendermint/tools
|
||||
version: 12ce526668e384100afd32686ec7db3749423d51
|
||||
version: 9708c66576d3e7d4fd0a5cdec7d951f1ef002efc
|
||||
subpackages:
|
||||
- tm-monitor/eventmeter
|
||||
- tm-monitor/monitor
|
||||
- name: golang.org/x/crypto
|
||||
version: 453249f01cfeb54c3d549ddb75ff152ca243f9d8
|
||||
version: 9419663f5a44be8b34ca85f08abc5fe1be11f8a3
|
||||
subpackages:
|
||||
- curve25519
|
||||
- nacl/box
|
||||
@@ -118,7 +89,7 @@ imports:
|
||||
- ripemd160
|
||||
- salsa20/salsa
|
||||
- name: golang.org/x/net
|
||||
version: 906cda9512f77671ab44f8c8563b13a8e707b230
|
||||
version: a04bdaca5b32abe1c069418fb7088ae607de5bd0
|
||||
subpackages:
|
||||
- context
|
||||
- http2
|
||||
@@ -127,21 +98,36 @@ imports:
|
||||
- internal/timeseries
|
||||
- lex/httplex
|
||||
- trace
|
||||
- name: golang.org/x/sys
|
||||
version: 76cc09b634294339fa19ec41b5f2a0b3932cea8b
|
||||
- name: golang.org/x/text
|
||||
version: d82c1812e304abfeeabd31e995a115a2855bf642
|
||||
subpackages:
|
||||
- unix
|
||||
- secure/bidirule
|
||||
- transform
|
||||
- unicode/bidi
|
||||
- unicode/norm
|
||||
- name: google.golang.org/genproto
|
||||
version: f676e0f3ac6395ff1a529ae59a6670878a8371a6
|
||||
subpackages:
|
||||
- googleapis/rpc/status
|
||||
- name: google.golang.org/grpc
|
||||
version: 8b2e129857480cb0f07ef7d9d10b8b252c7ac984
|
||||
version: 5279edf262dc22329b1e53281ce9d55c0a998216
|
||||
subpackages:
|
||||
- balancer
|
||||
- codes
|
||||
- connectivity
|
||||
- credentials
|
||||
- grpclb/grpc_lb_v1/messages
|
||||
- grpclog
|
||||
- internal
|
||||
- keepalive
|
||||
- metadata
|
||||
- naming
|
||||
- peer
|
||||
- resolver
|
||||
- stats
|
||||
- status
|
||||
- tap
|
||||
- transport
|
||||
- name: gopkg.in/go-playground/validator.v9
|
||||
version: a021b2ec9a8a8bb970f3f15bc42617cb520e8a64
|
||||
testImports: []
|
||||
|
||||
@@ -1,16 +1,20 @@
|
||||
package: github.com/tendermint/tools/tm-bench
|
||||
import:
|
||||
- package: github.com/pkg/errors
|
||||
- package: github.com/tendermint/go-rpc
|
||||
version: develop
|
||||
subpackages:
|
||||
- client
|
||||
- types
|
||||
- package: github.com/tendermint/tools
|
||||
version: develop
|
||||
subpackages:
|
||||
- tm-monitor/monitor
|
||||
- package: github.com/go-kit/kit
|
||||
subpackages:
|
||||
- log/term
|
||||
- package: github.com/gorilla/websocket
|
||||
- package: github.com/pkg/errors
|
||||
- package: github.com/rcrowley/go-metrics
|
||||
- package: github.com/tendermint/tendermint
|
||||
version: v0.11.0
|
||||
subpackages:
|
||||
- rpc/lib/types
|
||||
- types
|
||||
- package: github.com/tendermint/tmlibs
|
||||
subpackages:
|
||||
- log
|
||||
- term
|
||||
- package: github.com/tendermint/tools
|
||||
version: 9708c66576d3e7d4fd0a5cdec7d951f1ef002efc
|
||||
subpackages:
|
||||
- tm-monitor/monitor
|
||||
|
||||
@@ -8,14 +8,15 @@ import (
|
||||
"text/tabwriter"
|
||||
"time"
|
||||
|
||||
"github.com/go-kit/kit/log"
|
||||
"github.com/go-kit/kit/log/term"
|
||||
metrics "github.com/rcrowley/go-metrics"
|
||||
|
||||
tmtypes "github.com/tendermint/tendermint/types"
|
||||
"github.com/tendermint/tmlibs/log"
|
||||
"github.com/tendermint/tools/tm-monitor/monitor"
|
||||
)
|
||||
|
||||
var version = "0.1.0"
|
||||
var version = "0.2.0"
|
||||
|
||||
var logger = log.NewNopLogger()
|
||||
|
||||
@@ -63,7 +64,7 @@ Examples:
|
||||
}
|
||||
return term.FgBgColor{}
|
||||
}
|
||||
logger = term.NewLogger(os.Stdout, log.NewLogfmtLogger, colorFn)
|
||||
logger = log.NewTMLoggerWithColorFn(log.NewSyncWriter(os.Stdout), colorFn)
|
||||
}
|
||||
|
||||
fmt.Printf("Running %ds test @ %s\n", duration, flag.Arg(0))
|
||||
@@ -123,7 +124,7 @@ func startNodes(endpoints []string, blockCh chan<- tmtypes.Header, blockLatencyC
|
||||
|
||||
for i, e := range endpoints {
|
||||
n := monitor.NewNode(e)
|
||||
n.SetLogger(log.With(logger, "node", e))
|
||||
n.SetLogger(logger.With("node", e))
|
||||
n.SendBlocksTo(blockCh)
|
||||
n.SendBlockLatenciesTo(blockLatencyCh)
|
||||
if err := n.Start(); err != nil {
|
||||
|
||||
@@ -1,27 +1,33 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"crypto/md5"
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/go-kit/kit/log"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/pkg/errors"
|
||||
|
||||
rpctypes "github.com/tendermint/go-rpc/types"
|
||||
rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
|
||||
"github.com/tendermint/tmlibs/log"
|
||||
)
|
||||
|
||||
const (
|
||||
sendTimeout = 500 * time.Millisecond
|
||||
sendTimeout = 10 * time.Second
|
||||
// see https://github.com/tendermint/go-rpc/blob/develop/server/handlers.go#L313
|
||||
pingPeriod = (30 * 9 / 10) * time.Second
|
||||
|
||||
// the size of a transaction in bytes.
|
||||
txSize = 250
|
||||
)
|
||||
|
||||
type transacter struct {
|
||||
@@ -56,6 +62,8 @@ func (t *transacter) SetLogger(l log.Logger) {
|
||||
func (t *transacter) Start() error {
|
||||
t.stopped = false
|
||||
|
||||
rand.Seed(time.Now().Unix())
|
||||
|
||||
for i := 0; i < t.Connections; i++ {
|
||||
c, _, err := connect(t.Target)
|
||||
if err != nil {
|
||||
@@ -90,8 +98,8 @@ func (t *transacter) receiveLoop(connIndex int) {
|
||||
for {
|
||||
_, _, err := c.ReadMessage()
|
||||
if err != nil {
|
||||
if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
|
||||
t.logger.Log("err", errors.Wrap(err, "failed to read response"))
|
||||
if !websocket.IsCloseError(err, websocket.CloseNormalClosure) {
|
||||
t.logger.Error("failed to read response", "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -104,7 +112,18 @@ func (t *transacter) receiveLoop(connIndex int) {
|
||||
// sendLoop generates transactions at a given rate.
|
||||
func (t *transacter) sendLoop(connIndex int) {
|
||||
c := t.conns[connIndex]
|
||||
logger := log.With(t.logger, "addr", c.RemoteAddr())
|
||||
|
||||
c.SetPingHandler(func(message string) error {
|
||||
err := c.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(sendTimeout))
|
||||
if err == websocket.ErrCloseSent {
|
||||
return nil
|
||||
} else if e, ok := err.(net.Error); ok && e.Temporary() {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
})
|
||||
|
||||
logger := t.logger.With("addr", c.RemoteAddr())
|
||||
|
||||
var txNumber = 0
|
||||
|
||||
@@ -116,24 +135,38 @@ func (t *transacter) sendLoop(connIndex int) {
|
||||
t.wg.Done()
|
||||
}()
|
||||
|
||||
// hash of the host name is a part of each tx
|
||||
var hostnameHash [md5.Size]byte
|
||||
hostname, err := os.Hostname()
|
||||
if err != nil {
|
||||
hostname = "127.0.0.1"
|
||||
}
|
||||
hostnameHash = md5.Sum([]byte(hostname))
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-txsTicker.C:
|
||||
startTime := time.Now()
|
||||
|
||||
for i := 0; i < t.Rate; i++ {
|
||||
// each transaction embeds connection index and tx number
|
||||
tx := generateTx(connIndex, txNumber)
|
||||
// each transaction embeds connection index, tx number and hash of the hostname
|
||||
tx := generateTx(connIndex, txNumber, hostnameHash)
|
||||
paramsJson, err := json.Marshal(map[string]interface{}{"tx": hex.EncodeToString(tx)})
|
||||
if err != nil {
|
||||
fmt.Printf("failed to encode params: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
rawParamsJson := json.RawMessage(paramsJson)
|
||||
|
||||
c.SetWriteDeadline(time.Now().Add(sendTimeout))
|
||||
err := c.WriteJSON(rpctypes.RPCRequest{
|
||||
err = c.WriteJSON(rpctypes.RPCRequest{
|
||||
JSONRPC: "2.0",
|
||||
ID: "",
|
||||
ID: "tm-bench",
|
||||
Method: "broadcast_tx_async",
|
||||
Params: []interface{}{hex.EncodeToString(tx)},
|
||||
Params: &rawParamsJson,
|
||||
})
|
||||
if err != nil {
|
||||
fmt.Printf("%v. Try increasing the connections count and reducing the rate.\n", errors.Wrap(err, "txs send failed"))
|
||||
fmt.Printf("%v. Try reducing the connections count and increasing the rate.\n", errors.Wrap(err, "txs send failed"))
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
@@ -142,12 +175,12 @@ func (t *transacter) sendLoop(connIndex int) {
|
||||
|
||||
timeToSend := time.Now().Sub(startTime)
|
||||
time.Sleep(time.Second - timeToSend)
|
||||
logger.Log("event", fmt.Sprintf("sent %d transactions", t.Rate), "took", timeToSend)
|
||||
logger.Info(fmt.Sprintf("sent %d transactions", t.Rate), "took", timeToSend)
|
||||
case <-pingsTicker.C:
|
||||
// Right now go-rpc server closes the connection in the absence of pings
|
||||
// go-rpc server closes the connection in the absence of pings
|
||||
c.SetWriteDeadline(time.Now().Add(sendTimeout))
|
||||
if err := c.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
|
||||
logger.Log("err", errors.Wrap(err, "failed to write ping message"))
|
||||
logger.Error("failed to write ping message", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -157,7 +190,7 @@ func (t *transacter) sendLoop(connIndex int) {
|
||||
c.SetWriteDeadline(time.Now().Add(sendTimeout))
|
||||
err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
|
||||
if err != nil {
|
||||
logger.Log("err", errors.Wrap(err, "failed to write close message"))
|
||||
logger.Error("failed to write close message", "err", err)
|
||||
}
|
||||
|
||||
return
|
||||
@@ -170,12 +203,18 @@ func connect(host string) (*websocket.Conn, *http.Response, error) {
|
||||
return websocket.DefaultDialer.Dial(u.String(), nil)
|
||||
}
|
||||
|
||||
func generateTx(a int, b int) []byte {
|
||||
tx := make([]byte, 250)
|
||||
binary.PutUvarint(tx[:32], uint64(a))
|
||||
binary.PutUvarint(tx[32:64], uint64(b))
|
||||
if _, err := rand.Read(tx[234:]); err != nil {
|
||||
panic(errors.Wrap(err, "failed to generate transaction"))
|
||||
func generateTx(connIndex int, txNumber int, hostnameHash [md5.Size]byte) []byte {
|
||||
tx := make([]byte, txSize)
|
||||
|
||||
binary.PutUvarint(tx[:8], uint64(connIndex))
|
||||
binary.PutUvarint(tx[8:16], uint64(txNumber))
|
||||
copy(tx[16:32], hostnameHash[:16])
|
||||
binary.PutUvarint(tx[32:40], uint64(time.Now().Unix()))
|
||||
|
||||
// 40-* random data
|
||||
if _, err := rand.Read(tx[40:]); err != nil {
|
||||
panic(errors.Wrap(err, "failed to read random bytes"))
|
||||
}
|
||||
|
||||
return tx
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
FROM alpine:3.5
|
||||
FROM alpine:3.6
|
||||
|
||||
WORKDIR /app
|
||||
COPY tm-monitor /app/tm-monitor
|
||||
|
||||
@@ -7,6 +7,6 @@ COPY Makefile /go/src/github.com/tendermint/tools/tm-monitor/
|
||||
COPY glide.yaml /go/src/github.com/tendermint/tools/tm-monitor/
|
||||
COPY glide.lock /go/src/github.com/tendermint/tools/tm-monitor/
|
||||
|
||||
RUN make get_deps
|
||||
RUN make get_vendor_deps
|
||||
|
||||
COPY . /go/src/github.com/tendermint/tools/tm-monitor
|
||||
|
||||
@@ -8,7 +8,7 @@ PACKAGES=$(shell go list ./... | grep -v '/vendor/')
|
||||
tools:
|
||||
go get -v $(GOTOOLS)
|
||||
|
||||
get_deps: tools
|
||||
get_vendor_deps: tools
|
||||
glide install
|
||||
|
||||
build:
|
||||
@@ -45,4 +45,4 @@ clean:
|
||||
rm -f ./tm-monitor
|
||||
rm -rf ./dist
|
||||
|
||||
.PHONY: tools get_deps build install test build-all dist clean build-docker
|
||||
.PHONY: tools get_vendor_deps build install test build-all dist clean build-docker
|
||||
|
||||
@@ -1,30 +1,28 @@
|
||||
// eventmeter - generic system to subscribe to events and record their frequency.
|
||||
package eventmeter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/pkg/errors"
|
||||
metrics "github.com/rcrowley/go-metrics"
|
||||
client "github.com/tendermint/tendermint/rpc/lib/client"
|
||||
"github.com/tendermint/tmlibs/events"
|
||||
"github.com/tendermint/tmlibs/log"
|
||||
)
|
||||
|
||||
//------------------------------------------------------
|
||||
// Generic system to subscribe to events and record their frequency
|
||||
//------------------------------------------------------
|
||||
const (
|
||||
// Get ping/pong latency and call LatencyCallbackFunc with this period.
|
||||
latencyPeriod = 1 * time.Second
|
||||
|
||||
//------------------------------------------------------
|
||||
// Meter for a particular event
|
||||
// Check if the WS client is connected every
|
||||
connectionCheckPeriod = 100 * time.Millisecond
|
||||
)
|
||||
|
||||
// Closure to enable side effects from receiving an event
|
||||
type EventCallbackFunc func(em *EventMetric, data events.EventData)
|
||||
|
||||
// Metrics for a given event
|
||||
// EventMetric exposes metrics for an event.
|
||||
type EventMetric struct {
|
||||
ID string `json:"id"`
|
||||
Started time.Time `json:"start_time"`
|
||||
@@ -42,15 +40,15 @@ type EventMetric struct {
|
||||
Rate15 float64 `json:"rate_15" wire:"unsafe"`
|
||||
RateMean float64 `json:"rate_mean" wire:"unsafe"`
|
||||
|
||||
// so the event can have effects in the event-meter's consumer.
|
||||
// runs in a go routine
|
||||
// so the event can have effects in the eventmeter's consumer. runs in a go
|
||||
// routine.
|
||||
callback EventCallbackFunc
|
||||
}
|
||||
|
||||
func (metric *EventMetric) Copy() *EventMetric {
|
||||
metric2 := *metric
|
||||
metric2.meter = metric.meter.Snapshot()
|
||||
return &metric2
|
||||
metricCopy := *metric
|
||||
metricCopy.meter = metric.meter.Snapshot()
|
||||
return &metricCopy
|
||||
}
|
||||
|
||||
// called on GetMetric
|
||||
@@ -63,35 +61,32 @@ func (metric *EventMetric) fillMetric() *EventMetric {
|
||||
return metric
|
||||
}
|
||||
|
||||
//------------------------------------------------------
|
||||
// Websocket client and event meter for many events
|
||||
// EventCallbackFunc is a closure to enable side effects from receiving an
|
||||
// event.
|
||||
type EventCallbackFunc func(em *EventMetric, data interface{})
|
||||
|
||||
const maxPingsPerPong = 30 // if we haven't received a pong in this many attempted pings we kill the conn
|
||||
|
||||
// Get the eventID and data out of the raw json received over the go-rpc websocket
|
||||
// EventUnmarshalFunc is a closure to get the eventType and data out of the raw
|
||||
// JSON received over the RPC WebSocket.
|
||||
type EventUnmarshalFunc func(b json.RawMessage) (string, events.EventData, error)
|
||||
|
||||
// Closure to enable side effects from receiving a pong
|
||||
// LatencyCallbackFunc is a closure to enable side effects from receiving a latency.
|
||||
type LatencyCallbackFunc func(meanLatencyNanoSeconds float64)
|
||||
|
||||
// Closure to notify consumer that the connection died
|
||||
// DisconnectCallbackFunc is a closure to notify a consumer that the connection
|
||||
// has died.
|
||||
type DisconnectCallbackFunc func()
|
||||
|
||||
// Each node gets an event meter to track events for that node
|
||||
// EventMeter tracks events, reports latency and disconnects.
|
||||
type EventMeter struct {
|
||||
wsc *client.WSClient
|
||||
|
||||
mtx sync.Mutex
|
||||
events map[string]*EventMetric
|
||||
|
||||
// to record ws latency
|
||||
timer metrics.Timer
|
||||
lastPing time.Time
|
||||
receivedPong bool
|
||||
unmarshalEvent EventUnmarshalFunc
|
||||
latencyCallback LatencyCallbackFunc
|
||||
disconnectCallback DisconnectCallbackFunc
|
||||
|
||||
unmarshalEvent EventUnmarshalFunc
|
||||
subscribed bool
|
||||
|
||||
quit chan struct{}
|
||||
|
||||
@@ -99,54 +94,44 @@ type EventMeter struct {
|
||||
}
|
||||
|
||||
func NewEventMeter(addr string, unmarshalEvent EventUnmarshalFunc) *EventMeter {
|
||||
em := &EventMeter{
|
||||
wsc: client.NewWSClient(addr, "/websocket"),
|
||||
return &EventMeter{
|
||||
wsc: client.NewWSClient(addr, "/websocket", client.PingPeriod(1*time.Second)),
|
||||
events: make(map[string]*EventMetric),
|
||||
timer: metrics.NewTimer(),
|
||||
receivedPong: true,
|
||||
unmarshalEvent: unmarshalEvent,
|
||||
logger: log.NewNopLogger(),
|
||||
}
|
||||
return em
|
||||
}
|
||||
|
||||
// SetLogger lets you set your own logger
|
||||
// SetLogger lets you set your own logger.
|
||||
func (em *EventMeter) SetLogger(l log.Logger) {
|
||||
em.logger = l
|
||||
em.wsc.SetLogger(l.With("module", "rpcclient"))
|
||||
}
|
||||
|
||||
// String returns a string representation of event meter.
|
||||
func (em *EventMeter) String() string {
|
||||
return em.wsc.Address
|
||||
}
|
||||
|
||||
// Start boots up event meter.
|
||||
func (em *EventMeter) Start() error {
|
||||
if _, err := em.wsc.Reset(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := em.wsc.Start(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
em.wsc.Conn.SetPongHandler(func(m string) error {
|
||||
// NOTE: https://github.com/gorilla/websocket/issues/97
|
||||
em.mtx.Lock()
|
||||
defer em.mtx.Unlock()
|
||||
em.receivedPong = true
|
||||
em.timer.UpdateSince(em.lastPing)
|
||||
if em.latencyCallback != nil {
|
||||
go em.latencyCallback(em.timer.Mean())
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
em.quit = make(chan struct{})
|
||||
go em.receiveRoutine()
|
||||
go em.disconnectRoutine()
|
||||
|
||||
return em.resubscribe()
|
||||
err := em.subscribe()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
em.subscribed = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop stops the EventMeter.
|
||||
// Stop stops event meter.
|
||||
func (em *EventMeter) Stop() {
|
||||
close(em.quit)
|
||||
|
||||
@@ -155,88 +140,70 @@ func (em *EventMeter) Stop() {
|
||||
}
|
||||
}
|
||||
|
||||
// StopAndCallDisconnectCallback stops the EventMeter and calls
|
||||
// disconnectCallback if present.
|
||||
func (em *EventMeter) StopAndCallDisconnectCallback() {
|
||||
if em.wsc.IsRunning() {
|
||||
em.wsc.Stop()
|
||||
}
|
||||
|
||||
em.mtx.Lock()
|
||||
defer em.mtx.Unlock()
|
||||
if em.disconnectCallback != nil {
|
||||
go em.disconnectCallback()
|
||||
}
|
||||
}
|
||||
|
||||
func (em *EventMeter) Subscribe(eventID string, cb EventCallbackFunc) error {
|
||||
// Subscribe for the given event type. Callback function will be called upon
|
||||
// receiving an event.
|
||||
func (em *EventMeter) Subscribe(eventType string, cb EventCallbackFunc) error {
|
||||
em.mtx.Lock()
|
||||
defer em.mtx.Unlock()
|
||||
|
||||
if _, ok := em.events[eventID]; ok {
|
||||
if _, ok := em.events[eventType]; ok {
|
||||
return fmt.Errorf("subscribtion already exists")
|
||||
}
|
||||
if err := em.wsc.Subscribe(eventID); err != nil {
|
||||
if err := em.wsc.Subscribe(context.TODO(), eventType); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
metric := &EventMetric{
|
||||
ID: eventID,
|
||||
Started: time.Now(),
|
||||
MinDuration: 1 << 62,
|
||||
meter: metrics.NewMeter(),
|
||||
callback: cb,
|
||||
meter: metrics.NewMeter(),
|
||||
callback: cb,
|
||||
}
|
||||
em.events[eventID] = metric
|
||||
em.events[eventType] = metric
|
||||
return nil
|
||||
}
|
||||
|
||||
func (em *EventMeter) Unsubscribe(eventID string) error {
|
||||
// Unsubscribe from the given event type.
|
||||
func (em *EventMeter) Unsubscribe(eventType string) error {
|
||||
em.mtx.Lock()
|
||||
defer em.mtx.Unlock()
|
||||
if err := em.wsc.Unsubscribe(eventID); err != nil {
|
||||
if err := em.wsc.Unsubscribe(context.TODO(), eventType); err != nil {
|
||||
return err
|
||||
}
|
||||
// XXX: should we persist or save this info first?
|
||||
delete(em.events, eventID)
|
||||
delete(em.events, eventType)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Fill in the latest data for an event and return a copy
|
||||
func (em *EventMeter) GetMetric(eventID string) (*EventMetric, error) {
|
||||
// GetMetric fills in the latest data for an event and return a copy.
|
||||
func (em *EventMeter) GetMetric(eventType string) (*EventMetric, error) {
|
||||
em.mtx.Lock()
|
||||
defer em.mtx.Unlock()
|
||||
metric, ok := em.events[eventID]
|
||||
metric, ok := em.events[eventType]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("Unknown event %s", eventID)
|
||||
return nil, fmt.Errorf("unknown event: %s", eventType)
|
||||
}
|
||||
return metric.fillMetric().Copy(), nil
|
||||
}
|
||||
|
||||
// Return the average latency over the websocket
|
||||
func (em *EventMeter) Latency() float64 {
|
||||
em.mtx.Lock()
|
||||
defer em.mtx.Unlock()
|
||||
return em.timer.Mean()
|
||||
}
|
||||
|
||||
// RegisterLatencyCallback allows you to set latency callback.
|
||||
func (em *EventMeter) RegisterLatencyCallback(f LatencyCallbackFunc) {
|
||||
em.mtx.Lock()
|
||||
defer em.mtx.Unlock()
|
||||
em.latencyCallback = f
|
||||
}
|
||||
|
||||
// RegisterDisconnectCallback allows you to set disconnect callback.
|
||||
func (em *EventMeter) RegisterDisconnectCallback(f DisconnectCallbackFunc) {
|
||||
em.mtx.Lock()
|
||||
defer em.mtx.Unlock()
|
||||
em.disconnectCallback = f
|
||||
}
|
||||
|
||||
//------------------------------------------------------
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
// Private
|
||||
|
||||
func (em *EventMeter) resubscribe() error {
|
||||
for eventID, _ := range em.events {
|
||||
if err := em.wsc.Subscribe(eventID); err != nil {
|
||||
func (em *EventMeter) subscribe() error {
|
||||
for eventType, _ := range em.events {
|
||||
if err := em.wsc.Subscribe(context.TODO(), eventType); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -244,40 +211,31 @@ func (em *EventMeter) resubscribe() error {
|
||||
}
|
||||
|
||||
func (em *EventMeter) receiveRoutine() {
|
||||
pingTime := time.Second * 1
|
||||
pingTicker := time.NewTicker(pingTime)
|
||||
pingAttempts := 0 // if this hits maxPingsPerPong we kill the conn
|
||||
|
||||
var err error
|
||||
latencyTicker := time.NewTicker(latencyPeriod)
|
||||
for {
|
||||
select {
|
||||
case <-pingTicker.C:
|
||||
if pingAttempts, err = em.pingForLatency(pingAttempts); err != nil {
|
||||
em.logger.Error("err", errors.Wrap(err, "failed to write ping message on websocket"))
|
||||
em.StopAndCallDisconnectCallback()
|
||||
return
|
||||
} else if pingAttempts >= maxPingsPerPong {
|
||||
em.logger.Error("err", errors.Errorf("Have not received a pong in %v", time.Duration(pingAttempts)*pingTime))
|
||||
em.StopAndCallDisconnectCallback()
|
||||
return
|
||||
}
|
||||
case r := <-em.wsc.ResultsCh:
|
||||
if r == nil {
|
||||
em.logger.Error("err", errors.New("Expected some event, received nil"))
|
||||
em.StopAndCallDisconnectCallback()
|
||||
return
|
||||
}
|
||||
eventID, data, err := em.unmarshalEvent(r)
|
||||
if err != nil {
|
||||
em.logger.Error("err", errors.Wrap(err, "failed to unmarshal event"))
|
||||
case rawEvent := <-em.wsc.ResultsCh:
|
||||
if rawEvent == nil {
|
||||
em.logger.Error("expected some event, got nil")
|
||||
continue
|
||||
}
|
||||
if eventID != "" {
|
||||
em.updateMetric(eventID, data)
|
||||
eventType, data, err := em.unmarshalEvent(rawEvent)
|
||||
if err != nil {
|
||||
em.logger.Error("failed to unmarshal event", "err", err)
|
||||
continue
|
||||
}
|
||||
if eventType != "" { // FIXME how can it be an empty string?
|
||||
em.updateMetric(eventType, data)
|
||||
}
|
||||
case err := <-em.wsc.ErrorsCh:
|
||||
if err != nil {
|
||||
em.logger.Error("expected some event, got error", "err", err)
|
||||
}
|
||||
case <-latencyTicker.C:
|
||||
if em.wsc.IsActive() {
|
||||
em.callLatencyCallback(em.wsc.PingPongLatencyTimer.Mean())
|
||||
}
|
||||
case <-em.wsc.Quit:
|
||||
em.logger.Error("err", errors.New("WSClient closed unexpectedly"))
|
||||
em.StopAndCallDisconnectCallback()
|
||||
return
|
||||
case <-em.quit:
|
||||
return
|
||||
@@ -285,29 +243,31 @@ func (em *EventMeter) receiveRoutine() {
|
||||
}
|
||||
}
|
||||
|
||||
func (em *EventMeter) pingForLatency(pingAttempts int) (int, error) {
|
||||
em.mtx.Lock()
|
||||
defer em.mtx.Unlock()
|
||||
|
||||
// ping to record latency
|
||||
if !em.receivedPong {
|
||||
return pingAttempts + 1, nil
|
||||
func (em *EventMeter) disconnectRoutine() {
|
||||
ticker := time.NewTicker(connectionCheckPeriod)
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if em.wsc.IsReconnecting() && em.subscribed { // notify user about disconnect only once
|
||||
em.callDisconnectCallback()
|
||||
em.subscribed = false
|
||||
} else if !em.wsc.IsReconnecting() && !em.subscribed { // resubscribe
|
||||
em.subscribe()
|
||||
em.subscribed = true
|
||||
}
|
||||
case <-em.wsc.Quit:
|
||||
return
|
||||
case <-em.quit:
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
em.lastPing = time.Now()
|
||||
em.receivedPong = false
|
||||
err := em.wsc.Conn.WriteMessage(websocket.PingMessage, []byte{})
|
||||
if err != nil {
|
||||
return pingAttempts, err
|
||||
}
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (em *EventMeter) updateMetric(eventID string, data events.EventData) {
|
||||
func (em *EventMeter) updateMetric(eventType string, data events.EventData) {
|
||||
em.mtx.Lock()
|
||||
defer em.mtx.Unlock()
|
||||
|
||||
metric, ok := em.events[eventID]
|
||||
metric, ok := em.events[eventType]
|
||||
if !ok {
|
||||
// we already unsubscribed, or got an unexpected event
|
||||
return
|
||||
@@ -328,3 +288,19 @@ func (em *EventMeter) updateMetric(eventID string, data events.EventData) {
|
||||
go metric.callback(metric.Copy(), data)
|
||||
}
|
||||
}
|
||||
|
||||
func (em *EventMeter) callDisconnectCallback() {
|
||||
em.mtx.Lock()
|
||||
if em.disconnectCallback != nil {
|
||||
go em.disconnectCallback()
|
||||
}
|
||||
em.mtx.Unlock()
|
||||
}
|
||||
|
||||
func (em *EventMeter) callLatencyCallback(meanLatencyNanoSeconds float64) {
|
||||
em.mtx.Lock()
|
||||
if em.latencyCallback != nil {
|
||||
go em.latencyCallback(meanLatencyNanoSeconds)
|
||||
}
|
||||
em.mtx.Unlock()
|
||||
}
|
||||
|
||||
22
tm-monitor/glide.lock
generated
22
tm-monitor/glide.lock
generated
@@ -1,5 +1,5 @@
|
||||
hash: 80c204190057df1e74d32ecd7095e8a1a865c3a06671f1a31d5240e1e3ff2c64
|
||||
updated: 2017-05-20T17:49:23.646798165-04:00
|
||||
hash: 1a38134bef18f688b42d6d52fcb02682604e8c1c9e308f6e2ce8c4a461c903a9
|
||||
updated: 2017-10-06T06:57:56.777237539Z
|
||||
imports:
|
||||
- name: github.com/btcsuite/btcd
|
||||
version: 583684b21bfbde9b5fc4403916fd7c807feb0289
|
||||
@@ -13,6 +13,12 @@ imports:
|
||||
- log/term
|
||||
- name: github.com/go-logfmt/logfmt
|
||||
version: 390ab7935ee28ec6b286364bba9b4dd6410cb3d5
|
||||
- name: github.com/go-playground/locales
|
||||
version: 1e5f1161c6416a5ff48840eb8724a394e48cc534
|
||||
subpackages:
|
||||
- currency
|
||||
- name: github.com/go-playground/universal-translator
|
||||
version: 71201497bace774495daed26a3874fd339e0b538
|
||||
- name: github.com/go-stack/stack
|
||||
version: 100eb0c0a9c5b306ca2fb4f165df21d80ada4b82
|
||||
- name: github.com/golang/protobuf
|
||||
@@ -24,11 +30,11 @@ imports:
|
||||
- name: github.com/kr/logfmt
|
||||
version: b84e30acd515aadc4b783ad4ff83aff3299bdfe0
|
||||
- name: github.com/pkg/errors
|
||||
version: bfd5150e4e41705ded2129ec33379de1cb90b513
|
||||
version: 645ef00459ed84a119197bfb8d8205042c6df63d
|
||||
- name: github.com/rcrowley/go-metrics
|
||||
version: 1f30fe9094a513ce4c700b9a54458bbb0c96996c
|
||||
- name: github.com/tendermint/abci
|
||||
version: 864d1f80b36b440bde030a5c18d8ac3aa8c2949d
|
||||
version: 191c4b6d176169ffc7f9972d490fa362a3b7d940
|
||||
subpackages:
|
||||
- client
|
||||
- example/dummy
|
||||
@@ -39,13 +45,13 @@ imports:
|
||||
- edwards25519
|
||||
- extra25519
|
||||
- name: github.com/tendermint/go-crypto
|
||||
version: 7dff40942a64cdeefefa9446b2d104750b349f8a
|
||||
version: 311e8c1bf00fa5868daad4f8ea56dcad539182c0
|
||||
- name: github.com/tendermint/go-wire
|
||||
version: 5f88da3dbc1a72844e6dfaf274ce87f851d488eb
|
||||
subpackages:
|
||||
- data
|
||||
- name: github.com/tendermint/tendermint
|
||||
version: 267f134d44e76efb2adef5f0c993da8a5d5bd1b8
|
||||
version: 7682ad9a60162dd17fd6f61aeed7049a8635ac78
|
||||
subpackages:
|
||||
- config
|
||||
- p2p
|
||||
@@ -56,7 +62,7 @@ imports:
|
||||
- rpc/lib/types
|
||||
- types
|
||||
- name: github.com/tendermint/tmlibs
|
||||
version: 306795ae1d8e4f4a10dcc8bdb32a00455843c9d5
|
||||
version: 7dd6b3d3f8a7a998a79bdd0d8222252b309570f3
|
||||
subpackages:
|
||||
- common
|
||||
- events
|
||||
@@ -97,6 +103,8 @@ imports:
|
||||
- stats
|
||||
- tap
|
||||
- transport
|
||||
- name: gopkg.in/go-playground/validator.v9
|
||||
version: a021b2ec9a8a8bb970f3f15bc42617cb520e8a64
|
||||
testImports:
|
||||
- name: github.com/davecgh/go-spew
|
||||
version: 04cdfd42973bb9c8589fd6a731800cf222fde1a9
|
||||
|
||||
@@ -1,21 +1,20 @@
|
||||
package: github.com/tendermint/tools/tm-monitor
|
||||
import:
|
||||
- package: github.com/go-kit/kit
|
||||
subpackages:
|
||||
- log
|
||||
- package: github.com/gorilla/websocket
|
||||
- package: github.com/pkg/errors
|
||||
- package: github.com/rcrowley/go-metrics
|
||||
- package: github.com/tendermint/go-crypto
|
||||
- package: github.com/tendermint/tendermint
|
||||
version: develop
|
||||
version: v0.11.0
|
||||
subpackages:
|
||||
- rpc/core/types
|
||||
- rpc/lib/client
|
||||
- rpc/lib/server
|
||||
- types
|
||||
- package: github.com/tendermint/tmlibs
|
||||
version: develop
|
||||
subpackages:
|
||||
- common
|
||||
- events
|
||||
- log
|
||||
testImport:
|
||||
|
||||
@@ -11,7 +11,7 @@ import (
|
||||
monitor "github.com/tendermint/tools/tm-monitor/monitor"
|
||||
)
|
||||
|
||||
var version = "0.2.1"
|
||||
var version = "0.3.0"
|
||||
|
||||
var logger = log.NewNopLogger()
|
||||
|
||||
@@ -47,7 +47,7 @@ Examples:
|
||||
}
|
||||
|
||||
if noton {
|
||||
logger = log.NewTMLogger(log.NewSyncWriter(os.Stdout)).With("module", "tm-monitor")
|
||||
logger = log.NewTMLogger(log.NewSyncWriter(os.Stdout))
|
||||
}
|
||||
|
||||
m := startMonitor(flag.Arg(0))
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
package mock
|
||||
|
||||
import (
|
||||
"log"
|
||||
stdlog "log"
|
||||
"reflect"
|
||||
|
||||
gokitlog "github.com/go-kit/kit/log"
|
||||
ctypes "github.com/tendermint/tendermint/rpc/core/types"
|
||||
"github.com/tendermint/tmlibs/log"
|
||||
em "github.com/tendermint/tools/tm-monitor/eventmeter"
|
||||
)
|
||||
|
||||
@@ -17,7 +16,7 @@ type EventMeter struct {
|
||||
|
||||
func (e *EventMeter) Start() error { return nil }
|
||||
func (e *EventMeter) Stop() {}
|
||||
func (e *EventMeter) SetLogger(l gokitlog.Logger) {}
|
||||
func (e *EventMeter) SetLogger(l log.Logger) {}
|
||||
func (e *EventMeter) RegisterLatencyCallback(cb em.LatencyCallbackFunc) { e.latencyCallback = cb }
|
||||
func (e *EventMeter) RegisterDisconnectCallback(cb em.DisconnectCallbackFunc) {
|
||||
e.disconnectCallback = cb
|
||||
@@ -43,13 +42,13 @@ func (e *EventMeter) Call(callback string, args ...interface{}) {
|
||||
}
|
||||
|
||||
type RpcClient struct {
|
||||
Stubs map[string]ctypes.TMResult
|
||||
Stubs map[string]interface{}
|
||||
}
|
||||
|
||||
func (c *RpcClient) Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) {
|
||||
s, ok := c.Stubs[method]
|
||||
if !ok {
|
||||
log.Fatalf("Call to %s, but no stub is defined for it", method)
|
||||
stdlog.Fatalf("Call to %s, but no stub is defined for it", method)
|
||||
}
|
||||
|
||||
rv, rt := reflect.ValueOf(result), reflect.TypeOf(result)
|
||||
@@ -61,10 +61,10 @@ func startMonitor(t *testing.T) *monitor.Monitor {
|
||||
func createValidatorNode(t *testing.T) (n *monitor.Node, emMock *mock.EventMeter) {
|
||||
emMock = &mock.EventMeter{}
|
||||
|
||||
stubs := make(map[string]ctypes.TMResult)
|
||||
stubs := make(map[string]interface{})
|
||||
pubKey := crypto.GenPrivKeyEd25519().PubKey()
|
||||
stubs["validators"] = &ctypes.ResultValidators{BlockHeight: blockHeight, Validators: []*tmtypes.Validator{tmtypes.NewValidator(pubKey, 0)}}
|
||||
stubs["status"] = &ctypes.ResultStatus{PubKey: pubKey}
|
||||
stubs["validators"] = ctypes.ResultValidators{BlockHeight: blockHeight, Validators: []*tmtypes.Validator{tmtypes.NewValidator(pubKey, 0)}}
|
||||
stubs["status"] = ctypes.ResultStatus{PubKey: pubKey}
|
||||
rpcClientMock := &mock.RpcClient{stubs}
|
||||
|
||||
n = monitor.NewNodeWithEventMeterAndRpcClient("tcp://127.0.0.1:46657", emMock, rpcClientMock)
|
||||
|
||||
@@ -125,11 +125,11 @@ func (n *Node) Stop() {
|
||||
|
||||
// implements eventmeter.EventCallbackFunc
|
||||
func newBlockCallback(n *Node) em.EventCallbackFunc {
|
||||
return func(metric *em.EventMetric, data events.EventData) {
|
||||
return func(metric *em.EventMetric, data interface{}) {
|
||||
block := data.(tmtypes.TMEventData).Unwrap().(tmtypes.EventDataNewBlockHeader).Header
|
||||
|
||||
n.Height = uint64(block.Height)
|
||||
n.logger.Info("event", "new block", "height", block.Height, "numTxs", block.NumTxs)
|
||||
n.logger.Info("new block", "height", block.Height, "numTxs", block.NumTxs)
|
||||
|
||||
if n.blockCh != nil {
|
||||
n.blockCh <- *block
|
||||
@@ -141,7 +141,7 @@ func newBlockCallback(n *Node) em.EventCallbackFunc {
|
||||
func latencyCallback(n *Node) em.LatencyCallbackFunc {
|
||||
return func(latency float64) {
|
||||
n.BlockLatency = latency / 1000000.0 // ns to ms
|
||||
n.logger.Info("event", "new block latency", "latency", n.BlockLatency)
|
||||
n.logger.Info("new block latency", "latency", n.BlockLatency)
|
||||
|
||||
if n.blockLatencyCh != nil {
|
||||
n.blockLatencyCh <- latency
|
||||
@@ -158,17 +158,6 @@ func disconnectCallback(n *Node) em.DisconnectCallbackFunc {
|
||||
if n.disconnectCh != nil {
|
||||
n.disconnectCh <- true
|
||||
}
|
||||
|
||||
if err := n.RestartEventMeterBackoff(); err != nil {
|
||||
n.logger.Info("err", errors.Wrap(err, "restart failed"))
|
||||
} else {
|
||||
n.Online = true
|
||||
n.logger.Info("status", "online")
|
||||
|
||||
if n.disconnectCh != nil {
|
||||
n.disconnectCh <- false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -180,7 +169,7 @@ func (n *Node) RestartEventMeterBackoff() error {
|
||||
time.Sleep(d * time.Second)
|
||||
|
||||
if err := n.em.Start(); err != nil {
|
||||
n.logger.Info("err", errors.Wrap(err, "restart failed"))
|
||||
n.logger.Info("restart failed", "err", err)
|
||||
} else {
|
||||
// TODO: authenticate pubkey
|
||||
return nil
|
||||
@@ -231,7 +220,7 @@ func (n *Node) checkIsValidator() {
|
||||
}
|
||||
}
|
||||
} else {
|
||||
n.logger.Info("err", errors.Wrap(err, "check is validator failed"))
|
||||
n.logger.Info("check is validator failed", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -37,7 +37,7 @@ func TestNodeNewBlockReceived(t *testing.T) {
|
||||
n.SendBlocksTo(blockCh)
|
||||
|
||||
blockHeader := &tmtypes.Header{Height: 5}
|
||||
emMock.Call("eventCallback", &em.EventMetric{}, tmtypes.EventDataNewBlockHeader{blockHeader})
|
||||
emMock.Call("eventCallback", &em.EventMetric{}, tmtypes.TMEventData{tmtypes.EventDataNewBlockHeader{blockHeader}})
|
||||
|
||||
assert.Equal(uint64(5), n.Height)
|
||||
assert.Equal(*blockHeader, <-blockCh)
|
||||
@@ -68,10 +68,7 @@ func TestNodeConnectionLost(t *testing.T) {
|
||||
emMock.Call("disconnectCallback")
|
||||
|
||||
assert.Equal(true, <-disconnectCh)
|
||||
assert.Equal(false, <-disconnectCh)
|
||||
|
||||
// we're back in a race
|
||||
assert.Equal(true, n.Online)
|
||||
assert.Equal(false, n.Online)
|
||||
}
|
||||
|
||||
func TestNumValidators(t *testing.T) {
|
||||
@@ -89,10 +86,10 @@ func TestNumValidators(t *testing.T) {
|
||||
func startValidatorNode(t *testing.T) (n *monitor.Node, emMock *mock.EventMeter) {
|
||||
emMock = &mock.EventMeter{}
|
||||
|
||||
stubs := make(map[string]ctypes.TMResult)
|
||||
stubs := make(map[string]interface{})
|
||||
pubKey := crypto.GenPrivKeyEd25519().PubKey()
|
||||
stubs["validators"] = &ctypes.ResultValidators{BlockHeight: blockHeight, Validators: []*tmtypes.Validator{tmtypes.NewValidator(pubKey, 0)}}
|
||||
stubs["status"] = &ctypes.ResultStatus{PubKey: pubKey}
|
||||
stubs["validators"] = ctypes.ResultValidators{BlockHeight: blockHeight, Validators: []*tmtypes.Validator{tmtypes.NewValidator(pubKey, 0)}}
|
||||
stubs["status"] = ctypes.ResultStatus{PubKey: pubKey}
|
||||
rpcClientMock := &mock.RpcClient{stubs}
|
||||
|
||||
n = monitor.NewNodeWithEventMeterAndRpcClient("tcp://127.0.0.1:46657", emMock, rpcClientMock)
|
||||
|
||||
@@ -12,9 +12,8 @@ import (
|
||||
func startRPC(listenAddr string, m *monitor.Monitor, logger log.Logger) {
|
||||
routes := routes(m)
|
||||
|
||||
// serve http and ws
|
||||
mux := http.NewServeMux()
|
||||
wm := rpc.NewWebsocketManager(routes, nil) // TODO: evsw
|
||||
wm := rpc.NewWebsocketManager(routes, nil)
|
||||
mux.HandleFunc("/websocket", wm.WebsocketHandler)
|
||||
rpc.RegisterRPCFuncs(mux, routes, logger)
|
||||
if _, err := rpc.StartHTTPServer(listenAddr, mux, logger); err != nil {
|
||||
|
||||
Reference in New Issue
Block a user