rpc: index block events to support block event queries (#6226)

This commit is contained in:
Aleksandr Bezobchuk
2021-03-22 11:47:35 -04:00
committed by GitHub
parent 99c73dd3c1
commit 003f394512
27 changed files with 1353 additions and 188 deletions

22
state/indexer/block.go Normal file
View File

@@ -0,0 +1,22 @@
package indexer
import (
"context"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/types"
)
// BlockIndexer defines an interface contract for indexing block events.
type BlockIndexer interface {
// Has returns true if the given height has been indexed. An error is returned
// upon database query failure.
Has(height int64) (bool, error)
// Index indexes BeginBlock and EndBlock events for a given block by its height.
Index(types.EventDataNewBlockHeader) error
// Search performs a query for block heights that match a given BeginBlock
// and Endblock event search criteria.
Search(ctx context.Context, q *query.Query) ([]int64, error)
}

View File

@@ -0,0 +1,489 @@
package kv
import (
"context"
"errors"
"fmt"
"sort"
"strconv"
"strings"
"github.com/google/orderedcode"
dbm "github.com/tendermint/tm-db"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/types"
)
var _ indexer.BlockIndexer = (*BlockerIndexer)(nil)
// BlockerIndexer implements a block indexer, indexing BeginBlock and EndBlock
// events with an underlying KV store. Block events are indexed by their height,
// such that matching search criteria returns the respective block height(s).
type BlockerIndexer struct {
store dbm.DB
}
func New(store dbm.DB) *BlockerIndexer {
return &BlockerIndexer{
store: store,
}
}
// Has returns true if the given height has been indexed. An error is returned
// upon database query failure.
func (idx *BlockerIndexer) Has(height int64) (bool, error) {
key, err := heightKey(height)
if err != nil {
return false, fmt.Errorf("failed to create block height index key: %w", err)
}
return idx.store.Has(key)
}
// Index indexes BeginBlock and EndBlock events for a given block by its height.
// The following is indexed:
//
// primary key: encode(block.height | height) => encode(height)
// BeginBlock events: encode(eventType.eventAttr|eventValue|height|begin_block) => encode(height)
// EndBlock events: encode(eventType.eventAttr|eventValue|height|end_block) => encode(height)
func (idx *BlockerIndexer) Index(bh types.EventDataNewBlockHeader) error {
batch := idx.store.NewBatch()
defer batch.Close()
height := bh.Header.Height
// 1. index by height
key, err := heightKey(height)
if err != nil {
return fmt.Errorf("failed to create block height index key: %w", err)
}
if err := batch.Set(key, int64ToBytes(height)); err != nil {
return err
}
// 2. index BeginBlock events
if err := idx.indexEvents(batch, bh.ResultBeginBlock.Events, "begin_block", height); err != nil {
return fmt.Errorf("failed to index BeginBlock events: %w", err)
}
// 3. index EndBlock events
if err := idx.indexEvents(batch, bh.ResultEndBlock.Events, "end_block", height); err != nil {
return fmt.Errorf("failed to index EndBlock events: %w", err)
}
return batch.WriteSync()
}
// Search performs a query for block heights that match a given BeginBlock
// and Endblock event search criteria. The given query can match against zero,
// one or more block heights. In the case of height queries, i.e. block.height=H,
// if the height is indexed, that height alone will be returned. An error and
// nil slice is returned. Otherwise, a non-nil slice and nil error is returned.
func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64, error) {
results := make([]int64, 0)
select {
case <-ctx.Done():
return results, nil
default:
}
conditions, err := q.Conditions()
if err != nil {
return nil, fmt.Errorf("failed to parse query conditions: %w", err)
}
// If there is an exact height query, return the result immediately
// (if it exists).
height, ok := lookForHeight(conditions)
if ok {
ok, err := idx.Has(height)
if err != nil {
return nil, err
}
if ok {
return []int64{height}, nil
}
return results, nil
}
var heightsInitialized bool
filteredHeights := make(map[string][]byte)
// conditions to skip because they're handled before "everything else"
skipIndexes := make([]int, 0)
// Extract ranges. If both upper and lower bounds exist, it's better to get
// them in order as to not iterate over kvs that are not within range.
ranges, rangeIndexes := indexer.LookForRanges(conditions)
if len(ranges) > 0 {
skipIndexes = append(skipIndexes, rangeIndexes...)
for _, qr := range ranges {
prefix, err := orderedcode.Append(nil, qr.Key)
if err != nil {
return nil, fmt.Errorf("failed to create prefix key: %w", err)
}
if !heightsInitialized {
filteredHeights, err = idx.matchRange(ctx, qr, prefix, filteredHeights, true)
if err != nil {
return nil, err
}
heightsInitialized = true
// Ignore any remaining conditions if the first condition resulted in no
// matches (assuming implicit AND operand).
if len(filteredHeights) == 0 {
break
}
} else {
filteredHeights, err = idx.matchRange(ctx, qr, prefix, filteredHeights, false)
if err != nil {
return nil, err
}
}
}
}
// for all other conditions
for i, c := range conditions {
if intInSlice(i, skipIndexes) {
continue
}
startKey, err := orderedcode.Append(nil, c.CompositeKey, fmt.Sprintf("%v", c.Operand))
if err != nil {
return nil, err
}
if !heightsInitialized {
filteredHeights, err = idx.match(ctx, c, startKey, filteredHeights, true)
if err != nil {
return nil, err
}
heightsInitialized = true
// Ignore any remaining conditions if the first condition resulted in no
// matches (assuming implicit AND operand).
if len(filteredHeights) == 0 {
break
}
} else {
filteredHeights, err = idx.match(ctx, c, startKey, filteredHeights, false)
if err != nil {
return nil, err
}
}
}
// fetch matching heights
results = make([]int64, 0, len(filteredHeights))
for _, hBz := range filteredHeights {
h := int64FromBytes(hBz)
ok, err := idx.Has(h)
if err != nil {
return nil, err
}
if ok {
results = append(results, h)
}
select {
case <-ctx.Done():
break
default:
}
}
sort.Slice(results, func(i, j int) bool { return results[i] < results[j] })
return results, nil
}
// matchRange returns all matching block heights that match a given QueryRange
// and start key. An already filtered result (filteredHeights) is provided such
// that any non-intersecting matches are removed.
//
// NOTE: The provided filteredHeights may be empty if no previous condition has
// matched.
func (idx *BlockerIndexer) matchRange(
ctx context.Context,
qr indexer.QueryRange,
startKey []byte,
filteredHeights map[string][]byte,
firstRun bool,
) (map[string][]byte, error) {
// A previous match was attempted but resulted in no matches, so we return
// no matches (assuming AND operand).
if !firstRun && len(filteredHeights) == 0 {
return filteredHeights, nil
}
tmpHeights := make(map[string][]byte)
lowerBound := qr.LowerBoundValue()
upperBound := qr.UpperBoundValue()
it, err := dbm.IteratePrefix(idx.store, startKey)
if err != nil {
return nil, fmt.Errorf("failed to create prefix iterator: %w", err)
}
defer it.Close()
LOOP:
for ; it.Valid(); it.Next() {
var (
eventValue string
err error
)
if qr.Key == types.BlockHeightKey {
eventValue, err = parseValueFromPrimaryKey(it.Key())
} else {
eventValue, err = parseValueFromEventKey(it.Key())
}
if err != nil {
continue
}
if _, ok := qr.AnyBound().(int64); ok {
v, err := strconv.ParseInt(eventValue, 10, 64)
if err != nil {
continue LOOP
}
include := true
if lowerBound != nil && v < lowerBound.(int64) {
include = false
}
if upperBound != nil && v > upperBound.(int64) {
include = false
}
if include {
tmpHeights[string(it.Value())] = it.Value()
}
}
select {
case <-ctx.Done():
break
default:
}
}
if err := it.Error(); err != nil {
return nil, err
}
if len(tmpHeights) == 0 || firstRun {
// Either:
//
// 1. Regardless if a previous match was attempted, which may have had
// results, but no match was found for the current condition, then we
// return no matches (assuming AND operand).
//
// 2. A previous match was not attempted, so we return all results.
return tmpHeights, nil
}
// Remove/reduce matches in filteredHashes that were not found in this
// match (tmpHashes).
for k := range filteredHeights {
if tmpHeights[k] == nil {
delete(filteredHeights, k)
select {
case <-ctx.Done():
break
default:
}
}
}
return filteredHeights, nil
}
// match returns all matching heights that meet a given query condition and start
// key. An already filtered result (filteredHeights) is provided such that any
// non-intersecting matches are removed.
//
// NOTE: The provided filteredHeights may be empty if no previous condition has
// matched.
func (idx *BlockerIndexer) match(
ctx context.Context,
c query.Condition,
startKeyBz []byte,
filteredHeights map[string][]byte,
firstRun bool,
) (map[string][]byte, error) {
// A previous match was attempted but resulted in no matches, so we return
// no matches (assuming AND operand).
if !firstRun && len(filteredHeights) == 0 {
return filteredHeights, nil
}
tmpHeights := make(map[string][]byte)
switch {
case c.Op == query.OpEqual:
it, err := dbm.IteratePrefix(idx.store, startKeyBz)
if err != nil {
return nil, fmt.Errorf("failed to create prefix iterator: %w", err)
}
defer it.Close()
for ; it.Valid(); it.Next() {
tmpHeights[string(it.Value())] = it.Value()
if err := ctx.Err(); err != nil {
break
}
}
if err := it.Error(); err != nil {
return nil, err
}
case c.Op == query.OpExists:
prefix, err := orderedcode.Append(nil, c.CompositeKey)
if err != nil {
return nil, err
}
it, err := dbm.IteratePrefix(idx.store, prefix)
if err != nil {
return nil, fmt.Errorf("failed to create prefix iterator: %w", err)
}
defer it.Close()
for ; it.Valid(); it.Next() {
tmpHeights[string(it.Value())] = it.Value()
select {
case <-ctx.Done():
break
default:
}
}
if err := it.Error(); err != nil {
return nil, err
}
case c.Op == query.OpContains:
prefix, err := orderedcode.Append(nil, c.CompositeKey)
if err != nil {
return nil, err
}
it, err := dbm.IteratePrefix(idx.store, prefix)
if err != nil {
return nil, fmt.Errorf("failed to create prefix iterator: %w", err)
}
defer it.Close()
for ; it.Valid(); it.Next() {
eventValue, err := parseValueFromEventKey(it.Key())
if err != nil {
continue
}
if strings.Contains(eventValue, c.Operand.(string)) {
tmpHeights[string(it.Value())] = it.Value()
}
select {
case <-ctx.Done():
break
default:
}
}
if err := it.Error(); err != nil {
return nil, err
}
default:
return nil, errors.New("other operators should be handled already")
}
if len(tmpHeights) == 0 || firstRun {
// Either:
//
// 1. Regardless if a previous match was attempted, which may have had
// results, but no match was found for the current condition, then we
// return no matches (assuming AND operand).
//
// 2. A previous match was not attempted, so we return all results.
return tmpHeights, nil
}
// Remove/reduce matches in filteredHeights that were not found in this
// match (tmpHeights).
for k := range filteredHeights {
if tmpHeights[k] == nil {
delete(filteredHeights, k)
select {
case <-ctx.Done():
break
default:
}
}
}
return filteredHeights, nil
}
func (idx *BlockerIndexer) indexEvents(batch dbm.Batch, events []abci.Event, typ string, height int64) error {
heightBz := int64ToBytes(height)
for _, event := range events {
// only index events with a non-empty type
if len(event.Type) == 0 {
continue
}
for _, attr := range event.Attributes {
if len(attr.Key) == 0 {
continue
}
// index iff the event specified index:true and it's not a reserved event
compositeKey := fmt.Sprintf("%s.%s", event.Type, string(attr.Key))
if compositeKey == types.TxHashKey || compositeKey == types.TxHeightKey {
return fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeKey)
}
if attr.GetIndex() {
key, err := eventKey(compositeKey, typ, string(attr.Value), height)
if err != nil {
return fmt.Errorf("failed to create block index key: %w", err)
}
if err := batch.Set(key, heightBz); err != nil {
return err
}
}
}
}
return nil
}

