state/kvindexer: associate event attributes with events (#9759)
* Updated event sequencing and added query keyword
* code cosmetics
* Documentation update
* Added per event indexing and querying to txindexer
* rpc test fix
* Added support for older versions where event sequencing is not supported
* Added support for old versions to tx indexer
* Added RPC match flag, fixed bugs in tx indexer, added tests
* Removed reference to match.events from the docs
* Openapi update
* Added height deduplication

Co-authored-by: Thane Thomson <connect@thanethomson.com>
Co-authored-by: Anca Zamfir <zamfiranca@gmail.com>
Co-authored-by: Sergio Mena <sergio@informal.systems>
Co-authored-by: Romain Ruetschi <romain.ruetschi@gmail.com>
Co-authored-by: Thane Thomson <connect@thanethomson.com>
@@ -39,9 +39,11 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
### FEATURES

- [abci] \#9301 New ABCI methods `PrepareProposal` and `ProcessProposal` which give the app control over the transactions proposed and allow for verification of proposed blocks.
- [rpc] \#9759 Added `match_events` query parameter to indicate to Tendermint that the query should match event attributes within events, not only within a height. (@jmalicevic)

### IMPROVEMENTS

- [state/kvindexer] \#9759 Added `match.events` keyword to support condition evaluation based on the event the attributes belong to. (@jmalicevic)
- [crypto] \#9250 Update to use btcec v2 and the latest btcutil. (@wcsiu)
- [proto] \#9356 Migrate from `gogo/protobuf` to `cosmos/gogoproto`. (@julienrbrt)
- [rpc] \#9276 Added `header` and `header_by_hash` queries to the RPC client. (@samricotta)

@@ -69,6 +69,9 @@ type Application struct {
|
||||
state State
|
||||
RetainBlocks int64 // blocks to retain after commit (via ResponseCommit.RetainHeight)
|
||||
txToRemove map[string]struct{}
|
||||
// If true, the app will generate block events in BeginBlock. Used to test the event indexer
|
||||
// Should be false by default to avoid generating too much data.
|
||||
genBlockEvents bool
|
||||
}
|
||||
|
||||
func NewApplication() *Application {
|
||||
@@ -76,6 +79,10 @@ func NewApplication() *Application {
|
||||
return &Application{state: state}
|
||||
}
|
||||
|
||||
func (app *Application) SetGenBlockEvents() {
|
||||
app.genBlockEvents = true
|
||||
}
|
||||
|
||||
func (app *Application) Info(req types.RequestInfo) (resInfo types.ResponseInfo) {
|
||||
return types.ResponseInfo{
|
||||
Data: fmt.Sprintf("{\"size\":%v}", app.state.Size),
|
||||
@@ -116,6 +123,15 @@ func (app *Application) DeliverTx(req types.RequestDeliverTx) types.ResponseDeli
|
||||
{Key: "noindex_key", Value: "index is working", Index: false},
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: "app",
|
||||
Attributes: []types.EventAttribute{
|
||||
{Key: "creator", Value: "Cosmoshi", Index: true},
|
||||
{Key: "key", Value: value, Index: true},
|
||||
{Key: "index_key", Value: "index is working", Index: true},
|
||||
{Key: "noindex_key", Value: "index is working", Index: false},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
return types.ResponseDeliverTx{Code: code.CodeTypeOK, Events: events}
|
||||
@@ -189,7 +205,70 @@ func (app *Application) Query(reqQuery types.RequestQuery) (resQuery types.Respo
|
||||
|
||||
func (app *Application) BeginBlock(req types.RequestBeginBlock) types.ResponseBeginBlock {
|
||||
app.txToRemove = map[string]struct{}{}
|
||||
return types.ResponseBeginBlock{}
|
||||
response := types.ResponseBeginBlock{}
|
||||
|
||||
if !app.genBlockEvents {
|
||||
return response
|
||||
}
|
||||
|
||||
if app.state.Height%2 == 0 {
|
||||
response = types.ResponseBeginBlock{
|
||||
Events: []types.Event{
|
||||
{
|
||||
Type: "begin_event",
|
||||
Attributes: []types.EventAttribute{
|
||||
{
|
||||
Key: "foo",
|
||||
Value: "100",
|
||||
Index: true,
|
||||
},
|
||||
{
|
||||
Key: "bar",
|
||||
Value: "200",
|
||||
Index: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: "begin_event",
|
||||
Attributes: []types.EventAttribute{
|
||||
{
|
||||
Key: "foo",
|
||||
Value: "200",
|
||||
Index: true,
|
||||
},
|
||||
{
|
||||
Key: "bar",
|
||||
Value: "300",
|
||||
Index: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
} else {
|
||||
response = types.ResponseBeginBlock{
|
||||
Events: []types.Event{
|
||||
{
|
||||
Type: "begin_event",
|
||||
Attributes: []types.EventAttribute{
|
||||
{
|
||||
Key: "foo",
|
||||
Value: "400",
|
||||
Index: true,
|
||||
},
|
||||
{
|
||||
Key: "bar",
|
||||
Value: "300",
|
||||
Index: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
return response
|
||||
}
|
||||
|
||||
func (app *Application) ProcessProposal(
|
||||
@@ -200,4 +279,5 @@ func (app *Application) ProcessProposal(
|
||||
}
|
||||
}
|
||||
return types.ResponseProcessProposal{Status: types.ResponseProcessProposal_ACCEPT}
|
||||
|
||||
}
|
||||
|
||||
@@ -54,6 +54,9 @@ func NewPersistentKVStoreApplication(dbDir string) *PersistentKVStoreApplication
|
||||
}
|
||||
}
|
||||
|
||||
func (app *PersistentKVStoreApplication) SetGenBlockEvents() {
|
||||
app.app.genBlockEvents = true
|
||||
}
|
||||
func (app *PersistentKVStoreApplication) SetLogger(l log.Logger) {
|
||||
app.logger = l
|
||||
}
|
||||
|
||||
@@ -34,6 +34,9 @@ would be equal to the composite key of `jack.account.number`.
By default, Tendermint will index all transactions by their respective hashes
and height, and blocks by their height.

Tendermint allows different events within the same height to have
equal attributes.

## Configuration

Operators can configure indexing via the `[tx_index]` section. The `indexer`
@@ -67,6 +70,56 @@ for block and transaction events directly against Tendermint's RPC. However, the
query syntax is limited and so this indexer type might be deprecated or removed
entirely in the future.

**Implementation and data layout**

The kv indexer stores each attribute of an event individually, by creating a composite key
made up of the *event type*, *attribute key*, *attribute value*, *height* and *event sequence*.

For example, the following two events:

```
Type: "transfer",
Attributes: []abci.EventAttribute{
	{Key: []byte("sender"), Value: []byte("Bob"), Index: true},
	{Key: []byte("recipient"), Value: []byte("Alice"), Index: true},
	{Key: []byte("balance"), Value: []byte("100"), Index: true},
	{Key: []byte("note"), Value: []byte("nothing"), Index: true},
},

```

```
Type: "transfer",
Attributes: []abci.EventAttribute{
	{Key: []byte("sender"), Value: []byte("Tom"), Index: true},
	{Key: []byte("recipient"), Value: []byte("Alice"), Index: true},
	{Key: []byte("balance"), Value: []byte("200"), Index: true},
	{Key: []byte("note"), Value: []byte("nothing"), Index: true},
},
```

will be represented as follows in the store:

```
Key                                 value
transferSenderBobEndBlock11          1
transferRecipientAliceEndBlock11     1
transferBalance100EndBlock11         1
transferNoteNothingEndBlock11        1
---- event2 ------
transferSenderTomEndBlock12          1
transferRecipientAliceEndBlock12     1
transferBalance200EndBlock12         1
transferNoteNothingEndBlock12        1

```
The key is thus formed of the event type, the attribute key and value, the event the attribute belongs to (`EndBlock` or `BeginBlock`),
the height and the event number. The event number is a local variable kept by the indexer and incremented when a new event is processed.

It is an `int64` variable and has no other semantics besides being used to associate attributes belonging to the same event within a height.
This variable is not atomically incremented, as event indexing is deterministic. **Should this ever change**, the event id generation
will be broken.
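
For reference, the following is a simplified sketch of how such a composite key can be built with the `orderedcode` library, mirroring the `eventKey` helper changed elsewhere in this diff; the helper name, the `"end_block"` label and the example values are illustrative:

```go
package main

import (
	"fmt"

	"github.com/google/orderedcode"
)

// buildEventKey sketches the composite key for one indexed attribute:
// composite key ("transfer.sender"), attribute value, height, the event the
// attribute belongs to, and the per-height event sequence number.
func buildEventKey(compositeKey, value string, height int64, typ string, eventSeq int64) ([]byte, error) {
	return orderedcode.Append(nil, compositeKey, value, height, typ, eventSeq)
}

func main() {
	// "Bob sends to Alice" from the first example event above,
	// indexed as part of EndBlock at height 1, event sequence 1.
	key, err := buildEventKey("transfer.sender", "Bob", 1, "end_block", 1)
	if err != nil {
		panic(err)
	}
	// The value stored under this key is the height.
	fmt.Printf("%x -> 1\n", key)
}
```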

#### PostgreSQL

The `psql` indexer type allows an operator to enable block and transaction event

@@ -145,6 +198,9 @@ You can query for a paginated set of transactions by their events by calling the
```bash
curl "localhost:26657/tx_search?query=\"message.sender='cosmos1...'\"&prove=true"
```

If the conditions are related to transaction events and the user wants to make sure the
conditions are true within the same event, the `match.events` keyword should be used,
as described [below](#querying-block-events).
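
The same effect can be had from a Go client by prefixing the query with the reserved `match.events` keyword, which is what the RPC layer does when `match_events=true` is passed. A minimal sketch, assuming a local node on the default RPC port and an illustrative query:

```go
package main

import (
	"context"
	"fmt"

	rpchttp "github.com/tendermint/tendermint/rpc/client/http"
)

func main() {
	// Assumes a node exposing RPC on the default local port.
	c, err := rpchttp.New("tcp://localhost:26657", "/websocket")
	if err != nil {
		panic(err)
	}

	// Prepending "match.events = 1" asks the indexer to match all conditions
	// within the same event, mirroring tx_search?...&match_events=true.
	query := "match.events = 1 AND account.number = 1 AND account.owner = 'Ana'"
	page, perPage := 1, 30
	res, err := c.TxSearch(context.Background(), query, false, &page, &perPage, "asc")
	if err != nil {
		panic(err)
	}
	fmt.Printf("found %d transactions\n", res.TotalCount)
}
```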

Check out [API docs](https://docs.tendermint.com/v0.37/rpc/#/Info/tx_search)
for more information on query syntax and other options.
@@ -168,7 +224,7 @@ a query to `/subscribe` RPC endpoint.
Check out [API docs](https://docs.tendermint.com/v0.37/rpc/#subscribe) for more information
on query syntax and other options.

## Querying Blocks Events
## Querying Block Events

You can query for a paginated set of blocks by their events by calling the
`/block_search` RPC endpoint:
@@ -177,5 +233,30 @@ You can query for a paginated set of blocks by their events by calling the
curl "localhost:26657/block_search?query=\"block.height > 10 AND val_set.num_changed > 0\""
```

## `match_events` keyword

A query returns the height number(s) (or transaction hashes, when querying transactions) of blocks that contain events whose attributes match the query conditions.
However, there are two ways to query the indexers. To demonstrate the two modes, we reuse the two events
in which Bob and Tom send money to Alice, and query the block indexer. We issue the following query:

```bash
curl "localhost:26657/block_search?query=\"sender=Bob AND balance = 200\""
```

The result will return height 1 even though the attributes matching the conditions in the query
occurred in different events.

If we wish to retrieve only heights where the attributes occurred within the same event,
the query syntax is as follows:

```bash
curl "localhost:26657/block_search?query=\"sender=Bob AND balance = 200\"&match_events=true"
```

Currently, the default behaviour is as if `match_events` were set to `false`.
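
Under the hood, the `match_events` flag is not interpreted by the indexer itself: the RPC handler simply prepends the reserved `match.events = 1` condition to the query, as done by the `BlockSearchMatchEvents` handler added in this change. A minimal sketch of that translation (the function name here is illustrative):

```go
package main

import "fmt"

// blockSearchQuery sketches how the RPC layer maps the match_events flag onto
// the reserved match.events keyword before running a normal block search.
// The indexer only honours the keyword when it is the very first condition.
func blockSearchQuery(query string, matchEvents bool) string {
	if matchEvents {
		return "match.events = 1 AND " + query
	}
	// Default behaviour: attributes may match across different events
	// within the same height.
	return query
}

func main() {
	fmt.Println(blockSearchQuery("sender=Bob AND balance = 200", true))
	// Output: match.events = 1 AND sender=Bob AND balance = 200
}
```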

Check out [API docs](https://docs.tendermint.com/v0.37/rpc/#/Info/block_search)
for more information on query syntax and other options.

**Backwards compatibility**

Up until Tendermint 0.34.25, the event sequence was not stored in the kvstore, and the `match_events` keyword in the RPC query is ignored by older versions. Thus, in a network running mixed Tendermint versions, nodes running older versions will still return blocks (or transactions) whose attributes match within different events at the same height.

@@ -31,8 +31,8 @@ func RPCRoutes(c *lrpc.Client) map[string]*rpcserver.RPCFunc {
|
||||
"block_results": rpcserver.NewRPCFunc(makeBlockResultsFunc(c), "height"),
|
||||
"commit": rpcserver.NewRPCFunc(makeCommitFunc(c), "height"),
|
||||
"tx": rpcserver.NewRPCFunc(makeTxFunc(c), "hash,prove"),
|
||||
"tx_search": rpcserver.NewRPCFunc(makeTxSearchFunc(c), "query,prove,page,per_page,order_by"),
|
||||
"block_search": rpcserver.NewRPCFunc(makeBlockSearchFunc(c), "query,page,per_page,order_by"),
|
||||
"tx_search": rpcserver.NewRPCFunc(makeTxSearchFuncMatchEvents(c), "query,prove,page,per_page,order_by,match_events"),
|
||||
"block_search": rpcserver.NewRPCFunc(makeBlockSearchFuncMatchEvents(c), "query,page,per_page,order_by,match_events"),
|
||||
"validators": rpcserver.NewRPCFunc(makeValidatorsFunc(c), "height,page,per_page"),
|
||||
"dump_consensus_state": rpcserver.NewRPCFunc(makeDumpConsensusStateFunc(c), ""),
|
||||
"consensus_state": rpcserver.NewRPCFunc(makeConsensusStateFunc(c), ""),
|
||||
@@ -158,42 +158,52 @@ func makeTxFunc(c *lrpc.Client) rpcTxFunc {
|
||||
}
|
||||
}
|
||||
|
||||
type rpcTxSearchFunc func(
|
||||
type rpcTxSearchFuncMatchEvents func(
|
||||
ctx *rpctypes.Context,
|
||||
query string,
|
||||
prove bool,
|
||||
page, perPage *int,
|
||||
orderBy string,
|
||||
matchEvents bool,
|
||||
) (*ctypes.ResultTxSearch, error)
|
||||
|
||||
func makeTxSearchFunc(c *lrpc.Client) rpcTxSearchFunc {
|
||||
func makeTxSearchFuncMatchEvents(c *lrpc.Client) rpcTxSearchFuncMatchEvents {
|
||||
return func(
|
||||
ctx *rpctypes.Context,
|
||||
query string,
|
||||
prove bool,
|
||||
page, perPage *int,
|
||||
orderBy string,
|
||||
matchEvents bool,
|
||||
) (*ctypes.ResultTxSearch, error) {
|
||||
if matchEvents {
|
||||
query = query + " AND match.events = 1"
|
||||
}
|
||||
return c.TxSearch(ctx.Context(), query, prove, page, perPage, orderBy)
|
||||
}
|
||||
}
|
||||
|
||||
type rpcBlockSearchFunc func(
|
||||
type rpcBlockSearchFuncMatchEvents func(
|
||||
ctx *rpctypes.Context,
|
||||
query string,
|
||||
prove bool,
|
||||
page, perPage *int,
|
||||
orderBy string,
|
||||
matchEvents bool,
|
||||
) (*ctypes.ResultBlockSearch, error)
|
||||
|
||||
func makeBlockSearchFunc(c *lrpc.Client) rpcBlockSearchFunc {
|
||||
func makeBlockSearchFuncMatchEvents(c *lrpc.Client) rpcBlockSearchFuncMatchEvents {
|
||||
return func(
|
||||
ctx *rpctypes.Context,
|
||||
query string,
|
||||
prove bool,
|
||||
page, perPage *int,
|
||||
orderBy string,
|
||||
matchEvents bool,
|
||||
) (*ctypes.ResultBlockSearch, error) {
|
||||
if matchEvents {
|
||||
query = query + " AND match.events = 1"
|
||||
}
|
||||
return c.BlockSearch(ctx.Context(), query, page, perPage, orderBy)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,6 +19,8 @@ func TestMain(m *testing.M) {
|
||||
}
|
||||
|
||||
app := kvstore.NewPersistentKVStoreApplication(dir)
|
||||
// If testing block event generation
|
||||
// app.SetGenBlockEvents() needs to be called here
|
||||
node = rpctest.StartTendermint(app)
|
||||
|
||||
code := m.Run()
|
||||
|
||||
@@ -516,6 +516,27 @@ func TestTxSearchWithTimeout(t *testing.T) {
|
||||
require.Greater(t, len(result.Txs), 0, "expected a lot of transactions")
|
||||
}
|
||||
|
||||
// This test does nothing if we do not call app.SetGenBlockEvents() within main_test.go
|
||||
// It will nevertheless pass as there are no events being generated.
|
||||
func TestBlockSearch(t *testing.T) {
|
||||
c := getHTTPClient()
|
||||
|
||||
// first we broadcast a few txs
|
||||
for i := 0; i < 10; i++ {
|
||||
_, _, tx := MakeTxKV()
|
||||
|
||||
_, err := c.BroadcastTxCommit(context.Background(), tx)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
require.NoError(t, client.WaitForHeight(c, 5, nil))
|
||||
// This cannot test match_events as it calls the client BlockSearch function directly
|
||||
// It is the RPC request handler that processes the match_event
|
||||
result, err := c.BlockSearch(context.Background(), "begin_event.foo = 100 AND begin_event.bar = 300", nil, nil, "asc")
|
||||
require.NoError(t, err)
|
||||
blockCount := len(result.Blocks)
|
||||
require.Equal(t, blockCount, 0)
|
||||
|
||||
}
|
||||
func TestTxSearch(t *testing.T) {
|
||||
c := getHTTPClient()
|
||||
|
||||
@@ -536,8 +557,7 @@ func TestTxSearch(t *testing.T) {
|
||||
find := result.Txs[len(result.Txs)-1]
|
||||
anotherTxHash := types.Tx("a different tx").Hash()
|
||||
|
||||
for i, c := range GetClients() {
|
||||
t.Logf("client %d", i)
|
||||
for _, c := range GetClients() {
|
||||
|
||||
// now we query for the tx.
|
||||
result, err := c.TxSearch(context.Background(), fmt.Sprintf("tx.hash='%v'", find.Hash), true, nil, nil, "asc")
|
||||
@@ -616,16 +636,17 @@ func TestTxSearch(t *testing.T) {
|
||||
pages = int(math.Ceil(float64(txCount) / float64(perPage)))
|
||||
)
|
||||
|
||||
totalTx := 0
|
||||
for page := 1; page <= pages; page++ {
|
||||
page := page
|
||||
result, err := c.TxSearch(context.Background(), "tx.height >= 1", false, &page, &perPage, "asc")
|
||||
result, err := c.TxSearch(context.Background(), "tx.height >= 1", true, &page, &perPage, "asc")
|
||||
require.NoError(t, err)
|
||||
if page < pages {
|
||||
require.Len(t, result.Txs, perPage)
|
||||
} else {
|
||||
require.LessOrEqual(t, len(result.Txs), perPage)
|
||||
}
|
||||
require.Equal(t, txCount, result.TotalCount)
|
||||
totalTx = totalTx + len(result.Txs)
|
||||
for _, tx := range result.Txs {
|
||||
require.False(t, seen[tx.Height],
|
||||
"Found duplicate height %v in page %v", tx.Height, page)
|
||||
@@ -635,6 +656,7 @@ func TestTxSearch(t *testing.T) {
|
||||
maxHeight = tx.Height
|
||||
}
|
||||
}
|
||||
require.Equal(t, txCount, totalTx)
|
||||
require.Len(t, seen, txCount)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -193,6 +193,19 @@ func BlockResults(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlockR
|
||||
}, nil
|
||||
}
|
||||
|
||||
func BlockSearchMatchEvents(
|
||||
ctx *rpctypes.Context,
|
||||
query string,
|
||||
pagePtr, perPagePtr *int,
|
||||
orderBy string,
|
||||
matchEvents bool,
|
||||
) (*ctypes.ResultBlockSearch, error) {
|
||||
if matchEvents {
|
||||
query = "match.events = 1 AND " + query
|
||||
}
|
||||
return BlockSearch(ctx, query, pagePtr, perPagePtr, orderBy)
|
||||
}
|
||||
|
||||
// BlockSearch searches for a paginated set of blocks matching BeginBlock and
|
||||
// EndBlock event search criteria.
|
||||
func BlockSearch(
|
||||
@@ -205,7 +218,6 @@ func BlockSearch(
|
||||
if _, ok := env.BlockIndexer.(*blockidxnull.BlockerIndexer); ok {
|
||||
return nil, errors.New("block indexing is disabled")
|
||||
}
|
||||
|
||||
q, err := tmquery.New(query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -28,8 +28,8 @@ var Routes = map[string]*rpc.RPCFunc{
|
||||
"header_by_hash": rpc.NewRPCFunc(HeaderByHash, "hash"),
|
||||
"check_tx": rpc.NewRPCFunc(CheckTx, "tx"),
|
||||
"tx": rpc.NewRPCFunc(Tx, "hash,prove"),
|
||||
"tx_search": rpc.NewRPCFunc(TxSearch, "query,prove,page,per_page,order_by"),
|
||||
"block_search": rpc.NewRPCFunc(BlockSearch, "query,page,per_page,order_by"),
|
||||
"tx_search": rpc.NewRPCFunc(TxSearchMatchEvents, "query,prove,page,per_page,order_by,match_events"),
|
||||
"block_search": rpc.NewRPCFunc(BlockSearchMatchEvents, "query,page,per_page,order_by,match_events"),
|
||||
"validators": rpc.NewRPCFunc(Validators, "height,page,per_page"),
|
||||
"dump_consensus_state": rpc.NewRPCFunc(DumpConsensusState, ""),
|
||||
"consensus_state": rpc.NewRPCFunc(ConsensusState, ""),
|
||||
|
||||
@@ -129,3 +129,23 @@ func TxSearch(
|
||||
|
||||
return &ctypes.ResultTxSearch{Txs: apiResults, TotalCount: totalCount}, nil
|
||||
}
|
||||
|
||||
// TxSearchMatchEvents allows you to query for multiple transactions results and match the
|
||||
// query attributes to a common event. It returns a
|
||||
// list of transactions (maximum ?per_page entries) and the total count.
|
||||
// More: https://docs.tendermint.com/v0.34/rpc/#/Info/tx_search
|
||||
func TxSearchMatchEvents(
|
||||
ctx *rpctypes.Context,
|
||||
query string,
|
||||
prove bool,
|
||||
pagePtr, perPagePtr *int,
|
||||
orderBy string,
|
||||
matchEvents bool,
|
||||
) (*ctypes.ResultTxSearch, error) {
|
||||
|
||||
if matchEvents {
|
||||
query = "match.events = 1 AND " + query
|
||||
}
|
||||
return TxSearch(ctx, query, prove, pagePtr, perPagePtr, orderBy)
|
||||
|
||||
}
|
||||
|
||||
@@ -1092,6 +1092,14 @@ paths:
|
||||
type: string
|
||||
default: "asc"
|
||||
example: "asc"
|
||||
- in: query
|
||||
name: match_events
|
||||
description: Match attributes in query within events, in addition to the height & txhash
|
||||
required: false
|
||||
schema:
|
||||
type: boolean
|
||||
default: false
|
||||
example: true
|
||||
tags:
|
||||
- Info
|
||||
responses:
|
||||
@@ -1147,6 +1155,14 @@ paths:
|
||||
type: string
|
||||
default: "desc"
|
||||
example: "asc"
|
||||
- in: query
|
||||
name: match_events
|
||||
description: Match attributes in query within events, in addition to the height
|
||||
required: false
|
||||
schema:
|
||||
type: boolean
|
||||
default: false
|
||||
example: true
|
||||
tags:
|
||||
- Info
|
||||
responses:
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package kv
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -24,6 +25,10 @@ var _ indexer.BlockIndexer = (*BlockerIndexer)(nil)
|
||||
// such that matching search criteria returns the respective block height(s).
|
||||
type BlockerIndexer struct {
|
||||
store dbm.DB
|
||||
|
||||
// Add unique event identifier to use when querying
|
||||
// Matching will be done both on height AND eventSeq
|
||||
eventSeq int64
|
||||
}
|
||||
|
||||
func New(store dbm.DB) *BlockerIndexer {
|
||||
@@ -95,11 +100,46 @@ func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64,
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse query conditions: %w", err)
|
||||
}
|
||||
// conditions to skip because they're handled before "everything else"
|
||||
skipIndexes := make([]int, 0)
|
||||
|
||||
var matchEvents bool
|
||||
var matchEventIdx int
|
||||
|
||||
// If the match.events keyword is at the beginning of the query, we will only
|
||||
// return heights where the conditions are true within the same event
|
||||
// and set the matchEvents to true
|
||||
conditions, matchEvents = dedupMatchEvents(conditions)
|
||||
|
||||
if matchEvents {
|
||||
matchEventIdx = 0
|
||||
} else {
|
||||
matchEventIdx = -1
|
||||
}
|
||||
|
||||
if matchEventIdx != -1 {
|
||||
skipIndexes = append(skipIndexes, matchEventIdx)
|
||||
}
|
||||
// If there is an exact height query, return the result immediately
|
||||
// (if it exists).
|
||||
height, ok := lookForHeight(conditions)
|
||||
if ok {
|
||||
var height int64
|
||||
var ok bool
|
||||
var heightIdx int
|
||||
if matchEvents {
|
||||
// If we are not matching events and block.height = 3 occurs more than once, the later value will
|
||||
// overwrite the first one. For match.events it will create problems.
|
||||
conditions, height, ok, heightIdx = dedupHeight(conditions)
|
||||
} else {
|
||||
height, ok, heightIdx = lookForHeight(conditions)
|
||||
}
|
||||
|
||||
// If we have additional constraints and want to query per event
|
||||
// attributes, we cannot simply return all blocks for a height.
|
||||
// But we remember the height we want to find and forward it to
|
||||
// match(). If we only have the height constraint and match.events keyword
|
||||
// in the query (the second part of the ||), we don't need to query
|
||||
// per event conditions and return all events within the height range.
|
||||
if ok && (!matchEvents || (matchEvents && len(conditions) == 2)) {
|
||||
ok, err := idx.Has(height)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -114,24 +154,39 @@ func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64,
|
||||
|
||||
var heightsInitialized bool
|
||||
filteredHeights := make(map[string][]byte)
|
||||
|
||||
// conditions to skip because they're handled before "everything else"
|
||||
skipIndexes := make([]int, 0)
|
||||
if matchEvents && heightIdx != -1 {
|
||||
skipIndexes = append(skipIndexes, heightIdx)
|
||||
}
|
||||
|
||||
// Extract ranges. If both upper and lower bounds exist, it's better to get
|
||||
// them in order as to not iterate over kvs that are not within range.
|
||||
ranges, rangeIndexes := indexer.LookForRanges(conditions)
|
||||
var heightRanges indexer.QueryRange
|
||||
if len(ranges) > 0 {
|
||||
skipIndexes = append(skipIndexes, rangeIndexes...)
|
||||
|
||||
for _, qr := range ranges {
|
||||
// If we have a query range over height and want to still look for
|
||||
// specific event values we do not want to simply return all
|
||||
// blocks in this height range. We remember the height range info
|
||||
// and pass it on to match() to take into account when processing events.
|
||||
if qr.Key == types.BlockHeightKey && matchEvents {
|
||||
heightRanges = qr
|
||||
// If the query contains ranges other than the height then we need to treat the height
|
||||
// range when querying the conditions of the other range.
|
||||
// Otherwise we can just return all the blocks within the height range (as there is no
|
||||
// additional constraint on events)
|
||||
if len(ranges)+1 != 2 {
|
||||
continue
|
||||
}
|
||||
}
|
||||
prefix, err := orderedcode.Append(nil, qr.Key)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create prefix key: %w", err)
|
||||
}
|
||||
|
||||
if !heightsInitialized {
|
||||
filteredHeights, err = idx.matchRange(ctx, qr, prefix, filteredHeights, true)
|
||||
filteredHeights, err = idx.matchRange(ctx, qr, prefix, filteredHeights, true, matchEvents)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -144,7 +199,7 @@ func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64,
|
||||
break
|
||||
}
|
||||
} else {
|
||||
filteredHeights, err = idx.matchRange(ctx, qr, prefix, filteredHeights, false)
|
||||
filteredHeights, err = idx.matchRange(ctx, qr, prefix, filteredHeights, false, matchEvents)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -159,12 +214,13 @@ func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64,
|
||||
}
|
||||
|
||||
startKey, err := orderedcode.Append(nil, c.CompositeKey, fmt.Sprintf("%v", c.Operand))
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !heightsInitialized {
|
||||
filteredHeights, err = idx.match(ctx, c, startKey, filteredHeights, true)
|
||||
filteredHeights, err = idx.match(ctx, c, startKey, filteredHeights, true, matchEvents, height, heightRanges)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -177,7 +233,7 @@ func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64,
|
||||
break
|
||||
}
|
||||
} else {
|
||||
filteredHeights, err = idx.match(ctx, c, startKey, filteredHeights, false)
|
||||
filteredHeights, err = idx.match(ctx, c, startKey, filteredHeights, false, matchEvents, height, heightRanges)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -222,6 +278,7 @@ func (idx *BlockerIndexer) matchRange(
|
||||
startKey []byte,
|
||||
filteredHeights map[string][]byte,
|
||||
firstRun bool,
|
||||
matchEvents bool,
|
||||
) (map[string][]byte, error) {
|
||||
|
||||
// A previous match was attempted but resulted in no matches, so we return
|
||||
@@ -231,8 +288,6 @@ func (idx *BlockerIndexer) matchRange(
|
||||
}
|
||||
|
||||
tmpHeights := make(map[string][]byte)
|
||||
lowerBound := qr.LowerBoundValue()
|
||||
upperBound := qr.UpperBoundValue()
|
||||
|
||||
it, err := dbm.IteratePrefix(idx.store, startKey)
|
||||
if err != nil {
|
||||
@@ -262,18 +317,8 @@ LOOP:
|
||||
if err != nil {
|
||||
continue LOOP
|
||||
}
|
||||
|
||||
include := true
|
||||
if lowerBound != nil && v < lowerBound.(int64) {
|
||||
include = false
|
||||
}
|
||||
|
||||
if upperBound != nil && v > upperBound.(int64) {
|
||||
include = false
|
||||
}
|
||||
|
||||
if include {
|
||||
tmpHeights[string(it.Value())] = it.Value()
|
||||
if checkBounds(qr, v) {
|
||||
idx.setTmpHeights(tmpHeights, it, matchEvents)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -302,8 +347,12 @@ LOOP:
|
||||
|
||||
// Remove/reduce matches in filteredHashes that were not found in this
|
||||
// match (tmpHashes).
|
||||
for k := range filteredHeights {
|
||||
if tmpHeights[k] == nil {
|
||||
for k, v := range filteredHeights {
|
||||
tmpHeight := tmpHeights[k]
|
||||
|
||||
// Check whether in this iteration we have not found an overlapping height (tmpHeight == nil)
|
||||
// or whether the events in which the attributes occurred do not match (first part of the condition)
|
||||
if tmpHeight == nil || !bytes.Equal(tmpHeight, v) {
|
||||
delete(filteredHeights, k)
|
||||
|
||||
select {
|
||||
@@ -318,6 +367,33 @@ LOOP:
|
||||
return filteredHeights, nil
|
||||
}
|
||||
|
||||
func (idx *BlockerIndexer) setTmpHeights(tmpHeights map[string][]byte, it dbm.Iterator, matchEvents bool) {
|
||||
// If we return attributes that occur within the same events, then store the event sequence in the
|
||||
// result map as well
|
||||
if matchEvents {
|
||||
eventSeq, _ := parseEventSeqFromEventKey(it.Key())
|
||||
retVal := it.Value()
|
||||
tmpHeights[string(retVal)+strconv.FormatInt(eventSeq, 10)] = it.Value()
|
||||
} else {
|
||||
tmpHeights[string(it.Value())] = it.Value()
|
||||
}
|
||||
}
|
||||
|
||||
func checkBounds(ranges indexer.QueryRange, v int64) bool {
|
||||
include := true
|
||||
lowerBound := ranges.LowerBoundValue()
|
||||
upperBound := ranges.UpperBoundValue()
|
||||
if lowerBound != nil && v < lowerBound.(int64) {
|
||||
include = false
|
||||
}
|
||||
|
||||
if upperBound != nil && v > upperBound.(int64) {
|
||||
include = false
|
||||
}
|
||||
|
||||
return include
|
||||
}
|
||||
|
||||
// match returns all matching heights that meet a given query condition and start
|
||||
// key. An already filtered result (filteredHeights) is provided such that any
|
||||
// non-intersecting matches are removed.
|
||||
@@ -330,6 +406,9 @@ func (idx *BlockerIndexer) match(
|
||||
startKeyBz []byte,
|
||||
filteredHeights map[string][]byte,
|
||||
firstRun bool,
|
||||
matchEvents bool,
|
||||
height int64,
|
||||
heightRanges indexer.QueryRange,
|
||||
) (map[string][]byte, error) {
|
||||
|
||||
// A previous match was attempted but resulted in no matches, so we return
|
||||
@@ -349,7 +428,23 @@ func (idx *BlockerIndexer) match(
|
||||
defer it.Close()
|
||||
|
||||
for ; it.Valid(); it.Next() {
|
||||
tmpHeights[string(it.Value())] = it.Value()
|
||||
if matchEvents {
|
||||
|
||||
if heightRanges.Key != "" {
|
||||
eventHeight, err := parseHeightFromEventKey(it.Key())
|
||||
if err != nil || !checkBounds(heightRanges, eventHeight) {
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
if height != 0 {
|
||||
eventHeight, _ := parseHeightFromEventKey(it.Key())
|
||||
if eventHeight != height {
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
idx.setTmpHeights(tmpHeights, it, matchEvents)
|
||||
|
||||
if err := ctx.Err(); err != nil {
|
||||
break
|
||||
@@ -373,7 +468,7 @@ func (idx *BlockerIndexer) match(
|
||||
defer it.Close()
|
||||
|
||||
for ; it.Valid(); it.Next() {
|
||||
tmpHeights[string(it.Value())] = it.Value()
|
||||
idx.setTmpHeights(tmpHeights, it, matchEvents)
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -406,7 +501,7 @@ func (idx *BlockerIndexer) match(
|
||||
}
|
||||
|
||||
if strings.Contains(eventValue, c.Operand.(string)) {
|
||||
tmpHeights[string(it.Value())] = it.Value()
|
||||
idx.setTmpHeights(tmpHeights, it, matchEvents)
|
||||
}
|
||||
|
||||
select {
|
||||
@@ -437,8 +532,9 @@ func (idx *BlockerIndexer) match(
|
||||
|
||||
// Remove/reduce matches in filteredHeights that were not found in this
|
||||
// match (tmpHeights).
|
||||
for k := range filteredHeights {
|
||||
if tmpHeights[k] == nil {
|
||||
for k, v := range filteredHeights {
|
||||
tmpHeight := tmpHeights[k]
|
||||
if tmpHeight == nil || !bytes.Equal(tmpHeight, v) {
|
||||
delete(filteredHeights, k)
|
||||
|
||||
select {
|
||||
@@ -457,6 +553,7 @@ func (idx *BlockerIndexer) indexEvents(batch dbm.Batch, events []abci.Event, typ
|
||||
heightBz := int64ToBytes(height)
|
||||
|
||||
for _, event := range events {
|
||||
idx.eventSeq = idx.eventSeq + 1
|
||||
// only index events with a non-empty type
|
||||
if len(event.Type) == 0 {
|
||||
continue
|
||||
@@ -474,7 +571,7 @@ func (idx *BlockerIndexer) indexEvents(batch dbm.Batch, events []abci.Event, typ
|
||||
}
|
||||
|
||||
if attr.GetIndex() {
|
||||
key, err := eventKey(compositeKey, typ, attr.Value, height)
|
||||
key, err := eventKey(compositeKey, typ, attr.Value, height, idx.eventSeq)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create block index key: %w", err)
|
||||
}
|
||||
|
||||
@@ -140,3 +140,141 @@ func TestBlockIndexer(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestBlockIndexerMulti(t *testing.T) {
|
||||
store := db.NewPrefixDB(db.NewMemDB(), []byte("block_events"))
|
||||
indexer := blockidxkv.New(store)
|
||||
|
||||
require.NoError(t, indexer.Index(types.EventDataNewBlockHeader{
|
||||
Header: types.Header{Height: 1},
|
||||
ResultBeginBlock: abci.ResponseBeginBlock{
|
||||
Events: []abci.Event{},
|
||||
},
|
||||
ResultEndBlock: abci.ResponseEndBlock{
|
||||
Events: []abci.Event{
|
||||
{
|
||||
Type: "end_event",
|
||||
Attributes: []abci.EventAttribute{
|
||||
{
|
||||
Key: []byte("foo"),
|
||||
Value: []byte("100"),
|
||||
Index: true,
|
||||
},
|
||||
{
|
||||
Key: []byte("bar"),
|
||||
Value: []byte("200"),
|
||||
Index: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: "end_event",
|
||||
Attributes: []abci.EventAttribute{
|
||||
{
|
||||
Key: []byte("foo"),
|
||||
Value: []byte("300"),
|
||||
Index: true,
|
||||
},
|
||||
{
|
||||
Key: []byte("bar"),
|
||||
Value: []byte("400"),
|
||||
Index: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}))
|
||||
|
||||
require.NoError(t, indexer.Index(types.EventDataNewBlockHeader{
|
||||
Header: types.Header{Height: 2},
|
||||
ResultBeginBlock: abci.ResponseBeginBlock{
|
||||
Events: []abci.Event{},
|
||||
},
|
||||
ResultEndBlock: abci.ResponseEndBlock{
|
||||
Events: []abci.Event{
|
||||
{
|
||||
Type: "end_event",
|
||||
Attributes: []abci.EventAttribute{
|
||||
{
|
||||
Key: []byte("foo"),
|
||||
Value: []byte("100"),
|
||||
Index: true,
|
||||
},
|
||||
{
|
||||
Key: []byte("bar"),
|
||||
Value: []byte("200"),
|
||||
Index: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: "end_event",
|
||||
Attributes: []abci.EventAttribute{
|
||||
{
|
||||
Key: []byte("foo"),
|
||||
Value: []byte("300"),
|
||||
Index: true,
|
||||
},
|
||||
{
|
||||
Key: []byte("bar"),
|
||||
Value: []byte("400"),
|
||||
Index: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}))
|
||||
|
||||
testCases := map[string]struct {
|
||||
q *query.Query
|
||||
results []int64
|
||||
}{
|
||||
"query return all events from a height - exact": {
|
||||
q: query.MustParse("match.events = 1 AND block.height = 1"),
|
||||
results: []int64{1},
|
||||
},
|
||||
"query return all events from a height - exact (deduplicate height)": {
|
||||
q: query.MustParse("match.events = 1 AND block.height = 1 AND block.height = 2"),
|
||||
results: []int64{1},
|
||||
},
|
||||
"query return all events from a height - range": {
|
||||
q: query.MustParse("match.events = 1 AND block.height < 2 AND block.height > 0 AND block.height > 0"),
|
||||
results: []int64{1},
|
||||
},
|
||||
"query matches fields from same event": {
|
||||
q: query.MustParse("match.events = 1 AND end_event.bar < 300 AND end_event.foo = 100 AND block.height > 0 AND block.height <= 2"),
|
||||
results: []int64{1, 2},
|
||||
},
|
||||
"query matches fields from multiple events": {
|
||||
q: query.MustParse("match.events = 1 AND end_event.foo = 100 AND end_event.bar = 400 AND block.height = 2"),
|
||||
results: []int64{},
|
||||
},
|
||||
"deduplication test - match.events only at beginning": {
|
||||
q: query.MustParse("end_event.foo = 100 AND end_event.bar = 400 AND block.height = 2 AND match.events = 1"),
|
||||
results: []int64{2},
|
||||
},
|
||||
"deduplication test - match.events multiple": {
|
||||
q: query.MustParse("match.events = 1 AND end_event.foo = 100 AND end_event.bar = 400 AND block.height = 2 AND match.events = 1"),
|
||||
results: []int64{},
|
||||
},
|
||||
"query matches fields from multiple events allowed": {
|
||||
q: query.MustParse("end_event.foo = 100 AND end_event.bar = 400"),
|
||||
results: []int64{1, 2},
|
||||
},
|
||||
"query matches fields from all events whose attribute is within range": {
|
||||
q: query.MustParse("match.events = 1 AND end_event.foo < 300 AND block.height = 2"),
|
||||
results: []int64{1, 2},
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range testCases {
|
||||
tc := tc
|
||||
t.Run(name, func(t *testing.T) {
|
||||
results, err := indexer.Search(context.Background(), tc.q)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.results, results)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,13 +40,14 @@ func heightKey(height int64) ([]byte, error) {
|
||||
)
|
||||
}
|
||||
|
||||
func eventKey(compositeKey, typ, eventValue string, height int64) ([]byte, error) {
|
||||
func eventKey(compositeKey, typ, eventValue string, height int64, eventSeq int64) ([]byte, error) {
|
||||
return orderedcode.Append(
|
||||
nil,
|
||||
compositeKey,
|
||||
eventValue,
|
||||
height,
|
||||
typ,
|
||||
eventSeq,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -74,24 +75,97 @@ func parseValueFromEventKey(key []byte) (string, error) {
|
||||
height int64
|
||||
)
|
||||
|
||||
remaining, err := orderedcode.Parse(string(key), &compositeKey, &eventValue, &height, &typ)
|
||||
_, err := orderedcode.Parse(string(key), &compositeKey, &eventValue, &height, &typ)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to parse event key: %w", err)
|
||||
}
|
||||
|
||||
if len(remaining) != 0 {
|
||||
return "", fmt.Errorf("unexpected remainder in key: %s", remaining)
|
||||
}
|
||||
|
||||
return eventValue, nil
|
||||
}
|
||||
|
||||
func lookForHeight(conditions []query.Condition) (int64, bool) {
|
||||
for _, c := range conditions {
|
||||
if c.CompositeKey == types.BlockHeightKey && c.Op == query.OpEqual {
|
||||
return c.Operand.(int64), true
|
||||
func parseHeightFromEventKey(key []byte) (int64, error) {
|
||||
var (
|
||||
compositeKey, typ, eventValue string
|
||||
height int64
|
||||
)
|
||||
|
||||
_, err := orderedcode.Parse(string(key), &compositeKey, &eventValue, &height, &typ)
|
||||
if err != nil {
|
||||
return -1, fmt.Errorf("failed to parse event key: %w", err)
|
||||
}
|
||||
|
||||
return height, nil
|
||||
}
|
||||
|
||||
func parseEventSeqFromEventKey(key []byte) (int64, error) {
|
||||
var (
|
||||
compositeKey, typ, eventValue string
|
||||
height int64
|
||||
eventSeq int64
|
||||
)
|
||||
|
||||
remaining, err := orderedcode.Parse(string(key), &compositeKey, &eventValue, &height, &typ)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to parse event key: %w", err)
|
||||
}
|
||||
|
||||
// This is done to support previous versions that did not have event sequence in their key
|
||||
if len(remaining) != 0 {
|
||||
remaining, err = orderedcode.Parse(remaining, &eventSeq)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to parse event key: %w", err)
|
||||
}
|
||||
if len(remaining) != 0 {
|
||||
return 0, fmt.Errorf("unexpected remainder in key: %s", remaining)
|
||||
}
|
||||
}
|
||||
|
||||
return 0, false
|
||||
return eventSeq, nil
|
||||
}
|
||||
|
||||
func lookForHeight(conditions []query.Condition) (int64, bool, int) {
|
||||
for i, c := range conditions {
|
||||
if c.CompositeKey == types.BlockHeightKey && c.Op == query.OpEqual {
|
||||
return c.Operand.(int64), true, i
|
||||
}
|
||||
}
|
||||
|
||||
return 0, false, -1
|
||||
}
|
||||
|
||||
func dedupHeight(conditions []query.Condition) (dedupConditions []query.Condition, height int64, found bool, idx int) {
|
||||
idx = -1
|
||||
for i, c := range conditions {
|
||||
if c.CompositeKey == types.BlockHeightKey && c.Op == query.OpEqual {
|
||||
if found {
|
||||
continue
|
||||
} else {
|
||||
dedupConditions = append(dedupConditions, c)
|
||||
height = c.Operand.(int64)
|
||||
found = true
|
||||
idx = i
|
||||
}
|
||||
} else {
|
||||
dedupConditions = append(dedupConditions, c)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func dedupMatchEvents(conditions []query.Condition) ([]query.Condition, bool) {
|
||||
var dedupConditions []query.Condition
|
||||
matchEvents := false
|
||||
for i, c := range conditions {
|
||||
if c.CompositeKey == types.MatchEventKey {
|
||||
// Match events should be added only via RPC as the very first query condition
|
||||
if i == 0 {
|
||||
dedupConditions = append(dedupConditions, c)
|
||||
matchEvents = true
|
||||
}
|
||||
} else {
|
||||
dedupConditions = append(dedupConditions, c)
|
||||
}
|
||||
|
||||
}
|
||||
return dedupConditions, matchEvents
|
||||
}
|
||||
|
||||
@@ -27,6 +27,8 @@ var _ txindex.TxIndexer = (*TxIndex)(nil)
|
||||
// TxIndex is the simplest possible indexer, backed by key-value storage (levelDB).
|
||||
type TxIndex struct {
|
||||
store dbm.DB
|
||||
// Number the events in the event list
|
||||
eventSeq int64
|
||||
}
|
||||
|
||||
// NewTxIndex creates new KV indexer.
|
||||
@@ -134,6 +136,7 @@ func (txi *TxIndex) Index(result *abci.TxResult) error {
|
||||
|
||||
func (txi *TxIndex) indexEvents(result *abci.TxResult, hash []byte, store dbm.Batch) error {
|
||||
for _, event := range result.Result.Events {
|
||||
txi.eventSeq = txi.eventSeq + 1
|
||||
// only index events with a non-empty type
|
||||
if len(event.Type) == 0 {
|
||||
continue
|
||||
@@ -151,7 +154,7 @@ func (txi *TxIndex) indexEvents(result *abci.TxResult, hash []byte, store dbm.Ba
|
||||
return fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeTag)
|
||||
}
|
||||
if attr.GetIndex() {
|
||||
err := store.Set(keyForEvent(compositeTag, attr.Value, result), hash)
|
||||
err := store.Set(keyForEvent(compositeTag, attr.Value, result, txi.eventSeq), hash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -206,19 +209,46 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul
|
||||
}
|
||||
}
|
||||
|
||||
var matchEvents bool
|
||||
var matchEventIdx int
|
||||
|
||||
// If the match.events keyword is at the beginning of the query, we will only
|
||||
// return heights where the conditions are true within the same event
|
||||
// and set the matchEvents to true
|
||||
conditions, matchEvents = dedupMatchEvents(conditions)
|
||||
|
||||
if matchEvents {
|
||||
matchEventIdx = 0
|
||||
} else {
|
||||
matchEventIdx = -1
|
||||
}
|
||||
|
||||
// conditions to skip because they're handled before "everything else"
|
||||
skipIndexes := make([]int, 0)
|
||||
|
||||
if matchEventIdx != -1 {
|
||||
skipIndexes = append(skipIndexes, matchEventIdx)
|
||||
}
|
||||
// extract ranges
|
||||
// if both upper and lower bounds exist, it's better to get them in order not
|
||||
// to iterate over kvs that are not within range.
|
||||
ranges, rangeIndexes := indexer.LookForRanges(conditions)
|
||||
var heightRanges indexer.QueryRange
|
||||
if len(ranges) > 0 {
|
||||
skipIndexes = append(skipIndexes, rangeIndexes...)
|
||||
|
||||
for _, qr := range ranges {
|
||||
|
||||
// If we have a query range over height and want to still look for
|
||||
// specific event values we do not want to simply return all
|
||||
// transactions in this height range. We remember the height range info
|
||||
// and pass it on to match() to take into account when processing events.
|
||||
if qr.Key == types.TxHeightKey && matchEvents {
|
||||
heightRanges = qr
|
||||
continue
|
||||
}
|
||||
if !hashesInitialized {
|
||||
filteredHashes = txi.matchRange(ctx, qr, startKey(qr.Key), filteredHashes, true)
|
||||
filteredHashes = txi.matchRange(ctx, qr, startKey(qr.Key), filteredHashes, true, matchEvents)
|
||||
hashesInitialized = true
|
||||
|
||||
// Ignore any remaining conditions if the first condition resulted
|
||||
@@ -227,13 +257,24 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul
|
||||
break
|
||||
}
|
||||
} else {
|
||||
filteredHashes = txi.matchRange(ctx, qr, startKey(qr.Key), filteredHashes, false)
|
||||
filteredHashes = txi.matchRange(ctx, qr, startKey(qr.Key), filteredHashes, false, matchEvents)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// if there is a height condition ("tx.height=3"), extract it
|
||||
height := lookForHeight(conditions)
|
||||
var height int64
|
||||
var heightIdx int
|
||||
if matchEvents {
|
||||
// If we are not matching events and tx.height = 3 occurs more than once, the later value will
|
||||
// overwrite the first one. For match.events it will create problems.
|
||||
conditions, height, heightIdx = dedupHeight(conditions)
|
||||
} else {
|
||||
height, heightIdx = lookForHeight(conditions)
|
||||
}
|
||||
if matchEvents && (len(conditions) != 2) {
|
||||
skipIndexes = append(skipIndexes, heightIdx)
|
||||
}
|
||||
|
||||
// for all other conditions
|
||||
for i, c := range conditions {
|
||||
@@ -242,7 +283,7 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul
|
||||
}
|
||||
|
||||
if !hashesInitialized {
|
||||
filteredHashes = txi.match(ctx, c, startKeyForCondition(c, height), filteredHashes, true)
|
||||
filteredHashes = txi.match(ctx, c, startKeyForCondition(c, height), filteredHashes, true, matchEvents, height, heightRanges)
|
||||
hashesInitialized = true
|
||||
|
||||
// Ignore any remaining conditions if the first condition resulted
|
||||
@@ -251,7 +292,7 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul
|
||||
break
|
||||
}
|
||||
} else {
|
||||
filteredHashes = txi.match(ctx, c, startKeyForCondition(c, height), filteredHashes, false)
|
||||
filteredHashes = txi.match(ctx, c, startKeyForCondition(c, height), filteredHashes, false, matchEvents, height, heightRanges)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -286,13 +327,21 @@ func lookForHash(conditions []query.Condition) (hash []byte, ok bool, err error)
|
||||
}
|
||||
|
||||
// lookForHeight returns a height if there is an "height=X" condition.
|
||||
func lookForHeight(conditions []query.Condition) (height int64) {
|
||||
for _, c := range conditions {
|
||||
func lookForHeight(conditions []query.Condition) (height int64, heightIdx int) {
|
||||
for i, c := range conditions {
|
||||
if c.CompositeKey == types.TxHeightKey && c.Op == query.OpEqual {
|
||||
return c.Operand.(int64)
|
||||
return c.Operand.(int64), i
|
||||
}
|
||||
}
|
||||
return 0
|
||||
return 0, -1
|
||||
}
|
||||
func (txi *TxIndex) setTmpHashes(tmpHeights map[string][]byte, it dbm.Iterator, matchEvents bool) {
|
||||
if matchEvents {
|
||||
eventSeq := extractEventSeqFromKey(it.Key())
|
||||
tmpHeights[string(it.Value())+eventSeq] = it.Value()
|
||||
} else {
|
||||
tmpHeights[string(it.Value())] = it.Value()
|
||||
}
|
||||
}
|
||||
|
||||
// match returns all matching txs by hash that meet a given condition and start
|
||||
@@ -306,6 +355,9 @@ func (txi *TxIndex) match(
|
||||
startKeyBz []byte,
|
||||
filteredHashes map[string][]byte,
|
||||
firstRun bool,
|
||||
matchEvents bool,
|
||||
height int64,
|
||||
heightRanges indexer.QueryRange,
|
||||
) map[string][]byte {
|
||||
// A previous match was attempted but resulted in no matches, so we return
|
||||
// no matches (assuming AND operand).
|
||||
@@ -325,8 +377,24 @@ func (txi *TxIndex) match(
|
||||
|
||||
EQ_LOOP:
|
||||
for ; it.Valid(); it.Next() {
|
||||
tmpHashes[string(it.Value())] = it.Value()
|
||||
|
||||
// If we have a height range in a query, we need only transactions
|
||||
// for this height
|
||||
if heightRanges.Key != "" {
|
||||
eventHeight, err := extractHeightFromKey(it.Key())
|
||||
if err != nil || !checkBounds(heightRanges, eventHeight) {
|
||||
continue
|
||||
}
|
||||
} else if height != 0 {
|
||||
// If we have a particular height in the query, return only transactions
|
||||
// matching this height.
|
||||
eventHeight, err := extractHeightFromKey(it.Key())
|
||||
if eventHeight != height || err != nil {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
txi.setTmpHashes(tmpHashes, it, matchEvents)
|
||||
// Potentially exit early.
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -349,7 +417,7 @@ func (txi *TxIndex) match(
|
||||
|
||||
EXISTS_LOOP:
|
||||
for ; it.Valid(); it.Next() {
|
||||
tmpHashes[string(it.Value())] = it.Value()
|
||||
txi.setTmpHashes(tmpHashes, it, matchEvents)
|
||||
|
||||
// Potentially exit early.
|
||||
select {
|
||||
@@ -379,7 +447,7 @@ func (txi *TxIndex) match(
|
||||
}
|
||||
|
||||
if strings.Contains(extractValueFromKey(it.Key()), c.Operand.(string)) {
|
||||
tmpHashes[string(it.Value())] = it.Value()
|
||||
txi.setTmpHashes(tmpHashes, it, matchEvents)
|
||||
}
|
||||
|
||||
// Potentially exit early.
|
||||
@@ -410,8 +478,9 @@ func (txi *TxIndex) match(
|
||||
// Remove/reduce matches in filteredHashes that were not found in this
|
||||
// match (tmpHashes).
|
||||
REMOVE_LOOP:
|
||||
for k := range filteredHashes {
|
||||
if tmpHashes[k] == nil {
|
||||
for k, v := range filteredHashes {
|
||||
tmpHash := tmpHashes[k]
|
||||
if tmpHash == nil || !bytes.Equal(tmpHash, v) {
|
||||
delete(filteredHashes, k)
|
||||
|
||||
// Potentially exit early.
|
||||
@@ -437,6 +506,7 @@ func (txi *TxIndex) matchRange(
|
||||
startKey []byte,
|
||||
filteredHashes map[string][]byte,
|
||||
firstRun bool,
|
||||
matchEvents bool,
|
||||
) map[string][]byte {
|
||||
// A previous match was attempted but resulted in no matches, so we return
|
||||
// no matches (assuming AND operand).
|
||||
@@ -445,8 +515,6 @@ func (txi *TxIndex) matchRange(
|
||||
}
|
||||
|
||||
tmpHashes := make(map[string][]byte)
|
||||
lowerBound := qr.LowerBoundValue()
|
||||
upperBound := qr.UpperBoundValue()
|
||||
|
||||
it, err := dbm.IteratePrefix(txi.store, startKey)
|
||||
if err != nil {
|
||||
@@ -466,17 +534,8 @@ LOOP:
|
||||
continue LOOP
|
||||
}
|
||||
|
||||
include := true
|
||||
if lowerBound != nil && v < lowerBound.(int64) {
|
||||
include = false
|
||||
}
|
||||
|
||||
if upperBound != nil && v > upperBound.(int64) {
|
||||
include = false
|
||||
}
|
||||
|
||||
if include {
|
||||
tmpHashes[string(it.Value())] = it.Value()
|
||||
if checkBounds(qr, v) {
|
||||
txi.setTmpHashes(tmpHashes, it, matchEvents)
|
||||
}
|
||||
|
||||
// XXX: passing time in a ABCI Events is not yet implemented
|
||||
@@ -512,8 +571,9 @@ LOOP:
|
||||
// Remove/reduce matches in filteredHashes that were not found in this
|
||||
// match (tmpHashes).
|
||||
REMOVE_LOOP:
|
||||
for k := range filteredHashes {
|
||||
if tmpHashes[k] == nil {
|
||||
for k, v := range filteredHashes {
|
||||
tmpHash := tmpHashes[k]
|
||||
if tmpHash == nil || !bytes.Equal(tmpHashes[k], v) {
|
||||
delete(filteredHashes, k)
|
||||
|
||||
// Potentially exit early.
|
||||
@@ -531,29 +591,49 @@ REMOVE_LOOP:
|
||||
// Keys
|
||||
|
||||
func isTagKey(key []byte) bool {
|
||||
return strings.Count(string(key), tagKeySeparator) == 3
|
||||
// This should always be 4 if data is indexed together with event sequences
|
||||
// The check for 3 was added to allow data indexed before (w/o the event number)
|
||||
// to be retrieved.
|
||||
numTags := strings.Count(string(key), tagKeySeparator)
|
||||
return numTags == 4 || numTags == 3
|
||||
}
|
||||
|
||||
func extractHeightFromKey(key []byte) (int64, error) {
|
||||
parts := strings.SplitN(string(key), tagKeySeparator, -1)
|
||||
return strconv.ParseInt(parts[2], 10, 64)
|
||||
}
|
||||
func extractValueFromKey(key []byte) string {
|
||||
parts := strings.SplitN(string(key), tagKeySeparator, 3)
|
||||
parts := strings.SplitN(string(key), tagKeySeparator, -1)
|
||||
return parts[1]
|
||||
}
|
||||
|
||||
func keyForEvent(key string, value string, result *abci.TxResult) []byte {
|
||||
return []byte(fmt.Sprintf("%s/%s/%d/%d",
|
||||
func extractEventSeqFromKey(key []byte) string {
|
||||
parts := strings.SplitN(string(key), tagKeySeparator, -1)
|
||||
|
||||
if len(parts) == 5 {
|
||||
return parts[4]
|
||||
}
|
||||
return "0"
|
||||
}
|
||||
func keyForEvent(key string, value string, result *abci.TxResult, eventSeq int64) []byte {
|
||||
return []byte(fmt.Sprintf("%s/%s/%d/%d/%d",
|
||||
key,
|
||||
value,
|
||||
result.Height,
|
||||
result.Index,
|
||||
eventSeq,
|
||||
))
|
||||
}
|
||||
|
||||
func keyForHeight(result *abci.TxResult) []byte {
|
||||
return []byte(fmt.Sprintf("%s/%d/%d/%d",
|
||||
return []byte(fmt.Sprintf("%s/%d/%d/%d/%d",
|
||||
types.TxHeightKey,
|
||||
result.Height,
|
||||
result.Height,
|
||||
result.Index,
|
||||
// Added to facilitate having the eventSeq in event keys
|
||||
// Otherwise queries break expecting 5 entries
|
||||
0,
|
||||
))
|
||||
}
|
||||
|
||||
@@ -571,3 +651,18 @@ func startKey(fields ...interface{}) []byte {
|
||||
}
|
||||
return b.Bytes()
|
||||
}
|
||||
|
||||
func checkBounds(ranges indexer.QueryRange, v int64) bool {
|
||||
include := true
|
||||
lowerBound := ranges.LowerBoundValue()
|
||||
upperBound := ranges.UpperBoundValue()
|
||||
if lowerBound != nil && v < lowerBound.(int64) {
|
||||
include = false
|
||||
}
|
||||
|
||||
if upperBound != nil && v > upperBound.(int64) {
|
||||
include = false
|
||||
}
|
||||
|
||||
return include
|
||||
}
|
||||
|
||||
@@ -139,6 +139,78 @@ func TestTxSearch(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestTxSearchEventMatch(t *testing.T) {
|
||||
|
||||
indexer := NewTxIndex(db.NewMemDB())
|
||||
|
||||
txResult := txResultWithEvents([]abci.Event{
|
||||
{Type: "account", Attributes: []abci.EventAttribute{{Key: []byte("number"), Value: []byte("1"), Index: true}, {Key: []byte("owner"), Value: []byte("Ana"), Index: true}}},
|
||||
{Type: "account", Attributes: []abci.EventAttribute{{Key: []byte("number"), Value: []byte("2"), Index: true}, {Key: []byte("owner"), Value: []byte("Ivan"), Index: true}}},
|
||||
{Type: "", Attributes: []abci.EventAttribute{{Key: []byte("not_allowed"), Value: []byte("Vlad"), Index: true}}},
|
||||
})
|
||||
|
||||
err := indexer.Index(txResult)
|
||||
require.NoError(t, err)
|
||||
|
||||
testCases := map[string]struct {
|
||||
q string
|
||||
resultsLength int
|
||||
}{
|
||||
"Return all events from a height": {
|
||||
q: "match.events = 1 AND tx.height = 1",
|
||||
resultsLength: 1,
|
||||
},
|
||||
"Return all events from a height (deduplicate height)": {
|
||||
q: "match.events = 1 AND tx.height = 1 AND tx.height = 1",
|
||||
resultsLength: 1,
|
||||
},
|
||||
"Match attributes with height range and event": {
|
||||
q: "match.events = 1 AND tx.height < 2 AND tx.height > 0 AND account.number = 1 AND account.owner CONTAINS 'Ana'",
|
||||
resultsLength: 1,
|
||||
},
|
||||
"Match attributes with height range and event - no match": {
|
||||
q: "match.events = 1 AND tx.height < 2 AND tx.height > 0 AND account.number = 2 AND account.owner = 'Ana'",
|
||||
resultsLength: 0,
|
||||
},
|
||||
"Deduplucation test - match events only at the beginning": {
|
||||
q: "tx.height < 2 AND tx.height > 0 AND account.number = 2 AND account.owner = 'Ana' AND match.events = 1",
|
||||
resultsLength: 1,
|
||||
},
|
||||
"Deduplucation test - match events multiple": {
|
||||
q: "match.events = 1 AND tx.height < 2 AND tx.height > 0 AND account.number = 2 AND account.owner = 'Ana' AND match.events = 1",
|
||||
resultsLength: 0,
|
||||
},
|
||||
"Match attributes with event": {
|
||||
q: "account.number = 2 AND account.owner = 'Ana' AND tx.height = 1",
|
||||
resultsLength: 1,
|
||||
},
|
||||
"Match range w/o match events": {
|
||||
q: "account.number < 2 AND account.owner = 'Ivan'",
|
||||
resultsLength: 1,
|
||||
},
|
||||
" Match range with match events": {
|
||||
q: "match.events = 1 AND account.number < 2 AND account.owner = 'Ivan'",
|
||||
resultsLength: 0,
|
||||
},
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
for _, tc := range testCases {
|
||||
tc := tc
|
||||
t.Run(tc.q, func(t *testing.T) {
|
||||
results, err := indexer.Search(ctx, query.MustParse(tc.q))
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Len(t, results, tc.resultsLength)
|
||||
if tc.resultsLength > 0 {
|
||||
for _, txr := range results {
|
||||
assert.True(t, proto.Equal(txResult, txr))
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
func TestTxSearchWithCancelation(t *testing.T) {
|
||||
indexer := NewTxIndex(db.NewMemDB())
|
||||
|
||||
|
||||
@@ -1,5 +1,13 @@
|
||||
package kv
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/google/orderedcode"
|
||||
"github.com/tendermint/tendermint/libs/pubsub/query"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// IntInSlice returns true if a is found in the list.
|
||||
func intInSlice(a int, list []int) bool {
|
||||
for _, b := range list {
|
||||
@@ -9,3 +17,60 @@ func intInSlice(a int, list []int) bool {
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func dedupMatchEvents(conditions []query.Condition) ([]query.Condition, bool) {
|
||||
var dedupConditions []query.Condition
|
||||
matchEvents := false
|
||||
for i, c := range conditions {
|
||||
if c.CompositeKey == types.MatchEventKey {
|
||||
// Match events should be added only via RPC as the very first query condition
|
||||
if i == 0 {
|
||||
dedupConditions = append(dedupConditions, c)
|
||||
matchEvents = true
|
||||
}
|
||||
} else {
|
||||
dedupConditions = append(dedupConditions, c)
|
||||
}
|
||||
|
||||
}
|
||||
return dedupConditions, matchEvents
|
||||
}
|
||||
|
||||
func ParseEventSeqFromEventKey(key []byte) (int64, error) {
|
||||
var (
|
||||
compositeKey, typ, eventValue string
|
||||
height int64
|
||||
eventSeq int64
|
||||
)
|
||||
|
||||
remaining, err := orderedcode.Parse(string(key), &compositeKey, &eventValue, &height, &typ, &eventSeq)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to parse event key: %w", err)
|
||||
}
|
||||
|
||||
if len(remaining) != 0 {
|
||||
return 0, fmt.Errorf("unexpected remainder in key: %s", remaining)
|
||||
}
|
||||
|
||||
return eventSeq, nil
|
||||
}
|
||||
func dedupHeight(conditions []query.Condition) (dedupConditions []query.Condition, height int64, idx int) {
|
||||
found := false
|
||||
idx = -1
|
||||
height = 0
|
||||
for i, c := range conditions {
|
||||
if c.CompositeKey == types.TxHeightKey && c.Op == query.OpEqual {
|
||||
if found {
|
||||
continue
|
||||
} else {
|
||||
dedupConditions = append(dedupConditions, c)
|
||||
height = c.Operand.(int64)
|
||||
found = true
|
||||
idx = i
|
||||
}
|
||||
} else {
|
||||
dedupConditions = append(dedupConditions, c)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -140,6 +140,11 @@ const (
|
||||
// BlockHeightKey is a reserved key used for indexing BeginBlock and Endblock
|
||||
// events.
|
||||
BlockHeightKey = "block.height"
|
||||
|
||||
// MatchEventKey is a reserved key used to indicate to the indexer that the
// conditions in the query must have occurred both at the same height
// and within the same event.
|
||||
MatchEventKey = "match.events"
|
||||
)
|
||||
|
||||
var (
|
||||
|
||||