From a484d0f3a4a7b5a1c5cc7747409980c6ba0b8fdf Mon Sep 17 00:00:00 2001 From: William Banfield Date: Thu, 12 Aug 2021 13:05:51 -0400 Subject: [PATCH] test fix --- inspect/inspect.go | 16 +++++++- inspect/inspect_test.go | 29 +++++++++++-- state/indexer/indexer_service.go | 70 ++++++++++++++++++-------------- 3 files changed, 79 insertions(+), 36 deletions(-) diff --git a/inspect/inspect.go b/inspect/inspect.go index c43244633..0c57fcbf2 100644 --- a/inspect/inspect.go +++ b/inspect/inspect.go @@ -3,6 +3,7 @@ package inspect import ( "context" "errors" + "fmt" "net" "sync" @@ -92,13 +93,24 @@ func (inspect *Inspect) Run(ctx context.Context) error { if err != nil { return err } - defer inspect.eventBus.Stop() + defer func() { + err := inspect.eventBus.Stop() + if err != nil { + inspect.logger.Error("event bus stopped with error", "err", err) + } + }() err = inspect.indexerService.Start() if err != nil { return err } - defer inspect.indexerService.Stop() + defer func() { + fmt.Println("stopping indexer") + err := inspect.indexerService.Stop() + if err != nil { + inspect.logger.Error("indexer stopped with error", "err", err) + } + }() return startRPCServers(ctx, inspect.rpcConfig, inspect.logger, inspect.routes) } diff --git a/inspect/inspect_test.go b/inspect/inspect_test.go index 377fb7f24..2d9ea8b8f 100644 --- a/inspect/inspect_test.go +++ b/inspect/inspect_test.go @@ -6,6 +6,7 @@ import ( "net" "os" "strings" + "sync" "testing" "github.com/fortytw2/leaktest" @@ -44,8 +45,14 @@ func TestInspectRun(t *testing.T) { d, err := inspect.NewFromConfig(config) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) - go d.Run(ctx) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + require.NoError(t, d.Run(ctx)) + }() cancel() + wg.Wait() }) } @@ -63,12 +70,18 @@ func TestInspectServeInfoRPC(t *testing.T) { blockStoreMock.On("LoadBlockMeta", testHeight).Return(&types.BlockMeta{}) blockStoreMock.On("LoadBlock", testHeight).Return(testBlock) eventSinkMock := &indexer_mocks.EventSink{} + eventSinkMock.On("Stop").Return(nil) rpcConfig := config.TestRPCConfig() l := log.TestingLogger() d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l) ctx, cancel := context.WithCancel(context.Background()) - go d.Run(ctx) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + require.NoError(t, d.Run(ctx)) + }() requireConnect(t, rpcConfig.ListenAddress, 15) cli, err := http_client.New(rpcConfig.ListenAddress) require.NoError(t, err) @@ -77,6 +90,7 @@ func TestInspectServeInfoRPC(t *testing.T) { require.Equal(t, testBlock.Height, resultBlock.Block.Height) require.Equal(t, testBlock.LastCommitHash, resultBlock.Block.LastCommitHash) cancel() + wg.Wait() blockStoreMock.AssertExpectations(t) stateStoreMock.AssertExpectations(t) @@ -95,6 +109,7 @@ func TestInspectTxSearch(t *testing.T) { stateStoreMock := &state_mocks.Store{} blockStoreMock := &state_mocks.BlockStore{} eventSinkMock := &indexer_mocks.EventSink{} + eventSinkMock.On("Stop").Return(nil) eventSinkMock.On("Type").Return(indexer.KV) eventSinkMock.On("SearchTxEvents", mock.Anything, mock.MatchedBy(func(q *query.Query) bool { return testQuery == q.String() })). Return([]*abci_types.TxResult{testTxResult}, nil) @@ -103,7 +118,12 @@ func TestInspectTxSearch(t *testing.T) { l := log.TestingLogger() d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l) ctx, cancel := context.WithCancel(context.Background()) - go d.Run(ctx) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + require.NoError(t, d.Run(ctx)) + }() requireConnect(t, rpcConfig.ListenAddress, 15) cli, err := http_client.New(rpcConfig.ListenAddress) require.NoError(t, err) @@ -115,8 +135,11 @@ func TestInspectTxSearch(t *testing.T) { require.Equal(t, types.Tx(testTx), resultTxSearch.Txs[0].Tx) cancel() + wg.Wait() eventSinkMock.AssertExpectations(t) + stateStoreMock.AssertExpectations(t) + blockStoreMock.AssertExpectations(t) } func requireConnect(t testing.TB, addr string, retries int) { diff --git a/state/indexer/indexer_service.go b/state/indexer/indexer_service.go index a429b66a0..01d72bf9a 100644 --- a/state/indexer/indexer_service.go +++ b/state/indexer/indexer_service.go @@ -20,6 +20,8 @@ type Service struct { eventSinks []EventSink eventBus *types.EventBus + + doneChan chan struct{} } // NewIndexerService returns a new service instance. @@ -27,6 +29,7 @@ func NewIndexerService(es []EventSink, eventBus *types.EventBus) *Service { is := &Service{eventSinks: es, eventBus: eventBus} is.BaseService = *service.NewBaseService(nil, "IndexerService", is) + is.doneChan = make(chan struct{}) return is } @@ -51,43 +54,47 @@ func (is *Service) OnStart() error { go func() { for { - msg := <-blockHeadersSub.Out() + select { + case <-is.doneChan: + return + case msg := <-blockHeadersSub.Out(): - eventDataHeader := msg.Data().(types.EventDataNewBlockHeader) - height := eventDataHeader.Header.Height - batch := NewBatch(eventDataHeader.NumTxs) + eventDataHeader := msg.Data().(types.EventDataNewBlockHeader) + height := eventDataHeader.Header.Height + batch := NewBatch(eventDataHeader.NumTxs) - for i := int64(0); i < eventDataHeader.NumTxs; i++ { - msg2 := <-txsSub.Out() - txResult := msg2.Data().(types.EventDataTx).TxResult + for i := int64(0); i < eventDataHeader.NumTxs; i++ { + msg2 := <-txsSub.Out() + txResult := msg2.Data().(types.EventDataTx).TxResult - if err = batch.Add(&txResult); err != nil { - is.Logger.Error( - "failed to add tx to batch", - "height", height, - "index", txResult.Index, - "err", err, - ) - } - } - - if !IndexingEnabled(is.eventSinks) { - continue - } - - for _, sink := range is.eventSinks { - if err := sink.IndexBlockEvents(eventDataHeader); err != nil { - is.Logger.Error("failed to index block", "height", height, "err", err) - } else { - is.Logger.Debug("indexed block", "height", height, "sink", sink.Type()) + if err = batch.Add(&txResult); err != nil { + is.Logger.Error( + "failed to add tx to batch", + "height", height, + "index", txResult.Index, + "err", err, + ) + } } - if len(batch.Ops) > 0 { - err := sink.IndexTxEvents(batch.Ops) - if err != nil { - is.Logger.Error("failed to index block txs", "height", height, "err", err) + if !IndexingEnabled(is.eventSinks) { + continue + } + + for _, sink := range is.eventSinks { + if err := sink.IndexBlockEvents(eventDataHeader); err != nil { + is.Logger.Error("failed to index block", "height", height, "err", err) } else { - is.Logger.Debug("indexed txs", "height", height, "sink", sink.Type()) + is.Logger.Debug("indexed block", "height", height, "sink", sink.Type()) + } + + if len(batch.Ops) > 0 { + err := sink.IndexTxEvents(batch.Ops) + if err != nil { + is.Logger.Error("failed to index block txs", "height", height, "err", err) + } else { + is.Logger.Debug("indexed txs", "height", height, "sink", sink.Type()) + } } } } @@ -108,6 +115,7 @@ func (is *Service) OnStop() { is.Logger.Error("failed to close eventsink", "eventsink", sink.Type(), "err", err) } } + close(is.doneChan) } // KVSinkEnabled returns the given eventSinks is containing KVEventSink.