mirror of
https://github.com/tendermint/tendermint.git
synced 2026-02-10 22:10:11 +00:00
WIP: update for new schema, builds but untested
This commit is contained in:
@@ -8,7 +8,6 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
sq "github.com/Masterminds/squirrel"
|
||||
proto "github.com/gogo/protobuf/proto"
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
"github.com/tendermint/tendermint/libs/pubsub/query"
|
||||
@@ -17,11 +16,11 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
tableBlocks = "blocks"
|
||||
tableTxResults = "tx_results"
|
||||
tableEvents = "events"
|
||||
tableEventAttribute = "event_attributes"
|
||||
driverName = "postgres"
|
||||
tableBlocks = "blocks"
|
||||
tableTxResults = "tx_results"
|
||||
tableEvents = "events"
|
||||
tableAttributes = "attributes"
|
||||
driverName = "postgres"
|
||||
)
|
||||
|
||||
// EventSink is an indexer backend providing the tx/block index services. This
|
||||
@@ -54,8 +53,70 @@ func (es *EventSink) DB() *sql.DB { return es.store }
|
||||
// Type returns the structure type for this sink, which is Postgres.
|
||||
func (es *EventSink) Type() indexer.EventSinkType { return indexer.PSQL }
|
||||
|
||||
func (es *EventSink) indexEvents(blockID, txID int, ts time.Time, events []abci.Event) error {
|
||||
panic("unimplemented")
|
||||
// runInTransaction executes query in a fresh database transaction.
|
||||
// If query reports an error, the transaction is rolled back and the
|
||||
// error from query is reported to the caller.
|
||||
// Otherwise, the result of committing the transaction is returned.
|
||||
func runInTransaction(db *sql.DB, query func(*sql.Tx) error) error {
|
||||
tx, err := db.Begin()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := query(tx); err != nil {
|
||||
tx.Rollback()
|
||||
return err
|
||||
}
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
// queryWithID executes the specified SQL query with the given arguments,
|
||||
// expecting a single one-column result containing an ID. If the query
|
||||
// succeeds, the ID from the result is returned.
|
||||
func queryWithID(tx *sql.Tx, query string, args ...interface{}) (uint32, error) {
|
||||
var id uint32
|
||||
if err := tx.QueryRow(query, args...).Scan(&id); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return id, nil
|
||||
}
|
||||
|
||||
// insertEvents inserts a slice of events and any indexed attributes of those
|
||||
// events into the database associated with tx.
|
||||
//
|
||||
// If txID > 0, the event is attributed to the transaction with that ID;
|
||||
// otherwise it is recorded as a block event.
|
||||
func insertEvents(tx *sql.Tx, blockID, txID uint32, evts []abci.Event) error {
|
||||
// Populate the transaction ID field iff one is defined (> 0).
|
||||
var txIDArg interface{}
|
||||
if txID > 0 {
|
||||
txIDArg = txID
|
||||
}
|
||||
|
||||
// Add each event to the events table, and retrieve its row ID to use when
|
||||
// adding any attributes the event provides.
|
||||
for _, evt := range evts {
|
||||
eid, err := queryWithID(tx, `
|
||||
INSERT INTO `+tableEvents+` (block_id, tx_id, type) VALUES (?, ?, ?);
|
||||
`, blockID, txIDArg, evt.Type)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Add any attributes flagged for indexing.
|
||||
for _, attr := range evt.Attributes {
|
||||
if !attr.Index {
|
||||
continue
|
||||
}
|
||||
compositeKey := evt.Type + "." + attr.Key
|
||||
if _, err := tx.Exec(`
|
||||
INSERT INTO `+tableAttributes+` (event_id, key, composite_key, value)
|
||||
VALUES (?, ?, ?);
|
||||
`, eid, attr.Key, compositeKey, attr.Value); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// IndexBlockEvents indexes the specified block header, part of the
|
||||
@@ -63,103 +124,72 @@ func (es *EventSink) indexEvents(blockID, txID int, ts time.Time, events []abci.
|
||||
func (es *EventSink) IndexBlockEvents(h types.EventDataNewBlockHeader) error {
|
||||
ts := time.Now().UTC()
|
||||
|
||||
// Add the block to the blocks table and report back its row ID for use in
|
||||
// indexing the events for the block.
|
||||
row := sq.Insert(tableBlocks).
|
||||
Columns("height", "chain_id", "created_at").
|
||||
PlaceholderFormat(sq.Dollar).
|
||||
Suffix("ON CONFLICT DO NOTHING").
|
||||
Suffix("RETURNING rowid").
|
||||
RunWith(es.store).
|
||||
Values(h.Header.Height, es.chainID, ts).
|
||||
QueryRow() // any error will be reported via Scan, below
|
||||
return runInTransaction(es.store, func(tx *sql.Tx) error {
|
||||
// Add the block to the blocks table and report back its row ID for use
|
||||
// in indexing the events for the block.
|
||||
blockID, err := queryWithID(tx, `
|
||||
INSERT INTO `+tableBlocks+` (height, chain_id, created_at)
|
||||
VALUES (?, ?, ?)
|
||||
ON CONFLICT DO NOTHING
|
||||
RETURNING rowid;
|
||||
`, h.Header.Height, es.chainID, ts)
|
||||
if err != nil {
|
||||
return fmt.Errorf("indexing block header: %w", err)
|
||||
}
|
||||
|
||||
var blockID int
|
||||
if err := row.Scan(&blockID); err != nil {
|
||||
return err
|
||||
}
|
||||
return es.indexEvents(blockID, -1, ts,
|
||||
append(h.ResultBeginBlock.Events, h.ResultEndBlock.Events...))
|
||||
// Insert all the block events. Order is important here,
|
||||
if err := insertEvents(tx, blockID, 0, h.ResultBeginBlock.Events); err != nil {
|
||||
return fmt.Errorf("begin block events: %w", err)
|
||||
}
|
||||
if err := insertEvents(tx, blockID, 0, h.ResultEndBlock.Events); err != nil {
|
||||
return fmt.Errorf("end block events: %w", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (es *EventSink) IndexTxEvents(txr []*abci.TxResult) error {
|
||||
func (es *EventSink) IndexTxEvents(txrs []*abci.TxResult) error {
|
||||
ts := time.Now().UTC()
|
||||
|
||||
// index the tx result
|
||||
var txid uint32
|
||||
sqlStmtTxResult := sq.
|
||||
Insert(tableResultTx).
|
||||
Columns("tx_result", "created_at").
|
||||
PlaceholderFormat(sq.Dollar).
|
||||
RunWith(es.store).
|
||||
Suffix("ON CONFLICT (tx_result)").
|
||||
Suffix("DO NOTHING").
|
||||
Suffix("RETURNING \"id\"")
|
||||
|
||||
sqlStmtEvents := sq.
|
||||
Insert(tableEventTx).
|
||||
Columns("key", "value", "height", "hash", "tx_result_id", "created_at", "chain_id").
|
||||
PlaceholderFormat(sq.Dollar).
|
||||
Suffix("ON CONFLICT (key,hash)").
|
||||
Suffix("DO NOTHING")
|
||||
|
||||
for _, tx := range txr {
|
||||
txBz, err := proto.Marshal(tx)
|
||||
for _, txr := range txrs {
|
||||
// Encode the result message in protobuf wire format for indexing.
|
||||
resultData, err := proto.Marshal(txr)
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("marshaling tx_result: %w", err)
|
||||
}
|
||||
|
||||
sqlStmtTxResult = sqlStmtTxResult.Values(txBz, ts)
|
||||
// Index the hash of the underlying transaction as a hex string.
|
||||
txHash := fmt.Sprintf("%X", types.Tx(txr.Tx).Hash())
|
||||
|
||||
// execute sqlStmtTxResult db query and retrieve the txid
|
||||
r, err := sqlStmtTxResult.Query()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer r.Close()
|
||||
if err := runInTransaction(es.store, func(tx *sql.Tx) error {
|
||||
// Find the block associated with this transaction.
|
||||
blockID, err := queryWithID(tx, `
|
||||
SELECT rowid FROM `+tableBlocks+` WHERE height=? AND chain_id=?;
|
||||
`, txr.Height, es.chainID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("finding block ID: %w", err)
|
||||
}
|
||||
|
||||
if !r.Next() {
|
||||
// Insert a record for this tx_result and capture its ID for indexing events.
|
||||
txID, err := queryWithID(tx, `
|
||||
INSERT INTO `+tableTxResults+` (block_id, index, created_at, tx_hash, tx_result)
|
||||
VALUES (?, ?, ?, ?);
|
||||
`, blockID, txr.Index, ts, txHash, resultData)
|
||||
if err != nil {
|
||||
return fmt.Errorf("indexing tx_result: %w", err)
|
||||
}
|
||||
|
||||
// Index any events associated with the transaction.
|
||||
if err := insertEvents(tx, blockID, txID, txr.Result.Events); err != nil {
|
||||
return fmt.Errorf("indexing transaction events: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := r.Scan(&txid); err != nil {
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// index the reserved height and hash indices
|
||||
hash := fmt.Sprintf("%X", types.Tx(tx.Tx).Hash())
|
||||
|
||||
sqlStmtEvents = sqlStmtEvents.Values(types.TxHashKey, hash, tx.Height, hash, txid, ts, es.chainID)
|
||||
sqlStmtEvents = sqlStmtEvents.Values(types.TxHeightKey, fmt.Sprint(tx.Height), tx.Height, hash, txid, ts, es.chainID)
|
||||
for _, event := range tx.Result.Events {
|
||||
// only index events with a non-empty type
|
||||
if len(event.Type) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, attr := range event.Attributes {
|
||||
if len(attr.Key) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// index if `index: true` is set
|
||||
compositeTag := fmt.Sprintf("%s.%s", event.Type, attr.Key)
|
||||
|
||||
// ensure event does not conflict with a reserved prefix key
|
||||
if compositeTag == types.TxHashKey || compositeTag == types.TxHeightKey {
|
||||
return fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeTag)
|
||||
}
|
||||
|
||||
if attr.GetIndex() {
|
||||
sqlStmtEvents = sqlStmtEvents.Values(compositeTag, attr.Value, tx.Height, hash, txid, ts, es.chainID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// execute sqlStmtEvents db query...
|
||||
_, err := sqlStmtEvents.RunWith(es.store).Exec()
|
||||
return err
|
||||
return nil
|
||||
}
|
||||
|
||||
// SearchBlockEvents is not implemented by this sink, and reports an error for all queries.
|
||||
@@ -182,39 +212,5 @@ func (es *EventSink) HasBlock(h int64) (bool, error) {
|
||||
return false, errors.New("hasBlock is not supported via the postgres event sink")
|
||||
}
|
||||
|
||||
func indexBlockEvents(
|
||||
sqlStmt sq.InsertBuilder,
|
||||
events []abci.Event,
|
||||
ty string,
|
||||
height int64,
|
||||
ts time.Time,
|
||||
chainID string,
|
||||
) (sq.InsertBuilder, error) {
|
||||
for _, event := range events {
|
||||
// only index events with a non-empty type
|
||||
if len(event.Type) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, attr := range event.Attributes {
|
||||
if len(attr.Key) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// index iff the event specified index:true and it's not a reserved event
|
||||
compositeKey := fmt.Sprintf("%s.%s", event.Type, attr.Key)
|
||||
if compositeKey == types.BlockHeightKey {
|
||||
return sqlStmt, fmt.Errorf(
|
||||
"event type and attribute key \"%s\" is reserved; please use a different key", compositeKey)
|
||||
}
|
||||
|
||||
if attr.GetIndex() {
|
||||
sqlStmt = sqlStmt.Values(compositeKey, attr.Value, height, ty, ts, chainID)
|
||||
}
|
||||
}
|
||||
}
|
||||
return sqlStmt, nil
|
||||
}
|
||||
|
||||
// Stop closes the underlying PostgreSQL database.
|
||||
func (es *EventSink) Stop() error { return es.store.Close() }
|
||||
|
||||
Reference in New Issue
Block a user