new pubsub package

comment out failing consensus tests for now

rewrite rpc httpclient to use new pubsub package

import pubsub as tmpubsub, query as tmquery

make event IDs constants
EventKey -> EventTypeKey

rename EventsPubsub to PubSub

mempool does not use pubsub

rename eventsSub to pubsub

new subscribe API

fix channel size issues and consensus tests bugs

refactor rpc client

add missing discardFromChan method

add mutex

rename pubsub to eventBus

remove IsRunning from WSRPCConnection interface (not needed)

add a comment in broadcastNewRoundStepsAndVotes

rename registerEventCallbacks to broadcastNewRoundStepsAndVotes

See https://dave.cheney.net/2014/03/19/channel-axioms

stop eventBuses after reactor tests

remove unnecessary Unsubscribe

return subscribe helper function

move discardFromChan to where it is used

subscribe now returns an err

this gives us ability to refuse to subscribe if pubsub is at its max
capacity.

use context for control overflow

cache queries

handle err when subscribing in replay_test

rename testClientID to testSubscriber

extract var

set channel buffer capacity to 1 in replay_file

fix byzantine_test

unsubscribe from single event, not all events

refactor httpclient to return events to appropriate channels

return failing testReplayCrashBeforeWriteVote test

fix TestValidatorSetChanges

refactor code a bit

fix testReplayCrashBeforeWriteVote

add comment

fix TestValidatorSetChanges

fixes from Bucky's review

update comment [ci skip]

test TxEventBuffer

update changelog

fix TestValidatorSetChanges (2nd attempt)

only do wg.Done when no errors

benchmark event bus

create pubsub server inside NewEventBus

only expose config params (later if needed)

set buffer capacity to 0 so we are not testing cache

new tx event format: key = "Tx" plus a tag {"tx.hash": XYZ}

This should allow to subscribe to all transactions! or a specific one
using a query: "tm.events.type = Tx and tx.hash = '013ABF99434...'"

use TimeoutCommit instead of afterPublishEventNewBlockTimeout

TimeoutCommit is the time a node waits after committing a block, before
it goes into the next height. So it will finish everything from the last
block, but then wait a bit. The idea is this gives it time to hear more
votes from other validators, to strengthen the commit it includes in the
next block. But it also gives it time to hear about new transactions.

waitForBlockWithUpdatedVals

rewrite WAL crash tests

Task:
test that we can recover from any WAL crash.

