From b62e1855783bcebad53409fa51e57cc39e9e962f Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Wed, 25 Aug 2021 19:56:44 -0700 Subject: [PATCH] WIP: start rewriting queries --- state/indexer/sink/psql/psql.go | 55 ++++++++++++++++----------------- 1 file changed, 26 insertions(+), 29 deletions(-) diff --git a/state/indexer/sink/psql/psql.go b/state/indexer/sink/psql/psql.go index d0d39f626..b9a12d719 100644 --- a/state/indexer/sink/psql/psql.go +++ b/state/indexer/sink/psql/psql.go @@ -17,10 +17,11 @@ import ( ) const ( - tableEventBlock = "block_events" - tableEventTx = "tx_events" - tableResultTx = "tx_results" - driverName = "postgres" + tableBlocks = "blocks" + tableTxResults = "tx_results" + tableEvents = "events" + tableEventAttribute = "event_attributes" + driverName = "postgres" ) // EventSink is an indexer backend providing the tx/block index services. This @@ -53,40 +54,37 @@ func (es *EventSink) DB() *sql.DB { return es.store } // Type returns the structure type for this sink, which is Postgres. func (es *EventSink) Type() indexer.EventSinkType { return indexer.PSQL } +func (es *EventSink) indexEvents(blockID, txID int, ts time.Time, events []abci.Event) error { + panic("unimplemented") +} + // IndexBlockEvents indexes the specified block header, part of the // indexer.EventSink interface. func (es *EventSink) IndexBlockEvents(h types.EventDataNewBlockHeader) error { - sqlStmt := sq. - Insert(tableEventBlock). - Columns("key", "value", "height", "type", "created_at", "chain_id"). + ts := time.Now().UTC() + + // Add the block to the blocks table and report back its row ID for use in + // indexing the events for the block. + row := sq.Insert(tableBlocks). + Columns("height", "chain_id", "created_at"). PlaceholderFormat(sq.Dollar). - Suffix("ON CONFLICT (key,height)"). - Suffix("DO NOTHING") + Suffix("ON CONFLICT DO NOTHING"). + Suffix("RETURNING rowid"). + RunWith(es.store). + Values(h.Header.Height, es.chainID, ts). + QueryRow() // any error will be reported via Scan, below - ts := time.Now() - // index the reserved block height index - sqlStmt = sqlStmt. - Values(types.BlockHeightKey, fmt.Sprint(h.Header.Height), h.Header.Height, "", ts, es.chainID) - - // index begin_block events - sqlStmt, err := indexBlockEvents( - sqlStmt, h.ResultBeginBlock.Events, types.EventTypeBeginBlock, h.Header.Height, ts, es.chainID) - if err != nil { + var blockID int + if err := row.Scan(&blockID); err != nil { return err } - - // index end_block events - sqlStmt, err = indexBlockEvents( - sqlStmt, h.ResultEndBlock.Events, types.EventTypeEndBlock, h.Header.Height, ts, es.chainID) - if err != nil { - return err - } - - _, err = sqlStmt.RunWith(es.store).Exec() - return err + return es.indexEvents(blockID, -1, ts, + append(h.ResultBeginBlock.Events, h.ResultEndBlock.Events...)) } func (es *EventSink) IndexTxEvents(txr []*abci.TxResult) error { + ts := time.Now().UTC() + // index the tx result var txid uint32 sqlStmtTxResult := sq. @@ -105,7 +103,6 @@ func (es *EventSink) IndexTxEvents(txr []*abci.TxResult) error { Suffix("ON CONFLICT (key,hash)"). Suffix("DO NOTHING") - ts := time.Now() for _, tx := range txr { txBz, err := proto.Marshal(tx) if err != nil {