diff --git a/config/config.go b/config/config.go deleted file mode 100644 index 21c6b81f8..000000000 --- a/config/config.go +++ /dev/null @@ -1,117 +0,0 @@ - -package config - -import ( - "github.com/naoina/toml" - "sync" - "time" - - . "github.com/tendermint/go-common" -) - -type Config interface { - Get(key string) interface{} - GetBool(key string) bool - GetFloat64(key string) float64 - GetInt(key string) int - GetString(key string) string - GetStringMap(key string) map[string]interface{} - GetStringMapString(key string) map[string]string - GetStringSlice(key string) []string - GetTime(key string) time.Time - IsSet(key string) bool - Set(key string, value interface{}) -} - -type MapConfig struct { - required map[string]struct{} // blows up if trying to use before setting. - data map[string]interface{} -} - -func ReadMapConfigFromFile(filePath string) (MapConfig, error) { - var configData = make(map[string]interface{}) - fileBytes := MustReadFile(filePath) - err := toml.Unmarshal(fileBytes, configData) - if err != nil { - return MapConfig{}, err - } - return NewMapConfig(configData), nil -} - -func NewMapConfig(data map[string]interface{}) MapConfig { - if data == nil { - data = make(map[string]interface{}) - } - return MapConfig{ - required: make(map[string]struct{}), - data: data, - } -} - -func (cfg MapConfig) Get(key string) interface{} { - if _, ok := cfg.required[key]; ok { - PanicSanity(Fmt("config key %v is required but was not set.", key)) - } - return cfg.data[key] -} -func (cfg MapConfig) GetBool(key string) bool { return cfg.Get(key).(bool) } -func (cfg MapConfig) GetFloat64(key string) float64 { return cfg.Get(key).(float64) } -func (cfg MapConfig) GetInt(key string) int { return cfg.Get(key).(int) } -func (cfg MapConfig) GetString(key string) string { return cfg.Get(key).(string) } -func (cfg MapConfig) GetStringMap(key string) map[string]interface{} { - return cfg.Get(key).(map[string]interface{}) -} -func (cfg MapConfig) GetStringMapString(key string) map[string]string { - return cfg.Get(key).(map[string]string) -} -func (cfg MapConfig) GetStringSlice(key string) []string { return cfg.Get(key).([]string) } -func (cfg MapConfig) GetTime(key string) time.Time { return cfg.Get(key).(time.Time) } -func (cfg MapConfig) IsSet(key string) bool { _, ok := cfg.data[key]; return ok } -func (cfg MapConfig) Set(key string, value interface{}) { - delete(cfg.required, key) - cfg.data[key] = value -} -func (cfg MapConfig) SetDefault(key string, value interface{}) { - delete(cfg.required, key) - if cfg.IsSet(key) { - return - } - cfg.data[key] = value -} -func (cfg MapConfig) SetRequired(key string) { - if cfg.IsSet(key) { - return - } - cfg.required[key] = struct{}{} -} - -//-------------------------------------------------------------------------------- -// A little convenient hack to notify listeners upon config changes. - -type Configurable func(Config) - -var mtx sync.Mutex -var globalConfig Config -var confs []Configurable - -func OnConfig(conf func(Config)) { - mtx.Lock() - defer mtx.Unlock() - - confs = append(confs, conf) - if globalConfig != nil { - conf(globalConfig) - } -} - -func ApplyConfig(config Config) { - mtx.Lock() - globalConfig = config - confsCopy := make([]Configurable, len(confs)) - copy(confsCopy, confs) - mtx.Unlock() - - for _, conf := range confsCopy { - conf(config) - } -} diff --git a/config/tendermint/config.go b/config/tendermint/config.go index 6378607fe..74d7aefeb 100644 --- a/config/tendermint/config.go +++ b/config/tendermint/config.go @@ -70,6 +70,7 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetDefault("cswal_light", false) mapConfig.SetDefault("block_size", 10000) + mapConfig.SetDefault("disable_data_hash", false) mapConfig.SetDefault("timeout_propose", 3000) mapConfig.SetDefault("timeout_propose_delta", 500) mapConfig.SetDefault("timeout_prevote", 1000) diff --git a/config/tendermint_test/config.go b/config/tendermint_test/config.go index b90fe2549..c2690ae90 100644 --- a/config/tendermint_test/config.go +++ b/config/tendermint_test/config.go @@ -88,6 +88,7 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetDefault("cswal_light", false) mapConfig.SetDefault("block_size", 10000) + mapConfig.SetDefault("disable_data_hash", false) mapConfig.SetDefault("timeout_propose", 100) mapConfig.SetDefault("timeout_propose_delta", 1) mapConfig.SetDefault("timeout_prevote", 1) diff --git a/consensus/reactor.go b/consensus/reactor.go index c2874df25..98f6510dd 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -91,9 +91,9 @@ func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor { SendQueueCapacity: 100, }, &p2p.ChannelDescriptor{ - ID: DataChannel, - Priority: 2, - SendQueueCapacity: 50, + ID: DataChannel, // maybe split between gossiping current block and catchup stuff + Priority: 10, // once we gossip the whole block there's nothing left to send until next height or round + SendQueueCapacity: 100, RecvBufferCapacity: 50 * 4096, }, &p2p.ChannelDescriptor{ diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 40ec34ca8..d46f68cd1 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -86,13 +86,12 @@ func TestReplayCatchup(t *testing.T) { t.Fatalf("Error on catchup replay %v", err) } - after := time.After(time.Second * 2) + after := time.After(time.Second * 15) select { case <-newBlockCh: case <-after: t.Fatal("Timed out waiting for new block") } - } func openWAL(t *testing.T, cs *ConsensusState, file string) { diff --git a/consensus/state.go b/consensus/state.go index aafef7ec0..fdbfdb6e6 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -655,7 +655,7 @@ func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) { err = cs.setProposal(msg.Proposal) case *BlockPartMessage: // if the proposal is complete, we'll enterPrevote or tryFinalizeCommit - _, err = cs.addProposalBlockPart(msg.Height, msg.Part) + _, err = cs.addProposalBlockPart(msg.Height, msg.Part, peerKey != "") if err != nil && msg.Round != cs.Round { err = nil } @@ -835,8 +835,8 @@ func (cs *ConsensusState) decideProposal(height, round int) { part := blockParts.GetPart(i) cs.sendInternalMessage(msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, ""}) } - log.Info("Signed and sent proposal", "height", height, "round", round, "proposal", proposal) - log.Debug(Fmt("Signed and sent proposal block: %v", block)) + log.Info("Signed proposal", "height", height, "round", round, "proposal", proposal) + log.Debug(Fmt("Signed proposal block: %v", block)) } else { log.Warn("enterPropose: Error signing proposal", "height", height, "round", round, "error", err) } @@ -1206,6 +1206,7 @@ func (cs *ConsensusState) finalizeCommit(height int) { // Fire off event for new block. // TODO: Handle app failure. See #177 cs.evsw.FireEvent(types.EventStringNewBlock(), types.EventDataNewBlock{block}) + cs.evsw.FireEvent(types.EventStringNewBlockHeader(), types.EventDataNewBlockHeader{block.Header}) // Create a copy of the state for staging stateCopy := cs.state.Copy() @@ -1291,7 +1292,7 @@ func (cs *ConsensusState) setProposal(proposal *types.Proposal) error { // NOTE: block is not necessarily valid. // Asynchronously triggers either enterPrevote (before we timeout of propose) or tryFinalizeCommit, once we have the full block. -func (cs *ConsensusState) addProposalBlockPart(height int, part *types.Part) (added bool, err error) { +func (cs *ConsensusState) addProposalBlockPart(height int, part *types.Part, verify bool) (added bool, err error) { // Blocks might be reused, so round mismatch is OK if cs.Height != height { return false, nil @@ -1302,7 +1303,7 @@ func (cs *ConsensusState) addProposalBlockPart(height int, part *types.Part) (ad return false, nil // TODO: bad peer? Return error? } - added, err = cs.ProposalBlockParts.AddPart(part) + added, err = cs.ProposalBlockParts.AddPart(part, verify) if err != nil { return added, err } diff --git a/consensus/state_test.go b/consensus/state_test.go index 277471267..dd2dbe162 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -491,7 +491,7 @@ func TestLockPOLRelock(t *testing.T) { proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1) voteCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringVote(), 1) newRoundCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewRound(), 1) - newBlockCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewBlock(), 1) + newBlockCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewBlockHeader(), 1) log.Debug("cs2 last round", "lr", cs2.PrivValidator.LastRound) @@ -577,14 +577,14 @@ func TestLockPOLRelock(t *testing.T) { _, _ = <-voteCh, <-voteCh be := <-newBlockCh - b := be.(types.EventDataNewBlock) + b := be.(types.EventDataNewBlockHeader) re = <-newRoundCh rs = re.(types.EventDataRoundState).RoundState.(*RoundState) if rs.Height != 2 { t.Fatal("Expected height to increment") } - if !bytes.Equal(b.Block.Hash(), propBlockHash) { + if !bytes.Equal(b.Header.Hash(), propBlockHash) { t.Fatal("Expected new block to be proposal block") } } diff --git a/consensus/version.go b/consensus/version.go new file mode 100644 index 000000000..2acc3b77b --- /dev/null +++ b/consensus/version.go @@ -0,0 +1,13 @@ +package consensus + +import ( + . "github.com/tendermint/go-common" +) + +// kind of arbitrary +var Spec = "1" // async +var Major = "0" // +var Minor = "2" // replay refactor +var Revision = "1" // round state fix + +var Version = Fmt("v%s/%s.%s.%s", Spec, Major, Minor, Revision) diff --git a/consensus/wal.go b/consensus/wal.go index 56bb294e4..5b4747a6f 100644 --- a/consensus/wal.go +++ b/consensus/wal.go @@ -61,21 +61,23 @@ func NewWAL(file string, light bool) (*WAL, error) { } // called in newStep and for each pass in receiveRoutine -func (wal *WAL) Save(msg ConsensusLogMessageInterface) { +func (wal *WAL) Save(clm ConsensusLogMessageInterface) { if wal != nil { if wal.light { - if m, ok := msg.(msgInfo); ok { - if _, ok := m.Msg.(*BlockPartMessage); ok { + // in light mode we only write new steps, timeouts, and our own votes (no proposals, block parts) + if mi, ok := clm.(msgInfo); ok { + _ = mi + if mi.PeerKey != "" { return } } } var n int var err error - wire.WriteJSON(ConsensusLogMessage{time.Now(), msg}, wal.fp, &n, &err) + wire.WriteJSON(ConsensusLogMessage{time.Now(), clm}, wal.fp, &n, &err) wire.WriteTo([]byte("\n"), wal.fp, &n, &err) // one message per line if err != nil { - PanicQ(Fmt("Error writing msg to consensus wal. Error: %v \n\nMessage: %v", err, msg)) + PanicQ(Fmt("Error writing msg to consensus wal. Error: %v \n\nMessage: %v", err, clm)) } } } diff --git a/glide.lock b/glide.lock index dd884dd42..92cf1a87c 100644 --- a/glide.lock +++ b/glide.lock @@ -1,18 +1,18 @@ hash: f3eab3f91c9d2c07574e8ec6f2f5d56bd946af1b061533a0baf9db8765f97a51 -updated: 2016-03-05T17:20:40.721925401-05:00 +updated: 2016-03-24T16:39:27.330201414-04:00 imports: - name: github.com/gogo/protobuf - version: f4cc07910fc38f5b6b8d6e75d7457cf504157b6c + version: 4168943e65a2802828518e95310aeeed6d84c4e5 subpackages: - proto - name: github.com/golang/protobuf - version: c75fbf01dc6cb73649c4cd4326182c3e44aa9dbb + version: 8d92cf5fc15a4382f8964b08e1f42a75c0591aa3 subpackages: - proto - name: github.com/golang/snappy version: 5f1c01d9f64b941dd9582c638279d046eda6ca31 - name: github.com/gorilla/websocket - version: c45a635370221f34fea2d5163fd156fcb4e38e8a + version: e2e3d8414d0fbae04004f151979f4e27c6747fe7 - name: github.com/inconshreveable/log15 version: 210d6fdc4d979ef6579778f1b6ed84571454abb4 subpackages: @@ -33,7 +33,7 @@ imports: - name: github.com/spf13/pflag version: 7f60f83a2c81bc3c3c0d5297f61ddfa68da9d3b7 - name: github.com/syndtr/goleveldb - version: ad0d8b2ab58a55ed5c58073aa46451d5e1ca1280 + version: 917f41c560270110ceb73c5b38be2a9127387071 subpackages: - leveldb - leveldb/errors @@ -59,23 +59,23 @@ imports: - name: github.com/tendermint/go-clist version: 634527f5b60fd7c71ca811262493df2ad65ee0ca - name: github.com/tendermint/go-common - version: 1559ae1ac90c88b1373ff114c409399c5a1cedac + version: dcfa46af1341d03b80d32e4901019d1668b978b9 - name: github.com/tendermint/go-config - version: c077af2c1ecf584fb797fd1956758545b25d952b + version: c47b67203b070d8bea835a928d50cb739972c48a - name: github.com/tendermint/go-crypto - version: 76ba23e4c0c627b8c66d1f97b6a18dc77f4f0297 + version: 3f0d9b3f29f30e5d0cbc2cef04fa45e5a606c622 - name: github.com/tendermint/go-db version: a7878f1d0d8eaebf15f87bc2df15f7a1088cce7f - name: github.com/tendermint/go-events version: 7b75ca7bb55aa25e9ef765eb8c0b69486b227357 - name: github.com/tendermint/go-logger - version: 4901b71ade2b834ca0f4c2ca69edb96792dca05b + version: 84391b36d3f5960e691c688d06b768708f0fa2f3 - name: github.com/tendermint/go-logio version: 04f3aa0a3b38d06dcadefbafd988c8b85e499225 - name: github.com/tendermint/go-merkle - version: 67b535ce9633be7df575dc3a7833fa2301020c25 + version: 05042c6ab9cad51d12e4cecf717ae68e3b1409a8 - name: github.com/tendermint/go-p2p - version: 7f6aad20fbad6ef1a132d5a8bebd18f3521fff1a + version: 10619248c665dee6b8f81455f7f27ab93d5ec366 subpackages: - upnp - name: github.com/tendermint/go-rpc @@ -85,27 +85,29 @@ imports: - types - server - name: github.com/tendermint/go-wire - version: 9acb294893c790427e2b9abf2877e69690cd5b6c + version: 7a15dd53dfdecc0f967676edcd6b335c59344c83 - name: github.com/tendermint/log15 version: 6e460758f10ef42a4724b8e4a82fee59aaa0e41d - name: github.com/tendermint/tmsp - version: 72540f9cac4840989cb05b147cc89be8cd91f043 + version: 1dfc6950dddf47ff397e670a67d405d25da138ea subpackages: - types + - client - example/dummy - example/nil - - client - name: golang.org/x/crypto - version: 5dc8cb4b8a8eb076cbb5a06bc3b8682c15bdbbd3 + version: c197bcf24cde29d3f73c7b4ac6fd41f4384e8af6 subpackages: - ripemd160 - nacl/box - nacl/secretbox + - openpgp/armor - curve25519 - salsa20/salsa - poly1305 + - openpgp/errors - name: golang.org/x/sys - version: 7a56174f0086b32866ebd746a794417edbc678a1 + version: afce3de5756ca82699128ebae46ac95ad59d6297 subpackages: - unix devImports: [] diff --git a/mempool/mempool.go b/mempool/mempool.go index 0744880a1..6df62feee 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -77,6 +77,10 @@ func NewMempool(proxyAppConn proxy.AppConn) *Mempool { return mempool } +func (mem *Mempool) Size() int { + return mem.txs.Len() +} + // Return the first element of mem.txs for peer goroutines to call .NextWait() on. // Blocks until txs has elements. func (mem *Mempool) TxsFrontWait() *clist.CElement { @@ -197,9 +201,11 @@ func (mem *Mempool) Reap(maxTxs int) []types.Tx { return txs } -// maxTxs: 0 means uncapped +// maxTxs: -1 means uncapped, 0 means none func (mem *Mempool) collectTxs(maxTxs int) []types.Tx { if maxTxs == 0 { + return []types.Tx{} + } else if maxTxs < 0 { maxTxs = mem.txs.Len() } txs := make([]types.Tx, 0, MinInt(mem.txs.Len(), maxTxs)) diff --git a/mempool/mempool_test.go b/mempool/mempool_test.go index 750f987da..a15496962 100644 --- a/mempool/mempool_test.go +++ b/mempool/mempool_test.go @@ -48,7 +48,7 @@ func TestSerialReap(t *testing.T) { } reapCheck := func(exp int) { - txs := mempool.Reap(0) + txs := mempool.Reap(-1) if len(txs) != exp { t.Fatalf("Expected to reap %v txs but got %v", exp, len(txs)) } diff --git a/node/node.go b/node/node.go index 5ad301800..96010a1a4 100644 --- a/node/node.go +++ b/node/node.go @@ -224,6 +224,7 @@ func makeNodeInfo(sw *p2p.Switch, privKey crypto.PrivKeyEd25519) *p2p.NodeInfo { Other: []string{ Fmt("wire_version=%v", wire.Version), Fmt("p2p_version=%v", p2p.Version), + Fmt("consensus_version=%v", consensus.Version), Fmt("rpc_version=%v/%v", rpc.Version, rpccore.Version), }, } diff --git a/rpc/core/dev.go b/rpc/core/dev.go new file mode 100644 index 000000000..2b8dfb8f3 --- /dev/null +++ b/rpc/core/dev.go @@ -0,0 +1,67 @@ +package core + +import ( + "fmt" + "os" + "runtime/pprof" + "strconv" + + ctypes "github.com/tendermint/tendermint/rpc/core/types" +) + +func UnsafeSetConfig(typ, key, value string) (*ctypes.ResultUnsafeSetConfig, error) { + switch typ { + case "string": + config.Set(key, value) + case "int": + val, err := strconv.Atoi(value) + if err != nil { + return nil, fmt.Errorf("non-integer value found. key:%s; value:%s; err:%v", key, value, err) + } + config.Set(key, val) + case "bool": + switch value { + case "true": + config.Set(key, true) + case "false": + config.Set(key, false) + default: + return nil, fmt.Errorf("bool value must be true or false. got %s", value) + } + default: + return nil, fmt.Errorf("Unknown type %s", typ) + } + return &ctypes.ResultUnsafeSetConfig{}, nil +} + +var profFile *os.File + +func UnsafeStartCPUProfiler(filename string) (*ctypes.ResultUnsafeProfile, error) { + var err error + profFile, err = os.Create(filename) + if err != nil { + return nil, err + } + err = pprof.StartCPUProfile(profFile) + if err != nil { + return nil, err + } + return &ctypes.ResultUnsafeProfile{}, nil +} + +func UnsafeStopCPUProfiler() (*ctypes.ResultUnsafeProfile, error) { + pprof.StopCPUProfile() + profFile.Close() + return &ctypes.ResultUnsafeProfile{}, nil +} + +func UnsafeWriteHeapProfile(filename string) (*ctypes.ResultUnsafeProfile, error) { + memProfFile, err := os.Create(filename) + if err != nil { + return nil, err + } + pprof.WriteHeapProfile(memProfFile) + memProfFile.Close() + + return &ctypes.ResultUnsafeProfile{}, nil +} diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index e5a34026c..d06fca8b2 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -39,3 +39,7 @@ func UnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) { txs := mempoolReactor.Mempool.Reap(0) return &ctypes.ResultUnconfirmedTxs{len(txs), txs}, nil } + +func NumUnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) { + return &ctypes.ResultUnconfirmedTxs{N: mempoolReactor.Mempool.Size()}, nil +} diff --git a/rpc/core/routes.go b/rpc/core/routes.go index 53604c0fc..585a18cd2 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -22,8 +22,12 @@ var Routes = map[string]*rpc.RPCFunc{ "broadcast_tx_sync": rpc.NewRPCFunc(BroadcastTxSyncResult, "tx"), "broadcast_tx_async": rpc.NewRPCFunc(BroadcastTxAsyncResult, "tx"), "unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxsResult, ""), + "num_unconfirmed_txs": rpc.NewRPCFunc(NumUnconfirmedTxsResult, ""), - "unsafe_set_config": rpc.NewRPCFunc(UnsafeSetConfigResult, "type,key,value"), + "unsafe_set_config": rpc.NewRPCFunc(UnsafeSetConfigResult, "type,key,value"), + "unsafe_start_cpu_profiler": rpc.NewRPCFunc(UnsafeStartCPUProfilerResult, "filename"), + "unsafe_stop_cpu_profiler": rpc.NewRPCFunc(UnsafeStopCPUProfilerResult, ""), + "unsafe_write_heap_profile": rpc.NewRPCFunc(UnsafeWriteHeapProfileResult, "filename"), } func SubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (ctypes.TMResult, error) { @@ -114,6 +118,14 @@ func UnconfirmedTxsResult() (ctypes.TMResult, error) { } } +func NumUnconfirmedTxsResult() (ctypes.TMResult, error) { + if r, err := NumUnconfirmedTxs(); err != nil { + return nil, err + } else { + return r, nil + } +} + func BroadcastTxSyncResult(tx []byte) (ctypes.TMResult, error) { if r, err := BroadcastTxSync(tx); err != nil { return nil, err @@ -137,3 +149,27 @@ func UnsafeSetConfigResult(typ, key, value string) (ctypes.TMResult, error) { return r, nil } } + +func UnsafeStartCPUProfilerResult(filename string) (ctypes.TMResult, error) { + if r, err := UnsafeStartCPUProfiler(filename); err != nil { + return nil, err + } else { + return r, nil + } +} + +func UnsafeStopCPUProfilerResult() (ctypes.TMResult, error) { + if r, err := UnsafeStopCPUProfiler(); err != nil { + return nil, err + } else { + return r, nil + } +} + +func UnsafeWriteHeapProfileResult(filename string) (ctypes.TMResult, error) { + if r, err := UnsafeWriteHeapProfile(filename); err != nil { + return nil, err + } else { + return r, nil + } +} diff --git a/rpc/core/status.go b/rpc/core/status.go index 8f056fd4e..bf3d69ffe 100644 --- a/rpc/core/status.go +++ b/rpc/core/status.go @@ -1,9 +1,6 @@ package core import ( - "fmt" - "strconv" - ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" ) @@ -31,28 +28,3 @@ func Status() (*ctypes.ResultStatus, error) { LatestBlockHeight: latestHeight, LatestBlockTime: latestBlockTime}, nil } - -func UnsafeSetConfig(typ, key, value string) (*ctypes.ResultUnsafeSetConfig, error) { - switch typ { - case "string": - config.Set(key, value) - case "int": - val, err := strconv.Atoi(value) - if err != nil { - return nil, fmt.Errorf("non-integer value found. key:%s; value:%s; err:%v", key, value, err) - } - config.Set(key, val) - case "bool": - switch value { - case "true": - config.Set(key, true) - case "false": - config.Set(key, false) - default: - return nil, fmt.Errorf("bool value must be true or false. got %s", value) - } - default: - return nil, fmt.Errorf("Unknown type %s", typ) - } - return &ctypes.ResultUnsafeSetConfig{}, nil -} diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index 660c6e0c1..d08892e8c 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -70,6 +70,8 @@ type ResultUnconfirmedTxs struct { type ResultUnsafeSetConfig struct{} +type ResultUnsafeProfile struct{} + type ResultSubscribe struct { } @@ -109,7 +111,10 @@ const ( ResultTypeEvent = byte(0x82) // 0xa bytes for testing - ResultTypeUnsafeSetConfig = byte(0xa0) + ResultTypeUnsafeSetConfig = byte(0xa0) + ResultTypeUnsafeStartCPUProfiler = byte(0xa1) + ResultTypeUnsafeStopCPUProfiler = byte(0xa2) + ResultTypeUnsafeWriteHeapProfile = byte(0xa3) ) type TMResult interface { @@ -133,4 +138,7 @@ var _ = wire.RegisterInterface( wire.ConcreteType{&ResultUnsubscribe{}, ResultTypeUnsubscribe}, wire.ConcreteType{&ResultEvent{}, ResultTypeEvent}, wire.ConcreteType{&ResultUnsafeSetConfig{}, ResultTypeUnsafeSetConfig}, + wire.ConcreteType{&ResultUnsafeProfile{}, ResultTypeUnsafeStartCPUProfiler}, + wire.ConcreteType{&ResultUnsafeProfile{}, ResultTypeUnsafeStopCPUProfiler}, + wire.ConcreteType{&ResultUnsafeProfile{}, ResultTypeUnsafeWriteHeapProfile}, ) diff --git a/rpc/core/version.go b/rpc/core/version.go index 0890da87a..e283de479 100644 --- a/rpc/core/version.go +++ b/rpc/core/version.go @@ -2,4 +2,4 @@ package core // a single integer is sufficient here -const Version = "2" // add DialSeeds; re-organize type bytes +const Version = "3" // rpc routes for profiling, setting config diff --git a/types/block.go b/types/block.go index 1e197c621..9ce45bce0 100644 --- a/types/block.go +++ b/types/block.go @@ -329,19 +329,20 @@ type Data struct { // Txs that will be applied by state @ block.Height+1. // NOTE: not all txs here are valid. We're just agreeing on the order first. // This means that block.AppHash does not include these txs. - Txs []Tx `json:"txs"` + Txs Txs `json:"txs"` // Volatile hash []byte } func (data *Data) Hash() []byte { + if config.GetBool("disable_data_hash") { + // we could use the part_set hash instead + data.hash = []byte{} + return data.hash + } if data.hash == nil { - txs := make([]interface{}, len(data.Txs)) - for i, tx := range data.Txs { - txs[i] = tx - } - data.hash = merkle.SimpleHashFromBinaries(txs) // NOTE: leaves are TxIDs. + data.hash = data.Txs.Hash() // NOTE: leaves of merkle tree are TxIDs } return data.hash } diff --git a/types/events.go b/types/events.go index 247b1e7b9..3328911c1 100644 --- a/types/events.go +++ b/types/events.go @@ -16,6 +16,7 @@ func EventStringDupeout() string { return "Dupeout" } func EventStringFork() string { return "Fork" } func EventStringNewBlock() string { return "NewBlock" } +func EventStringNewBlockHeader() string { return "NewBlockHeader" } func EventStringNewRound() string { return "NewRound" } func EventStringNewRoundStep() string { return "NewRoundStep" } func EventStringTimeoutPropose() string { return "TimeoutPropose" } @@ -36,9 +37,10 @@ type TMEventData interface { } const ( - EventDataTypeNewBlock = byte(0x01) - EventDataTypeFork = byte(0x02) - EventDataTypeTx = byte(0x03) + EventDataTypeNewBlock = byte(0x01) + EventDataTypeFork = byte(0x02) + EventDataTypeTx = byte(0x03) + EventDataTypeNewBlockHeader = byte(0x04) EventDataTypeRoundState = byte(0x11) EventDataTypeVote = byte(0x12) @@ -47,6 +49,7 @@ const ( var _ = wire.RegisterInterface( struct{ TMEventData }{}, wire.ConcreteType{EventDataNewBlock{}, EventDataTypeNewBlock}, + wire.ConcreteType{EventDataNewBlockHeader{}, EventDataTypeNewBlockHeader}, // wire.ConcreteType{EventDataFork{}, EventDataTypeFork }, wire.ConcreteType{EventDataTx{}, EventDataTypeTx}, wire.ConcreteType{EventDataRoundState{}, EventDataTypeRoundState}, @@ -60,6 +63,11 @@ type EventDataNewBlock struct { Block *Block `json:"block"` } +// light weight event for benchmarking +type EventDataNewBlockHeader struct { + Header *Header `json:"header"` +} + // All txs fire EventDataTx type EventDataTx struct { Tx Tx `json:"tx"` @@ -84,7 +92,8 @@ type EventDataVote struct { Vote *Vote } -func (_ EventDataNewBlock) AssertIsTMEventData() {} -func (_ EventDataTx) AssertIsTMEventData() {} -func (_ EventDataRoundState) AssertIsTMEventData() {} -func (_ EventDataVote) AssertIsTMEventData() {} +func (_ EventDataNewBlock) AssertIsTMEventData() {} +func (_ EventDataNewBlockHeader) AssertIsTMEventData() {} +func (_ EventDataTx) AssertIsTMEventData() {} +func (_ EventDataRoundState) AssertIsTMEventData() {} +func (_ EventDataVote) AssertIsTMEventData() {} diff --git a/types/part_set.go b/types/part_set.go index 5940a85d8..bdf198d94 100644 --- a/types/part_set.go +++ b/types/part_set.go @@ -15,7 +15,7 @@ import ( ) const ( - partSize = 4096 // 4KB + partSize = 65536 // 64KB ... 4096 // 4KB ) var ( @@ -188,7 +188,7 @@ func (ps *PartSet) Total() int { return ps.total } -func (ps *PartSet) AddPart(part *Part) (bool, error) { +func (ps *PartSet) AddPart(part *Part, verify bool) (bool, error) { ps.mtx.Lock() defer ps.mtx.Unlock() @@ -203,8 +203,10 @@ func (ps *PartSet) AddPart(part *Part) (bool, error) { } // Check hash proof - if !part.Proof.Verify(part.Index, ps.total, part.Hash(), ps.Hash()) { - return false, ErrPartSetInvalidProof + if verify { + if !part.Proof.Verify(part.Index, ps.total, part.Hash(), ps.Hash()) { + return false, ErrPartSetInvalidProof + } } // Add part @@ -228,11 +230,42 @@ func (ps *PartSet) GetReader() io.Reader { if !ps.IsComplete() { PanicSanity("Cannot GetReader() on incomplete PartSet") } - buf := []byte{} - for _, part := range ps.parts { - buf = append(buf, part.Bytes...) + return NewPartSetReader(ps.parts) +} + +type PartSetReader struct { + i int + parts []*Part + reader *bytes.Reader +} + +func NewPartSetReader(parts []*Part) *PartSetReader { + return &PartSetReader{ + i: 0, + parts: parts, + reader: bytes.NewReader(parts[0].Bytes), } - return bytes.NewReader(buf) +} + +func (psr *PartSetReader) Read(p []byte) (n int, err error) { + readerLen := psr.reader.Len() + if readerLen >= len(p) { + return psr.reader.Read(p) + } else if readerLen > 0 { + n1, err := psr.Read(p[:readerLen]) + if err != nil { + return n1, err + } + n2, err := psr.Read(p[readerLen:]) + return n1 + n2, err + } + + psr.i += 1 + if psr.i >= len(psr.parts) { + return 0, io.EOF + } + psr.reader = bytes.NewReader(psr.parts[psr.i].Bytes) + return psr.Read(p) } func (ps *PartSet) StringShort() string { diff --git a/types/part_set_test.go b/types/part_set_test.go index 4e74d7772..bbc3da9ed 100644 --- a/types/part_set_test.go +++ b/types/part_set_test.go @@ -30,7 +30,7 @@ func TestBasicPartSet(t *testing.T) { for i := 0; i < partSet.Total(); i++ { part := partSet.GetPart(i) //t.Logf("\n%v", part) - added, err := partSet2.AddPart(part) + added, err := partSet2.AddPart(part, true) if !added || err != nil { t.Errorf("Failed to add part %v, error: %v", i, err) } @@ -70,7 +70,7 @@ func TestWrongProof(t *testing.T) { // Test adding a part with wrong trail. part := partSet.GetPart(0) part.Proof.Aunts[0][0] += byte(0x01) - added, err := partSet2.AddPart(part) + added, err := partSet2.AddPart(part, true) if added || err == nil { t.Errorf("Expected to fail adding a part with bad trail.") } @@ -78,7 +78,7 @@ func TestWrongProof(t *testing.T) { // Test adding a part with wrong bytes. part = partSet.GetPart(1) part.Bytes[0] += byte(0x01) - added, err = partSet2.AddPart(part) + added, err = partSet2.AddPart(part, true) if added || err == nil { t.Errorf("Expected to fail adding a part with bad bytes.") } diff --git a/types/tx.go b/types/tx.go index a3cb9fc04..60699d534 100644 --- a/types/tx.go +++ b/types/tx.go @@ -1,3 +1,24 @@ package types +import ( + "github.com/tendermint/go-merkle" +) + type Tx []byte + +type Txs []Tx + +func (txs Txs) Hash() []byte { + // Recursive impl. + // Copied from go-merkle to avoid allocations + switch len(txs) { + case 0: + return nil + case 1: + return merkle.SimpleHashFromBinary(txs[0]) + default: + left := Txs(txs[:(len(txs)+1)/2]).Hash() + right := Txs(txs[(len(txs)+1)/2:]).Hash() + return merkle.SimpleHashFromTwoHashes(left, right) + } +} diff --git a/types/vote_set.go b/types/vote_set.go index 8c03f79bf..22fe246b1 100644 --- a/types/vote_set.go +++ b/types/vote_set.go @@ -128,17 +128,16 @@ func (voteSet *VoteSet) addVote(val *Validator, valIndex int, vote *Vote) (bool, return false, 0, ErrVoteUnexpectedStep } - // Check signature. - if !val.PubKey.VerifyBytes(SignBytes(config.GetString("chain_id"), vote), vote.Signature) { - // Bad signature. - return false, 0, ErrVoteInvalidSignature - } - // If vote already exists, return false. if existingVote := voteSet.votes[valIndex]; existingVote != nil { if bytes.Equal(existingVote.BlockHash, vote.BlockHash) { return false, valIndex, nil } else { + // Check signature. + if !val.PubKey.VerifyBytes(SignBytes(config.GetString("chain_id"), vote), vote.Signature) { + // Bad signature. + return false, 0, ErrVoteInvalidSignature + } return false, valIndex, &ErrVoteConflictingSignature{ VoteA: existingVote, VoteB: vote, @@ -146,6 +145,12 @@ func (voteSet *VoteSet) addVote(val *Validator, valIndex int, vote *Vote) (bool, } } + // Check signature. + if !val.PubKey.VerifyBytes(SignBytes(config.GetString("chain_id"), vote), vote.Signature) { + // Bad signature. + return false, 0, ErrVoteInvalidSignature + } + // Add vote. voteSet.votes[valIndex] = vote voteSet.votesBitArray.SetIndex(valIndex, true)