WIP: start rewriting queries

This commit is contained in:
M. J. Fromberger
2021-08-25 19:56:44 -07:00
parent 2351d3da61
commit b62e185578

View File

@@ -17,10 +17,11 @@ import (
)
const (
tableEventBlock = "block_events"
tableEventTx = "tx_events"
tableResultTx = "tx_results"
driverName = "postgres"
tableBlocks = "blocks"
tableTxResults = "tx_results"
tableEvents = "events"
tableEventAttribute = "event_attributes"
driverName = "postgres"
)
// EventSink is an indexer backend providing the tx/block index services. This
@@ -53,40 +54,37 @@ func (es *EventSink) DB() *sql.DB { return es.store }
// Type returns the structure type for this sink, which is Postgres.
func (es *EventSink) Type() indexer.EventSinkType { return indexer.PSQL }
func (es *EventSink) indexEvents(blockID, txID int, ts time.Time, events []abci.Event) error {
panic("unimplemented")
}
// IndexBlockEvents indexes the specified block header, part of the
// indexer.EventSink interface.
func (es *EventSink) IndexBlockEvents(h types.EventDataNewBlockHeader) error {
sqlStmt := sq.
Insert(tableEventBlock).
Columns("key", "value", "height", "type", "created_at", "chain_id").
ts := time.Now().UTC()
// Add the block to the blocks table and report back its row ID for use in
// indexing the events for the block.
row := sq.Insert(tableBlocks).
Columns("height", "chain_id", "created_at").
PlaceholderFormat(sq.Dollar).
Suffix("ON CONFLICT (key,height)").
Suffix("DO NOTHING")
Suffix("ON CONFLICT DO NOTHING").
Suffix("RETURNING rowid").
RunWith(es.store).
Values(h.Header.Height, es.chainID, ts).
QueryRow() // any error will be reported via Scan, below
ts := time.Now()
// index the reserved block height index
sqlStmt = sqlStmt.
Values(types.BlockHeightKey, fmt.Sprint(h.Header.Height), h.Header.Height, "", ts, es.chainID)
// index begin_block events
sqlStmt, err := indexBlockEvents(
sqlStmt, h.ResultBeginBlock.Events, types.EventTypeBeginBlock, h.Header.Height, ts, es.chainID)
if err != nil {
var blockID int
if err := row.Scan(&blockID); err != nil {
return err
}
// index end_block events
sqlStmt, err = indexBlockEvents(
sqlStmt, h.ResultEndBlock.Events, types.EventTypeEndBlock, h.Header.Height, ts, es.chainID)
if err != nil {
return err
}
_, err = sqlStmt.RunWith(es.store).Exec()
return err
return es.indexEvents(blockID, -1, ts,
append(h.ResultBeginBlock.Events, h.ResultEndBlock.Events...))
}
func (es *EventSink) IndexTxEvents(txr []*abci.TxResult) error {
ts := time.Now().UTC()
// index the tx result
var txid uint32
sqlStmtTxResult := sq.
@@ -105,7 +103,6 @@ func (es *EventSink) IndexTxEvents(txr []*abci.TxResult) error {
Suffix("ON CONFLICT (key,hash)").
Suffix("DO NOTHING")
ts := time.Now()
for _, tx := range txr {
txBz, err := proto.Marshal(tx)
if err != nil {