backport: performance improvements for the event query API (#7319) (#9334)

* Performance improvements for the event query API (#7319)

Rework the implementation of event query parsing and execution to
improve performance and reduce memory usage.

Previous memory and CPU profiles of the pubsub service showed query
processing as a significant hotspot. While we don't have evidence that
this is visibly hurting users, fixing it is fairly easy and self-contained.

Updates #6439.

Typical benchmark results comparing the original implementation (PEG) with the reworked implementation (Custom):
```
TEST                        TIME/OP  BYTES/OP  ALLOCS/OP  SPEEDUP   MEM SAVING
BenchmarkParsePEG-12       51716 ns  526832    27
BenchmarkParseCustom-12     2167 ns    4616    17         23.8x     99.1%
BenchmarkMatchPEG-12        3086 ns    1097    22
BenchmarkMatchCustom-12    294.2 ns      64     3         10.5x     94.1%
```
This commit is contained in:
mmsqe
2022-09-13 16:42:14 +08:00
committed by GitHub
parent 93ead3d0e5
commit e80dd00894
32 changed files with 2316 additions and 758 deletions

View File

@@ -13,6 +13,7 @@ import (
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/libs/pubsub/query/syntax"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/types"
)
@@ -91,10 +92,7 @@ func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64,
default:
}
conditions, err := q.Conditions()
if err != nil {
return nil, fmt.Errorf("failed to parse query conditions: %w", err)
}
conditions := q.Syntax()
// If there is an exact height query, return the result immediately
// (if it exists).
@@ -158,7 +156,7 @@ func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64,
continue
}
startKey, err := orderedcode.Append(nil, c.CompositeKey, fmt.Sprintf("%v", c.Operand))
startKey, err := orderedcode.Append(nil, c.Tag, c.Arg.Value())
if err != nil {
return nil, err
}
@@ -326,7 +324,7 @@ LOOP:
// matched.
func (idx *BlockerIndexer) match(
ctx context.Context,
c query.Condition,
c syntax.Condition,
startKeyBz []byte,
filteredHeights map[string][]byte,
firstRun bool,
@@ -341,7 +339,7 @@ func (idx *BlockerIndexer) match(
tmpHeights := make(map[string][]byte)
switch {
case c.Op == query.OpEqual:
case c.Op == syntax.TEq:
it, err := dbm.IteratePrefix(idx.store, startKeyBz)
if err != nil {
return nil, fmt.Errorf("failed to create prefix iterator: %w", err)
@@ -360,8 +358,8 @@ func (idx *BlockerIndexer) match(
return nil, err
}
case c.Op == query.OpExists:
prefix, err := orderedcode.Append(nil, c.CompositeKey)
case c.Op == syntax.TExists:
prefix, err := orderedcode.Append(nil, c.Tag)
if err != nil {
return nil, err
}
@@ -387,8 +385,8 @@ func (idx *BlockerIndexer) match(
return nil, err
}
case c.Op == query.OpContains:
prefix, err := orderedcode.Append(nil, c.CompositeKey)
case c.Op == syntax.TContains:
prefix, err := orderedcode.Append(nil, c.Tag)
if err != nil {
return nil, err
}
@@ -405,7 +403,7 @@ func (idx *BlockerIndexer) match(
continue
}
if strings.Contains(eventValue, c.Operand.(string)) {
if strings.Contains(eventValue, c.Arg.Value()) {
tmpHeights[string(it.Value())] = it.Value()
}

View File

@@ -94,39 +94,39 @@ func TestBlockIndexer(t *testing.T) {
results []int64
}{
"block.height = 100": {
q: query.MustParse("block.height = 100"),
q: query.MustCompile(`block.height = 100`),
results: []int64{},
},
"block.height = 5": {
q: query.MustParse("block.height = 5"),
q: query.MustCompile(`block.height = 5`),
results: []int64{5},
},
"begin_event.key1 = 'value1'": {
q: query.MustParse("begin_event.key1 = 'value1'"),
q: query.MustCompile(`begin_event.key1 = 'value1'`),
results: []int64{},
},
"begin_event.proposer = 'FCAA001'": {
q: query.MustParse("begin_event.proposer = 'FCAA001'"),
q: query.MustCompile(`begin_event.proposer = 'FCAA001'`),
results: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
},
"end_event.foo <= 5": {
q: query.MustParse("end_event.foo <= 5"),
q: query.MustCompile(`end_event.foo <= 5`),
results: []int64{2, 4},
},
"end_event.foo >= 100": {
q: query.MustParse("end_event.foo >= 100"),
q: query.MustCompile(`end_event.foo >= 100`),
results: []int64{1},
},
"block.height > 2 AND end_event.foo <= 8": {
q: query.MustParse("block.height > 2 AND end_event.foo <= 8"),
q: query.MustCompile(`block.height > 2 AND end_event.foo <= 8`),
results: []int64{4, 6, 8},
},
"begin_event.proposer CONTAINS 'FFFFFFF'": {
q: query.MustParse("begin_event.proposer CONTAINS 'FFFFFFF'"),
q: query.MustCompile(`begin_event.proposer CONTAINS 'FFFFFFF'`),
results: []int64{},
},
"begin_event.proposer CONTAINS 'FCAA001'": {
q: query.MustParse("begin_event.proposer CONTAINS 'FCAA001'"),
q: query.MustCompile(`begin_event.proposer CONTAINS 'FCAA001'`),
results: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
},
}

View File

@@ -6,8 +6,7 @@ import (
"strconv"
"github.com/google/orderedcode"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/libs/pubsub/query/syntax"
"github.com/tendermint/tendermint/types"
)
@@ -86,10 +85,10 @@ func parseValueFromEventKey(key []byte) (string, error) {
return eventValue, nil
}
func lookForHeight(conditions []query.Condition) (int64, bool) {
func lookForHeight(conditions []syntax.Condition) (int64, bool) {
for _, c := range conditions {
if c.CompositeKey == types.BlockHeightKey && c.Op == query.OpEqual {
return c.Operand.(int64), true
if c.Tag == types.BlockHeightKey && c.Op == syntax.TEq {
return int64(c.Arg.Number()), true
}
}

View File

@@ -3,7 +3,7 @@ package indexer
import (
"time"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/libs/pubsub/query/syntax"
)
// QueryRanges defines a mapping between a composite event key and a QueryRange.
@@ -77,32 +77,32 @@ func (qr QueryRange) UpperBoundValue() interface{} {
// LookForRanges returns a mapping of QueryRanges and the matching indexes in
// the provided query conditions.
func LookForRanges(conditions []query.Condition) (ranges QueryRanges, indexes []int) {
func LookForRanges(conditions []syntax.Condition) (ranges QueryRanges, indexes []int) {
ranges = make(QueryRanges)
for i, c := range conditions {
if IsRangeOperation(c.Op) {
r, ok := ranges[c.CompositeKey]
r, ok := ranges[c.Tag]
if !ok {
r = QueryRange{Key: c.CompositeKey}
r = QueryRange{Key: c.Tag}
}
switch c.Op {
case query.OpGreater:
r.LowerBound = c.Operand
case syntax.TGt:
r.LowerBound = conditionArg(c)
case query.OpGreaterEqual:
case syntax.TGeq:
r.IncludeLowerBound = true
r.LowerBound = c.Operand
r.LowerBound = conditionArg(c)
case query.OpLess:
r.UpperBound = c.Operand
case syntax.TLt:
r.UpperBound = conditionArg(c)
case query.OpLessEqual:
case syntax.TLeq:
r.IncludeUpperBound = true
r.UpperBound = c.Operand
r.UpperBound = conditionArg(c)
}
ranges[c.CompositeKey] = r
ranges[c.Tag] = r
indexes = append(indexes, i)
}
}
@@ -112,12 +112,26 @@ func LookForRanges(conditions []query.Condition) (ranges QueryRanges, indexes []
// IsRangeOperation returns a boolean signifying if a query Operator is a range
// operation or not.
func IsRangeOperation(op query.Operator) bool {
func IsRangeOperation(op syntax.Token) bool {
switch op {
case query.OpGreater, query.OpGreaterEqual, query.OpLess, query.OpLessEqual:
case syntax.TGt, syntax.TGeq, syntax.TLt, syntax.TLeq:
return true
default:
return false
}
}
// conditionArg converts the argument of c into the plain Go value that is
// stored as a range bound by LookForRanges.
//
// It returns nil when the condition carries no argument, an int64 for number
// arguments (the numeric value is converted with int64), the result of
// Arg.Time for time and date arguments, and the result of Arg.Value for
// every other argument type.
func conditionArg(c syntax.Condition) interface{} {
	arg := c.Arg
	if arg == nil {
		return nil
	}

	// Numbers are narrowed to int64 before being handed back to the caller.
	if arg.Type == syntax.TNumber {
		return int64(arg.Number())
	}

	// Timestamps and dates share the same accessor.
	if arg.Type == syntax.TTime || arg.Type == syntax.TDate {
		return arg.Time()
	}

	// Anything else is returned via the argument's Value accessor.
	return arg.Value()
}

View File

@@ -13,6 +13,7 @@ import (
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/libs/pubsub/query/syntax"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types"
@@ -185,10 +186,7 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul
filteredHashes := make(map[string][]byte)
// get a list of conditions (like "tx.height > 5")
conditions, err := q.Conditions()
if err != nil {
return nil, fmt.Errorf("error during parsing conditions from query: %w", err)
}
conditions := q.Syntax()
// if there is a hash condition, return the result immediately
hash, ok, err := lookForHash(conditions)
@@ -275,10 +273,10 @@ RESULTS_LOOP:
return results, nil
}
func lookForHash(conditions []query.Condition) (hash []byte, ok bool, err error) {
func lookForHash(conditions []syntax.Condition) (hash []byte, ok bool, err error) {
for _, c := range conditions {
if c.CompositeKey == types.TxHashKey {
decoded, err := hex.DecodeString(c.Operand.(string))
if c.Tag == types.TxHashKey {
decoded, err := hex.DecodeString(c.Arg.Value())
return decoded, true, err
}
}
@@ -286,10 +284,10 @@ func lookForHash(conditions []query.Condition) (hash []byte, ok bool, err error)
}
// lookForHeight returns a height if there is an "height=X" condition.
func lookForHeight(conditions []query.Condition) (height int64) {
func lookForHeight(conditions []syntax.Condition) (height int64) {
for _, c := range conditions {
if c.CompositeKey == types.TxHeightKey && c.Op == query.OpEqual {
return c.Operand.(int64)
if c.Tag == types.TxHeightKey && c.Op == syntax.TEq {
return int64(c.Arg.Number())
}
}
return 0
@@ -302,7 +300,7 @@ func lookForHeight(conditions []query.Condition) (height int64) {
// NOTE: filteredHashes may be empty if no previous condition has matched.
func (txi *TxIndex) match(
ctx context.Context,
c query.Condition,
c syntax.Condition,
startKeyBz []byte,
filteredHashes map[string][]byte,
firstRun bool,
@@ -315,8 +313,8 @@ func (txi *TxIndex) match(
tmpHashes := make(map[string][]byte)
switch c.Op {
case query.OpEqual:
switch {
case c.Op == syntax.TEq:
it, err := dbm.IteratePrefix(txi.store, startKeyBz)
if err != nil {
panic(err)
@@ -338,10 +336,10 @@ func (txi *TxIndex) match(
panic(err)
}
case query.OpExists:
case c.Op == syntax.TExists:
// XXX: can't use startKeyBz here because c.Operand is nil
// (e.g. "account.owner/<nil>/" won't match w/ a single row)
it, err := dbm.IteratePrefix(txi.store, startKey(c.CompositeKey))
it, err := dbm.IteratePrefix(txi.store, startKey(c.Tag))
if err != nil {
panic(err)
}
@@ -362,11 +360,11 @@ func (txi *TxIndex) match(
panic(err)
}
case query.OpContains:
case c.Op == syntax.TContains:
// XXX: startKey does not apply here.
// For example, if startKey = "account.owner/an/" and search query = "account.owner CONTAINS an"
// we can't iterate with prefix "account.owner/an/" because we might miss keys like "account.owner/Ulan/"
it, err := dbm.IteratePrefix(txi.store, startKey(c.CompositeKey))
it, err := dbm.IteratePrefix(txi.store, startKey(c.Tag))
if err != nil {
panic(err)
}
@@ -377,8 +375,7 @@ func (txi *TxIndex) match(
if !isTagKey(it.Key()) {
continue
}
if strings.Contains(extractValueFromKey(it.Key()), c.Operand.(string)) {
if strings.Contains(extractValueFromKey(it.Key()), c.Arg.Value()) {
tmpHashes[string(it.Value())] = it.Value()
}
@@ -557,11 +554,11 @@ func keyForHeight(result *abci.TxResult) []byte {
))
}
func startKeyForCondition(c query.Condition, height int64) []byte {
func startKeyForCondition(c syntax.Condition, height int64) []byte {
if height > 0 {
return startKey(c.CompositeKey, c.Operand, height)
return startKey(c.Tag, c.Arg.Value(), height)
}
return startKey(c.CompositeKey, c.Operand)
return startKey(c.Tag, c.Arg.Value())
}
func startKey(fields ...interface{}) []byte {

View File

@@ -60,7 +60,7 @@ func BenchmarkTxSearch(b *testing.B) {
}
}
txQuery := query.MustParse("transfer.address = 'address_43' AND transfer.amount = 50")
txQuery := query.MustCompile(`transfer.address = 'address_43' AND transfer.amount = 50`)
b.ResetTimer()

View File

@@ -126,7 +126,7 @@ func TestTxSearch(t *testing.T) {
for _, tc := range testCases {
tc := tc
t.Run(tc.q, func(t *testing.T) {
results, err := indexer.Search(ctx, query.MustParse(tc.q))
results, err := indexer.Search(ctx, query.MustCompile(tc.q))
assert.NoError(t, err)
assert.Len(t, results, tc.resultsLength)
@@ -152,7 +152,7 @@ func TestTxSearchWithCancelation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
results, err := indexer.Search(ctx, query.MustParse("account.number = 1"))
results, err := indexer.Search(ctx, query.MustCompile(`account.number = 1`))
assert.NoError(t, err)
assert.Empty(t, results)
}
@@ -225,7 +225,7 @@ func TestTxSearchDeprecatedIndexing(t *testing.T) {
for _, tc := range testCases {
tc := tc
t.Run(tc.q, func(t *testing.T) {
results, err := indexer.Search(ctx, query.MustParse(tc.q))
results, err := indexer.Search(ctx, query.MustCompile(tc.q))
require.NoError(t, err)
for _, txr := range results {
for _, tr := range tc.results {
@@ -249,7 +249,7 @@ func TestTxSearchOneTxWithMultipleSameTagsButDifferentValues(t *testing.T) {
ctx := context.Background()
results, err := indexer.Search(ctx, query.MustParse("account.number >= 1"))
results, err := indexer.Search(ctx, query.MustCompile(`account.number >= 1`))
assert.NoError(t, err)
assert.Len(t, results, 1)
@@ -306,7 +306,7 @@ func TestTxSearchMultipleTxs(t *testing.T) {
ctx := context.Background()
results, err := indexer.Search(ctx, query.MustParse("account.number >= 1"))
results, err := indexer.Search(ctx, query.MustCompile(`account.number >= 1`))
assert.NoError(t, err)
require.Len(t, results, 3)