diff --git a/docs/architecture/adr-065-custom-event-indexing.md b/docs/architecture/adr-065-custom-event-indexing.md index e6a3fdead..b5c86ecfa 100644 --- a/docs/architecture/adr-065-custom-event-indexing.md +++ b/docs/architecture/adr-065-custom-event-indexing.md @@ -24,6 +24,7 @@ - April 1, 2021: Initial Draft (@alexanderbez) - April 28, 2021: Specify search capabilities are only supported through the KV indexer (@marbar3778) - May 19, 2021: Update the SQL schema and the eventsink interface (@jayt106) +- Aug 30, 2021: Update the SQL schema and the psql implementation (@creachadair) ## Status @@ -145,163 +146,190 @@ The postgres eventsink will not support `tx_search`, `block_search`, `GetTxByHas ```sql -- Table Definition ---------------------------------------------- -CREATE TYPE block_event_type AS ENUM ('begin_block', 'end_block', ''); +-- The blocks table records metadata about each block. +-- The block record does not include its events or transactions (see tx_results). +CREATE TABLE blocks ( + rowid BIGSERIAL PRIMARY KEY, -CREATE TABLE block_events ( - id SERIAL PRIMARY KEY, - key VARCHAR NOT NULL, - value VARCHAR NOT NULL, - height INTEGER NOT NULL, - type block_event_type, - created_at TIMESTAMPTZ NOT NULL, - chain_id VARCHAR NOT NULL + height BIGINT NOT NULL, + chain_id VARCHAR NOT NULL, + + -- When this block header was logged into the sink, in UTC. + created_at TIMESTAMPTZ NOT NULL, + + UNIQUE (height, chain_id) ); +-- Index blocks by height and chain, since we need to resolve block IDs when +-- indexing transaction records and transaction events. +CREATE INDEX idx_blocks_height_chain ON blocks(height, chain_id); + +-- The tx_results table records metadata about transaction results. Note that +-- the events from a transaction are stored separately. CREATE TABLE tx_results ( - id SERIAL PRIMARY KEY, - tx_result BYTEA NOT NULL, - created_at TIMESTAMPTZ NOT NULL + rowid BIGSERIAL PRIMARY KEY, + + -- The block to which this transaction belongs. + block_id BIGINT NOT NULL REFERENCES blocks(rowid), + -- The sequential index of the transaction within the block. + index INTEGER NOT NULL, + -- When this result record was logged into the sink, in UTC. + created_at TIMESTAMPTZ NOT NULL, + -- The hex-encoded hash of the transaction. + tx_hash VARCHAR NOT NULL, + -- The protobuf wire encoding of the TxResult message. + tx_result BYTEA NOT NULL, + + UNIQUE (block_id, index) ); -CREATE TABLE tx_events ( - id SERIAL PRIMARY KEY, - key VARCHAR NOT NULL, - value VARCHAR NOT NULL, - height INTEGER NOT NULL, - hash VARCHAR NOT NULL, - tx_result_id SERIAL, - created_at TIMESTAMPTZ NOT NULL, - chain_id VARCHAR NOT NULL, - FOREIGN KEY (tx_result_id) - REFERENCES tx_results(id) - ON DELETE CASCADE +-- The events table records events. All events (both block and transaction) are +-- associated with a block ID; transaction events also have a transaction ID. +CREATE TABLE events ( + rowid BIGSERIAL PRIMARY KEY, + + -- The block and transaction this event belongs to. + -- If tx_id is NULL, this is a block event. + block_id BIGINT NOT NULL REFERENCES blocks(rowid), + tx_id BIGINT NULL REFERENCES tx_results(rowid), + + -- The application-defined type label for the event. + type VARCHAR NOT NULL ); --- Indices ------------------------------------------------------- +-- The attributes table records event attributes. +CREATE TABLE attributes ( + event_id BIGINT NOT NULL REFERENCES events(rowid), + key VARCHAR NOT NULL, -- bare key + composite_key VARCHAR NOT NULL, -- composed type.key + value VARCHAR NULL, -CREATE INDEX idx_block_events_key_value ON block_events(key, value); -CREATE INDEX idx_tx_events_key_value ON tx_events(key, value); -CREATE INDEX idx_tx_events_hash ON tx_events(hash); + UNIQUE (event_id, key) +); + +-- A joined view of events and their attributes. Events that do not have any +-- attributes are represented as a single row with empty key and value fields. +CREATE VIEW event_attributes AS + SELECT block_id, tx_id, type, key, composite_key, value + FROM events LEFT JOIN attributes ON (events.rowid = attributes.event_id); + +-- A joined view of all block events (those having tx_id NULL). +CREATE VIEW block_events AS + SELECT blocks.rowid as block_id, height, chain_id, type, key, composite_key, value + FROM blocks JOIN event_attributes ON (blocks.rowid = event_attributes.block_id) + WHERE event_attributes.tx_id IS NULL; + +-- A joined view of all transaction events. +CREATE VIEW tx_events AS + SELECT height, index, chain_id, type, key, composite_key, value, tx_results.created_at + FROM blocks JOIN tx_results ON (blocks.rowid = tx_results.block_id) + JOIN event_attributes ON (tx_results.rowid = event_attributes.tx_id) + WHERE event_attributes.tx_id IS NOT NULL; ``` The `PSQLEventSink` will implement the `EventSink` interface as follows (some details omitted for brevity): - ```go -func NewPSQLEventSink(connStr string, chainID string) (*PSQLEventSink, error) { - db, err := sql.Open("postgres", connStr) - if err != nil { - return nil, err - } +func NewEventSink(connStr, chainID string) (*EventSink, error) { + db, err := sql.Open(driverName, connStr) + // ... - // ... + return &EventSink{ + store: db, + chainID: chainID, + }, nil } -func (es *PSQLEventSink) IndexBlockEvents(h types.EventDataNewBlockHeader) error { - sqlStmt := sq.Insert("block_events").Columns("key", "value", "height", "type", "created_at", "chain_id") +func (es *EventSink) IndexBlockEvents(h types.EventDataNewBlockHeader) error { + ts := time.Now().UTC() - // index the reserved block height index - ts := time.Now() - sqlStmt = sqlStmt.Values(types.BlockHeightKey, h.Header.Height, h.Header.Height, "", ts, es.chainID) + return runInTransaction(es.store, func(tx *sql.Tx) error { + // Add the block to the blocks table and report back its row ID for use + // in indexing the events for the block. + blockID, err := queryWithID(tx, ` +INSERT INTO blocks (height, chain_id, created_at) + VALUES ($1, $2, $3) + ON CONFLICT DO NOTHING + RETURNING rowid; +`, h.Header.Height, es.chainID, ts) + // ... - for _, event := range h.ResultBeginBlock.Events { - // only index events with a non-empty type - if len(event.Type) == 0 { - continue - } - - for _, attr := range event.Attributes { - if len(attr.Key) == 0 { - continue - } - - // index iff the event specified index:true and it's not a reserved event - compositeKey := fmt.Sprintf("%s.%s", event.Type, string(attr.Key)) - if compositeKey == types.BlockHeightKey { - return fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeKey) - } - - if attr.GetIndex() { - sqlStmt = sqlStmt.Values(compositeKey, string(attr.Value), h.Header.Height, BlockEventTypeBeginBlock, ts, es.chainID) - } - } - } - - // index end_block events... - // execute sqlStmt db query... + // Insert the special block meta-event for height. + if err := insertEvents(tx, blockID, 0, []abci.Event{ + makeIndexedEvent(types.BlockHeightKey, fmt.Sprint(h.Header.Height)), + }); err != nil { + return fmt.Errorf("block meta-events: %w", err) + } + // Insert all the block events. Order is important here, + if err := insertEvents(tx, blockID, 0, h.ResultBeginBlock.Events); err != nil { + return fmt.Errorf("begin-block events: %w", err) + } + if err := insertEvents(tx, blockID, 0, h.ResultEndBlock.Events); err != nil { + return fmt.Errorf("end-block events: %w", err) + } + return nil + }) } -func (es *PSQLEventSink) IndexTxEvents(txr []*abci.TxResult) error { - sqlStmtEvents := sq.Insert("tx_events").Columns("key", "value", "height", "hash", "tx_result_id", "created_at", "chain_id") - sqlStmtTxResult := sq.Insert("tx_results").Columns("tx_result", "created_at") +func (es *EventSink) IndexTxEvents(txrs []*abci.TxResult) error { + ts := time.Now().UTC() - ts := time.Now() - for _, tx := range txr { - // store the tx result - txBz, err := proto.Marshal(tx) - if err != nil { - return err - } + for _, txr := range txrs { + // Encode the result message in protobuf wire format for indexing. + resultData, err := proto.Marshal(txr) + // ... - sqlStmtTxResult = sqlStmtTxResult.Values(txBz, ts) + // Index the hash of the underlying transaction as a hex string. + txHash := fmt.Sprintf("%X", types.Tx(txr.Tx).Hash()) - // execute sqlStmtTxResult db query... - var txID uint32 - err = sqlStmtTxResult.QueryRow().Scan(&txID) - if err != nil { + if err := runInTransaction(es.store, func(tx *sql.Tx) error { + // Find the block associated with this transaction. + blockID, err := queryWithID(tx, ` +SELECT rowid FROM blocks WHERE height = $1 AND chain_id = $2; +`, txr.Height, es.chainID) + // ... + + // Insert a record for this tx_result and capture its ID for indexing events. + txID, err := queryWithID(tx, ` +INSERT INTO tx_results (block_id, index, created_at, tx_hash, tx_result) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT DO NOTHING + RETURNING rowid; +`, blockID, txr.Index, ts, txHash, resultData) + // ... + + // Insert the special transaction meta-events for hash and height. + if err := insertEvents(tx, blockID, txID, []abci.Event{ + makeIndexedEvent(types.TxHashKey, txHash), + makeIndexedEvent(types.TxHeightKey, fmt.Sprint(txr.Height)), + }); err != nil { + return fmt.Errorf("indexing transaction meta-events: %w", err) + } + // Index any events packaged with the transaction. + if err := insertEvents(tx, blockID, txID, txr.Result.Events); err != nil { + return fmt.Errorf("indexing transaction events: %w", err) + } + return nil + + }); err != nil { return err } - - // index the reserved height and hash indices - hash := types.Tx(tx.Tx).Hash() - sqlStmtEvents = sqlStmtEvents.Values(types.TxHashKey, hash, tx.Height, hash, txID, ts, es.chainID) - sqlStmtEvents = sqlStmtEvents.Values(types.TxHeightKey, tx.Height, tx.Height, hash, txID, ts, es.chainID) - - for _, event := range result.Result.Events { - // only index events with a non-empty type - if len(event.Type) == 0 { - continue - } - - for _, attr := range event.Attributes { - if len(attr.Key) == 0 { - continue - } - - // index if `index: true` is set - compositeTag := fmt.Sprintf("%s.%s", event.Type, string(attr.Key)) - - // ensure event does not conflict with a reserved prefix key - if compositeTag == types.TxHashKey || compositeTag == types.TxHeightKey { - return fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeTag) - } - - if attr.GetIndex() { - sqlStmtEvents = sqlStmtEvents.Values(compositeKey, string(attr.Value), tx.Height, hash, txID, ts, es.chainID) - } - } - } - } - - // execute sqlStmtEvents db query... + } + return nil } -func (es *PSQLEventSink) SearchBlockEvents(ctx context.Context, q *query.Query) ([]int64, error) { - return nil, errors.New("block search is not supported via the postgres event sink") -} +// SearchBlockEvents is not implemented by this sink, and reports an error for all queries. +func (es *EventSink) SearchBlockEvents(ctx context.Context, q *query.Query) ([]int64, error) -func (es *PSQLEventSink) SearchTxEvents(ctx context.Context, q *query.Query) ([]*abci.TxResult, error) { - return nil, errors.New("tx search is not supported via the postgres event sink") -} +// SearchTxEvents is not implemented by this sink, and reports an error for all queries. +func (es *EventSink) SearchTxEvents(ctx context.Context, q *query.Query) ([]*abci.TxResult, error) -func (es *PSQLEventSink) GetTxByHash(hash []byte) (*abci.TxResult, error) { - return nil, errors.New("getTxByHash is not supported via the postgres event sink") -} +// GetTxByHash is not implemented by this sink, and reports an error for all queries. +func (es *EventSink) GetTxByHash(hash []byte) (*abci.TxResult, error) -func (es *PSQLEventSink) HasBlock(h int64) (bool, error) { - return false, errors.New("hasBlock is not supported via the postgres event sink") -} +// HasBlock is not implemented by this sink, and reports an error for all queries. +func (es *EventSink) HasBlock(h int64) (bool, error) ``` ### Configuration diff --git a/state/indexer/indexer_service_test.go b/state/indexer/indexer_service_test.go index 457ed065a..4d12cc86f 100644 --- a/state/indexer/indexer_service_test.go +++ b/state/indexer/indexer_service_test.go @@ -139,7 +139,7 @@ func setupDB(t *testing.T) (*dockertest.Pool, error) { assert.Nil(t, err) resource, err = pool.RunWithOptions(&dockertest.RunOptions{ - Repository: psql.DriverName, + Repository: "postgres", Tag: "13", Env: []string{ "POSTGRES_USER=" + user, diff --git a/state/indexer/sink/psql/psql.go b/state/indexer/sink/psql/psql.go index 8bd378f4a..e452ed406 100644 --- a/state/indexer/sink/psql/psql.go +++ b/state/indexer/sink/psql/psql.go @@ -6,23 +6,22 @@ import ( "database/sql" "errors" "fmt" + "strings" "time" - sq "github.com/Masterminds/squirrel" - proto "github.com/gogo/protobuf/proto" + "github.com/gogo/protobuf/proto" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/pubsub/query" "github.com/tendermint/tendermint/state/indexer" "github.com/tendermint/tendermint/types" ) -var _ indexer.EventSink = (*EventSink)(nil) - const ( - TableEventBlock = "block_events" - TableEventTx = "tx_events" - TableResultTx = "tx_results" - DriverName = "postgres" + tableBlocks = "blocks" + tableTxResults = "tx_results" + tableEvents = "events" + tableAttributes = "attributes" + driverName = "postgres" ) // EventSink is an indexer backend providing the tx/block index services. This @@ -37,7 +36,7 @@ type EventSink struct { // database specified by connStr. Events written to the sink are attributed to // the specified chainID. func NewEventSink(connStr, chainID string) (*EventSink, error) { - db, err := sql.Open(DriverName, connStr) + db, err := sql.Open(driverName, connStr) if err != nil { return nil, err } @@ -55,116 +54,183 @@ func (es *EventSink) DB() *sql.DB { return es.store } // Type returns the structure type for this sink, which is Postgres. func (es *EventSink) Type() indexer.EventSinkType { return indexer.PSQL } +// runInTransaction executes query in a fresh database transaction. +// If query reports an error, the transaction is rolled back and the +// error from query is reported to the caller. +// Otherwise, the result of committing the transaction is returned. +func runInTransaction(db *sql.DB, query func(*sql.Tx) error) error { + dbtx, err := db.Begin() + if err != nil { + return err + } + if err := query(dbtx); err != nil { + _ = dbtx.Rollback() // report the initial error, not the rollback + return err + } + return dbtx.Commit() +} + +// queryWithID executes the specified SQL query with the given arguments, +// expecting a single-row, single-column result containing an ID. If the query +// succeeds, the ID from the result is returned. +func queryWithID(tx *sql.Tx, query string, args ...interface{}) (uint32, error) { + var id uint32 + if err := tx.QueryRow(query, args...).Scan(&id); err != nil { + return 0, err + } + return id, nil +} + +// insertEvents inserts a slice of events and any indexed attributes of those +// events into the database associated with dbtx. +// +// If txID > 0, the event is attributed to the Tendermint transaction with that +// ID; otherwise it is recorded as a block event. +func insertEvents(dbtx *sql.Tx, blockID, txID uint32, evts []abci.Event) error { + // Populate the transaction ID field iff one is defined (> 0). + var txIDArg interface{} + if txID > 0 { + txIDArg = txID + } + + // Add each event to the events table, and retrieve its row ID to use when + // adding any attributes the event provides. + for _, evt := range evts { + // Skip events with an empty type. + if evt.Type == "" { + continue + } + + eid, err := queryWithID(dbtx, ` +INSERT INTO `+tableEvents+` (block_id, tx_id, type) VALUES ($1, $2, $3) + RETURNING rowid; +`, blockID, txIDArg, evt.Type) + if err != nil { + return err + } + + // Add any attributes flagged for indexing. + for _, attr := range evt.Attributes { + if !attr.Index { + continue + } + compositeKey := evt.Type + "." + attr.Key + if _, err := dbtx.Exec(` +INSERT INTO `+tableAttributes+` (event_id, key, composite_key, value) + VALUES ($1, $2, $3, $4); +`, eid, attr.Key, compositeKey, attr.Value); err != nil { + return err + } + } + } + return nil +} + +// makeIndexedEvent constructs an event from the specified composite key and +// value. If the key has the form "type.name", the event will have a single +// attribute with that name and the value; otherwise the event will have only +// a type and no attributes. +func makeIndexedEvent(compositeKey, value string) abci.Event { + i := strings.Index(compositeKey, ".") + if i < 0 { + return abci.Event{Type: compositeKey} + } + return abci.Event{Type: compositeKey[:i], Attributes: []abci.EventAttribute{ + {Key: compositeKey[i+1:], Value: value, Index: true}, + }} +} + // IndexBlockEvents indexes the specified block header, part of the // indexer.EventSink interface. func (es *EventSink) IndexBlockEvents(h types.EventDataNewBlockHeader) error { - sqlStmt := sq. - Insert(TableEventBlock). - Columns("key", "value", "height", "type", "created_at", "chain_id"). - PlaceholderFormat(sq.Dollar). - Suffix("ON CONFLICT (key,height)"). - Suffix("DO NOTHING") + ts := time.Now().UTC() - ts := time.Now() - // index the reserved block height index - sqlStmt = sqlStmt. - Values(types.BlockHeightKey, fmt.Sprint(h.Header.Height), h.Header.Height, "", ts, es.chainID) + return runInTransaction(es.store, func(dbtx *sql.Tx) error { + // Add the block to the blocks table and report back its row ID for use + // in indexing the events for the block. + blockID, err := queryWithID(dbtx, ` +INSERT INTO `+tableBlocks+` (height, chain_id, created_at) + VALUES ($1, $2, $3) + ON CONFLICT DO NOTHING + RETURNING rowid; +`, h.Header.Height, es.chainID, ts) + if err == sql.ErrNoRows { + return nil // we already saw this block; quietly succeed + } else if err != nil { + return fmt.Errorf("indexing block header: %w", err) + } - // index begin_block events - sqlStmt, err := indexBlockEvents( - sqlStmt, h.ResultBeginBlock.Events, types.EventTypeBeginBlock, h.Header.Height, ts, es.chainID) - if err != nil { - return err - } - - // index end_block events - sqlStmt, err = indexBlockEvents( - sqlStmt, h.ResultEndBlock.Events, types.EventTypeEndBlock, h.Header.Height, ts, es.chainID) - if err != nil { - return err - } - - _, err = sqlStmt.RunWith(es.store).Exec() - return err + // Insert the special block meta-event for height. + if err := insertEvents(dbtx, blockID, 0, []abci.Event{ + makeIndexedEvent(types.BlockHeightKey, fmt.Sprint(h.Header.Height)), + }); err != nil { + return fmt.Errorf("block meta-events: %w", err) + } + // Insert all the block events. Order is important here, + if err := insertEvents(dbtx, blockID, 0, h.ResultBeginBlock.Events); err != nil { + return fmt.Errorf("begin-block events: %w", err) + } + if err := insertEvents(dbtx, blockID, 0, h.ResultEndBlock.Events); err != nil { + return fmt.Errorf("end-block events: %w", err) + } + return nil + }) } -func (es *EventSink) IndexTxEvents(txr []*abci.TxResult) error { - // index the tx result - var txid uint32 - sqlStmtTxResult := sq. - Insert(TableResultTx). - Columns("tx_result", "created_at"). - PlaceholderFormat(sq.Dollar). - RunWith(es.store). - Suffix("ON CONFLICT (tx_result)"). - Suffix("DO NOTHING"). - Suffix("RETURNING \"id\"") +func (es *EventSink) IndexTxEvents(txrs []*abci.TxResult) error { + ts := time.Now().UTC() - sqlStmtEvents := sq. - Insert(TableEventTx). - Columns("key", "value", "height", "hash", "tx_result_id", "created_at", "chain_id"). - PlaceholderFormat(sq.Dollar). - Suffix("ON CONFLICT (key,hash)"). - Suffix("DO NOTHING") - - ts := time.Now() - for _, tx := range txr { - txBz, err := proto.Marshal(tx) + for _, txr := range txrs { + // Encode the result message in protobuf wire format for indexing. + resultData, err := proto.Marshal(txr) if err != nil { - return err + return fmt.Errorf("marshaling tx_result: %w", err) } - sqlStmtTxResult = sqlStmtTxResult.Values(txBz, ts) + // Index the hash of the underlying transaction as a hex string. + txHash := fmt.Sprintf("%X", types.Tx(txr.Tx).Hash()) - // execute sqlStmtTxResult db query and retrieve the txid - r, err := sqlStmtTxResult.Query() - if err != nil { - return err - } - defer r.Close() + if err := runInTransaction(es.store, func(dbtx *sql.Tx) error { + // Find the block associated with this transaction. The block header + // must have been indexed prior to the transactions belonging to it. + blockID, err := queryWithID(dbtx, ` +SELECT rowid FROM `+tableBlocks+` WHERE height = $1 AND chain_id = $2; +`, txr.Height, es.chainID) + if err != nil { + return fmt.Errorf("finding block ID: %w", err) + } - if !r.Next() { + // Insert a record for this tx_result and capture its ID for indexing events. + txID, err := queryWithID(dbtx, ` +INSERT INTO `+tableTxResults+` (block_id, index, created_at, tx_hash, tx_result) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT DO NOTHING + RETURNING rowid; +`, blockID, txr.Index, ts, txHash, resultData) + if err == sql.ErrNoRows { + return nil // we already saw this transaction; quietly succeed + } else if err != nil { + return fmt.Errorf("indexing tx_result: %w", err) + } + + // Insert the special transaction meta-events for hash and height. + if err := insertEvents(dbtx, blockID, txID, []abci.Event{ + makeIndexedEvent(types.TxHashKey, txHash), + makeIndexedEvent(types.TxHeightKey, fmt.Sprint(txr.Height)), + }); err != nil { + return fmt.Errorf("indexing transaction meta-events: %w", err) + } + // Index any events packaged with the transaction. + if err := insertEvents(dbtx, blockID, txID, txr.Result.Events); err != nil { + return fmt.Errorf("indexing transaction events: %w", err) + } return nil - } - if err := r.Scan(&txid); err != nil { + }); err != nil { return err } - - // index the reserved height and hash indices - hash := fmt.Sprintf("%X", types.Tx(tx.Tx).Hash()) - - sqlStmtEvents = sqlStmtEvents.Values(types.TxHashKey, hash, tx.Height, hash, txid, ts, es.chainID) - sqlStmtEvents = sqlStmtEvents.Values(types.TxHeightKey, fmt.Sprint(tx.Height), tx.Height, hash, txid, ts, es.chainID) - for _, event := range tx.Result.Events { - // only index events with a non-empty type - if len(event.Type) == 0 { - continue - } - - for _, attr := range event.Attributes { - if len(attr.Key) == 0 { - continue - } - - // index if `index: true` is set - compositeTag := fmt.Sprintf("%s.%s", event.Type, attr.Key) - - // ensure event does not conflict with a reserved prefix key - if compositeTag == types.TxHashKey || compositeTag == types.TxHeightKey { - return fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeTag) - } - - if attr.GetIndex() { - sqlStmtEvents = sqlStmtEvents.Values(compositeTag, attr.Value, tx.Height, hash, txid, ts, es.chainID) - } - } - } } - - // execute sqlStmtEvents db query... - _, err := sqlStmtEvents.RunWith(es.store).Exec() - return err + return nil } // SearchBlockEvents is not implemented by this sink, and reports an error for all queries. @@ -187,39 +253,5 @@ func (es *EventSink) HasBlock(h int64) (bool, error) { return false, errors.New("hasBlock is not supported via the postgres event sink") } -func indexBlockEvents( - sqlStmt sq.InsertBuilder, - events []abci.Event, - ty string, - height int64, - ts time.Time, - chainID string, -) (sq.InsertBuilder, error) { - for _, event := range events { - // only index events with a non-empty type - if len(event.Type) == 0 { - continue - } - - for _, attr := range event.Attributes { - if len(attr.Key) == 0 { - continue - } - - // index iff the event specified index:true and it's not a reserved event - compositeKey := fmt.Sprintf("%s.%s", event.Type, attr.Key) - if compositeKey == types.BlockHeightKey { - return sqlStmt, fmt.Errorf( - "event type and attribute key \"%s\" is reserved; please use a different key", compositeKey) - } - - if attr.GetIndex() { - sqlStmt = sqlStmt.Values(compositeKey, attr.Value, height, ty, ts, chainID) - } - } - } - return sqlStmt, nil -} - // Stop closes the underlying PostgreSQL database. func (es *EventSink) Stop() error { return es.store.Close() } diff --git a/state/indexer/sink/psql/psql_test.go b/state/indexer/sink/psql/psql_test.go index 35ad7eea3..e8a1ce833 100644 --- a/state/indexer/sink/psql/psql_test.go +++ b/state/indexer/sink/psql/psql_test.go @@ -3,320 +3,64 @@ package psql import ( "context" "database/sql" - "errors" + "flag" "fmt" "io/ioutil" + "log" "os" + "os/signal" "testing" "time" - sq "github.com/Masterminds/squirrel" - schema "github.com/adlio/schema" - proto "github.com/gogo/protobuf/proto" - _ "github.com/lib/pq" - dockertest "github.com/ory/dockertest" + "github.com/adlio/schema" + "github.com/gogo/protobuf/proto" + "github.com/ory/dockertest" "github.com/ory/dockertest/docker" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/state/indexer" "github.com/tendermint/tendermint/types" + + // Register the Postgres database driver. + _ "github.com/lib/pq" ) -var db *sql.DB -var resource *dockertest.Resource -var chainID = "test-chainID" +// Verify that the type satisfies the EventSink interface. +var _ indexer.EventSink = (*EventSink)(nil) var ( + doPauseAtExit = flag.Bool("pause-at-exit", false, + "If true, pause the test until interrupted at shutdown, to allow debugging") + + // A hook that test cases can call to obtain the shared database instance + // used for testing the sink. This is initialized in TestMain (see below). + testDB func() *sql.DB +) + +const ( user = "postgres" password = "secret" port = "5432" dsn = "postgres://%s:%s@localhost:%s/%s?sslmode=disable" dbName = "postgres" + chainID = "test-chainID" + + viewBlockEvents = "block_events" + viewTxEvents = "tx_events" ) -func TestType(t *testing.T) { - pool, err := setupDB(t) - require.NoError(t, err) +func TestMain(m *testing.M) { + flag.Parse() - psqlSink := &EventSink{store: db, chainID: chainID} - assert.Equal(t, indexer.PSQL, psqlSink.Type()) - require.NoError(t, teardown(t, pool)) -} - -func TestBlockFuncs(t *testing.T) { - pool, err := setupDB(t) - require.NoError(t, err) - - indexer := &EventSink{store: db, chainID: chainID} - require.NoError(t, indexer.IndexBlockEvents(getTestBlockHeader())) - - r, err := verifyBlock(1) - assert.True(t, r) - require.NoError(t, err) - - r, err = verifyBlock(2) - assert.False(t, r) - require.NoError(t, err) - - r, err = indexer.HasBlock(1) - assert.False(t, r) - assert.Equal(t, errors.New("hasBlock is not supported via the postgres event sink"), err) - - r, err = indexer.HasBlock(2) - assert.False(t, r) - assert.Equal(t, errors.New("hasBlock is not supported via the postgres event sink"), err) - - r2, err := indexer.SearchBlockEvents(context.TODO(), nil) - assert.Nil(t, r2) - assert.Equal(t, errors.New("block search is not supported via the postgres event sink"), err) - - require.NoError(t, verifyTimeStamp(TableEventBlock)) - - // try to insert the duplicate block events. - err = indexer.IndexBlockEvents(getTestBlockHeader()) - require.NoError(t, err) - - require.NoError(t, teardown(t, pool)) -} - -func TestTxFuncs(t *testing.T) { - pool, err := setupDB(t) - assert.Nil(t, err) - - indexer := &EventSink{store: db, chainID: chainID} - - txResult := txResultWithEvents([]abci.Event{ - {Type: "account", Attributes: []abci.EventAttribute{{Key: "number", Value: "1", Index: true}}}, - {Type: "account", Attributes: []abci.EventAttribute{{Key: "owner", Value: "Ivan", Index: true}}}, - {Type: "", Attributes: []abci.EventAttribute{{Key: "not_allowed", Value: "Vlad", Index: true}}}, - }) - err = indexer.IndexTxEvents([]*abci.TxResult{txResult}) - require.NoError(t, err) - - tx, err := verifyTx(types.Tx(txResult.Tx).Hash()) - require.NoError(t, err) - assert.Equal(t, txResult, tx) - - require.NoError(t, verifyTimeStamp(TableEventTx)) - require.NoError(t, verifyTimeStamp(TableResultTx)) - - tx, err = indexer.GetTxByHash(types.Tx(txResult.Tx).Hash()) - assert.Nil(t, tx) - assert.Equal(t, errors.New("getTxByHash is not supported via the postgres event sink"), err) - - r2, err := indexer.SearchTxEvents(context.TODO(), nil) - assert.Nil(t, r2) - assert.Equal(t, errors.New("tx search is not supported via the postgres event sink"), err) - - // try to insert the duplicate tx events. - err = indexer.IndexTxEvents([]*abci.TxResult{txResult}) - require.NoError(t, err) - - assert.Nil(t, teardown(t, pool)) -} - -func TestStop(t *testing.T) { - pool, err := setupDB(t) - require.NoError(t, err) - - indexer := &EventSink{store: db} - require.NoError(t, indexer.Stop()) - - defer db.Close() - require.NoError(t, pool.Purge(resource)) -} - -func getTestBlockHeader() types.EventDataNewBlockHeader { - return types.EventDataNewBlockHeader{ - Header: types.Header{Height: 1}, - ResultBeginBlock: abci.ResponseBeginBlock{ - Events: []abci.Event{ - { - Type: "begin_event", - Attributes: []abci.EventAttribute{ - { - Key: "proposer", - Value: "FCAA001", - Index: true, - }, - }, - }, - }, - }, - ResultEndBlock: abci.ResponseEndBlock{ - Events: []abci.Event{ - { - Type: "end_event", - Attributes: []abci.EventAttribute{ - { - Key: "foo", - Value: "100", - Index: true, - }, - }, - }, - }, - }, - } -} - -func readSchema() ([]*schema.Migration, error) { - - filename := "schema.sql" - contents, err := ioutil.ReadFile(filename) - if err != nil { - return nil, fmt.Errorf("failed to read sql file from '%s': %w", filename, err) - } - - mg := &schema.Migration{} - mg.ID = time.Now().Local().String() + " db schema" - mg.Script = string(contents) - return append([]*schema.Migration{}, mg), nil -} - -func resetDB(t *testing.T) { - q := "DROP TABLE IF EXISTS block_events,tx_events,tx_results" - _, err := db.Exec(q) - - require.NoError(t, err) - - q = "DROP TYPE IF EXISTS block_event_type" - _, err = db.Exec(q) - require.NoError(t, err) -} - -func txResultWithEvents(events []abci.Event) *abci.TxResult { - tx := types.Tx("HELLO WORLD") - return &abci.TxResult{ - Height: 1, - Index: 0, - Tx: tx, - Result: abci.ResponseDeliverTx{ - Data: []byte{0}, - Code: abci.CodeTypeOK, - Log: "", - Events: events, - }, - } -} - -func verifyTx(hash []byte) (*abci.TxResult, error) { - join := fmt.Sprintf("%s ON %s.id = tx_result_id", TableEventTx, TableResultTx) - sqlStmt := sq. - Select("tx_result", fmt.Sprintf("%s.id", TableResultTx), "tx_result_id", "hash", "chain_id"). - Distinct().From(TableResultTx). - InnerJoin(join). - Where(fmt.Sprintf("hash = $1 AND chain_id = '%s'", chainID), fmt.Sprintf("%X", hash)) - - rows, err := sqlStmt.RunWith(db).Query() - if err != nil { - return nil, err - } - - defer rows.Close() - - if rows.Next() { - var txResult []byte - var txResultID, txid int - var h, cid string - err = rows.Scan(&txResult, &txResultID, &txid, &h, &cid) - if err != nil { - return nil, nil - } - - msg := new(abci.TxResult) - err = proto.Unmarshal(txResult, msg) - if err != nil { - return nil, err - } - - return msg, err - } - - // No result - return nil, nil -} - -func verifyTimeStamp(tb string) error { - - // We assume the tx indexing time would not exceed 2 second from now - sqlStmt := sq. - Select(fmt.Sprintf("%s.created_at", tb)). - Distinct().From(tb). - Where(fmt.Sprintf("%s.created_at >= $1", tb), time.Now().Add(-2*time.Second)) - - rows, err := sqlStmt.RunWith(db).Query() - if err != nil { - return err - } - - defer rows.Close() - - if rows.Next() { - var ts string - return rows.Scan(&ts) - } - - return errors.New("no result") -} - -func verifyBlock(h int64) (bool, error) { - sqlStmt := sq. - Select("height"). - Distinct(). - From(TableEventBlock). - Where(fmt.Sprintf("height = %d", h)) - rows, err := sqlStmt.RunWith(db).Query() - if err != nil { - return false, err - } - - defer rows.Close() - - if !rows.Next() { - return false, nil - } - - sqlStmt = sq. - Select("type, height", "chain_id"). - Distinct(). - From(TableEventBlock). - Where(fmt.Sprintf("height = %d AND type = '%s' AND chain_id = '%s'", h, types.EventTypeBeginBlock, chainID)) - - rows, err = sqlStmt.RunWith(db).Query() - if err != nil { - return false, err - } - defer rows.Close() - - if !rows.Next() { - return false, nil - } - - sqlStmt = sq. - Select("type, height"). - Distinct(). - From(TableEventBlock). - Where(fmt.Sprintf("height = %d AND type = '%s'", h, types.EventTypeEndBlock)) - rows, err = sqlStmt.RunWith(db).Query() - - if err != nil { - return false, err - } - defer rows.Close() - - return rows.Next(), nil -} - -func setupDB(t *testing.T) (*dockertest.Pool, error) { - t.Helper() + // Set up docker and start a container running PostgreSQL. pool, err := dockertest.NewPool(os.Getenv("DOCKER_URL")) + if err != nil { + log.Fatalf("Creating docker pool: %v", err) + } - require.NoError(t, err) - - resource, err = pool.RunWithOptions(&dockertest.RunOptions{ - Repository: DriverName, + resource, err := pool.RunWithOptions(&dockertest.RunOptions{ + Repository: "postgres", Tag: "13", Env: []string{ "POSTGRES_USER=" + user, @@ -332,35 +76,269 @@ func setupDB(t *testing.T) (*dockertest.Pool, error) { Name: "no", } }) + if err != nil { + log.Fatalf("Starting docker pool: %v", err) + } - require.NoError(t, err) - - // Set the container to expire in a minute to avoid orphaned containers - // hanging around - _ = resource.Expire(60) + if *doPauseAtExit { + log.Print("Pause at exit is enabled, containers will not expire") + } else { + const expireSeconds = 60 + _ = resource.Expire(expireSeconds) + log.Printf("Container expiration set to %d seconds", expireSeconds) + } + // Connect to the database, clear any leftover data, and install the + // indexing schema. conn := fmt.Sprintf(dsn, user, password, resource.GetPort(port+"/tcp"), dbName) + var db *sql.DB - require.NoError(t, pool.Retry(func() error { + if err := pool.Retry(func() error { sink, err := NewEventSink(conn, chainID) if err != nil { return err } db = sink.DB() // set global for test use return db.Ping() - })) + }); err != nil { + log.Fatalf("Connecting to database: %v", err) + } - resetDB(t) + if err := resetDatabase(db); err != nil { + log.Fatalf("Flushing database: %v", err) + } sm, err := readSchema() - assert.Nil(t, err) - assert.Nil(t, schema.NewMigrator().Apply(db, sm)) - return pool, nil + if err != nil { + log.Fatalf("Reading schema: %v", err) + } else if err := schema.NewMigrator().Apply(db, sm); err != nil { + log.Fatalf("Applying schema: %v", err) + } + + // Set up the hook for tests to get the shared database handle. + testDB = func() *sql.DB { return db } + + // Run the selected test cases. + code := m.Run() + + // Clean up and shut down the database container. + if *doPauseAtExit { + log.Print("Testing complete, pausing for inspection. Send SIGINT to resume teardown") + waitForInterrupt() + log.Print("(resuming)") + } + log.Print("Shutting down database") + if err := pool.Purge(resource); err != nil { + log.Printf("WARNING: Purging pool failed: %v", err) + } + if err := db.Close(); err != nil { + log.Printf("WARNING: Closing database failed: %v", err) + } + + os.Exit(code) } -func teardown(t *testing.T, pool *dockertest.Pool) error { - t.Helper() - // When you're done, kill and remove the container - assert.Nil(t, pool.Purge(resource)) - return db.Close() +func TestType(t *testing.T) { + psqlSink := &EventSink{store: testDB(), chainID: chainID} + assert.Equal(t, indexer.PSQL, psqlSink.Type()) +} + +func TestIndexing(t *testing.T) { + t.Run("IndexBlockEvents", func(t *testing.T) { + indexer := &EventSink{store: testDB(), chainID: chainID} + require.NoError(t, indexer.IndexBlockEvents(newTestBlockHeader())) + + verifyBlock(t, 1) + verifyBlock(t, 2) + + verifyNotImplemented(t, "hasBlock", func() (bool, error) { return indexer.HasBlock(1) }) + verifyNotImplemented(t, "hasBlock", func() (bool, error) { return indexer.HasBlock(2) }) + + verifyNotImplemented(t, "block search", func() (bool, error) { + v, err := indexer.SearchBlockEvents(context.Background(), nil) + return v != nil, err + }) + + require.NoError(t, verifyTimeStamp(tableBlocks)) + + // Attempting to reindex the same events should gracefully succeed. + require.NoError(t, indexer.IndexBlockEvents(newTestBlockHeader())) + }) + + t.Run("IndexTxEvents", func(t *testing.T) { + indexer := &EventSink{store: testDB(), chainID: chainID} + + txResult := txResultWithEvents([]abci.Event{ + makeIndexedEvent("account.number", "1"), + makeIndexedEvent("account.owner", "Ivan"), + makeIndexedEvent("account.owner", "Yulieta"), + + {Type: "", Attributes: []abci.EventAttribute{{Key: "not_allowed", Value: "Vlad", Index: true}}}, + }) + require.NoError(t, indexer.IndexTxEvents([]*abci.TxResult{txResult})) + + txr, err := loadTxResult(types.Tx(txResult.Tx).Hash()) + require.NoError(t, err) + assert.Equal(t, txResult, txr) + + require.NoError(t, verifyTimeStamp(tableTxResults)) + require.NoError(t, verifyTimeStamp(viewTxEvents)) + + verifyNotImplemented(t, "getTxByHash", func() (bool, error) { + txr, err := indexer.GetTxByHash(types.Tx(txResult.Tx).Hash()) + return txr != nil, err + }) + verifyNotImplemented(t, "tx search", func() (bool, error) { + txr, err := indexer.SearchTxEvents(context.Background(), nil) + return txr != nil, err + }) + + // try to insert the duplicate tx events. + err = indexer.IndexTxEvents([]*abci.TxResult{txResult}) + require.NoError(t, err) + }) +} + +func TestStop(t *testing.T) { + indexer := &EventSink{store: testDB()} + require.NoError(t, indexer.Stop()) +} + +// newTestBlockHeader constructs a fresh copy of a block header containing +// known test values to exercise the indexer. +func newTestBlockHeader() types.EventDataNewBlockHeader { + return types.EventDataNewBlockHeader{ + Header: types.Header{Height: 1}, + ResultBeginBlock: abci.ResponseBeginBlock{ + Events: []abci.Event{ + makeIndexedEvent("begin_event.proposer", "FCAA001"), + makeIndexedEvent("thingy.whatzit", "O.O"), + }, + }, + ResultEndBlock: abci.ResponseEndBlock{ + Events: []abci.Event{ + makeIndexedEvent("end_event.foo", "100"), + makeIndexedEvent("thingy.whatzit", "-.O"), + }, + }, + } +} + +// readSchema loads the indexing database schema file +func readSchema() ([]*schema.Migration, error) { + const filename = "schema.sql" + contents, err := ioutil.ReadFile(filename) + if err != nil { + return nil, fmt.Errorf("failed to read sql file from '%s': %w", filename, err) + } + + return []*schema.Migration{{ + ID: time.Now().Local().String() + " db schema", + Script: string(contents), + }}, nil +} + +// resetDB drops all the data from the test database. +func resetDatabase(db *sql.DB) error { + _, err := db.Exec(`DROP TABLE IF EXISTS blocks,tx_results,events,attributes CASCADE;`) + if err != nil { + return fmt.Errorf("dropping tables: %v", err) + } + _, err = db.Exec(`DROP VIEW IF EXISTS event_attributes,block_events,tx_events CASCADE;`) + if err != nil { + return fmt.Errorf("dropping views: %v", err) + } + return nil +} + +// txResultWithEvents constructs a fresh transaction result with fixed values +// for testing, that includes the specified events. +func txResultWithEvents(events []abci.Event) *abci.TxResult { + return &abci.TxResult{ + Height: 1, + Index: 0, + Tx: types.Tx("HELLO WORLD"), + Result: abci.ResponseDeliverTx{ + Data: []byte{0}, + Code: abci.CodeTypeOK, + Log: "", + Events: events, + }, + } +} + +func loadTxResult(hash []byte) (*abci.TxResult, error) { + hashString := fmt.Sprintf("%X", hash) + var resultData []byte + if err := testDB().QueryRow(` +SELECT tx_result FROM `+tableTxResults+` WHERE tx_hash = $1; +`, hashString).Scan(&resultData); err != nil { + return nil, fmt.Errorf("lookup transaction for hash %q failed: %v", hashString, err) + } + + txr := new(abci.TxResult) + if err := proto.Unmarshal(resultData, txr); err != nil { + return nil, fmt.Errorf("unmarshaling txr: %v", err) + } + + return txr, nil +} + +func verifyTimeStamp(tableName string) error { + return testDB().QueryRow(fmt.Sprintf(` +SELECT DISTINCT %[1]s.created_at + FROM %[1]s + WHERE %[1]s.created_at >= $1; +`, tableName), time.Now().Add(-2*time.Second)).Err() +} + +func verifyBlock(t *testing.T, height int64) { + // Check that the blocks table contains an entry for this height. + if err := testDB().QueryRow(` +SELECT height FROM `+tableBlocks+` WHERE height = $1; +`, height).Err(); err == sql.ErrNoRows { + t.Errorf("No block found for height=%d", height) + } else if err != nil { + t.Fatalf("Database query failed: %v", err) + } + + // Verify the presence of begin_block and end_block events. + if err := testDB().QueryRow(` +SELECT type, height, chain_id FROM `+viewBlockEvents+` + WHERE height = $1 AND type = $2 AND chain_id = $3; +`, height, types.EventTypeBeginBlock, chainID).Err(); err == sql.ErrNoRows { + t.Errorf("No %q event found for height=%d", types.EventTypeBeginBlock, height) + } else if err != nil { + t.Fatalf("Database query failed: %v", err) + } + + if err := testDB().QueryRow(` +SELECT type, height, chain_id FROM `+viewBlockEvents+` + WHERE height = $1 AND type = $2 AND chain_id = $3; +`, height, types.EventTypeEndBlock, chainID).Err(); err == sql.ErrNoRows { + t.Errorf("No %q event found for height=%d", types.EventTypeEndBlock, height) + } else if err != nil { + t.Fatalf("Database query failed: %v", err) + } +} + +// verifyNotImplemented calls f and verifies that it returns both a +// false-valued flag and a non-nil error whose string matching the expected +// "not supported" message with label prefixed. +func verifyNotImplemented(t *testing.T, label string, f func() (bool, error)) { + t.Helper() + t.Logf("Verifying that %q reports it is not implemented", label) + + want := label + " is not supported via the postgres event sink" + ok, err := f() + assert.False(t, ok) + require.NotNil(t, err) + assert.Equal(t, want, err.Error()) +} + +// waitForInterrupt blocks until a SIGINT is received by the process. +func waitForInterrupt() { + ch := make(chan os.Signal, 1) + signal.Notify(ch, os.Interrupt) + <-ch } diff --git a/state/indexer/sink/psql/schema.sql b/state/indexer/sink/psql/schema.sql index 0563136e2..1091cd4c3 100644 --- a/state/indexer/sink/psql/schema.sql +++ b/state/indexer/sink/psql/schema.sql @@ -1,32 +1,85 @@ -CREATE TYPE block_event_type AS ENUM ('begin_block', 'end_block', ''); -CREATE TABLE block_events ( - id SERIAL PRIMARY KEY, - key VARCHAR NOT NULL, - value VARCHAR NOT NULL, - height INTEGER NOT NULL, - type block_event_type, - created_at TIMESTAMPTZ NOT NULL, - chain_id VARCHAR NOT NULL, - UNIQUE (key, height) +/* + This file defines the database schema for the PostgresQL ("psql") event sink + implementation in Tendermint. The operator must create a database and install + this schema before using the database to index events. + */ + +-- The blocks table records metadata about each block. +-- The block record does not include its events or transactions (see tx_results). +CREATE TABLE blocks ( + rowid BIGSERIAL PRIMARY KEY, + + height BIGINT NOT NULL, + chain_id VARCHAR NOT NULL, + + -- When this block header was logged into the sink, in UTC. + created_at TIMESTAMPTZ NOT NULL, + + UNIQUE (height, chain_id) ); + +-- Index blocks by height and chain, since we need to resolve block IDs when +-- indexing transaction records and transaction events. +CREATE INDEX idx_blocks_height_chain ON blocks(height, chain_id); + +-- The tx_results table records metadata about transaction results. Note that +-- the events from a transaction are stored separately. CREATE TABLE tx_results ( - id SERIAL PRIMARY KEY, - tx_result BYTEA NOT NULL, - created_at TIMESTAMPTZ NOT NULL, - UNIQUE (tx_result) + rowid BIGSERIAL PRIMARY KEY, + + -- The block to which this transaction belongs. + block_id BIGINT NOT NULL REFERENCES blocks(rowid), + -- The sequential index of the transaction within the block. + index INTEGER NOT NULL, + -- When this result record was logged into the sink, in UTC. + created_at TIMESTAMPTZ NOT NULL, + -- The hex-encoded hash of the transaction. + tx_hash VARCHAR NOT NULL, + -- The protobuf wire encoding of the TxResult message. + tx_result BYTEA NOT NULL, + + UNIQUE (block_id, index) ); -CREATE TABLE tx_events ( - id SERIAL PRIMARY KEY, - key VARCHAR NOT NULL, - value VARCHAR NOT NULL, - height INTEGER NOT NULL, - hash VARCHAR NOT NULL, - tx_result_id SERIAL, - created_at TIMESTAMPTZ NOT NULL, - chain_id VARCHAR NOT NULL, - UNIQUE (hash, key), - FOREIGN KEY (tx_result_id) REFERENCES tx_results(id) ON DELETE CASCADE + +-- The events table records events. All events (both block and transaction) are +-- associated with a block ID; transaction events also have a transaction ID. +CREATE TABLE events ( + rowid BIGSERIAL PRIMARY KEY, + + -- The block and transaction this event belongs to. + -- If tx_id is NULL, this is a block event. + block_id BIGINT NOT NULL REFERENCES blocks(rowid), + tx_id BIGINT NULL REFERENCES tx_results(rowid), + + -- The application-defined type label for the event. + type VARCHAR NOT NULL ); -CREATE INDEX idx_block_events_key_value ON block_events(key, value); -CREATE INDEX idx_tx_events_key_value ON tx_events(key, value); -CREATE INDEX idx_tx_events_hash ON tx_events(hash); + +-- The attributes table records event attributes. +CREATE TABLE attributes ( + event_id BIGINT NOT NULL REFERENCES events(rowid), + key VARCHAR NOT NULL, -- bare key + composite_key VARCHAR NOT NULL, -- composed type.key + value VARCHAR NULL, + + UNIQUE (event_id, key) +); + +-- A joined view of events and their attributes. Events that do not have any +-- attributes are represented as a single row with empty key and value fields. +CREATE VIEW event_attributes AS + SELECT block_id, tx_id, type, key, composite_key, value + FROM events LEFT JOIN attributes ON (events.rowid = attributes.event_id); + +-- A joined view of all block events (those having tx_id NULL). +CREATE VIEW block_events AS + SELECT blocks.rowid as block_id, height, chain_id, type, key, composite_key, value + FROM blocks JOIN event_attributes ON (blocks.rowid = event_attributes.block_id) + WHERE event_attributes.tx_id IS NULL; + +-- A joined view of all transaction events. +CREATE VIEW tx_events AS + SELECT height, index, chain_id, type, key, composite_key, value, tx_results.created_at + FROM blocks JOIN tx_results ON (blocks.rowid = tx_results.block_id) + JOIN event_attributes ON (tx_results.rowid = event_attributes.tx_id) + WHERE event_attributes.tx_id IS NOT NULL;