From 12814db90dc082f3f8ae86ac341b6e46912bfc4e Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Tue, 7 Apr 2015 02:28:21 -0500 Subject: [PATCH 01/12] rpc: use gorilla websockets Conflicts: rpc/handlers.go rpc/http_server.go --- rpc/handlers.go | 43 ++++++++++++++++++++++++++++++------- rpc/http_server.go | 19 +++++++++++++++- rpc/test/client_rpc_test.go | 39 +++++++++++++++++++++++++++++++++ rpc/test/helpers.go | 5 +++-- 4 files changed, 95 insertions(+), 11 deletions(-) diff --git a/rpc/handlers.go b/rpc/handlers.go index 933329162..4e55e0267 100644 --- a/rpc/handlers.go +++ b/rpc/handlers.go @@ -4,9 +4,9 @@ import ( "bytes" "encoding/json" "fmt" + "github.com/gorilla/websocket" "github.com/tendermint/tendermint/binary" "github.com/tendermint/tendermint/events" - "golang.org/x/net/websocket" "io/ioutil" "net/http" "reflect" @@ -26,7 +26,7 @@ func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) { func RegisterEventsHandler(mux *http.ServeMux, evsw *events.EventSwitch) { // websocket endpoint w := NewWebsocketManager(evsw) - mux.Handle("/events", websocket.Handler(w.eventsHandler)) + http.HandleFunc("/events", w.websocketHandler) // websocket.Handler(w.eventsHandler)) } //------------------------------------- @@ -233,6 +233,7 @@ func (c *Connection) Close() { // main manager for all websocket connections // holds the event switch type WebsocketManager struct { + websocket.Upgrader ew *events.EventSwitch cons map[string]*Connection } @@ -241,18 +242,38 @@ func NewWebsocketManager(ew *events.EventSwitch) *WebsocketManager { return &WebsocketManager{ ew: ew, cons: make(map[string]*Connection), + Upgrader: websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + // TODO + return true + }, + }, } } -func (w *WebsocketManager) eventsHandler(con *websocket.Conn) { +func (wm *WebsocketManager) websocketHandler(w http.ResponseWriter, r *http.Request) { + conn, err := wm.Upgrade(w, r, nil) + if err != nil { + // TODO + log.Error("Failed to upgrade to websocket connection", "error", err) + return + } + wm.handleWebsocket(conn) + +} + +func (w *WebsocketManager) handleWebsocket(con *websocket.Conn) { // register connection c := NewConnection(con) - w.cons[con.RemoteAddr().String()] = c + w.cons[c.id] = c + log.Info("New websocket connection", "origin", c.id) // read subscriptions/unsubscriptions to events go w.read(c) // write responses - go w.write(c) + w.write(c) } const ( @@ -274,19 +295,22 @@ func (w *WebsocketManager) read(con *Connection) { } default: var in []byte - if err := websocket.Message.Receive(con.wsCon, &in); err != nil { + _, in, err := con.wsCon.ReadMessage() + if err != nil { + //if err := websocket.Message.Receive(con.wsCon, &in); err != nil { // an error reading the connection, // so kill the connection con.quitChan <- struct{}{} } var req WsRequest - err := json.Unmarshal(in, &req) + err = json.Unmarshal(in, &req) if err != nil { errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error()) con.writeChan <- WsResponse{Error: errStr} } switch req.Type { case "subscribe": + log.Info("New event subscription", "con id", con.id, "event", req.Event) w.ew.AddListenerForEvent(con.id, req.Event, func(msg interface{}) { resp := WsResponse{ Event: req.Event, @@ -328,7 +352,10 @@ func (w *WebsocketManager) write(con *Connection) { if *err != nil { log.Error("Failed to write JSON WsResponse", "error", err) } else { - websocket.Message.Send(con.wsCon, buf.Bytes()) + //websocket.Message.Send(con.wsCon, buf.Bytes()) + if err := con.wsCon.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil { + log.Error("Failed to write response on websocket", "error", err) + } } case <-con.quitChan: w.closeConn(con) diff --git a/rpc/http_server.go b/rpc/http_server.go index 2defdf5d0..1603ab79b 100644 --- a/rpc/http_server.go +++ b/rpc/http_server.go @@ -2,8 +2,10 @@ package rpc import ( + "bufio" "bytes" "fmt" + "net" "net/http" "runtime/debug" "time" @@ -50,7 +52,7 @@ func WriteRPCResponse(w http.ResponseWriter, res RPCResponse) { func RecoverAndLogHandler(handler http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Wrap the ResponseWriter to remember the status - rww := &ResponseWriterWrapper{-1, w} + rww := &ResponseWriterWrapper{-1, w, w.(http.Hijacker)} begin := time.Now() // Common headers @@ -97,9 +99,24 @@ func RecoverAndLogHandler(handler http.Handler) http.Handler { type ResponseWriterWrapper struct { Status int http.ResponseWriter + hj http.Hijacker // necessary for websocket upgrades } func (w *ResponseWriterWrapper) WriteHeader(status int) { w.Status = status w.ResponseWriter.WriteHeader(status) } + +// implements http.Hijacker +func (w *ResponseWriterWrapper) Hijack() (net.Conn, *bufio.ReadWriter, error) { + return w.hj.Hijack() +} + +// Stick it as a deferred statement in gouroutines to prevent the program from crashing. +func Recover(daemonName string) { + if e := recover(); e != nil { + stack := string(debug.Stack()) + errorString := fmt.Sprintf("[%s] %s\n%s", daemonName, e, stack) + alert.Alert(errorString) + } +} diff --git a/rpc/test/client_rpc_test.go b/rpc/test/client_rpc_test.go index 0d455c3d3..9c261d9f9 100644 --- a/rpc/test/client_rpc_test.go +++ b/rpc/test/client_rpc_test.go @@ -1,6 +1,10 @@ package rpc import ( + "fmt" + "github.com/gorilla/websocket" + "github.com/tendermint/tendermint/rpc" + "net/http" "testing" ) @@ -73,3 +77,38 @@ func TestJSONCallCode(t *testing.T) { func TestJSONCallContract(t *testing.T) { testCall(t, "JSONRPC") } + +//-------------------------------------------------------------------------------- +// Test the websocket client + +func TestWSConnect(t *testing.T) { + dialer := websocket.DefaultDialer + rHeader := http.Header{} + _, r, err := dialer.Dial(websocketAddr, rHeader) + if err != nil { + t.Fatal(err) + } + fmt.Println("respoinse:", r) + +} + +func TestWSSubscribe(t *testing.T) { + dialer := websocket.DefaultDialer + rHeader := http.Header{} + con, _, err := dialer.Dial(websocketAddr, rHeader) + if err != nil { + t.Fatal(err) + } + err = con.WriteJSON(rpc.WsRequest{ + Type: "subscribe", + Event: "newblock", + }) + if err != nil { + t.Fatal(err) + } + /* + typ, p, err := con.ReadMessage() + fmt.Println("RESPONSE:", typ, string(p), err) + */ + +} diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index 8fb594da4..82650c186 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -18,8 +18,9 @@ import ( // global variables for use across all tests var ( - rpcAddr = "127.0.0.1:8089" - requestAddr = "http://" + rpcAddr + "/" + rpcAddr = "127.0.0.1:8089" + requestAddr = "http://" + rpcAddr + "/" + websocketAddr = "ws://" + rpcAddr + "/events" node *nm.Node From 8e24b128884797a289b1ebb36f973b1eff9de8f0 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Tue, 7 Apr 2015 15:24:09 -0500 Subject: [PATCH 02/12] rpc: first successful websocket event subscription --- consensus/reactor.go | 1 + consensus/state.go | 10 ++++++++++ rpc/core/routes.go | 4 ---- rpc/handlers.go | 34 +++++++++++++++++----------------- rpc/http_server.go | 1 + rpc/test/client_rpc_test.go | 12 ++++-------- 6 files changed, 33 insertions(+), 29 deletions(-) diff --git a/consensus/reactor.go b/consensus/reactor.go index 0e264714c..16ef2129f 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -236,6 +236,7 @@ func (conR *ConsensusReactor) ResetToState(state *sm.State) { // implements events.Eventable func (conR *ConsensusReactor) SetEventSwitch(evsw *events.EventSwitch) { conR.evsw = evsw + conR.conS.SetEventSwitch(evsw) } //-------------------------------------- diff --git a/consensus/state.go b/consensus/state.go index c036fa3b4..0a5431de8 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -66,6 +66,7 @@ import ( . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/config" . "github.com/tendermint/tendermint/consensus/types" + "github.com/tendermint/tendermint/events" mempl "github.com/tendermint/tendermint/mempool" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" @@ -246,6 +247,8 @@ type ConsensusState struct { stagedBlock *types.Block // Cache last staged block. stagedState *sm.State // Cache result of staged block. lastCommitVoteHeight uint // Last called commitVoteBlock() or saveCommitVoteBlock() on. + + evsw *events.EventSwitch } func NewConsensusState(state *sm.State, blockStore *bc.BlockStore, mempoolReactor *mempl.MempoolReactor) *ConsensusState { @@ -437,6 +440,8 @@ ACTION_LOOP: if cs.TryFinalizeCommit(rs.Height) { // Now at new height // cs.Step is at RoundStepNewHeight or RoundStepNewRound. + // newblock event! + cs.evsw.FireEvent("newblock", cs.state.LastBlockHash) scheduleNextAction() continue ACTION_LOOP } else { @@ -1107,6 +1112,11 @@ func (cs *ConsensusState) saveCommitVoteBlock(block *types.Block, blockParts *ty } } +// implements events.Eventable +func (cs *ConsensusState) SetEventSwitch(evsw *events.EventSwitch) { + cs.evsw = evsw +} + //----------------------------------------------------------------------------- // total duration of given round diff --git a/rpc/core/routes.go b/rpc/core/routes.go index 62630fb5a..395ad82a4 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -4,10 +4,6 @@ import ( "github.com/tendermint/tendermint/rpc" ) -/* -TODO: support Call && GetStorage. -*/ - var Routes = map[string]*rpc.RPCFunc{ "status": rpc.NewRPCFunc(Status, []string{}), "net_info": rpc.NewRPCFunc(NetInfo, []string{}), diff --git a/rpc/handlers.go b/rpc/handlers.go index 4e55e0267..aa112971f 100644 --- a/rpc/handlers.go +++ b/rpc/handlers.go @@ -26,7 +26,7 @@ func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) { func RegisterEventsHandler(mux *http.ServeMux, evsw *events.EventSwitch) { // websocket endpoint w := NewWebsocketManager(evsw) - http.HandleFunc("/events", w.websocketHandler) // websocket.Handler(w.eventsHandler)) + mux.HandleFunc("/events", w.websocketHandler) // websocket.Handler(w.eventsHandler)) } //------------------------------------- @@ -191,14 +191,20 @@ func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) { //----------------------------------------------------------------------------- // rpc.websocket +const ( + WSConnectionReaperSeconds = 5 + MaxFailedSendsSeconds = 10 + WriteChanBufferSize = 10 +) + // for requests coming in -type WsRequest struct { +type WSRequest struct { Type string // subscribe or unsubscribe Event string } // for responses going out -type WsResponse struct { +type WSResponse struct { Event string Data interface{} Error string @@ -209,7 +215,7 @@ type WsResponse struct { type Connection struct { id string wsCon *websocket.Conn - writeChan chan WsResponse + writeChan chan WSResponse quitChan chan struct{} failedSends uint } @@ -219,7 +225,7 @@ func NewConnection(con *websocket.Conn) *Connection { return &Connection{ id: con.RemoteAddr().String(), wsCon: con, - writeChan: make(chan WsResponse, WriteChanBuffer), // buffered. we keep track when its full + writeChan: make(chan WSResponse, WriteChanBufferSize), // buffered. we keep track when its full } } @@ -276,15 +282,9 @@ func (w *WebsocketManager) handleWebsocket(con *websocket.Conn) { w.write(c) } -const ( - WsConnectionReaperSeconds = 5 - MaxFailedSendsSeconds = 10 - WriteChanBuffer = 10 -) - // read from the socket and subscribe to or unsubscribe from events func (w *WebsocketManager) read(con *Connection) { - reaper := time.Tick(time.Second * WsConnectionReaperSeconds) + reaper := time.Tick(time.Second * WSConnectionReaperSeconds) for { select { case <-reaper: @@ -302,17 +302,17 @@ func (w *WebsocketManager) read(con *Connection) { // so kill the connection con.quitChan <- struct{}{} } - var req WsRequest + var req WSRequest err = json.Unmarshal(in, &req) if err != nil { errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error()) - con.writeChan <- WsResponse{Error: errStr} + con.writeChan <- WSResponse{Error: errStr} } switch req.Type { case "subscribe": log.Info("New event subscription", "con id", con.id, "event", req.Event) w.ew.AddListenerForEvent(con.id, req.Event, func(msg interface{}) { - resp := WsResponse{ + resp := WSResponse{ Event: req.Event, Data: msg, } @@ -334,7 +334,7 @@ func (w *WebsocketManager) read(con *Connection) { w.ew.RemoveListener(con.id) } default: - con.writeChan <- WsResponse{Error: "Unknown request type: " + req.Type} + con.writeChan <- WSResponse{Error: "Unknown request type: " + req.Type} } } @@ -350,7 +350,7 @@ func (w *WebsocketManager) write(con *Connection) { buf := new(bytes.Buffer) binary.WriteJSON(msg, buf, n, err) if *err != nil { - log.Error("Failed to write JSON WsResponse", "error", err) + log.Error("Failed to write JSON WSResponse", "error", err) } else { //websocket.Message.Send(con.wsCon, buf.Bytes()) if err := con.wsCon.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil { diff --git a/rpc/http_server.go b/rpc/http_server.go index 1603ab79b..14ccb6521 100644 --- a/rpc/http_server.go +++ b/rpc/http_server.go @@ -13,6 +13,7 @@ import ( "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/events" + "github.com/tendermint/tendermint/alert" ) func StartHTTPServer(listenAddr string, funcMap map[string]*RPCFunc, evsw *events.EventSwitch) { diff --git a/rpc/test/client_rpc_test.go b/rpc/test/client_rpc_test.go index 9c261d9f9..0d712192b 100644 --- a/rpc/test/client_rpc_test.go +++ b/rpc/test/client_rpc_test.go @@ -85,11 +85,10 @@ func TestWSConnect(t *testing.T) { dialer := websocket.DefaultDialer rHeader := http.Header{} _, r, err := dialer.Dial(websocketAddr, rHeader) + fmt.Println("respoinse:", r) if err != nil { t.Fatal(err) } - fmt.Println("respoinse:", r) - } func TestWSSubscribe(t *testing.T) { @@ -99,16 +98,13 @@ func TestWSSubscribe(t *testing.T) { if err != nil { t.Fatal(err) } - err = con.WriteJSON(rpc.WsRequest{ + err = con.WriteJSON(rpc.WSRequest{ Type: "subscribe", Event: "newblock", }) if err != nil { t.Fatal(err) } - /* - typ, p, err := con.ReadMessage() - fmt.Println("RESPONSE:", typ, string(p), err) - */ - + typ, p, err := con.ReadMessage() + fmt.Println("RESPONSE:", typ, string(p), err) } From 85661de2930b25fec4bb02375c9bba15681db646 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 13 Apr 2015 18:26:41 -0700 Subject: [PATCH 03/12] fire events, event urls --- consensus/state.go | 5 ++-- rpc/http_server.go | 2 +- state/execution.go | 45 +++++++++++++++++++++++++++++++++++- state/state.go | 9 ++++++++ types/events.go | 57 ++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 114 insertions(+), 4 deletions(-) create mode 100644 types/events.go diff --git a/consensus/state.go b/consensus/state.go index 0a5431de8..ede0487a3 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -440,8 +440,8 @@ ACTION_LOOP: if cs.TryFinalizeCommit(rs.Height) { // Now at new height // cs.Step is at RoundStepNewHeight or RoundStepNewRound. - // newblock event! - cs.evsw.FireEvent("newblock", cs.state.LastBlockHash) + newBlock := cs.blockStore.LoadBlock(cs.state.LastBlockHeight) + cs.evsw.FireEvent(types.EventStringNewBlock(), newBlock) scheduleNextAction() continue ACTION_LOOP } else { @@ -1115,6 +1115,7 @@ func (cs *ConsensusState) saveCommitVoteBlock(block *types.Block, blockParts *ty // implements events.Eventable func (cs *ConsensusState) SetEventSwitch(evsw *events.EventSwitch) { cs.evsw = evsw + cs.state.SetEventSwitch(evsw) } //----------------------------------------------------------------------------- diff --git a/rpc/http_server.go b/rpc/http_server.go index 14ccb6521..3b09ac373 100644 --- a/rpc/http_server.go +++ b/rpc/http_server.go @@ -10,10 +10,10 @@ import ( "runtime/debug" "time" + "github.com/tendermint/tendermint/alert" "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/events" - "github.com/tendermint/tendermint/alert" ) func StartHTTPServer(listenAddr string, funcMap map[string]*RPCFunc, evsw *events.EventSwitch) { diff --git a/state/execution.go b/state/execution.go index d7b3b78f7..dc0cb8c3f 100644 --- a/state/execution.go +++ b/state/execution.go @@ -295,7 +295,7 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error { // TODO: do something with fees fees := uint64(0) - _s := blockCache.State() // hack to access validators. + _s := blockCache.State() // hack to access validators and event switch. // Exec tx switch tx := tx_.(type) { @@ -325,6 +325,19 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error { for _, acc := range accounts { blockCache.UpdateAccount(acc) } + + // If we're in a block (not mempool), + // fire event on all inputs and outputs + // see types/events.go for spec + if runCall { + for _, i := range tx.Inputs { + _s.evsw.FireEvent(types.EventStringAccInput(i.Address), tx) + } + + for _, o := range tx.Outputs { + _s.evsw.FireEvent(types.EventStringAccOutput(o.Address), tx) + } + } return nil case *types.CallTx: @@ -413,14 +426,18 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error { txCache.UpdateAccount(callee) // because we adjusted by input above. vmach := vm.NewVM(txCache, params, caller.Address) // NOTE: Call() transfers the value from caller to callee iff call succeeds. + ret, err := vmach.Call(caller, callee, code, tx.Data, value, &gas) + var exception string if err != nil { + exception = err.Error() // Failure. Charge the gas fee. The 'value' was otherwise not transferred. log.Debug(Fmt("Error on execution: %v", err)) inAcc.Balance -= tx.Fee blockCache.UpdateAccount(inAcc) // Throw away 'txCache' which holds incomplete updates (don't sync it). } else { + exception = "" log.Debug("Successful execution") // Success if createAccount { @@ -431,6 +448,20 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error { } // Create a receipt from the ret and whether errored. log.Info("VM call complete", "caller", caller, "callee", callee, "return", ret, "err", err) + + // Fire Events for sender and receiver + // a separate event will be fired from vm for each + _s.evsw.FireEvent(types.EventStringAccInput(tx.Input.Address), struct { + Tx types.Tx + Return []byte + Exception string + }{tx, ret, exception}) + + _s.evsw.FireEvent(types.EventStringAccReceive(tx.Address), struct { + Tx types.Tx + Return []byte + Exception string + }{tx, ret, exception}) } else { // The mempool does not call txs until // the proposer determines the order of txs. @@ -498,6 +529,9 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error { if !added { panic("Failed to add validator") } + if runCall { + _s.evsw.FireEvent(types.EventStringBond(), tx) + } return nil case *types.UnbondTx: @@ -520,6 +554,9 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error { // Good! _s.unbondValidator(val) + if runCall { + _s.evsw.FireEvent(types.EventStringUnbond(), tx) + } return nil case *types.RebondTx: @@ -542,6 +579,9 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error { // Good! _s.rebondValidator(val) + if runCall { + _s.evsw.FireEvent(types.EventStringRebond(), tx) + } return nil case *types.DupeoutTx: @@ -585,6 +625,9 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error { // Good! (Bad validator!) _s.destroyValidator(accused) + if runCall { + _s.evsw.FireEvent(types.EventStringDupeout(), tx) + } return nil default: diff --git a/state/state.go b/state/state.go index 700202b87..c8d7037c1 100644 --- a/state/state.go +++ b/state/state.go @@ -8,6 +8,7 @@ import ( "github.com/tendermint/tendermint/account" "github.com/tendermint/tendermint/binary" dbm "github.com/tendermint/tendermint/db" + "github.com/tendermint/tendermint/events" "github.com/tendermint/tendermint/merkle" "github.com/tendermint/tendermint/types" ) @@ -34,6 +35,8 @@ type State struct { UnbondingValidators *ValidatorSet accounts merkle.Tree // Shouldn't be accessed directly. validatorInfos merkle.Tree // Shouldn't be accessed directly. + + evsw *events.EventSwitch } func LoadState(db dbm.DB) *State { @@ -98,6 +101,7 @@ func (s *State) Copy() *State { UnbondingValidators: s.UnbondingValidators.Copy(), // copy the valSet lazily. accounts: s.accounts.Copy(), validatorInfos: s.validatorInfos.Copy(), + evsw: s.evsw, } } @@ -264,6 +268,11 @@ func (s *State) LoadStorage(hash []byte) (storage merkle.Tree) { // State.storage //------------------------------------- +// implements events.Eventable +func (s *State) SetEventSwitch(evsw *events.EventSwitch) { + s.evsw = evsw +} + //----------------------------------------------------------------------------- type InvalidTxError struct { diff --git a/types/events.go b/types/events.go new file mode 100644 index 000000000..28965ecc9 --- /dev/null +++ b/types/events.go @@ -0,0 +1,57 @@ +package types + +import ( + "fmt" +) + +func EventStringAccInput(addr []byte) string { + return fmt.Sprintf("Acc/%x/Input", addr) +} + +func EventStringAccOutput(addr []byte) string { + return fmt.Sprintf("Acc/%x/Output", addr) +} + +func EventStringAccReceive(addr []byte) string { + return fmt.Sprintf("Acc/%x/Receive", addr) +} + +func EventStringBond() string { + return "Bond" +} + +func EventStringUnbond() string { + return "Unbond" +} + +func EventStringRebond() string { + return "Rebond" +} + +func EventStringDupeout() string { + return "Dupeout" +} + +func EventStringNewBlock() string { + return "NewBlock" +} + +func EventStringFork() string { + return "Fork" +} + +/* +Acc/XYZ/Input -> full tx or {full tx, return value, exception} +Acc/XYZ/Output -> full tx +Acc/XYZ/Receive -> full tx, return value, exception, (optionally?) calldata +Bond -> full tx +Unbond -> full tx +Rebond -> full tx +Dupeout -> full tx +NewBlock -> full block +Fork -> block A, block B + +Log -> Fuck this +NewPeer -> peer +Alert -> alert msg +*/ From 2e918e8c0ba935628e505eef55e11b6b114994cd Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 13 Apr 2015 21:43:13 -0700 Subject: [PATCH 04/12] rpc: websocket events testing --- consensus/state.go | 35 ++-- mempool/mempool.go | 4 +- rpc/test/client_rpc_test.go | 35 ---- rpc/test/client_ws_test.go | 327 ++++++++++++++++++++++++++++++++++++ rpc/test/helpers.go | 37 +++- rpc/test/tests.go | 38 +---- state/execution.go | 44 ++--- state/state.go | 2 +- vm/vm.go | 8 + 9 files changed, 415 insertions(+), 115 deletions(-) create mode 100644 rpc/test/client_ws_test.go diff --git a/consensus/state.go b/consensus/state.go index ede0487a3..a9db89a5c 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -73,11 +73,14 @@ import ( ) const ( - roundDuration0 = 10 * time.Second // The first round is 60 seconds long. - roundDurationDelta = 3 * time.Second // Each successive round lasts 15 seconds longer. roundDeadlinePrevote = float64(1.0 / 3.0) // When the prevote is due. roundDeadlinePrecommit = float64(2.0 / 3.0) // When the precommit vote is due. - newHeightDelta = roundDuration0 / 3 // The time to wait between commitTime and startTime of next consensus rounds. +) + +var ( + RoundDuration0 = 10 * time.Second // The first round is 60 seconds long. + RoundDurationDelta = 3 * time.Second // Each successive round lasts 15 seconds longer. + newHeightDelta = RoundDuration0 / 3 // The time to wait between commitTime and startTime of next consensus rounds. ) var ( @@ -318,14 +321,14 @@ func (cs *ConsensusState) stepTransitionRoutine() { // NOTE: We can push directly to runActionCh because // we're running in a separate goroutine, which avoids deadlocks. rs := cs.getRoundState() - round, roundStartTime, roundDuration, _, elapsedRatio := calcRoundInfo(rs.StartTime) + round, roundStartTime, RoundDuration, _, elapsedRatio := calcRoundInfo(rs.StartTime) log.Debug("Scheduling next action", "height", rs.Height, "round", round, "step", rs.Step, "roundStartTime", roundStartTime, "elapsedRatio", elapsedRatio) switch rs.Step { case RoundStepNewHeight: // We should run RoundActionPropose when rs.StartTime passes. if elapsedRatio < 0 { // startTime is in the future. - time.Sleep(time.Duration((-1.0 * elapsedRatio) * float64(roundDuration))) + time.Sleep(time.Duration((-1.0 * elapsedRatio) * float64(RoundDuration))) } cs.runActionCh <- RoundAction{rs.Height, rs.Round, RoundActionPropose} case RoundStepNewRound: @@ -333,15 +336,15 @@ func (cs *ConsensusState) stepTransitionRoutine() { cs.runActionCh <- RoundAction{rs.Height, rs.Round, RoundActionPropose} case RoundStepPropose: // Wake up when it's time to vote. - time.Sleep(time.Duration((roundDeadlinePrevote - elapsedRatio) * float64(roundDuration))) + time.Sleep(time.Duration((roundDeadlinePrevote - elapsedRatio) * float64(RoundDuration))) cs.runActionCh <- RoundAction{rs.Height, rs.Round, RoundActionPrevote} case RoundStepPrevote: // Wake up when it's time to precommit. - time.Sleep(time.Duration((roundDeadlinePrecommit - elapsedRatio) * float64(roundDuration))) + time.Sleep(time.Duration((roundDeadlinePrecommit - elapsedRatio) * float64(RoundDuration))) cs.runActionCh <- RoundAction{rs.Height, rs.Round, RoundActionPrecommit} case RoundStepPrecommit: // Wake up when the round is over. - time.Sleep(time.Duration((1.0 - elapsedRatio) * float64(roundDuration))) + time.Sleep(time.Duration((1.0 - elapsedRatio) * float64(RoundDuration))) cs.runActionCh <- RoundAction{rs.Height, rs.Round, RoundActionTryCommit} case RoundStepCommit: // There's nothing to scheudle, we're waiting for @@ -1122,13 +1125,13 @@ func (cs *ConsensusState) SetEventSwitch(evsw *events.EventSwitch) { // total duration of given round func calcRoundDuration(round uint) time.Duration { - return roundDuration0 + roundDurationDelta*time.Duration(round) + return RoundDuration0 + RoundDurationDelta*time.Duration(round) } // startTime is when round zero started. func calcRoundStartTime(round uint, startTime time.Time) time.Time { - return startTime.Add(roundDuration0*time.Duration(round) + - roundDurationDelta*(time.Duration((int64(round)*int64(round)-int64(round))/2))) + return startTime.Add(RoundDuration0*time.Duration(round) + + RoundDurationDelta*(time.Duration((int64(round)*int64(round)-int64(round))/2))) } // calculates the current round given startTime of round zero. @@ -1142,8 +1145,8 @@ func calcRound(startTime time.Time) uint { // D_delta * R^2 + (2D_0 - D_delta) * R + 2(Start - Now) <= 0. // AR^2 + BR + C <= 0; A = D_delta, B = (2_D0 - D_delta), C = 2(Start - Now). // R = Floor((-B + Sqrt(B^2 - 4AC))/2A) - A := float64(roundDurationDelta) - B := 2.0*float64(roundDuration0) - float64(roundDurationDelta) + A := float64(RoundDurationDelta) + B := 2.0*float64(RoundDuration0) - float64(RoundDurationDelta) C := 2.0 * float64(startTime.Sub(now)) R := math.Floor((-B + math.Sqrt(B*B-4.0*A*C)) / (2 * A)) if math.IsNaN(R) { @@ -1160,12 +1163,12 @@ func calcRound(startTime time.Time) uint { // convenience // NOTE: elapsedRatio can be negative if startTime is in the future. -func calcRoundInfo(startTime time.Time) (round uint, roundStartTime time.Time, roundDuration time.Duration, +func calcRoundInfo(startTime time.Time) (round uint, roundStartTime time.Time, RoundDuration time.Duration, roundElapsed time.Duration, elapsedRatio float64) { round = calcRound(startTime) roundStartTime = calcRoundStartTime(round, startTime) - roundDuration = calcRoundDuration(round) + RoundDuration = calcRoundDuration(round) roundElapsed = time.Now().Sub(roundStartTime) - elapsedRatio = float64(roundElapsed) / float64(roundDuration) + elapsedRatio = float64(roundElapsed) / float64(RoundDuration) return } diff --git a/mempool/mempool.go b/mempool/mempool.go index 819198778..d5c6dad06 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -42,7 +42,7 @@ func (mem *Mempool) GetCache() *sm.BlockCache { func (mem *Mempool) AddTx(tx types.Tx) (err error) { mem.mtx.Lock() defer mem.mtx.Unlock() - err = sm.ExecTx(mem.cache, tx, false) + err = sm.ExecTx(mem.cache, tx, false, false) if err != nil { log.Debug("AddTx() error", "tx", tx, "error", err) return err @@ -93,7 +93,7 @@ func (mem *Mempool) ResetForBlockAndState(block *types.Block, state *sm.State) { // Next, filter all txs that aren't valid given new state. validTxs := []types.Tx{} for _, tx := range txs { - err := sm.ExecTx(mem.cache, tx, false) + err := sm.ExecTx(mem.cache, tx, false, false) if err == nil { log.Debug("Filter in, valid", "tx", tx) validTxs = append(validTxs, tx) diff --git a/rpc/test/client_rpc_test.go b/rpc/test/client_rpc_test.go index 0d712192b..0d455c3d3 100644 --- a/rpc/test/client_rpc_test.go +++ b/rpc/test/client_rpc_test.go @@ -1,10 +1,6 @@ package rpc import ( - "fmt" - "github.com/gorilla/websocket" - "github.com/tendermint/tendermint/rpc" - "net/http" "testing" ) @@ -77,34 +73,3 @@ func TestJSONCallCode(t *testing.T) { func TestJSONCallContract(t *testing.T) { testCall(t, "JSONRPC") } - -//-------------------------------------------------------------------------------- -// Test the websocket client - -func TestWSConnect(t *testing.T) { - dialer := websocket.DefaultDialer - rHeader := http.Header{} - _, r, err := dialer.Dial(websocketAddr, rHeader) - fmt.Println("respoinse:", r) - if err != nil { - t.Fatal(err) - } -} - -func TestWSSubscribe(t *testing.T) { - dialer := websocket.DefaultDialer - rHeader := http.Header{} - con, _, err := dialer.Dial(websocketAddr, rHeader) - if err != nil { - t.Fatal(err) - } - err = con.WriteJSON(rpc.WSRequest{ - Type: "subscribe", - Event: "newblock", - }) - if err != nil { - t.Fatal(err) - } - typ, p, err := con.ReadMessage() - fmt.Println("RESPONSE:", typ, string(p), err) -} diff --git a/rpc/test/client_ws_test.go b/rpc/test/client_ws_test.go new file mode 100644 index 000000000..435c10cf4 --- /dev/null +++ b/rpc/test/client_ws_test.go @@ -0,0 +1,327 @@ +package rpc + +import ( + "bytes" + "encoding/hex" + "fmt" + "github.com/gorilla/websocket" + "github.com/tendermint/tendermint/binary" + "github.com/tendermint/tendermint/rpc" + "github.com/tendermint/tendermint/types" + "net/http" + "testing" + "time" +) + +//-------------------------------------------------------------------------------- +// Utilities for testing the websocket service + +// create a new connection +func newWSCon(t *testing.T) *websocket.Conn { + dialer := websocket.DefaultDialer + rHeader := http.Header{} + con, r, err := dialer.Dial(websocketAddr, rHeader) + fmt.Println("response", r) + if err != nil { + t.Fatal(err) + } + return con +} + +// subscribe to an event +func subscribe(t *testing.T, con *websocket.Conn, eventid string) { + err := con.WriteJSON(rpc.WSRequest{ + Type: "subscribe", + Event: eventid, + }) + if err != nil { + t.Fatal(err) + } +} + +// unsubscribe from an event +func unsubscribe(t *testing.T, con *websocket.Conn, eventid string) { + err := con.WriteJSON(rpc.WSRequest{ + Type: "unsubscribe", + Event: eventid, + }) + if err != nil { + t.Fatal(err) + } +} + +// wait for an event, do things that might trigger events, and check them when they are received +func waitForEvent(t *testing.T, con *websocket.Conn, eventid string, dieOnTimeout bool, f func(), check func(string, []byte) error) { + // go routine to wait for webscoket msg + gch := make(chan []byte) + ech := make(chan error) + go func() { + typ, p, err := con.ReadMessage() + fmt.Println("RESPONSE:", typ, string(p), err) + if err != nil { + ech <- err + } else { + gch <- p + } + }() + + // do stuff (transactions) + f() + + // if the event is not received in 20 seconds, die + ticker := time.Tick(10 * time.Second) + select { + case <-ticker: + if dieOnTimeout { + con.Close() + t.Fatalf("%s event was not received in time", eventid) + } + // else that's great, we didn't hear the event + case p := <-gch: + if dieOnTimeout { + // message was received and expected + // run the check + err := check(eventid, p) + if err != nil { + t.Fatal(err) + } + } else { + con.Close() + t.Fatalf("%s event was not expected", eventid) + } + case err := <-ech: + t.Fatal(err) + } +} + +func unmarshalResponseNewBlock(b []byte) (*types.Block, error) { + // unmarshall and assert somethings + var response struct { + Event string + Data *types.Block + Error string + } + var err error + binary.ReadJSON(&response, b, &err) + if err != nil { + return nil, err + } + if response.Error != "" { + return nil, fmt.Errorf(response.Error) + } + block := response.Data + return block, nil +} + +//-------------------------------------------------------------------------------- +// Test the websocket service + +// make a simple connection to the server +func TestWSConnect(t *testing.T) { + con := newWSCon(t) + con.Close() +} + +// receive a new block message +func _TestWSNewBlock(t *testing.T) { + con := newWSCon(t) + eid := types.EventStringNewBlock() + subscribe(t, con, eid) + defer func() { + unsubscribe(t, con, eid) + con.Close() + }() + waitForEvent(t, con, eid, true, func() {}, func(eid string, b []byte) error { + fmt.Println("Check:", string(b)) + return nil + }) +} + +// receive a few new block messages in a row, with increasing height +func TestWSBlockchainGrowth(t *testing.T) { + con := newWSCon(t) + eid := types.EventStringNewBlock() + subscribe(t, con, eid) + defer func() { + unsubscribe(t, con, eid) + con.Close() + }() + var initBlockN uint + for i := 0; i < 2; i++ { + waitForEvent(t, con, eid, true, func() {}, func(eid string, b []byte) error { + block, err := unmarshalResponseNewBlock(b) + if err != nil { + return err + } + if i == 0 { + initBlockN = block.Header.Height + } else { + if block.Header.Height != initBlockN+uint(i) { + return fmt.Errorf("Expected block %d, got block %d", i, block.Header.Height) + } + } + + return nil + }) + } +} + +// send a transaction and validate the events from listening for both sender and receiver +func TestWSSend(t *testing.T) { + toAddr := []byte{20, 143, 25, 63, 16, 177, 83, 29, 91, 91, 54, 23, 233, 46, 190, 121, 122, 34, 86, 54} + amt := uint64(100) + + con := newWSCon(t) + eidInput := types.EventStringAccInput(byteAddr) + eidOutput := types.EventStringAccOutput(toAddr) + subscribe(t, con, eidInput) + subscribe(t, con, eidOutput) + defer func() { + unsubscribe(t, con, eidInput) + unsubscribe(t, con, eidOutput) + con.Close() + }() + checkerFunc := func(eid string, b []byte) error { + // unmarshal and assert correctness + var response struct { + Event string + Data types.SendTx + Error string + } + var err error + binary.ReadJSON(&response, b, &err) + if err != nil { + return err + } + if response.Error != "" { + return fmt.Errorf(response.Error) + } + if eid != response.Event { + return fmt.Errorf("Eventid is not correct. Got %s, expected %s", response.Event, eid) + } + tx := response.Data + if bytes.Compare(tx.Inputs[0].Address, byteAddr) != 0 { + return fmt.Errorf("Senders do not match up! Got %x, expected %x", tx.Inputs[0].Address, byteAddr) + } + if tx.Inputs[0].Amount != amt { + return fmt.Errorf("Amt does not match up! Got %d, expected %d", tx.Inputs[0].Amount, amt) + } + if bytes.Compare(tx.Outputs[0].Address, toAddr) != 0 { + return fmt.Errorf("Receivers do not match up! Got %x, expected %x", tx.Outputs[0].Address, byteAddr) + } + return nil + } + waitForEvent(t, con, eidInput, true, func() { + broadcastTx(t, "JSONRPC", byteAddr, toAddr, nil, byteKey, amt, 0, 0) + }, checkerFunc) + waitForEvent(t, con, eidOutput, true, func() {}, checkerFunc) +} + +// ensure events are only fired once for a given transaction +func TestWSDoubleFire(t *testing.T) { + con := newWSCon(t) + eid := types.EventStringAccInput(byteAddr) + subscribe(t, con, eid) + defer func() { + unsubscribe(t, con, eid) + con.Close() + }() + amt := uint64(100) + toAddr := []byte{20, 143, 25, 63, 16, 177, 83, 29, 91, 91, 54, 23, 233, 46, 190, 121, 122, 34, 86, 54} + // broadcast the transaction, wait to hear about it + waitForEvent(t, con, eid, true, func() { + broadcastTx(t, "JSONRPC", byteAddr, toAddr, nil, byteKey, amt, 0, 0) + }, func(eid string, b []byte) error { + return nil + }) + // but make sure we don't hear about it twice + waitForEvent(t, con, eid, false, func() { + }, func(eid string, b []byte) error { + return nil + }) +} + +// create a contract and send it a msg, validate the return +func TestWSCall(t *testing.T) { + byteAddr, _ := hex.DecodeString(userAddr) + con := newWSCon(t) + eid := types.EventStringAccInput(byteAddr) + subscribe(t, con, eid) + defer func() { + unsubscribe(t, con, eid) + con.Close() + }() + amt := uint64(10000) + code, returnCode, returnVal := simpleCallContract() + var contractAddr []byte + // wait for the contract to be created + waitForEvent(t, con, eid, true, func() { + _, receipt := broadcastTx(t, "JSONRPC", byteAddr, nil, code, byteKey, amt, 1000, 1000) + contractAddr = receipt.ContractAddr + + }, func(eid string, b []byte) error { + // unmarshall and assert somethings + var response struct { + Event string + Data struct { + Tx types.CallTx + Return []byte + Exception string + } + Error string + } + var err error + binary.ReadJSON(&response, b, &err) + if err != nil { + return err + } + if response.Error != "" { + return fmt.Errorf(response.Error) + } + if response.Data.Exception != "" { + return fmt.Errorf(response.Data.Exception) + } + tx := response.Data.Tx + if bytes.Compare(tx.Input.Address, byteAddr) != 0 { + return fmt.Errorf("Senders do not match up! Got %x, expected %x", tx.Input.Address, byteAddr) + } + if tx.Input.Amount != amt { + return fmt.Errorf("Amt does not match up! Got %d, expected %d", tx.Input.Amount, amt) + } + ret := response.Data.Return + if bytes.Compare(ret, returnCode) != 0 { + return fmt.Errorf("Create did not return correct byte code for new contract. Got %x, expected %x", ret, returnCode) + } + return nil + }) + + // get the return value from a call + data := []byte{0x1} // just needs to be non empty for this to be a CallTx + waitForEvent(t, con, eid, true, func() { + broadcastTx(t, "JSONRPC", byteAddr, contractAddr, data, byteKey, amt, 1000, 1000) + }, func(eid string, b []byte) error { + // unmarshall and assert somethings + var response struct { + Event string + Data struct { + Tx types.CallTx + Return []byte + Exception string + } + Error string + } + var err error + binary.ReadJSON(&response, b, &err) + if err != nil { + return err + } + if response.Error != "" { + return fmt.Errorf(response.Error) + } + ret := response.Data.Return + if bytes.Compare(ret, returnVal) != 0 { + return fmt.Errorf("Call did not return correctly. Got %x, expected %x", ret, returnVal) + } + return nil + }) +} diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index 82650c186..192c132bb 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -6,6 +6,7 @@ import ( "github.com/tendermint/tendermint/account" . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/consensus" "github.com/tendermint/tendermint/logger" nm "github.com/tendermint/tendermint/node" "github.com/tendermint/tendermint/p2p" @@ -14,6 +15,7 @@ import ( "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" "testing" + "time" ) // global variables for use across all tests @@ -26,9 +28,10 @@ var ( mempoolCount = 0 - userAddr = "D7DFF9806078899C8DA3FE3633CC0BF3C6C2B1BB" - userPriv = "FDE3BD94CB327D19464027BA668194C5EFA46AE83E8419D7542CFF41F00C81972239C21C81EA7173A6C489145490C015E05D4B97448933B708A7EC5B7B4921E3" - userPub = "2239C21C81EA7173A6C489145490C015E05D4B97448933B708A7EC5B7B4921E3" + userAddr = "D7DFF9806078899C8DA3FE3633CC0BF3C6C2B1BB" + userPriv = "FDE3BD94CB327D19464027BA668194C5EFA46AE83E8419D7542CFF41F00C81972239C21C81EA7173A6C489145490C015E05D4B97448933B708A7EC5B7B4921E3" + userPub = "2239C21C81EA7173A6C489145490C015E05D4B97448933B708A7EC5B7B4921E3" + byteAddr, byteKey = initUserBytes() clients = map[string]cclient.Client{ "JSONRPC": cclient.NewClient(requestAddr, "JSONRPC"), @@ -36,6 +39,14 @@ var ( } ) +func initUserBytes() ([]byte, [64]byte) { + byteAddr, _ := hex.DecodeString(userAddr) + var byteKey [64]byte + oh, _ := hex.DecodeString(userPriv) + copy(byteKey[:], oh) + return byteAddr, byteKey +} + func decodeHex(hexStr string) []byte { bytes, err := hex.DecodeString(hexStr) if err != nil { @@ -84,6 +95,9 @@ func init() { priv.SetFile(rootDir + "/priv_validator.json") priv.Save() + consensus.RoundDuration0 = 3 * time.Second + consensus.RoundDurationDelta = 1 * time.Second + // start a node ready := make(chan struct{}) go newNode(ready) @@ -264,3 +278,20 @@ func checkTx(t *testing.T, fromAddr []byte, priv *account.PrivAccount, tx *types t.Fatal(types.ErrTxInvalidSignature) } } + +// simple contract returns 5 + 6 = 0xb +func simpleCallContract() ([]byte, []byte, []byte) { + // this is the code we want to run when the contract is called + contractCode := []byte{0x60, 0x5, 0x60, 0x6, 0x1, 0x60, 0x0, 0x52, 0x60, 0x20, 0x60, 0x0, 0xf3} + // the is the code we need to return the contractCode when the contract is initialized + lenCode := len(contractCode) + // push code to the stack + //code := append([]byte{byte(0x60 + lenCode - 1)}, LeftPadWord256(contractCode).Bytes()...) + code := append([]byte{0x7f}, RightPadWord256(contractCode).Bytes()...) + // store it in memory + code = append(code, []byte{0x60, 0x0, 0x52}...) + // return whats in memory + //code = append(code, []byte{0x60, byte(32 - lenCode), 0x60, byte(lenCode), 0xf3}...) + code = append(code, []byte{0x60, byte(lenCode), 0x60, 0x0, 0xf3}...) + return code, contractCode, LeftPadBytes([]byte{0xb}, 32) +} diff --git a/rpc/test/tests.go b/rpc/test/tests.go index 8fbd81e30..4d8abb842 100644 --- a/rpc/test/tests.go +++ b/rpc/test/tests.go @@ -49,11 +49,6 @@ func testGetAccount(t *testing.T, typ string) { } func testSignedTx(t *testing.T, typ string) { - byteAddr, _ := hex.DecodeString(userAddr) - var byteKey [64]byte - oh, _ := hex.DecodeString(userPriv) - copy(byteKey[:], oh) - amt := uint64(100) toAddr := []byte{20, 143, 25, 63, 16, 177, 83, 29, 91, 91, 54, 23, 233, 46, 190, 121, 122, 34, 86, 54} tx, priv := signTx(t, typ, byteAddr, toAddr, nil, byteKey, amt, 0, 0) @@ -69,11 +64,6 @@ func testSignedTx(t *testing.T, typ string) { } func testBroadcastTx(t *testing.T, typ string) { - byteAddr, _ := hex.DecodeString(userAddr) - var byteKey [64]byte - oh, _ := hex.DecodeString(userPriv) - copy(byteKey[:], oh) - amt := uint64(100) toAddr := []byte{20, 143, 25, 63, 16, 177, 83, 29, 91, 91, 54, 23, 233, 46, 190, 121, 122, 34, 86, 54} tx, receipt := broadcastTx(t, typ, byteAddr, toAddr, nil, byteKey, amt, 0, 0) @@ -103,11 +93,6 @@ func testGetStorage(t *testing.T, typ string) { _ = priv //core.SetPrivValidator(priv) - byteAddr, _ := hex.DecodeString(userAddr) - var byteKey [64]byte - oh, _ := hex.DecodeString(userPriv) - copy(byteKey[:], oh) - amt := uint64(1100) code := []byte{0x60, 0x5, 0x60, 0x1, 0x55} _, receipt := broadcastTx(t, typ, byteAddr, nil, code, byteKey, amt, 1000, 1000) @@ -153,30 +138,9 @@ func testCallCode(t *testing.T, typ string) { func testCall(t *testing.T, typ string) { client := clients[typ] - priv := state.LoadPrivValidator(".tendermint/priv_validator.json") - _ = priv - //core.SetPrivValidator(priv) - - byteAddr, _ := hex.DecodeString(userAddr) - var byteKey [64]byte - oh, _ := hex.DecodeString(userPriv) - copy(byteKey[:], oh) - // create the contract amt := uint64(6969) - // this is the code we want to run when the contract is called - contractCode := []byte{0x60, 0x5, 0x60, 0x6, 0x1, 0x60, 0x0, 0x52, 0x60, 0x20, 0x60, 0x0, 0xf3} - // the is the code we need to return the contractCode when the contract is initialized - lenCode := len(contractCode) - // push code to the stack - //code := append([]byte{byte(0x60 + lenCode - 1)}, LeftPadWord256(contractCode).Bytes()...) - code := append([]byte{0x7f}, RightPadWord256(contractCode).Bytes()...) - // store it in memory - code = append(code, []byte{0x60, 0x0, 0x52}...) - // return whats in memory - //code = append(code, []byte{0x60, byte(32 - lenCode), 0x60, byte(lenCode), 0xf3}...) - code = append(code, []byte{0x60, byte(lenCode), 0x60, 0x0, 0xf3}...) - + code, _, _ := simpleCallContract() _, receipt := broadcastTx(t, typ, byteAddr, nil, code, byteKey, amt, 1000, 1000) if receipt.CreatesContract == 0 { t.Fatal("This tx creates a contract") diff --git a/state/execution.go b/state/execution.go index dc0cb8c3f..0d687768e 100644 --- a/state/execution.go +++ b/state/execution.go @@ -13,7 +13,7 @@ import ( // NOTE: If an error occurs during block execution, state will be left // at an invalid state. Copy the state before calling ExecBlock! func ExecBlock(s *State, block *types.Block, blockPartsHeader types.PartSetHeader) error { - err := execBlock(s, block, blockPartsHeader) + err := execBlock(s, block, blockPartsHeader, true) if err != nil { return err } @@ -29,7 +29,7 @@ func ExecBlock(s *State, block *types.Block, blockPartsHeader types.PartSetHeade // executes transactions of a block, does not check block.StateHash // NOTE: If an error occurs during block execution, state will be left // at an invalid state. Copy the state before calling execBlock! -func execBlock(s *State, block *types.Block, blockPartsHeader types.PartSetHeader) error { +func execBlock(s *State, block *types.Block, blockPartsHeader types.PartSetHeader, fireEvents bool) error { // Basic block validation. err := block.ValidateBasic(s.LastBlockHeight, s.LastBlockHash, s.LastBlockParts, s.LastBlockTime) if err != nil { @@ -111,7 +111,7 @@ func execBlock(s *State, block *types.Block, blockPartsHeader types.PartSetHeade // Commit each tx for _, tx := range block.Data.Txs { - err := ExecTx(blockCache, tx, true) + err := ExecTx(blockCache, tx, true, fireEvents) if err != nil { return InvalidTxError{tx, err} } @@ -291,7 +291,7 @@ func adjustByOutputs(accounts map[string]*account.Account, outs []*types.TxOutpu // If the tx is invalid, an error will be returned. // Unlike ExecBlock(), state will not be altered. -func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error { +func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) error { // TODO: do something with fees fees := uint64(0) @@ -329,7 +329,7 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error { // If we're in a block (not mempool), // fire event on all inputs and outputs // see types/events.go for spec - if runCall { + if fireEvents { for _, i := range tx.Inputs { _s.evsw.FireEvent(types.EventStringAccInput(i.Address), tx) } @@ -449,19 +449,21 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error { // Create a receipt from the ret and whether errored. log.Info("VM call complete", "caller", caller, "callee", callee, "return", ret, "err", err) - // Fire Events for sender and receiver - // a separate event will be fired from vm for each - _s.evsw.FireEvent(types.EventStringAccInput(tx.Input.Address), struct { - Tx types.Tx - Return []byte - Exception string - }{tx, ret, exception}) + if fireEvents { + // Fire Events for sender and receiver + // a separate event will be fired from vm for each + _s.evsw.FireEvent(types.EventStringAccInput(tx.Input.Address), struct { + Tx types.Tx + Return []byte + Exception string + }{tx, ret, exception}) - _s.evsw.FireEvent(types.EventStringAccReceive(tx.Address), struct { - Tx types.Tx - Return []byte - Exception string - }{tx, ret, exception}) + _s.evsw.FireEvent(types.EventStringAccReceive(tx.Address), struct { + Tx types.Tx + Return []byte + Exception string + }{tx, ret, exception}) + } } else { // The mempool does not call txs until // the proposer determines the order of txs. @@ -529,7 +531,7 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error { if !added { panic("Failed to add validator") } - if runCall { + if fireEvents { _s.evsw.FireEvent(types.EventStringBond(), tx) } return nil @@ -554,7 +556,7 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error { // Good! _s.unbondValidator(val) - if runCall { + if fireEvents { _s.evsw.FireEvent(types.EventStringUnbond(), tx) } return nil @@ -579,7 +581,7 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error { // Good! _s.rebondValidator(val) - if runCall { + if fireEvents { _s.evsw.FireEvent(types.EventStringRebond(), tx) } return nil @@ -625,7 +627,7 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error { // Good! (Bad validator!) _s.destroyValidator(accused) - if runCall { + if fireEvents { _s.evsw.FireEvent(types.EventStringDupeout(), tx) } return nil diff --git a/state/state.go b/state/state.go index c8d7037c1..4bd3da422 100644 --- a/state/state.go +++ b/state/state.go @@ -119,7 +119,7 @@ func (s *State) Hash() []byte { // Mutates the block in place and updates it with new state hash. func (s *State) SetBlockStateHash(block *types.Block) error { sCopy := s.Copy() - err := execBlock(sCopy, block, types.PartSetHeader{}) + err := execBlock(sCopy, block, types.PartSetHeader{}, false) // don't fire events if err != nil { return err } diff --git a/vm/vm.go b/vm/vm.go index cbf68729d..b37d9ca3e 100644 --- a/vm/vm.go +++ b/vm/vm.go @@ -6,6 +6,7 @@ import ( "math/big" . "github.com/tendermint/tendermint/common" + "github.com/tendermint/tendermint/events" "github.com/tendermint/tendermint/vm/sha3" ) @@ -45,6 +46,8 @@ type VM struct { origin Word256 callDepth int + + evsw *events.EventSwitch } func NewVM(appState AppState, params Params, origin Word256) *VM { @@ -56,6 +59,11 @@ func NewVM(appState AppState, params Params, origin Word256) *VM { } } +// satisfies events.Eventable +func (vm *VM) SetEventSwitch(evsw *events.EventSwitch) { + vm.evsw = evsw +} + // CONTRACT appState is aware of caller and callee, so we can just mutate them. // value: To be transferred from caller to callee. Refunded upon error. // gas: Available gas. No refunds for gas. From 860d5478e93005f4ed38f566ac2ed8b9e8267baa Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Tue, 14 Apr 2015 01:42:11 -0700 Subject: [PATCH 05/12] rpc: use NewBlock event in rpc tests --- rpc/test/tests.go | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/rpc/test/tests.go b/rpc/test/tests.go index 4d8abb842..569294d32 100644 --- a/rpc/test/tests.go +++ b/rpc/test/tests.go @@ -9,7 +9,7 @@ import ( "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" "testing" - "time" + //"time" ) func testStatus(t *testing.T, typ string) { @@ -89,6 +89,13 @@ func testBroadcastTx(t *testing.T, typ string) { } func testGetStorage(t *testing.T, typ string) { + con := newWSCon(t) + eid := types.EventStringNewBlock() + subscribe(t, con, eid) + defer func() { + unsubscribe(t, con, eid) + con.Close() + }() priv := state.LoadPrivValidator(".tendermint/priv_validator.json") _ = priv //core.SetPrivValidator(priv) @@ -108,7 +115,11 @@ func testGetStorage(t *testing.T, typ string) { } // allow it to get mined - time.Sleep(time.Second * 20) + //time.Sleep(time.Second * 20) + waitForEvent(t, con, eid, true, func() { + }, func(eid string, b []byte) error { + return nil + }) mempoolCount = 0 v := getStorage(t, typ, contractAddr, []byte{0x1}) @@ -136,6 +147,14 @@ func testCallCode(t *testing.T, typ string) { } func testCall(t *testing.T, typ string) { + con := newWSCon(t) + eid := types.EventStringNewBlock() + subscribe(t, con, eid) + defer func() { + unsubscribe(t, con, eid) + con.Close() + }() + client := clients[typ] // create the contract @@ -154,7 +173,11 @@ func testCall(t *testing.T, typ string) { } // allow it to get mined - time.Sleep(time.Second * 20) + //time.Sleep(time.Second * 20) + waitForEvent(t, con, eid, true, func() { + }, func(eid string, b []byte) error { + return nil + }) mempoolCount = 0 // run a call through the contract From b4388ae0106c178494e441c765e6b895995d0c3e Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Tue, 14 Apr 2015 19:25:49 -0700 Subject: [PATCH 06/12] vm: eventable and flip fix on CALL address --- vm/test/vm_test.go | 4 ++-- vm/types.go | 8 ++++++++ vm/vm.go | 26 +++++++++++++++++++++++++- 3 files changed, 35 insertions(+), 3 deletions(-) diff --git a/vm/test/vm_test.go b/vm/test/vm_test.go index ee3fe7e57..740d0063a 100644 --- a/vm/test/vm_test.go +++ b/vm/test/vm_test.go @@ -36,7 +36,7 @@ func makeBytes(n int) []byte { } func TestVM(t *testing.T) { - ourVm := NewVM(newAppState(), newParams(), Zero256) + ourVm := NewVM(newAppState(), newParams(), Zero256, nil) // Create accounts account1 := &Account{ @@ -72,7 +72,7 @@ func TestSubcurrency(t *testing.T) { st.accounts[account1.Address.String()] = account1 st.accounts[account2.Address.String()] = account2 - ourVm := NewVM(st, newParams(), Zero256) + ourVm := NewVM(st, newParams(), Zero256, nil) var gas uint64 = 1000 code_parts := []string{"620f42403355", diff --git a/vm/types.go b/vm/types.go index 443b72c63..3d7b9239e 100644 --- a/vm/types.go +++ b/vm/types.go @@ -46,3 +46,11 @@ type Params struct { BlockTime int64 GasLimit uint64 } + +type CallData struct { + Caller []byte + Callee []byte + Data []byte + Value uint64 + Gas uint64 +} diff --git a/vm/vm.go b/vm/vm.go index b37d9ca3e..fe184f212 100644 --- a/vm/vm.go +++ b/vm/vm.go @@ -7,6 +7,7 @@ import ( . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/events" + "github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/vm/sha3" ) @@ -44,18 +45,20 @@ type VM struct { appState AppState params Params origin Word256 + txid []byte callDepth int evsw *events.EventSwitch } -func NewVM(appState AppState, params Params, origin Word256) *VM { +func NewVM(appState AppState, params Params, origin Word256, txid []byte) *VM { return &VM{ appState: appState, params: params, origin: origin, callDepth: 0, + txid: txid, } } @@ -80,12 +83,31 @@ func (vm *VM) Call(caller, callee *Account, code, input []byte, value uint64, ga vm.callDepth += 1 output, err = vm.call(caller, callee, code, input, value, gas) vm.callDepth -= 1 + exception := "" if err != nil { + exception = err.Error() err := transfer(callee, caller, value) if err != nil { panic("Could not return value to caller") } } + // if callDepth is 0 the event is fired from ExecTx (along with the Input invent) + // otherwise, we fire from here. + if vm.callDepth != 0 && vm.evsw != nil { + vm.evsw.FireEvent(types.EventStringAccReceive(callee.Address.Prefix(20)), struct { + CallData *CallData + Origin []byte + TxId []byte + Return []byte + Exception string + }{ + &CallData{caller.Address.Prefix(20), callee.Address.Prefix(20), input, value, *gas}, + vm.origin.Prefix(20), + vm.txid, + output, + exception, + }) + } return } @@ -636,6 +658,8 @@ func (vm *VM) call(caller, callee *Account, code, input []byte, value uint64, ga if ok = useGas(gas, GasGetAccount); !ok { return nil, firstErr(err, ErrInsufficientGas) } + // :( + addr = RightPadWord256(flip(addr.Prefix(20))) acc := vm.appState.GetAccount(addr) if acc == nil { return nil, firstErr(err, ErrUnknownAddress) From 34098edb93fe3105e0932afa2a38b078417ac754 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Tue, 14 Apr 2015 19:26:55 -0700 Subject: [PATCH 07/12] state: fireEvents flag on ExecTx and fixes for GetAccount --- state/block_cache.go | 6 ++++-- state/execution.go | 5 ++++- state/state_test.go | 2 +- state/tx_cache.go | 8 ++++++-- 4 files changed, 15 insertions(+), 6 deletions(-) diff --git a/state/block_cache.go b/state/block_cache.go index c900accc7..c9e0348bf 100644 --- a/state/block_cache.go +++ b/state/block_cache.go @@ -53,7 +53,9 @@ func (cache *BlockCache) GetAccount(addr []byte) *ac.Account { return acc } else { acc = cache.backend.GetAccount(addr) - cache.accounts[string(addr)] = accountInfo{acc, nil, false, false} + if acc != nil { + cache.accounts[string(addr)] = accountInfo{acc, nil, false, false} + } return acc } } @@ -187,7 +189,7 @@ func (cache *BlockCache) Sync() { } } else { if acc == nil { - panic(Fmt("Account should not be nil for addr: %X", acc.Address)) + panic(Fmt("Account should not be nil for addr: %x", addrStr)) } if storage != nil { newStorageRoot := storage.Save() diff --git a/state/execution.go b/state/execution.go index 0d687768e..1a89fecce 100644 --- a/state/execution.go +++ b/state/execution.go @@ -296,6 +296,8 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) erro // TODO: do something with fees fees := uint64(0) _s := blockCache.State() // hack to access validators and event switch. + nilSwitch := _s.evsw == nil + fireEvents = fireEvents && !nilSwitch // Exec tx switch tx := tx_.(type) { @@ -424,7 +426,8 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) erro txCache.UpdateAccount(caller) // because we adjusted by input above, and bumped nonce maybe. txCache.UpdateAccount(callee) // because we adjusted by input above. - vmach := vm.NewVM(txCache, params, caller.Address) + vmach := vm.NewVM(txCache, params, caller.Address, account.SignBytes(tx)) + vmach.SetEventSwitch(_s.evsw) // NOTE: Call() transfers the value from caller to callee iff call succeeds. ret, err := vmach.Call(caller, callee, code, tx.Data, value, &gas) diff --git a/state/state_test.go b/state/state_test.go index 5f2990652..58a673195 100644 --- a/state/state_test.go +++ b/state/state_test.go @@ -12,7 +12,7 @@ import ( func execTxWithState(state *State, tx types.Tx, runCall bool) error { cache := NewBlockCache(state) - err := ExecTx(cache, tx, runCall) + err := ExecTx(cache, tx, runCall, false) if err != nil { return err } else { diff --git a/state/tx_cache.go b/state/tx_cache.go index d4188db3c..425a1f816 100644 --- a/state/tx_cache.go +++ b/state/tx_cache.go @@ -30,9 +30,13 @@ func (cache *TxCache) GetAccount(addr Word256) *vm.Account { acc, removed := vmUnpack(cache.accounts[addr]) if removed { return nil - } else { - return acc + } else if acc == nil { + acc2 := cache.backend.GetAccount(addr.Prefix(20)) + if acc2 != nil { + return toVMAccount(acc2) + } } + return acc } func (cache *TxCache) UpdateAccount(acc *vm.Account) { From 9b76cfe26a84621f93ae20f42e93c3803ce739c0 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Tue, 14 Apr 2015 19:29:18 -0700 Subject: [PATCH 08/12] rpc: cleanup tests and test contract calls --- rpc/core/txs.go | 4 +- rpc/handlers.go | 1 - rpc/test/client_ws_test.go | 245 +++++++++++++++++-------------------- rpc/test/helpers.go | 29 ++++- rpc/test/tests.go | 2 +- rpc/test/ws_helpers.go | 143 ++++++++++++++++++++++ 6 files changed, 286 insertions(+), 138 deletions(-) create mode 100644 rpc/test/ws_helpers.go diff --git a/rpc/core/txs.go b/rpc/core/txs.go index f16a3ae70..628bc6584 100644 --- a/rpc/core/txs.go +++ b/rpc/core/txs.go @@ -43,7 +43,7 @@ func Call(address, data []byte) (*ctypes.ResponseCall, error) { GasLimit: 10000000, } - vmach := vm.NewVM(txCache, params, caller.Address) + vmach := vm.NewVM(txCache, params, caller.Address, nil) gas := uint64(1000000000) ret, err := vmach.Call(caller, callee, callee.Code, data, 0, &gas) if err != nil { @@ -68,7 +68,7 @@ func CallCode(code, data []byte) (*ctypes.ResponseCall, error) { GasLimit: 10000000, } - vmach := vm.NewVM(txCache, params, caller.Address) + vmach := vm.NewVM(txCache, params, caller.Address, nil) gas := uint64(1000000000) ret, err := vmach.Call(caller, callee, code, data, 0, &gas) if err != nil { diff --git a/rpc/handlers.go b/rpc/handlers.go index aa112971f..37d817ef8 100644 --- a/rpc/handlers.go +++ b/rpc/handlers.go @@ -297,7 +297,6 @@ func (w *WebsocketManager) read(con *Connection) { var in []byte _, in, err := con.wsCon.ReadMessage() if err != nil { - //if err := websocket.Message.Receive(con.wsCon, &in); err != nil { // an error reading the connection, // so kill the connection con.quitChan <- struct{}{} diff --git a/rpc/test/client_ws_test.go b/rpc/test/client_ws_test.go index 435c10cf4..b8ae53123 100644 --- a/rpc/test/client_ws_test.go +++ b/rpc/test/client_ws_test.go @@ -1,10 +1,11 @@ package rpc import ( - "bytes" "encoding/hex" + "encoding/json" "fmt" "github.com/gorilla/websocket" + "github.com/tendermint/tendermint/account" "github.com/tendermint/tendermint/binary" "github.com/tendermint/tendermint/rpc" "github.com/tendermint/tendermint/types" @@ -50,25 +51,39 @@ func unsubscribe(t *testing.T, con *websocket.Conn, eventid string) { } } -// wait for an event, do things that might trigger events, and check them when they are received +// wait for an event; do things that might trigger events, and check them when they are received func waitForEvent(t *testing.T, con *websocket.Conn, eventid string, dieOnTimeout bool, f func(), check func(string, []byte) error) { // go routine to wait for webscoket msg - gch := make(chan []byte) - ech := make(chan error) + gch := make(chan []byte) // good channel + ech := make(chan error) // error channel go func() { - typ, p, err := con.ReadMessage() - fmt.Println("RESPONSE:", typ, string(p), err) - if err != nil { - ech <- err - } else { - gch <- p + for { + _, p, err := con.ReadMessage() + if err != nil { + ech <- err + break + } else { + // if the event id isnt what we're waiting on + // ignore it + var response struct { + Event string + } + if err := json.Unmarshal(p, &response); err != nil { + ech <- err + break + } + if response.Event == eventid { + gch <- p + break + } + } } }() // do stuff (transactions) f() - // if the event is not received in 20 seconds, die + // wait for an event or 10 seconds ticker := time.Tick(10 * time.Second) select { case <-ticker: @@ -77,6 +92,7 @@ func waitForEvent(t *testing.T, con *websocket.Conn, eventid string, dieOnTimeou t.Fatalf("%s event was not received in time", eventid) } // else that's great, we didn't hear the event + // and we shouldn't have case p := <-gch: if dieOnTimeout { // message was received and expected @@ -146,24 +162,8 @@ func TestWSBlockchainGrowth(t *testing.T) { unsubscribe(t, con, eid) con.Close() }() - var initBlockN uint - for i := 0; i < 2; i++ { - waitForEvent(t, con, eid, true, func() {}, func(eid string, b []byte) error { - block, err := unmarshalResponseNewBlock(b) - if err != nil { - return err - } - if i == 0 { - initBlockN = block.Header.Height - } else { - if block.Header.Height != initBlockN+uint(i) { - return fmt.Errorf("Expected block %d, got block %d", i, block.Header.Height) - } - } - - return nil - }) - } + // listen for NewBlock, ensure height increases by 1 + unmarshalValidateBlockchain(t, con, eid) } // send a transaction and validate the events from listening for both sender and receiver @@ -181,40 +181,10 @@ func TestWSSend(t *testing.T) { unsubscribe(t, con, eidOutput) con.Close() }() - checkerFunc := func(eid string, b []byte) error { - // unmarshal and assert correctness - var response struct { - Event string - Data types.SendTx - Error string - } - var err error - binary.ReadJSON(&response, b, &err) - if err != nil { - return err - } - if response.Error != "" { - return fmt.Errorf(response.Error) - } - if eid != response.Event { - return fmt.Errorf("Eventid is not correct. Got %s, expected %s", response.Event, eid) - } - tx := response.Data - if bytes.Compare(tx.Inputs[0].Address, byteAddr) != 0 { - return fmt.Errorf("Senders do not match up! Got %x, expected %x", tx.Inputs[0].Address, byteAddr) - } - if tx.Inputs[0].Amount != amt { - return fmt.Errorf("Amt does not match up! Got %d, expected %d", tx.Inputs[0].Amount, amt) - } - if bytes.Compare(tx.Outputs[0].Address, toAddr) != 0 { - return fmt.Errorf("Receivers do not match up! Got %x, expected %x", tx.Outputs[0].Address, byteAddr) - } - return nil - } waitForEvent(t, con, eidInput, true, func() { broadcastTx(t, "JSONRPC", byteAddr, toAddr, nil, byteKey, amt, 0, 0) - }, checkerFunc) - waitForEvent(t, con, eidOutput, true, func() {}, checkerFunc) + }, unmarshalValidateSend(amt, toAddr)) + waitForEvent(t, con, eidOutput, true, func() {}, unmarshalValidateSend(amt, toAddr)) } // ensure events are only fired once for a given transaction @@ -241,87 +211,96 @@ func TestWSDoubleFire(t *testing.T) { }) } -// create a contract and send it a msg, validate the return -func TestWSCall(t *testing.T) { +// create a contract, wait for the event, and send it a msg, validate the return +func TestWSCallWait(t *testing.T) { byteAddr, _ := hex.DecodeString(userAddr) con := newWSCon(t) - eid := types.EventStringAccInput(byteAddr) + eid1 := types.EventStringAccInput(byteAddr) + subscribe(t, con, eid1) + defer func() { + unsubscribe(t, con, eid1) + con.Close() + }() + amt := uint64(10000) + code, returnCode, returnVal := simpleContract() + var contractAddr []byte + // wait for the contract to be created + waitForEvent(t, con, eid1, true, func() { + _, receipt := broadcastTx(t, "JSONRPC", byteAddr, nil, code, byteKey, amt, 1000, 1000) + contractAddr = receipt.ContractAddr + + }, unmarshalValidateCall(amt, returnCode)) + + // susbscribe to the new contract + amt = uint64(10001) + eid2 := types.EventStringAccReceive(contractAddr) + subscribe(t, con, eid2) + defer func() { + unsubscribe(t, con, eid2) + }() + // get the return value from a call + data := []byte{0x1} // just needs to be non empty for this to be a CallTx + waitForEvent(t, con, eid2, true, func() { + broadcastTx(t, "JSONRPC", byteAddr, contractAddr, data, byteKey, amt, 1000, 1000) + }, unmarshalValidateCall(amt, returnVal)) +} + +// create a contract and send it a msg without waiting. wait for contract event +// and validate return +func TestWSCallNoWait(t *testing.T) { + byteAddr, _ := hex.DecodeString(userAddr) + con := newWSCon(t) + amt := uint64(10000) + code, _, returnVal := simpleContract() + + _, receipt := broadcastTx(t, "JSONRPC", byteAddr, nil, code, byteKey, amt, 1000, 1000) + contractAddr := receipt.ContractAddr + + // susbscribe to the new contract + amt = uint64(10001) + eid := types.EventStringAccReceive(contractAddr) subscribe(t, con, eid) defer func() { unsubscribe(t, con, eid) con.Close() }() - amt := uint64(10000) - code, returnCode, returnVal := simpleCallContract() - var contractAddr []byte - // wait for the contract to be created - waitForEvent(t, con, eid, true, func() { - _, receipt := broadcastTx(t, "JSONRPC", byteAddr, nil, code, byteKey, amt, 1000, 1000) - contractAddr = receipt.ContractAddr - - }, func(eid string, b []byte) error { - // unmarshall and assert somethings - var response struct { - Event string - Data struct { - Tx types.CallTx - Return []byte - Exception string - } - Error string - } - var err error - binary.ReadJSON(&response, b, &err) - if err != nil { - return err - } - if response.Error != "" { - return fmt.Errorf(response.Error) - } - if response.Data.Exception != "" { - return fmt.Errorf(response.Data.Exception) - } - tx := response.Data.Tx - if bytes.Compare(tx.Input.Address, byteAddr) != 0 { - return fmt.Errorf("Senders do not match up! Got %x, expected %x", tx.Input.Address, byteAddr) - } - if tx.Input.Amount != amt { - return fmt.Errorf("Amt does not match up! Got %d, expected %d", tx.Input.Amount, amt) - } - ret := response.Data.Return - if bytes.Compare(ret, returnCode) != 0 { - return fmt.Errorf("Create did not return correct byte code for new contract. Got %x, expected %x", ret, returnCode) - } - return nil - }) - // get the return value from a call data := []byte{0x1} // just needs to be non empty for this to be a CallTx waitForEvent(t, con, eid, true, func() { broadcastTx(t, "JSONRPC", byteAddr, contractAddr, data, byteKey, amt, 1000, 1000) - }, func(eid string, b []byte) error { - // unmarshall and assert somethings - var response struct { - Event string - Data struct { - Tx types.CallTx - Return []byte - Exception string - } - Error string - } - var err error - binary.ReadJSON(&response, b, &err) - if err != nil { - return err - } - if response.Error != "" { - return fmt.Errorf(response.Error) - } - ret := response.Data.Return - if bytes.Compare(ret, returnVal) != 0 { - return fmt.Errorf("Call did not return correctly. Got %x, expected %x", ret, returnVal) - } - return nil - }) + }, unmarshalValidateCall(amt, returnVal)) +} + +// create two contracts, one of which calls the other +func TestWSCallCall(t *testing.T) { + byteAddr, _ := hex.DecodeString(userAddr) + con := newWSCon(t) + amt := uint64(10000) + code, _, returnVal := simpleContract() + txid := new([]byte) + + // deploy the two contracts + _, receipt := broadcastTx(t, "JSONRPC", byteAddr, nil, code, byteKey, amt, 1000, 1000) + contractAddr1 := receipt.ContractAddr + code, _, _ = simpleCallContract(contractAddr1) + _, receipt = broadcastTx(t, "JSONRPC", byteAddr, nil, code, byteKey, amt, 1000, 1000) + contractAddr2 := receipt.ContractAddr + + // susbscribe to the new contracts + amt = uint64(10001) + eid1 := types.EventStringAccReceive(contractAddr1) + eid2 := types.EventStringAccReceive(contractAddr2) + subscribe(t, con, eid1) + subscribe(t, con, eid2) + defer func() { + unsubscribe(t, con, eid1) + unsubscribe(t, con, eid2) + con.Close() + }() + // call contract2, which should call contract1, and wait for ev1 + data := []byte{0x1} // just needs to be non empty for this to be a CallTx + waitForEvent(t, con, eid1, true, func() { + tx, _ := broadcastTx(t, "JSONRPC", byteAddr, contractAddr2, data, byteKey, amt, 1000, 1000) + *txid = account.SignBytes(tx) + }, unmarshalValidateCallCall(byteAddr, returnVal, txid)) } diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index 192c132bb..9cb481afb 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -280,7 +280,7 @@ func checkTx(t *testing.T, fromAddr []byte, priv *account.PrivAccount, tx *types } // simple contract returns 5 + 6 = 0xb -func simpleCallContract() ([]byte, []byte, []byte) { +func simpleContract() ([]byte, []byte, []byte) { // this is the code we want to run when the contract is called contractCode := []byte{0x60, 0x5, 0x60, 0x6, 0x1, 0x60, 0x0, 0x52, 0x60, 0x20, 0x60, 0x0, 0xf3} // the is the code we need to return the contractCode when the contract is initialized @@ -293,5 +293,32 @@ func simpleCallContract() ([]byte, []byte, []byte) { // return whats in memory //code = append(code, []byte{0x60, byte(32 - lenCode), 0x60, byte(lenCode), 0xf3}...) code = append(code, []byte{0x60, byte(lenCode), 0x60, 0x0, 0xf3}...) + // return init code, contract code, expected return + return code, contractCode, LeftPadBytes([]byte{0xb}, 32) +} + +// simple call contract calls another contract +func simpleCallContract(addr []byte) ([]byte, []byte, []byte) { + gas1, gas2 := byte(0x1), byte(0x1) + value := byte(0x1) + inOff, inSize := byte(0x0), byte(0x0) // no call data + retOff, retSize := byte(0x0), byte(0x20) + // this is the code we want to run (call a contract and return) + contractCode := []byte{0x60, retSize, 0x60, retOff, 0x60, inSize, 0x60, inOff, 0x60, value, 0x73} + contractCode = append(contractCode, addr...) + contractCode = append(contractCode, []byte{0x61, gas1, gas2, 0xf1, 0x60, 0x20, 0x60, 0x0, 0xf3}...) + + // the is the code we need to return; the contractCode when the contract is initialized + // it should copy the code from the input into memory + lenCode := len(contractCode) + memOff := byte(0x0) + inOff = byte(0xc) // length of code before codeContract + length := byte(lenCode) + + code := []byte{0x60, length, 0x60, inOff, 0x60, memOff, 0x37} + // return whats in memory + code = append(code, []byte{0x60, byte(lenCode), 0x60, 0x0, 0xf3}...) + code = append(code, contractCode...) + // return init code, contract code, expected return return code, contractCode, LeftPadBytes([]byte{0xb}, 32) } diff --git a/rpc/test/tests.go b/rpc/test/tests.go index 569294d32..35a2b70c1 100644 --- a/rpc/test/tests.go +++ b/rpc/test/tests.go @@ -159,7 +159,7 @@ func testCall(t *testing.T, typ string) { // create the contract amt := uint64(6969) - code, _, _ := simpleCallContract() + code, _, _ := simpleContract() _, receipt := broadcastTx(t, typ, byteAddr, nil, code, byteKey, amt, 1000, 1000) if receipt.CreatesContract == 0 { t.Fatal("This tx creates a contract") diff --git a/rpc/test/ws_helpers.go b/rpc/test/ws_helpers.go new file mode 100644 index 000000000..77dd059f9 --- /dev/null +++ b/rpc/test/ws_helpers.go @@ -0,0 +1,143 @@ +package rpc + +import ( + "bytes" + "fmt" + "github.com/gorilla/websocket" + "github.com/tendermint/tendermint/binary" + "github.com/tendermint/tendermint/types" + "github.com/tendermint/tendermint/vm" + "testing" +) + +func unmarshalValidateBlockchain(t *testing.T, con *websocket.Conn, eid string) { + var initBlockN uint + for i := 0; i < 2; i++ { + waitForEvent(t, con, eid, true, func() {}, func(eid string, b []byte) error { + block, err := unmarshalResponseNewBlock(b) + if err != nil { + return err + } + if i == 0 { + initBlockN = block.Header.Height + } else { + if block.Header.Height != initBlockN+uint(i) { + return fmt.Errorf("Expected block %d, got block %d", i, block.Header.Height) + } + } + + return nil + }) + } +} + +func unmarshalValidateSend(amt uint64, toAddr []byte) func(string, []byte) error { + return func(eid string, b []byte) error { + // unmarshal and assert correctness + var response struct { + Event string + Data types.SendTx + Error string + } + var err error + binary.ReadJSON(&response, b, &err) + if err != nil { + return err + } + if response.Error != "" { + return fmt.Errorf(response.Error) + } + if eid != response.Event { + return fmt.Errorf("Eventid is not correct. Got %s, expected %s", response.Event, eid) + } + tx := response.Data + if bytes.Compare(tx.Inputs[0].Address, byteAddr) != 0 { + return fmt.Errorf("Senders do not match up! Got %x, expected %x", tx.Inputs[0].Address, byteAddr) + } + if tx.Inputs[0].Amount != amt { + return fmt.Errorf("Amt does not match up! Got %d, expected %d", tx.Inputs[0].Amount, amt) + } + if bytes.Compare(tx.Outputs[0].Address, toAddr) != 0 { + return fmt.Errorf("Receivers do not match up! Got %x, expected %x", tx.Outputs[0].Address, byteAddr) + } + return nil + } +} + +func unmarshalValidateCall(amt uint64, returnCode []byte) func(string, []byte) error { + return func(eid string, b []byte) error { + // unmarshall and assert somethings + var response struct { + Event string + Data struct { + Tx types.CallTx + Return []byte + Exception string + } + Error string + } + var err error + binary.ReadJSON(&response, b, &err) + if err != nil { + return err + } + if response.Error != "" { + return fmt.Errorf(response.Error) + } + if response.Data.Exception != "" { + return fmt.Errorf(response.Data.Exception) + } + tx := response.Data.Tx + if bytes.Compare(tx.Input.Address, byteAddr) != 0 { + return fmt.Errorf("Senders do not match up! Got %x, expected %x", tx.Input.Address, byteAddr) + } + if tx.Input.Amount != amt { + return fmt.Errorf("Amt does not match up! Got %d, expected %d", tx.Input.Amount, amt) + } + ret := response.Data.Return + if bytes.Compare(ret, returnCode) != 0 { + return fmt.Errorf("Call did not return correctly. Got %x, expected %x", ret, returnCode) + } + return nil + } +} + +func unmarshalValidateCallCall(origin, returnCode []byte, txid *[]byte) func(string, []byte) error { + return func(eid string, b []byte) error { + // unmarshall and assert somethings + var response struct { + Event string + Data struct { + CallData *vm.CallData + Origin []byte + TxId []byte + Return []byte + Exception string + } + Error string + } + var err error + binary.ReadJSON(&response, b, &err) + if err != nil { + return err + } + if response.Error != "" { + return fmt.Errorf(response.Error) + } + if response.Data.Exception != "" { + return fmt.Errorf(response.Data.Exception) + } + if bytes.Compare(response.Data.Origin, origin) != 0 { + return fmt.Errorf("Origin does not match up! Got %x, expected %x", response.Data.Origin, origin) + } + ret := response.Data.Return + if bytes.Compare(ret, returnCode) != 0 { + return fmt.Errorf("Call did not return correctly. Got %x, expected %x", ret, returnCode) + } + if bytes.Compare(response.Data.TxId, *txid) != 0 { + return fmt.Errorf("TxIds do not match up! Got %x, expected %x", response.Data.TxId, *txid) + } + // calldata := response.Data.CallData + return nil + } +} From a1c5e32d76aeb48694290f0305235426a2d701aa Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 15 Apr 2015 19:14:08 -0700 Subject: [PATCH 09/12] fixes from review with jae --- account/account.go | 6 ++ consensus/state.go | 1 + rpc/handlers.go | 177 +++++++++++++++++++------------------ rpc/http_server.go | 5 +- rpc/test/client_ws_test.go | 32 +++---- rpc/test/helpers.go | 14 +-- rpc/test/tests.go | 29 +++--- rpc/test/ws_helpers.go | 19 ++-- state/block_cache.go | 6 +- state/execution.go | 17 +--- types/events.go | 27 ++++++ vm/types.go | 8 -- vm/vm.go | 12 +-- 13 files changed, 176 insertions(+), 177 deletions(-) diff --git a/account/account.go b/account/account.go index 520afd70c..235da8583 100644 --- a/account/account.go +++ b/account/account.go @@ -6,6 +6,7 @@ import ( "io" "github.com/tendermint/tendermint/binary" + "github.com/tendermint/tendermint/merkle" ) // Signable is an interface for all signable things. @@ -24,6 +25,11 @@ func SignBytes(o Signable) []byte { return buf.Bytes() } +// HashSignBytes is a convenience method for getting the hash of the bytes of a signable +func HashSignBytes(o Signable) []byte { + return merkle.HashFromBinary(SignBytes(o)) +} + //----------------------------------------------------------------------------- // Account resides in the application state, and is mutated by transactions diff --git a/consensus/state.go b/consensus/state.go index a9db89a5c..0b698a311 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -445,6 +445,7 @@ ACTION_LOOP: // cs.Step is at RoundStepNewHeight or RoundStepNewRound. newBlock := cs.blockStore.LoadBlock(cs.state.LastBlockHeight) cs.evsw.FireEvent(types.EventStringNewBlock(), newBlock) + // TODO: go fire events from event cache scheduleNextAction() continue ACTION_LOOP } else { diff --git a/rpc/handlers.go b/rpc/handlers.go index 37d817ef8..d3b969189 100644 --- a/rpc/handlers.go +++ b/rpc/handlers.go @@ -10,6 +10,7 @@ import ( "io/ioutil" "net/http" "reflect" + "sync/atomic" "time" ) @@ -25,8 +26,8 @@ func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) { func RegisterEventsHandler(mux *http.ServeMux, evsw *events.EventSwitch) { // websocket endpoint - w := NewWebsocketManager(evsw) - mux.HandleFunc("/events", w.websocketHandler) // websocket.Handler(w.eventsHandler)) + wm := NewWebsocketManager(evsw) + mux.HandleFunc("/events", wm.websocketHandler) // websocket.Handler(w.eventsHandler)) } //------------------------------------- @@ -193,7 +194,7 @@ func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) { const ( WSConnectionReaperSeconds = 5 - MaxFailedSendsSeconds = 10 + MaxFailedSends = 10 WriteChanBufferSize = 10 ) @@ -214,103 +215,76 @@ type WSResponse struct { // contains the listeners id type Connection struct { id string - wsCon *websocket.Conn + wsConn *websocket.Conn writeChan chan WSResponse - quitChan chan struct{} failedSends uint + started uint32 + stopped uint32 + + evsw *events.EventSwitch } // new websocket connection wrapper -func NewConnection(con *websocket.Conn) *Connection { +func NewConnection(wsConn *websocket.Conn) *Connection { return &Connection{ - id: con.RemoteAddr().String(), - wsCon: con, + id: wsConn.RemoteAddr().String(), + wsConn: wsConn, writeChan: make(chan WSResponse, WriteChanBufferSize), // buffered. we keep track when its full } } +// start the connection and hand her the event switch +func (con *Connection) Start(evsw *events.EventSwitch) { + if atomic.CompareAndSwapUint32(&con.started, 0, 1) { + con.evsw = evsw + + // read subscriptions/unsubscriptions to events + go con.read() + // write responses + con.write() + } +} + // close the connection -func (c *Connection) Close() { - c.wsCon.Close() - close(c.writeChan) - close(c.quitChan) -} - -// main manager for all websocket connections -// holds the event switch -type WebsocketManager struct { - websocket.Upgrader - ew *events.EventSwitch - cons map[string]*Connection -} - -func NewWebsocketManager(ew *events.EventSwitch) *WebsocketManager { - return &WebsocketManager{ - ew: ew, - cons: make(map[string]*Connection), - Upgrader: websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, - CheckOrigin: func(r *http.Request) bool { - // TODO - return true - }, - }, +func (con *Connection) Stop() { + if atomic.CompareAndSwapUint32(&con.stopped, 0, 1) { + con.wsConn.Close() + close(con.writeChan) } } -func (wm *WebsocketManager) websocketHandler(w http.ResponseWriter, r *http.Request) { - conn, err := wm.Upgrade(w, r, nil) - if err != nil { - // TODO - log.Error("Failed to upgrade to websocket connection", "error", err) - return - } - wm.handleWebsocket(conn) - -} - -func (w *WebsocketManager) handleWebsocket(con *websocket.Conn) { - // register connection - c := NewConnection(con) - w.cons[c.id] = c - log.Info("New websocket connection", "origin", c.id) - - // read subscriptions/unsubscriptions to events - go w.read(c) - // write responses - w.write(c) -} - // read from the socket and subscribe to or unsubscribe from events -func (w *WebsocketManager) read(con *Connection) { +func (con *Connection) read() { reaper := time.Tick(time.Second * WSConnectionReaperSeconds) for { select { case <-reaper: - if con.failedSends > MaxFailedSendsSeconds { + if con.failedSends > MaxFailedSends { // sending has failed too many times. // kill the connection - con.quitChan <- struct{}{} + con.Stop() + return } default: var in []byte - _, in, err := con.wsCon.ReadMessage() + _, in, err := con.wsConn.ReadMessage() if err != nil { // an error reading the connection, - // so kill the connection - con.quitChan <- struct{}{} + // kill the connection + con.Stop() + return } var req WSRequest err = json.Unmarshal(in, &req) if err != nil { errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error()) con.writeChan <- WSResponse{Error: errStr} + continue } switch req.Type { case "subscribe": log.Info("New event subscription", "con id", con.id, "event", req.Event) - w.ew.AddListenerForEvent(con.id, req.Event, func(msg interface{}) { + con.evsw.AddListenerForEvent(con.id, req.Event, func(msg interface{}) { resp := WSResponse{ Event: req.Event, Data: msg, @@ -328,9 +302,9 @@ func (w *WebsocketManager) read(con *Connection) { }) case "unsubscribe": if req.Event != "" { - w.ew.RemoveListenerForEvent(req.Event, con.id) + con.evsw.RemoveListenerForEvent(req.Event, con.id) } else { - w.ew.RemoveListener(con.id) + con.evsw.RemoveListener(con.id) } default: con.writeChan <- WSResponse{Error: "Unknown request type: " + req.Type} @@ -340,33 +314,64 @@ func (w *WebsocketManager) read(con *Connection) { } } -// receives on a write channel and writes out to the socket -func (w *WebsocketManager) write(con *Connection) { +// receives on a write channel and writes out on the socket +func (con *Connection) write() { n, err := new(int64), new(error) for { - select { - case msg := <-con.writeChan: - buf := new(bytes.Buffer) - binary.WriteJSON(msg, buf, n, err) - if *err != nil { - log.Error("Failed to write JSON WSResponse", "error", err) - } else { - //websocket.Message.Send(con.wsCon, buf.Bytes()) - if err := con.wsCon.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil { - log.Error("Failed to write response on websocket", "error", err) - } - } - case <-con.quitChan: - w.closeConn(con) + msg, more := <-con.writeChan + if !more { + // the channel was closed, so ensure + // connection is stopped and return + con.Stop() return } + buf := new(bytes.Buffer) + binary.WriteJSON(msg, buf, n, err) + if *err != nil { + log.Error("Failed to write JSON WSResponse", "error", err) + } else { + if err := con.wsConn.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil { + log.Error("Failed to write response on websocket", "error", err) + con.Stop() + return + } + } } } -// close a connection and delete from manager -func (w *WebsocketManager) closeConn(con *Connection) { - con.Close() - delete(w.cons, con.id) +// main manager for all websocket connections +// holds the event switch +type WebsocketManager struct { + websocket.Upgrader + evsw *events.EventSwitch +} + +func NewWebsocketManager(evsw *events.EventSwitch) *WebsocketManager { + return &WebsocketManager{ + evsw: evsw, + Upgrader: websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + // TODO + return true + }, + }, + } +} + +func (wm *WebsocketManager) websocketHandler(w http.ResponseWriter, r *http.Request) { + wsConn, err := wm.Upgrade(w, r, nil) + if err != nil { + // TODO - return http error + log.Error("Failed to upgrade to websocket connection", "error", err) + return + } + + // register connection + con := NewConnection(wsConn) + log.Info("New websocket connection", "origin", con.id) + con.Start(wm.evsw) } // rpc.websocket diff --git a/rpc/http_server.go b/rpc/http_server.go index 3b09ac373..1cb79bc6d 100644 --- a/rpc/http_server.go +++ b/rpc/http_server.go @@ -53,7 +53,7 @@ func WriteRPCResponse(w http.ResponseWriter, res RPCResponse) { func RecoverAndLogHandler(handler http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Wrap the ResponseWriter to remember the status - rww := &ResponseWriterWrapper{-1, w, w.(http.Hijacker)} + rww := &ResponseWriterWrapper{-1, w} begin := time.Now() // Common headers @@ -100,7 +100,6 @@ func RecoverAndLogHandler(handler http.Handler) http.Handler { type ResponseWriterWrapper struct { Status int http.ResponseWriter - hj http.Hijacker // necessary for websocket upgrades } func (w *ResponseWriterWrapper) WriteHeader(status int) { @@ -110,7 +109,7 @@ func (w *ResponseWriterWrapper) WriteHeader(status int) { // implements http.Hijacker func (w *ResponseWriterWrapper) Hijack() (net.Conn, *bufio.ReadWriter, error) { - return w.hj.Hijack() + return w.ResponseWriter.(http.Hijacker).Hijack() } // Stick it as a deferred statement in gouroutines to prevent the program from crashing. diff --git a/rpc/test/client_ws_test.go b/rpc/test/client_ws_test.go index b8ae53123..b342d390e 100644 --- a/rpc/test/client_ws_test.go +++ b/rpc/test/client_ws_test.go @@ -1,7 +1,6 @@ package rpc import ( - "encoding/hex" "encoding/json" "fmt" "github.com/gorilla/websocket" @@ -172,7 +171,7 @@ func TestWSSend(t *testing.T) { amt := uint64(100) con := newWSCon(t) - eidInput := types.EventStringAccInput(byteAddr) + eidInput := types.EventStringAccInput(userByteAddr) eidOutput := types.EventStringAccOutput(toAddr) subscribe(t, con, eidInput) subscribe(t, con, eidOutput) @@ -182,7 +181,7 @@ func TestWSSend(t *testing.T) { con.Close() }() waitForEvent(t, con, eidInput, true, func() { - broadcastTx(t, "JSONRPC", byteAddr, toAddr, nil, byteKey, amt, 0, 0) + broadcastTx(t, "JSONRPC", userByteAddr, toAddr, nil, userBytePriv, amt, 0, 0) }, unmarshalValidateSend(amt, toAddr)) waitForEvent(t, con, eidOutput, true, func() {}, unmarshalValidateSend(amt, toAddr)) } @@ -190,7 +189,7 @@ func TestWSSend(t *testing.T) { // ensure events are only fired once for a given transaction func TestWSDoubleFire(t *testing.T) { con := newWSCon(t) - eid := types.EventStringAccInput(byteAddr) + eid := types.EventStringAccInput(userByteAddr) subscribe(t, con, eid) defer func() { unsubscribe(t, con, eid) @@ -200,7 +199,7 @@ func TestWSDoubleFire(t *testing.T) { toAddr := []byte{20, 143, 25, 63, 16, 177, 83, 29, 91, 91, 54, 23, 233, 46, 190, 121, 122, 34, 86, 54} // broadcast the transaction, wait to hear about it waitForEvent(t, con, eid, true, func() { - broadcastTx(t, "JSONRPC", byteAddr, toAddr, nil, byteKey, amt, 0, 0) + broadcastTx(t, "JSONRPC", userByteAddr, toAddr, nil, userBytePriv, amt, 0, 0) }, func(eid string, b []byte) error { return nil }) @@ -213,9 +212,8 @@ func TestWSDoubleFire(t *testing.T) { // create a contract, wait for the event, and send it a msg, validate the return func TestWSCallWait(t *testing.T) { - byteAddr, _ := hex.DecodeString(userAddr) con := newWSCon(t) - eid1 := types.EventStringAccInput(byteAddr) + eid1 := types.EventStringAccInput(userByteAddr) subscribe(t, con, eid1) defer func() { unsubscribe(t, con, eid1) @@ -226,7 +224,7 @@ func TestWSCallWait(t *testing.T) { var contractAddr []byte // wait for the contract to be created waitForEvent(t, con, eid1, true, func() { - _, receipt := broadcastTx(t, "JSONRPC", byteAddr, nil, code, byteKey, amt, 1000, 1000) + _, receipt := broadcastTx(t, "JSONRPC", userByteAddr, nil, code, userBytePriv, amt, 1000, 1000) contractAddr = receipt.ContractAddr }, unmarshalValidateCall(amt, returnCode)) @@ -241,19 +239,18 @@ func TestWSCallWait(t *testing.T) { // get the return value from a call data := []byte{0x1} // just needs to be non empty for this to be a CallTx waitForEvent(t, con, eid2, true, func() { - broadcastTx(t, "JSONRPC", byteAddr, contractAddr, data, byteKey, amt, 1000, 1000) + broadcastTx(t, "JSONRPC", userByteAddr, contractAddr, data, userBytePriv, amt, 1000, 1000) }, unmarshalValidateCall(amt, returnVal)) } // create a contract and send it a msg without waiting. wait for contract event // and validate return func TestWSCallNoWait(t *testing.T) { - byteAddr, _ := hex.DecodeString(userAddr) con := newWSCon(t) amt := uint64(10000) code, _, returnVal := simpleContract() - _, receipt := broadcastTx(t, "JSONRPC", byteAddr, nil, code, byteKey, amt, 1000, 1000) + _, receipt := broadcastTx(t, "JSONRPC", userByteAddr, nil, code, userBytePriv, amt, 1000, 1000) contractAddr := receipt.ContractAddr // susbscribe to the new contract @@ -267,23 +264,22 @@ func TestWSCallNoWait(t *testing.T) { // get the return value from a call data := []byte{0x1} // just needs to be non empty for this to be a CallTx waitForEvent(t, con, eid, true, func() { - broadcastTx(t, "JSONRPC", byteAddr, contractAddr, data, byteKey, amt, 1000, 1000) + broadcastTx(t, "JSONRPC", userByteAddr, contractAddr, data, userBytePriv, amt, 1000, 1000) }, unmarshalValidateCall(amt, returnVal)) } // create two contracts, one of which calls the other func TestWSCallCall(t *testing.T) { - byteAddr, _ := hex.DecodeString(userAddr) con := newWSCon(t) amt := uint64(10000) code, _, returnVal := simpleContract() txid := new([]byte) // deploy the two contracts - _, receipt := broadcastTx(t, "JSONRPC", byteAddr, nil, code, byteKey, amt, 1000, 1000) + _, receipt := broadcastTx(t, "JSONRPC", userByteAddr, nil, code, userBytePriv, amt, 1000, 1000) contractAddr1 := receipt.ContractAddr code, _, _ = simpleCallContract(contractAddr1) - _, receipt = broadcastTx(t, "JSONRPC", byteAddr, nil, code, byteKey, amt, 1000, 1000) + _, receipt = broadcastTx(t, "JSONRPC", userByteAddr, nil, code, userBytePriv, amt, 1000, 1000) contractAddr2 := receipt.ContractAddr // susbscribe to the new contracts @@ -300,7 +296,7 @@ func TestWSCallCall(t *testing.T) { // call contract2, which should call contract1, and wait for ev1 data := []byte{0x1} // just needs to be non empty for this to be a CallTx waitForEvent(t, con, eid1, true, func() { - tx, _ := broadcastTx(t, "JSONRPC", byteAddr, contractAddr2, data, byteKey, amt, 1000, 1000) - *txid = account.SignBytes(tx) - }, unmarshalValidateCallCall(byteAddr, returnVal, txid)) + tx, _ := broadcastTx(t, "JSONRPC", userByteAddr, contractAddr2, data, userBytePriv, amt, 1000, 1000) + *txid = account.HashSignBytes(tx) + }, unmarshalValidateCallCall(userByteAddr, returnVal, txid)) } diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index 9cb481afb..05d863d53 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -28,10 +28,10 @@ var ( mempoolCount = 0 - userAddr = "D7DFF9806078899C8DA3FE3633CC0BF3C6C2B1BB" - userPriv = "FDE3BD94CB327D19464027BA668194C5EFA46AE83E8419D7542CFF41F00C81972239C21C81EA7173A6C489145490C015E05D4B97448933B708A7EC5B7B4921E3" - userPub = "2239C21C81EA7173A6C489145490C015E05D4B97448933B708A7EC5B7B4921E3" - byteAddr, byteKey = initUserBytes() + userAddr = "D7DFF9806078899C8DA3FE3633CC0BF3C6C2B1BB" + userPriv = "FDE3BD94CB327D19464027BA668194C5EFA46AE83E8419D7542CFF41F00C81972239C21C81EA7173A6C489145490C015E05D4B97448933B708A7EC5B7B4921E3" + userPub = "2239C21C81EA7173A6C489145490C015E05D4B97448933B708A7EC5B7B4921E3" + userByteAddr, userBytePriv = initUserBytes() clients = map[string]cclient.Client{ "JSONRPC": cclient.NewClient(requestAddr, "JSONRPC"), @@ -39,11 +39,13 @@ var ( } ) +// returns byte versions of address and private key +// type [64]byte needed by account.GenPrivAccountFromKey func initUserBytes() ([]byte, [64]byte) { byteAddr, _ := hex.DecodeString(userAddr) var byteKey [64]byte - oh, _ := hex.DecodeString(userPriv) - copy(byteKey[:], oh) + userPrivByteSlice, _ := hex.DecodeString(userPriv) + copy(byteKey[:], userPrivByteSlice) return byteAddr, byteKey } diff --git a/rpc/test/tests.go b/rpc/test/tests.go index 35a2b70c1..f95a42d4b 100644 --- a/rpc/test/tests.go +++ b/rpc/test/tests.go @@ -2,14 +2,12 @@ package rpc import ( "bytes" - "encoding/hex" "fmt" . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" "testing" - //"time" ) func testStatus(t *testing.T, typ string) { @@ -38,35 +36,34 @@ func testGenPriv(t *testing.T, typ string) { } func testGetAccount(t *testing.T, typ string) { - byteAddr, _ := hex.DecodeString(userAddr) - acc := getAccount(t, typ, byteAddr) + acc := getAccount(t, typ, userByteAddr) if acc == nil { t.Fatalf("Account was nil") } - if bytes.Compare(acc.Address, byteAddr) != 0 { - t.Fatalf("Failed to get correct account. Got %x, expected %x", acc.Address, byteAddr) + if bytes.Compare(acc.Address, userByteAddr) != 0 { + t.Fatalf("Failed to get correct account. Got %x, expected %x", acc.Address, userByteAddr) } } func testSignedTx(t *testing.T, typ string) { amt := uint64(100) toAddr := []byte{20, 143, 25, 63, 16, 177, 83, 29, 91, 91, 54, 23, 233, 46, 190, 121, 122, 34, 86, 54} - tx, priv := signTx(t, typ, byteAddr, toAddr, nil, byteKey, amt, 0, 0) - checkTx(t, byteAddr, priv, tx.(*types.SendTx)) + tx, priv := signTx(t, typ, userByteAddr, toAddr, nil, userBytePriv, amt, 0, 0) + checkTx(t, userByteAddr, priv, tx.(*types.SendTx)) toAddr = []byte{20, 143, 24, 63, 16, 17, 83, 29, 90, 91, 52, 2, 0, 41, 190, 121, 122, 34, 86, 54} - tx, priv = signTx(t, typ, byteAddr, toAddr, nil, byteKey, amt, 0, 0) - checkTx(t, byteAddr, priv, tx.(*types.SendTx)) + tx, priv = signTx(t, typ, userByteAddr, toAddr, nil, userBytePriv, amt, 0, 0) + checkTx(t, userByteAddr, priv, tx.(*types.SendTx)) toAddr = []byte{0, 0, 4, 0, 0, 4, 0, 0, 4, 91, 52, 2, 0, 41, 190, 121, 122, 34, 86, 54} - tx, priv = signTx(t, typ, byteAddr, toAddr, nil, byteKey, amt, 0, 0) - checkTx(t, byteAddr, priv, tx.(*types.SendTx)) + tx, priv = signTx(t, typ, userByteAddr, toAddr, nil, userBytePriv, amt, 0, 0) + checkTx(t, userByteAddr, priv, tx.(*types.SendTx)) } func testBroadcastTx(t *testing.T, typ string) { amt := uint64(100) toAddr := []byte{20, 143, 25, 63, 16, 177, 83, 29, 91, 91, 54, 23, 233, 46, 190, 121, 122, 34, 86, 54} - tx, receipt := broadcastTx(t, typ, byteAddr, toAddr, nil, byteKey, amt, 0, 0) + tx, receipt := broadcastTx(t, typ, userByteAddr, toAddr, nil, userBytePriv, amt, 0, 0) if receipt.CreatesContract > 0 { t.Fatal("This tx does not create a contract") } @@ -102,7 +99,7 @@ func testGetStorage(t *testing.T, typ string) { amt := uint64(1100) code := []byte{0x60, 0x5, 0x60, 0x1, 0x55} - _, receipt := broadcastTx(t, typ, byteAddr, nil, code, byteKey, amt, 1000, 1000) + _, receipt := broadcastTx(t, typ, userByteAddr, nil, code, userBytePriv, amt, 1000, 1000) if receipt.CreatesContract == 0 { t.Fatal("This tx creates a contract") } @@ -115,7 +112,6 @@ func testGetStorage(t *testing.T, typ string) { } // allow it to get mined - //time.Sleep(time.Second * 20) waitForEvent(t, con, eid, true, func() { }, func(eid string, b []byte) error { return nil @@ -160,7 +156,7 @@ func testCall(t *testing.T, typ string) { // create the contract amt := uint64(6969) code, _, _ := simpleContract() - _, receipt := broadcastTx(t, typ, byteAddr, nil, code, byteKey, amt, 1000, 1000) + _, receipt := broadcastTx(t, typ, userByteAddr, nil, code, userBytePriv, amt, 1000, 1000) if receipt.CreatesContract == 0 { t.Fatal("This tx creates a contract") } @@ -173,7 +169,6 @@ func testCall(t *testing.T, typ string) { } // allow it to get mined - //time.Sleep(time.Second * 20) waitForEvent(t, con, eid, true, func() { }, func(eid string, b []byte) error { return nil diff --git a/rpc/test/ws_helpers.go b/rpc/test/ws_helpers.go index 77dd059f9..880220a32 100644 --- a/rpc/test/ws_helpers.go +++ b/rpc/test/ws_helpers.go @@ -6,7 +6,6 @@ import ( "github.com/gorilla/websocket" "github.com/tendermint/tendermint/binary" "github.com/tendermint/tendermint/types" - "github.com/tendermint/tendermint/vm" "testing" ) @@ -51,14 +50,14 @@ func unmarshalValidateSend(amt uint64, toAddr []byte) func(string, []byte) error return fmt.Errorf("Eventid is not correct. Got %s, expected %s", response.Event, eid) } tx := response.Data - if bytes.Compare(tx.Inputs[0].Address, byteAddr) != 0 { - return fmt.Errorf("Senders do not match up! Got %x, expected %x", tx.Inputs[0].Address, byteAddr) + if bytes.Compare(tx.Inputs[0].Address, userByteAddr) != 0 { + return fmt.Errorf("Senders do not match up! Got %x, expected %x", tx.Inputs[0].Address, userByteAddr) } if tx.Inputs[0].Amount != amt { return fmt.Errorf("Amt does not match up! Got %d, expected %d", tx.Inputs[0].Amount, amt) } if bytes.Compare(tx.Outputs[0].Address, toAddr) != 0 { - return fmt.Errorf("Receivers do not match up! Got %x, expected %x", tx.Outputs[0].Address, byteAddr) + return fmt.Errorf("Receivers do not match up! Got %x, expected %x", tx.Outputs[0].Address, userByteAddr) } return nil } @@ -88,8 +87,8 @@ func unmarshalValidateCall(amt uint64, returnCode []byte) func(string, []byte) e return fmt.Errorf(response.Data.Exception) } tx := response.Data.Tx - if bytes.Compare(tx.Input.Address, byteAddr) != 0 { - return fmt.Errorf("Senders do not match up! Got %x, expected %x", tx.Input.Address, byteAddr) + if bytes.Compare(tx.Input.Address, userByteAddr) != 0 { + return fmt.Errorf("Senders do not match up! Got %x, expected %x", tx.Input.Address, userByteAddr) } if tx.Input.Amount != amt { return fmt.Errorf("Amt does not match up! Got %d, expected %d", tx.Input.Amount, amt) @@ -107,13 +106,7 @@ func unmarshalValidateCallCall(origin, returnCode []byte, txid *[]byte) func(str // unmarshall and assert somethings var response struct { Event string - Data struct { - CallData *vm.CallData - Origin []byte - TxId []byte - Return []byte - Exception string - } + Data types.EventMsgCall Error string } var err error diff --git a/state/block_cache.go b/state/block_cache.go index c9e0348bf..1fc09f1ac 100644 --- a/state/block_cache.go +++ b/state/block_cache.go @@ -53,9 +53,7 @@ func (cache *BlockCache) GetAccount(addr []byte) *ac.Account { return acc } else { acc = cache.backend.GetAccount(addr) - if acc != nil { - cache.accounts[string(addr)] = accountInfo{acc, nil, false, false} - } + cache.accounts[string(addr)] = accountInfo{acc, nil, false, false} return acc } } @@ -189,7 +187,7 @@ func (cache *BlockCache) Sync() { } } else { if acc == nil { - panic(Fmt("Account should not be nil for addr: %x", addrStr)) + continue } if storage != nil { newStorageRoot := storage.Save() diff --git a/state/execution.go b/state/execution.go index 1a89fecce..918e9122b 100644 --- a/state/execution.go +++ b/state/execution.go @@ -426,12 +426,12 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) erro txCache.UpdateAccount(caller) // because we adjusted by input above, and bumped nonce maybe. txCache.UpdateAccount(callee) // because we adjusted by input above. - vmach := vm.NewVM(txCache, params, caller.Address, account.SignBytes(tx)) + vmach := vm.NewVM(txCache, params, caller.Address, account.HashSignBytes(tx)) vmach.SetEventSwitch(_s.evsw) // NOTE: Call() transfers the value from caller to callee iff call succeeds. ret, err := vmach.Call(caller, callee, code, tx.Data, value, &gas) - var exception string + exception := "" if err != nil { exception = err.Error() // Failure. Charge the gas fee. The 'value' was otherwise not transferred. @@ -440,7 +440,6 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) erro blockCache.UpdateAccount(inAcc) // Throw away 'txCache' which holds incomplete updates (don't sync it). } else { - exception = "" log.Debug("Successful execution") // Success if createAccount { @@ -455,17 +454,9 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) erro if fireEvents { // Fire Events for sender and receiver // a separate event will be fired from vm for each - _s.evsw.FireEvent(types.EventStringAccInput(tx.Input.Address), struct { - Tx types.Tx - Return []byte - Exception string - }{tx, ret, exception}) + _s.evsw.FireEvent(types.EventStringAccInput(tx.Input.Address), types.EventMsgCallTx{tx, ret, exception}) - _s.evsw.FireEvent(types.EventStringAccReceive(tx.Address), struct { - Tx types.Tx - Return []byte - Exception string - }{tx, ret, exception}) + _s.evsw.FireEvent(types.EventStringAccReceive(tx.Address), types.EventMsgCallTx{tx, ret, exception}) } } else { // The mempool does not call txs until diff --git a/types/events.go b/types/events.go index 28965ecc9..0345efcb1 100644 --- a/types/events.go +++ b/types/events.go @@ -4,6 +4,8 @@ import ( "fmt" ) +// Functions to generate eventId strings + func EventStringAccInput(addr []byte) string { return fmt.Sprintf("Acc/%x/Input", addr) } @@ -40,6 +42,31 @@ func EventStringFork() string { return "Fork" } +// Most event messages are basic types (a block, a transaction) +// but some (an input to a call tx or a receive) are more exotic: + +type EventMsgCallTx struct { + Tx Tx + Return []byte + Exception string +} + +type CallData struct { + Caller []byte + Callee []byte + Data []byte + Value uint64 + Gas uint64 +} + +type EventMsgCall struct { + CallData *CallData + Origin []byte + TxId []byte + Return []byte + Exception string +} + /* Acc/XYZ/Input -> full tx or {full tx, return value, exception} Acc/XYZ/Output -> full tx diff --git a/vm/types.go b/vm/types.go index 3d7b9239e..443b72c63 100644 --- a/vm/types.go +++ b/vm/types.go @@ -46,11 +46,3 @@ type Params struct { BlockTime int64 GasLimit uint64 } - -type CallData struct { - Caller []byte - Callee []byte - Data []byte - Value uint64 - Gas uint64 -} diff --git a/vm/vm.go b/vm/vm.go index fe184f212..50b95a19d 100644 --- a/vm/vm.go +++ b/vm/vm.go @@ -91,17 +91,11 @@ func (vm *VM) Call(caller, callee *Account, code, input []byte, value uint64, ga panic("Could not return value to caller") } } - // if callDepth is 0 the event is fired from ExecTx (along with the Input invent) + // if callDepth is 0 the event is fired from ExecTx (along with the Input event) // otherwise, we fire from here. if vm.callDepth != 0 && vm.evsw != nil { - vm.evsw.FireEvent(types.EventStringAccReceive(callee.Address.Prefix(20)), struct { - CallData *CallData - Origin []byte - TxId []byte - Return []byte - Exception string - }{ - &CallData{caller.Address.Prefix(20), callee.Address.Prefix(20), input, value, *gas}, + vm.evsw.FireEvent(types.EventStringAccReceive(callee.Address.Prefix(20)), types.EventMsgCall{ + &types.CallData{caller.Address.Prefix(20), callee.Address.Prefix(20), input, value, *gas}, vm.origin.Prefix(20), vm.txid, output, From d27e0bbad578945bd8ece6f0386df8c9e98b2f57 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 15 Apr 2015 23:40:27 -0700 Subject: [PATCH 10/12] event cache and fireable interace --- blockchain/reactor.go | 4 ++-- consensus/reactor.go | 6 +++--- consensus/state.go | 18 ++++++++++------ events/event_cache.go | 39 +++++++++++++++++++++++++++++++++ events/events.go | 7 +++++- mempool/mempool.go | 4 ++-- mempool/reactor.go | 4 ++-- node/node.go | 6 +++--- p2p/pex_reactor.go | 4 ++-- state/execution.go | 50 ++++++++++++++++++++----------------------- state/state.go | 12 +++++------ vm/vm.go | 10 ++++----- 12 files changed, 105 insertions(+), 59 deletions(-) create mode 100644 events/event_cache.go diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 7a213b0f0..489059dab 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -42,7 +42,7 @@ type BlockchainReactor struct { quit chan struct{} running uint32 - evsw *events.EventSwitch + evsw events.Fireable } func NewBlockchainReactor(state *sm.State, store *BlockStore, sync bool) *BlockchainReactor { @@ -242,7 +242,7 @@ func (bcR *BlockchainReactor) BroadcastStatus() error { } // implements events.Eventable -func (bcR *BlockchainReactor) SetEventSwitch(evsw *events.EventSwitch) { +func (bcR *BlockchainReactor) SetFireable(evsw events.Fireable) { bcR.evsw = evsw } diff --git a/consensus/reactor.go b/consensus/reactor.go index 16ef2129f..dfad3e61b 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -40,7 +40,7 @@ type ConsensusReactor struct { blockStore *bc.BlockStore conS *ConsensusState - evsw *events.EventSwitch + evsw events.Fireable } func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore) *ConsensusReactor { @@ -234,9 +234,9 @@ func (conR *ConsensusReactor) ResetToState(state *sm.State) { } // implements events.Eventable -func (conR *ConsensusReactor) SetEventSwitch(evsw *events.EventSwitch) { +func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) { conR.evsw = evsw - conR.conS.SetEventSwitch(evsw) + conR.conS.SetFireable(evsw) } //-------------------------------------- diff --git a/consensus/state.go b/consensus/state.go index 0b698a311..c1579c7e9 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -251,7 +251,8 @@ type ConsensusState struct { stagedState *sm.State // Cache result of staged block. lastCommitVoteHeight uint // Last called commitVoteBlock() or saveCommitVoteBlock() on. - evsw *events.EventSwitch + evsw events.Fireable + evc *events.EventCache // set in stageBlock and passed into state } func NewConsensusState(state *sm.State, blockStore *bc.BlockStore, mempoolReactor *mempl.MempoolReactor) *ConsensusState { @@ -443,9 +444,12 @@ ACTION_LOOP: if cs.TryFinalizeCommit(rs.Height) { // Now at new height // cs.Step is at RoundStepNewHeight or RoundStepNewRound. - newBlock := cs.blockStore.LoadBlock(cs.state.LastBlockHeight) - cs.evsw.FireEvent(types.EventStringNewBlock(), newBlock) - // TODO: go fire events from event cache + // fire some events! + go func() { + newBlock := cs.blockStore.LoadBlock(cs.state.LastBlockHeight) + cs.evsw.FireEvent(types.EventStringNewBlock(), newBlock) + cs.evc.Flush() + }() scheduleNextAction() continue ACTION_LOOP } else { @@ -1032,6 +1036,9 @@ func (cs *ConsensusState) stageBlock(block *types.Block, blockParts *types.PartS // Create a copy of the state for staging stateCopy := cs.state.Copy() + // reset the event cache and pass it into the state + cs.evc = events.NewEventCache(cs.evsw) + stateCopy.SetFireable(cs.evc) // Commit block onto the copied state. // NOTE: Basic validation is done in state.AppendBlock(). @@ -1117,9 +1124,8 @@ func (cs *ConsensusState) saveCommitVoteBlock(block *types.Block, blockParts *ty } // implements events.Eventable -func (cs *ConsensusState) SetEventSwitch(evsw *events.EventSwitch) { +func (cs *ConsensusState) SetFireable(evsw events.Fireable) { cs.evsw = evsw - cs.state.SetEventSwitch(evsw) } //----------------------------------------------------------------------------- diff --git a/events/event_cache.go b/events/event_cache.go new file mode 100644 index 000000000..e1af09fa4 --- /dev/null +++ b/events/event_cache.go @@ -0,0 +1,39 @@ +package events + +const ( + eventsBufferSize = 1000 +) + +// An EventCache buffers events for a Fireable +// All events are cached. Filtering happens on Flush +type EventCache struct { + evsw Fireable + events []eventInfo +} + +// Create a new EventCache with an EventSwitch as backend +func NewEventCache(evsw Fireable) *EventCache { + return &EventCache{ + evsw: evsw, + events: make([]eventInfo, eventsBufferSize), + } +} + +// a cached event +type eventInfo struct { + event string + msg interface{} +} + +// Cache an event to be fired upon finality. +func (evc *EventCache) FireEvent(event string, msg interface{}) { + // append to list + evc.events = append(evc.events, eventInfo{event, msg}) +} + +// Fire events by running evsw.FireEvent on all cached events. Blocks. +func (evc *EventCache) Flush() { + for _, ei := range evc.events { + evc.evsw.FireEvent(ei.event, ei.msg) + } +} diff --git a/events/events.go b/events/events.go index 46f01cb57..13aca2dac 100644 --- a/events/events.go +++ b/events/events.go @@ -8,7 +8,12 @@ import ( // reactors and other modules should export // this interface to become eventable type Eventable interface { - SetEventSwitch(*EventSwitch) + SetFireable(Fireable) +} + +// an event switch or cache implements fireable +type Fireable interface { + FireEvent(event string, msg interface{}) } type EventSwitch struct { diff --git a/mempool/mempool.go b/mempool/mempool.go index d5c6dad06..51cfab139 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -42,7 +42,7 @@ func (mem *Mempool) GetCache() *sm.BlockCache { func (mem *Mempool) AddTx(tx types.Tx) (err error) { mem.mtx.Lock() defer mem.mtx.Unlock() - err = sm.ExecTx(mem.cache, tx, false, false) + err = sm.ExecTx(mem.cache, tx, false, nil) if err != nil { log.Debug("AddTx() error", "tx", tx, "error", err) return err @@ -93,7 +93,7 @@ func (mem *Mempool) ResetForBlockAndState(block *types.Block, state *sm.State) { // Next, filter all txs that aren't valid given new state. validTxs := []types.Tx{} for _, tx := range txs { - err := sm.ExecTx(mem.cache, tx, false, false) + err := sm.ExecTx(mem.cache, tx, false, nil) if err == nil { log.Debug("Filter in, valid", "tx", tx) validTxs = append(validTxs, tx) diff --git a/mempool/reactor.go b/mempool/reactor.go index ae469da41..fbab16f84 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -24,7 +24,7 @@ type MempoolReactor struct { Mempool *Mempool - evsw *events.EventSwitch + evsw events.Fireable } func NewMempoolReactor(mempool *Mempool) *MempoolReactor { @@ -114,7 +114,7 @@ func (memR *MempoolReactor) BroadcastTx(tx types.Tx) error { } // implements events.Eventable -func (memR *MempoolReactor) SetEventSwitch(evsw *events.EventSwitch) { +func (memR *MempoolReactor) SetFireable(evsw events.Fireable) { memR.evsw = evsw } diff --git a/node/node.go b/node/node.go index 531413402..311828349 100644 --- a/node/node.go +++ b/node/node.go @@ -81,7 +81,7 @@ func NewNode() *Node { // add the event switch to all services // they should all satisfy events.Eventable - SetEventSwitch(eventSwitch, pexReactor, bcReactor, mempoolReactor, consensusReactor) + SetFireable(eventSwitch, pexReactor, bcReactor, mempoolReactor, consensusReactor) return &Node{ sw: sw, @@ -115,9 +115,9 @@ func (n *Node) Stop() { } // Add the event switch to reactors, mempool, etc. -func SetEventSwitch(evsw *events.EventSwitch, eventables ...events.Eventable) { +func SetFireable(evsw *events.EventSwitch, eventables ...events.Eventable) { for _, e := range eventables { - e.SetEventSwitch(evsw) + e.SetFireable(evsw) } } diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index 20e175fc5..02826db41 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -33,7 +33,7 @@ type PEXReactor struct { book *AddrBook - evsw *events.EventSwitch + evsw events.Fireable } func NewPEXReactor(book *AddrBook) *PEXReactor { @@ -211,7 +211,7 @@ func (pexR *PEXReactor) ensurePeers() { } // implements events.Eventable -func (pexR *PEXReactor) SetEventSwitch(evsw *events.EventSwitch) { +func (pexR *PEXReactor) SetFireable(evsw events.Fireable) { pexR.evsw = evsw } diff --git a/state/execution.go b/state/execution.go index 918e9122b..f5f047c80 100644 --- a/state/execution.go +++ b/state/execution.go @@ -6,6 +6,7 @@ import ( "github.com/tendermint/tendermint/account" . "github.com/tendermint/tendermint/common" + "github.com/tendermint/tendermint/events" "github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/vm" ) @@ -13,7 +14,7 @@ import ( // NOTE: If an error occurs during block execution, state will be left // at an invalid state. Copy the state before calling ExecBlock! func ExecBlock(s *State, block *types.Block, blockPartsHeader types.PartSetHeader) error { - err := execBlock(s, block, blockPartsHeader, true) + err := execBlock(s, block, blockPartsHeader) if err != nil { return err } @@ -29,7 +30,7 @@ func ExecBlock(s *State, block *types.Block, blockPartsHeader types.PartSetHeade // executes transactions of a block, does not check block.StateHash // NOTE: If an error occurs during block execution, state will be left // at an invalid state. Copy the state before calling execBlock! -func execBlock(s *State, block *types.Block, blockPartsHeader types.PartSetHeader, fireEvents bool) error { +func execBlock(s *State, block *types.Block, blockPartsHeader types.PartSetHeader) error { // Basic block validation. err := block.ValidateBasic(s.LastBlockHeight, s.LastBlockHash, s.LastBlockParts, s.LastBlockTime) if err != nil { @@ -111,7 +112,7 @@ func execBlock(s *State, block *types.Block, blockPartsHeader types.PartSetHeade // Commit each tx for _, tx := range block.Data.Txs { - err := ExecTx(blockCache, tx, true, fireEvents) + err := ExecTx(blockCache, tx, true, s.evc) if err != nil { return InvalidTxError{tx, err} } @@ -291,13 +292,11 @@ func adjustByOutputs(accounts map[string]*account.Account, outs []*types.TxOutpu // If the tx is invalid, an error will be returned. // Unlike ExecBlock(), state will not be altered. -func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) error { +func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool, evc events.Fireable) error { // TODO: do something with fees fees := uint64(0) _s := blockCache.State() // hack to access validators and event switch. - nilSwitch := _s.evsw == nil - fireEvents = fireEvents && !nilSwitch // Exec tx switch tx := tx_.(type) { @@ -328,16 +327,14 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) erro blockCache.UpdateAccount(acc) } - // If we're in a block (not mempool), - // fire event on all inputs and outputs - // see types/events.go for spec - if fireEvents { + // if the evc is nil, nothing will happen + if evc != nil { for _, i := range tx.Inputs { - _s.evsw.FireEvent(types.EventStringAccInput(i.Address), tx) + evc.FireEvent(types.EventStringAccInput(i.Address), tx) } for _, o := range tx.Outputs { - _s.evsw.FireEvent(types.EventStringAccOutput(o.Address), tx) + evc.FireEvent(types.EventStringAccOutput(o.Address), tx) } } return nil @@ -427,7 +424,7 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) erro txCache.UpdateAccount(caller) // because we adjusted by input above, and bumped nonce maybe. txCache.UpdateAccount(callee) // because we adjusted by input above. vmach := vm.NewVM(txCache, params, caller.Address, account.HashSignBytes(tx)) - vmach.SetEventSwitch(_s.evsw) + vmach.SetFireable(_s.evc) // NOTE: Call() transfers the value from caller to callee iff call succeeds. ret, err := vmach.Call(caller, callee, code, tx.Data, value, &gas) @@ -451,12 +448,11 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) erro // Create a receipt from the ret and whether errored. log.Info("VM call complete", "caller", caller, "callee", callee, "return", ret, "err", err) - if fireEvents { - // Fire Events for sender and receiver - // a separate event will be fired from vm for each - _s.evsw.FireEvent(types.EventStringAccInput(tx.Input.Address), types.EventMsgCallTx{tx, ret, exception}) - - _s.evsw.FireEvent(types.EventStringAccReceive(tx.Address), types.EventMsgCallTx{tx, ret, exception}) + // Fire Events for sender and receiver + // a separate event will be fired from vm for each additional call + if evc != nil { + evc.FireEvent(types.EventStringAccInput(tx.Input.Address), types.EventMsgCallTx{tx, ret, exception}) + evc.FireEvent(types.EventStringAccReceive(tx.Address), types.EventMsgCallTx{tx, ret, exception}) } } else { // The mempool does not call txs until @@ -525,8 +521,8 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) erro if !added { panic("Failed to add validator") } - if fireEvents { - _s.evsw.FireEvent(types.EventStringBond(), tx) + if evc != nil { + evc.FireEvent(types.EventStringBond(), tx) } return nil @@ -550,8 +546,8 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) erro // Good! _s.unbondValidator(val) - if fireEvents { - _s.evsw.FireEvent(types.EventStringUnbond(), tx) + if evc != nil { + evc.FireEvent(types.EventStringUnbond(), tx) } return nil @@ -575,8 +571,8 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) erro // Good! _s.rebondValidator(val) - if fireEvents { - _s.evsw.FireEvent(types.EventStringRebond(), tx) + if evc != nil { + evc.FireEvent(types.EventStringRebond(), tx) } return nil @@ -621,8 +617,8 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) erro // Good! (Bad validator!) _s.destroyValidator(accused) - if fireEvents { - _s.evsw.FireEvent(types.EventStringDupeout(), tx) + if evc != nil { + evc.FireEvent(types.EventStringDupeout(), tx) } return nil diff --git a/state/state.go b/state/state.go index 4bd3da422..01270a241 100644 --- a/state/state.go +++ b/state/state.go @@ -36,7 +36,7 @@ type State struct { accounts merkle.Tree // Shouldn't be accessed directly. validatorInfos merkle.Tree // Shouldn't be accessed directly. - evsw *events.EventSwitch + evc events.Fireable // typically an events.EventCache } func LoadState(db dbm.DB) *State { @@ -101,7 +101,6 @@ func (s *State) Copy() *State { UnbondingValidators: s.UnbondingValidators.Copy(), // copy the valSet lazily. accounts: s.accounts.Copy(), validatorInfos: s.validatorInfos.Copy(), - evsw: s.evsw, } } @@ -119,7 +118,8 @@ func (s *State) Hash() []byte { // Mutates the block in place and updates it with new state hash. func (s *State) SetBlockStateHash(block *types.Block) error { sCopy := s.Copy() - err := execBlock(sCopy, block, types.PartSetHeader{}, false) // don't fire events + // sCopy has no event cache in it, so this won't fire events + err := execBlock(sCopy, block, types.PartSetHeader{}) if err != nil { return err } @@ -268,9 +268,9 @@ func (s *State) LoadStorage(hash []byte) (storage merkle.Tree) { // State.storage //------------------------------------- -// implements events.Eventable -func (s *State) SetEventSwitch(evsw *events.EventSwitch) { - s.evsw = evsw +// Implements events.Eventable. Typically uses events.EventCache +func (s *State) SetFireable(evc events.Fireable) { + s.evc = evc } //----------------------------------------------------------------------------- diff --git a/vm/vm.go b/vm/vm.go index 50b95a19d..79b2a3345 100644 --- a/vm/vm.go +++ b/vm/vm.go @@ -49,7 +49,7 @@ type VM struct { callDepth int - evsw *events.EventSwitch + evc events.Fireable } func NewVM(appState AppState, params Params, origin Word256, txid []byte) *VM { @@ -63,8 +63,8 @@ func NewVM(appState AppState, params Params, origin Word256, txid []byte) *VM { } // satisfies events.Eventable -func (vm *VM) SetEventSwitch(evsw *events.EventSwitch) { - vm.evsw = evsw +func (vm *VM) SetFireable(evc events.Fireable) { + vm.evc = evc } // CONTRACT appState is aware of caller and callee, so we can just mutate them. @@ -93,8 +93,8 @@ func (vm *VM) Call(caller, callee *Account, code, input []byte, value uint64, ga } // if callDepth is 0 the event is fired from ExecTx (along with the Input event) // otherwise, we fire from here. - if vm.callDepth != 0 && vm.evsw != nil { - vm.evsw.FireEvent(types.EventStringAccReceive(callee.Address.Prefix(20)), types.EventMsgCall{ + if vm.callDepth != 0 && vm.evc != nil { + vm.evc.FireEvent(types.EventStringAccReceive(callee.Address.Prefix(20)), types.EventMsgCall{ &types.CallData{caller.Address.Prefix(20), callee.Address.Prefix(20), input, value, *gas}, vm.origin.Prefix(20), vm.txid, From 75049ec82746f3b14bad7cab9c98da85e1dd71e6 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Thu, 16 Apr 2015 00:03:31 -0700 Subject: [PATCH 11/12] major flippage for vm addrs. now left padded words for tx_cache --- common/word.go | 9 +++++---- state/block_cache.go | 10 +++++----- state/tx_cache.go | 10 +++++----- vm/vm.go | 18 ++++++++---------- 4 files changed, 23 insertions(+), 24 deletions(-) diff --git a/common/word.go b/common/word.go index d67730e13..7c5c8a81a 100644 --- a/common/word.go +++ b/common/word.go @@ -13,10 +13,11 @@ var ( type Word256 [32]byte -func (w Word256) String() string { return string(w[:]) } -func (w Word256) Copy() Word256 { return w } -func (w Word256) Bytes() []byte { return w[:] } // copied. -func (w Word256) Prefix(n int) []byte { return w[:n] } +func (w Word256) String() string { return string(w[:]) } +func (w Word256) Copy() Word256 { return w } +func (w Word256) Bytes() []byte { return w[:] } // copied. +func (w Word256) Prefix(n int) []byte { return w[:n] } +func (w Word256) Postfix(n int) []byte { return w[32-n:] } func (w Word256) IsZero() bool { accum := byte(0) for _, byt := range w { diff --git a/state/block_cache.go b/state/block_cache.go index 1fc09f1ac..277754541 100644 --- a/state/block_cache.go +++ b/state/block_cache.go @@ -91,13 +91,13 @@ func (cache *BlockCache) GetStorage(addr Word256, key Word256) (value Word256) { } // Get or load storage - acc, storage, removed, dirty := cache.accounts[string(addr.Prefix(20))].unpack() + acc, storage, removed, dirty := cache.accounts[string(addr.Postfix(20))].unpack() if removed { panic("GetStorage() on removed account") } if acc != nil && storage == nil { storage = makeStorage(cache.db, acc.StorageRoot) - cache.accounts[string(addr.Prefix(20))] = accountInfo{acc, storage, false, dirty} + cache.accounts[string(addr.Postfix(20))] = accountInfo{acc, storage, false, dirty} } else if acc == nil { return Zero256 } @@ -114,7 +114,7 @@ func (cache *BlockCache) GetStorage(addr Word256, key Word256) (value Word256) { // NOTE: Set value to zero to removed from the trie. func (cache *BlockCache) SetStorage(addr Word256, key Word256, value Word256) { - _, _, removed, _ := cache.accounts[string(addr.Prefix(20))].unpack() + _, _, removed, _ := cache.accounts[string(addr.Postfix(20))].unpack() if removed { panic("SetStorage() on a removed account") } @@ -146,7 +146,7 @@ func (cache *BlockCache) Sync() { for _, storageKey := range storageKeys { addr, key := Tuple256Split(storageKey) if addr != curAddr || curAcc == nil { - acc, storage, removed, _ := cache.accounts[string(addr.Prefix(20))].unpack() + acc, storage, removed, _ := cache.accounts[string(addr.Postfix(20))].unpack() if storage == nil { storage = makeStorage(cache.db, acc.StorageRoot) } @@ -166,7 +166,7 @@ func (cache *BlockCache) Sync() { curStorage.Remove(key.Bytes()) } else { curStorage.Set(key.Bytes(), value.Bytes()) - cache.accounts[string(addr.Prefix(20))] = accountInfo{curAcc, curStorage, false, true} + cache.accounts[string(addr.Postfix(20))] = accountInfo{curAcc, curStorage, false, true} } } diff --git a/state/tx_cache.go b/state/tx_cache.go index 425a1f816..dbf865bab 100644 --- a/state/tx_cache.go +++ b/state/tx_cache.go @@ -31,7 +31,7 @@ func (cache *TxCache) GetAccount(addr Word256) *vm.Account { if removed { return nil } else if acc == nil { - acc2 := cache.backend.GetAccount(addr.Prefix(20)) + acc2 := cache.backend.GetAccount(addr.Postfix(20)) if acc2 != nil { return toVMAccount(acc2) } @@ -68,7 +68,7 @@ func (cache *TxCache) CreateAccount(creator *vm.Account) *vm.Account { nonce := creator.Nonce creator.Nonce += 1 - addr := RightPadWord256(NewContractAddress(creator.Address.Prefix(20), nonce)) + addr := LeftPadWord256(NewContractAddress(creator.Address.Postfix(20), nonce)) // Create account from address. account, removed := vmUnpack(cache.accounts[addr]) @@ -128,7 +128,7 @@ func (cache *TxCache) Sync() { for addr, accInfo := range cache.accounts { acc, removed := vmUnpack(accInfo) if removed { - cache.backend.RemoveAccount(addr.Prefix(20)) + cache.backend.RemoveAccount(addr.Postfix(20)) } else { cache.backend.UpdateAccount(toStateAccount(acc)) } @@ -154,7 +154,7 @@ func NewContractAddress(caller []byte, nonce uint64) []byte { // Converts backend.Account to vm.Account struct. func toVMAccount(acc *ac.Account) *vm.Account { return &vm.Account{ - Address: RightPadWord256(acc.Address), + Address: LeftPadWord256(acc.Address), Balance: acc.Balance, Code: acc.Code, // This is crazy. Nonce: uint64(acc.Sequence), @@ -176,7 +176,7 @@ func toStateAccount(acc *vm.Account) *ac.Account { storageRoot = acc.StorageRoot.Bytes() } return &ac.Account{ - Address: acc.Address.Prefix(20), + Address: acc.Address.Postfix(20), PubKey: pubKey, Balance: acc.Balance, Code: acc.Code, diff --git a/vm/vm.go b/vm/vm.go index 79b2a3345..f2e869cb4 100644 --- a/vm/vm.go +++ b/vm/vm.go @@ -94,9 +94,9 @@ func (vm *VM) Call(caller, callee *Account, code, input []byte, value uint64, ga // if callDepth is 0 the event is fired from ExecTx (along with the Input event) // otherwise, we fire from here. if vm.callDepth != 0 && vm.evc != nil { - vm.evc.FireEvent(types.EventStringAccReceive(callee.Address.Prefix(20)), types.EventMsgCall{ - &types.CallData{caller.Address.Prefix(20), callee.Address.Prefix(20), input, value, *gas}, - vm.origin.Prefix(20), + vm.evc.FireEvent(types.EventStringAccReceive(callee.Address.Postfix(20)), types.EventMsgCall{ + &types.CallData{caller.Address.Postfix(20), callee.Address.Postfix(20), input, value, *gas}, + vm.origin.Postfix(20), vm.txid, output, exception, @@ -354,7 +354,7 @@ func (vm *VM) call(caller, callee *Account, code, input []byte, value uint64, ga if ok = useGas(gas, GasGetAccount); !ok { return nil, firstErr(err, ErrInsufficientGas) } - acc := vm.appState.GetAccount(addr) // TODO ensure that 20byte lengths are supported. + acc := vm.appState.GetAccount(flipWord(addr)) // TODO ensure that 20byte lengths are supported. if acc == nil { return nil, firstErr(err, ErrUnknownAddress) } @@ -431,7 +431,7 @@ func (vm *VM) call(caller, callee *Account, code, input []byte, value uint64, ga if ok = useGas(gas, GasGetAccount); !ok { return nil, firstErr(err, ErrInsufficientGas) } - acc := vm.appState.GetAccount(addr) + acc := vm.appState.GetAccount(flipWord(addr)) if acc == nil { return nil, firstErr(err, ErrUnknownAddress) } @@ -445,7 +445,7 @@ func (vm *VM) call(caller, callee *Account, code, input []byte, value uint64, ga if ok = useGas(gas, GasGetAccount); !ok { return nil, firstErr(err, ErrInsufficientGas) } - acc := vm.appState.GetAccount(addr) + acc := vm.appState.GetAccount(flipWord(addr)) if acc == nil { return nil, firstErr(err, ErrUnknownAddress) } @@ -652,9 +652,7 @@ func (vm *VM) call(caller, callee *Account, code, input []byte, value uint64, ga if ok = useGas(gas, GasGetAccount); !ok { return nil, firstErr(err, ErrInsufficientGas) } - // :( - addr = RightPadWord256(flip(addr.Prefix(20))) - acc := vm.appState.GetAccount(addr) + acc := vm.appState.GetAccount(flipWord(addr)) if acc == nil { return nil, firstErr(err, ErrUnknownAddress) } @@ -697,7 +695,7 @@ func (vm *VM) call(caller, callee *Account, code, input []byte, value uint64, ga return nil, firstErr(err, ErrInsufficientGas) } // TODO if the receiver is , then make it the fee. - receiver := vm.appState.GetAccount(addr) + receiver := vm.appState.GetAccount(flipWord(addr)) if receiver == nil { return nil, firstErr(err, ErrUnknownAddress) } From 65f669160fff441a807929ff443f19e3ad454933 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Fri, 17 Apr 2015 13:18:50 -0700 Subject: [PATCH 12/12] more fixes from review --- events/event_cache.go | 2 + node/node.go | 4 ++ rpc/handlers.go | 48 +++++++++++-------- rpc/test/{helpers.go => helpers_test.go} | 0 rpc/test/{tests.go => tests_test.go} | 0 .../{ws_helpers.go => ws_helpers_test.go} | 0 state/state.go | 1 + state/state_test.go | 2 +- vm/vm.go | 7 +-- 9 files changed, 39 insertions(+), 25 deletions(-) rename rpc/test/{helpers.go => helpers_test.go} (100%) rename rpc/test/{tests.go => tests_test.go} (100%) rename rpc/test/{ws_helpers.go => ws_helpers_test.go} (100%) diff --git a/events/event_cache.go b/events/event_cache.go index e1af09fa4..3231fe0d4 100644 --- a/events/event_cache.go +++ b/events/event_cache.go @@ -32,8 +32,10 @@ func (evc *EventCache) FireEvent(event string, msg interface{}) { } // Fire events by running evsw.FireEvent on all cached events. Blocks. +// Clears cached events func (evc *EventCache) Flush() { for _, ei := range evc.events { evc.evsw.FireEvent(ei.event, ei.msg) } + evc.events = make([]eventInfo, eventsBufferSize) } diff --git a/node/node.go b/node/node.go index 311828349..2bcc66857 100644 --- a/node/node.go +++ b/node/node.go @@ -161,6 +161,10 @@ func (n *Node) MempoolReactor() *mempl.MempoolReactor { return n.mempoolReactor } +func (n *Node) EventSwitch() *events.EventSwitch { + return n.evsw +} + //------------------------------------------------------------------------------ func RunNode() { diff --git a/rpc/handlers.go b/rpc/handlers.go index d3b969189..b7c235447 100644 --- a/rpc/handlers.go +++ b/rpc/handlers.go @@ -212,8 +212,9 @@ type WSResponse struct { } // a single websocket connection -// contains the listeners id -type Connection struct { +// contains listener id, underlying ws connection, +// and the event switch for subscribing to events +type WSConnection struct { id string wsConn *websocket.Conn writeChan chan WSResponse @@ -225,8 +226,8 @@ type Connection struct { } // new websocket connection wrapper -func NewConnection(wsConn *websocket.Conn) *Connection { - return &Connection{ +func NewWSConnection(wsConn *websocket.Conn) *WSConnection { + return &WSConnection{ id: wsConn.RemoteAddr().String(), wsConn: wsConn, writeChan: make(chan WSResponse, WriteChanBufferSize), // buffered. we keep track when its full @@ -234,7 +235,7 @@ func NewConnection(wsConn *websocket.Conn) *Connection { } // start the connection and hand her the event switch -func (con *Connection) Start(evsw *events.EventSwitch) { +func (con *WSConnection) Start(evsw *events.EventSwitch) { if atomic.CompareAndSwapUint32(&con.started, 0, 1) { con.evsw = evsw @@ -246,15 +247,29 @@ func (con *Connection) Start(evsw *events.EventSwitch) { } // close the connection -func (con *Connection) Stop() { +func (con *WSConnection) Stop() { if atomic.CompareAndSwapUint32(&con.stopped, 0, 1) { con.wsConn.Close() close(con.writeChan) } } +// attempt to write response to writeChan and record failures +func (con *WSConnection) safeWrite(resp WSResponse) { + select { + case con.writeChan <- resp: + // yay + con.failedSends = 0 + default: + // channel is full + // if this happens too many times in a row, + // close connection + con.failedSends += 1 + } +} + // read from the socket and subscribe to or unsubscribe from events -func (con *Connection) read() { +func (con *WSConnection) read() { reaper := time.Tick(time.Second * WSConnectionReaperSeconds) for { select { @@ -278,7 +293,7 @@ func (con *Connection) read() { err = json.Unmarshal(in, &req) if err != nil { errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error()) - con.writeChan <- WSResponse{Error: errStr} + con.safeWrite(WSResponse{Error: errStr}) continue } switch req.Type { @@ -289,16 +304,7 @@ func (con *Connection) read() { Event: req.Event, Data: msg, } - select { - case con.writeChan <- resp: - // yay - con.failedSends = 0 - default: - // channel is full - // if this happens too many times, - // close connection - con.failedSends += 1 - } + con.safeWrite(resp) }) case "unsubscribe": if req.Event != "" { @@ -307,7 +313,7 @@ func (con *Connection) read() { con.evsw.RemoveListener(con.id) } default: - con.writeChan <- WSResponse{Error: "Unknown request type: " + req.Type} + con.safeWrite(WSResponse{Error: "Unknown request type: " + req.Type}) } } @@ -315,7 +321,7 @@ func (con *Connection) read() { } // receives on a write channel and writes out on the socket -func (con *Connection) write() { +func (con *WSConnection) write() { n, err := new(int64), new(error) for { msg, more := <-con.writeChan @@ -369,7 +375,7 @@ func (wm *WebsocketManager) websocketHandler(w http.ResponseWriter, r *http.Requ } // register connection - con := NewConnection(wsConn) + con := NewWSConnection(wsConn) log.Info("New websocket connection", "origin", con.id) con.Start(wm.evsw) } diff --git a/rpc/test/helpers.go b/rpc/test/helpers_test.go similarity index 100% rename from rpc/test/helpers.go rename to rpc/test/helpers_test.go diff --git a/rpc/test/tests.go b/rpc/test/tests_test.go similarity index 100% rename from rpc/test/tests.go rename to rpc/test/tests_test.go diff --git a/rpc/test/ws_helpers.go b/rpc/test/ws_helpers_test.go similarity index 100% rename from rpc/test/ws_helpers.go rename to rpc/test/ws_helpers_test.go diff --git a/state/state.go b/state/state.go index 01270a241..182d2692b 100644 --- a/state/state.go +++ b/state/state.go @@ -101,6 +101,7 @@ func (s *State) Copy() *State { UnbondingValidators: s.UnbondingValidators.Copy(), // copy the valSet lazily. accounts: s.accounts.Copy(), validatorInfos: s.validatorInfos.Copy(), + evc: nil, } } diff --git a/state/state_test.go b/state/state_test.go index 58a673195..c2d766e31 100644 --- a/state/state_test.go +++ b/state/state_test.go @@ -12,7 +12,7 @@ import ( func execTxWithState(state *State, tx types.Tx, runCall bool) error { cache := NewBlockCache(state) - err := ExecTx(cache, tx, runCall, false) + err := ExecTx(cache, tx, runCall, nil) if err != nil { return err } else { diff --git a/vm/vm.go b/vm/vm.go index f2e869cb4..ba16d846a 100644 --- a/vm/vm.go +++ b/vm/vm.go @@ -307,7 +307,6 @@ func (vm *VM) call(caller, callee *Account, code, input []byte, value uint64, ga x, y := stack.Pop64(), stack.Pop64() stack.Push64(x & y) dbg.Printf(" %v & %v = %v\n", x, y, x&y) - case OR: // 0x17 x, y := stack.Pop64(), stack.Pop64() stack.Push64(x | y) @@ -381,7 +380,7 @@ func (vm *VM) call(caller, callee *Account, code, input []byte, value uint64, ga return nil, firstErr(err, ErrInputOutOfBounds) } stack.Push(RightPadWord256(data)) - dbg.Printf(" => 0x%X\n", data) + dbg.Printf(" => 0x%X\n", RightPadWord256(data)) case CALLDATASIZE: // 0x36 stack.Push64(uint64(len(input))) @@ -721,10 +720,12 @@ func subslice(data []byte, offset, length uint64, flip_ bool) (ret []byte, ok bo if size < offset { return nil, false } else if size < offset+length { - ret, ok = data[offset:], false + ret, ok = data[offset:], true + ret = RightPadBytes(ret, 32) } else { ret, ok = data[offset:offset+length], true } + if flip_ { ret = flip(ret) }