View File

@@ -0,0 +1,141 @@
package kv_test
import (
"context"
"fmt"
"testing"
"github.com/stretchr/testify/require"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/pubsub/query"
blockidxkv "github.com/tendermint/tendermint/state/indexer/block/kv"
"github.com/tendermint/tendermint/types"
db "github.com/tendermint/tm-db"
)
func TestBlockIndexer(t *testing.T) {
store := db.NewPrefixDB(db.NewMemDB(), []byte("block_events"))
indexer := blockidxkv.New(store)
require.NoError(t, indexer.Index(types.EventDataNewBlockHeader{
Header: types.Header{Height: 1},
ResultBeginBlock: abci.ResponseBeginBlock{
Events: []abci.Event{
{
Type: "begin_event",
Attributes: []abci.EventAttribute{
{
Key: []byte("proposer"),
Value: []byte("FCAA001"),
Index: true,
},
},
},
},
},
ResultEndBlock: abci.ResponseEndBlock{
Events: []abci.Event{
{
Type: "end_event",
Attributes: []abci.EventAttribute{
{
Key: []byte("foo"),
Value: []byte("100"),
Index: true,
},
},
},
},
},
}))
for i := 2; i < 12; i++ {
var index bool
if i%2 == 0 {
index = true
}
require.NoError(t, indexer.Index(types.EventDataNewBlockHeader{
Header: types.Header{Height: int64(i)},
ResultBeginBlock: abci.ResponseBeginBlock{
Events: []abci.Event{
{
Type: "begin_event",
Attributes: []abci.EventAttribute{
{
Key: []byte("proposer"),
Value: []byte("FCAA001"),
Index: true,
},
},
},
},
},
ResultEndBlock: abci.ResponseEndBlock{
Events: []abci.Event{
{
Type: "end_event",
Attributes: []abci.EventAttribute{
{
Key: []byte("foo"),
Value: []byte(fmt.Sprintf("%d", i)),
Index: index,
},
},
},
},
},
}))
}
testCases := map[string]struct {
q *query.Query
results []int64
}{
"block.height = 100": {
q: query.MustParse("block.height = 100"),
results: []int64{},
},
"block.height = 5": {
q: query.MustParse("block.height = 5"),
results: []int64{5},
},
"begin_event.key1 = 'value1'": {
q: query.MustParse("begin_event.key1 = 'value1'"),
results: []int64{},
},
"begin_event.proposer = 'FCAA001'": {
q: query.MustParse("begin_event.proposer = 'FCAA001'"),
results: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
},
"end_event.foo <= 5": {
q: query.MustParse("end_event.foo <= 5"),
results: []int64{2, 4},
},
"end_event.foo >= 100": {
q: query.MustParse("end_event.foo >= 100"),
results: []int64{1},
},
"block.height > 2 AND end_event.foo <= 8": {
q: query.MustParse("block.height > 2 AND end_event.foo <= 8"),
results: []int64{4, 6, 8},
},
"begin_event.proposer CONTAINS 'FFFFFFF'": {
q: query.MustParse("begin_event.proposer CONTAINS 'FFFFFFF'"),
results: []int64{},
},
"begin_event.proposer CONTAINS 'FCAA001'": {
q: query.MustParse("begin_event.proposer CONTAINS 'FCAA001'"),
results: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
},
}
for name, tc := range testCases {
tc := tc
t.Run(name, func(t *testing.T) {
results, err := indexer.Search(context.Background(), tc.q)
require.NoError(t, err)
require.Equal(t, tc.results, results)
})
}
}

