diff --git a/state/indexer/sink/psql/psql.go b/state/indexer/sink/psql/psql.go index b9a12d719..636ceeabf 100644 --- a/state/indexer/sink/psql/psql.go +++ b/state/indexer/sink/psql/psql.go @@ -8,7 +8,6 @@ import ( "fmt" "time" - sq "github.com/Masterminds/squirrel" proto "github.com/gogo/protobuf/proto" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/pubsub/query" @@ -17,11 +16,11 @@ import ( ) const ( - tableBlocks = "blocks" - tableTxResults = "tx_results" - tableEvents = "events" - tableEventAttribute = "event_attributes" - driverName = "postgres" + tableBlocks = "blocks" + tableTxResults = "tx_results" + tableEvents = "events" + tableAttributes = "attributes" + driverName = "postgres" ) // EventSink is an indexer backend providing the tx/block index services. This @@ -54,8 +53,70 @@ func (es *EventSink) DB() *sql.DB { return es.store } // Type returns the structure type for this sink, which is Postgres. func (es *EventSink) Type() indexer.EventSinkType { return indexer.PSQL } -func (es *EventSink) indexEvents(blockID, txID int, ts time.Time, events []abci.Event) error { - panic("unimplemented") +// runInTransaction executes query in a fresh database transaction. +// If query reports an error, the transaction is rolled back and the +// error from query is reported to the caller. +// Otherwise, the result of committing the transaction is returned. +func runInTransaction(db *sql.DB, query func(*sql.Tx) error) error { + tx, err := db.Begin() + if err != nil { + return err + } + if err := query(tx); err != nil { + tx.Rollback() + return err + } + return tx.Commit() +} + +// queryWithID executes the specified SQL query with the given arguments, +// expecting a single one-column result containing an ID. If the query +// succeeds, the ID from the result is returned. +func queryWithID(tx *sql.Tx, query string, args ...interface{}) (uint32, error) { + var id uint32 + if err := tx.QueryRow(query, args...).Scan(&id); err != nil { + return 0, err + } + return id, nil +} + +// insertEvents inserts a slice of events and any indexed attributes of those +// events into the database associated with tx. +// +// If txID > 0, the event is attributed to the transaction with that ID; +// otherwise it is recorded as a block event. +func insertEvents(tx *sql.Tx, blockID, txID uint32, evts []abci.Event) error { + // Populate the transaction ID field iff one is defined (> 0). + var txIDArg interface{} + if txID > 0 { + txIDArg = txID + } + + // Add each event to the events table, and retrieve its row ID to use when + // adding any attributes the event provides. + for _, evt := range evts { + eid, err := queryWithID(tx, ` +INSERT INTO `+tableEvents+` (block_id, tx_id, type) VALUES (?, ?, ?); +`, blockID, txIDArg, evt.Type) + if err != nil { + return err + } + + // Add any attributes flagged for indexing. + for _, attr := range evt.Attributes { + if !attr.Index { + continue + } + compositeKey := evt.Type + "." + attr.Key + if _, err := tx.Exec(` +INSERT INTO `+tableAttributes+` (event_id, key, composite_key, value) + VALUES (?, ?, ?); +`, eid, attr.Key, compositeKey, attr.Value); err != nil { + return err + } + } + } + return nil } // IndexBlockEvents indexes the specified block header, part of the @@ -63,103 +124,72 @@ func (es *EventSink) indexEvents(blockID, txID int, ts time.Time, events []abci. func (es *EventSink) IndexBlockEvents(h types.EventDataNewBlockHeader) error { ts := time.Now().UTC() - // Add the block to the blocks table and report back its row ID for use in - // indexing the events for the block. - row := sq.Insert(tableBlocks). - Columns("height", "chain_id", "created_at"). - PlaceholderFormat(sq.Dollar). - Suffix("ON CONFLICT DO NOTHING"). - Suffix("RETURNING rowid"). - RunWith(es.store). - Values(h.Header.Height, es.chainID, ts). - QueryRow() // any error will be reported via Scan, below + return runInTransaction(es.store, func(tx *sql.Tx) error { + // Add the block to the blocks table and report back its row ID for use + // in indexing the events for the block. + blockID, err := queryWithID(tx, ` +INSERT INTO `+tableBlocks+` (height, chain_id, created_at) + VALUES (?, ?, ?) + ON CONFLICT DO NOTHING + RETURNING rowid; +`, h.Header.Height, es.chainID, ts) + if err != nil { + return fmt.Errorf("indexing block header: %w", err) + } - var blockID int - if err := row.Scan(&blockID); err != nil { - return err - } - return es.indexEvents(blockID, -1, ts, - append(h.ResultBeginBlock.Events, h.ResultEndBlock.Events...)) + // Insert all the block events. Order is important here, + if err := insertEvents(tx, blockID, 0, h.ResultBeginBlock.Events); err != nil { + return fmt.Errorf("begin block events: %w", err) + } + if err := insertEvents(tx, blockID, 0, h.ResultEndBlock.Events); err != nil { + return fmt.Errorf("end block events: %w", err) + } + return nil + }) } -func (es *EventSink) IndexTxEvents(txr []*abci.TxResult) error { +func (es *EventSink) IndexTxEvents(txrs []*abci.TxResult) error { ts := time.Now().UTC() - // index the tx result - var txid uint32 - sqlStmtTxResult := sq. - Insert(tableResultTx). - Columns("tx_result", "created_at"). - PlaceholderFormat(sq.Dollar). - RunWith(es.store). - Suffix("ON CONFLICT (tx_result)"). - Suffix("DO NOTHING"). - Suffix("RETURNING \"id\"") - - sqlStmtEvents := sq. - Insert(tableEventTx). - Columns("key", "value", "height", "hash", "tx_result_id", "created_at", "chain_id"). - PlaceholderFormat(sq.Dollar). - Suffix("ON CONFLICT (key,hash)"). - Suffix("DO NOTHING") - - for _, tx := range txr { - txBz, err := proto.Marshal(tx) + for _, txr := range txrs { + // Encode the result message in protobuf wire format for indexing. + resultData, err := proto.Marshal(txr) if err != nil { - return err + return fmt.Errorf("marshaling tx_result: %w", err) } - sqlStmtTxResult = sqlStmtTxResult.Values(txBz, ts) + // Index the hash of the underlying transaction as a hex string. + txHash := fmt.Sprintf("%X", types.Tx(txr.Tx).Hash()) - // execute sqlStmtTxResult db query and retrieve the txid - r, err := sqlStmtTxResult.Query() - if err != nil { - return err - } - defer r.Close() + if err := runInTransaction(es.store, func(tx *sql.Tx) error { + // Find the block associated with this transaction. + blockID, err := queryWithID(tx, ` +SELECT rowid FROM `+tableBlocks+` WHERE height=? AND chain_id=?; +`, txr.Height, es.chainID) + if err != nil { + return fmt.Errorf("finding block ID: %w", err) + } - if !r.Next() { + // Insert a record for this tx_result and capture its ID for indexing events. + txID, err := queryWithID(tx, ` +INSERT INTO `+tableTxResults+` (block_id, index, created_at, tx_hash, tx_result) + VALUES (?, ?, ?, ?); +`, blockID, txr.Index, ts, txHash, resultData) + if err != nil { + return fmt.Errorf("indexing tx_result: %w", err) + } + + // Index any events associated with the transaction. + if err := insertEvents(tx, blockID, txID, txr.Result.Events); err != nil { + return fmt.Errorf("indexing transaction events: %w", err) + } return nil - } - if err := r.Scan(&txid); err != nil { + }); err != nil { return err } - - // index the reserved height and hash indices - hash := fmt.Sprintf("%X", types.Tx(tx.Tx).Hash()) - - sqlStmtEvents = sqlStmtEvents.Values(types.TxHashKey, hash, tx.Height, hash, txid, ts, es.chainID) - sqlStmtEvents = sqlStmtEvents.Values(types.TxHeightKey, fmt.Sprint(tx.Height), tx.Height, hash, txid, ts, es.chainID) - for _, event := range tx.Result.Events { - // only index events with a non-empty type - if len(event.Type) == 0 { - continue - } - - for _, attr := range event.Attributes { - if len(attr.Key) == 0 { - continue - } - - // index if `index: true` is set - compositeTag := fmt.Sprintf("%s.%s", event.Type, attr.Key) - - // ensure event does not conflict with a reserved prefix key - if compositeTag == types.TxHashKey || compositeTag == types.TxHeightKey { - return fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeTag) - } - - if attr.GetIndex() { - sqlStmtEvents = sqlStmtEvents.Values(compositeTag, attr.Value, tx.Height, hash, txid, ts, es.chainID) - } - } - } } - - // execute sqlStmtEvents db query... - _, err := sqlStmtEvents.RunWith(es.store).Exec() - return err + return nil } // SearchBlockEvents is not implemented by this sink, and reports an error for all queries. @@ -182,39 +212,5 @@ func (es *EventSink) HasBlock(h int64) (bool, error) { return false, errors.New("hasBlock is not supported via the postgres event sink") } -func indexBlockEvents( - sqlStmt sq.InsertBuilder, - events []abci.Event, - ty string, - height int64, - ts time.Time, - chainID string, -) (sq.InsertBuilder, error) { - for _, event := range events { - // only index events with a non-empty type - if len(event.Type) == 0 { - continue - } - - for _, attr := range event.Attributes { - if len(attr.Key) == 0 { - continue - } - - // index iff the event specified index:true and it's not a reserved event - compositeKey := fmt.Sprintf("%s.%s", event.Type, attr.Key) - if compositeKey == types.BlockHeightKey { - return sqlStmt, fmt.Errorf( - "event type and attribute key \"%s\" is reserved; please use a different key", compositeKey) - } - - if attr.GetIndex() { - sqlStmt = sqlStmt.Values(compositeKey, attr.Value, height, ty, ts, chainID) - } - } - } - return sqlStmt, nil -} - // Stop closes the underlying PostgreSQL database. func (es *EventSink) Stop() error { return es.store.Close() }