From 165bb2abfe924be7ad02113e2130b11c9cd909fd Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Sun, 3 Feb 2019 16:42:54 +0400 Subject: [PATCH] wait 100ms before kicking a subscriber out + a test for indexer_service --- libs/pubsub/pubsub.go | 3 +- state/txindex/indexer_service.go | 22 ++++++--- state/txindex/indexer_service_test.go | 64 +++++++++++++++++++++++++++ types/event_bus_test.go | 3 +- 4 files changed, 82 insertions(+), 10 deletions(-) create mode 100644 state/txindex/indexer_service_test.go diff --git a/libs/pubsub/pubsub.go b/libs/pubsub/pubsub.go index 9ad634306..b1b648236 100644 --- a/libs/pubsub/pubsub.go +++ b/libs/pubsub/pubsub.go @@ -38,6 +38,7 @@ import ( "context" "errors" "sync" + "time" cmn "github.com/tendermint/tendermint/libs/common" ) @@ -394,7 +395,7 @@ func (state *state) send(msg interface{}, tags map[string]string) { // don't block on buffered channels select { case subscription.out <- Message{msg, tags}: - default: + case <-time.After(100 * time.Millisecond): state.remove(clientID, qStr, ErrOutOfCapacity) } } diff --git a/state/txindex/indexer_service.go b/state/txindex/indexer_service.go index 468b28f8b..7ecfeff17 100644 --- a/state/txindex/indexer_service.go +++ b/state/txindex/indexer_service.go @@ -51,22 +51,30 @@ func (is *IndexerService) OnStart() error { select { case msg2 := <-txsSub.Out(): txResult := msg2.Data().(types.EventDataTx).TxResult - batch.Add(&txResult) + if err = batch.Add(&txResult); err != nil { + is.Logger.Error("Can't add tx to batch", + "height", header.Height, + "index", txResult.Index, + "err", err) + } case <-txsSub.Cancelled(): - is.Logger.Error("Failed to index a block. txsSub was cancelled. Did the Tendermint stop?", - "err", txsSub.Err(), + is.Logger.Error("Failed to index block. txsSub was cancelled. Did the Tendermint stop?", "height", header.Height, "numTxs", header.NumTxs, "numProcessed", i, + "err", txsSub.Err(), ) return } } - is.idr.AddBatch(batch) - is.Logger.Info("Indexed block", "height", header.Height) + if err = is.idr.AddBatch(batch); err != nil { + is.Logger.Error("Failed to index block", "height", header.Height, "err", err) + } else { + is.Logger.Info("Indexed block", "height", header.Height) + } case <-blockHeadersSub.Cancelled(): - is.Logger.Error("Failed to index a block. blockHeadersSub was cancelled. Did the Tendermint stop?", - "reason", blockHeadersSub.Err()) + is.Logger.Error("blockHeadersSub was cancelled. Did the Tendermint stop?", + "err", blockHeadersSub.Err()) return } } diff --git a/state/txindex/indexer_service_test.go b/state/txindex/indexer_service_test.go new file mode 100644 index 000000000..982d7b8c4 --- /dev/null +++ b/state/txindex/indexer_service_test.go @@ -0,0 +1,64 @@ +package txindex_test + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + abci "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/libs/db" + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/state/txindex" + "github.com/tendermint/tendermint/state/txindex/kv" + "github.com/tendermint/tendermint/types" +) + +func TestIndexerServiceIndexesBlocks(t *testing.T) { + // event bus + eventBus := types.NewEventBus() + eventBus.SetLogger(log.TestingLogger()) + err := eventBus.Start() + require.NoError(t, err) + defer eventBus.Stop() + + // tx indexer + store := db.NewMemDB() + txIndexer := kv.NewTxIndex(store, kv.IndexAllTags()) + + service := txindex.NewIndexerService(txIndexer, eventBus) + service.SetLogger(log.TestingLogger()) + err = service.Start() + require.NoError(t, err) + defer service.Stop() + + // publish block with txs + eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{ + Header: types.Header{Height: 1, NumTxs: 2}, + }) + txResult1 := &types.TxResult{ + Height: 1, + Index: uint32(0), + Tx: types.Tx("foo"), + Result: abci.ResponseDeliverTx{Code: 0}, + } + eventBus.PublishEventTx(types.EventDataTx{*txResult1}) + txResult2 := &types.TxResult{ + Height: 1, + Index: uint32(1), + Tx: types.Tx("bar"), + Result: abci.ResponseDeliverTx{Code: 0}, + } + eventBus.PublishEventTx(types.EventDataTx{*txResult2}) + + time.Sleep(100 * time.Millisecond) + + // check the result + res, err := txIndexer.Get(types.Tx("foo").Hash()) + assert.NoError(t, err) + assert.Equal(t, txResult1, res) + res, err = txIndexer.Get(types.Tx("bar").Hash()) + assert.NoError(t, err) + assert.Equal(t, txResult2, res) +} diff --git a/types/event_bus_test.go b/types/event_bus_test.go index 0c72a442e..be3f2315f 100644 --- a/types/event_bus_test.go +++ b/types/event_bus_test.go @@ -139,8 +139,7 @@ func TestEventBusPublish(t *testing.T) { require.NoError(t, err) defer eventBus.Stop() - // FIXME: the test fails without a buffer - sub, err := eventBus.Subscribe(context.Background(), "test", tmquery.Empty{}, 14) + sub, err := eventBus.Subscribe(context.Background(), "test", tmquery.Empty{}) require.NoError(t, err) const numEventsExpected = 14