View File

@@ -0,0 +1,96 @@
package kv
import (
"encoding/binary"
"fmt"
"strconv"
"github.com/google/orderedcode"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/types"
)
func intInSlice(a int, list []int) bool {
for _, b := range list {
if b == a {
return true
}
}
return false
}
func int64FromBytes(bz []byte) int64 {
v, _ := binary.Varint(bz)
return v
}
func int64ToBytes(i int64) []byte {
buf := make([]byte, binary.MaxVarintLen64)
n := binary.PutVarint(buf, i)
return buf[:n]
}
func heightKey(height int64) ([]byte, error) {
return orderedcode.Append(
nil,
types.BlockHeightKey,
height,
)
}
func eventKey(compositeKey, typ, eventValue string, height int64) ([]byte, error) {
return orderedcode.Append(
nil,
compositeKey,
eventValue,
height,
typ,
)
}
func parseValueFromPrimaryKey(key []byte) (string, error) {
var (
compositeKey string
height int64
)
remaining, err := orderedcode.Parse(string(key), &compositeKey, &height)
if err != nil {
return "", fmt.Errorf("failed to parse event key: %w", err)
}
if len(remaining) != 0 {
return "", fmt.Errorf("unexpected remainder in key: %s", remaining)
}
return strconv.FormatInt(height, 10), nil
}
func parseValueFromEventKey(key []byte) (string, error) {
var (
compositeKey, typ, eventValue string
height int64
)
remaining, err := orderedcode.Parse(string(key), &compositeKey, &eventValue, &height, &typ)
if err != nil {
return "", fmt.Errorf("failed to parse event key: %w", err)
}
if len(remaining) != 0 {
return "", fmt.Errorf("unexpected remainder in key: %s", remaining)
}
return eventValue, nil
}
func lookForHeight(conditions []query.Condition) (int64, bool) {
for _, c := range conditions {
if c.CompositeKey == types.BlockHeightKey && c.Op == query.OpEqual {
return c.Operand.(int64), true
}
}
return 0, false
}

