mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-10 06:57:24 +00:00
Bugfixes in range queries and result deduplication
This commit is contained in:
@@ -160,8 +160,8 @@ func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64,
|
||||
|
||||
// Extract ranges. If both upper and lower bounds exist, it's better to get
|
||||
// them in order as to not iterate over kvs that are not within range.
|
||||
ranges, rangeIndexes := indexer.LookForRanges(conditions)
|
||||
var heightRanges indexer.QueryRange
|
||||
ranges, rangeIndexes, heightRange := indexer.LookForRanges(conditions)
|
||||
|
||||
if len(ranges) > 0 {
|
||||
skipIndexes = append(skipIndexes, rangeIndexes...)
|
||||
|
||||
@@ -171,7 +171,7 @@ func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64,
|
||||
// blocks in this height range. We remember the height range info
|
||||
// and pass it on to match() to take into account when processing events.
|
||||
if qr.Key == types.BlockHeightKey && matchEvents {
|
||||
heightRanges = qr
|
||||
|
||||
// If the query contains ranges other than the height then we need to treat the height
|
||||
// range when querying the conditions of the other range.
|
||||
// Otherwise we can just return all the blocks within the height range (as there is no
|
||||
@@ -186,7 +186,7 @@ func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64,
|
||||
}
|
||||
|
||||
if !heightsInitialized {
|
||||
filteredHeights, err = idx.matchRange(ctx, qr, prefix, filteredHeights, true, matchEvents)
|
||||
filteredHeights, err = idx.matchRange(ctx, qr, prefix, filteredHeights, true, matchEvents, heightRange)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -199,7 +199,7 @@ func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64,
|
||||
break
|
||||
}
|
||||
} else {
|
||||
filteredHeights, err = idx.matchRange(ctx, qr, prefix, filteredHeights, false, matchEvents)
|
||||
filteredHeights, err = idx.matchRange(ctx, qr, prefix, filteredHeights, false, matchEvents, heightRange)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -220,7 +220,7 @@ func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64,
|
||||
}
|
||||
|
||||
if !heightsInitialized {
|
||||
filteredHeights, err = idx.match(ctx, c, startKey, filteredHeights, true, matchEvents, height, heightRanges)
|
||||
filteredHeights, err = idx.match(ctx, c, startKey, filteredHeights, true, matchEvents, height, heightRange)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -233,7 +233,7 @@ func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64,
|
||||
break
|
||||
}
|
||||
} else {
|
||||
filteredHeights, err = idx.match(ctx, c, startKey, filteredHeights, false, matchEvents, height, heightRanges)
|
||||
filteredHeights, err = idx.match(ctx, c, startKey, filteredHeights, false, matchEvents, height, heightRange)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -242,6 +242,7 @@ func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64,
|
||||
|
||||
// fetch matching heights
|
||||
results = make([]int64, 0, len(filteredHeights))
|
||||
resultMap := make(map[int64]struct{})
|
||||
for _, hBz := range filteredHeights {
|
||||
h := int64FromBytes(hBz)
|
||||
|
||||
@@ -249,7 +250,9 @@ func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64,
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if ok {
|
||||
_, okHeight := resultMap[h]
|
||||
if ok && !okHeight {
|
||||
resultMap[h] = struct{}{}
|
||||
results = append(results, h)
|
||||
}
|
||||
|
||||
@@ -279,6 +282,7 @@ func (idx *BlockerIndexer) matchRange(
|
||||
filteredHeights map[string][]byte,
|
||||
firstRun bool,
|
||||
matchEvents bool,
|
||||
heightRange indexer.QueryRange,
|
||||
) (map[string][]byte, error) {
|
||||
|
||||
// A previous match was attempted but resulted in no matches, so we return
|
||||
@@ -317,6 +321,13 @@ LOOP:
|
||||
if err != nil {
|
||||
continue LOOP
|
||||
}
|
||||
|
||||
if matchEvents && qr.Key != types.BlockHeightKey && heightRange.Key != "" {
|
||||
keyHeight, err := parseHeightFromEventKey(it.Key())
|
||||
if err != nil || !checkBounds(heightRange, keyHeight) {
|
||||
continue LOOP
|
||||
}
|
||||
}
|
||||
if checkBounds(qr, v) {
|
||||
idx.setTmpHeights(tmpHeights, it, matchEvents)
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/libs/pubsub/query"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// QueryRanges defines a mapping between a composite event key and a QueryRange.
|
||||
@@ -77,29 +78,48 @@ func (qr QueryRange) UpperBoundValue() interface{} {
|
||||
|
||||
// LookForRanges returns a mapping of QueryRanges and the matching indexes in
|
||||
// the provided query conditions.
|
||||
func LookForRanges(conditions []query.Condition) (ranges QueryRanges, indexes []int) {
|
||||
func LookForRanges(conditions []query.Condition) (ranges QueryRanges, indexes []int, heightRange QueryRange) {
|
||||
ranges = make(QueryRanges)
|
||||
for i, c := range conditions {
|
||||
heightKey := false
|
||||
if IsRangeOperation(c.Op) {
|
||||
r, ok := ranges[c.CompositeKey]
|
||||
if !ok {
|
||||
r = QueryRange{Key: c.CompositeKey}
|
||||
if c.CompositeKey == types.BlockHeightKey {
|
||||
heightRange = QueryRange{Key: c.CompositeKey}
|
||||
heightKey = true
|
||||
}
|
||||
}
|
||||
|
||||
switch c.Op {
|
||||
case query.OpGreater:
|
||||
if heightKey {
|
||||
heightRange.LowerBound = c.Operand
|
||||
}
|
||||
r.LowerBound = c.Operand
|
||||
|
||||
case query.OpGreaterEqual:
|
||||
r.IncludeLowerBound = true
|
||||
r.LowerBound = c.Operand
|
||||
if heightKey {
|
||||
heightRange.IncludeLowerBound = true
|
||||
heightRange.LowerBound = c.Operand
|
||||
}
|
||||
|
||||
case query.OpLess:
|
||||
r.UpperBound = c.Operand
|
||||
if heightKey {
|
||||
heightRange.UpperBound = c.Operand
|
||||
}
|
||||
|
||||
case query.OpLessEqual:
|
||||
r.IncludeUpperBound = true
|
||||
r.UpperBound = c.Operand
|
||||
if heightKey {
|
||||
heightRange.IncludeUpperBound = true
|
||||
heightRange.UpperBound = c.Operand
|
||||
}
|
||||
}
|
||||
|
||||
ranges[c.CompositeKey] = r
|
||||
@@ -107,7 +127,7 @@ func LookForRanges(conditions []query.Condition) (ranges QueryRanges, indexes []
|
||||
}
|
||||
}
|
||||
|
||||
return ranges, indexes
|
||||
return ranges, indexes, heightRange
|
||||
}
|
||||
|
||||
// IsRangeOperation returns a boolean signifying if a query Operator is a range
|
||||
|
||||
@@ -246,8 +246,8 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul
|
||||
// extract ranges
|
||||
// if both upper and lower bounds exist, it's better to get them in order not
|
||||
// no iterate over kvs that are not within range.
|
||||
ranges, rangeIndexes := indexer.LookForRanges(conditions)
|
||||
var heightRanges indexer.QueryRange
|
||||
ranges, rangeIndexes, heightRange := indexer.LookForRanges(conditions)
|
||||
|
||||
if len(ranges) > 0 {
|
||||
skipIndexes = append(skipIndexes, rangeIndexes...)
|
||||
|
||||
@@ -258,11 +258,11 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul
|
||||
// transactios in this height range. We remember the height range info
|
||||
// and pass it on to match() to take into account when processing events.
|
||||
if qr.Key == types.TxHeightKey && matchEvents {
|
||||
heightRanges = qr
|
||||
|
||||
continue
|
||||
}
|
||||
if !hashesInitialized {
|
||||
filteredHashes = txi.matchRange(ctx, qr, startKey(qr.Key), filteredHashes, true, matchEvents)
|
||||
filteredHashes = txi.matchRange(ctx, qr, startKey(qr.Key), filteredHashes, true, matchEvents, heightRange)
|
||||
hashesInitialized = true
|
||||
|
||||
// Ignore any remaining conditions if the first condition resulted
|
||||
@@ -271,7 +271,7 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul
|
||||
break
|
||||
}
|
||||
} else {
|
||||
filteredHashes = txi.matchRange(ctx, qr, startKey(qr.Key), filteredHashes, false, matchEvents)
|
||||
filteredHashes = txi.matchRange(ctx, qr, startKey(qr.Key), filteredHashes, false, matchEvents, heightRange)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -297,7 +297,7 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul
|
||||
}
|
||||
|
||||
if !hashesInitialized {
|
||||
filteredHashes = txi.match(ctx, c, startKeyForCondition(c, height), filteredHashes, true, matchEvents, height, heightRanges)
|
||||
filteredHashes = txi.match(ctx, c, startKeyForCondition(c, height), filteredHashes, true, matchEvents, height, heightRange)
|
||||
hashesInitialized = true
|
||||
|
||||
// Ignore any remaining conditions if the first condition resulted
|
||||
@@ -306,18 +306,22 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul
|
||||
break
|
||||
}
|
||||
} else {
|
||||
filteredHashes = txi.match(ctx, c, startKeyForCondition(c, height), filteredHashes, false, matchEvents, height, heightRanges)
|
||||
filteredHashes = txi.match(ctx, c, startKeyForCondition(c, height), filteredHashes, false, matchEvents, height, heightRange)
|
||||
}
|
||||
}
|
||||
|
||||
results := make([]*abci.TxResult, 0, len(filteredHashes))
|
||||
resultMap := make(map[string]struct{})
|
||||
for _, h := range filteredHashes {
|
||||
res, err := txi.Get(h)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get Tx{%X}: %w", h, err)
|
||||
}
|
||||
results = append(results, res)
|
||||
|
||||
hashString := string(h)
|
||||
if _, ok := resultMap[hashString]; !ok {
|
||||
resultMap[hashString] = struct{}{}
|
||||
results = append(results, res)
|
||||
}
|
||||
// Potentially exit early.
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -370,7 +374,7 @@ func (txi *TxIndex) match(
|
||||
firstRun bool,
|
||||
matchEvents bool,
|
||||
height int64,
|
||||
heightRanges indexer.QueryRange,
|
||||
heightRange indexer.QueryRange,
|
||||
) map[string][]byte {
|
||||
// A previous match was attempted but resulted in no matches, so we return
|
||||
// no matches (assuming AND operand).
|
||||
@@ -392,9 +396,9 @@ func (txi *TxIndex) match(
|
||||
|
||||
// If we have a height range in a query, we need only transactions
|
||||
// for this height
|
||||
if heightRanges.Key != "" {
|
||||
if heightRange.Key != "" {
|
||||
eventHeight, err := extractHeightFromKey(it.Key())
|
||||
if err != nil || !checkBounds(heightRanges, eventHeight) {
|
||||
if err != nil || !checkBounds(heightRange, eventHeight) {
|
||||
continue
|
||||
}
|
||||
} else if height != 0 {
|
||||
@@ -516,6 +520,7 @@ func (txi *TxIndex) matchRange(
|
||||
filteredHashes map[string][]byte,
|
||||
firstRun bool,
|
||||
matchEvents bool,
|
||||
heightRange indexer.QueryRange,
|
||||
) map[string][]byte {
|
||||
// A previous match was attempted but resulted in no matches, so we return
|
||||
// no matches (assuming AND operand).
|
||||
@@ -543,6 +548,12 @@ LOOP:
|
||||
continue LOOP
|
||||
}
|
||||
|
||||
if matchEvents && heightRange.Key != "" {
|
||||
keyHeight, err := extractHeightFromKey(it.Key())
|
||||
if err != nil || !checkBounds(heightRange, keyHeight) {
|
||||
continue LOOP
|
||||
}
|
||||
}
|
||||
if checkBounds(qr, v) {
|
||||
txi.setTmpHashes(tmpHashes, it, matchEvents)
|
||||
}
|
||||
|
||||
@@ -321,7 +321,15 @@ func TestTxSearchOneTxWithMultipleSameTagsButDifferentValues(t *testing.T) {
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
results, err := indexer.Search(ctx, query.MustParse("account.number >= 1"))
|
||||
results, err := indexer.Search(ctx, query.MustParse("match.events = 1 AND account.number >= 1"))
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Len(t, results, 1)
|
||||
for _, txr := range results {
|
||||
assert.True(t, proto.Equal(txResult, txr))
|
||||
}
|
||||
|
||||
results, err = indexer.Search(ctx, query.MustParse("match.events = 1 AND account.number > 1 AND tx.height < 2"))
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Len(t, results, 1)
|
||||
|
||||
Reference in New Issue
Block a user