Work around indexing problem for duplicate transactions (forward port: #8625) (#8945)

Port the bug fix terra-money#76 to upstream. This is critical for ethermint json-rpc to work.

fix: prevent duplicate tx index if it succeeded before
fix: use CodeTypeOK instead of 0
fix: handle duplicate txs within the same block
Co-authored-by: jess <jesse@soob.co>

ref: #5281

Co-authored-by: M. J. Fromberger <fromberger@interchain.io>
This commit is contained in:
yihuang
2022-07-07 02:05:48 +08:00
committed by GitHub
parent 70e7372c4a
commit be6d74e657
3 changed files with 211 additions and 1 deletions

View File

@@ -101,3 +101,4 @@ Special thanks to external contributors on this release:
- [cli] \#8276 scmigrate: ensure target key is correctly renamed. (@creachadair)
- [cli] \#8294 keymigrate: ensure block hash keys are correctly translated. (@creachadair)
- [cli] \#8352 keymigrate: ensure transaction hash keys are correctly translated. (@creachadair)
- [indexer] \#8625 Fix overriding tx index of duplicated txs.

View File

@@ -4,6 +4,7 @@ import (
"context"
"time"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/internal/eventbus"
"github.com/tendermint/tendermint/internal/pubsub"
"github.com/tendermint/tendermint/libs/log"
@@ -96,7 +97,14 @@ func (is *Service) publish(msg pubsub.Message) error {
if curr.Size() != 0 {
start := time.Now()
err := sink.IndexTxEvents(curr.Ops)
var err error
curr.Ops, err = DeduplicateBatch(curr.Ops, sink)
if err != nil {
is.logger.Error("failed to deduplicate batch", "height", is.currentBlock.height, "error", err)
}
err = sink.IndexTxEvents(curr.Ops)
if err != nil {
is.logger.Error("failed to index block txs",
"height", is.currentBlock.height, "err", err)
@@ -169,3 +177,45 @@ func IndexingEnabled(sinks []EventSink) bool {
return false
}
// DeduplicateBatch filters ops so that a failed transaction result never
// overwrites an earlier successful result for the same tx hash.
//
// A tx can legitimately appear more than once (e.g. re-executed after a
// prior inclusion). If the result under inspection is NOT OK and a
// successful result for the same hash exists — either earlier in this same
// batch or already persisted in the sink — the current result is skipped so
// the successful record is preserved.
//
// It returns the filtered batch. An error is returned only when the sink
// lookup itself fails; a missing record is not an error.
func DeduplicateBatch(ops []*abci.TxResult, sink EventSink) ([]*abci.TxResult, error) {
	result := make([]*abci.TxResult, 0, len(ops))

	// Hashes of successful txs seen so far in this batch; failed
	// duplicates of these must be suppressed.
	successfulTxsInThisBlock := make(map[string]struct{})

	for _, txResult := range ops {
		hash := types.Tx(txResult.Tx).Hash()

		if txResult.Result.IsOK() {
			successfulTxsInThisBlock[string(hash)] = struct{}{}
		} else {
			// Skip if a successful duplicate appeared earlier in this
			// same block...
			if _, found := successfulTxsInThisBlock[string(hash)]; found {
				continue
			}
			// ...or was already indexed from an older block. A nil
			// result with a nil error means "not found", which is fine.
			old, err := sink.GetTxByHash(hash)
			if err != nil {
				return nil, err
			}
			if old != nil && old.Result.IsOK() {
				continue
			}
		}
		result = append(result, txResult)
	}
	return result, nil
}

View File

@@ -109,6 +109,165 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) {
assert.Nil(t, teardown(t, pool))
}
func TestTxIndexDuplicatedTx(t *testing.T) {
	mockTx := types.Tx("MOCK_TX_HASH")

	// newTxResult builds a result for mockTx at the given height with the
	// given response code; Index is always zero, as in every case below.
	newTxResult := func(height int64, code uint32) abci.TxResult {
		return abci.TxResult{
			Height: height,
			Index:  0,
			Tx:     mockTx,
			Result: abci.ExecTxResult{Code: code},
		}
	}
	const failCode = abci.CodeTypeOK + 1

	testCases := []struct {
		name    string
		tx1     abci.TxResult
		tx2     abci.TxResult
		expSkip bool // do we expect the second tx to be skipped by tx indexer
	}{
		{"skip, previously successful", newTxResult(1, abci.CodeTypeOK), newTxResult(2, failCode), true},
		{"not skip, previously unsuccessful", newTxResult(1, failCode), newTxResult(2, failCode), false},
		{"not skip, both successful", newTxResult(1, abci.CodeTypeOK), newTxResult(2, abci.CodeTypeOK), false},
		{"not skip, both unsuccessful", newTxResult(1, failCode), newTxResult(2, failCode), false},
		{"skip, same block, previously successful", newTxResult(1, abci.CodeTypeOK), newTxResult(1, failCode), true},
		{"not skip, same block, previously unsuccessful", newTxResult(1, failCode), newTxResult(1, abci.CodeTypeOK), false},
	}

	for _, tc := range testCases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			sink := kv.NewEventSink(dbm.NewMemDB())

			if tc.tx1.Height != tc.tx2.Height {
				// Cross-block case: persist tx1 first, then deduplicate a
				// batch containing only tx2 against the sink.
				require.NoError(t, sink.IndexTxEvents([]*abci.TxResult{&tc.tx1}))

				got, err := indexer.DeduplicateBatch([]*abci.TxResult{&tc.tx2}, sink)
				require.NoError(t, err)
				if tc.expSkip {
					require.Empty(t, got)
				} else {
					require.Equal(t, []*abci.TxResult{&tc.tx2}, got)
				}
				return
			}

			// Same-block case: both results arrive in a single batch.
			got, err := indexer.DeduplicateBatch([]*abci.TxResult{&tc.tx1, &tc.tx2}, sink)
			require.NoError(t, err)
			want := []*abci.TxResult{&tc.tx1, &tc.tx2}
			if tc.expSkip {
				// the second one is skipped
				want = []*abci.TxResult{&tc.tx1}
			}
			require.Equal(t, want, got)
		})
	}
}
func readSchema() ([]*schema.Migration, error) {
filename := "./sink/psql/schema.sql"
contents, err := os.ReadFile(filename)