View File

@@ -0,0 +1,26 @@
package null
import (
"context"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/types"
)
var _ indexer.BlockIndexer = (*BlockerIndexer)(nil)
// TxIndex implements a no-op block indexer.
type BlockerIndexer struct{}
func (idx *BlockerIndexer) Has(height int64) (bool, error) {
return false, nil
}
func (idx *BlockerIndexer) Index(types.EventDataNewBlockHeader) error {
return nil
}
func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64, error) {
return nil, nil
}

View File

@@ -0,0 +1,123 @@
package indexer
import (
"time"
"github.com/tendermint/tendermint/libs/pubsub/query"
)
// QueryRanges defines a mapping between a composite event key and a QueryRange.
//
// e.g.account.number => queryRange{lowerBound: 1, upperBound: 5}
type QueryRanges map[string]QueryRange
// QueryRange defines a range within a query condition.
type QueryRange struct {
LowerBound interface{} // int || time.Time
UpperBound interface{} // int || time.Time
Key string
IncludeLowerBound bool
IncludeUpperBound bool
}
// AnyBound returns either the lower bound if non-nil, otherwise the upper bound.
func (qr QueryRange) AnyBound() interface{} {
if qr.LowerBound != nil {
return qr.LowerBound
}
return qr.UpperBound
}
// LowerBoundValue returns the value for the lower bound. If the lower bound is
// nil, nil will be returned.
func (qr QueryRange) LowerBoundValue() interface{} {
if qr.LowerBound == nil {
return nil
}
if qr.IncludeLowerBound {
return qr.LowerBound
}
switch t := qr.LowerBound.(type) {
case int64:
return t + 1
case time.Time:
return t.Unix() + 1
default:
panic("not implemented")
}
}
// UpperBoundValue returns the value for the upper bound. If the upper bound is
// nil, nil will be returned.
func (qr QueryRange) UpperBoundValue() interface{} {
if qr.UpperBound == nil {
return nil
}
if qr.IncludeUpperBound {
return qr.UpperBound
}
switch t := qr.UpperBound.(type) {
case int64:
return t - 1
case time.Time:
return t.Unix() - 1
default:
panic("not implemented")
}
}
// LookForRanges returns a mapping of QueryRanges and the matching indexes in
// the provided query conditions.
func LookForRanges(conditions []query.Condition) (ranges QueryRanges, indexes []int) {
ranges = make(QueryRanges)
for i, c := range conditions {
if IsRangeOperation(c.Op) {
r, ok := ranges[c.CompositeKey]
if !ok {
r = QueryRange{Key: c.CompositeKey}
}
switch c.Op {
case query.OpGreater:
r.LowerBound = c.Operand
case query.OpGreaterEqual:
r.IncludeLowerBound = true
r.LowerBound = c.Operand
case query.OpLess:
r.UpperBound = c.Operand
case query.OpLessEqual:
r.IncludeUpperBound = true
r.UpperBound = c.Operand
}
ranges[c.CompositeKey] = r
indexes = append(indexes, i)
}
}
return ranges, indexes
}
// IsRangeOperation returns a boolean signifying if a query Operator is a range
// operation or not.
func IsRangeOperation(op query.Operator) bool {
switch op {
case query.OpGreater, query.OpGreaterEqual, query.OpLess, query.OpLessEqual:
return true
default:
return false
}
}

