This commit is contained in:
Aleksandr Bezobchuk
2022-06-22 13:50:27 -04:00
parent 30045c3c05
commit a55dab76f9

View File

@@ -20,13 +20,13 @@ const (
AttrKeyChainID = "chain_id"
AttrKeyBlockHeight = "block_height"
AttrKeyTxHash = "tx_hash"
MsgType = "message_type"
MsgTypeBeginBlock = "begin_block"
MsgTypeEndBlock = "end_block"
// MsgTypeTxResult = "tx_result"
// MsgTypeTxEvents = "tx_events"
MsgTypeTxResult = "tx_result"
MsgTypeTxEvents = "tx_events"
)
var jsonpbMarshaller = jsonpb.Marshaler{
@@ -96,7 +96,7 @@ func (es *EventSink) IndexBlock(h types.EventDataNewBlockHeader) error {
var results []*pubsub.PublishResult
res := es.topic.Publish(
context.Background(),
context.Background(), // NOTE: contexts aren't used in Publish
&pubsub.Message{
Data: buf.Bytes(),
Attributes: map[string]string{
@@ -116,7 +116,7 @@ func (es *EventSink) IndexBlock(h types.EventDataNewBlockHeader) error {
}
res = es.topic.Publish(
context.Background(),
context.Background(), // NOTE: contexts aren't used in Publish
&pubsub.Message{
Data: buf.Bytes(),
Attributes: map[string]string{
@@ -139,5 +139,41 @@ func (es *EventSink) IndexBlock(h types.EventDataNewBlockHeader) error {
}
func (es *EventSink) IndexTxs(txrs []*abci.TxResult) error {
panic("implement me!")
buf := new(bytes.Buffer)
results := make([]*pubsub.PublishResult, len(txrs))
for i, txr := range txrs {
buf.Reset() // reset buffer prior to next Marshal call
blockHeightStr := strconv.Itoa(int(txr.Height))
txHash := fmt.Sprintf("%X", types.Tx(txr.Tx).Hash())
if err := jsonpbMarshaller.Marshal(buf, txr); err != nil {
return fmt.Errorf("failed to JSON marshal TxResult: %w", err)
}
res := es.topic.Publish(
context.Background(), // NOTE: contexts aren't used in Publish
&pubsub.Message{
Data: buf.Bytes(),
Attributes: map[string]string{
MsgType: MsgTypeTxResult,
AttrKeyChainID: es.chainID,
AttrKeyBlockHeight: blockHeightStr,
AttrKeyTxHash: txHash,
},
},
)
results[i] = res
}
// wait for all messages to be be sent (or failed to be sent) to the server
for _, r := range results {
if _, err := r.Get(context.Background()); err != nil {
return fmt.Errorf("failed to publish pubsub message: %w", err)
}
}
return nil
}