indexer: move deduplication functionality purely to the kvindexer (backport #9473) (#9521)

This commit is contained in:
mergify[bot]
2022-10-10 10:47:31 +02:00
committed by GitHub
parent 430afb23e7
commit 1d160a5a86
9 changed files with 196 additions and 219 deletions

View File

@@ -20,6 +20,7 @@ import (
"github.com/stretchr/testify/require"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types"
// Register the Postgres database driver.
@@ -197,6 +198,55 @@ func TestIndexing(t *testing.T) {
err = indexer.IndexTxEvents([]*abci.TxResult{txResult})
require.NoError(t, err)
})
t.Run("IndexerService", func(t *testing.T) {
indexer := &EventSink{store: testDB(), chainID: chainID}
// event bus
eventBus := types.NewEventBus()
err := eventBus.Start()
require.NoError(t, err)
t.Cleanup(func() {
if err := eventBus.Stop(); err != nil {
t.Error(err)
}
})
service := txindex.NewIndexerService(indexer.TxIndexer(), indexer.BlockIndexer(), eventBus, true)
err = service.Start()
require.NoError(t, err)
t.Cleanup(func() {
if err := service.Stop(); err != nil {
t.Error(err)
}
})
// publish block with txs
err = eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{
Header: types.Header{Height: 1},
NumTxs: int64(2),
})
require.NoError(t, err)
txResult1 := &abci.TxResult{
Height: 1,
Index: uint32(0),
Tx: types.Tx("foo"),
Result: abci.ResponseDeliverTx{Code: 0},
}
err = eventBus.PublishEventTx(types.EventDataTx{TxResult: *txResult1})
require.NoError(t, err)
txResult2 := &abci.TxResult{
Height: 1,
Index: uint32(1),
Tx: types.Tx("bar"),
Result: abci.ResponseDeliverTx{Code: 1},
}
err = eventBus.PublishEventTx(types.EventDataTx{TxResult: *txResult2})
require.NoError(t, err)
time.Sleep(100 * time.Millisecond)
require.True(t, service.IsRunning())
})
}
func TestStop(t *testing.T) {

View File

@@ -3,7 +3,6 @@ package txindex
import (
"context"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/types"
@@ -20,9 +19,10 @@ const (
type IndexerService struct {
service.BaseService
txIdxr TxIndexer
blockIdxr indexer.BlockIndexer
eventBus *types.EventBus
txIdxr TxIndexer
blockIdxr indexer.BlockIndexer
eventBus *types.EventBus
terminateOnError bool
}
// NewIndexerService returns a new service instance.
@@ -30,9 +30,10 @@ func NewIndexerService(
txIdxr TxIndexer,
blockIdxr indexer.BlockIndexer,
eventBus *types.EventBus,
terminateOnError bool,
) *IndexerService {
is := &IndexerService{txIdxr: txIdxr, blockIdxr: blockIdxr, eventBus: eventBus}
is := &IndexerService{txIdxr: txIdxr, blockIdxr: blockIdxr, eventBus: eventBus, terminateOnError: terminateOnError}
is.BaseService = *service.NewBaseService(nil, "IndexerService", is)
return is
}
@@ -74,24 +75,38 @@ func (is *IndexerService) OnStart() error {
"index", txResult.Index,
"err", err,
)
if is.terminateOnError {
if err := is.Stop(); err != nil {
is.Logger.Error("failed to stop", "err", err)
}
return
}
}
}
if err := is.blockIdxr.Index(eventDataHeader); err != nil {
is.Logger.Error("failed to index block", "height", height, "err", err)
if is.terminateOnError {
if err := is.Stop(); err != nil {
is.Logger.Error("failed to stop", "err", err)
}
return
}
} else {
is.Logger.Info("indexed block", "height", height)
}
batch.Ops, err = DeduplicateBatch(batch.Ops, is.txIdxr)
if err != nil {
is.Logger.Error("deduplicate batch", "height", height)
is.Logger.Info("indexed block exents", "height", height)
}
if err = is.txIdxr.AddBatch(batch); err != nil {
is.Logger.Error("failed to index block txs", "height", height, "err", err)
if is.terminateOnError {
if err := is.Stop(); err != nil {
is.Logger.Error("failed to stop", "err", err)
}
return
}
} else {
is.Logger.Debug("indexed block txs", "height", height, "num_txs", eventDataHeader.NumTxs)
is.Logger.Debug("indexed transactions", "height", height, "num_txs", eventDataHeader.NumTxs)
}
}
}()
@@ -104,45 +119,3 @@ func (is *IndexerService) OnStop() {
_ = is.eventBus.UnsubscribeAll(context.Background(), subscriber)
}
}
// DeduplicateBatch consider the case of duplicate txs.
// if the current one under investigation is NOT OK, then we need to check
// whether there's a previously indexed tx.
// SKIP the current tx if the previously indexed record is found and successful.
func DeduplicateBatch(ops []*abci.TxResult, txIdxr TxIndexer) ([]*abci.TxResult, error) {
result := make([]*abci.TxResult, 0, len(ops))
// keep track of successful txs in this block in order to suppress latter ones being indexed.
var successfulTxsInThisBlock = make(map[string]struct{})
for _, txResult := range ops {
hash := types.Tx(txResult.Tx).Hash()
if txResult.Result.IsOK() {
successfulTxsInThisBlock[string(hash)] = struct{}{}
} else {
// if it already appeared in current block and was successful, skip.
if _, found := successfulTxsInThisBlock[string(hash)]; found {
continue
}
// check if this tx hash is already indexed
old, err := txIdxr.Get(hash)
// if db op errored
// Not found is not an error
if err != nil {
return nil, err
}
// if it's already indexed in an older block and was successful, skip.
if old != nil && old.Result.Code == abci.CodeTypeOK {
continue
}
}
result = append(result, txResult)
}
return result, nil
}

