diff --git a/Makefile b/Makefile index 4601c970c..05d1889f0 100755 --- a/Makefile +++ b/Makefile @@ -194,7 +194,7 @@ build-linux: GOOS=linux GOARCH=amd64 $(MAKE) build # Run a 4-node testnet locally -localnet-start: +localnet-start: localnet-stop @if ! [ -f build/node0/config/genesis.json ]; then docker run --rm -v $(CURDIR)/build:/tendermint:Z tendermint/localnode testnet --v 4 --o . --populate-persistent-peers --starting-ip-address 192.167.10.2 ; fi docker-compose up diff --git a/Vagrantfile b/Vagrantfile index ac8da0cc1..095a6b061 100644 --- a/Vagrantfile +++ b/Vagrantfile @@ -10,31 +10,37 @@ Vagrant.configure("2") do |config| end config.vm.provision "shell", inline: <<-SHELL - # add docker repo - curl -fsSL https://download.docker.com/linux/ubuntu/gpg | apt-key add - - add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu xenial stable" - - # and golang 1.9 support - # official repo doesn't have race detection runtime... - # add-apt-repository ppa:gophers/archive - add-apt-repository ppa:longsleep/golang-backports + apt-get update # install base requirements - apt-get update apt-get install -y --no-install-recommends wget curl jq zip \ make shellcheck bsdmainutils psmisc - apt-get install -y docker-ce golang-1.9-go apt-get install -y language-pack-en + # install docker + apt-get install -y --no-install-recommends apt-transport-https \ + ca-certificates curl software-properties-common + curl -fsSL https://download.docker.com/linux/ubuntu/gpg | apt-key add - + add-apt-repository \ + "deb [arch=amd64] https://download.docker.com/linux/ubuntu \ + $(lsb_release -cs) \ + stable" + apt-get install -y docker-ce + usermod -a -G docker vagrant + + # install go + wget -q https://dl.google.com/go/go1.10.1.linux-amd64.tar.gz + tar -xvf go1.10.1.linux-amd64.tar.gz + mv go /usr/local + rm -f go1.10.1.linux-amd64.tar.gz + # cleanup apt-get autoremove -y - # needed for docker - usermod -a -G docker vagrant - # set env variables - echo 'export PATH=$PATH:/usr/lib/go-1.9/bin:/home/vagrant/go/bin' >> /home/vagrant/.bash_profile + echo 'export GOROOT=/usr/local/go' >> /home/vagrant/.bash_profile echo 'export GOPATH=/home/vagrant/go' >> /home/vagrant/.bash_profile + echo 'export PATH=$PATH:$GOROOT/bin:$GOPATH/bin' >> /home/vagrant/.bash_profile echo 'export LC_ALL=en_US.UTF-8' >> /home/vagrant/.bash_profile echo 'cd go/src/github.com/tendermint/tendermint' >> /home/vagrant/.bash_profile diff --git a/mempool/mempool.go b/mempool/mempool.go index ec4f98478..aa2aa4f41 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -3,6 +3,7 @@ package mempool import ( "bytes" "container/list" + "fmt" "sync" "sync/atomic" "time" @@ -255,11 +256,11 @@ func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) { tx: tx, } mem.txs.PushBack(memTx) - mem.logger.Info("Added good transaction", "tx", tx, "res", r) + mem.logger.Info("Added good transaction", "tx", fmt.Sprintf("%X", types.Tx(tx).Hash()), "res", r) mem.notifyTxsAvailable() } else { // ignore bad transaction - mem.logger.Info("Rejected bad transaction", "tx", tx, "res", r) + mem.logger.Info("Rejected bad transaction", "tx", fmt.Sprintf("%X", types.Tx(tx).Hash()), "res", r) // remove from cache (it might be good later) mem.cache.Remove(tx) diff --git a/node/node.go b/node/node.go index fdc466695..1bd382eb8 100644 --- a/node/node.go +++ b/node/node.go @@ -343,6 +343,7 @@ func NewNode(config *cfg.Config, } indexerService := txindex.NewIndexerService(txIndexer, eventBus) + indexerService.SetLogger(logger.With("module", "txindex")) // run the profile server profileHost := config.ProfListenAddress diff --git a/state/execution.go b/state/execution.go index 0ce5e44f1..3fe35e2fa 100644 --- a/state/execution.go +++ b/state/execution.go @@ -341,23 +341,17 @@ func updateState(s State, blockID types.BlockID, header *types.Header, // Fire TxEvent for every tx. // NOTE: if Tendermint crashes before commit, some or all of these events may be published again. func fireEvents(logger log.Logger, eventBus types.BlockEventPublisher, block *types.Block, abciResponses *ABCIResponses) { - // NOTE: do we still need this buffer ? - txEventBuffer := types.NewTxEventBuffer(eventBus, int(block.NumTxs)) + eventBus.PublishEventNewBlock(types.EventDataNewBlock{block}) + eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{block.Header}) + for i, tx := range block.Data.Txs { - txEventBuffer.PublishEventTx(types.EventDataTx{types.TxResult{ + eventBus.PublishEventTx(types.EventDataTx{types.TxResult{ Height: block.Height, Index: uint32(i), Tx: tx, Result: *(abciResponses.DeliverTx[i]), }}) } - - eventBus.PublishEventNewBlock(types.EventDataNewBlock{block}) - eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{block.Header}) - err := txEventBuffer.Flush() - if err != nil { - logger.Error("Failed to flush event buffer", "err", err) - } } //---------------------------------------------------------------------------------------------------- diff --git a/state/txindex/indexer.go b/state/txindex/indexer.go index bd51fbb29..e23840f14 100644 --- a/state/txindex/indexer.go +++ b/state/txindex/indexer.go @@ -34,7 +34,7 @@ type Batch struct { } // NewBatch creates a new Batch. -func NewBatch(n int) *Batch { +func NewBatch(n int64) *Batch { return &Batch{ Ops: make([]*types.TxResult, n), } diff --git a/state/txindex/indexer_service.go b/state/txindex/indexer_service.go index f5420f631..93e6269e8 100644 --- a/state/txindex/indexer_service.go +++ b/state/txindex/indexer_service.go @@ -11,6 +11,8 @@ const ( subscriber = "IndexerService" ) +// IndexerService connects event bus and transaction indexer together in order +// to index transactions coming from event bus. type IndexerService struct { cmn.BaseService @@ -18,6 +20,7 @@ type IndexerService struct { eventBus *types.EventBus } +// NewIndexerService returns a new service instance. func NewIndexerService(idr TxIndexer, eventBus *types.EventBus) *IndexerService { is := &IndexerService{idr: idr, eventBus: eventBus} is.BaseService = *cmn.NewBaseService(nil, "IndexerService", is) @@ -27,15 +30,35 @@ func NewIndexerService(idr TxIndexer, eventBus *types.EventBus) *IndexerService // OnStart implements cmn.Service by subscribing for all transactions // and indexing them by tags. func (is *IndexerService) OnStart() error { - ch := make(chan interface{}) - if err := is.eventBus.Subscribe(context.Background(), subscriber, types.EventQueryTx, ch); err != nil { + blockHeadersCh := make(chan interface{}) + if err := is.eventBus.Subscribe(context.Background(), subscriber, types.EventQueryNewBlockHeader, blockHeadersCh); err != nil { return err } + + txsCh := make(chan interface{}) + if err := is.eventBus.Subscribe(context.Background(), subscriber, types.EventQueryTx, txsCh); err != nil { + return err + } + go func() { - for event := range ch { - // TODO: may be not perfomant to write one event at a time - txResult := event.(types.EventDataTx).TxResult - is.idr.Index(&txResult) + for { + e, ok := <-blockHeadersCh + if !ok { + return + } + header := e.(types.EventDataNewBlockHeader).Header + batch := NewBatch(header.NumTxs) + for i := int64(0); i < header.NumTxs; i++ { + e, ok := <-txsCh + if !ok { + is.Logger.Error("Failed to index all transactions due to closed transactions channel", "height", header.Height, "numTxs", header.NumTxs, "numProcessed", i) + return + } + txResult := e.(types.EventDataTx).TxResult + batch.Add(&txResult) + } + is.idr.AddBatch(batch) + is.Logger.Info("Indexed block", "height", header.Height) } }() return nil diff --git a/state/txindex/kv/kv_test.go b/state/txindex/kv/kv_test.go index 74a2dd7cb..a8537219d 100644 --- a/state/txindex/kv/kv_test.go +++ b/state/txindex/kv/kv_test.go @@ -190,7 +190,7 @@ func txResultWithTags(tags []cmn.KVPair) *types.TxResult { } } -func benchmarkTxIndex(txsCount int, b *testing.B) { +func benchmarkTxIndex(txsCount int64, b *testing.B) { tx := types.Tx("HELLO WORLD") txResult := &types.TxResult{ Height: 1, @@ -215,7 +215,7 @@ func benchmarkTxIndex(txsCount int, b *testing.B) { indexer := NewTxIndex(store) batch := txindex.NewBatch(txsCount) - for i := 0; i < txsCount; i++ { + for i := int64(0); i < txsCount; i++ { if err := batch.Add(txResult); err != nil { b.Fatal(err) }