From a55dab76f9a78403b05efbe7c7f6b54d90645634 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Wed, 22 Jun 2022 13:50:27 -0400 Subject: [PATCH] updates --- state/indexer/sink/pubsub/pubsub.go | 48 +++++++++++++++++++++++++---- 1 file changed, 42 insertions(+), 6 deletions(-) diff --git a/state/indexer/sink/pubsub/pubsub.go b/state/indexer/sink/pubsub/pubsub.go index eab30af06..83362e60e 100644 --- a/state/indexer/sink/pubsub/pubsub.go +++ b/state/indexer/sink/pubsub/pubsub.go @@ -20,13 +20,13 @@ const ( AttrKeyChainID = "chain_id" AttrKeyBlockHeight = "block_height" + AttrKeyTxHash = "tx_hash" MsgType = "message_type" MsgTypeBeginBlock = "begin_block" MsgTypeEndBlock = "end_block" - - // MsgTypeTxResult = "tx_result" - // MsgTypeTxEvents = "tx_events" + MsgTypeTxResult = "tx_result" + MsgTypeTxEvents = "tx_events" ) var jsonpbMarshaller = jsonpb.Marshaler{ @@ -96,7 +96,7 @@ func (es *EventSink) IndexBlock(h types.EventDataNewBlockHeader) error { var results []*pubsub.PublishResult res := es.topic.Publish( - context.Background(), + context.Background(), // NOTE: contexts aren't used in Publish &pubsub.Message{ Data: buf.Bytes(), Attributes: map[string]string{ @@ -116,7 +116,7 @@ func (es *EventSink) IndexBlock(h types.EventDataNewBlockHeader) error { } res = es.topic.Publish( - context.Background(), + context.Background(), // NOTE: contexts aren't used in Publish &pubsub.Message{ Data: buf.Bytes(), Attributes: map[string]string{ @@ -139,5 +139,41 @@ func (es *EventSink) IndexBlock(h types.EventDataNewBlockHeader) error { } func (es *EventSink) IndexTxs(txrs []*abci.TxResult) error { - panic("implement me!") + buf := new(bytes.Buffer) + + results := make([]*pubsub.PublishResult, len(txrs)) + for i, txr := range txrs { + buf.Reset() // reset buffer prior to next Marshal call + + blockHeightStr := strconv.Itoa(int(txr.Height)) + txHash := fmt.Sprintf("%X", types.Tx(txr.Tx).Hash()) + + if err := jsonpbMarshaller.Marshal(buf, txr); err != nil { + return fmt.Errorf("failed to JSON marshal TxResult: %w", err) + } + + res := es.topic.Publish( + context.Background(), // NOTE: contexts aren't used in Publish + &pubsub.Message{ + Data: buf.Bytes(), + Attributes: map[string]string{ + MsgType: MsgTypeTxResult, + AttrKeyChainID: es.chainID, + AttrKeyBlockHeight: blockHeightStr, + AttrKeyTxHash: txHash, + }, + }, + ) + + results[i] = res + } + + // wait for all messages to be be sent (or failed to be sent) to the server + for _, r := range results { + if _, err := r.Get(context.Background()); err != nil { + return fmt.Errorf("failed to publish pubsub message: %w", err) + } + } + + return nil }