rpc: index block events to support block event queries (bp #6226) (#6261)

This commit is contained in:
mergify[bot]
2021-03-22 15:01:25 -04:00
committed by GitHub
parent b2f01448be
commit b00cac9368
30 changed files with 1384 additions and 274 deletions

View File

@@ -468,24 +468,55 @@ func (c *baseRPCClient) TxSearch(
page,
perPage *int,
orderBy string,
) (
*ctypes.ResultTxSearch, error) {
) (*ctypes.ResultTxSearch, error) {
result := new(ctypes.ResultTxSearch)
params := map[string]interface{}{
"query": query,
"prove": prove,
"order_by": orderBy,
}
if page != nil {
params["page"] = page
}
if perPage != nil {
params["per_page"] = perPage
}
_, err := c.caller.Call(ctx, "tx_search", params, result)
if err != nil {
return nil, err
}
return result, nil
}
func (c *baseRPCClient) BlockSearch(
ctx context.Context,
query string,
page, perPage *int,
orderBy string,
) (*ctypes.ResultBlockSearch, error) {
result := new(ctypes.ResultBlockSearch)
params := map[string]interface{}{
"query": query,
"order_by": orderBy,
}
if page != nil {
params["page"] = page
}
if perPage != nil {
params["per_page"] = perPage
}
_, err := c.caller.Call(ctx, "block_search", params, result)
if err != nil {
return nil, err
}
return result, nil
}

View File

@@ -70,8 +70,25 @@ type SignClient interface {
Commit(ctx context.Context, height *int64) (*ctypes.ResultCommit, error)
Validators(ctx context.Context, height *int64, page, perPage *int) (*ctypes.ResultValidators, error)
Tx(ctx context.Context, hash []byte, prove bool) (*ctypes.ResultTx, error)
TxSearch(ctx context.Context, query string, prove bool, page, perPage *int,
orderBy string) (*ctypes.ResultTxSearch, error)
// TxSearch defines a method to search for a paginated set of transactions by
// DeliverTx event search criteria.
TxSearch(
ctx context.Context,
query string,
prove bool,
page, perPage *int,
orderBy string,
) (*ctypes.ResultTxSearch, error)
// BlockSearch defines a method to search for a paginated set of blocks by
// BeginBlock and EndBlock event search criteria.
BlockSearch(
ctx context.Context,
query string,
page, perPage *int,
orderBy string,
) (*ctypes.ResultBlockSearch, error)
}
// HistoryClient provides access to data from genesis to now in large chunks.

View File

@@ -178,7 +178,7 @@ func (c *Local) Tx(ctx context.Context, hash []byte, prove bool) (*ctypes.Result
}
func (c *Local) TxSearch(
ctx context.Context,
_ context.Context,
query string,
prove bool,
page,
@@ -188,6 +188,15 @@ func (c *Local) TxSearch(
return core.TxSearch(c.ctx, query, prove, page, perPage, orderBy)
}
func (c *Local) BlockSearch(
_ context.Context,
query string,
page, perPage *int,
orderBy string,
) (*ctypes.ResultBlockSearch, error) {
return core.BlockSearch(c.ctx, query, page, perPage, orderBy)
}
func (c *Local) BroadcastEvidence(ctx context.Context, ev types.Evidence) (*ctypes.ResultBroadcastEvidence, error) {
return core.BroadcastEvidence(c.ctx, ev)
}

View File

@@ -1,4 +1,4 @@
// Code generated by mockery v2.3.0. DO NOT EDIT.
// Code generated by mockery v2.6.0. DO NOT EDIT.
package mocks
@@ -160,6 +160,29 @@ func (_m *Client) BlockResults(ctx context.Context, height *int64) (*coretypes.R
return r0, r1
}
// BlockSearch provides a mock function with given fields: ctx, query, page, perPage, orderBy
func (_m *Client) BlockSearch(ctx context.Context, query string, page *int, perPage *int, orderBy string) (*coretypes.ResultBlockSearch, error) {
ret := _m.Called(ctx, query, page, perPage, orderBy)
var r0 *coretypes.ResultBlockSearch
if rf, ok := ret.Get(0).(func(context.Context, string, *int, *int, string) *coretypes.ResultBlockSearch); ok {
r0 = rf(ctx, query, page, perPage, orderBy)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*coretypes.ResultBlockSearch)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, string, *int, *int, string) error); ok {
r1 = rf(ctx, query, page, perPage, orderBy)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// BlockchainInfo provides a mock function with given fields: ctx, minHeight, maxHeight
func (_m *Client) BlockchainInfo(ctx context.Context, minHeight int64, maxHeight int64) (*coretypes.ResultBlockchainInfo, error) {
ret := _m.Called(ctx, minHeight, maxHeight)

View File

@@ -1,11 +1,15 @@
package core
import (
"errors"
"fmt"
"sort"
tmmath "github.com/tendermint/tendermint/libs/math"
tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpctypes "github.com/tendermint/tendermint/rpc/jsonrpc/types"
blockidxnull "github.com/tendermint/tendermint/state/indexer/block/null"
"github.com/tendermint/tendermint/types"
)
@@ -154,3 +158,68 @@ func BlockResults(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlockR
ConsensusParamUpdates: results.EndBlock.ConsensusParamUpdates,
}, nil
}
// BlockSearch searches for a paginated set of blocks matching BeginBlock and
// EndBlock event search criteria.
func BlockSearch(
ctx *rpctypes.Context,
query string,
pagePtr, perPagePtr *int,
orderBy string,
) (*ctypes.ResultBlockSearch, error) {
// skip if block indexing is disabled
if _, ok := env.BlockIndexer.(*blockidxnull.BlockerIndexer); ok {
return nil, errors.New("block indexing is disabled")
}
q, err := tmquery.New(query)
if err != nil {
return nil, err
}
results, err := env.BlockIndexer.Search(ctx.Context(), q)
if err != nil {
return nil, err
}
// sort results (must be done before pagination)
switch orderBy {
case "desc", "":
sort.Slice(results, func(i, j int) bool { return results[i] > results[j] })
case "asc":
sort.Slice(results, func(i, j int) bool { return results[i] < results[j] })
default:
return nil, errors.New("expected order_by to be either `asc` or `desc` or empty")
}
// paginate results
totalCount := len(results)
perPage := validatePerPage(perPagePtr)
page, err := validatePage(pagePtr, perPage, totalCount)
if err != nil {
return nil, err
}
skipCount := validateSkipCount(page, perPage)
pageSize := tmmath.MinInt(perPage, totalCount-skipCount)
apiResults := make([]*ctypes.ResultBlock, 0, pageSize)
for i := skipCount; i < skipCount+pageSize; i++ {
block := env.BlockStore.LoadBlock(results[i])
if block != nil {
blockMeta := env.BlockStore.LoadBlockMeta(block.Height)
if blockMeta != nil {
apiResults = append(apiResults, &ctypes.ResultBlock{
Block: block,
BlockID: blockMeta.BlockID,
})
}
}
}
return &ctypes.ResultBlockSearch{Blocks: apiResults, TotalCount: totalCount}, nil
}

View File

@@ -12,6 +12,7 @@ import (
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types"
)
@@ -82,6 +83,7 @@ type Environment struct {
PubKey crypto.PubKey
GenDoc *types.GenesisDoc // cache the genesis structure
TxIndexer txindex.TxIndexer
BlockIndexer indexer.BlockIndexer
ConsensusReactor *consensus.Reactor
EventBus *types.EventBus // thread safe
Mempool mempl.Mempool

View File

@@ -26,6 +26,7 @@ var Routes = map[string]*rpc.RPCFunc{
"check_tx": rpc.NewRPCFunc(CheckTx, "tx"),
"tx": rpc.NewRPCFunc(Tx, "hash,prove"),
"tx_search": rpc.NewRPCFunc(TxSearch, "query,prove,page,per_page,order_by"),
"block_search": rpc.NewRPCFunc(BlockSearch, "query,page,per_page,order_by"),
"validators": rpc.NewRPCFunc(Validators, "height,page,per_page"),
"dump_consensus_state": rpc.NewRPCFunc(DumpConsensusState, ""),
"consensus_state": rpc.NewRPCFunc(ConsensusState, ""),

View File

@@ -54,8 +54,14 @@ func Tx(ctx *rpctypes.Context, hash []byte, prove bool) (*ctypes.ResultTx, error
// TxSearch allows you to query for multiple transactions results. It returns a
// list of transactions (maximum ?per_page entries) and the total count.
// More: https://docs.tendermint.com/master/rpc/#/Info/tx_search
func TxSearch(ctx *rpctypes.Context, query string, prove bool, pagePtr, perPagePtr *int, orderBy string) (
*ctypes.ResultTxSearch, error) {
func TxSearch(
ctx *rpctypes.Context,
query string,
prove bool,
pagePtr, perPagePtr *int,
orderBy string,
) (*ctypes.ResultTxSearch, error) {
// if index is disabled, return error
if _, ok := env.TxIndexer.(*null.TxIndex); ok {
return nil, errors.New("transaction indexing is disabled")
@@ -94,10 +100,12 @@ func TxSearch(ctx *rpctypes.Context, query string, prove bool, pagePtr, perPageP
// paginate results
totalCount := len(results)
perPage := validatePerPage(perPagePtr)
page, err := validatePage(pagePtr, perPage, totalCount)
if err != nil {
return nil, err
}
skipCount := validateSkipCount(page, perPage)
pageSize := tmmath.MinInt(perPage, totalCount-skipCount)

View File

@@ -195,6 +195,12 @@ type ResultTxSearch struct {
TotalCount int `json:"total_count"`
}
// ResultBlockSearch defines the RPC response type for a block search by events.
type ResultBlockSearch struct {
Blocks []*ResultBlock `json:"blocks"`
TotalCount int `json:"total_count"`
}
// List of mempool txs
type ResultUnconfirmedTxs struct {
Count int `json:"n_txs"`

View File

@@ -1016,6 +1016,62 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/ErrorResponse"
/block_search:
get:
summary: Search for blocks by BeginBlock and EndBlock events
description: |
Search for blocks by BeginBlock and EndBlock events.
See /subscribe for the query syntax.
operationId: block_search
parameters:
- in: query
name: query
description: Query
required: true
schema:
type: string
example: "block.height > 1000 AND valset.changed > 0"
- in: query
name: page
description: "Page number (1-based)"
required: false
schema:
type: integer
default: 1
example: 1
- in: query
name: per_page
description: "Number of entries per page (max: 100)"
required: false
schema:
type: integer
default: 30
example: 30
- in: query
name: order_by
description: Order in which blocks are sorted ("asc" or "desc"), by height. If empty, default sorting will be still applied.
required: false
schema:
type: string
default: "desc"
example: "asc"
tags:
- Info
responses:
"200":
description: List of paginated blocks matching the search criteria.
content:
application/json:
schema:
$ref: "#/components/schemas/BlockSearchResponse"
"500":
description: Error
content:
application/json:
schema:
$ref: "#/components/schemas/ErrorResponse"
/tx:
get:
summary: Get transactions by hash