View File

@@ -8,9 +8,10 @@ import (
"github.com/tendermint/tendermint/libs/pubsub/query"
)
// XXX/TODO: These types should be moved to the indexer package.
// TxIndexer interface defines methods to index and search transactions.
type TxIndexer interface {
// AddBatch analyzes, indexes and stores a batch of transactions.
AddBatch(b *Batch) error
@@ -25,9 +26,6 @@ type TxIndexer interface {
Search(ctx context.Context, q *query.Query) ([]*abci.TxResult, error)
}
//----------------------------------------------------
// Txs are written as a batch
// Batch groups together multiple Index operations to be performed at the same time.
// NOTE: Batch is NOT thread-safe and must not be modified after starting its execution.
type Batch struct {
@@ -52,8 +50,5 @@ func (b *Batch) Size() int {
return len(b.Ops)
}
//----------------------------------------------------
// Errors
// ErrorEmptyHash indicates empty hash
var ErrorEmptyHash = errors.New("transaction hash cannot be empty")

View File

@@ -4,26 +4,34 @@ import (
"context"
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/types"
)
// XXX/TODO: These types should be moved to the indexer package.
const (
subscriber = "IndexerService"
)
// IndexerService connects event bus and transaction indexer together in order
// to index transactions coming from event bus.
// IndexerService connects event bus, transaction and block indexers together in
// order to index transactions and blocks coming from the event bus.
type IndexerService struct {
service.BaseService
idr TxIndexer
eventBus *types.EventBus
txIdxr TxIndexer
blockIdxr indexer.BlockIndexer
eventBus *types.EventBus
}
// NewIndexerService returns a new service instance.
func NewIndexerService(idr TxIndexer, eventBus *types.EventBus) *IndexerService {
is := &IndexerService{idr: idr, eventBus: eventBus}
func NewIndexerService(
txIdxr TxIndexer,
blockIdxr indexer.BlockIndexer,
eventBus *types.EventBus,
) *IndexerService {
is := &IndexerService{txIdxr: txIdxr, blockIdxr: blockIdxr, eventBus: eventBus}
is.BaseService = *service.NewBaseService(nil, "IndexerService", is)
return is
}
@@ -34,7 +42,6 @@ func (is *IndexerService) OnStart() error {
// Use SubscribeUnbuffered here to ensure both subscriptions does not get
// canceled due to not pulling messages fast enough. Cause this might
// sometimes happen when there are no other subscribers.
blockHeadersSub, err := is.eventBus.SubscribeUnbuffered(
context.Background(),
subscriber,
@@ -54,20 +61,31 @@ func (is *IndexerService) OnStart() error {
eventDataHeader := msg.Data().(types.EventDataNewBlockHeader)
height := eventDataHeader.Header.Height
batch := NewBatch(eventDataHeader.NumTxs)
for i := int64(0); i < eventDataHeader.NumTxs; i++ {
msg2 := <-txsSub.Out()
txResult := msg2.Data().(types.EventDataTx).TxResult
if err = batch.Add(&txResult); err != nil {
is.Logger.Error("Can't add tx to batch",
is.Logger.Error(
"failed to add tx to batch",
"height", height,
"index", txResult.Index,
"err", err)
"err", err,
)
}
}
if err = is.idr.AddBatch(batch); err != nil {
is.Logger.Error("Failed to index block", "height", height, "err", err)
if err := is.blockIdxr.Index(eventDataHeader); err != nil {
is.Logger.Error("failed to index block", "height", height, "err", err)
} else {
is.Logger.Debug("Indexed block", "height", height)
is.Logger.Error("indexed block", "height", height, "err", err)
}
if err = is.txIdxr.AddBatch(batch); err != nil {
is.Logger.Error("failed to index block txs", "height", height, "err", err)
} else {
is.Logger.Debug("indexed block txs", "height", height, "num_txs", eventDataHeader.NumTxs)
}
}
}()

View File

@@ -4,13 +4,12 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
db "github.com/tendermint/tm-db"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
blockidxkv "github.com/tendermint/tendermint/state/indexer/block/kv"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/state/txindex/kv"
"github.com/tendermint/tendermint/types"
@@ -31,8 +30,9 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) {
// tx indexer
store := db.NewMemDB()
txIndexer := kv.NewTxIndex(store)
blockIndexer := blockidxkv.New(db.NewPrefixDB(store, []byte("block_events")))
service := txindex.NewIndexerService(txIndexer, eventBus)
service := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus)
service.SetLogger(log.TestingLogger())
err = service.Start()
require.NoError(t, err)
@@ -67,11 +67,15 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) {
time.Sleep(100 * time.Millisecond)
// check the result
res, err := txIndexer.Get(types.Tx("foo").Hash())
assert.NoError(t, err)
assert.Equal(t, txResult1, res)
require.NoError(t, err)
require.Equal(t, txResult1, res)
ok, err := blockIndexer.Has(1)
require.NoError(t, err)
require.True(t, ok)
res, err = txIndexer.Get(types.Tx("bar").Hash())
assert.NoError(t, err)
assert.Equal(t, txResult2, res)
require.NoError(t, err)
require.Equal(t, txResult2, res)
}

View File

@@ -6,7 +6,6 @@ import (
"fmt"
"strconv"
"strings"
"time"
"github.com/gogo/protobuf/proto"
"github.com/google/orderedcode"
@@ -14,6 +13,7 @@ import (
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types"
)
@@ -147,7 +147,7 @@ func (txi *TxIndex) indexEvents(result *abci.TxResult, hash []byte, store dbm.Ba
compositeTag := fmt.Sprintf("%s.%s", event.Type, string(attr.Key))
// ensure event does not conflict with a reserved prefix key
if compositeTag == types.TxHashKey || compositeTag == types.TxHeightKey {
return fmt.Errorf("event type and attribute key \"%s\" is reserved. Please use a different key", compositeTag)
return fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeTag)
}
if attr.GetIndex() {
err := store.Set(keyFromEvent(compositeTag, attr.Value, result), hash)
@@ -173,11 +173,10 @@ func (txi *TxIndex) indexEvents(result *abci.TxResult, hash []byte, store dbm.Ba
// Search will exit early and return any result fetched so far,
// when a message is received on the context chan.
func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResult, error) {
// Potentially exit early.
select {
case <-ctx.Done():
results := make([]*abci.TxResult, 0)
return results, nil
return make([]*abci.TxResult, 0), nil
default:
}
@@ -212,13 +211,13 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul
// extract ranges
// if both upper and lower bounds exist, it's better to get them in order not
// no iterate over kvs that are not within range.
ranges, rangeIndexes := lookForRanges(conditions)
ranges, rangeIndexes := indexer.LookForRanges(conditions)
if len(ranges) > 0 {
skipIndexes = append(skipIndexes, rangeIndexes...)
for _, r := range ranges {
for _, qr := range ranges {
if !hashesInitialized {
filteredHashes = txi.matchRange(ctx, r, prefixFromCompositeKey(r.key), filteredHashes, true)
filteredHashes = txi.matchRange(ctx, qr, prefixFromCompositeKey(qr.Key), filteredHashes, true)
hashesInitialized = true
// Ignore any remaining conditions if the first condition resulted
@@ -227,7 +226,7 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul
break
}
} else {
filteredHashes = txi.matchRange(ctx, r, prefixFromCompositeKey(r.key), filteredHashes, false)
filteredHashes = txi.matchRange(ctx, qr, prefixFromCompositeKey(qr.Key), filteredHashes, false)
}
}
}
@@ -294,100 +293,6 @@ func lookForHeight(conditions []query.Condition) (height int64) {
return 0
}
// special map to hold range conditions
// Example: account.number => queryRange{lowerBound: 1, upperBound: 5}
type queryRanges map[string]queryRange
type queryRange struct {
lowerBound interface{} // int || time.Time
upperBound interface{} // int || time.Time
key string
includeLowerBound bool
includeUpperBound bool
}
func (r queryRange) lowerBoundValue() interface{} {
if r.lowerBound == nil {
return nil
}
if r.includeLowerBound {
return r.lowerBound
}
switch t := r.lowerBound.(type) {
case int64:
return t + 1
case time.Time:
return t.Unix() + 1
default:
panic("not implemented")
}
}
func (r queryRange) AnyBound() interface{} {
if r.lowerBound != nil {
return r.lowerBound
}
return r.upperBound
}
func (r queryRange) upperBoundValue() interface{} {
if r.upperBound == nil {
return nil
}
if r.includeUpperBound {
return r.upperBound
}
switch t := r.upperBound.(type) {
case int64:
return t - 1
case time.Time:
return t.Unix() - 1
default:
panic("not implemented")
}
}
func lookForRanges(conditions []query.Condition) (ranges queryRanges, indexes []int) {
ranges = make(queryRanges)
for i, c := range conditions {
if isRangeOperation(c.Op) {
r, ok := ranges[c.CompositeKey]
if !ok {
r = queryRange{key: c.CompositeKey}
}
switch c.Op {
case query.OpGreater:
r.lowerBound = c.Operand
case query.OpGreaterEqual:
r.includeLowerBound = true
r.lowerBound = c.Operand
case query.OpLess:
r.upperBound = c.Operand
case query.OpLessEqual:
r.includeUpperBound = true
r.upperBound = c.Operand
}
ranges[c.CompositeKey] = r
indexes = append(indexes, i)
}
}
return ranges, indexes
}
func isRangeOperation(op query.Operator) bool {
switch op {
case query.OpGreater, query.OpGreaterEqual, query.OpLess, query.OpLessEqual:
return true
default:
return false
}
}
// match returns all matching txs by hash that meet a given condition and start
// key. An already filtered result (filteredHashes) is provided such that any
// non-intersecting matches are removed.
@@ -522,7 +427,7 @@ func (txi *TxIndex) match(
// NOTE: filteredHashes may be empty if no previous condition has matched.
func (txi *TxIndex) matchRange(
ctx context.Context,
r queryRange,
qr indexer.QueryRange,
startKey []byte,
filteredHashes map[string][]byte,
firstRun bool,
@@ -534,8 +439,8 @@ func (txi *TxIndex) matchRange(
}
tmpHashes := make(map[string][]byte)
lowerBound := r.lowerBoundValue()
upperBound := r.upperBoundValue()
lowerBound := qr.LowerBoundValue()
upperBound := qr.UpperBoundValue()
it, err := dbm.IteratePrefix(txi.store, startKey)
if err != nil {
@@ -549,7 +454,7 @@ LOOP:
if err != nil {
continue
}
if _, ok := r.AnyBound().(int64); ok {
if _, ok := qr.AnyBound().(int64); ok {
v, err := strconv.ParseInt(value, 10, 64)
if err != nil {
continue LOOP