From 85661de2930b25fec4bb02375c9bba15681db646 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 13 Apr 2015 18:26:41 -0700 Subject: [PATCH] fire events, event urls --- consensus/state.go | 5 ++-- rpc/http_server.go | 2 +- state/execution.go | 45 +++++++++++++++++++++++++++++++++++- state/state.go | 9 ++++++++ types/events.go | 57 ++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 114 insertions(+), 4 deletions(-) create mode 100644 types/events.go diff --git a/consensus/state.go b/consensus/state.go index 0a5431de8..ede0487a3 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -440,8 +440,8 @@ ACTION_LOOP: if cs.TryFinalizeCommit(rs.Height) { // Now at new height // cs.Step is at RoundStepNewHeight or RoundStepNewRound. - // newblock event! - cs.evsw.FireEvent("newblock", cs.state.LastBlockHash) + newBlock := cs.blockStore.LoadBlock(cs.state.LastBlockHeight) + cs.evsw.FireEvent(types.EventStringNewBlock(), newBlock) scheduleNextAction() continue ACTION_LOOP } else { @@ -1115,6 +1115,7 @@ func (cs *ConsensusState) saveCommitVoteBlock(block *types.Block, blockParts *ty // implements events.Eventable func (cs *ConsensusState) SetEventSwitch(evsw *events.EventSwitch) { cs.evsw = evsw + cs.state.SetEventSwitch(evsw) } //----------------------------------------------------------------------------- diff --git a/rpc/http_server.go b/rpc/http_server.go index 14ccb6521..3b09ac373 100644 --- a/rpc/http_server.go +++ b/rpc/http_server.go @@ -10,10 +10,10 @@ import ( "runtime/debug" "time" + "github.com/tendermint/tendermint/alert" "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/events" - "github.com/tendermint/tendermint/alert" ) func StartHTTPServer(listenAddr string, funcMap map[string]*RPCFunc, evsw *events.EventSwitch) { diff --git a/state/execution.go b/state/execution.go index d7b3b78f7..dc0cb8c3f 100644 --- a/state/execution.go +++ b/state/execution.go @@ -295,7 +295,7 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error { // TODO: do something with fees fees := uint64(0) - _s := blockCache.State() // hack to access validators. + _s := blockCache.State() // hack to access validators and event switch. // Exec tx switch tx := tx_.(type) { @@ -325,6 +325,19 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error { for _, acc := range accounts { blockCache.UpdateAccount(acc) } + + // If we're in a block (not mempool), + // fire event on all inputs and outputs + // see types/events.go for spec + if runCall { + for _, i := range tx.Inputs { + _s.evsw.FireEvent(types.EventStringAccInput(i.Address), tx) + } + + for _, o := range tx.Outputs { + _s.evsw.FireEvent(types.EventStringAccOutput(o.Address), tx) + } + } return nil case *types.CallTx: @@ -413,14 +426,18 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error { txCache.UpdateAccount(callee) // because we adjusted by input above. vmach := vm.NewVM(txCache, params, caller.Address) // NOTE: Call() transfers the value from caller to callee iff call succeeds. + ret, err := vmach.Call(caller, callee, code, tx.Data, value, &gas) + var exception string if err != nil { + exception = err.Error() // Failure. Charge the gas fee. The 'value' was otherwise not transferred. log.Debug(Fmt("Error on execution: %v", err)) inAcc.Balance -= tx.Fee blockCache.UpdateAccount(inAcc) // Throw away 'txCache' which holds incomplete updates (don't sync it). } else { + exception = "" log.Debug("Successful execution") // Success if createAccount { @@ -431,6 +448,20 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error { } // Create a receipt from the ret and whether errored. log.Info("VM call complete", "caller", caller, "callee", callee, "return", ret, "err", err) + + // Fire Events for sender and receiver + // a separate event will be fired from vm for each + _s.evsw.FireEvent(types.EventStringAccInput(tx.Input.Address), struct { + Tx types.Tx + Return []byte + Exception string + }{tx, ret, exception}) + + _s.evsw.FireEvent(types.EventStringAccReceive(tx.Address), struct { + Tx types.Tx + Return []byte + Exception string + }{tx, ret, exception}) } else { // The mempool does not call txs until // the proposer determines the order of txs. @@ -498,6 +529,9 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error { if !added { panic("Failed to add validator") } + if runCall { + _s.evsw.FireEvent(types.EventStringBond(), tx) + } return nil case *types.UnbondTx: @@ -520,6 +554,9 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error { // Good! _s.unbondValidator(val) + if runCall { + _s.evsw.FireEvent(types.EventStringUnbond(), tx) + } return nil case *types.RebondTx: @@ -542,6 +579,9 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error { // Good! _s.rebondValidator(val) + if runCall { + _s.evsw.FireEvent(types.EventStringRebond(), tx) + } return nil case *types.DupeoutTx: @@ -585,6 +625,9 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error { // Good! (Bad validator!) _s.destroyValidator(accused) + if runCall { + _s.evsw.FireEvent(types.EventStringDupeout(), tx) + } return nil default: diff --git a/state/state.go b/state/state.go index 700202b87..c8d7037c1 100644 --- a/state/state.go +++ b/state/state.go @@ -8,6 +8,7 @@ import ( "github.com/tendermint/tendermint/account" "github.com/tendermint/tendermint/binary" dbm "github.com/tendermint/tendermint/db" + "github.com/tendermint/tendermint/events" "github.com/tendermint/tendermint/merkle" "github.com/tendermint/tendermint/types" ) @@ -34,6 +35,8 @@ type State struct { UnbondingValidators *ValidatorSet accounts merkle.Tree // Shouldn't be accessed directly. validatorInfos merkle.Tree // Shouldn't be accessed directly. + + evsw *events.EventSwitch } func LoadState(db dbm.DB) *State { @@ -98,6 +101,7 @@ func (s *State) Copy() *State { UnbondingValidators: s.UnbondingValidators.Copy(), // copy the valSet lazily. accounts: s.accounts.Copy(), validatorInfos: s.validatorInfos.Copy(), + evsw: s.evsw, } } @@ -264,6 +268,11 @@ func (s *State) LoadStorage(hash []byte) (storage merkle.Tree) { // State.storage //------------------------------------- +// implements events.Eventable +func (s *State) SetEventSwitch(evsw *events.EventSwitch) { + s.evsw = evsw +} + //----------------------------------------------------------------------------- type InvalidTxError struct { diff --git a/types/events.go b/types/events.go new file mode 100644 index 000000000..28965ecc9 --- /dev/null +++ b/types/events.go @@ -0,0 +1,57 @@ +package types + +import ( + "fmt" +) + +func EventStringAccInput(addr []byte) string { + return fmt.Sprintf("Acc/%x/Input", addr) +} + +func EventStringAccOutput(addr []byte) string { + return fmt.Sprintf("Acc/%x/Output", addr) +} + +func EventStringAccReceive(addr []byte) string { + return fmt.Sprintf("Acc/%x/Receive", addr) +} + +func EventStringBond() string { + return "Bond" +} + +func EventStringUnbond() string { + return "Unbond" +} + +func EventStringRebond() string { + return "Rebond" +} + +func EventStringDupeout() string { + return "Dupeout" +} + +func EventStringNewBlock() string { + return "NewBlock" +} + +func EventStringFork() string { + return "Fork" +} + +/* +Acc/XYZ/Input -> full tx or {full tx, return value, exception} +Acc/XYZ/Output -> full tx +Acc/XYZ/Receive -> full tx, return value, exception, (optionally?) calldata +Bond -> full tx +Unbond -> full tx +Rebond -> full tx +Dupeout -> full tx +NewBlock -> full block +Fork -> block A, block B + +Log -> Fuck this +NewPeer -> peer +Alert -> alert msg +*/