From efc01cf5822d009c251b70749598cadf4a8593a6 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Thu, 10 May 2018 14:02:31 +0400 Subject: [PATCH 1/8] stop localnet before starting in order to avoid having to stop it manually --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 4601c970c..05d1889f0 100755 --- a/Makefile +++ b/Makefile @@ -194,7 +194,7 @@ build-linux: GOOS=linux GOARCH=amd64 $(MAKE) build # Run a 4-node testnet locally -localnet-start: +localnet-start: localnet-stop @if ! [ -f build/node0/config/genesis.json ]; then docker run --rm -v $(CURDIR)/build:/tendermint:Z tendermint/localnode testnet --v 4 --o . --populate-persistent-peers --starting-ip-address 192.167.10.2 ; fi docker-compose up From 0d93424c6a186e017c5268dc651753c3fb52ffe2 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Thu, 10 May 2018 14:03:05 +0400 Subject: [PATCH 2/8] disable indexer by default --- config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/config.go b/config/config.go index b76f5ed19..b5bd87cee 100644 --- a/config/config.go +++ b/config/config.go @@ -515,7 +515,7 @@ type TxIndexConfig struct { // DefaultTxIndexConfig returns a default configuration for the transaction indexer. func DefaultTxIndexConfig() *TxIndexConfig { return &TxIndexConfig{ - Indexer: "kv", + Indexer: "null", IndexTags: "", IndexAllTags: false, } From 7c14fa820d0817560592c7e476d2c9cd9044a236 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Thu, 10 May 2018 14:04:27 +0400 Subject: [PATCH 3/8] do not log txs at info level BEFORE: ``` ./tm-bench -c 5 -r 1000 127.0.0.1:46657 Stats Avg StdDev Max Txs/sec 1826 843 2744 Blocks/sec 1.100 0.300 2 ``` AFTER: ``` ./tm-bench -T 30 -c 5 -r 1000 127.0.0.1:46657 Stats Avg StdDev Max Txs/sec 6120 1970 9776 Blocks/sec 1.000 0.000 1 ``` --- mempool/mempool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mempool/mempool.go b/mempool/mempool.go index ec4f98478..6bffd42b4 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -255,7 +255,7 @@ func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) { tx: tx, } mem.txs.PushBack(memTx) - mem.logger.Info("Added good transaction", "tx", tx, "res", r) + mem.logger.Debug("Added good transaction", "tx", tx, "res", r) mem.notifyTxsAvailable() } else { // ignore bad transaction From bbe135595713764c0f129f3d481519627b3445ea Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Fri, 11 May 2018 12:09:16 +0400 Subject: [PATCH 4/8] log only hash, not tx itself --- mempool/mempool.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/mempool/mempool.go b/mempool/mempool.go index 6bffd42b4..aa2aa4f41 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -3,6 +3,7 @@ package mempool import ( "bytes" "container/list" + "fmt" "sync" "sync/atomic" "time" @@ -255,11 +256,11 @@ func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) { tx: tx, } mem.txs.PushBack(memTx) - mem.logger.Debug("Added good transaction", "tx", tx, "res", r) + mem.logger.Info("Added good transaction", "tx", fmt.Sprintf("%X", types.Tx(tx).Hash()), "res", r) mem.notifyTxsAvailable() } else { // ignore bad transaction - mem.logger.Info("Rejected bad transaction", "tx", tx, "res", r) + mem.logger.Info("Rejected bad transaction", "tx", fmt.Sprintf("%X", types.Tx(tx).Hash()), "res", r) // remove from cache (it might be good later) mem.cache.Remove(tx) From 58e3246ffc7d49ce76312278882a4e84ab417311 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Fri, 11 May 2018 12:09:41 +0400 Subject: [PATCH 5/8] batch index txs --- config/config.go | 2 +- state/execution.go | 14 ++++-------- state/txindex/indexer.go | 2 +- state/txindex/indexer_service.go | 37 ++++++++++++++++++++++++++------ state/txindex/kv/kv_test.go | 4 ++-- 5 files changed, 39 insertions(+), 20 deletions(-) diff --git a/config/config.go b/config/config.go index b5bd87cee..b76f5ed19 100644 --- a/config/config.go +++ b/config/config.go @@ -515,7 +515,7 @@ type TxIndexConfig struct { // DefaultTxIndexConfig returns a default configuration for the transaction indexer. func DefaultTxIndexConfig() *TxIndexConfig { return &TxIndexConfig{ - Indexer: "null", + Indexer: "kv", IndexTags: "", IndexAllTags: false, } diff --git a/state/execution.go b/state/execution.go index 0ce5e44f1..3fe35e2fa 100644 --- a/state/execution.go +++ b/state/execution.go @@ -341,23 +341,17 @@ func updateState(s State, blockID types.BlockID, header *types.Header, // Fire TxEvent for every tx. // NOTE: if Tendermint crashes before commit, some or all of these events may be published again. func fireEvents(logger log.Logger, eventBus types.BlockEventPublisher, block *types.Block, abciResponses *ABCIResponses) { - // NOTE: do we still need this buffer ? - txEventBuffer := types.NewTxEventBuffer(eventBus, int(block.NumTxs)) + eventBus.PublishEventNewBlock(types.EventDataNewBlock{block}) + eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{block.Header}) + for i, tx := range block.Data.Txs { - txEventBuffer.PublishEventTx(types.EventDataTx{types.TxResult{ + eventBus.PublishEventTx(types.EventDataTx{types.TxResult{ Height: block.Height, Index: uint32(i), Tx: tx, Result: *(abciResponses.DeliverTx[i]), }}) } - - eventBus.PublishEventNewBlock(types.EventDataNewBlock{block}) - eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{block.Header}) - err := txEventBuffer.Flush() - if err != nil { - logger.Error("Failed to flush event buffer", "err", err) - } } //---------------------------------------------------------------------------------------------------- diff --git a/state/txindex/indexer.go b/state/txindex/indexer.go index bd51fbb29..e23840f14 100644 --- a/state/txindex/indexer.go +++ b/state/txindex/indexer.go @@ -34,7 +34,7 @@ type Batch struct { } // NewBatch creates a new Batch. -func NewBatch(n int) *Batch { +func NewBatch(n int64) *Batch { return &Batch{ Ops: make([]*types.TxResult, n), } diff --git a/state/txindex/indexer_service.go b/state/txindex/indexer_service.go index f5420f631..dd12bdf9d 100644 --- a/state/txindex/indexer_service.go +++ b/state/txindex/indexer_service.go @@ -11,6 +11,8 @@ const ( subscriber = "IndexerService" ) +// IndexerService connects event bus and transaction indexer together in order +// to index transactions coming from event bus. type IndexerService struct { cmn.BaseService @@ -18,6 +20,7 @@ type IndexerService struct { eventBus *types.EventBus } +// NewIndexerService returns a new service instance. func NewIndexerService(idr TxIndexer, eventBus *types.EventBus) *IndexerService { is := &IndexerService{idr: idr, eventBus: eventBus} is.BaseService = *cmn.NewBaseService(nil, "IndexerService", is) @@ -27,15 +30,37 @@ func NewIndexerService(idr TxIndexer, eventBus *types.EventBus) *IndexerService // OnStart implements cmn.Service by subscribing for all transactions // and indexing them by tags. func (is *IndexerService) OnStart() error { - ch := make(chan interface{}) - if err := is.eventBus.Subscribe(context.Background(), subscriber, types.EventQueryTx, ch); err != nil { + blockHeadersCh := make(chan interface{}) + if err := is.eventBus.Subscribe(context.Background(), subscriber, types.EventQueryNewBlockHeader, blockHeadersCh); err != nil { return err } + + txsCh := make(chan interface{}) + if err := is.eventBus.Subscribe(context.Background(), subscriber, types.EventQueryTx, txsCh); err != nil { + return err + } + go func() { - for event := range ch { - // TODO: may be not perfomant to write one event at a time - txResult := event.(types.EventDataTx).TxResult - is.idr.Index(&txResult) + var numTxs, got int64 + var batch *Batch + for { + select { + case e := <-blockHeadersCh: + numTxs = e.(types.EventDataNewBlockHeader).Header.NumTxs + batch = NewBatch(numTxs) + case e := <-txsCh: + if batch == nil { + panic("Expected pubsub to send block header first, but got tx event") + } + txResult := e.(types.EventDataTx).TxResult + batch.Add(&txResult) + got++ + if numTxs == got { + is.idr.AddBatch(batch) + batch = nil + got = 0 + } + } } }() return nil diff --git a/state/txindex/kv/kv_test.go b/state/txindex/kv/kv_test.go index 74a2dd7cb..a8537219d 100644 --- a/state/txindex/kv/kv_test.go +++ b/state/txindex/kv/kv_test.go @@ -190,7 +190,7 @@ func txResultWithTags(tags []cmn.KVPair) *types.TxResult { } } -func benchmarkTxIndex(txsCount int, b *testing.B) { +func benchmarkTxIndex(txsCount int64, b *testing.B) { tx := types.Tx("HELLO WORLD") txResult := &types.TxResult{ Height: 1, @@ -215,7 +215,7 @@ func benchmarkTxIndex(txsCount int, b *testing.B) { indexer := NewTxIndex(store) batch := txindex.NewBatch(txsCount) - for i := 0; i < txsCount; i++ { + for i := int64(0); i < txsCount; i++ { if err := batch.Add(txResult); err != nil { b.Fatal(err) } From 6f7333fd5f764e3664cb146303c447c8fdf1ced6 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Fri, 11 May 2018 20:26:24 +0400 Subject: [PATCH 6/8] fix tests --- state/txindex/indexer_service.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/state/txindex/indexer_service.go b/state/txindex/indexer_service.go index dd12bdf9d..edcb362e6 100644 --- a/state/txindex/indexer_service.go +++ b/state/txindex/indexer_service.go @@ -45,10 +45,16 @@ func (is *IndexerService) OnStart() error { var batch *Batch for { select { - case e := <-blockHeadersCh: + case e, ok := <-blockHeadersCh: + if !ok { + return + } numTxs = e.(types.EventDataNewBlockHeader).Header.NumTxs batch = NewBatch(numTxs) - case e := <-txsCh: + case e, ok := <-txsCh: + if !ok { + return + } if batch == nil { panic("Expected pubsub to send block header first, but got tx event") } From 5e3a23df6d9d8e886970e3e17d65fad523cde30b Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 14 May 2018 11:10:59 +0400 Subject: [PATCH 7/8] simplify indexer service main loop --- node/node.go | 1 + state/txindex/indexer_service.go | 30 +++++++++++------------------- 2 files changed, 12 insertions(+), 19 deletions(-) diff --git a/node/node.go b/node/node.go index fdc466695..1bd382eb8 100644 --- a/node/node.go +++ b/node/node.go @@ -343,6 +343,7 @@ func NewNode(config *cfg.Config, } indexerService := txindex.NewIndexerService(txIndexer, eventBus) + indexerService.SetLogger(logger.With("module", "txindex")) // run the profile server profileHost := config.ProfListenAddress diff --git a/state/txindex/indexer_service.go b/state/txindex/indexer_service.go index edcb362e6..93e6269e8 100644 --- a/state/txindex/indexer_service.go +++ b/state/txindex/indexer_service.go @@ -41,32 +41,24 @@ func (is *IndexerService) OnStart() error { } go func() { - var numTxs, got int64 - var batch *Batch for { - select { - case e, ok := <-blockHeadersCh: + e, ok := <-blockHeadersCh + if !ok { + return + } + header := e.(types.EventDataNewBlockHeader).Header + batch := NewBatch(header.NumTxs) + for i := int64(0); i < header.NumTxs; i++ { + e, ok := <-txsCh if !ok { + is.Logger.Error("Failed to index all transactions due to closed transactions channel", "height", header.Height, "numTxs", header.NumTxs, "numProcessed", i) return } - numTxs = e.(types.EventDataNewBlockHeader).Header.NumTxs - batch = NewBatch(numTxs) - case e, ok := <-txsCh: - if !ok { - return - } - if batch == nil { - panic("Expected pubsub to send block header first, but got tx event") - } txResult := e.(types.EventDataTx).TxResult batch.Add(&txResult) - got++ - if numTxs == got { - is.idr.AddBatch(batch) - batch = nil - got = 0 - } } + is.idr.AddBatch(batch) + is.Logger.Info("Indexed block", "height", header.Height) } }() return nil From d832bde280106dc911ba74b618a95caec81ed728 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Thu, 17 May 2018 10:43:38 +0400 Subject: [PATCH 8/8] update Vagrantfile --- Vagrantfile | 34 ++++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/Vagrantfile b/Vagrantfile index ac8da0cc1..095a6b061 100644 --- a/Vagrantfile +++ b/Vagrantfile @@ -10,31 +10,37 @@ Vagrant.configure("2") do |config| end config.vm.provision "shell", inline: <<-SHELL - # add docker repo - curl -fsSL https://download.docker.com/linux/ubuntu/gpg | apt-key add - - add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu xenial stable" - - # and golang 1.9 support - # official repo doesn't have race detection runtime... - # add-apt-repository ppa:gophers/archive - add-apt-repository ppa:longsleep/golang-backports + apt-get update # install base requirements - apt-get update apt-get install -y --no-install-recommends wget curl jq zip \ make shellcheck bsdmainutils psmisc - apt-get install -y docker-ce golang-1.9-go apt-get install -y language-pack-en + # install docker + apt-get install -y --no-install-recommends apt-transport-https \ + ca-certificates curl software-properties-common + curl -fsSL https://download.docker.com/linux/ubuntu/gpg | apt-key add - + add-apt-repository \ + "deb [arch=amd64] https://download.docker.com/linux/ubuntu \ + $(lsb_release -cs) \ + stable" + apt-get install -y docker-ce + usermod -a -G docker vagrant + + # install go + wget -q https://dl.google.com/go/go1.10.1.linux-amd64.tar.gz + tar -xvf go1.10.1.linux-amd64.tar.gz + mv go /usr/local + rm -f go1.10.1.linux-amd64.tar.gz + # cleanup apt-get autoremove -y - # needed for docker - usermod -a -G docker vagrant - # set env variables - echo 'export PATH=$PATH:/usr/lib/go-1.9/bin:/home/vagrant/go/bin' >> /home/vagrant/.bash_profile + echo 'export GOROOT=/usr/local/go' >> /home/vagrant/.bash_profile echo 'export GOPATH=/home/vagrant/go' >> /home/vagrant/.bash_profile + echo 'export PATH=$PATH:$GOROOT/bin:$GOPATH/bin' >> /home/vagrant/.bash_profile echo 'export LC_ALL=en_US.UTF-8' >> /home/vagrant/.bash_profile echo 'cd go/src/github.com/tendermint/tendermint' >> /home/vagrant/.bash_profile