This commit is contained in:
Aleksandr Bezobchuk
2022-06-22 13:38:55 -04:00
parent 655c00e5ba
commit 126dfead53

View File

@@ -24,6 +24,7 @@ const (
MsgType = "message_type"
MsgTypeBeginBlock = "begin_block"
MsgTypeEndBlock = "end_block"
// MsgTypeTxResult = "tx_result"
// MsgTypeTxEvents = "tx_events"
)
@@ -87,6 +88,8 @@ func (es *EventSink) IndexBlock(h types.EventDataNewBlockHeader) error {
return fmt.Errorf("failed to JSON marshal ResultBeginBlock: %w", err)
}
var results []*pubsub.PublishResult
res := es.topic.Publish(
context.Background(),
&pubsub.Message{
@@ -98,17 +101,13 @@ func (es *EventSink) IndexBlock(h types.EventDataNewBlockHeader) error {
},
},
)
results = append(results, res)
// TODO: Should we wait for the write to complete or just fire and forget?
if _, err := res.Get(context.Background()); err != nil {
return fmt.Errorf("failed to publish pubsub message: %w", err)
}
buf.Reset()
buf.Reset() // reset buffer prior to next Marshal call
// publish EndBlock Events
if err := jsonpbMarshaller.Marshal(buf, &h.ResultEndBlock); err != nil {
return fmt.Errorf("failed to JSON marshal ResultBeginBlock: %w", err)
return fmt.Errorf("failed to JSON marshal ResultEndBlock: %w", err)
}
res = es.topic.Publish(
@@ -122,10 +121,13 @@ func (es *EventSink) IndexBlock(h types.EventDataNewBlockHeader) error {
},
},
)
results = append(results, res)
// TODO: Should we wait for the write to complete or just fire and forget?
if _, err := res.Get(context.Background()); err != nil {
return fmt.Errorf("failed to publish pubsub message: %w", err)
// wait for all messages to be be sent (or failed to be sent) to the server
for _, r := range results {
if _, err := r.Get(context.Background()); err != nil {
return fmt.Errorf("failed to publish pubsub message: %w", err)
}
}
return nil