mirror of
https://github.com/tendermint/tendermint.git
synced 2025-12-23 14:25:19 +00:00
indexer: move deduplication functionality purely to the kvindexer (#9473)
(cherry picked from commit 4fd19a275e)
This commit is contained in:
@@ -303,7 +303,7 @@ func createAndStartIndexerService(
|
||||
blockIndexer = &blockidxnull.BlockerIndexer{}
|
||||
}
|
||||
|
||||
indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus)
|
||||
indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus, false)
|
||||
indexerService.SetLogger(logger.With("module", "txindex"))
|
||||
|
||||
if err := indexerService.Start(); err != nil {
|
||||
|
||||
@@ -19,6 +19,7 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
"github.com/tendermint/tendermint/state/txindex"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
|
||||
// Register the Postgres database driver.
|
||||
@@ -196,6 +197,55 @@ func TestIndexing(t *testing.T) {
|
||||
err = indexer.IndexTxEvents([]*abci.TxResult{txResult})
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("IndexerService", func(t *testing.T) {
|
||||
indexer := &EventSink{store: testDB(), chainID: chainID}
|
||||
|
||||
// event bus
|
||||
eventBus := types.NewEventBus()
|
||||
err := eventBus.Start()
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
if err := eventBus.Stop(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
})
|
||||
|
||||
service := txindex.NewIndexerService(indexer.TxIndexer(), indexer.BlockIndexer(), eventBus, true)
|
||||
err = service.Start()
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
if err := service.Stop(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
})
|
||||
|
||||
// publish block with txs
|
||||
err = eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{
|
||||
Header: types.Header{Height: 1},
|
||||
NumTxs: int64(2),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
txResult1 := &abci.TxResult{
|
||||
Height: 1,
|
||||
Index: uint32(0),
|
||||
Tx: types.Tx("foo"),
|
||||
Result: abci.ResponseDeliverTx{Code: 0},
|
||||
}
|
||||
err = eventBus.PublishEventTx(types.EventDataTx{TxResult: *txResult1})
|
||||
require.NoError(t, err)
|
||||
txResult2 := &abci.TxResult{
|
||||
Height: 1,
|
||||
Index: uint32(1),
|
||||
Tx: types.Tx("bar"),
|
||||
Result: abci.ResponseDeliverTx{Code: 1},
|
||||
}
|
||||
err = eventBus.PublishEventTx(types.EventDataTx{TxResult: *txResult2})
|
||||
require.NoError(t, err)
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
require.True(t, service.IsRunning())
|
||||
})
|
||||
}
|
||||
|
||||
func TestStop(t *testing.T) {
|
||||
|
||||
@@ -3,7 +3,6 @@ package txindex
|
||||
import (
|
||||
"context"
|
||||
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
"github.com/tendermint/tendermint/libs/service"
|
||||
"github.com/tendermint/tendermint/state/indexer"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
@@ -20,9 +19,10 @@ const (
|
||||
type IndexerService struct {
|
||||
service.BaseService
|
||||
|
||||
txIdxr TxIndexer
|
||||
blockIdxr indexer.BlockIndexer
|
||||
eventBus *types.EventBus
|
||||
txIdxr TxIndexer
|
||||
blockIdxr indexer.BlockIndexer
|
||||
eventBus *types.EventBus
|
||||
terminateOnError bool
|
||||
}
|
||||
|
||||
// NewIndexerService returns a new service instance.
|
||||
@@ -30,9 +30,10 @@ func NewIndexerService(
|
||||
txIdxr TxIndexer,
|
||||
blockIdxr indexer.BlockIndexer,
|
||||
eventBus *types.EventBus,
|
||||
terminateOnError bool,
|
||||
) *IndexerService {
|
||||
|
||||
is := &IndexerService{txIdxr: txIdxr, blockIdxr: blockIdxr, eventBus: eventBus}
|
||||
is := &IndexerService{txIdxr: txIdxr, blockIdxr: blockIdxr, eventBus: eventBus, terminateOnError: terminateOnError}
|
||||
is.BaseService = *service.NewBaseService(nil, "IndexerService", is)
|
||||
return is
|
||||
}
|
||||
@@ -74,24 +75,38 @@ func (is *IndexerService) OnStart() error {
|
||||
"index", txResult.Index,
|
||||
"err", err,
|
||||
)
|
||||
|
||||
if is.terminateOnError {
|
||||
if err := is.Stop(); err != nil {
|
||||
is.Logger.Error("failed to stop", "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := is.blockIdxr.Index(eventDataHeader); err != nil {
|
||||
is.Logger.Error("failed to index block", "height", height, "err", err)
|
||||
if is.terminateOnError {
|
||||
if err := is.Stop(); err != nil {
|
||||
is.Logger.Error("failed to stop", "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
} else {
|
||||
is.Logger.Info("indexed block", "height", height)
|
||||
}
|
||||
|
||||
batch.Ops, err = DeduplicateBatch(batch.Ops, is.txIdxr)
|
||||
if err != nil {
|
||||
is.Logger.Error("deduplicate batch", "height", height)
|
||||
is.Logger.Info("indexed block exents", "height", height)
|
||||
}
|
||||
|
||||
if err = is.txIdxr.AddBatch(batch); err != nil {
|
||||
is.Logger.Error("failed to index block txs", "height", height, "err", err)
|
||||
if is.terminateOnError {
|
||||
if err := is.Stop(); err != nil {
|
||||
is.Logger.Error("failed to stop", "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
} else {
|
||||
is.Logger.Debug("indexed block txs", "height", height, "num_txs", eventDataHeader.NumTxs)
|
||||
is.Logger.Debug("indexed transactions", "height", height, "num_txs", eventDataHeader.NumTxs)
|
||||
}
|
||||
}
|
||||
}()
|
||||
@@ -104,45 +119,3 @@ func (is *IndexerService) OnStop() {
|
||||
_ = is.eventBus.UnsubscribeAll(context.Background(), subscriber)
|
||||
}
|
||||
}
|
||||
|
||||
// DeduplicateBatch consider the case of duplicate txs.
|
||||
// if the current one under investigation is NOT OK, then we need to check
|
||||
// whether there's a previously indexed tx.
|
||||
// SKIP the current tx if the previously indexed record is found and successful.
|
||||
func DeduplicateBatch(ops []*abci.TxResult, txIdxr TxIndexer) ([]*abci.TxResult, error) {
|
||||
result := make([]*abci.TxResult, 0, len(ops))
|
||||
|
||||
// keep track of successful txs in this block in order to suppress latter ones being indexed.
|
||||
var successfulTxsInThisBlock = make(map[string]struct{})
|
||||
|
||||
for _, txResult := range ops {
|
||||
hash := types.Tx(txResult.Tx).Hash()
|
||||
|
||||
if txResult.Result.IsOK() {
|
||||
successfulTxsInThisBlock[string(hash)] = struct{}{}
|
||||
} else {
|
||||
// if it already appeared in current block and was successful, skip.
|
||||
if _, found := successfulTxsInThisBlock[string(hash)]; found {
|
||||
continue
|
||||
}
|
||||
|
||||
// check if this tx hash is already indexed
|
||||
old, err := txIdxr.Get(hash)
|
||||
|
||||
// if db op errored
|
||||
// Not found is not an error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// if it's already indexed in an older block and was successful, skip.
|
||||
if old != nil && old.Result.Code == abci.CodeTypeOK {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
result = append(result, txResult)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
@@ -32,7 +32,7 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) {
|
||||
txIndexer := kv.NewTxIndex(store)
|
||||
blockIndexer := blockidxkv.New(db.NewPrefixDB(store, []byte("block_events")))
|
||||
|
||||
service := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus)
|
||||
service := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus, false)
|
||||
service.SetLogger(log.TestingLogger())
|
||||
err = service.Start()
|
||||
require.NoError(t, err)
|
||||
@@ -79,164 +79,3 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, txResult2, res)
|
||||
}
|
||||
|
||||
func TestTxIndexDuplicatePreviouslySuccessful(t *testing.T) {
|
||||
var mockTx = types.Tx("MOCK_TX_HASH")
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
tx1 abci.TxResult
|
||||
tx2 abci.TxResult
|
||||
expSkip bool // do we expect the second tx to be skipped by tx indexer
|
||||
}{
|
||||
{"skip, previously successful",
|
||||
abci.TxResult{
|
||||
Height: 1,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK,
|
||||
},
|
||||
},
|
||||
abci.TxResult{
|
||||
Height: 2,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK + 1,
|
||||
},
|
||||
},
|
||||
true,
|
||||
},
|
||||
{"not skip, previously unsuccessful",
|
||||
abci.TxResult{
|
||||
Height: 1,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK + 1,
|
||||
},
|
||||
},
|
||||
abci.TxResult{
|
||||
Height: 2,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK + 1,
|
||||
},
|
||||
},
|
||||
false,
|
||||
},
|
||||
{"not skip, both successful",
|
||||
abci.TxResult{
|
||||
Height: 1,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK,
|
||||
},
|
||||
},
|
||||
abci.TxResult{
|
||||
Height: 2,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK,
|
||||
},
|
||||
},
|
||||
false,
|
||||
},
|
||||
{"not skip, both unsuccessful",
|
||||
abci.TxResult{
|
||||
Height: 1,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK + 1,
|
||||
},
|
||||
},
|
||||
abci.TxResult{
|
||||
Height: 2,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK + 1,
|
||||
},
|
||||
},
|
||||
false,
|
||||
},
|
||||
{"skip, same block, previously successful",
|
||||
abci.TxResult{
|
||||
Height: 1,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK,
|
||||
},
|
||||
},
|
||||
abci.TxResult{
|
||||
Height: 1,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK + 1,
|
||||
},
|
||||
},
|
||||
true,
|
||||
},
|
||||
{"not skip, same block, previously unsuccessful",
|
||||
abci.TxResult{
|
||||
Height: 1,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK + 1,
|
||||
},
|
||||
},
|
||||
abci.TxResult{
|
||||
Height: 1,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK,
|
||||
},
|
||||
},
|
||||
false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
indexer := kv.NewTxIndex(db.NewMemDB())
|
||||
|
||||
if tc.tx1.Height != tc.tx2.Height {
|
||||
// index the first tx
|
||||
err := indexer.AddBatch(&txindex.Batch{
|
||||
Ops: []*abci.TxResult{&tc.tx1},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// check if the second one should be skipped.
|
||||
ops, err := txindex.DeduplicateBatch([]*abci.TxResult{&tc.tx2}, indexer)
|
||||
require.NoError(t, err)
|
||||
|
||||
if tc.expSkip {
|
||||
require.Empty(t, ops)
|
||||
} else {
|
||||
require.Equal(t, []*abci.TxResult{&tc.tx2}, ops)
|
||||
}
|
||||
} else {
|
||||
// same block
|
||||
ops := []*abci.TxResult{&tc.tx1, &tc.tx2}
|
||||
ops, err := txindex.DeduplicateBatch(ops, indexer)
|
||||
require.NoError(t, err)
|
||||
if tc.expSkip {
|
||||
// the second one is skipped
|
||||
require.Equal(t, []*abci.TxResult{&tc.tx1}, ops)
|
||||
} else {
|
||||
require.Equal(t, []*abci.TxResult{&tc.tx1, &tc.tx2}, ops)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -101,12 +101,30 @@ func (txi *TxIndex) AddBatch(b *txindex.Batch) error {
|
||||
// that indexed from the tx's events is a composite of the event type and the
|
||||
// respective attribute's key delimited by a "." (eg. "account.number").
|
||||
// Any event with an empty type is not indexed.
|
||||
//
|
||||
// If a transaction is indexed with the same hash as a previous transaction, it will
|
||||
// be overwritten unless the tx result was NOT OK and the prior result was OK i.e.
|
||||
// more transactions that successfully executed overwrite transactions that failed
|
||||
// or successful yet older transactions.
|
||||
func (txi *TxIndex) Index(result *abci.TxResult) error {
|
||||
b := txi.store.NewBatch()
|
||||
defer b.Close()
|
||||
|
||||
hash := types.Tx(result.Tx).Hash()
|
||||
|
||||
if !result.Result.IsOK() {
|
||||
oldResult, err := txi.Get(hash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// if the new transaction failed and it's already indexed in an older block and was successful
|
||||
// we skip it as we want users to get the older successful transaction when they query.
|
||||
if oldResult != nil && oldResult.Result.Code == abci.CodeTypeOK {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// index tx by events
|
||||
err := txi.indexEvents(result, hash, b)
|
||||
if err != nil {
|
||||
|
||||
@@ -258,6 +258,103 @@ func TestTxSearchOneTxWithMultipleSameTagsButDifferentValues(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestTxIndexDuplicatePreviouslySuccessful(t *testing.T) {
|
||||
var mockTx = types.Tx("MOCK_TX_HASH")
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
tx1 *abci.TxResult
|
||||
tx2 *abci.TxResult
|
||||
expOverwrite bool // do we expect the second tx to overwrite the first tx
|
||||
}{
|
||||
{
|
||||
"don't overwrite as a non-zero code was returned and the previous tx was successful",
|
||||
&abci.TxResult{
|
||||
Height: 1,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK,
|
||||
},
|
||||
},
|
||||
&abci.TxResult{
|
||||
Height: 2,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK + 1,
|
||||
},
|
||||
},
|
||||
false,
|
||||
},
|
||||
{
|
||||
"overwrite as the previous tx was also unsuccessful",
|
||||
&abci.TxResult{
|
||||
Height: 1,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK + 1,
|
||||
},
|
||||
},
|
||||
&abci.TxResult{
|
||||
Height: 2,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK + 1,
|
||||
},
|
||||
},
|
||||
true,
|
||||
},
|
||||
{
|
||||
"overwrite as the most recent tx was successful",
|
||||
&abci.TxResult{
|
||||
Height: 1,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK,
|
||||
},
|
||||
},
|
||||
&abci.TxResult{
|
||||
Height: 2,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK,
|
||||
},
|
||||
},
|
||||
true,
|
||||
},
|
||||
}
|
||||
|
||||
hash := mockTx.Hash()
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
indexer := NewTxIndex(db.NewMemDB())
|
||||
|
||||
// index the first tx
|
||||
err := indexer.Index(tc.tx1)
|
||||
require.NoError(t, err)
|
||||
|
||||
// index the same tx with different results
|
||||
err = indexer.Index(tc.tx2)
|
||||
require.NoError(t, err)
|
||||
|
||||
res, err := indexer.Get(hash)
|
||||
require.NoError(t, err)
|
||||
|
||||
if tc.expOverwrite {
|
||||
require.Equal(t, tc.tx2, res)
|
||||
} else {
|
||||
require.Equal(t, tc.tx1, res)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestTxSearchMultipleTxs(t *testing.T) {
|
||||
indexer := NewTxIndex(db.NewMemDB())
|
||||
|
||||
|
||||
Reference in New Issue
Block a user