From b6d99beae6fd3cde609a72e494c1f9f202bb44e8 Mon Sep 17 00:00:00 2001 From: Jasmina Malicevic Date: Wed, 14 Dec 2022 10:34:38 +0100 Subject: [PATCH] state/kvindexer: associate event attributes with events (#9759) * Updated event sequencing and added query keyword * code cosmetics * Documentation update * Added per event indexing and querying to txindexer * rpc test fix * Added support for older versions where event sequencing is not supported * Added support for old versions to tx indexer * Added RPC match flag, fixed bugs in tx indexer, added tests * Removed reference to match.events from the docs * Openapi update * Added height deduplication Co-authored-by: Thane Thomson Co-authored-by: Anca Zamfir Co-authored-by: Sergio Mena Co-authored-by: Romain Ruetschi Co-authored-by: Thane Thomson --- CHANGELOG_PENDING.md | 2 + abci/example/kvstore/kvstore.go | 82 +++++++++- abci/example/kvstore/persistent_kvstore.go | 3 + docs/app-dev/indexing-transactions.md | 83 ++++++++++- light/proxy/routes.go | 22 ++- rpc/client/main_test.go | 2 + rpc/client/rpc_test.go | 30 +++- rpc/core/blocks.go | 14 +- rpc/core/routes.go | 4 +- rpc/core/tx.go | 20 +++ rpc/openapi/openapi.yaml | 16 ++ state/indexer/block/kv/kv.go | 159 ++++++++++++++++---- state/indexer/block/kv/kv_test.go | 138 +++++++++++++++++ state/indexer/block/kv/util.go | 96 ++++++++++-- state/txindex/kv/kv.go | 165 ++++++++++++++++----- state/txindex/kv/kv_test.go | 72 +++++++++ state/txindex/kv/utils.go | 65 ++++++++ types/events.go | 5 + 18 files changed, 886 insertions(+), 92 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 5edc5dd32..79d7e3e5a 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -39,9 +39,11 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi ### FEATURES - [abci] \#9301 New ABCI methods `PrepareProposal` and `ProcessProposal` which give the app control over transactions proposed and allows for verification of proposed blocks. +- [rpc] \#9759 Added `match_event` query parameter to indicate to Tendermint that the query should match event attributes within events, not only within a height.(@jmalicevic) ### IMPROVEMENTS +- [state/kvindexer] \#9759 Added `match.event` keyword to support condition evalution based on the event attributes belong to. (@jmalicevic) - [crypto] \#9250 Update to use btcec v2 and the latest btcutil. (@wcsiu) - [proto] \#9356 Migrate from `gogo/protobuf` to `cosmos/gogoproto` (@julienrbrt) - [rpc] \#9276 Added `header` and `header_by_hash` queries to the RPC client (@samricotta) diff --git a/abci/example/kvstore/kvstore.go b/abci/example/kvstore/kvstore.go index 38aab62ca..4c7c5716f 100644 --- a/abci/example/kvstore/kvstore.go +++ b/abci/example/kvstore/kvstore.go @@ -69,6 +69,9 @@ type Application struct { state State RetainBlocks int64 // blocks to retain after commit (via ResponseCommit.RetainHeight) txToRemove map[string]struct{} + // If true, the app will generate block events in BeginBlock. Used to test the event indexer + // Should be false by default to avoid generating too much data. + genBlockEvents bool } func NewApplication() *Application { @@ -76,6 +79,10 @@ func NewApplication() *Application { return &Application{state: state} } +func (app *Application) SetGenBlockEvents() { + app.genBlockEvents = true +} + func (app *Application) Info(req types.RequestInfo) (resInfo types.ResponseInfo) { return types.ResponseInfo{ Data: fmt.Sprintf("{\"size\":%v}", app.state.Size), @@ -116,6 +123,15 @@ func (app *Application) DeliverTx(req types.RequestDeliverTx) types.ResponseDeli {Key: "noindex_key", Value: "index is working", Index: false}, }, }, + { + Type: "app", + Attributes: []types.EventAttribute{ + {Key: "creator", Value: "Cosmoshi", Index: true}, + {Key: "key", Value: value, Index: true}, + {Key: "index_key", Value: "index is working", Index: true}, + {Key: "noindex_key", Value: "index is working", Index: false}, + }, + }, } return types.ResponseDeliverTx{Code: code.CodeTypeOK, Events: events} @@ -189,7 +205,70 @@ func (app *Application) Query(reqQuery types.RequestQuery) (resQuery types.Respo func (app *Application) BeginBlock(req types.RequestBeginBlock) types.ResponseBeginBlock { app.txToRemove = map[string]struct{}{} - return types.ResponseBeginBlock{} + response := types.ResponseBeginBlock{} + + if !app.genBlockEvents { + return response + } + + if app.state.Height%2 == 0 { + response = types.ResponseBeginBlock{ + Events: []types.Event{ + { + Type: "begin_event", + Attributes: []types.EventAttribute{ + { + Key: "foo", + Value: "100", + Index: true, + }, + { + Key: "bar", + Value: "200", + Index: true, + }, + }, + }, + { + Type: "begin_event", + Attributes: []types.EventAttribute{ + { + Key: "foo", + Value: "200", + Index: true, + }, + { + Key: "bar", + Value: "300", + Index: true, + }, + }, + }, + }, + } + } else { + response = types.ResponseBeginBlock{ + Events: []types.Event{ + { + Type: "begin_event", + Attributes: []types.EventAttribute{ + { + Key: "foo", + Value: "400", + Index: true, + }, + { + Key: "bar", + Value: "300", + Index: true, + }, + }, + }, + }, + } + } + + return response } func (app *Application) ProcessProposal( @@ -200,4 +279,5 @@ func (app *Application) ProcessProposal( } } return types.ResponseProcessProposal{Status: types.ResponseProcessProposal_ACCEPT} + } diff --git a/abci/example/kvstore/persistent_kvstore.go b/abci/example/kvstore/persistent_kvstore.go index 8aa255b75..a92e71f5d 100644 --- a/abci/example/kvstore/persistent_kvstore.go +++ b/abci/example/kvstore/persistent_kvstore.go @@ -54,6 +54,9 @@ func NewPersistentKVStoreApplication(dbDir string) *PersistentKVStoreApplication } } +func (app *PersistentKVStoreApplication) SetGenBlockEvents() { + app.app.genBlockEvents = true +} func (app *PersistentKVStoreApplication) SetLogger(l log.Logger) { app.logger = l } diff --git a/docs/app-dev/indexing-transactions.md b/docs/app-dev/indexing-transactions.md index 019fe5f2d..f7e2d6911 100644 --- a/docs/app-dev/indexing-transactions.md +++ b/docs/app-dev/indexing-transactions.md @@ -34,6 +34,9 @@ would be equal to the composite key of `jack.account.number`. By default, Tendermint will index all transactions by their respective hashes and height and blocks by their height. +Tendermint allows for different events within the same height to have +equal attributes. + ## Configuration Operators can configure indexing via the `[tx_index]` section. The `indexer` @@ -67,6 +70,56 @@ for block and transaction events directly against Tendermint's RPC. However, the query syntax is limited and so this indexer type might be deprecated or removed entirely in the future. +**Implementation and data layout** + +The kv indexer stores each attribute of an event individually, by creating a composite key +of the *event type*, *attribute key*, *attribute value*, *height* and *event sequence*. + +For example the following events: + +``` +Type: "transfer", + Attributes: []abci.EventAttribute{ + {Key: []byte("sender"), Value: []byte("Bob"), Index: true}, + {Key: []byte("recipient"), Value: []byte("Alice"), Index: true}, + {Key: []byte("balance"), Value: []byte("100"), Index: true}, + {Key: []byte("note"), Value: []byte("nothing"), Index: true}, + }, + +``` + +``` +Type: "transfer", + Attributes: []abci.EventAttribute{ + {Key: []byte("sender"), Value: []byte("Tom"), Index: true}, + {Key: []byte("recipient"), Value: []byte("Alice"), Index: true}, + {Key: []byte("balance"), Value: []byte("200"), Index: true}, + {Key: []byte("note"), Value: []byte("nothing"), Index: true}, + }, +``` + +will be represented as follows in the store: + +``` +Key value +transferSenderBobEndBlock1 1 +transferRecepientAliceEndBlock11 1 +transferBalance100EndBlock11 1 +transferNodeNothingEndblock11 1 +---- event2 ------ +transferSenderTomEndBlock12 1 +transferRecepientAliceEndBlock12 1 +transferBalance200EndBlock12 1 +transferNodeNothingEndblock12 1 + +``` +The key is thus formed of the event type, the attribute key and value, the event the attribute belongs to (`EndBlock` or `BeginBlock`), +the height and the event number. The event number is a local variable kept by the indexer and incremented when a new event is processed. + +It is an `int64` variable and has no other semantics besides being used to associate attributes belonging to the same events within a height. +This variable is not atomically incremented as event indexing is deterministic. **Should this ever change**, the event id generation +will be broken. + #### PostgreSQL The `psql` indexer type allows an operator to enable block and transaction event @@ -145,6 +198,9 @@ You can query for a paginated set of transaction by their events by calling the ```bash curl "localhost:26657/tx_search?query=\"message.sender='cosmos1...'\"&prove=true" ``` +If the conditions are related to transaction events and the user wants to make sure the +conditions are true within the same events, the `match.event` keyword should be used, +as described [below](#querying_block_events) Check out [API docs](https://docs.tendermint.com/v0.37/rpc/#/Info/tx_search) for more information on query syntax and other options. @@ -168,7 +224,7 @@ a query to `/subscribe` RPC endpoint. Check out [API docs](https://docs.tendermint.com/v0.37/rpc/#subscribe) for more information on query syntax and other options. -## Querying Blocks Events +## Querying Block Events You can query for a paginated set of blocks by their events by calling the `/block_search` RPC endpoint: @@ -177,5 +233,30 @@ You can query for a paginated set of blocks by their events by calling the curl "localhost:26657/block_search?query=\"block.height > 10 AND val_set.num_changed > 0\"" ``` +## `match_events` keyword + +The query results in the height number(s) (or transaction hashes when querying transactions) which contain events whose attributes match the query conditions. +However, there are two options to query the indexers. To demonstrate the two modes, we reuse the two events +where Bob and Tom send money to Alice and query the block indexer. We issue the following query: + +```bash +curl "localhost:26657/block_search?query=\"sender=Bob AND balance = 200\"" +``` + +The result will return height 1 even though the attributes matching the conditions in the query +occurred in different events. + +If we wish to retrieve only heights where the attributes occurred within the same event, +the query syntax is as follows: + +```bash +curl "localhost:26657/block_search?query=\"sender=Bob AND balance = 200\"&match_events=true" +``` +Currently the default behaviour is if `match_events` is set to false. + Check out [API docs](https://docs.tendermint.com/v0.37/rpc/#/Info/block_search) for more information on query syntax and other options. + +**Backwards compatibility** + +Up until Tendermint 0.34.25, the event sequence was not stored in the kvstore and the `match_events` keyword in the RPC query is not ignored by older versions. Thus, in a network running mixed Tendermint versions, nodes running older versions will still return blocks (or transactions) whose attributes match within different events on the same height. \ No newline at end of file diff --git a/light/proxy/routes.go b/light/proxy/routes.go index c97a91dfd..9e0d590bd 100644 --- a/light/proxy/routes.go +++ b/light/proxy/routes.go @@ -31,8 +31,8 @@ func RPCRoutes(c *lrpc.Client) map[string]*rpcserver.RPCFunc { "block_results": rpcserver.NewRPCFunc(makeBlockResultsFunc(c), "height"), "commit": rpcserver.NewRPCFunc(makeCommitFunc(c), "height"), "tx": rpcserver.NewRPCFunc(makeTxFunc(c), "hash,prove"), - "tx_search": rpcserver.NewRPCFunc(makeTxSearchFunc(c), "query,prove,page,per_page,order_by"), - "block_search": rpcserver.NewRPCFunc(makeBlockSearchFunc(c), "query,page,per_page,order_by"), + "tx_search": rpcserver.NewRPCFunc(makeTxSearchFuncMatchEvents(c), "query,prove,page,per_page,order_by,match_events"), + "block_search": rpcserver.NewRPCFunc(makeBlockSearchFuncMatchEvents(c), "query,page,per_page,order_by,match_events"), "validators": rpcserver.NewRPCFunc(makeValidatorsFunc(c), "height,page,per_page"), "dump_consensus_state": rpcserver.NewRPCFunc(makeDumpConsensusStateFunc(c), ""), "consensus_state": rpcserver.NewRPCFunc(makeConsensusStateFunc(c), ""), @@ -158,42 +158,52 @@ func makeTxFunc(c *lrpc.Client) rpcTxFunc { } } -type rpcTxSearchFunc func( +type rpcTxSearchFuncMatchEvents func( ctx *rpctypes.Context, query string, prove bool, page, perPage *int, orderBy string, + matchEvents bool, ) (*ctypes.ResultTxSearch, error) -func makeTxSearchFunc(c *lrpc.Client) rpcTxSearchFunc { +func makeTxSearchFuncMatchEvents(c *lrpc.Client) rpcTxSearchFuncMatchEvents { return func( ctx *rpctypes.Context, query string, prove bool, page, perPage *int, orderBy string, + matchEvents bool, ) (*ctypes.ResultTxSearch, error) { + if matchEvents { + query = query + " AND match.events = 1" + } return c.TxSearch(ctx.Context(), query, prove, page, perPage, orderBy) } } -type rpcBlockSearchFunc func( +type rpcBlockSearchFuncMatchEvents func( ctx *rpctypes.Context, query string, prove bool, page, perPage *int, orderBy string, + matchEvents bool, ) (*ctypes.ResultBlockSearch, error) -func makeBlockSearchFunc(c *lrpc.Client) rpcBlockSearchFunc { +func makeBlockSearchFuncMatchEvents(c *lrpc.Client) rpcBlockSearchFuncMatchEvents { return func( ctx *rpctypes.Context, query string, prove bool, page, perPage *int, orderBy string, + matchEvents bool, ) (*ctypes.ResultBlockSearch, error) { + if matchEvents { + query = query + " AND match.events = 1" + } return c.BlockSearch(ctx.Context(), query, page, perPage, orderBy) } } diff --git a/rpc/client/main_test.go b/rpc/client/main_test.go index 4c0534868..018bd2b6d 100644 --- a/rpc/client/main_test.go +++ b/rpc/client/main_test.go @@ -19,6 +19,8 @@ func TestMain(m *testing.M) { } app := kvstore.NewPersistentKVStoreApplication(dir) + // If testing block event generation + // app.SetGenBlockEvents() needs to be called here node = rpctest.StartTendermint(app) code := m.Run() diff --git a/rpc/client/rpc_test.go b/rpc/client/rpc_test.go index 4b8b11f98..09994581d 100644 --- a/rpc/client/rpc_test.go +++ b/rpc/client/rpc_test.go @@ -516,6 +516,27 @@ func TestTxSearchWithTimeout(t *testing.T) { require.Greater(t, len(result.Txs), 0, "expected a lot of transactions") } +// This test does nothing if we do not call app.SetGenBlockEvents() within main_test.go +// It will nevertheless pass as there are no events being generated. +func TestBlockSearch(t *testing.T) { + c := getHTTPClient() + + // first we broadcast a few txs + for i := 0; i < 10; i++ { + _, _, tx := MakeTxKV() + + _, err := c.BroadcastTxCommit(context.Background(), tx) + require.NoError(t, err) + } + require.NoError(t, client.WaitForHeight(c, 5, nil)) + // This cannot test match_events as it calls the client BlockSearch function directly + // It is the RPC request handler that processes the match_event + result, err := c.BlockSearch(context.Background(), "begin_event.foo = 100 AND begin_event.bar = 300", nil, nil, "asc") + require.NoError(t, err) + blockCount := len(result.Blocks) + require.Equal(t, blockCount, 0) + +} func TestTxSearch(t *testing.T) { c := getHTTPClient() @@ -536,8 +557,7 @@ func TestTxSearch(t *testing.T) { find := result.Txs[len(result.Txs)-1] anotherTxHash := types.Tx("a different tx").Hash() - for i, c := range GetClients() { - t.Logf("client %d", i) + for _, c := range GetClients() { // now we query for the tx. result, err := c.TxSearch(context.Background(), fmt.Sprintf("tx.hash='%v'", find.Hash), true, nil, nil, "asc") @@ -616,16 +636,17 @@ func TestTxSearch(t *testing.T) { pages = int(math.Ceil(float64(txCount) / float64(perPage))) ) + totalTx := 0 for page := 1; page <= pages; page++ { page := page - result, err := c.TxSearch(context.Background(), "tx.height >= 1", false, &page, &perPage, "asc") + result, err := c.TxSearch(context.Background(), "tx.height >= 1", true, &page, &perPage, "asc") require.NoError(t, err) if page < pages { require.Len(t, result.Txs, perPage) } else { require.LessOrEqual(t, len(result.Txs), perPage) } - require.Equal(t, txCount, result.TotalCount) + totalTx = totalTx + len(result.Txs) for _, tx := range result.Txs { require.False(t, seen[tx.Height], "Found duplicate height %v in page %v", tx.Height, page) @@ -635,6 +656,7 @@ func TestTxSearch(t *testing.T) { maxHeight = tx.Height } } + require.Equal(t, txCount, totalTx) require.Len(t, seen, txCount) } } diff --git a/rpc/core/blocks.go b/rpc/core/blocks.go index bad6f0360..73f022741 100644 --- a/rpc/core/blocks.go +++ b/rpc/core/blocks.go @@ -193,6 +193,19 @@ func BlockResults(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlockR }, nil } +func BlockSearchMatchEvents( + ctx *rpctypes.Context, + query string, + pagePtr, perPagePtr *int, + orderBy string, + matchEvents bool, +) (*ctypes.ResultBlockSearch, error) { + if matchEvents { + query = "match.events = 1 AND " + query + } + return BlockSearch(ctx, query, pagePtr, perPagePtr, orderBy) +} + // BlockSearch searches for a paginated set of blocks matching BeginBlock and // EndBlock event search criteria. func BlockSearch( @@ -205,7 +218,6 @@ func BlockSearch( if _, ok := env.BlockIndexer.(*blockidxnull.BlockerIndexer); ok { return nil, errors.New("block indexing is disabled") } - q, err := tmquery.New(query) if err != nil { return nil, err diff --git a/rpc/core/routes.go b/rpc/core/routes.go index fe2d17e8b..a585037f3 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -28,8 +28,8 @@ var Routes = map[string]*rpc.RPCFunc{ "header_by_hash": rpc.NewRPCFunc(HeaderByHash, "hash"), "check_tx": rpc.NewRPCFunc(CheckTx, "tx"), "tx": rpc.NewRPCFunc(Tx, "hash,prove"), - "tx_search": rpc.NewRPCFunc(TxSearch, "query,prove,page,per_page,order_by"), - "block_search": rpc.NewRPCFunc(BlockSearch, "query,page,per_page,order_by"), + "tx_search": rpc.NewRPCFunc(TxSearchMatchEvents, "query,prove,page,per_page,order_by,match_events"), + "block_search": rpc.NewRPCFunc(BlockSearchMatchEvents, "query,page,per_page,order_by,match_events"), "validators": rpc.NewRPCFunc(Validators, "height,page,per_page"), "dump_consensus_state": rpc.NewRPCFunc(DumpConsensusState, ""), "consensus_state": rpc.NewRPCFunc(ConsensusState, ""), diff --git a/rpc/core/tx.go b/rpc/core/tx.go index 493dac5eb..fdd0de2f8 100644 --- a/rpc/core/tx.go +++ b/rpc/core/tx.go @@ -129,3 +129,23 @@ func TxSearch( return &ctypes.ResultTxSearch{Txs: apiResults, TotalCount: totalCount}, nil } + +// TxSearchMatchEvents allows you to query for multiple transactions results and match the +// query attributes to a common event. It returns a +// list of transactions (maximum ?per_page entries) and the total count. +// More: https://docs.tendermint.com/v0.34/rpc/#/Info/tx_search +func TxSearchMatchEvents( + ctx *rpctypes.Context, + query string, + prove bool, + pagePtr, perPagePtr *int, + orderBy string, + matchEvents bool, +) (*ctypes.ResultTxSearch, error) { + + if matchEvents { + query = "match.events = 1 AND " + query + } + return TxSearch(ctx, query, prove, pagePtr, perPagePtr, orderBy) + +} diff --git a/rpc/openapi/openapi.yaml b/rpc/openapi/openapi.yaml index b7023eca0..83b7992f5 100644 --- a/rpc/openapi/openapi.yaml +++ b/rpc/openapi/openapi.yaml @@ -1092,6 +1092,14 @@ paths: type: string default: "asc" example: "asc" + - in: query + name: match_events + description: Match attributes in query within events, in addition to the height & txhash + required: false + schema: + type: boolean + default: false + example: true tags: - Info responses: @@ -1147,6 +1155,14 @@ paths: type: string default: "desc" example: "asc" + - in: query + name: match_events + description: Match attributes in query within events, in addition to the height + required: false + schema: + type: boolean + default: false + example: true tags: - Info responses: diff --git a/state/indexer/block/kv/kv.go b/state/indexer/block/kv/kv.go index 1787be9ef..58478b6a1 100644 --- a/state/indexer/block/kv/kv.go +++ b/state/indexer/block/kv/kv.go @@ -1,6 +1,7 @@ package kv import ( + "bytes" "context" "errors" "fmt" @@ -24,6 +25,10 @@ var _ indexer.BlockIndexer = (*BlockerIndexer)(nil) // such that matching search criteria returns the respective block height(s). type BlockerIndexer struct { store dbm.DB + + // Add unique event identifier to use when querying + // Matching will be done both on height AND eventSeq + eventSeq int64 } func New(store dbm.DB) *BlockerIndexer { @@ -95,11 +100,46 @@ func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64, if err != nil { return nil, fmt.Errorf("failed to parse query conditions: %w", err) } + // conditions to skip because they're handled before "everything else" + skipIndexes := make([]int, 0) + var matchEvents bool + var matchEventIdx int + + // If the match.events keyword is at the beginning of the query, we will only + // return heights where the conditions are true within the same event + // and set the matchEvents to true + conditions, matchEvents = dedupMatchEvents(conditions) + + if matchEvents { + matchEventIdx = 0 + } else { + matchEventIdx = -1 + } + + if matchEventIdx != -1 { + skipIndexes = append(skipIndexes, matchEventIdx) + } // If there is an exact height query, return the result immediately // (if it exists). - height, ok := lookForHeight(conditions) - if ok { + var height int64 + var ok bool + var heightIdx int + if matchEvents { + // If we are not matching events and block.height = 3 occurs more than once, the later value will + // overwrite the first one. For match.events it will create problems. + conditions, height, ok, heightIdx = dedupHeight(conditions) + } else { + height, ok, heightIdx = lookForHeight(conditions) + } + + // If we have additional constraints and want to query per event + // attributes, we cannot simply return all blocks for a height. + // But we remember the height we want to find and forward it to + // match(). If we only have the height constraint and match.events keyword + // in the query (the second part of the ||), we don't need to query + // per event conditions and return all events within the height range. + if ok && (!matchEvents || (matchEvents && len(conditions) == 2)) { ok, err := idx.Has(height) if err != nil { return nil, err @@ -114,24 +154,39 @@ func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64, var heightsInitialized bool filteredHeights := make(map[string][]byte) - - // conditions to skip because they're handled before "everything else" - skipIndexes := make([]int, 0) + if matchEvents && heightIdx != -1 { + skipIndexes = append(skipIndexes, heightIdx) + } // Extract ranges. If both upper and lower bounds exist, it's better to get // them in order as to not iterate over kvs that are not within range. ranges, rangeIndexes := indexer.LookForRanges(conditions) + var heightRanges indexer.QueryRange if len(ranges) > 0 { skipIndexes = append(skipIndexes, rangeIndexes...) for _, qr := range ranges { + // If we have a query range over height and want to still look for + // specific event values we do not want to simply return all + // blocks in this height range. We remember the height range info + // and pass it on to match() to take into account when processing events. + if qr.Key == types.BlockHeightKey && matchEvents { + heightRanges = qr + // If the query contains ranges other than the height then we need to treat the height + // range when querying the conditions of the other range. + // Otherwise we can just return all the blocks within the height range (as there is no + // additional constraint on events) + if len(ranges)+1 != 2 { + continue + } + } prefix, err := orderedcode.Append(nil, qr.Key) if err != nil { return nil, fmt.Errorf("failed to create prefix key: %w", err) } if !heightsInitialized { - filteredHeights, err = idx.matchRange(ctx, qr, prefix, filteredHeights, true) + filteredHeights, err = idx.matchRange(ctx, qr, prefix, filteredHeights, true, matchEvents) if err != nil { return nil, err } @@ -144,7 +199,7 @@ func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64, break } } else { - filteredHeights, err = idx.matchRange(ctx, qr, prefix, filteredHeights, false) + filteredHeights, err = idx.matchRange(ctx, qr, prefix, filteredHeights, false, matchEvents) if err != nil { return nil, err } @@ -159,12 +214,13 @@ func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64, } startKey, err := orderedcode.Append(nil, c.CompositeKey, fmt.Sprintf("%v", c.Operand)) + if err != nil { return nil, err } if !heightsInitialized { - filteredHeights, err = idx.match(ctx, c, startKey, filteredHeights, true) + filteredHeights, err = idx.match(ctx, c, startKey, filteredHeights, true, matchEvents, height, heightRanges) if err != nil { return nil, err } @@ -177,7 +233,7 @@ func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64, break } } else { - filteredHeights, err = idx.match(ctx, c, startKey, filteredHeights, false) + filteredHeights, err = idx.match(ctx, c, startKey, filteredHeights, false, matchEvents, height, heightRanges) if err != nil { return nil, err } @@ -222,6 +278,7 @@ func (idx *BlockerIndexer) matchRange( startKey []byte, filteredHeights map[string][]byte, firstRun bool, + matchEvents bool, ) (map[string][]byte, error) { // A previous match was attempted but resulted in no matches, so we return @@ -231,8 +288,6 @@ func (idx *BlockerIndexer) matchRange( } tmpHeights := make(map[string][]byte) - lowerBound := qr.LowerBoundValue() - upperBound := qr.UpperBoundValue() it, err := dbm.IteratePrefix(idx.store, startKey) if err != nil { @@ -262,18 +317,8 @@ LOOP: if err != nil { continue LOOP } - - include := true - if lowerBound != nil && v < lowerBound.(int64) { - include = false - } - - if upperBound != nil && v > upperBound.(int64) { - include = false - } - - if include { - tmpHeights[string(it.Value())] = it.Value() + if checkBounds(qr, v) { + idx.setTmpHeights(tmpHeights, it, matchEvents) } } @@ -302,8 +347,12 @@ LOOP: // Remove/reduce matches in filteredHashes that were not found in this // match (tmpHashes). - for k := range filteredHeights { - if tmpHeights[k] == nil { + for k, v := range filteredHeights { + tmpHeight := tmpHeights[k] + + // Check whether in this iteration we have not found an overlapping height (tmpHeight == nil) + // or whether the events in which the attributed occurred do not match (first part of the condition) + if tmpHeight == nil || !bytes.Equal(tmpHeight, v) { delete(filteredHeights, k) select { @@ -318,6 +367,33 @@ LOOP: return filteredHeights, nil } +func (idx *BlockerIndexer) setTmpHeights(tmpHeights map[string][]byte, it dbm.Iterator, matchEvents bool) { + // If we return attributes that occur within the same events, then store the event sequence in the + // result map as well + if matchEvents { + eventSeq, _ := parseEventSeqFromEventKey(it.Key()) + retVal := it.Value() + tmpHeights[string(retVal)+strconv.FormatInt(eventSeq, 10)] = it.Value() + } else { + tmpHeights[string(it.Value())] = it.Value() + } +} + +func checkBounds(ranges indexer.QueryRange, v int64) bool { + include := true + lowerBound := ranges.LowerBoundValue() + upperBound := ranges.UpperBoundValue() + if lowerBound != nil && v < lowerBound.(int64) { + include = false + } + + if upperBound != nil && v > upperBound.(int64) { + include = false + } + + return include +} + // match returns all matching heights that meet a given query condition and start // key. An already filtered result (filteredHeights) is provided such that any // non-intersecting matches are removed. @@ -330,6 +406,9 @@ func (idx *BlockerIndexer) match( startKeyBz []byte, filteredHeights map[string][]byte, firstRun bool, + matchEvents bool, + height int64, + heightRanges indexer.QueryRange, ) (map[string][]byte, error) { // A previous match was attempted but resulted in no matches, so we return @@ -349,7 +428,23 @@ func (idx *BlockerIndexer) match( defer it.Close() for ; it.Valid(); it.Next() { - tmpHeights[string(it.Value())] = it.Value() + if matchEvents { + + if heightRanges.Key != "" { + eventHeight, err := parseHeightFromEventKey(it.Key()) + if err != nil || !checkBounds(heightRanges, eventHeight) { + continue + } + } else { + if height != 0 { + eventHeight, _ := parseHeightFromEventKey(it.Key()) + if eventHeight != height { + continue + } + } + } + } + idx.setTmpHeights(tmpHeights, it, matchEvents) if err := ctx.Err(); err != nil { break @@ -373,7 +468,7 @@ func (idx *BlockerIndexer) match( defer it.Close() for ; it.Valid(); it.Next() { - tmpHeights[string(it.Value())] = it.Value() + idx.setTmpHeights(tmpHeights, it, matchEvents) select { case <-ctx.Done(): @@ -406,7 +501,7 @@ func (idx *BlockerIndexer) match( } if strings.Contains(eventValue, c.Operand.(string)) { - tmpHeights[string(it.Value())] = it.Value() + idx.setTmpHeights(tmpHeights, it, matchEvents) } select { @@ -437,8 +532,9 @@ func (idx *BlockerIndexer) match( // Remove/reduce matches in filteredHeights that were not found in this // match (tmpHeights). - for k := range filteredHeights { - if tmpHeights[k] == nil { + for k, v := range filteredHeights { + tmpHeight := tmpHeights[k] + if tmpHeight == nil || !bytes.Equal(tmpHeight, v) { delete(filteredHeights, k) select { @@ -457,6 +553,7 @@ func (idx *BlockerIndexer) indexEvents(batch dbm.Batch, events []abci.Event, typ heightBz := int64ToBytes(height) for _, event := range events { + idx.eventSeq = idx.eventSeq + 1 // only index events with a non-empty type if len(event.Type) == 0 { continue @@ -474,7 +571,7 @@ func (idx *BlockerIndexer) indexEvents(batch dbm.Batch, events []abci.Event, typ } if attr.GetIndex() { - key, err := eventKey(compositeKey, typ, attr.Value, height) + key, err := eventKey(compositeKey, typ, attr.Value, height, idx.eventSeq) if err != nil { return fmt.Errorf("failed to create block index key: %w", err) } diff --git a/state/indexer/block/kv/kv_test.go b/state/indexer/block/kv/kv_test.go index a23ad24ac..75f9266d3 100644 --- a/state/indexer/block/kv/kv_test.go +++ b/state/indexer/block/kv/kv_test.go @@ -140,3 +140,141 @@ func TestBlockIndexer(t *testing.T) { }) } } + +func TestBlockIndexerMulti(t *testing.T) { + store := db.NewPrefixDB(db.NewMemDB(), []byte("block_events")) + indexer := blockidxkv.New(store) + + require.NoError(t, indexer.Index(types.EventDataNewBlockHeader{ + Header: types.Header{Height: 1}, + ResultBeginBlock: abci.ResponseBeginBlock{ + Events: []abci.Event{}, + }, + ResultEndBlock: abci.ResponseEndBlock{ + Events: []abci.Event{ + { + Type: "end_event", + Attributes: []abci.EventAttribute{ + { + Key: []byte("foo"), + Value: []byte("100"), + Index: true, + }, + { + Key: []byte("bar"), + Value: []byte("200"), + Index: true, + }, + }, + }, + { + Type: "end_event", + Attributes: []abci.EventAttribute{ + { + Key: []byte("foo"), + Value: []byte("300"), + Index: true, + }, + { + Key: []byte("bar"), + Value: []byte("400"), + Index: true, + }, + }, + }, + }, + }, + })) + + require.NoError(t, indexer.Index(types.EventDataNewBlockHeader{ + Header: types.Header{Height: 2}, + ResultBeginBlock: abci.ResponseBeginBlock{ + Events: []abci.Event{}, + }, + ResultEndBlock: abci.ResponseEndBlock{ + Events: []abci.Event{ + { + Type: "end_event", + Attributes: []abci.EventAttribute{ + { + Key: []byte("foo"), + Value: []byte("100"), + Index: true, + }, + { + Key: []byte("bar"), + Value: []byte("200"), + Index: true, + }, + }, + }, + { + Type: "end_event", + Attributes: []abci.EventAttribute{ + { + Key: []byte("foo"), + Value: []byte("300"), + Index: true, + }, + { + Key: []byte("bar"), + Value: []byte("400"), + Index: true, + }, + }, + }, + }, + }, + })) + + testCases := map[string]struct { + q *query.Query + results []int64 + }{ + "query return all events from a height - exact": { + q: query.MustParse("match.events = 1 AND block.height = 1"), + results: []int64{1}, + }, + "query return all events from a height - exact (deduplicate height)": { + q: query.MustParse("match.events = 1 AND block.height = 1 AND block.height = 2"), + results: []int64{1}, + }, + "query return all events from a height - range": { + q: query.MustParse("match.events = 1 AND block.height < 2 AND block.height > 0 AND block.height > 0"), + results: []int64{1}, + }, + "query matches fields from same event": { + q: query.MustParse("match.events = 1 AND end_event.bar < 300 AND end_event.foo = 100 AND block.height > 0 AND block.height <= 2"), + results: []int64{1, 2}, + }, + "query matches fields from multiple events": { + q: query.MustParse("match.events = 1 AND end_event.foo = 100 AND end_event.bar = 400 AND block.height = 2"), + results: []int64{}, + }, + "deduplication test - match.events only at beginning": { + q: query.MustParse("end_event.foo = 100 AND end_event.bar = 400 AND block.height = 2 AND match.events = 1"), + results: []int64{2}, + }, + "deduplication test - match.events multiple": { + q: query.MustParse("match.events = 1 AND end_event.foo = 100 AND end_event.bar = 400 AND block.height = 2 AND match.events = 1"), + results: []int64{}, + }, + "query matches fields from multiple events allowed": { + q: query.MustParse("end_event.foo = 100 AND end_event.bar = 400"), + results: []int64{1, 2}, + }, + "query matches fields from all events whose attribute is within range": { + q: query.MustParse("match.events = 1 AND end_event.foo < 300 AND block.height = 2"), + results: []int64{1, 2}, + }, + } + + for name, tc := range testCases { + tc := tc + t.Run(name, func(t *testing.T) { + results, err := indexer.Search(context.Background(), tc.q) + require.NoError(t, err) + require.Equal(t, tc.results, results) + }) + } +} diff --git a/state/indexer/block/kv/util.go b/state/indexer/block/kv/util.go index 05d6fc45c..b0a5be3aa 100644 --- a/state/indexer/block/kv/util.go +++ b/state/indexer/block/kv/util.go @@ -40,13 +40,14 @@ func heightKey(height int64) ([]byte, error) { ) } -func eventKey(compositeKey, typ, eventValue string, height int64) ([]byte, error) { +func eventKey(compositeKey, typ, eventValue string, height int64, eventSeq int64) ([]byte, error) { return orderedcode.Append( nil, compositeKey, eventValue, height, typ, + eventSeq, ) } @@ -74,24 +75,97 @@ func parseValueFromEventKey(key []byte) (string, error) { height int64 ) - remaining, err := orderedcode.Parse(string(key), &compositeKey, &eventValue, &height, &typ) + _, err := orderedcode.Parse(string(key), &compositeKey, &eventValue, &height, &typ) if err != nil { return "", fmt.Errorf("failed to parse event key: %w", err) } - if len(remaining) != 0 { - return "", fmt.Errorf("unexpected remainder in key: %s", remaining) - } - return eventValue, nil } -func lookForHeight(conditions []query.Condition) (int64, bool) { - for _, c := range conditions { - if c.CompositeKey == types.BlockHeightKey && c.Op == query.OpEqual { - return c.Operand.(int64), true +func parseHeightFromEventKey(key []byte) (int64, error) { + var ( + compositeKey, typ, eventValue string + height int64 + ) + + _, err := orderedcode.Parse(string(key), &compositeKey, &eventValue, &height, &typ) + if err != nil { + return -1, fmt.Errorf("failed to parse event key: %w", err) + } + + return height, nil +} + +func parseEventSeqFromEventKey(key []byte) (int64, error) { + var ( + compositeKey, typ, eventValue string + height int64 + eventSeq int64 + ) + + remaining, err := orderedcode.Parse(string(key), &compositeKey, &eventValue, &height, &typ) + if err != nil { + return 0, fmt.Errorf("failed to parse event key: %w", err) + } + + // This is done to support previous versions that did not have event sequence in their key + if len(remaining) != 0 { + remaining, err = orderedcode.Parse(remaining, &eventSeq) + if err != nil { + return 0, fmt.Errorf("failed to parse event key: %w", err) + } + if len(remaining) != 0 { + return 0, fmt.Errorf("unexpected remainder in key: %s", remaining) } } - return 0, false + return eventSeq, nil +} + +func lookForHeight(conditions []query.Condition) (int64, bool, int) { + for i, c := range conditions { + if c.CompositeKey == types.BlockHeightKey && c.Op == query.OpEqual { + return c.Operand.(int64), true, i + } + } + + return 0, false, -1 +} + +func dedupHeight(conditions []query.Condition) (dedupConditions []query.Condition, height int64, found bool, idx int) { + idx = -1 + for i, c := range conditions { + if c.CompositeKey == types.BlockHeightKey && c.Op == query.OpEqual { + if found { + continue + } else { + dedupConditions = append(dedupConditions, c) + height = c.Operand.(int64) + found = true + idx = i + } + } else { + dedupConditions = append(dedupConditions, c) + } + } + return +} + +func dedupMatchEvents(conditions []query.Condition) ([]query.Condition, bool) { + var dedupConditions []query.Condition + matchEvents := false + for i, c := range conditions { + if c.CompositeKey == types.MatchEventKey { + // Match events should be added only via RPC as the very first query condition + if i == 0 { + dedupConditions = append(dedupConditions, c) + matchEvents = true + } + } else { + dedupConditions = append(dedupConditions, c) + } + + } + return dedupConditions, matchEvents } diff --git a/state/txindex/kv/kv.go b/state/txindex/kv/kv.go index 28a26664f..5059d750c 100644 --- a/state/txindex/kv/kv.go +++ b/state/txindex/kv/kv.go @@ -27,6 +27,8 @@ var _ txindex.TxIndexer = (*TxIndex)(nil) // TxIndex is the simplest possible indexer, backed by key-value storage (levelDB). type TxIndex struct { store dbm.DB + // Number the events in the event list + eventSeq int64 } // NewTxIndex creates new KV indexer. @@ -134,6 +136,7 @@ func (txi *TxIndex) Index(result *abci.TxResult) error { func (txi *TxIndex) indexEvents(result *abci.TxResult, hash []byte, store dbm.Batch) error { for _, event := range result.Result.Events { + txi.eventSeq = txi.eventSeq + 1 // only index events with a non-empty type if len(event.Type) == 0 { continue @@ -151,7 +154,7 @@ func (txi *TxIndex) indexEvents(result *abci.TxResult, hash []byte, store dbm.Ba return fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeTag) } if attr.GetIndex() { - err := store.Set(keyForEvent(compositeTag, attr.Value, result), hash) + err := store.Set(keyForEvent(compositeTag, attr.Value, result, txi.eventSeq), hash) if err != nil { return err } @@ -206,19 +209,46 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul } } + var matchEvents bool + var matchEventIdx int + + // If the match.events keyword is at the beginning of the query, we will only + // return heights where the conditions are true within the same event + // and set the matchEvents to true + conditions, matchEvents = dedupMatchEvents(conditions) + + if matchEvents { + matchEventIdx = 0 + } else { + matchEventIdx = -1 + } + // conditions to skip because they're handled before "everything else" skipIndexes := make([]int, 0) + if matchEventIdx != -1 { + skipIndexes = append(skipIndexes, matchEventIdx) + } // extract ranges // if both upper and lower bounds exist, it's better to get them in order not // no iterate over kvs that are not within range. ranges, rangeIndexes := indexer.LookForRanges(conditions) + var heightRanges indexer.QueryRange if len(ranges) > 0 { skipIndexes = append(skipIndexes, rangeIndexes...) for _, qr := range ranges { + + //If we have a query range over height and want to still look for + // specific event values we do not want to simply return all + // transactios in this height range. We remember the height range info + // and pass it on to match() to take into account when processing events. + if qr.Key == types.TxHeightKey && matchEvents { + heightRanges = qr + continue + } if !hashesInitialized { - filteredHashes = txi.matchRange(ctx, qr, startKey(qr.Key), filteredHashes, true) + filteredHashes = txi.matchRange(ctx, qr, startKey(qr.Key), filteredHashes, true, matchEvents) hashesInitialized = true // Ignore any remaining conditions if the first condition resulted @@ -227,13 +257,24 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul break } } else { - filteredHashes = txi.matchRange(ctx, qr, startKey(qr.Key), filteredHashes, false) + filteredHashes = txi.matchRange(ctx, qr, startKey(qr.Key), filteredHashes, false, matchEvents) } } } // if there is a height condition ("tx.height=3"), extract it - height := lookForHeight(conditions) + var height int64 + var heightIdx int + if matchEvents { + // If we are not matching events and tx.height = 3 occurs more than once, the later value will + // overwrite the first one. For match.events it will create problems. + conditions, height, heightIdx = dedupHeight(conditions) + } else { + height, heightIdx = lookForHeight(conditions) + } + if matchEvents && (len(conditions) != 2) { + skipIndexes = append(skipIndexes, heightIdx) + } // for all other conditions for i, c := range conditions { @@ -242,7 +283,7 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul } if !hashesInitialized { - filteredHashes = txi.match(ctx, c, startKeyForCondition(c, height), filteredHashes, true) + filteredHashes = txi.match(ctx, c, startKeyForCondition(c, height), filteredHashes, true, matchEvents, height, heightRanges) hashesInitialized = true // Ignore any remaining conditions if the first condition resulted @@ -251,7 +292,7 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul break } } else { - filteredHashes = txi.match(ctx, c, startKeyForCondition(c, height), filteredHashes, false) + filteredHashes = txi.match(ctx, c, startKeyForCondition(c, height), filteredHashes, false, matchEvents, height, heightRanges) } } @@ -286,13 +327,21 @@ func lookForHash(conditions []query.Condition) (hash []byte, ok bool, err error) } // lookForHeight returns a height if there is an "height=X" condition. -func lookForHeight(conditions []query.Condition) (height int64) { - for _, c := range conditions { +func lookForHeight(conditions []query.Condition) (height int64, heightIdx int) { + for i, c := range conditions { if c.CompositeKey == types.TxHeightKey && c.Op == query.OpEqual { - return c.Operand.(int64) + return c.Operand.(int64), i } } - return 0 + return 0, -1 +} +func (txi *TxIndex) setTmpHashes(tmpHeights map[string][]byte, it dbm.Iterator, matchEvents bool) { + if matchEvents { + eventSeq := extractEventSeqFromKey(it.Key()) + tmpHeights[string(it.Value())+eventSeq] = it.Value() + } else { + tmpHeights[string(it.Value())] = it.Value() + } } // match returns all matching txs by hash that meet a given condition and start @@ -306,6 +355,9 @@ func (txi *TxIndex) match( startKeyBz []byte, filteredHashes map[string][]byte, firstRun bool, + matchEvents bool, + height int64, + heightRanges indexer.QueryRange, ) map[string][]byte { // A previous match was attempted but resulted in no matches, so we return // no matches (assuming AND operand). @@ -325,8 +377,24 @@ func (txi *TxIndex) match( EQ_LOOP: for ; it.Valid(); it.Next() { - tmpHashes[string(it.Value())] = it.Value() + // If we have a height range in a query, we need only transactions + // for this height + if heightRanges.Key != "" { + eventHeight, err := extractHeightFromKey(it.Key()) + if err != nil || !checkBounds(heightRanges, eventHeight) { + continue + } + } else if height != 0 { + // If we have a particular height in the query, return only transactions + // matching this height. + eventHeight, err := extractHeightFromKey(it.Key()) + if eventHeight != height || err != nil { + continue + } + } + + txi.setTmpHashes(tmpHashes, it, matchEvents) // Potentially exit early. select { case <-ctx.Done(): @@ -349,7 +417,7 @@ func (txi *TxIndex) match( EXISTS_LOOP: for ; it.Valid(); it.Next() { - tmpHashes[string(it.Value())] = it.Value() + txi.setTmpHashes(tmpHashes, it, matchEvents) // Potentially exit early. select { @@ -379,7 +447,7 @@ func (txi *TxIndex) match( } if strings.Contains(extractValueFromKey(it.Key()), c.Operand.(string)) { - tmpHashes[string(it.Value())] = it.Value() + txi.setTmpHashes(tmpHashes, it, matchEvents) } // Potentially exit early. @@ -410,8 +478,9 @@ func (txi *TxIndex) match( // Remove/reduce matches in filteredHashes that were not found in this // match (tmpHashes). REMOVE_LOOP: - for k := range filteredHashes { - if tmpHashes[k] == nil { + for k, v := range filteredHashes { + tmpHash := tmpHashes[k] + if tmpHash == nil || !bytes.Equal(tmpHash, v) { delete(filteredHashes, k) // Potentially exit early. @@ -437,6 +506,7 @@ func (txi *TxIndex) matchRange( startKey []byte, filteredHashes map[string][]byte, firstRun bool, + matchEvents bool, ) map[string][]byte { // A previous match was attempted but resulted in no matches, so we return // no matches (assuming AND operand). @@ -445,8 +515,6 @@ func (txi *TxIndex) matchRange( } tmpHashes := make(map[string][]byte) - lowerBound := qr.LowerBoundValue() - upperBound := qr.UpperBoundValue() it, err := dbm.IteratePrefix(txi.store, startKey) if err != nil { @@ -466,17 +534,8 @@ LOOP: continue LOOP } - include := true - if lowerBound != nil && v < lowerBound.(int64) { - include = false - } - - if upperBound != nil && v > upperBound.(int64) { - include = false - } - - if include { - tmpHashes[string(it.Value())] = it.Value() + if checkBounds(qr, v) { + txi.setTmpHashes(tmpHashes, it, matchEvents) } // XXX: passing time in a ABCI Events is not yet implemented @@ -512,8 +571,9 @@ LOOP: // Remove/reduce matches in filteredHashes that were not found in this // match (tmpHashes). REMOVE_LOOP: - for k := range filteredHashes { - if tmpHashes[k] == nil { + for k, v := range filteredHashes { + tmpHash := tmpHashes[k] + if tmpHash == nil || !bytes.Equal(tmpHashes[k], v) { delete(filteredHashes, k) // Potentially exit early. @@ -531,29 +591,49 @@ REMOVE_LOOP: // Keys func isTagKey(key []byte) bool { - return strings.Count(string(key), tagKeySeparator) == 3 + // This should be always 4 if data is indexed together with event sequences + // The check for 3 was added to allow data indexed before (w/o the event number) + // to be retrieved. + numTags := strings.Count(string(key), tagKeySeparator) + return numTags == 4 || numTags == 3 } +func extractHeightFromKey(key []byte) (int64, error) { + parts := strings.SplitN(string(key), tagKeySeparator, -1) + return strconv.ParseInt(parts[2], 10, 64) +} func extractValueFromKey(key []byte) string { - parts := strings.SplitN(string(key), tagKeySeparator, 3) + parts := strings.SplitN(string(key), tagKeySeparator, -1) return parts[1] } -func keyForEvent(key string, value string, result *abci.TxResult) []byte { - return []byte(fmt.Sprintf("%s/%s/%d/%d", +func extractEventSeqFromKey(key []byte) string { + parts := strings.SplitN(string(key), tagKeySeparator, -1) + + if len(parts) == 5 { + return parts[4] + } + return "0" +} +func keyForEvent(key string, value string, result *abci.TxResult, eventSeq int64) []byte { + return []byte(fmt.Sprintf("%s/%s/%d/%d/%d", key, value, result.Height, result.Index, + eventSeq, )) } func keyForHeight(result *abci.TxResult) []byte { - return []byte(fmt.Sprintf("%s/%d/%d/%d", + return []byte(fmt.Sprintf("%s/%d/%d/%d/%d", types.TxHeightKey, result.Height, result.Height, result.Index, + // Added to facilitate having the eventSeq in event keys + // Otherwise queries break expecting 5 entries + 0, )) } @@ -571,3 +651,18 @@ func startKey(fields ...interface{}) []byte { } return b.Bytes() } + +func checkBounds(ranges indexer.QueryRange, v int64) bool { + include := true + lowerBound := ranges.LowerBoundValue() + upperBound := ranges.UpperBoundValue() + if lowerBound != nil && v < lowerBound.(int64) { + include = false + } + + if upperBound != nil && v > upperBound.(int64) { + include = false + } + + return include +} diff --git a/state/txindex/kv/kv_test.go b/state/txindex/kv/kv_test.go index 5ccae0bfd..4b39a4727 100644 --- a/state/txindex/kv/kv_test.go +++ b/state/txindex/kv/kv_test.go @@ -139,6 +139,78 @@ func TestTxSearch(t *testing.T) { } } +func TestTxSearchEventMatch(t *testing.T) { + + indexer := NewTxIndex(db.NewMemDB()) + + txResult := txResultWithEvents([]abci.Event{ + {Type: "account", Attributes: []abci.EventAttribute{{Key: []byte("number"), Value: []byte("1"), Index: true}, {Key: []byte("owner"), Value: []byte("Ana"), Index: true}}}, + {Type: "account", Attributes: []abci.EventAttribute{{Key: []byte("number"), Value: []byte("2"), Index: true}, {Key: []byte("owner"), Value: []byte("Ivan"), Index: true}}}, + {Type: "", Attributes: []abci.EventAttribute{{Key: []byte("not_allowed"), Value: []byte("Vlad"), Index: true}}}, + }) + + err := indexer.Index(txResult) + require.NoError(t, err) + + testCases := map[string]struct { + q string + resultsLength int + }{ + "Return all events from a height": { + q: "match.events = 1 AND tx.height = 1", + resultsLength: 1, + }, + "Return all events from a height (deduplicate height)": { + q: "match.events = 1 AND tx.height = 1 AND tx.height = 1", + resultsLength: 1, + }, + "Match attributes with height range and event": { + q: "match.events = 1 AND tx.height < 2 AND tx.height > 0 AND account.number = 1 AND account.owner CONTAINS 'Ana'", + resultsLength: 1, + }, + "Match attributes with height range and event - no match": { + q: "match.events = 1 AND tx.height < 2 AND tx.height > 0 AND account.number = 2 AND account.owner = 'Ana'", + resultsLength: 0, + }, + "Deduplucation test - match events only at the beginning": { + q: "tx.height < 2 AND tx.height > 0 AND account.number = 2 AND account.owner = 'Ana' AND match.events = 1", + resultsLength: 1, + }, + "Deduplucation test - match events multiple": { + q: "match.events = 1 AND tx.height < 2 AND tx.height > 0 AND account.number = 2 AND account.owner = 'Ana' AND match.events = 1", + resultsLength: 0, + }, + "Match attributes with event": { + q: "account.number = 2 AND account.owner = 'Ana' AND tx.height = 1", + resultsLength: 1, + }, + "Match range w/o match events": { + q: "account.number < 2 AND account.owner = 'Ivan'", + resultsLength: 1, + }, + " Match range with match events": { + q: "match.events = 1 AND account.number < 2 AND account.owner = 'Ivan'", + resultsLength: 0, + }, + } + + ctx := context.Background() + + for _, tc := range testCases { + tc := tc + t.Run(tc.q, func(t *testing.T) { + results, err := indexer.Search(ctx, query.MustParse(tc.q)) + assert.NoError(t, err) + + assert.Len(t, results, tc.resultsLength) + if tc.resultsLength > 0 { + for _, txr := range results { + assert.True(t, proto.Equal(txResult, txr)) + } + } + }) + } +} func TestTxSearchWithCancelation(t *testing.T) { indexer := NewTxIndex(db.NewMemDB()) diff --git a/state/txindex/kv/utils.go b/state/txindex/kv/utils.go index 48362bfbc..8b3edade5 100644 --- a/state/txindex/kv/utils.go +++ b/state/txindex/kv/utils.go @@ -1,5 +1,13 @@ package kv +import ( + "fmt" + + "github.com/google/orderedcode" + "github.com/tendermint/tendermint/libs/pubsub/query" + "github.com/tendermint/tendermint/types" +) + // IntInSlice returns true if a is found in the list. func intInSlice(a int, list []int) bool { for _, b := range list { @@ -9,3 +17,60 @@ func intInSlice(a int, list []int) bool { } return false } + +func dedupMatchEvents(conditions []query.Condition) ([]query.Condition, bool) { + var dedupConditions []query.Condition + matchEvents := false + for i, c := range conditions { + if c.CompositeKey == types.MatchEventKey { + // Match events should be added only via RPC as the very first query condition + if i == 0 { + dedupConditions = append(dedupConditions, c) + matchEvents = true + } + } else { + dedupConditions = append(dedupConditions, c) + } + + } + return dedupConditions, matchEvents +} + +func ParseEventSeqFromEventKey(key []byte) (int64, error) { + var ( + compositeKey, typ, eventValue string + height int64 + eventSeq int64 + ) + + remaining, err := orderedcode.Parse(string(key), &compositeKey, &eventValue, &height, &typ, &eventSeq) + if err != nil { + return 0, fmt.Errorf("failed to parse event key: %w", err) + } + + if len(remaining) != 0 { + return 0, fmt.Errorf("unexpected remainder in key: %s", remaining) + } + + return eventSeq, nil +} +func dedupHeight(conditions []query.Condition) (dedupConditions []query.Condition, height int64, idx int) { + found := false + idx = -1 + height = 0 + for i, c := range conditions { + if c.CompositeKey == types.TxHeightKey && c.Op == query.OpEqual { + if found { + continue + } else { + dedupConditions = append(dedupConditions, c) + height = c.Operand.(int64) + found = true + idx = i + } + } else { + dedupConditions = append(dedupConditions, c) + } + } + return +} diff --git a/types/events.go b/types/events.go index b71661a05..5583d477f 100644 --- a/types/events.go +++ b/types/events.go @@ -140,6 +140,11 @@ const ( // BlockHeightKey is a reserved key used for indexing BeginBlock and Endblock // events. BlockHeightKey = "block.height" + + // MatchEventsKey is a reserved key used to indicate to the indexer that the + // conditions in the query have to have occurred both on the same height + // as well as in the same event + MatchEventKey = "match.events" ) var (