ADR 065: Custom Event Indexing
Changelog
- April 1, 2021: Initial Draft (@alexanderbez)
- April 28, 2021: Specify search capabilities are only supported through the KV indexer (@marbar3778)
- May 19, 2021: Update the SQL schema and the eventsink interface (@jayt106)
Status
Accepted
Context
Currently, Tendermint Core supports block and transaction event indexing through
the tx_index.indexer configuration. Events are captured in transactions and
are indexed via a TxIndexer type. Events are captured in blocks, specifically
from BeginBlock and EndBlock application responses, and are indexed via a
BlockIndexer type. Both of these types are managed by a single IndexerService
which is responsible for consuming events and sending those events off to be
indexed by the respective type.
In addition to indexing, Tendermint Core also supports the ability to query for both indexed transaction and block events via Tendermint's RPC layer. The ability to query for these indexed events facilitates a great multitude of upstream client and application capabilities, e.g. block explorers, IBC relayers, and auxiliary data availability and indexing services.
Currently, Tendermint only supports indexing via a kv indexer, which is supported
by an underlying embedded key/value store database. The kv indexer implements
its own indexing and query mechanisms. While the former is somewhat trivial,
providing a rich and flexible query layer is not as trivial and has caused many
issues and UX concerns for upstream clients and applications.
The fragile nature of the proprietary kv query engine and the potential
performance and scaling issues that arise when a large number of consumers are
introduced motivate the need for a more robust and flexible indexing and query
solution.
Alternative Approaches
With regard to alternative approaches to a more robust solution, the only serious contender considered was a transition to SQLite.
While the approach would work, it locks us into a specific query language and storage layer, so in some ways it's only a bit better than our current approach. In addition, the implementation would require the introduction of CGO into the Tendermint Core stack, whereas right now CGO is only introduced depending on the database used.
Decision
We will adopt a similar approach to that of the Cosmos SDK's KVStore state
listening described in ADR-038.
Namely, we will perform the following:
- Introduce a new interface, EventSink, that all data sinks must implement.
- Augment the existing tx_index.indexer configuration to now accept a series of one or more indexer types, i.e. sinks.
- Combine the current TxIndexer and BlockIndexer into a single KVEventSink that implements the EventSink interface.
- Introduce an additional EventSink that is backed by PostgreSQL.
  - Implement the necessary schemas to support both block and transaction event indexing.
- Update IndexerService to use a series of EventSinks.
- Proxy queries to the relevant sink's native query layer.
- Update all relevant RPC methods.
Detailed Design
EventSink
We introduce the EventSink interface type that all supported sinks must implement.
The interface is defined as follows:
type EventSink interface {
	IndexBlockEvents(types.EventDataNewBlockHeader) error
	IndexTxEvents([]*abci.TxResult) error

	SearchBlockEvents(context.Context, *query.Query) ([]int64, error)
	SearchTxEvents(context.Context, *query.Query) ([]*abci.TxResult, error)

	GetTxByHash([]byte) (*abci.TxResult, error)
	HasBlock(int64) (bool, error)

	Type() EventSinkType
	Stop() error
}
The IndexerService will accept a list of one or more EventSink types. During
the OnStart method it will call the appropriate APIs on each EventSink to
index both block and transaction events.
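The fan-out described above can be sketched as follows. This is a minimal illustration, not the actual IndexerService: the BlockEvents and TxResult types are simplified stand-ins for types.EventDataNewBlockHeader and abci.TxResult, the indexBlock method and the memSink type are hypothetical, and the policy of continuing past a failing sink is an assumption rather than something this ADR specifies.

```go
package main

import (
	"errors"
	"fmt"
)

// BlockEvents and TxResult are simplified stand-ins (assumptions) for
// types.EventDataNewBlockHeader and abci.TxResult.
type BlockEvents struct{ Height int64 }

type TxResult struct {
	Height int64
	Tx     []byte
}

// EventSink is a trimmed-down version of the interface above; only the
// indexing methods are needed to illustrate the fan-out.
type EventSink interface {
	IndexBlockEvents(BlockEvents) error
	IndexTxEvents([]*TxResult) error
	Type() string
}

// IndexerService holds the configured sinks (hypothetical layout).
type IndexerService struct {
	sinks []EventSink
}

// indexBlock forwards one block's events and tx results to every sink.
// It keeps going when a sink fails and reports the first error seen;
// this error policy is an assumption of the sketch.
func (is *IndexerService) indexBlock(b BlockEvents, txs []*TxResult) error {
	var firstErr error
	for _, sink := range is.sinks {
		err := sink.IndexBlockEvents(b)
		if err == nil && len(txs) > 0 {
			err = sink.IndexTxEvents(txs)
		}
		if err != nil && firstErr == nil {
			firstErr = fmt.Errorf("sink %s: %w", sink.Type(), err)
		}
	}
	return firstErr
}

// memSink is an in-memory sink used only to exercise the sketch.
type memSink struct {
	name   string
	blocks []int64
	txs    int
	fail   bool
}

func (m *memSink) IndexBlockEvents(b BlockEvents) error {
	if m.fail {
		return errors.New("sink unavailable")
	}
	m.blocks = append(m.blocks, b.Height)
	return nil
}

func (m *memSink) IndexTxEvents(txs []*TxResult) error {
	m.txs += len(txs)
	return nil
}

func (m *memSink) Type() string { return m.name }
```

With one healthy and one failing memSink configured, indexBlock still indexes into the healthy sink and returns the failing sink's error wrapped with its type.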
Supported Sinks
We will initially support two EventSink types out of the box.
KVEventSink
This type of EventSink is a combination of the TxIndexer and BlockIndexer
indexers, both of which are backed by a single embedded key/value database.
The bulk of the existing business logic will remain the same, but the existing APIs
will be mapped to the new EventSink API. Both types will be removed in favor of a single
KVEventSink type.
The KVEventSink will be the only EventSink enabled by default, so from a UX
perspective, operators should not notice a difference apart from a configuration
change.
We omit EventSink implementation details as it should be fairly straightforward
to map the existing business logic to the new APIs.
PSQLEventSink
This type of EventSink indexes block and transaction events into a PostgreSQL
database. We define and automatically migrate the following schema when the
IndexerService starts.
The PSQLEventSink will not support tx_search, block_search, GetTxByHash and HasBlock.
-- Table Definition ----------------------------------------------

CREATE TYPE block_event_type AS ENUM ('begin_block', 'end_block', '');

CREATE TABLE block_events (
    id SERIAL PRIMARY KEY,
    key VARCHAR NOT NULL,
    value VARCHAR NOT NULL,
    height INTEGER NOT NULL,
    type block_event_type,
    created_at TIMESTAMPTZ NOT NULL,
    chain_id VARCHAR NOT NULL
);

CREATE TABLE tx_results (
    id SERIAL PRIMARY KEY,
    tx_result BYTEA NOT NULL,
    created_at TIMESTAMPTZ NOT NULL
);

CREATE TABLE tx_events (
    id SERIAL PRIMARY KEY,
    key VARCHAR NOT NULL,
    value VARCHAR NOT NULL,
    height INTEGER NOT NULL,
    hash VARCHAR NOT NULL,
    tx_result_id SERIAL,
    created_at TIMESTAMPTZ NOT NULL,
    chain_id VARCHAR NOT NULL,
    FOREIGN KEY (tx_result_id)
        REFERENCES tx_results(id)
        ON DELETE CASCADE
);

-- Indices -------------------------------------------------------

CREATE INDEX idx_block_events_key_value ON block_events(key, value);
CREATE INDEX idx_tx_events_key_value ON tx_events(key, value);
CREATE INDEX idx_tx_events_hash ON tx_events(hash);
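Since the PSQLEventSink does not serve tx_search or block_search, consumers would presumably query these tables directly. The sketch below shows one way a client might build a parameterized query that returns the stored tx_result blobs matching a set of event key/value filters. The EventFilter type, the baseQuery constant and the buildTxSearchQuery function are hypothetical and not part of Tendermint; only the table and column names come from the schema above.

```go
package main

import (
	"fmt"
	"strings"
)

// EventFilter is a hypothetical equality filter on an indexed event,
// e.g. Key "transfer.recipient" with some address as Value.
type EventFilter struct {
	Key   string
	Value string
}

// baseQuery selects tx results that have at least one event on the chain
// bound to placeholder $1.
const baseQuery = "SELECT r.tx_result FROM tx_results r" +
	" WHERE EXISTS (SELECT 1 FROM tx_events e" +
	" WHERE e.tx_result_id = r.id AND e.chain_id = $1)"

// buildTxSearchQuery returns a PostgreSQL statement and its arguments.
// A tx result is selected only if it matches ALL filters; each filter
// adds one EXISTS clause with two numbered placeholders.
func buildTxSearchQuery(chainID string, filters []EventFilter) (string, []interface{}) {
	var sb strings.Builder
	sb.WriteString(baseQuery)
	args := []interface{}{chainID}
	for _, f := range filters {
		args = append(args, f.Key, f.Value)
		n := len(args) // placeholder number of the value just appended
		fmt.Fprintf(&sb, " AND EXISTS (SELECT 1 FROM tx_events e"+
			" WHERE e.tx_result_id = r.id AND e.key = $%d AND e.value = $%d)", n-1, n)
	}
	sb.WriteString(" ORDER BY r.id")
	return sb.String(), args
}
```

The returned statement and arguments can be passed to database/sql, for example db.QueryContext(ctx, query, args...), with any PostgreSQL driver. Because every filter looks up rows by key and value, the idx_tx_events_key_value index is the one these lookups are likely to use.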
The PSQLEventSink will implement the EventSink interface as follows
(some details omitted for brevity):
func NewPSQLEventSink(connStr string, chainID string) (*PSQLEventSink, error) {
	db, err := sql.Open("postgres", connStr)
	if err != nil {
		return nil, err
	}

	// ...
}

func (es *PSQLEventSink) IndexBlockEvents(h types.EventDataNewBlockHeader) error {
	sqlStmt := sq.Insert("block_events").Columns("key", "value", "height", "type", "created_at", "chain_id")

	// index the reserved block height index
	ts := time.Now()
	sqlStmt = sqlStmt.Values(types.BlockHeightKey, h.Header.Height, h.Header.Height, "", ts, es.chainID)

	for _, event := range h.ResultBeginBlock.Events {
		// only index events with a non-empty type
		if len(event.Type) == 0 {
			continue
		}

		for _, attr := range event.Attributes {
			if len(attr.Key) == 0 {
				continue
			}

			// index iff the event specified index:true and it's not a reserved event
			compositeKey := fmt.Sprintf("%s.%s", event.Type, string(attr.Key))
			if compositeKey == types.BlockHeightKey {
				return fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeKey)
			}

			if attr.GetIndex() {
				sqlStmt = sqlStmt.Values(compositeKey, string(attr.Value), h.Header.Height, BlockEventTypeBeginBlock, ts, es.chainID)
			}
		}
	}

	// index end_block events...
	// execute sqlStmt db query...
}
func (es *PSQLEventSink) IndexTxEvents(txr []*abci.TxResult) error {
	sqlStmtEvents := sq.Insert("tx_events").Columns("key", "value", "height", "hash", "tx_result_id", "created_at", "chain_id")

	ts := time.Now()
	for _, tx := range txr {
		// store the tx result
		txBz, err := proto.Marshal(tx)
		if err != nil {
			return err
		}

		// build a fresh insert per transaction so that earlier rows are not re-inserted
		sqlStmtTxResult := sq.Insert("tx_results").Columns("tx_result", "created_at").Values(txBz, ts)

		// execute sqlStmtTxResult db query...
		var txID uint32
		err = sqlStmtTxResult.QueryRow().Scan(&txID)
		if err != nil {
			return err
		}

		// index the reserved height and hash indices
		hash := types.Tx(tx.Tx).Hash()
		sqlStmtEvents = sqlStmtEvents.Values(types.TxHashKey, hash, tx.Height, hash, txID, ts, es.chainID)
		sqlStmtEvents = sqlStmtEvents.Values(types.TxHeightKey, tx.Height, tx.Height, hash, txID, ts, es.chainID)

		for _, event := range tx.Result.Events {
			// only index events with a non-empty type
			if len(event.Type) == 0 {
				continue
			}

			for _, attr := range event.Attributes {
				if len(attr.Key) == 0 {
					continue
				}

				compositeTag := fmt.Sprintf("%s.%s", event.Type, string(attr.Key))

				// ensure event does not conflict with a reserved prefix key
				if compositeTag == types.TxHashKey || compositeTag == types.TxHeightKey {
					return fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeTag)
				}

				// index if `index: true` is set
				if attr.GetIndex() {
					sqlStmtEvents = sqlStmtEvents.Values(compositeTag, string(attr.Value), tx.Height, hash, txID, ts, es.chainID)
				}
			}
		}
	}

	// execute sqlStmtEvents db query...
}
func (es *PSQLEventSink) SearchBlockEvents(ctx context.Context, q *query.Query) ([]int64, error) {
	return nil, errors.New("block search is not supported via the postgres event sink")
}

func (es *PSQLEventSink) SearchTxEvents(ctx context.Context, q *query.Query) ([]*abci.TxResult, error) {
	return nil, errors.New("tx search is not supported via the postgres event sink")
}

func (es *PSQLEventSink) GetTxByHash(hash []byte) (*abci.TxResult, error) {
	return nil, errors.New("getTxByHash is not supported via the postgres event sink")
}

func (es *PSQLEventSink) HasBlock(h int64) (bool, error) {
	return false, errors.New("hasBlock is not supported via the postgres event sink")
}
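The attribute filtering shared by IndexBlockEvents and IndexTxEvents can be isolated into a small pure function, which makes the rules easier to test without a database. The sketch below is an illustration under assumptions: the Event, Attribute and Row types are simplified stand-ins for the ABCI types, indexableRows is a hypothetical helper, and the reserved keys are passed in by the caller rather than read from the types package.

```go
package main

import (
	"errors"
	"fmt"
)

// errReserved marks a clash with a reserved composite key.
var errReserved = errors.New("reserved event key")

// Attribute and Event are simplified stand-ins for the ABCI event types.
type Attribute struct {
	Key   string
	Value string
	Index bool
}

type Event struct {
	Type       string
	Attributes []Attribute
}

// Row is one (key, value) pair destined for an events table.
type Row struct {
	Key   string
	Value string
}

// indexableRows applies the rules used by the sink methods above:
//   - skip events with an empty type and attributes with an empty key,
//   - reject any "<type>.<key>" that collides with a reserved key,
//   - keep only attributes flagged with Index == true.
func indexableRows(events []Event, reserved map[string]bool) ([]Row, error) {
	var rows []Row
	for _, ev := range events {
		if ev.Type == "" {
			continue
		}
		for _, attr := range ev.Attributes {
			if attr.Key == "" {
				continue
			}
			key := ev.Type + "." + attr.Key
			if reserved[key] {
				return nil, fmt.Errorf("%w: %q", errReserved, key)
			}
			if attr.Index {
				rows = append(rows, Row{Key: key, Value: attr.Value})
			}
		}
	}
	return rows, nil
}
```

As in the methods above, the reserved-key check runs before the Index flag is consulted, so an application event that reuses a reserved key is rejected even when it is not marked for indexing.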
Configuration
The current tx_index.indexer configuration would be changed to accept a list
of supported EventSink types instead of a single value.
Example:
[tx_index]
indexer = [
  "kv",
  "psql"
]
If the indexer list contains the null indexer, then no indexers will be used
regardless of what other values may exist.
Additional configuration parameters might be required depending on what event
sinks are supplied to tx_index.indexer. The psql sink will require an additional
connection configuration.
[tx_index]
indexer = [
  "kv",
  "psql"
]
psql_conn = "postgresql://<user>:<password>@<host>:<port>/<db>?<opts>"
Any invalid or misconfigured tx_index configuration should yield an error as
early as possible.
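The configuration rules above could be checked at startup roughly as follows. This is a sketch under stated assumptions: the TxIndexConfig struct, its field names and the EnabledSinks method are hypothetical, and treating duplicates and unknown names as errors (even when null is present) is one possible reading of "yield an error as early as possible", not a decision recorded in this ADR.

```go
package main

import (
	"errors"
	"fmt"
)

// TxIndexConfig mirrors the [tx_index] section (hypothetical field names).
type TxIndexConfig struct {
	Indexer  []string // e.g. ["kv", "psql"]
	PsqlConn string   // connection string for the psql sink
}

// EnabledSinks validates the configuration and returns the sinks to start.
// If "null" appears anywhere in the list, no sinks are enabled.
func (c TxIndexConfig) EnabledSinks() ([]string, error) {
	seen := make(map[string]bool)
	var sinks []string
	hasNull := false

	for _, name := range c.Indexer {
		switch name {
		case "null":
			hasNull = true
		case "kv", "psql":
			if seen[name] {
				return nil, fmt.Errorf("duplicate indexer %q", name)
			}
			seen[name] = true
			sinks = append(sinks, name)
		default:
			return nil, fmt.Errorf("unsupported indexer %q", name)
		}
	}

	if hasNull {
		// the null indexer disables indexing regardless of other entries
		return nil, nil
	}
	if seen["psql"] && c.PsqlConn == "" {
		return nil, errors.New("the psql indexer requires a connection string")
	}
	return sinks, nil
}
```

Calling EnabledSinks before any sink is constructed keeps misconfiguration errors at node startup rather than at the first indexed block.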
Future Improvements
Although not strictly required for feature parity with the existing Tendermint indexer, a method of performing a "re-index" would benefit operators. Specifically, Tendermint operators could invoke an RPC method that makes the node re-index all block and transaction events between two given heights, H1 and H2, provided the block store contains the blocks and transaction results for every height in that range.
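A re-index of this kind might look roughly like the sketch below. Everything here is hypothetical: the BlockStore interface is reduced to the two bounds needed for the range check, the per-height callback stands in for loading a block and handing its events to the sinks, and the sketch assumes the store holds every height between its base and its latest height.

```go
package main

import "fmt"

// BlockStore is a hypothetical read-only view of the stored height range.
type BlockStore interface {
	Base() int64   // lowest height still available
	Height() int64 // highest height available
}

// reindexRange re-indexes every height in [h1, h2] by calling indexHeight.
// It refuses to start unless the whole range is present in the store,
// and stops at the first height that fails.
func reindexRange(store BlockStore, indexHeight func(int64) error, h1, h2 int64) error {
	if h1 < 1 || h2 < h1 {
		return fmt.Errorf("invalid height range [%d, %d]", h1, h2)
	}
	if h1 < store.Base() || h2 > store.Height() {
		return fmt.Errorf("range [%d, %d] is outside stored heights [%d, %d]",
			h1, h2, store.Base(), store.Height())
	}
	for h := h1; h <= h2; h++ {
		if err := indexHeight(h); err != nil {
			return fmt.Errorf("re-index height %d: %w", h, err)
		}
	}
	return nil
}

// memStore is an in-memory BlockStore used only to exercise the sketch.
type memStore struct {
	base   int64
	height int64
}

func (s memStore) Base() int64   { return s.base }
func (s memStore) Height() int64 { return s.height }
```

Checking the range up front means a pruned node fails the request immediately instead of partially re-indexing and then stopping at the first missing block.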
Consequences
Positive
- A more robust and flexible indexing and query engine for indexing and searching block and transaction events.
- The ability to not have to support a custom indexing and query engine beyond the legacy kv type.
- The ability to offload/proxy indexing and querying to the underlying sink.
- Scalability and reliability that essentially comes "for free" from the underlying sink, if it supports it.
Negative
- The need to support multiple and potentially a growing set of custom EventSink types.