From be6d74e6579bad8709def9716268f12f460df93d Mon Sep 17 00:00:00 2001 From: yihuang Date: Thu, 7 Jul 2022 02:05:48 +0800 Subject: [PATCH] Work around indexing problem for duplicate transactions (forward port: #8625) (#8945) Port the bug fix terra-money#76 to upstream. This is critical for ethermint json-rpc to work. fix: prevent duplicate tx index if it succeeded before fix: use CodeTypeOk instead of 0 fix: handle duplicate txs within the same block Co-authored-by: jess jesse@soob.co ref: #5281 Co-authored-by: M. J. Fromberger --- CHANGELOG_PENDING.md | 1 + internal/state/indexer/indexer_service.go | 52 +++++- .../state/indexer/indexer_service_test.go | 159 ++++++++++++++++++ 3 files changed, 211 insertions(+), 1 deletion(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 13eeaef16..9936e1309 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -101,3 +101,4 @@ Special thanks to external contributors on this release: - [cli] \#8276 scmigrate: ensure target key is correctly renamed. (@creachadair) - [cli] \#8294 keymigrate: ensure block hash keys are correctly translated. (@creachadair) - [cli] \#8352 keymigrate: ensure transaction hash keys are correctly translated. (@creachadair) +- (indexer) \#8625 Fix overriding tx index of duplicated txs. diff --git a/internal/state/indexer/indexer_service.go b/internal/state/indexer/indexer_service.go index e73e4a3ba..d6db82806 100644 --- a/internal/state/indexer/indexer_service.go +++ b/internal/state/indexer/indexer_service.go @@ -4,6 +4,7 @@ import ( "context" "time" + abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/internal/eventbus" "github.com/tendermint/tendermint/internal/pubsub" "github.com/tendermint/tendermint/libs/log" @@ -96,7 +97,14 @@ func (is *Service) publish(msg pubsub.Message) error { if curr.Size() != 0 { start := time.Now() - err := sink.IndexTxEvents(curr.Ops) + + var err error + curr.Ops, err = DeduplicateBatch(curr.Ops, sink) + if err != nil { + is.logger.Error("failed to deduplicate batch", "height", is.currentBlock.height, "error", err) + } + + err = sink.IndexTxEvents(curr.Ops) if err != nil { is.logger.Error("failed to index block txs", "height", is.currentBlock.height, "err", err) @@ -169,3 +177,45 @@ func IndexingEnabled(sinks []EventSink) bool { return false } + +// DeduplicateBatch consider the case of duplicate txs. +// if the current one under investigation is NOT OK, then we need to check +// whether there's a previously indexed tx. +// SKIP the current tx if the previously indexed record is found and successful. +func DeduplicateBatch(ops []*abci.TxResult, sink EventSink) ([]*abci.TxResult, error) { + result := make([]*abci.TxResult, 0, len(ops)) + + // keep track of successful txs in this block in order to suppress latter ones being indexed. + var successfulTxsInThisBlock = make(map[string]struct{}) + + for _, txResult := range ops { + hash := types.Tx(txResult.Tx).Hash() + + if txResult.Result.IsOK() { + successfulTxsInThisBlock[string(hash)] = struct{}{} + } else { + // if it already appeared in current block and was successful, skip. + if _, found := successfulTxsInThisBlock[string(hash)]; found { + continue + } + + // check if this tx hash is already indexed + old, err := sink.GetTxByHash(hash) + + // if db op errored + // Not found is not an error + if err != nil { + return nil, err + } + + // if it's already indexed in an older block and was successful, skip. + if old != nil && old.Result.Code == abci.CodeTypeOK { + continue + } + } + + result = append(result, txResult) + } + + return result, nil +} diff --git a/internal/state/indexer/indexer_service_test.go b/internal/state/indexer/indexer_service_test.go index 6126ae259..6dc1bdf50 100644 --- a/internal/state/indexer/indexer_service_test.go +++ b/internal/state/indexer/indexer_service_test.go @@ -109,6 +109,165 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) { assert.Nil(t, teardown(t, pool)) } +func TestTxIndexDuplicatedTx(t *testing.T) { + var mockTx = types.Tx("MOCK_TX_HASH") + + testCases := []struct { + name string + tx1 abci.TxResult + tx2 abci.TxResult + expSkip bool // do we expect the second tx to be skipped by tx indexer + }{ + {"skip, previously successful", + abci.TxResult{ + Height: 1, + Index: 0, + Tx: mockTx, + Result: abci.ExecTxResult{ + Code: abci.CodeTypeOK, + }, + }, + abci.TxResult{ + Height: 2, + Index: 0, + Tx: mockTx, + Result: abci.ExecTxResult{ + Code: abci.CodeTypeOK + 1, + }, + }, + true, + }, + {"not skip, previously unsuccessful", + abci.TxResult{ + Height: 1, + Index: 0, + Tx: mockTx, + Result: abci.ExecTxResult{ + Code: abci.CodeTypeOK + 1, + }, + }, + abci.TxResult{ + Height: 2, + Index: 0, + Tx: mockTx, + Result: abci.ExecTxResult{ + Code: abci.CodeTypeOK + 1, + }, + }, + false, + }, + {"not skip, both successful", + abci.TxResult{ + Height: 1, + Index: 0, + Tx: mockTx, + Result: abci.ExecTxResult{ + Code: abci.CodeTypeOK, + }, + }, + abci.TxResult{ + Height: 2, + Index: 0, + Tx: mockTx, + Result: abci.ExecTxResult{ + Code: abci.CodeTypeOK, + }, + }, + false, + }, + {"not skip, both unsuccessful", + abci.TxResult{ + Height: 1, + Index: 0, + Tx: mockTx, + Result: abci.ExecTxResult{ + Code: abci.CodeTypeOK + 1, + }, + }, + abci.TxResult{ + Height: 2, + Index: 0, + Tx: mockTx, + Result: abci.ExecTxResult{ + Code: abci.CodeTypeOK + 1, + }, + }, + false, + }, + {"skip, same block, previously successful", + abci.TxResult{ + Height: 1, + Index: 0, + Tx: mockTx, + Result: abci.ExecTxResult{ + Code: abci.CodeTypeOK, + }, + }, + abci.TxResult{ + Height: 1, + Index: 0, + Tx: mockTx, + Result: abci.ExecTxResult{ + Code: abci.CodeTypeOK + 1, + }, + }, + true, + }, + {"not skip, same block, previously unsuccessful", + abci.TxResult{ + Height: 1, + Index: 0, + Tx: mockTx, + Result: abci.ExecTxResult{ + Code: abci.CodeTypeOK + 1, + }, + }, + abci.TxResult{ + Height: 1, + Index: 0, + Tx: mockTx, + Result: abci.ExecTxResult{ + Code: abci.CodeTypeOK, + }, + }, + false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + sink := kv.NewEventSink(dbm.NewMemDB()) + + if tc.tx1.Height != tc.tx2.Height { + // index the first tx + err := sink.IndexTxEvents([]*abci.TxResult{&tc.tx1}) + require.NoError(t, err) + + // check if the second one should be skipped. + ops, err := indexer.DeduplicateBatch([]*abci.TxResult{&tc.tx2}, sink) + require.NoError(t, err) + + if tc.expSkip { + require.Empty(t, ops) + } else { + require.Equal(t, []*abci.TxResult{&tc.tx2}, ops) + } + } else { + // same block + ops := []*abci.TxResult{&tc.tx1, &tc.tx2} + ops, err := indexer.DeduplicateBatch(ops, sink) + require.NoError(t, err) + if tc.expSkip { + // the second one is skipped + require.Equal(t, []*abci.TxResult{&tc.tx1}, ops) + } else { + require.Equal(t, []*abci.TxResult{&tc.tx1, &tc.tx2}, ops) + } + } + }) + } +} + func readSchema() ([]*schema.Migration, error) { filename := "./sink/psql/schema.sql" contents, err := os.ReadFile(filename)