View File

@@ -32,7 +32,7 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) {
txIndexer := kv.NewTxIndex(store)
blockIndexer := blockidxkv.New(db.NewPrefixDB(store, []byte("block_events")))
service := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus)
service := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus, false)
service.SetLogger(log.TestingLogger())
err = service.Start()
require.NoError(t, err)
@@ -79,164 +79,3 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) {
require.NoError(t, err)
require.Equal(t, txResult2, res)
}
func TestTxIndexDuplicatePreviouslySuccessful(t *testing.T) {
var mockTx = types.Tx("MOCK_TX_HASH")
testCases := []struct {
name string
tx1 abci.TxResult
tx2 abci.TxResult
expSkip bool // do we expect the second tx to be skipped by tx indexer
}{
{"skip, previously successful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
abci.TxResult{
Height: 2,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
true,
},
{"not skip, previously unsuccessful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
abci.TxResult{
Height: 2,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
false,
},
{"not skip, both successful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
abci.TxResult{
Height: 2,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
false,
},
{"not skip, both unsuccessful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
abci.TxResult{
Height: 2,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
false,
},
{"skip, same block, previously successful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
true,
},
{"not skip, same block, previously unsuccessful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
indexer := kv.NewTxIndex(db.NewMemDB())
if tc.tx1.Height != tc.tx2.Height {
// index the first tx
err := indexer.AddBatch(&txindex.Batch{
Ops: []*abci.TxResult{&tc.tx1},
})
require.NoError(t, err)
// check if the second one should be skipped.
ops, err := txindex.DeduplicateBatch([]*abci.TxResult{&tc.tx2}, indexer)
require.NoError(t, err)
if tc.expSkip {
require.Empty(t, ops)
} else {
require.Equal(t, []*abci.TxResult{&tc.tx2}, ops)
}
} else {
// same block
ops := []*abci.TxResult{&tc.tx1, &tc.tx2}
ops, err := txindex.DeduplicateBatch(ops, indexer)
require.NoError(t, err)
if tc.expSkip {
// the second one is skipped
require.Equal(t, []*abci.TxResult{&tc.tx1}, ops)
} else {
require.Equal(t, []*abci.TxResult{&tc.tx1, &tc.tx2}, ops)
}
}
})
}
}

View File

@@ -101,12 +101,30 @@ func (txi *TxIndex) AddBatch(b *txindex.Batch) error {
// that indexed from the tx's events is a composite of the event type and the
// respective attribute's key delimited by a "." (eg. "account.number").
// Any event with an empty type is not indexed.
//
// If a transaction is indexed with the same hash as a previous transaction, it will
// be overwritten unless the tx result was NOT OK and the prior result was OK i.e.
// more transactions that successfully executed overwrite transactions that failed
// or successful yet older transactions.
func (txi *TxIndex) Index(result *abci.TxResult) error {
b := txi.store.NewBatch()
defer b.Close()
hash := types.Tx(result.Tx).Hash()
if !result.Result.IsOK() {
oldResult, err := txi.Get(hash)
if err != nil {
return err
}
// if the new transaction failed and it's already indexed in an older block and was successful
// we skip it as we want users to get the older successful transaction when they query.
if oldResult != nil && oldResult.Result.Code == abci.CodeTypeOK {
return nil
}
}
// index tx by events
err := txi.indexEvents(result, hash, b)
if err != nil {

View File

@@ -259,6 +259,103 @@ func TestTxSearchOneTxWithMultipleSameTagsButDifferentValues(t *testing.T) {
}
}
func TestTxIndexDuplicatePreviouslySuccessful(t *testing.T) {
var mockTx = types.Tx("MOCK_TX_HASH")
testCases := []struct {
name string
tx1 *abci.TxResult
tx2 *abci.TxResult
expOverwrite bool // do we expect the second tx to overwrite the first tx
}{
{
"don't overwrite as a non-zero code was returned and the previous tx was successful",
&abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
&abci.TxResult{
Height: 2,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
false,
},
{
"overwrite as the previous tx was also unsuccessful",
&abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
&abci.TxResult{
Height: 2,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
true,
},
{
"overwrite as the most recent tx was successful",
&abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
&abci.TxResult{
Height: 2,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
true,
},
}
hash := mockTx.Hash()
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
indexer := NewTxIndex(db.NewMemDB())
// index the first tx
err := indexer.Index(tc.tx1)
require.NoError(t, err)
// index the same tx with different results
err = indexer.Index(tc.tx2)
require.NoError(t, err)
res, err := indexer.Get(hash)
require.NoError(t, err)
if tc.expOverwrite {
require.Equal(t, tc.tx2, res)
} else {
require.Equal(t, tc.tx1, res)
}
})
}
}
func TestTxSearchMultipleTxs(t *testing.T) {
indexer := NewTxIndex(db.NewMemDB())