diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 7e776ab07..f389f3c91 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -9,6 +9,7 @@ import ( "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" + "github.com/tendermint/tendermint/events" "github.com/tendermint/tendermint/p2p" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" @@ -19,7 +20,6 @@ const ( defaultChannelCapacity = 100 defaultSleepIntervalMS = 500 trySyncIntervalMS = 100 - // stop syncing when last block's time is // within this much of the system time. stopSyncingDurationMinutes = 10 @@ -41,6 +41,8 @@ type BlockchainReactor struct { lastBlock *types.Block quit chan struct{} running uint32 + + evsw *events.EventSwitch } func NewBlockchainReactor(state *sm.State, store *BlockStore, sync bool) *BlockchainReactor { @@ -239,6 +241,11 @@ func (bcR *BlockchainReactor) BroadcastStatus() error { return nil } +// implements events.Eventable +func (bcR *BlockchainReactor) AddEventSwitch(evsw *events.EventSwitch) { + bcR.evsw = evsw +} + //----------------------------------------------------------------------------- // Messages diff --git a/consensus/reactor.go b/consensus/reactor.go index a3028b6d5..da358e594 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -12,6 +12,7 @@ import ( bc "github.com/tendermint/tendermint/blockchain" . "github.com/tendermint/tendermint/common" . "github.com/tendermint/tendermint/consensus/types" + "github.com/tendermint/tendermint/events" "github.com/tendermint/tendermint/p2p" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" @@ -38,6 +39,8 @@ type ConsensusReactor struct { blockStore *bc.BlockStore conS *ConsensusState + + evsw *events.EventSwitch } func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore) *ConsensusReactor { @@ -230,6 +233,11 @@ func (conR *ConsensusReactor) ResetToState(state *sm.State) { conR.conS.updateToState(state, false) } +// implements events.Eventable +func (conR *ConsensusReactor) AddEventSwitch(evsw *events.EventSwitch) { + conR.evsw = evsw +} + //-------------------------------------- func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) { diff --git a/daemon/daemon.go b/daemon/daemon.go index 419c3786f..ceaafaa5c 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -9,6 +9,7 @@ import ( "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/consensus" dbm "github.com/tendermint/tendermint/db" + "github.com/tendermint/tendermint/events" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/rpc" @@ -18,6 +19,7 @@ import ( type Node struct { sw *p2p.Switch + evsw *events.EventSwitch book *p2p.AddrBook blockStore *bc.BlockStore pexReactor *p2p.PEXReactor @@ -50,6 +52,9 @@ func NewNode() *Node { log.Info("No PrivValidator found", "file", config.App().GetString("PrivValidatorFile")) } + eventSwitch := new(events.EventSwitch) + eventSwitch.Start() + // Get PEXReactor book := p2p.NewAddrBook(config.App().GetString("AddrBookFile")) pexReactor := p2p.NewPEXReactor(book) @@ -75,8 +80,13 @@ func NewNode() *Node { sw.AddReactor("BLOCKCHAIN", bcReactor) sw.AddReactor("CONSENSUS", consensusReactor) + // add the event switch to all services + // they should all satisfy events.Eventable + AddEventSwitch(eventSwitch, pexReactor, bcReactor, mempoolReactor, consensusReactor) + return &Node{ sw: sw, + evsw: eventSwitch, book: book, blockStore: blockStore, pexReactor: pexReactor, @@ -105,6 +115,13 @@ func (n *Node) Stop() { n.book.Stop() } +// Add the event switch to reactors, mempool, etc. +func AddEventSwitch(evsw *events.EventSwitch, eventables ...events.Eventable) { + for _, e := range eventables { + e.AddEventSwitch(evsw) + } +} + // Add a Listener to accept inbound peer connections. func (n *Node) AddListener(l p2p.Listener) { log.Info(Fmt("Added %v", l)) @@ -130,7 +147,7 @@ func (n *Node) StartRPC() { core.SetConsensusState(n.consensusState) core.SetMempoolReactor(n.mempoolReactor) core.SetSwitch(n.sw) - rpc.StartHTTPServer() + rpc.StartHTTPServer(n.evsw) } func (n *Node) Switch() *p2p.Switch { diff --git a/events/events.go b/events/events.go index 05c6f663f..73d1935fe 100644 --- a/events/events.go +++ b/events/events.go @@ -5,6 +5,12 @@ import ( "sync/atomic" ) +// reactors and other modules should export +// this interface to become eventable +type Eventable interface { + AddEventSwitch(*EventSwitch) +} + type EventSwitch struct { mtx sync.RWMutex eventCells map[string]*eventCell diff --git a/mempool/reactor.go b/mempool/reactor.go index e16cf9332..f6d66f67f 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -6,6 +6,7 @@ import ( "sync/atomic" "github.com/tendermint/tendermint/binary" + "github.com/tendermint/tendermint/events" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/types" ) @@ -22,6 +23,8 @@ type MempoolReactor struct { stopped uint32 Mempool *Mempool + + evsw *events.EventSwitch } func NewMempoolReactor(mempool *Mempool) *MempoolReactor { @@ -110,6 +113,11 @@ func (memR *MempoolReactor) BroadcastTx(tx types.Tx) error { return nil } +// implements events.Eventable +func (memR *MempoolReactor) AddEventSwitch(evsw *events.EventSwitch) { + memR.evsw = evsw +} + //----------------------------------------------------------------------------- // Messages diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index 6af070589..2c0033958 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -9,6 +9,7 @@ import ( "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" + "github.com/tendermint/tendermint/events" ) var pexErrInvalidMessage = errors.New("Invalid PEX message") @@ -31,6 +32,8 @@ type PEXReactor struct { stopped uint32 book *AddrBook + + evsw *events.EventSwitch } func NewPEXReactor(book *AddrBook) *PEXReactor { @@ -207,6 +210,11 @@ func (pexR *PEXReactor) ensurePeers() { } } +// implements events.Eventable +func (pexR *PEXReactor) AddEventSwitch(evsw *events.EventSwitch) { + pexR.evsw = evsw +} + //----------------------------------------------------------------------------- // Messages diff --git a/rpc/handlers.go b/rpc/handlers.go index 9fc1f7874..fc6a501df 100644 --- a/rpc/handlers.go +++ b/rpc/handlers.go @@ -226,6 +226,45 @@ func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) { //----------------------------------------------------------------------------- // rpc.websocket +// for requests coming in +type WsRequest struct { + Type string // subscribe or unsubscribe + Event string +} + +// for responses going out +type WsResponse struct { + Event string + Data interface{} + Error string +} + +// a single websocket connection +// contains the listeners id +type Connection struct { + id string + wsCon *websocket.Conn + writeChan chan WsResponse + quitChan chan struct{} + failedSends uint +} + +// new websocket connection wrapper +func NewConnection(con *websocket.Conn) *Connection { + return &Connection{ + id: con.RemoteAddr().String(), + wsCon: con, + writeChan: make(chan WsResponse, WriteChanBuffer), // buffered. we keep track when its full + } +} + +// close the connection +func (c *Connection) Close() { + c.wsCon.Close() + close(c.writeChan) + close(c.quitChan) +} + // main manager for all websocket connections // holds the event switch type WebsocketManager struct { @@ -327,50 +366,16 @@ func (w *WebsocketManager) write(con *Connection) { websocket.Message.Send(con.wsCon, buf.Bytes()) } case <-con.quitChan: - close(con.quitChan) - con.Close() + w.closeConn(con) return } } } -// a single websocket connection -// contains the listeners id -type Connection struct { - id string - wsCon *websocket.Conn - writeChan chan WsResponse - quitChan chan struct{} - failedSends uint -} - -// for requests coming in -type WsRequest struct { - Type string // subscribe or unsubscribe - Event string -} - -// for responses going out -type WsResponse struct { - Event string - Data interface{} - Error string -} - -// new websocket connection wrapper -func NewConnection(con *websocket.Conn) *Connection { - return &Connection{ - id: con.RemoteAddr().String(), - wsCon: con, - writeChan: make(chan WsResponse, WriteChanBuffer), // buffered. we keep track when its full - } -} - -// close the channel -// should only be called by firing on c.quitChan -func (c *Connection) Close() { - close(c.writeChan) - c.wsCon.Close() +// close a connection and delete from manager +func (w *WebsocketManager) closeConn(con *Connection) { + con.Close() + delete(w.cons, con.id) } // rpc.websocket