Solution:
the old tests were relying on event hub being run in the same thread (we
were injecting the private validator's last signature).

when considering a rewrite, we considered two possible solutions: write
a "fuzzy" testing system where WAL is crashing upon receiving a new
message, or inject failures and trigger them in tests using something
like https://github.com/coreos/gofail.

remove sleep

no cs.Lock around wal.Save

test different cases (empty block, non-empty block, ...)

comments

add comments

test 4 cases: empty block, non-empty block, non-empty block with smaller part size, many blocks

fixes as per Bucky's last review

reset subscriptions on UnsubscribeAll

use a simple counter to track message for which we panicked

also, set a smaller part size for all test cases
This commit is contained in:
Anton Kaliaev
2017-06-26 19:00:30 +04:00
parent 7c85f15a6c
commit f6539737de
43 changed files with 1368 additions and 870 deletions

View File

@@ -31,7 +31,7 @@ func TestHeaderEvents(t *testing.T) {
defer c.Stop()
}
evtTyp := types.EventStringNewBlockHeader()
evtTyp := types.EventNewBlockHeader
evt, err := client.WaitForOneEvent(c, evtTyp, 1*time.Second)
require.Nil(err, "%d: %+v", i, err)
_, ok := evt.Unwrap().(types.EventDataNewBlockHeader)
@@ -54,20 +54,20 @@ func TestBlockEvents(t *testing.T) {
// listen for a new block; ensure height increases by 1
var firstBlockHeight int
for i := 0; i < 3; i++ {
evtTyp := types.EventStringNewBlock()
for j := 0; j < 3; j++ {
evtTyp := types.EventNewBlock
evt, err := client.WaitForOneEvent(c, evtTyp, 1*time.Second)
require.Nil(err, "%d: %+v", i, err)
require.Nil(err, "%d: %+v", j, err)
blockEvent, ok := evt.Unwrap().(types.EventDataNewBlock)
require.True(ok, "%d: %#v", i, evt)
require.True(ok, "%d: %#v", j, evt)
block := blockEvent.Block
if i == 0 {
if j == 0 {
firstBlockHeight = block.Header.Height
continue
}
require.Equal(block.Header.Height, firstBlockHeight+i)
require.Equal(block.Header.Height, firstBlockHeight+j)
}
}
}
@@ -86,7 +86,7 @@ func TestTxEventsSentWithBroadcastTxAsync(t *testing.T) {
// make the tx
_, _, tx := MakeTxKV()
evtTyp := types.EventStringTx(types.Tx(tx))
evtTyp := types.EventTx
// send async
txres, err := c.BroadcastTxAsync(tx)
@@ -119,9 +119,9 @@ func TestTxEventsSentWithBroadcastTxSync(t *testing.T) {
// make the tx
_, _, tx := MakeTxKV()
evtTyp := types.EventStringTx(types.Tx(tx))
evtTyp := types.EventTx
// send async
// send sync
txres, err := c.BroadcastTxSync(tx)
require.Nil(err, "%+v", err)
require.True(txres.Code.IsOK())

View File

@@ -1,12 +1,12 @@
package client
import (
"context"
"fmt"
"time"
"github.com/pkg/errors"
"github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common"
events "github.com/tendermint/tmlibs/events"
)
// Waiter is informed of current height, decided whether to quit early
@@ -56,33 +56,25 @@ func WaitForHeight(c StatusClient, h int, waiter Waiter) error {
// when the timeout duration has expired.
//
// This handles subscribing and unsubscribing under the hood
func WaitForOneEvent(evsw types.EventSwitch,
evtTyp string, timeout time.Duration) (types.TMEventData, error) {
listener := cmn.RandStr(12)
evts, quit := make(chan events.EventData, 10), make(chan bool, 1)
// start timeout count-down
go func() {
time.Sleep(timeout)
quit <- true
}()
func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (types.TMEventData, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
evts := make(chan interface{}, 1)
// register for the next event of this type
evsw.AddListenerForEvent(listener, evtTyp, func(data events.EventData) {
evts <- data
})
query := fmt.Sprintf("%s='%s'", types.EventTypeKey, evtTyp)
err := c.Subscribe(ctx, query, evts)
if err != nil {
return types.TMEventData{}, errors.Wrap(err, "failed to subscribe")
}
// make sure to unregister after the test is over
defer evsw.RemoveListenerForEvent(evtTyp, listener)
// defer evsw.RemoveListener(listener) // this also works
defer c.Unsubscribe(ctx, query)
select {
case <-quit:
return types.TMEventData{}, errors.New("timed out waiting for event")
case evt := <-evts:
tmevt, ok := evt.(types.TMEventData)
if ok {
return tmevt, nil
}
return types.TMEventData{}, errors.Errorf("Got unexpected event type: %#v", evt)
return evt.(types.TMEventData), nil
case <-ctx.Done():
return types.TMEventData{}, errors.New("timed out waiting for event")
}
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"sync"
"github.com/pkg/errors"
@@ -11,7 +12,7 @@ import (
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpcclient "github.com/tendermint/tendermint/rpc/lib/client"
"github.com/tendermint/tendermint/types"
events "github.com/tendermint/tmlibs/events"
cmn "github.com/tendermint/tmlibs/common"
)
/*
@@ -40,10 +41,9 @@ func NewHTTP(remote, wsEndpoint string) *HTTP {
}
var (
_ Client = (*HTTP)(nil)
_ NetworkClient = (*HTTP)(nil)
_ types.EventSwitch = (*HTTP)(nil)
_ types.EventSwitch = (*WSEvents)(nil)
_ Client = (*HTTP)(nil)
_ NetworkClient = (*HTTP)(nil)
_ EventsClient = (*HTTP)(nil)
)
func (c *HTTP) Status() (*ctypes.ResultStatus, error) {
@@ -186,128 +186,114 @@ func (c *HTTP) Validators(height *int) (*ctypes.ResultValidators, error) {
/** websocket event stuff here... **/
type WSEvents struct {
types.EventSwitch
cmn.BaseService
remote string
endpoint string
ws *rpcclient.WSClient
subscriptions map[string]chan<- interface{}
mtx sync.RWMutex
// used for signaling the goroutine that feeds ws -> EventSwitch
quit chan bool
done chan bool
// used to maintain counts of actively listened events
// so we can properly subscribe/unsubscribe
// FIXME: thread-safety???
// FIXME: reuse code from tmlibs/events???
evtCount map[string]int // count how many time each event is subscribed
listeners map[string][]string // keep track of which events each listener is listening to
}
func newWSEvents(remote, endpoint string) *WSEvents {
return &WSEvents{
EventSwitch: types.NewEventSwitch(),
endpoint: endpoint,
remote: remote,
quit: make(chan bool, 1),
done: make(chan bool, 1),
evtCount: map[string]int{},
listeners: map[string][]string{},
wsEvents := &WSEvents{
endpoint: endpoint,
remote: remote,
quit: make(chan bool, 1),
done: make(chan bool, 1),
subscriptions: make(map[string]chan<- interface{}),
}
wsEvents.BaseService = *cmn.NewBaseService(nil, "WSEvents", wsEvents)
return wsEvents
}
// Start is the only way I could think the extend OnStart from
// events.eventSwitch. If only it wasn't private...
// BaseService.Start -> eventSwitch.OnStart -> WSEvents.Start
func (w *WSEvents) Start() (bool, error) {
st, err := w.EventSwitch.Start()
// if we did start, then OnStart here...
if st && err == nil {
ws := rpcclient.NewWSClient(w.remote, w.endpoint, rpcclient.OnReconnect(func() {
w.redoSubscriptions()
}))
_, err = ws.Start()
if err == nil {
w.ws = ws
go w.eventListener()
}
ws := rpcclient.NewWSClient(w.remote, w.endpoint, rpcclient.OnReconnect(func() {
w.redoSubscriptions()
}))
started, err := ws.Start()
if err == nil {
w.ws = ws
go w.eventListener()
}
return st, errors.Wrap(err, "StartWSEvent")
return started, errors.Wrap(err, "StartWSEvent")
}
// Stop wraps the BaseService/eventSwitch actions as Start does
func (w *WSEvents) Stop() bool {
stop := w.EventSwitch.Stop()
if stop {
// send a message to quit to stop the eventListener
w.quit <- true
<-w.done
w.ws.Stop()
w.ws = nil
}
return stop
// send a message to quit to stop the eventListener
w.quit <- true
<-w.done
w.ws.Stop()
w.ws = nil
return true
}
/** TODO: more intelligent subscriptions! **/
func (w *WSEvents) AddListenerForEvent(listenerID, event string, cb events.EventCallback) {
// no one listening -> subscribe
if w.evtCount[event] == 0 {
w.subscribe(event)
func (w *WSEvents) Subscribe(ctx context.Context, query string, out chan<- interface{}) error {
w.mtx.RLock()
if _, ok := w.subscriptions[query]; ok {
return errors.New("already subscribed")
}
// if this listener was already listening to this event, return early
for _, s := range w.listeners[listenerID] {
if event == s {
return
}
w.mtx.RUnlock()
err := w.ws.Subscribe(ctx, query)
if err != nil {
return errors.Wrap(err, "failed to subscribe")
}
// otherwise, add this event to this listener
w.evtCount[event] += 1
w.listeners[listenerID] = append(w.listeners[listenerID], event)
w.EventSwitch.AddListenerForEvent(listenerID, event, cb)
w.mtx.Lock()
w.subscriptions[query] = out
w.mtx.Unlock()
return nil
}
func (w *WSEvents) RemoveListenerForEvent(event string, listenerID string) {
// if this listener is listening already, splice it out
found := false
l := w.listeners[listenerID]
for i, s := range l {
if event == s {
found = true
w.listeners[listenerID] = append(l[:i], l[i+1:]...)
break
}
}
// if the listener wasn't already listening to the event, exit early
if !found {
return
func (w *WSEvents) Unsubscribe(ctx context.Context, query string) error {
err := w.ws.Unsubscribe(ctx, query)
if err != nil {
return err
}
// now we can update the subscriptions
w.evtCount[event] -= 1
if w.evtCount[event] == 0 {
w.unsubscribe(event)
w.mtx.Lock()
defer w.mtx.Unlock()
ch, ok := w.subscriptions[query]
if ok {
close(ch)
delete(w.subscriptions, query)
}
w.EventSwitch.RemoveListenerForEvent(event, listenerID)
return nil
}
func (w *WSEvents) RemoveListener(listenerID string) {
// remove all counts for this listener
for _, s := range w.listeners[listenerID] {
w.evtCount[s] -= 1
if w.evtCount[s] == 0 {
w.unsubscribe(s)
}
func (w *WSEvents) UnsubscribeAll(ctx context.Context) error {
err := w.ws.UnsubscribeAll(ctx)
if err != nil {
return err
}
w.listeners[listenerID] = nil
// then let the switch do it's magic
w.EventSwitch.RemoveListener(listenerID)
w.mtx.Lock()
defer w.mtx.Unlock()
for _, ch := range w.subscriptions {
close(ch)
}
w.subscriptions = make(map[string]chan<- interface{})
return nil
}
// After being reconnected, it is necessary to redo subscription
// to server otherwise no data will be automatically received
// After being reconnected, it is necessary to redo subscription to server
// otherwise no data will be automatically received.
func (w *WSEvents) redoSubscriptions() {
for event, _ := range w.evtCount {
w.subscribe(event)
for query, out := range w.subscriptions {
// NOTE: no timeout for reconnect
w.Subscribe(context.Background(), query, out)
}
}
@@ -350,23 +336,10 @@ func (w *WSEvents) parseEvent(data []byte) (err error) {
// TODO: ?
return nil
}
// looks good! let's fire this baby!
w.EventSwitch.FireEvent(result.Name, result.Data)
w.mtx.RLock()
if ch, ok := w.subscriptions[result.Query]; ok {
ch <- result.Data
}
w.mtx.RUnlock()
return nil
}
// no way of exposing these failures, so we panic.
// is this right? or silently ignore???
func (w *WSEvents) subscribe(event string) {
err := w.ws.Subscribe(context.TODO(), event)
if err != nil {
panic(err)
}
}
func (w *WSEvents) unsubscribe(event string) {
err := w.ws.Unsubscribe(context.TODO(), event)
if err != nil {
panic(err)
}
}

View File

@@ -20,9 +20,12 @@ implementation.
package client
import (
"context"
data "github.com/tendermint/go-wire/data"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common"
)
// ABCIClient groups together the functionality that principally
@@ -64,14 +67,12 @@ type StatusClient interface {
// if you want to listen for events, test if it also
// implements events.EventSwitch
type Client interface {
cmn.Service
ABCIClient
SignClient
HistoryClient
StatusClient
// this Client is reactive, you can subscribe to any TMEventData
// type, given the proper string. see tendermint/types/events.go
types.EventSwitch
EventsClient
}
// NetworkClient is general info about the network state. May not
@@ -83,3 +84,11 @@ type NetworkClient interface {
NetInfo() (*ctypes.ResultNetInfo, error)
DumpConsensusState() (*ctypes.ResultDumpConsensusState, error)
}
// EventsClient is reactive, you can subscribe to any message, given the proper
// string. see tendermint/types/events.go
type EventsClient interface {
Subscribe(ctx context.Context, query string, out chan<- interface{}) error
Unsubscribe(ctx context.Context, query string) error
UnsubscribeAll(ctx context.Context) error
}

View File

@@ -1,22 +1,27 @@
package client
import (
"context"
"github.com/pkg/errors"
data "github.com/tendermint/go-wire/data"
nm "github.com/tendermint/tendermint/node"
"github.com/tendermint/tendermint/rpc/core"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types"
tmquery "github.com/tendermint/tmlibs/pubsub/query"
)
/*
Local is a Client implementation that directly executes the rpc
functions on a given node, without going through HTTP or GRPC
functions on a given node, without going through HTTP or GRPC.
This implementation is useful for:
* Running tests against a node in-process without the overhead
of going through an http server
* Communication between an ABCI app and tendermin core when they
* Communication between an ABCI app and Tendermint core when they
are compiled in process.
For real clients, you probably want to use client.HTTP. For more
@@ -24,7 +29,9 @@ powerful control during testing, you probably want the "client/mock" package.
*/
type Local struct {
node *nm.Node
types.EventSwitch
*types.EventBus
subscriptions map[string]*tmquery.Query
}
// NewLocal configures a client that calls the Node directly.
@@ -33,24 +40,26 @@ type Local struct {
// you can only have one node per process. So make sure test cases
// don't run in parallel, or try to simulate an entire network in
// one process...
func NewLocal(node *nm.Node) Local {
func NewLocal(node *nm.Node) *Local {
node.ConfigureRPC()
return Local{
node: node,
EventSwitch: node.EventSwitch(),
return &Local{
node: node,
EventBus: node.EventBus(),
subscriptions: make(map[string]*tmquery.Query),
}
}
var (
_ Client = Local{}
_ Client = (*Local)(nil)
_ NetworkClient = Local{}
_ EventsClient = (*Local)(nil)
)
func (c Local) Status() (*ctypes.ResultStatus, error) {
func (Local) Status() (*ctypes.ResultStatus, error) {
return core.Status()
}
func (c Local) ABCIInfo() (*ctypes.ResultABCIInfo, error) {
func (Local) ABCIInfo() (*ctypes.ResultABCIInfo, error) {
return core.ABCIInfo()
}
@@ -62,50 +71,82 @@ func (c Local) ABCIQueryWithOptions(path string, data data.Bytes, opts ABCIQuery
return core.ABCIQuery(path, data, opts.Height, opts.Trusted)
}
func (c Local) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
func (Local) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
return core.BroadcastTxCommit(tx)
}
func (c Local) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
func (Local) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
return core.BroadcastTxAsync(tx)
}
func (c Local) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
func (Local) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
return core.BroadcastTxSync(tx)
}
func (c Local) NetInfo() (*ctypes.ResultNetInfo, error) {
func (Local) NetInfo() (*ctypes.ResultNetInfo, error) {
return core.NetInfo()
}
func (c Local) DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) {
func (Local) DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) {
return core.DumpConsensusState()
}
func (c Local) DialSeeds(seeds []string) (*ctypes.ResultDialSeeds, error) {
func (Local) DialSeeds(seeds []string) (*ctypes.ResultDialSeeds, error) {
return core.UnsafeDialSeeds(seeds)
}
func (c Local) BlockchainInfo(minHeight, maxHeight int) (*ctypes.ResultBlockchainInfo, error) {
func (Local) BlockchainInfo(minHeight, maxHeight int) (*ctypes.ResultBlockchainInfo, error) {
return core.BlockchainInfo(minHeight, maxHeight)
}
func (c Local) Genesis() (*ctypes.ResultGenesis, error) {
func (Local) Genesis() (*ctypes.ResultGenesis, error) {
return core.Genesis()
}
func (c Local) Block(height *int) (*ctypes.ResultBlock, error) {
func (Local) Block(height *int) (*ctypes.ResultBlock, error) {
return core.Block(height)
}
func (c Local) Commit(height *int) (*ctypes.ResultCommit, error) {
func (Local) Commit(height *int) (*ctypes.ResultCommit, error) {
return core.Commit(height)
}
func (c Local) Validators(height *int) (*ctypes.ResultValidators, error) {
func (Local) Validators(height *int) (*ctypes.ResultValidators, error) {
return core.Validators(height)
}
func (c Local) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) {
func (Local) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) {
return core.Tx(hash, prove)
}
func (c *Local) Subscribe(ctx context.Context, query string, out chan<- interface{}) error {
q, err := tmquery.New(query)
if err != nil {
return errors.Wrap(err, "failed to subscribe")
}
if err = c.EventBus.Subscribe(ctx, "rpclocalclient", q, out); err != nil {
return errors.Wrap(err, "failed to subscribe")
}
c.subscriptions[query] = q
return nil
}
func (c *Local) Unsubscribe(ctx context.Context, query string) error {
q, ok := c.subscriptions[query]
if !ok {
return errors.New("subscription not found")
}
if err := c.EventBus.Unsubscribe(ctx, "rpclocalclient", q); err != nil {
return errors.Wrap(err, "failed to unsubscribe")
}
delete(c.subscriptions, query)
return nil
}
func (c *Local) UnsubscribeAll(ctx context.Context) error {
if err := c.EventBus.UnsubscribeAll(ctx, "rpclocalclient"); err != nil {
return errors.Wrap(err, "failed to unsubscribe")
}
c.subscriptions = make(map[string]*tmquery.Query)
return nil
}

View File

@@ -21,6 +21,7 @@ import (
"github.com/tendermint/tendermint/rpc/core"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common"
)
// Client wraps arbitrary implementations of the various interfaces.
@@ -33,8 +34,8 @@ type Client struct {
client.SignClient
client.HistoryClient
client.StatusClient
// create a mock with types.NewEventSwitch()
types.EventSwitch
client.EventsClient
cmn.Service
}
var _ client.Client = Client{}

View File

@@ -18,7 +18,7 @@ func getHTTPClient() *client.HTTP {
return client.NewHTTP(rpcAddr, "/websocket")
}
func getLocalClient() client.Local {
func getLocalClient() *client.Local {
return client.NewLocal(node)
}