diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md
index 06eed7004..b02ab5343 100644
--- a/CHANGELOG_PENDING.md
+++ b/CHANGELOG_PENDING.md
@@ -24,4 +24,6 @@ Special thanks to external contributors on this release:
 
 ### IMPROVEMENTS
 
+- (indexer) \#8625 Fix overriding tx index of duplicated txs.
+
 ### BUG FIXES
diff --git a/internal/state/indexer/indexer_service.go b/internal/state/indexer/indexer_service.go
index c00b1e54b..9715be876 100644
--- a/internal/state/indexer/indexer_service.go
+++ b/internal/state/indexer/indexer_service.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"time"
 
+	abci "github.com/tendermint/tendermint/abci/types"
 	"github.com/tendermint/tendermint/libs/log"
 	"github.com/tendermint/tendermint/libs/service"
 	"github.com/tendermint/tendermint/types"
@@ -108,7 +109,15 @@ func (is *Service) OnStart() error {
 
 					if len(batch.Ops) > 0 {
 						start := time.Now()
-						err := sink.IndexTxEvents(batch.Ops)
+
+						var err error
+						batch.Ops, err = DeduplicateBatch(batch.Ops, sink)
+						if err != nil {
+							is.Logger.Error("failed to deduplicate batch", "height", height, "err", err, "sink", sink.Type())
+							continue
+						}
+
+						err = sink.IndexTxEvents(batch.Ops)
 						if err != nil {
 							is.Logger.Error("failed to index block txs", "height", height, "err", err)
 						} else {
@@ -167,3 +176,45 @@ func IndexingEnabled(sinks []EventSink) bool {
 
 	return false
 }
+
+// DeduplicateBatch filters ops so that a failed tx never replaces the index
+// entry of a successful tx with the same hash. A failed tx is dropped when a
+// successful duplicate came earlier in ops or is already stored in sink;
+// successful txs are always kept. Any sink lookup error aborts the batch.
+func DeduplicateBatch(ops []*abci.TxResult, sink EventSink) ([]*abci.TxResult, error) {
+	result := make([]*abci.TxResult, 0, len(ops))
+
+	// keep track of successful txs in this block in order to suppress later failed duplicates.
+	var successfulTxsInThisBlock = make(map[string]struct{})
+
+	for _, txResult := range ops {
+		hash := types.Tx(txResult.Tx).Hash()
+
+		if txResult.Result.IsOK() {
+			successfulTxsInThisBlock[string(hash)] = struct{}{}
+		} else {
+			// if it already appeared in the current block and was successful, skip.
+			if _, found := successfulTxsInThisBlock[string(hash)]; found {
+				continue
+			}
+
+			// check if this tx hash is already indexed
+			old, err := sink.GetTxByHash(hash)
+
+			// a failed db lookup aborts the whole batch;
+			// "not found" is not an error (old is nil in that case)
+			if err != nil {
+				return nil, err
+			}
+
+			// if it's already indexed in an older block and was successful, skip.
+			if old != nil && old.Result.IsOK() {
+				continue
+			}
+		}
+
+		result = append(result, txResult)
+	}
+
+	return result, nil
+}
diff --git a/internal/state/indexer/indexer_service_test.go b/internal/state/indexer/indexer_service_test.go
index 2ca6d168b..20fbedd8f 100644
--- a/internal/state/indexer/indexer_service_test.go
+++ b/internal/state/indexer/indexer_service_test.go
@@ -112,6 +112,165 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) {
 	assert.Nil(t, teardown(t, pool))
 }
 
+func TestTxIndexDuplicatedTx(t *testing.T) {
+	var mockTx = types.Tx("MOCK_TX_HASH")
+
+	testCases := []struct {
+		name    string
+		tx1     abci.TxResult
+		tx2     abci.TxResult
+		expSkip bool // do we expect the second tx to be skipped by tx indexer
+	}{
+		{"skip, previously successful",
+			abci.TxResult{
+				Height: 1,
+				Index:  0,
+				Tx:     mockTx,
+				Result: abci.ResponseDeliverTx{
+					Code: abci.CodeTypeOK,
+				},
+			},
+			abci.TxResult{
+				Height: 2,
+				Index:  0,
+				Tx:     mockTx,
+				Result: abci.ResponseDeliverTx{
+					Code: abci.CodeTypeOK + 1,
+				},
+			},
+			true,
+		},
+		{"not skip, previously unsuccessful",
+			abci.TxResult{
+				Height: 1,
+				Index:  0,
+				Tx:     mockTx,
+				Result: abci.ResponseDeliverTx{
+					Code: abci.CodeTypeOK + 1,
+				},
+			},
+			abci.TxResult{
+				Height: 2,
+				Index:  0,
+				Tx:     mockTx,
+				Result: abci.ResponseDeliverTx{
+					Code: abci.CodeTypeOK,
+				},
+			},
+			false,
+		},
+		{"not skip, both successful",
+			abci.TxResult{
+				Height: 1,
+				Index:  0,
+				Tx:     mockTx,
+				Result: abci.ResponseDeliverTx{
+					Code: abci.CodeTypeOK,
+				},
+			},
+			abci.TxResult{
+				Height: 2,
+				Index:  0,
+				Tx:     mockTx,
+				Result: abci.ResponseDeliverTx{
+					Code: abci.CodeTypeOK,
+				},
+			},
+			false,
+		},
+		{"not skip, both unsuccessful",
+			abci.TxResult{
+				Height: 1,
+				Index:  0,
+				Tx:     mockTx,
+				Result: abci.ResponseDeliverTx{
+					Code: abci.CodeTypeOK + 1,
+				},
+			},
+			abci.TxResult{
+				Height: 2,
+				Index:  0,
+				Tx:     mockTx,
+				Result: abci.ResponseDeliverTx{
+					Code: abci.CodeTypeOK + 1,
+				},
+			},
+			false,
+		},
+		{"skip, same block, previously successful",
+			abci.TxResult{
+				Height: 1,
+				Index:  0,
+				Tx:     mockTx,
+				Result: abci.ResponseDeliverTx{
+					Code: abci.CodeTypeOK,
+				},
+			},
+			abci.TxResult{
+				Height: 1,
+				Index:  0,
+				Tx:     mockTx,
+				Result: abci.ResponseDeliverTx{
+					Code: abci.CodeTypeOK + 1,
+				},
+			},
+			true,
+		},
+		{"not skip, same block, previously unsuccessful",
+			abci.TxResult{
+				Height: 1,
+				Index:  0,
+				Tx:     mockTx,
+				Result: abci.ResponseDeliverTx{
+					Code: abci.CodeTypeOK + 1,
+				},
+			},
+			abci.TxResult{
+				Height: 1,
+				Index:  0,
+				Tx:     mockTx,
+				Result: abci.ResponseDeliverTx{
+					Code: abci.CodeTypeOK,
+				},
+			},
+			false,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			sink := kv.NewEventSink(dbm.NewMemDB())
+
+			if tc.tx1.Height != tc.tx2.Height {
+				// different blocks: index the first tx on its own
+				err := sink.IndexTxEvents([]*abci.TxResult{&tc.tx1})
+				require.NoError(t, err)
+
+				// check if the second one should be skipped.
+				ops, err := indexer.DeduplicateBatch([]*abci.TxResult{&tc.tx2}, sink)
+				require.NoError(t, err)
+
+				if tc.expSkip {
+					require.Empty(t, ops)
+				} else {
+					require.Equal(t, []*abci.TxResult{&tc.tx2}, ops)
+				}
+			} else {
+				// same block: both txs go through one batch, nothing pre-indexed
+				ops := []*abci.TxResult{&tc.tx1, &tc.tx2}
+				ops, err := indexer.DeduplicateBatch(ops, sink)
+				require.NoError(t, err)
+				if tc.expSkip {
+					// the second one is skipped
+					require.Equal(t, []*abci.TxResult{&tc.tx1}, ops)
+				} else {
+					require.Equal(t, []*abci.TxResult{&tc.tx1, &tc.tx2}, ops)
+				}
+			}
+		})
+	}
+}
+
 func readSchema() ([]*schema.Migration, error) {
 	filename := "./sink/psql/schema.sql"
 	contents, err := ioutil.ReadFile(filename)