diff --git a/account/account.go b/account/account.go index 520afd70c..235da8583 100644 --- a/account/account.go +++ b/account/account.go @@ -6,6 +6,7 @@ import ( "io" "github.com/tendermint/tendermint/binary" + "github.com/tendermint/tendermint/merkle" ) // Signable is an interface for all signable things. @@ -24,6 +25,11 @@ func SignBytes(o Signable) []byte { return buf.Bytes() } +// HashSignBytes is a convenience method for getting the hash of the bytes of a signable +func HashSignBytes(o Signable) []byte { + return merkle.HashFromBinary(SignBytes(o)) +} + //----------------------------------------------------------------------------- // Account resides in the application state, and is mutated by transactions diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 33edb4179..9bdfe27cf 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -43,7 +43,7 @@ type BlockchainReactor struct { quit chan struct{} running uint32 - evsw *events.EventSwitch + evsw events.Fireable } func NewBlockchainReactor(state *sm.State, store *BlockStore, sync bool) *BlockchainReactor { @@ -244,7 +244,7 @@ func (bcR *BlockchainReactor) BroadcastStatus() error { } // implements events.Eventable -func (bcR *BlockchainReactor) SetEventSwitch(evsw *events.EventSwitch) { +func (bcR *BlockchainReactor) SetFireable(evsw events.Fireable) { bcR.evsw = evsw } diff --git a/common/word.go b/common/word.go index d67730e13..7c5c8a81a 100644 --- a/common/word.go +++ b/common/word.go @@ -13,10 +13,11 @@ var ( type Word256 [32]byte -func (w Word256) String() string { return string(w[:]) } -func (w Word256) Copy() Word256 { return w } -func (w Word256) Bytes() []byte { return w[:] } // copied. -func (w Word256) Prefix(n int) []byte { return w[:n] } +func (w Word256) String() string { return string(w[:]) } +func (w Word256) Copy() Word256 { return w } +func (w Word256) Bytes() []byte { return w[:] } // copied. +func (w Word256) Prefix(n int) []byte { return w[:n] } +func (w Word256) Postfix(n int) []byte { return w[32-n:] } func (w Word256) IsZero() bool { accum := byte(0) for _, byt := range w { diff --git a/consensus/reactor.go b/consensus/reactor.go index 1e8766458..7c3033d6e 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -41,7 +41,7 @@ type ConsensusReactor struct { blockStore *bc.BlockStore conS *ConsensusState - evsw *events.EventSwitch + evsw events.Fireable } func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore) *ConsensusReactor { @@ -235,8 +235,9 @@ func (conR *ConsensusReactor) ResetToState(state *sm.State) { } // implements events.Eventable -func (conR *ConsensusReactor) SetEventSwitch(evsw *events.EventSwitch) { +func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) { conR.evsw = evsw + conR.conS.SetFireable(evsw) } //-------------------------------------- diff --git a/consensus/state.go b/consensus/state.go index c036fa3b4..c1579c7e9 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -66,17 +66,21 @@ import ( . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/config" . "github.com/tendermint/tendermint/consensus/types" + "github.com/tendermint/tendermint/events" mempl "github.com/tendermint/tendermint/mempool" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" ) const ( - roundDuration0 = 10 * time.Second // The first round is 60 seconds long. - roundDurationDelta = 3 * time.Second // Each successive round lasts 15 seconds longer. roundDeadlinePrevote = float64(1.0 / 3.0) // When the prevote is due. roundDeadlinePrecommit = float64(2.0 / 3.0) // When the precommit vote is due. - newHeightDelta = roundDuration0 / 3 // The time to wait between commitTime and startTime of next consensus rounds. +) + +var ( + RoundDuration0 = 10 * time.Second // The first round is 60 seconds long. + RoundDurationDelta = 3 * time.Second // Each successive round lasts 15 seconds longer. + newHeightDelta = RoundDuration0 / 3 // The time to wait between commitTime and startTime of next consensus rounds. ) var ( @@ -246,6 +250,9 @@ type ConsensusState struct { stagedBlock *types.Block // Cache last staged block. stagedState *sm.State // Cache result of staged block. lastCommitVoteHeight uint // Last called commitVoteBlock() or saveCommitVoteBlock() on. + + evsw events.Fireable + evc *events.EventCache // set in stageBlock and passed into state } func NewConsensusState(state *sm.State, blockStore *bc.BlockStore, mempoolReactor *mempl.MempoolReactor) *ConsensusState { @@ -315,14 +322,14 @@ func (cs *ConsensusState) stepTransitionRoutine() { // NOTE: We can push directly to runActionCh because // we're running in a separate goroutine, which avoids deadlocks. rs := cs.getRoundState() - round, roundStartTime, roundDuration, _, elapsedRatio := calcRoundInfo(rs.StartTime) + round, roundStartTime, RoundDuration, _, elapsedRatio := calcRoundInfo(rs.StartTime) log.Debug("Scheduling next action", "height", rs.Height, "round", round, "step", rs.Step, "roundStartTime", roundStartTime, "elapsedRatio", elapsedRatio) switch rs.Step { case RoundStepNewHeight: // We should run RoundActionPropose when rs.StartTime passes. if elapsedRatio < 0 { // startTime is in the future. - time.Sleep(time.Duration((-1.0 * elapsedRatio) * float64(roundDuration))) + time.Sleep(time.Duration((-1.0 * elapsedRatio) * float64(RoundDuration))) } cs.runActionCh <- RoundAction{rs.Height, rs.Round, RoundActionPropose} case RoundStepNewRound: @@ -330,15 +337,15 @@ func (cs *ConsensusState) stepTransitionRoutine() { cs.runActionCh <- RoundAction{rs.Height, rs.Round, RoundActionPropose} case RoundStepPropose: // Wake up when it's time to vote. - time.Sleep(time.Duration((roundDeadlinePrevote - elapsedRatio) * float64(roundDuration))) + time.Sleep(time.Duration((roundDeadlinePrevote - elapsedRatio) * float64(RoundDuration))) cs.runActionCh <- RoundAction{rs.Height, rs.Round, RoundActionPrevote} case RoundStepPrevote: // Wake up when it's time to precommit. - time.Sleep(time.Duration((roundDeadlinePrecommit - elapsedRatio) * float64(roundDuration))) + time.Sleep(time.Duration((roundDeadlinePrecommit - elapsedRatio) * float64(RoundDuration))) cs.runActionCh <- RoundAction{rs.Height, rs.Round, RoundActionPrecommit} case RoundStepPrecommit: // Wake up when the round is over. - time.Sleep(time.Duration((1.0 - elapsedRatio) * float64(roundDuration))) + time.Sleep(time.Duration((1.0 - elapsedRatio) * float64(RoundDuration))) cs.runActionCh <- RoundAction{rs.Height, rs.Round, RoundActionTryCommit} case RoundStepCommit: // There's nothing to scheudle, we're waiting for @@ -437,6 +444,12 @@ ACTION_LOOP: if cs.TryFinalizeCommit(rs.Height) { // Now at new height // cs.Step is at RoundStepNewHeight or RoundStepNewRound. + // fire some events! + go func() { + newBlock := cs.blockStore.LoadBlock(cs.state.LastBlockHeight) + cs.evsw.FireEvent(types.EventStringNewBlock(), newBlock) + cs.evc.Flush() + }() scheduleNextAction() continue ACTION_LOOP } else { @@ -1023,6 +1036,9 @@ func (cs *ConsensusState) stageBlock(block *types.Block, blockParts *types.PartS // Create a copy of the state for staging stateCopy := cs.state.Copy() + // reset the event cache and pass it into the state + cs.evc = events.NewEventCache(cs.evsw) + stateCopy.SetFireable(cs.evc) // Commit block onto the copied state. // NOTE: Basic validation is done in state.AppendBlock(). @@ -1107,17 +1123,22 @@ func (cs *ConsensusState) saveCommitVoteBlock(block *types.Block, blockParts *ty } } +// implements events.Eventable +func (cs *ConsensusState) SetFireable(evsw events.Fireable) { + cs.evsw = evsw +} + //----------------------------------------------------------------------------- // total duration of given round func calcRoundDuration(round uint) time.Duration { - return roundDuration0 + roundDurationDelta*time.Duration(round) + return RoundDuration0 + RoundDurationDelta*time.Duration(round) } // startTime is when round zero started. func calcRoundStartTime(round uint, startTime time.Time) time.Time { - return startTime.Add(roundDuration0*time.Duration(round) + - roundDurationDelta*(time.Duration((int64(round)*int64(round)-int64(round))/2))) + return startTime.Add(RoundDuration0*time.Duration(round) + + RoundDurationDelta*(time.Duration((int64(round)*int64(round)-int64(round))/2))) } // calculates the current round given startTime of round zero. @@ -1131,8 +1152,8 @@ func calcRound(startTime time.Time) uint { // D_delta * R^2 + (2D_0 - D_delta) * R + 2(Start - Now) <= 0. // AR^2 + BR + C <= 0; A = D_delta, B = (2_D0 - D_delta), C = 2(Start - Now). // R = Floor((-B + Sqrt(B^2 - 4AC))/2A) - A := float64(roundDurationDelta) - B := 2.0*float64(roundDuration0) - float64(roundDurationDelta) + A := float64(RoundDurationDelta) + B := 2.0*float64(RoundDuration0) - float64(RoundDurationDelta) C := 2.0 * float64(startTime.Sub(now)) R := math.Floor((-B + math.Sqrt(B*B-4.0*A*C)) / (2 * A)) if math.IsNaN(R) { @@ -1149,12 +1170,12 @@ func calcRound(startTime time.Time) uint { // convenience // NOTE: elapsedRatio can be negative if startTime is in the future. -func calcRoundInfo(startTime time.Time) (round uint, roundStartTime time.Time, roundDuration time.Duration, +func calcRoundInfo(startTime time.Time) (round uint, roundStartTime time.Time, RoundDuration time.Duration, roundElapsed time.Duration, elapsedRatio float64) { round = calcRound(startTime) roundStartTime = calcRoundStartTime(round, startTime) - roundDuration = calcRoundDuration(round) + RoundDuration = calcRoundDuration(round) roundElapsed = time.Now().Sub(roundStartTime) - elapsedRatio = float64(roundElapsed) / float64(roundDuration) + elapsedRatio = float64(roundElapsed) / float64(RoundDuration) return } diff --git a/events/event_cache.go b/events/event_cache.go new file mode 100644 index 000000000..3231fe0d4 --- /dev/null +++ b/events/event_cache.go @@ -0,0 +1,41 @@ +package events + +const ( + eventsBufferSize = 1000 +) + +// An EventCache buffers events for a Fireable +// All events are cached. Filtering happens on Flush +type EventCache struct { + evsw Fireable + events []eventInfo +} + +// Create a new EventCache with an EventSwitch as backend +func NewEventCache(evsw Fireable) *EventCache { + return &EventCache{ + evsw: evsw, + events: make([]eventInfo, eventsBufferSize), + } +} + +// a cached event +type eventInfo struct { + event string + msg interface{} +} + +// Cache an event to be fired upon finality. +func (evc *EventCache) FireEvent(event string, msg interface{}) { + // append to list + evc.events = append(evc.events, eventInfo{event, msg}) +} + +// Fire events by running evsw.FireEvent on all cached events. Blocks. +// Clears cached events +func (evc *EventCache) Flush() { + for _, ei := range evc.events { + evc.evsw.FireEvent(ei.event, ei.msg) + } + evc.events = make([]eventInfo, eventsBufferSize) +} diff --git a/events/events.go b/events/events.go index 46f01cb57..13aca2dac 100644 --- a/events/events.go +++ b/events/events.go @@ -8,7 +8,12 @@ import ( // reactors and other modules should export // this interface to become eventable type Eventable interface { - SetEventSwitch(*EventSwitch) + SetFireable(Fireable) +} + +// an event switch or cache implements fireable +type Fireable interface { + FireEvent(event string, msg interface{}) } type EventSwitch struct { diff --git a/mempool/mempool.go b/mempool/mempool.go index 819198778..51cfab139 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -42,7 +42,7 @@ func (mem *Mempool) GetCache() *sm.BlockCache { func (mem *Mempool) AddTx(tx types.Tx) (err error) { mem.mtx.Lock() defer mem.mtx.Unlock() - err = sm.ExecTx(mem.cache, tx, false) + err = sm.ExecTx(mem.cache, tx, false, nil) if err != nil { log.Debug("AddTx() error", "tx", tx, "error", err) return err @@ -93,7 +93,7 @@ func (mem *Mempool) ResetForBlockAndState(block *types.Block, state *sm.State) { // Next, filter all txs that aren't valid given new state. validTxs := []types.Tx{} for _, tx := range txs { - err := sm.ExecTx(mem.cache, tx, false) + err := sm.ExecTx(mem.cache, tx, false, nil) if err == nil { log.Debug("Filter in, valid", "tx", tx) validTxs = append(validTxs, tx) diff --git a/mempool/reactor.go b/mempool/reactor.go index 1b87a26f4..28af17438 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -26,7 +26,7 @@ type MempoolReactor struct { Mempool *Mempool - evsw *events.EventSwitch + evsw events.Fireable } func NewMempoolReactor(mempool *Mempool) *MempoolReactor { @@ -116,7 +116,7 @@ func (memR *MempoolReactor) BroadcastTx(tx types.Tx) error { } // implements events.Eventable -func (memR *MempoolReactor) SetEventSwitch(evsw *events.EventSwitch) { +func (memR *MempoolReactor) SetFireable(evsw events.Fireable) { memR.evsw = evsw } diff --git a/node/node.go b/node/node.go index f7749a800..a4071752a 100644 --- a/node/node.go +++ b/node/node.go @@ -82,7 +82,7 @@ func NewNode() *Node { // add the event switch to all services // they should all satisfy events.Eventable - SetEventSwitch(eventSwitch, pexReactor, bcReactor, mempoolReactor, consensusReactor) + SetFireable(eventSwitch, pexReactor, bcReactor, mempoolReactor, consensusReactor) return &Node{ sw: sw, @@ -116,9 +116,9 @@ func (n *Node) Stop() { } // Add the event switch to reactors, mempool, etc. -func SetEventSwitch(evsw *events.EventSwitch, eventables ...events.Eventable) { +func SetFireable(evsw *events.EventSwitch, eventables ...events.Eventable) { for _, e := range eventables { - e.SetEventSwitch(evsw) + e.SetFireable(evsw) } } @@ -167,6 +167,10 @@ func (n *Node) MempoolReactor() *mempl.MempoolReactor { return n.mempoolReactor } +func (n *Node) EventSwitch() *events.EventSwitch { + return n.evsw +} + //------------------------------------------------------------------------------ func RunNode() { diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index a9b0bbd80..8a3ba6444 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -34,7 +34,7 @@ type PEXReactor struct { book *AddrBook - evsw *events.EventSwitch + evsw events.Fireable } func NewPEXReactor(book *AddrBook) *PEXReactor { @@ -211,7 +211,7 @@ func (pexR *PEXReactor) ensurePeers() { } // implements events.Eventable -func (pexR *PEXReactor) SetEventSwitch(evsw *events.EventSwitch) { +func (pexR *PEXReactor) SetFireable(evsw events.Fireable) { pexR.evsw = evsw } diff --git a/rpc/core/routes.go b/rpc/core/routes.go index 62630fb5a..395ad82a4 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -4,10 +4,6 @@ import ( "github.com/tendermint/tendermint/rpc" ) -/* -TODO: support Call && GetStorage. -*/ - var Routes = map[string]*rpc.RPCFunc{ "status": rpc.NewRPCFunc(Status, []string{}), "net_info": rpc.NewRPCFunc(NetInfo, []string{}), diff --git a/rpc/core/txs.go b/rpc/core/txs.go index f16a3ae70..628bc6584 100644 --- a/rpc/core/txs.go +++ b/rpc/core/txs.go @@ -43,7 +43,7 @@ func Call(address, data []byte) (*ctypes.ResponseCall, error) { GasLimit: 10000000, } - vmach := vm.NewVM(txCache, params, caller.Address) + vmach := vm.NewVM(txCache, params, caller.Address, nil) gas := uint64(1000000000) ret, err := vmach.Call(caller, callee, callee.Code, data, 0, &gas) if err != nil { @@ -68,7 +68,7 @@ func CallCode(code, data []byte) (*ctypes.ResponseCall, error) { GasLimit: 10000000, } - vmach := vm.NewVM(txCache, params, caller.Address) + vmach := vm.NewVM(txCache, params, caller.Address, nil) gas := uint64(1000000000) ret, err := vmach.Call(caller, callee, code, data, 0, &gas) if err != nil { diff --git a/rpc/handlers.go b/rpc/handlers.go index 51dd7e5e2..2d0bb1eaf 100644 --- a/rpc/handlers.go +++ b/rpc/handlers.go @@ -5,12 +5,13 @@ import ( "encoding/json" "errors" "fmt" + "github.com/gorilla/websocket" "github.com/tendermint/tendermint/binary" "github.com/tendermint/tendermint/events" - "golang.org/x/net/websocket" "io/ioutil" "net/http" "reflect" + "sync/atomic" "time" ) @@ -26,8 +27,8 @@ func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) { func RegisterEventsHandler(mux *http.ServeMux, evsw *events.EventSwitch) { // websocket endpoint - w := NewWebsocketManager(evsw) - mux.Handle("/events", websocket.Handler(w.eventsHandler)) + wm := NewWebsocketManager(evsw) + mux.HandleFunc("/events", wm.websocketHandler) // websocket.Handler(w.eventsHandler)) } //------------------------------------- @@ -198,156 +199,192 @@ func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) { //----------------------------------------------------------------------------- // rpc.websocket +const ( + WSConnectionReaperSeconds = 5 + MaxFailedSends = 10 + WriteChanBufferSize = 10 +) + // for requests coming in -type WsRequest struct { +type WSRequest struct { Type string // subscribe or unsubscribe Event string } // for responses going out -type WsResponse struct { +type WSResponse struct { Event string Data interface{} Error string } // a single websocket connection -// contains the listeners id -type Connection struct { +// contains listener id, underlying ws connection, +// and the event switch for subscribing to events +type WSConnection struct { id string - wsCon *websocket.Conn - writeChan chan WsResponse - quitChan chan struct{} + wsConn *websocket.Conn + writeChan chan WSResponse failedSends uint + started uint32 + stopped uint32 + + evsw *events.EventSwitch } // new websocket connection wrapper -func NewConnection(con *websocket.Conn) *Connection { - return &Connection{ - id: con.RemoteAddr().String(), - wsCon: con, - writeChan: make(chan WsResponse, WriteChanBuffer), // buffered. we keep track when its full +func NewWSConnection(wsConn *websocket.Conn) *WSConnection { + return &WSConnection{ + id: wsConn.RemoteAddr().String(), + wsConn: wsConn, + writeChan: make(chan WSResponse, WriteChanBufferSize), // buffered. we keep track when its full + } +} + +// start the connection and hand her the event switch +func (con *WSConnection) Start(evsw *events.EventSwitch) { + if atomic.CompareAndSwapUint32(&con.started, 0, 1) { + con.evsw = evsw + + // read subscriptions/unsubscriptions to events + go con.read() + // write responses + con.write() } } // close the connection -func (c *Connection) Close() { - c.wsCon.Close() - close(c.writeChan) - close(c.quitChan) +func (con *WSConnection) Stop() { + if atomic.CompareAndSwapUint32(&con.stopped, 0, 1) { + con.wsConn.Close() + close(con.writeChan) + } +} + +// attempt to write response to writeChan and record failures +func (con *WSConnection) safeWrite(resp WSResponse) { + select { + case con.writeChan <- resp: + // yay + con.failedSends = 0 + default: + // channel is full + // if this happens too many times in a row, + // close connection + con.failedSends += 1 + } +} + +// read from the socket and subscribe to or unsubscribe from events +func (con *WSConnection) read() { + reaper := time.Tick(time.Second * WSConnectionReaperSeconds) + for { + select { + case <-reaper: + if con.failedSends > MaxFailedSends { + // sending has failed too many times. + // kill the connection + con.Stop() + return + } + default: + var in []byte + _, in, err := con.wsConn.ReadMessage() + if err != nil { + // an error reading the connection, + // kill the connection + con.Stop() + return + } + var req WSRequest + err = json.Unmarshal(in, &req) + if err != nil { + errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error()) + con.safeWrite(WSResponse{Error: errStr}) + continue + } + switch req.Type { + case "subscribe": + log.Info("New event subscription", "con id", con.id, "event", req.Event) + con.evsw.AddListenerForEvent(con.id, req.Event, func(msg interface{}) { + resp := WSResponse{ + Event: req.Event, + Data: msg, + } + con.safeWrite(resp) + }) + case "unsubscribe": + if req.Event != "" { + con.evsw.RemoveListenerForEvent(req.Event, con.id) + } else { + con.evsw.RemoveListener(con.id) + } + default: + con.safeWrite(WSResponse{Error: "Unknown request type: " + req.Type}) + } + + } + } +} + +// receives on a write channel and writes out on the socket +func (con *WSConnection) write() { + n, err := new(int64), new(error) + for { + msg, more := <-con.writeChan + if !more { + // the channel was closed, so ensure + // connection is stopped and return + con.Stop() + return + } + buf := new(bytes.Buffer) + binary.WriteJSON(msg, buf, n, err) + if *err != nil { + log.Error("Failed to write JSON WSResponse", "error", err) + } else { + if err := con.wsConn.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil { + log.Error("Failed to write response on websocket", "error", err) + con.Stop() + return + } + } + } } // main manager for all websocket connections // holds the event switch type WebsocketManager struct { - ew *events.EventSwitch - cons map[string]*Connection + websocket.Upgrader + evsw *events.EventSwitch } -func NewWebsocketManager(ew *events.EventSwitch) *WebsocketManager { +func NewWebsocketManager(evsw *events.EventSwitch) *WebsocketManager { return &WebsocketManager{ - ew: ew, - cons: make(map[string]*Connection), + evsw: evsw, + Upgrader: websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + // TODO + return true + }, + }, } } -func (w *WebsocketManager) eventsHandler(con *websocket.Conn) { +func (wm *WebsocketManager) websocketHandler(w http.ResponseWriter, r *http.Request) { + wsConn, err := wm.Upgrade(w, r, nil) + if err != nil { + // TODO - return http error + log.Error("Failed to upgrade to websocket connection", "error", err) + return + } + // register connection - c := NewConnection(con) - w.cons[con.RemoteAddr().String()] = c - - // read subscriptions/unsubscriptions to events - go w.read(c) - // write responses - go w.write(c) -} - -const ( - WsConnectionReaperSeconds = 5 - MaxFailedSendsSeconds = 10 - WriteChanBuffer = 10 -) - -// read from the socket and subscribe to or unsubscribe from events -func (w *WebsocketManager) read(con *Connection) { - reaper := time.Tick(time.Second * WsConnectionReaperSeconds) - for { - select { - case <-reaper: - if con.failedSends > MaxFailedSendsSeconds { - // sending has failed too many times. - // kill the connection - con.quitChan <- struct{}{} - } - default: - var in []byte - if err := websocket.Message.Receive(con.wsCon, &in); err != nil { - // an error reading the connection, - // so kill the connection - con.quitChan <- struct{}{} - } - var req WsRequest - err := json.Unmarshal(in, &req) - if err != nil { - errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error()) - con.writeChan <- WsResponse{Error: errStr} - } - switch req.Type { - case "subscribe": - w.ew.AddListenerForEvent(con.id, req.Event, func(msg interface{}) { - resp := WsResponse{ - Event: req.Event, - Data: msg, - } - select { - case con.writeChan <- resp: - // yay - con.failedSends = 0 - default: - // channel is full - // if this happens too many times, - // close connection - con.failedSends += 1 - } - }) - case "unsubscribe": - if req.Event != "" { - w.ew.RemoveListenerForEvent(req.Event, con.id) - } else { - w.ew.RemoveListener(con.id) - } - default: - con.writeChan <- WsResponse{Error: "Unknown request type: " + req.Type} - } - - } - } -} - -// receives on a write channel and writes out to the socket -func (w *WebsocketManager) write(con *Connection) { - n, err := new(int64), new(error) - for { - select { - case msg := <-con.writeChan: - buf := new(bytes.Buffer) - binary.WriteJSON(msg, buf, n, err) - if *err != nil { - log.Error("Failed to write JSON WsResponse", "error", err) - } else { - websocket.Message.Send(con.wsCon, buf.Bytes()) - } - case <-con.quitChan: - w.closeConn(con) - return - } - } -} - -// close a connection and delete from manager -func (w *WebsocketManager) closeConn(con *Connection) { - con.Close() - delete(w.cons, con.id) + con := NewWSConnection(wsConn) + log.Info("New websocket connection", "origin", con.id) + con.Start(wm.evsw) } // rpc.websocket diff --git a/rpc/http_server.go b/rpc/http_server.go index 14bcc9eed..5eaff856e 100644 --- a/rpc/http_server.go +++ b/rpc/http_server.go @@ -2,12 +2,15 @@ package rpc import ( + "bufio" "bytes" "fmt" + "net" "net/http" "runtime/debug" "time" + "github.com/tendermint/tendermint/alert" "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" ) @@ -96,3 +99,17 @@ func (w *ResponseWriterWrapper) WriteHeader(status int) { w.Status = status w.ResponseWriter.WriteHeader(status) } + +// implements http.Hijacker +func (w *ResponseWriterWrapper) Hijack() (net.Conn, *bufio.ReadWriter, error) { + return w.ResponseWriter.(http.Hijacker).Hijack() +} + +// Stick it as a deferred statement in gouroutines to prevent the program from crashing. +func Recover(daemonName string) { + if e := recover(); e != nil { + stack := string(debug.Stack()) + errorString := fmt.Sprintf("[%s] %s\n%s", daemonName, e, stack) + alert.Alert(errorString) + } +} diff --git a/rpc/test/client_ws_test.go b/rpc/test/client_ws_test.go new file mode 100644 index 000000000..b342d390e --- /dev/null +++ b/rpc/test/client_ws_test.go @@ -0,0 +1,302 @@ +package rpc + +import ( + "encoding/json" + "fmt" + "github.com/gorilla/websocket" + "github.com/tendermint/tendermint/account" + "github.com/tendermint/tendermint/binary" + "github.com/tendermint/tendermint/rpc" + "github.com/tendermint/tendermint/types" + "net/http" + "testing" + "time" +) + +//-------------------------------------------------------------------------------- +// Utilities for testing the websocket service + +// create a new connection +func newWSCon(t *testing.T) *websocket.Conn { + dialer := websocket.DefaultDialer + rHeader := http.Header{} + con, r, err := dialer.Dial(websocketAddr, rHeader) + fmt.Println("response", r) + if err != nil { + t.Fatal(err) + } + return con +} + +// subscribe to an event +func subscribe(t *testing.T, con *websocket.Conn, eventid string) { + err := con.WriteJSON(rpc.WSRequest{ + Type: "subscribe", + Event: eventid, + }) + if err != nil { + t.Fatal(err) + } +} + +// unsubscribe from an event +func unsubscribe(t *testing.T, con *websocket.Conn, eventid string) { + err := con.WriteJSON(rpc.WSRequest{ + Type: "unsubscribe", + Event: eventid, + }) + if err != nil { + t.Fatal(err) + } +} + +// wait for an event; do things that might trigger events, and check them when they are received +func waitForEvent(t *testing.T, con *websocket.Conn, eventid string, dieOnTimeout bool, f func(), check func(string, []byte) error) { + // go routine to wait for webscoket msg + gch := make(chan []byte) // good channel + ech := make(chan error) // error channel + go func() { + for { + _, p, err := con.ReadMessage() + if err != nil { + ech <- err + break + } else { + // if the event id isnt what we're waiting on + // ignore it + var response struct { + Event string + } + if err := json.Unmarshal(p, &response); err != nil { + ech <- err + break + } + if response.Event == eventid { + gch <- p + break + } + } + } + }() + + // do stuff (transactions) + f() + + // wait for an event or 10 seconds + ticker := time.Tick(10 * time.Second) + select { + case <-ticker: + if dieOnTimeout { + con.Close() + t.Fatalf("%s event was not received in time", eventid) + } + // else that's great, we didn't hear the event + // and we shouldn't have + case p := <-gch: + if dieOnTimeout { + // message was received and expected + // run the check + err := check(eventid, p) + if err != nil { + t.Fatal(err) + } + } else { + con.Close() + t.Fatalf("%s event was not expected", eventid) + } + case err := <-ech: + t.Fatal(err) + } +} + +func unmarshalResponseNewBlock(b []byte) (*types.Block, error) { + // unmarshall and assert somethings + var response struct { + Event string + Data *types.Block + Error string + } + var err error + binary.ReadJSON(&response, b, &err) + if err != nil { + return nil, err + } + if response.Error != "" { + return nil, fmt.Errorf(response.Error) + } + block := response.Data + return block, nil +} + +//-------------------------------------------------------------------------------- +// Test the websocket service + +// make a simple connection to the server +func TestWSConnect(t *testing.T) { + con := newWSCon(t) + con.Close() +} + +// receive a new block message +func _TestWSNewBlock(t *testing.T) { + con := newWSCon(t) + eid := types.EventStringNewBlock() + subscribe(t, con, eid) + defer func() { + unsubscribe(t, con, eid) + con.Close() + }() + waitForEvent(t, con, eid, true, func() {}, func(eid string, b []byte) error { + fmt.Println("Check:", string(b)) + return nil + }) +} + +// receive a few new block messages in a row, with increasing height +func TestWSBlockchainGrowth(t *testing.T) { + con := newWSCon(t) + eid := types.EventStringNewBlock() + subscribe(t, con, eid) + defer func() { + unsubscribe(t, con, eid) + con.Close() + }() + // listen for NewBlock, ensure height increases by 1 + unmarshalValidateBlockchain(t, con, eid) +} + +// send a transaction and validate the events from listening for both sender and receiver +func TestWSSend(t *testing.T) { + toAddr := []byte{20, 143, 25, 63, 16, 177, 83, 29, 91, 91, 54, 23, 233, 46, 190, 121, 122, 34, 86, 54} + amt := uint64(100) + + con := newWSCon(t) + eidInput := types.EventStringAccInput(userByteAddr) + eidOutput := types.EventStringAccOutput(toAddr) + subscribe(t, con, eidInput) + subscribe(t, con, eidOutput) + defer func() { + unsubscribe(t, con, eidInput) + unsubscribe(t, con, eidOutput) + con.Close() + }() + waitForEvent(t, con, eidInput, true, func() { + broadcastTx(t, "JSONRPC", userByteAddr, toAddr, nil, userBytePriv, amt, 0, 0) + }, unmarshalValidateSend(amt, toAddr)) + waitForEvent(t, con, eidOutput, true, func() {}, unmarshalValidateSend(amt, toAddr)) +} + +// ensure events are only fired once for a given transaction +func TestWSDoubleFire(t *testing.T) { + con := newWSCon(t) + eid := types.EventStringAccInput(userByteAddr) + subscribe(t, con, eid) + defer func() { + unsubscribe(t, con, eid) + con.Close() + }() + amt := uint64(100) + toAddr := []byte{20, 143, 25, 63, 16, 177, 83, 29, 91, 91, 54, 23, 233, 46, 190, 121, 122, 34, 86, 54} + // broadcast the transaction, wait to hear about it + waitForEvent(t, con, eid, true, func() { + broadcastTx(t, "JSONRPC", userByteAddr, toAddr, nil, userBytePriv, amt, 0, 0) + }, func(eid string, b []byte) error { + return nil + }) + // but make sure we don't hear about it twice + waitForEvent(t, con, eid, false, func() { + }, func(eid string, b []byte) error { + return nil + }) +} + +// create a contract, wait for the event, and send it a msg, validate the return +func TestWSCallWait(t *testing.T) { + con := newWSCon(t) + eid1 := types.EventStringAccInput(userByteAddr) + subscribe(t, con, eid1) + defer func() { + unsubscribe(t, con, eid1) + con.Close() + }() + amt := uint64(10000) + code, returnCode, returnVal := simpleContract() + var contractAddr []byte + // wait for the contract to be created + waitForEvent(t, con, eid1, true, func() { + _, receipt := broadcastTx(t, "JSONRPC", userByteAddr, nil, code, userBytePriv, amt, 1000, 1000) + contractAddr = receipt.ContractAddr + + }, unmarshalValidateCall(amt, returnCode)) + + // susbscribe to the new contract + amt = uint64(10001) + eid2 := types.EventStringAccReceive(contractAddr) + subscribe(t, con, eid2) + defer func() { + unsubscribe(t, con, eid2) + }() + // get the return value from a call + data := []byte{0x1} // just needs to be non empty for this to be a CallTx + waitForEvent(t, con, eid2, true, func() { + broadcastTx(t, "JSONRPC", userByteAddr, contractAddr, data, userBytePriv, amt, 1000, 1000) + }, unmarshalValidateCall(amt, returnVal)) +} + +// create a contract and send it a msg without waiting. wait for contract event +// and validate return +func TestWSCallNoWait(t *testing.T) { + con := newWSCon(t) + amt := uint64(10000) + code, _, returnVal := simpleContract() + + _, receipt := broadcastTx(t, "JSONRPC", userByteAddr, nil, code, userBytePriv, amt, 1000, 1000) + contractAddr := receipt.ContractAddr + + // susbscribe to the new contract + amt = uint64(10001) + eid := types.EventStringAccReceive(contractAddr) + subscribe(t, con, eid) + defer func() { + unsubscribe(t, con, eid) + con.Close() + }() + // get the return value from a call + data := []byte{0x1} // just needs to be non empty for this to be a CallTx + waitForEvent(t, con, eid, true, func() { + broadcastTx(t, "JSONRPC", userByteAddr, contractAddr, data, userBytePriv, amt, 1000, 1000) + }, unmarshalValidateCall(amt, returnVal)) +} + +// create two contracts, one of which calls the other +func TestWSCallCall(t *testing.T) { + con := newWSCon(t) + amt := uint64(10000) + code, _, returnVal := simpleContract() + txid := new([]byte) + + // deploy the two contracts + _, receipt := broadcastTx(t, "JSONRPC", userByteAddr, nil, code, userBytePriv, amt, 1000, 1000) + contractAddr1 := receipt.ContractAddr + code, _, _ = simpleCallContract(contractAddr1) + _, receipt = broadcastTx(t, "JSONRPC", userByteAddr, nil, code, userBytePriv, amt, 1000, 1000) + contractAddr2 := receipt.ContractAddr + + // susbscribe to the new contracts + amt = uint64(10001) + eid1 := types.EventStringAccReceive(contractAddr1) + eid2 := types.EventStringAccReceive(contractAddr2) + subscribe(t, con, eid1) + subscribe(t, con, eid2) + defer func() { + unsubscribe(t, con, eid1) + unsubscribe(t, con, eid2) + con.Close() + }() + // call contract2, which should call contract1, and wait for ev1 + data := []byte{0x1} // just needs to be non empty for this to be a CallTx + waitForEvent(t, con, eid1, true, func() { + tx, _ := broadcastTx(t, "JSONRPC", userByteAddr, contractAddr2, data, userBytePriv, amt, 1000, 1000) + *txid = account.HashSignBytes(tx) + }, unmarshalValidateCallCall(userByteAddr, returnVal, txid)) +} diff --git a/rpc/test/helpers.go b/rpc/test/helpers_test.go similarity index 68% rename from rpc/test/helpers.go rename to rpc/test/helpers_test.go index 2921303d9..7755277cc 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers_test.go @@ -6,6 +6,7 @@ import ( "github.com/tendermint/tendermint/account" . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/consensus" "github.com/tendermint/tendermint/logger" nm "github.com/tendermint/tendermint/node" "github.com/tendermint/tendermint/p2p" @@ -14,20 +15,23 @@ import ( "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" "testing" + "time" ) // global variables for use across all tests var ( - rpcAddr = "127.0.0.1:8089" - requestAddr = "http://" + rpcAddr + "/" + rpcAddr = "127.0.0.1:8089" + requestAddr = "http://" + rpcAddr + "/" + websocketAddr = "ws://" + rpcAddr + "/events" node *nm.Node mempoolCount = 0 - userAddr = "D7DFF9806078899C8DA3FE3633CC0BF3C6C2B1BB" - userPriv = "FDE3BD94CB327D19464027BA668194C5EFA46AE83E8419D7542CFF41F00C81972239C21C81EA7173A6C489145490C015E05D4B97448933B708A7EC5B7B4921E3" - userPub = "2239C21C81EA7173A6C489145490C015E05D4B97448933B708A7EC5B7B4921E3" + userAddr = "D7DFF9806078899C8DA3FE3633CC0BF3C6C2B1BB" + userPriv = "FDE3BD94CB327D19464027BA668194C5EFA46AE83E8419D7542CFF41F00C81972239C21C81EA7173A6C489145490C015E05D4B97448933B708A7EC5B7B4921E3" + userPub = "2239C21C81EA7173A6C489145490C015E05D4B97448933B708A7EC5B7B4921E3" + userByteAddr, userBytePriv = initUserBytes() clients = map[string]cclient.Client{ "JSONRPC": cclient.NewClient(requestAddr, "JSONRPC"), @@ -35,6 +39,16 @@ var ( } ) +// returns byte versions of address and private key +// type [64]byte needed by account.GenPrivAccountFromKey +func initUserBytes() ([]byte, [64]byte) { + byteAddr, _ := hex.DecodeString(userAddr) + var byteKey [64]byte + userPrivByteSlice, _ := hex.DecodeString(userPriv) + copy(byteKey[:], userPrivByteSlice) + return byteAddr, byteKey +} + func decodeHex(hexStr string) []byte { bytes, err := hex.DecodeString(hexStr) if err != nil { @@ -83,6 +97,9 @@ func init() { priv.SetFile(rootDir + "/priv_validator.json") priv.Save() + consensus.RoundDuration0 = 3 * time.Second + consensus.RoundDurationDelta = 1 * time.Second + // start a node ready := make(chan struct{}) go newNode(ready) @@ -263,3 +280,47 @@ func checkTx(t *testing.T, fromAddr []byte, priv *account.PrivAccount, tx *types t.Fatal(types.ErrTxInvalidSignature) } } + +// simple contract returns 5 + 6 = 0xb +func simpleContract() ([]byte, []byte, []byte) { + // this is the code we want to run when the contract is called + contractCode := []byte{0x60, 0x5, 0x60, 0x6, 0x1, 0x60, 0x0, 0x52, 0x60, 0x20, 0x60, 0x0, 0xf3} + // the is the code we need to return the contractCode when the contract is initialized + lenCode := len(contractCode) + // push code to the stack + //code := append([]byte{byte(0x60 + lenCode - 1)}, LeftPadWord256(contractCode).Bytes()...) + code := append([]byte{0x7f}, RightPadWord256(contractCode).Bytes()...) + // store it in memory + code = append(code, []byte{0x60, 0x0, 0x52}...) + // return whats in memory + //code = append(code, []byte{0x60, byte(32 - lenCode), 0x60, byte(lenCode), 0xf3}...) + code = append(code, []byte{0x60, byte(lenCode), 0x60, 0x0, 0xf3}...) + // return init code, contract code, expected return + return code, contractCode, LeftPadBytes([]byte{0xb}, 32) +} + +// simple call contract calls another contract +func simpleCallContract(addr []byte) ([]byte, []byte, []byte) { + gas1, gas2 := byte(0x1), byte(0x1) + value := byte(0x1) + inOff, inSize := byte(0x0), byte(0x0) // no call data + retOff, retSize := byte(0x0), byte(0x20) + // this is the code we want to run (call a contract and return) + contractCode := []byte{0x60, retSize, 0x60, retOff, 0x60, inSize, 0x60, inOff, 0x60, value, 0x73} + contractCode = append(contractCode, addr...) + contractCode = append(contractCode, []byte{0x61, gas1, gas2, 0xf1, 0x60, 0x20, 0x60, 0x0, 0xf3}...) + + // the is the code we need to return; the contractCode when the contract is initialized + // it should copy the code from the input into memory + lenCode := len(contractCode) + memOff := byte(0x0) + inOff = byte(0xc) // length of code before codeContract + length := byte(lenCode) + + code := []byte{0x60, length, 0x60, inOff, 0x60, memOff, 0x37} + // return whats in memory + code = append(code, []byte{0x60, byte(lenCode), 0x60, 0x0, 0xf3}...) + code = append(code, contractCode...) + // return init code, contract code, expected return + return code, contractCode, LeftPadBytes([]byte{0xb}, 32) +} diff --git a/rpc/test/tests.go b/rpc/test/tests_test.go similarity index 65% rename from rpc/test/tests.go rename to rpc/test/tests_test.go index 8fbd81e30..f95a42d4b 100644 --- a/rpc/test/tests.go +++ b/rpc/test/tests_test.go @@ -2,14 +2,12 @@ package rpc import ( "bytes" - "encoding/hex" "fmt" . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" "testing" - "time" ) func testStatus(t *testing.T, typ string) { @@ -38,45 +36,34 @@ func testGenPriv(t *testing.T, typ string) { } func testGetAccount(t *testing.T, typ string) { - byteAddr, _ := hex.DecodeString(userAddr) - acc := getAccount(t, typ, byteAddr) + acc := getAccount(t, typ, userByteAddr) if acc == nil { t.Fatalf("Account was nil") } - if bytes.Compare(acc.Address, byteAddr) != 0 { - t.Fatalf("Failed to get correct account. Got %x, expected %x", acc.Address, byteAddr) + if bytes.Compare(acc.Address, userByteAddr) != 0 { + t.Fatalf("Failed to get correct account. Got %x, expected %x", acc.Address, userByteAddr) } } func testSignedTx(t *testing.T, typ string) { - byteAddr, _ := hex.DecodeString(userAddr) - var byteKey [64]byte - oh, _ := hex.DecodeString(userPriv) - copy(byteKey[:], oh) - amt := uint64(100) toAddr := []byte{20, 143, 25, 63, 16, 177, 83, 29, 91, 91, 54, 23, 233, 46, 190, 121, 122, 34, 86, 54} - tx, priv := signTx(t, typ, byteAddr, toAddr, nil, byteKey, amt, 0, 0) - checkTx(t, byteAddr, priv, tx.(*types.SendTx)) + tx, priv := signTx(t, typ, userByteAddr, toAddr, nil, userBytePriv, amt, 0, 0) + checkTx(t, userByteAddr, priv, tx.(*types.SendTx)) toAddr = []byte{20, 143, 24, 63, 16, 17, 83, 29, 90, 91, 52, 2, 0, 41, 190, 121, 122, 34, 86, 54} - tx, priv = signTx(t, typ, byteAddr, toAddr, nil, byteKey, amt, 0, 0) - checkTx(t, byteAddr, priv, tx.(*types.SendTx)) + tx, priv = signTx(t, typ, userByteAddr, toAddr, nil, userBytePriv, amt, 0, 0) + checkTx(t, userByteAddr, priv, tx.(*types.SendTx)) toAddr = []byte{0, 0, 4, 0, 0, 4, 0, 0, 4, 91, 52, 2, 0, 41, 190, 121, 122, 34, 86, 54} - tx, priv = signTx(t, typ, byteAddr, toAddr, nil, byteKey, amt, 0, 0) - checkTx(t, byteAddr, priv, tx.(*types.SendTx)) + tx, priv = signTx(t, typ, userByteAddr, toAddr, nil, userBytePriv, amt, 0, 0) + checkTx(t, userByteAddr, priv, tx.(*types.SendTx)) } func testBroadcastTx(t *testing.T, typ string) { - byteAddr, _ := hex.DecodeString(userAddr) - var byteKey [64]byte - oh, _ := hex.DecodeString(userPriv) - copy(byteKey[:], oh) - amt := uint64(100) toAddr := []byte{20, 143, 25, 63, 16, 177, 83, 29, 91, 91, 54, 23, 233, 46, 190, 121, 122, 34, 86, 54} - tx, receipt := broadcastTx(t, typ, byteAddr, toAddr, nil, byteKey, amt, 0, 0) + tx, receipt := broadcastTx(t, typ, userByteAddr, toAddr, nil, userBytePriv, amt, 0, 0) if receipt.CreatesContract > 0 { t.Fatal("This tx does not create a contract") } @@ -99,18 +86,20 @@ func testBroadcastTx(t *testing.T, typ string) { } func testGetStorage(t *testing.T, typ string) { + con := newWSCon(t) + eid := types.EventStringNewBlock() + subscribe(t, con, eid) + defer func() { + unsubscribe(t, con, eid) + con.Close() + }() priv := state.LoadPrivValidator(".tendermint/priv_validator.json") _ = priv //core.SetPrivValidator(priv) - byteAddr, _ := hex.DecodeString(userAddr) - var byteKey [64]byte - oh, _ := hex.DecodeString(userPriv) - copy(byteKey[:], oh) - amt := uint64(1100) code := []byte{0x60, 0x5, 0x60, 0x1, 0x55} - _, receipt := broadcastTx(t, typ, byteAddr, nil, code, byteKey, amt, 1000, 1000) + _, receipt := broadcastTx(t, typ, userByteAddr, nil, code, userBytePriv, amt, 1000, 1000) if receipt.CreatesContract == 0 { t.Fatal("This tx creates a contract") } @@ -123,7 +112,10 @@ func testGetStorage(t *testing.T, typ string) { } // allow it to get mined - time.Sleep(time.Second * 20) + waitForEvent(t, con, eid, true, func() { + }, func(eid string, b []byte) error { + return nil + }) mempoolCount = 0 v := getStorage(t, typ, contractAddr, []byte{0x1}) @@ -151,33 +143,20 @@ func testCallCode(t *testing.T, typ string) { } func testCall(t *testing.T, typ string) { + con := newWSCon(t) + eid := types.EventStringNewBlock() + subscribe(t, con, eid) + defer func() { + unsubscribe(t, con, eid) + con.Close() + }() + client := clients[typ] - priv := state.LoadPrivValidator(".tendermint/priv_validator.json") - _ = priv - //core.SetPrivValidator(priv) - - byteAddr, _ := hex.DecodeString(userAddr) - var byteKey [64]byte - oh, _ := hex.DecodeString(userPriv) - copy(byteKey[:], oh) - // create the contract amt := uint64(6969) - // this is the code we want to run when the contract is called - contractCode := []byte{0x60, 0x5, 0x60, 0x6, 0x1, 0x60, 0x0, 0x52, 0x60, 0x20, 0x60, 0x0, 0xf3} - // the is the code we need to return the contractCode when the contract is initialized - lenCode := len(contractCode) - // push code to the stack - //code := append([]byte{byte(0x60 + lenCode - 1)}, LeftPadWord256(contractCode).Bytes()...) - code := append([]byte{0x7f}, RightPadWord256(contractCode).Bytes()...) - // store it in memory - code = append(code, []byte{0x60, 0x0, 0x52}...) - // return whats in memory - //code = append(code, []byte{0x60, byte(32 - lenCode), 0x60, byte(lenCode), 0xf3}...) - code = append(code, []byte{0x60, byte(lenCode), 0x60, 0x0, 0xf3}...) - - _, receipt := broadcastTx(t, typ, byteAddr, nil, code, byteKey, amt, 1000, 1000) + code, _, _ := simpleContract() + _, receipt := broadcastTx(t, typ, userByteAddr, nil, code, userBytePriv, amt, 1000, 1000) if receipt.CreatesContract == 0 { t.Fatal("This tx creates a contract") } @@ -190,7 +169,10 @@ func testCall(t *testing.T, typ string) { } // allow it to get mined - time.Sleep(time.Second * 20) + waitForEvent(t, con, eid, true, func() { + }, func(eid string, b []byte) error { + return nil + }) mempoolCount = 0 // run a call through the contract diff --git a/rpc/test/ws_helpers_test.go b/rpc/test/ws_helpers_test.go new file mode 100644 index 000000000..880220a32 --- /dev/null +++ b/rpc/test/ws_helpers_test.go @@ -0,0 +1,136 @@ +package rpc + +import ( + "bytes" + "fmt" + "github.com/gorilla/websocket" + "github.com/tendermint/tendermint/binary" + "github.com/tendermint/tendermint/types" + "testing" +) + +func unmarshalValidateBlockchain(t *testing.T, con *websocket.Conn, eid string) { + var initBlockN uint + for i := 0; i < 2; i++ { + waitForEvent(t, con, eid, true, func() {}, func(eid string, b []byte) error { + block, err := unmarshalResponseNewBlock(b) + if err != nil { + return err + } + if i == 0 { + initBlockN = block.Header.Height + } else { + if block.Header.Height != initBlockN+uint(i) { + return fmt.Errorf("Expected block %d, got block %d", i, block.Header.Height) + } + } + + return nil + }) + } +} + +func unmarshalValidateSend(amt uint64, toAddr []byte) func(string, []byte) error { + return func(eid string, b []byte) error { + // unmarshal and assert correctness + var response struct { + Event string + Data types.SendTx + Error string + } + var err error + binary.ReadJSON(&response, b, &err) + if err != nil { + return err + } + if response.Error != "" { + return fmt.Errorf(response.Error) + } + if eid != response.Event { + return fmt.Errorf("Eventid is not correct. Got %s, expected %s", response.Event, eid) + } + tx := response.Data + if bytes.Compare(tx.Inputs[0].Address, userByteAddr) != 0 { + return fmt.Errorf("Senders do not match up! Got %x, expected %x", tx.Inputs[0].Address, userByteAddr) + } + if tx.Inputs[0].Amount != amt { + return fmt.Errorf("Amt does not match up! Got %d, expected %d", tx.Inputs[0].Amount, amt) + } + if bytes.Compare(tx.Outputs[0].Address, toAddr) != 0 { + return fmt.Errorf("Receivers do not match up! Got %x, expected %x", tx.Outputs[0].Address, userByteAddr) + } + return nil + } +} + +func unmarshalValidateCall(amt uint64, returnCode []byte) func(string, []byte) error { + return func(eid string, b []byte) error { + // unmarshall and assert somethings + var response struct { + Event string + Data struct { + Tx types.CallTx + Return []byte + Exception string + } + Error string + } + var err error + binary.ReadJSON(&response, b, &err) + if err != nil { + return err + } + if response.Error != "" { + return fmt.Errorf(response.Error) + } + if response.Data.Exception != "" { + return fmt.Errorf(response.Data.Exception) + } + tx := response.Data.Tx + if bytes.Compare(tx.Input.Address, userByteAddr) != 0 { + return fmt.Errorf("Senders do not match up! Got %x, expected %x", tx.Input.Address, userByteAddr) + } + if tx.Input.Amount != amt { + return fmt.Errorf("Amt does not match up! Got %d, expected %d", tx.Input.Amount, amt) + } + ret := response.Data.Return + if bytes.Compare(ret, returnCode) != 0 { + return fmt.Errorf("Call did not return correctly. Got %x, expected %x", ret, returnCode) + } + return nil + } +} + +func unmarshalValidateCallCall(origin, returnCode []byte, txid *[]byte) func(string, []byte) error { + return func(eid string, b []byte) error { + // unmarshall and assert somethings + var response struct { + Event string + Data types.EventMsgCall + Error string + } + var err error + binary.ReadJSON(&response, b, &err) + if err != nil { + return err + } + if response.Error != "" { + return fmt.Errorf(response.Error) + } + if response.Data.Exception != "" { + return fmt.Errorf(response.Data.Exception) + } + if bytes.Compare(response.Data.Origin, origin) != 0 { + return fmt.Errorf("Origin does not match up! Got %x, expected %x", response.Data.Origin, origin) + } + ret := response.Data.Return + if bytes.Compare(ret, returnCode) != 0 { + return fmt.Errorf("Call did not return correctly. Got %x, expected %x", ret, returnCode) + } + if bytes.Compare(response.Data.TxId, *txid) != 0 { + return fmt.Errorf("TxIds do not match up! Got %x, expected %x", response.Data.TxId, *txid) + } + // calldata := response.Data.CallData + return nil + } +} diff --git a/state/block_cache.go b/state/block_cache.go index c900accc7..277754541 100644 --- a/state/block_cache.go +++ b/state/block_cache.go @@ -91,13 +91,13 @@ func (cache *BlockCache) GetStorage(addr Word256, key Word256) (value Word256) { } // Get or load storage - acc, storage, removed, dirty := cache.accounts[string(addr.Prefix(20))].unpack() + acc, storage, removed, dirty := cache.accounts[string(addr.Postfix(20))].unpack() if removed { panic("GetStorage() on removed account") } if acc != nil && storage == nil { storage = makeStorage(cache.db, acc.StorageRoot) - cache.accounts[string(addr.Prefix(20))] = accountInfo{acc, storage, false, dirty} + cache.accounts[string(addr.Postfix(20))] = accountInfo{acc, storage, false, dirty} } else if acc == nil { return Zero256 } @@ -114,7 +114,7 @@ func (cache *BlockCache) GetStorage(addr Word256, key Word256) (value Word256) { // NOTE: Set value to zero to removed from the trie. func (cache *BlockCache) SetStorage(addr Word256, key Word256, value Word256) { - _, _, removed, _ := cache.accounts[string(addr.Prefix(20))].unpack() + _, _, removed, _ := cache.accounts[string(addr.Postfix(20))].unpack() if removed { panic("SetStorage() on a removed account") } @@ -146,7 +146,7 @@ func (cache *BlockCache) Sync() { for _, storageKey := range storageKeys { addr, key := Tuple256Split(storageKey) if addr != curAddr || curAcc == nil { - acc, storage, removed, _ := cache.accounts[string(addr.Prefix(20))].unpack() + acc, storage, removed, _ := cache.accounts[string(addr.Postfix(20))].unpack() if storage == nil { storage = makeStorage(cache.db, acc.StorageRoot) } @@ -166,7 +166,7 @@ func (cache *BlockCache) Sync() { curStorage.Remove(key.Bytes()) } else { curStorage.Set(key.Bytes(), value.Bytes()) - cache.accounts[string(addr.Prefix(20))] = accountInfo{curAcc, curStorage, false, true} + cache.accounts[string(addr.Postfix(20))] = accountInfo{curAcc, curStorage, false, true} } } @@ -187,7 +187,7 @@ func (cache *BlockCache) Sync() { } } else { if acc == nil { - panic(Fmt("Account should not be nil for addr: %X", acc.Address)) + continue } if storage != nil { newStorageRoot := storage.Save() diff --git a/state/execution.go b/state/execution.go index dcfd55554..5152b16f8 100644 --- a/state/execution.go +++ b/state/execution.go @@ -7,6 +7,7 @@ import ( "github.com/tendermint/tendermint/account" . "github.com/tendermint/tendermint/common" + "github.com/tendermint/tendermint/events" "github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/vm" ) @@ -112,7 +113,7 @@ func execBlock(s *State, block *types.Block, blockPartsHeader types.PartSetHeade // Commit each tx for _, tx := range block.Data.Txs { - err := ExecTx(blockCache, tx, true) + err := ExecTx(blockCache, tx, true, s.evc) if err != nil { return InvalidTxError{tx, err} } @@ -292,11 +293,11 @@ func adjustByOutputs(accounts map[string]*account.Account, outs []*types.TxOutpu // If the tx is invalid, an error will be returned. // Unlike ExecBlock(), state will not be altered. -func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error { +func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool, evc events.Fireable) error { // TODO: do something with fees fees := uint64(0) - _s := blockCache.State() // hack to access validators. + _s := blockCache.State() // hack to access validators and event switch. // Exec tx switch tx := tx_.(type) { @@ -326,6 +327,17 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error { for _, acc := range accounts { blockCache.UpdateAccount(acc) } + + // if the evc is nil, nothing will happen + if evc != nil { + for _, i := range tx.Inputs { + evc.FireEvent(types.EventStringAccInput(i.Address), tx) + } + + for _, o := range tx.Outputs { + evc.FireEvent(types.EventStringAccOutput(o.Address), tx) + } + } return nil case *types.CallTx: @@ -412,10 +424,14 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error { txCache.UpdateAccount(caller) // because we adjusted by input above, and bumped nonce maybe. txCache.UpdateAccount(callee) // because we adjusted by input above. - vmach := vm.NewVM(txCache, params, caller.Address) + vmach := vm.NewVM(txCache, params, caller.Address, account.HashSignBytes(tx)) + vmach.SetFireable(_s.evc) // NOTE: Call() transfers the value from caller to callee iff call succeeds. + ret, err := vmach.Call(caller, callee, code, tx.Data, value, &gas) + exception := "" if err != nil { + exception = err.Error() // Failure. Charge the gas fee. The 'value' was otherwise not transferred. log.Debug(Fmt("Error on execution: %v", err)) inAcc.Balance -= tx.Fee @@ -432,6 +448,13 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error { } // Create a receipt from the ret and whether errored. log.Info("VM call complete", "caller", caller, "callee", callee, "return", ret, "err", err) + + // Fire Events for sender and receiver + // a separate event will be fired from vm for each additional call + if evc != nil { + evc.FireEvent(types.EventStringAccInput(tx.Input.Address), types.EventMsgCallTx{tx, ret, exception}) + evc.FireEvent(types.EventStringAccReceive(tx.Address), types.EventMsgCallTx{tx, ret, exception}) + } } else { // The mempool does not call txs until // the proposer determines the order of txs. @@ -499,6 +522,9 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error { if !added { panic("Failed to add validator") } + if evc != nil { + evc.FireEvent(types.EventStringBond(), tx) + } return nil case *types.UnbondTx: @@ -521,6 +547,9 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error { // Good! _s.unbondValidator(val) + if evc != nil { + evc.FireEvent(types.EventStringUnbond(), tx) + } return nil case *types.RebondTx: @@ -543,6 +572,9 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error { // Good! _s.rebondValidator(val) + if evc != nil { + evc.FireEvent(types.EventStringRebond(), tx) + } return nil case *types.DupeoutTx: @@ -586,6 +618,9 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error { // Good! (Bad validator!) _s.destroyValidator(accused) + if evc != nil { + evc.FireEvent(types.EventStringDupeout(), tx) + } return nil default: diff --git a/state/state.go b/state/state.go index 700202b87..182d2692b 100644 --- a/state/state.go +++ b/state/state.go @@ -8,6 +8,7 @@ import ( "github.com/tendermint/tendermint/account" "github.com/tendermint/tendermint/binary" dbm "github.com/tendermint/tendermint/db" + "github.com/tendermint/tendermint/events" "github.com/tendermint/tendermint/merkle" "github.com/tendermint/tendermint/types" ) @@ -34,6 +35,8 @@ type State struct { UnbondingValidators *ValidatorSet accounts merkle.Tree // Shouldn't be accessed directly. validatorInfos merkle.Tree // Shouldn't be accessed directly. + + evc events.Fireable // typically an events.EventCache } func LoadState(db dbm.DB) *State { @@ -98,6 +101,7 @@ func (s *State) Copy() *State { UnbondingValidators: s.UnbondingValidators.Copy(), // copy the valSet lazily. accounts: s.accounts.Copy(), validatorInfos: s.validatorInfos.Copy(), + evc: nil, } } @@ -115,6 +119,7 @@ func (s *State) Hash() []byte { // Mutates the block in place and updates it with new state hash. func (s *State) SetBlockStateHash(block *types.Block) error { sCopy := s.Copy() + // sCopy has no event cache in it, so this won't fire events err := execBlock(sCopy, block, types.PartSetHeader{}) if err != nil { return err @@ -264,6 +269,11 @@ func (s *State) LoadStorage(hash []byte) (storage merkle.Tree) { // State.storage //------------------------------------- +// Implements events.Eventable. Typically uses events.EventCache +func (s *State) SetFireable(evc events.Fireable) { + s.evc = evc +} + //----------------------------------------------------------------------------- type InvalidTxError struct { diff --git a/state/state_test.go b/state/state_test.go index 5f2990652..c2d766e31 100644 --- a/state/state_test.go +++ b/state/state_test.go @@ -12,7 +12,7 @@ import ( func execTxWithState(state *State, tx types.Tx, runCall bool) error { cache := NewBlockCache(state) - err := ExecTx(cache, tx, runCall) + err := ExecTx(cache, tx, runCall, nil) if err != nil { return err } else { diff --git a/state/tx_cache.go b/state/tx_cache.go index fa6065cb4..d5a23d6c2 100644 --- a/state/tx_cache.go +++ b/state/tx_cache.go @@ -30,9 +30,13 @@ func (cache *TxCache) GetAccount(addr Word256) *vm.Account { acc, removed := vmUnpack(cache.accounts[addr]) if removed { return nil - } else { - return acc + } else if acc == nil { + acc2 := cache.backend.GetAccount(addr.Postfix(20)) + if acc2 != nil { + return toVMAccount(acc2) + } } + return acc } func (cache *TxCache) UpdateAccount(acc *vm.Account) { @@ -64,7 +68,7 @@ func (cache *TxCache) CreateAccount(creator *vm.Account) *vm.Account { nonce := creator.Nonce creator.Nonce += 1 - addr := RightPadWord256(NewContractAddress(creator.Address.Prefix(20), nonce)) + addr := LeftPadWord256(NewContractAddress(creator.Address.Postfix(20), nonce)) // Create account from address. account, removed := vmUnpack(cache.accounts[addr]) @@ -124,7 +128,7 @@ func (cache *TxCache) Sync() { for addr, accInfo := range cache.accounts { acc, removed := vmUnpack(accInfo) if removed { - cache.backend.RemoveAccount(addr.Prefix(20)) + cache.backend.RemoveAccount(addr.Postfix(20)) } else { cache.backend.UpdateAccount(toStateAccount(acc)) } @@ -150,7 +154,7 @@ func NewContractAddress(caller []byte, nonce uint64) []byte { // Converts backend.Account to vm.Account struct. func toVMAccount(acc *ac.Account) *vm.Account { return &vm.Account{ - Address: RightPadWord256(acc.Address), + Address: LeftPadWord256(acc.Address), Balance: acc.Balance, Code: acc.Code, // This is crazy. Nonce: uint64(acc.Sequence), @@ -172,7 +176,7 @@ func toStateAccount(acc *vm.Account) *ac.Account { storageRoot = acc.StorageRoot.Bytes() } return &ac.Account{ - Address: acc.Address.Prefix(20), + Address: acc.Address.Postfix(20), PubKey: pubKey, Balance: acc.Balance, Code: acc.Code, diff --git a/types/events.go b/types/events.go new file mode 100644 index 000000000..0345efcb1 --- /dev/null +++ b/types/events.go @@ -0,0 +1,84 @@ +package types + +import ( + "fmt" +) + +// Functions to generate eventId strings + +func EventStringAccInput(addr []byte) string { + return fmt.Sprintf("Acc/%x/Input", addr) +} + +func EventStringAccOutput(addr []byte) string { + return fmt.Sprintf("Acc/%x/Output", addr) +} + +func EventStringAccReceive(addr []byte) string { + return fmt.Sprintf("Acc/%x/Receive", addr) +} + +func EventStringBond() string { + return "Bond" +} + +func EventStringUnbond() string { + return "Unbond" +} + +func EventStringRebond() string { + return "Rebond" +} + +func EventStringDupeout() string { + return "Dupeout" +} + +func EventStringNewBlock() string { + return "NewBlock" +} + +func EventStringFork() string { + return "Fork" +} + +// Most event messages are basic types (a block, a transaction) +// but some (an input to a call tx or a receive) are more exotic: + +type EventMsgCallTx struct { + Tx Tx + Return []byte + Exception string +} + +type CallData struct { + Caller []byte + Callee []byte + Data []byte + Value uint64 + Gas uint64 +} + +type EventMsgCall struct { + CallData *CallData + Origin []byte + TxId []byte + Return []byte + Exception string +} + +/* +Acc/XYZ/Input -> full tx or {full tx, return value, exception} +Acc/XYZ/Output -> full tx +Acc/XYZ/Receive -> full tx, return value, exception, (optionally?) calldata +Bond -> full tx +Unbond -> full tx +Rebond -> full tx +Dupeout -> full tx +NewBlock -> full block +Fork -> block A, block B + +Log -> Fuck this +NewPeer -> peer +Alert -> alert msg +*/ diff --git a/vm/test/vm_test.go b/vm/test/vm_test.go index ee3fe7e57..740d0063a 100644 --- a/vm/test/vm_test.go +++ b/vm/test/vm_test.go @@ -36,7 +36,7 @@ func makeBytes(n int) []byte { } func TestVM(t *testing.T) { - ourVm := NewVM(newAppState(), newParams(), Zero256) + ourVm := NewVM(newAppState(), newParams(), Zero256, nil) // Create accounts account1 := &Account{ @@ -72,7 +72,7 @@ func TestSubcurrency(t *testing.T) { st.accounts[account1.Address.String()] = account1 st.accounts[account2.Address.String()] = account2 - ourVm := NewVM(st, newParams(), Zero256) + ourVm := NewVM(st, newParams(), Zero256, nil) var gas uint64 = 1000 code_parts := []string{"620f42403355", diff --git a/vm/vm.go b/vm/vm.go index cbf68729d..ba16d846a 100644 --- a/vm/vm.go +++ b/vm/vm.go @@ -6,6 +6,8 @@ import ( "math/big" . "github.com/tendermint/tendermint/common" + "github.com/tendermint/tendermint/events" + "github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/vm/sha3" ) @@ -43,19 +45,28 @@ type VM struct { appState AppState params Params origin Word256 + txid []byte callDepth int + + evc events.Fireable } -func NewVM(appState AppState, params Params, origin Word256) *VM { +func NewVM(appState AppState, params Params, origin Word256, txid []byte) *VM { return &VM{ appState: appState, params: params, origin: origin, callDepth: 0, + txid: txid, } } +// satisfies events.Eventable +func (vm *VM) SetFireable(evc events.Fireable) { + vm.evc = evc +} + // CONTRACT appState is aware of caller and callee, so we can just mutate them. // value: To be transferred from caller to callee. Refunded upon error. // gas: Available gas. No refunds for gas. @@ -72,12 +83,25 @@ func (vm *VM) Call(caller, callee *Account, code, input []byte, value uint64, ga vm.callDepth += 1 output, err = vm.call(caller, callee, code, input, value, gas) vm.callDepth -= 1 + exception := "" if err != nil { + exception = err.Error() err := transfer(callee, caller, value) if err != nil { panic("Could not return value to caller") } } + // if callDepth is 0 the event is fired from ExecTx (along with the Input event) + // otherwise, we fire from here. + if vm.callDepth != 0 && vm.evc != nil { + vm.evc.FireEvent(types.EventStringAccReceive(callee.Address.Postfix(20)), types.EventMsgCall{ + &types.CallData{caller.Address.Postfix(20), callee.Address.Postfix(20), input, value, *gas}, + vm.origin.Postfix(20), + vm.txid, + output, + exception, + }) + } return } @@ -283,7 +307,6 @@ func (vm *VM) call(caller, callee *Account, code, input []byte, value uint64, ga x, y := stack.Pop64(), stack.Pop64() stack.Push64(x & y) dbg.Printf(" %v & %v = %v\n", x, y, x&y) - case OR: // 0x17 x, y := stack.Pop64(), stack.Pop64() stack.Push64(x | y) @@ -330,7 +353,7 @@ func (vm *VM) call(caller, callee *Account, code, input []byte, value uint64, ga if ok = useGas(gas, GasGetAccount); !ok { return nil, firstErr(err, ErrInsufficientGas) } - acc := vm.appState.GetAccount(addr) // TODO ensure that 20byte lengths are supported. + acc := vm.appState.GetAccount(flipWord(addr)) // TODO ensure that 20byte lengths are supported. if acc == nil { return nil, firstErr(err, ErrUnknownAddress) } @@ -357,7 +380,7 @@ func (vm *VM) call(caller, callee *Account, code, input []byte, value uint64, ga return nil, firstErr(err, ErrInputOutOfBounds) } stack.Push(RightPadWord256(data)) - dbg.Printf(" => 0x%X\n", data) + dbg.Printf(" => 0x%X\n", RightPadWord256(data)) case CALLDATASIZE: // 0x36 stack.Push64(uint64(len(input))) @@ -407,7 +430,7 @@ func (vm *VM) call(caller, callee *Account, code, input []byte, value uint64, ga if ok = useGas(gas, GasGetAccount); !ok { return nil, firstErr(err, ErrInsufficientGas) } - acc := vm.appState.GetAccount(addr) + acc := vm.appState.GetAccount(flipWord(addr)) if acc == nil { return nil, firstErr(err, ErrUnknownAddress) } @@ -421,7 +444,7 @@ func (vm *VM) call(caller, callee *Account, code, input []byte, value uint64, ga if ok = useGas(gas, GasGetAccount); !ok { return nil, firstErr(err, ErrInsufficientGas) } - acc := vm.appState.GetAccount(addr) + acc := vm.appState.GetAccount(flipWord(addr)) if acc == nil { return nil, firstErr(err, ErrUnknownAddress) } @@ -628,7 +651,7 @@ func (vm *VM) call(caller, callee *Account, code, input []byte, value uint64, ga if ok = useGas(gas, GasGetAccount); !ok { return nil, firstErr(err, ErrInsufficientGas) } - acc := vm.appState.GetAccount(addr) + acc := vm.appState.GetAccount(flipWord(addr)) if acc == nil { return nil, firstErr(err, ErrUnknownAddress) } @@ -671,7 +694,7 @@ func (vm *VM) call(caller, callee *Account, code, input []byte, value uint64, ga return nil, firstErr(err, ErrInsufficientGas) } // TODO if the receiver is , then make it the fee. - receiver := vm.appState.GetAccount(addr) + receiver := vm.appState.GetAccount(flipWord(addr)) if receiver == nil { return nil, firstErr(err, ErrUnknownAddress) } @@ -697,10 +720,12 @@ func subslice(data []byte, offset, length uint64, flip_ bool) (ret []byte, ok bo if size < offset { return nil, false } else if size < offset+length { - ret, ok = data[offset:], false + ret, ok = data[offset:], true + ret = RightPadBytes(ret, 32) } else { ret, ok = data[offset:offset+length], true } + if flip_ { ret = flip(ret) }