mirror of
https://github.com/tendermint/tendermint.git
synced 2026-06-09 07:42:38 +00:00
test fix
This commit is contained in:
@@ -3,6 +3,7 @@ package inspect
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
|
||||
@@ -92,13 +93,24 @@ func (inspect *Inspect) Run(ctx context.Context) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer inspect.eventBus.Stop()
|
||||
defer func() {
|
||||
err := inspect.eventBus.Stop()
|
||||
if err != nil {
|
||||
inspect.logger.Error("event bus stopped with error", "err", err)
|
||||
}
|
||||
}()
|
||||
|
||||
err = inspect.indexerService.Start()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer inspect.indexerService.Stop()
|
||||
defer func() {
|
||||
fmt.Println("stopping indexer")
|
||||
err := inspect.indexerService.Stop()
|
||||
if err != nil {
|
||||
inspect.logger.Error("indexer stopped with error", "err", err)
|
||||
}
|
||||
}()
|
||||
return startRPCServers(ctx, inspect.rpcConfig, inspect.logger, inspect.routes)
|
||||
}
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/fortytw2/leaktest"
|
||||
@@ -44,8 +45,14 @@ func TestInspectRun(t *testing.T) {
|
||||
d, err := inspect.NewFromConfig(config)
|
||||
require.NoError(t, err)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go d.Run(ctx)
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
require.NoError(t, d.Run(ctx))
|
||||
}()
|
||||
cancel()
|
||||
wg.Wait()
|
||||
})
|
||||
|
||||
}
|
||||
@@ -63,12 +70,18 @@ func TestInspectServeInfoRPC(t *testing.T) {
|
||||
blockStoreMock.On("LoadBlockMeta", testHeight).Return(&types.BlockMeta{})
|
||||
blockStoreMock.On("LoadBlock", testHeight).Return(testBlock)
|
||||
eventSinkMock := &indexer_mocks.EventSink{}
|
||||
eventSinkMock.On("Stop").Return(nil)
|
||||
|
||||
rpcConfig := config.TestRPCConfig()
|
||||
l := log.TestingLogger()
|
||||
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go d.Run(ctx)
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
require.NoError(t, d.Run(ctx))
|
||||
}()
|
||||
requireConnect(t, rpcConfig.ListenAddress, 15)
|
||||
cli, err := http_client.New(rpcConfig.ListenAddress)
|
||||
require.NoError(t, err)
|
||||
@@ -77,6 +90,7 @@ func TestInspectServeInfoRPC(t *testing.T) {
|
||||
require.Equal(t, testBlock.Height, resultBlock.Block.Height)
|
||||
require.Equal(t, testBlock.LastCommitHash, resultBlock.Block.LastCommitHash)
|
||||
cancel()
|
||||
wg.Wait()
|
||||
|
||||
blockStoreMock.AssertExpectations(t)
|
||||
stateStoreMock.AssertExpectations(t)
|
||||
@@ -95,6 +109,7 @@ func TestInspectTxSearch(t *testing.T) {
|
||||
stateStoreMock := &state_mocks.Store{}
|
||||
blockStoreMock := &state_mocks.BlockStore{}
|
||||
eventSinkMock := &indexer_mocks.EventSink{}
|
||||
eventSinkMock.On("Stop").Return(nil)
|
||||
eventSinkMock.On("Type").Return(indexer.KV)
|
||||
eventSinkMock.On("SearchTxEvents", mock.Anything, mock.MatchedBy(func(q *query.Query) bool { return testQuery == q.String() })).
|
||||
Return([]*abci_types.TxResult{testTxResult}, nil)
|
||||
@@ -103,7 +118,12 @@ func TestInspectTxSearch(t *testing.T) {
|
||||
l := log.TestingLogger()
|
||||
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go d.Run(ctx)
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
require.NoError(t, d.Run(ctx))
|
||||
}()
|
||||
requireConnect(t, rpcConfig.ListenAddress, 15)
|
||||
cli, err := http_client.New(rpcConfig.ListenAddress)
|
||||
require.NoError(t, err)
|
||||
@@ -115,8 +135,11 @@ func TestInspectTxSearch(t *testing.T) {
|
||||
require.Equal(t, types.Tx(testTx), resultTxSearch.Txs[0].Tx)
|
||||
|
||||
cancel()
|
||||
wg.Wait()
|
||||
|
||||
eventSinkMock.AssertExpectations(t)
|
||||
stateStoreMock.AssertExpectations(t)
|
||||
blockStoreMock.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func requireConnect(t testing.TB, addr string, retries int) {
|
||||
|
||||
@@ -20,6 +20,8 @@ type Service struct {
|
||||
|
||||
eventSinks []EventSink
|
||||
eventBus *types.EventBus
|
||||
|
||||
doneChan chan struct{}
|
||||
}
|
||||
|
||||
// NewIndexerService returns a new service instance.
|
||||
@@ -27,6 +29,7 @@ func NewIndexerService(es []EventSink, eventBus *types.EventBus) *Service {
|
||||
|
||||
is := &Service{eventSinks: es, eventBus: eventBus}
|
||||
is.BaseService = *service.NewBaseService(nil, "IndexerService", is)
|
||||
is.doneChan = make(chan struct{})
|
||||
return is
|
||||
}
|
||||
|
||||
@@ -51,43 +54,47 @@ func (is *Service) OnStart() error {
|
||||
|
||||
go func() {
|
||||
for {
|
||||
msg := <-blockHeadersSub.Out()
|
||||
select {
|
||||
case <-is.doneChan:
|
||||
return
|
||||
case msg := <-blockHeadersSub.Out():
|
||||
|
||||
eventDataHeader := msg.Data().(types.EventDataNewBlockHeader)
|
||||
height := eventDataHeader.Header.Height
|
||||
batch := NewBatch(eventDataHeader.NumTxs)
|
||||
eventDataHeader := msg.Data().(types.EventDataNewBlockHeader)
|
||||
height := eventDataHeader.Header.Height
|
||||
batch := NewBatch(eventDataHeader.NumTxs)
|
||||
|
||||
for i := int64(0); i < eventDataHeader.NumTxs; i++ {
|
||||
msg2 := <-txsSub.Out()
|
||||
txResult := msg2.Data().(types.EventDataTx).TxResult
|
||||
for i := int64(0); i < eventDataHeader.NumTxs; i++ {
|
||||
msg2 := <-txsSub.Out()
|
||||
txResult := msg2.Data().(types.EventDataTx).TxResult
|
||||
|
||||
if err = batch.Add(&txResult); err != nil {
|
||||
is.Logger.Error(
|
||||
"failed to add tx to batch",
|
||||
"height", height,
|
||||
"index", txResult.Index,
|
||||
"err", err,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
if !IndexingEnabled(is.eventSinks) {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, sink := range is.eventSinks {
|
||||
if err := sink.IndexBlockEvents(eventDataHeader); err != nil {
|
||||
is.Logger.Error("failed to index block", "height", height, "err", err)
|
||||
} else {
|
||||
is.Logger.Debug("indexed block", "height", height, "sink", sink.Type())
|
||||
if err = batch.Add(&txResult); err != nil {
|
||||
is.Logger.Error(
|
||||
"failed to add tx to batch",
|
||||
"height", height,
|
||||
"index", txResult.Index,
|
||||
"err", err,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
if len(batch.Ops) > 0 {
|
||||
err := sink.IndexTxEvents(batch.Ops)
|
||||
if err != nil {
|
||||
is.Logger.Error("failed to index block txs", "height", height, "err", err)
|
||||
if !IndexingEnabled(is.eventSinks) {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, sink := range is.eventSinks {
|
||||
if err := sink.IndexBlockEvents(eventDataHeader); err != nil {
|
||||
is.Logger.Error("failed to index block", "height", height, "err", err)
|
||||
} else {
|
||||
is.Logger.Debug("indexed txs", "height", height, "sink", sink.Type())
|
||||
is.Logger.Debug("indexed block", "height", height, "sink", sink.Type())
|
||||
}
|
||||
|
||||
if len(batch.Ops) > 0 {
|
||||
err := sink.IndexTxEvents(batch.Ops)
|
||||
if err != nil {
|
||||
is.Logger.Error("failed to index block txs", "height", height, "err", err)
|
||||
} else {
|
||||
is.Logger.Debug("indexed txs", "height", height, "sink", sink.Type())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -108,6 +115,7 @@ func (is *Service) OnStop() {
|
||||
is.Logger.Error("failed to close eventsink", "eventsink", sink.Type(), "err", err)
|
||||
}
|
||||
}
|
||||
close(is.doneChan)
|
||||
}
|
||||
|
||||
// KVSinkEnabled returns the given eventSinks is containing KVEventSink.
|
||||
|
||||
Reference in New Issue
Block a user