diff --git a/state/indexer/sink/pubsub/pubsub.go b/state/indexer/sink/pubsub/pubsub.go index 1e8ba6f01..40d547bd5 100644 --- a/state/indexer/sink/pubsub/pubsub.go +++ b/state/indexer/sink/pubsub/pubsub.go @@ -1,12 +1,16 @@ package pubsub import ( + "bytes" "context" "fmt" "os" + "strconv" "time" "cloud.google.com/go/pubsub" + "github.com/gogo/protobuf/jsonpb" + abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/types" ) @@ -14,12 +18,21 @@ import ( const ( credsEnvVar = "GOOGLE_APPLICATION_CREDENTIALS" - MsgTypeBlock = "block" - MsgTypeTxResult = "tx_result" - MsgTypeTxEvents = "tx_events" - MsgTypeBlockEvents = "block_events" + AttrKeyChainID = "chain_id" + AttrKeyBlockHeight = "block_height" + + MsgType = "message_type" + MsgTypeBeginBlock = "begin_block" + MsgTypeEndBlock = "end_block" + // MsgTypeTxResult = "tx_result" + // MsgTypeTxEvents = "tx_events" ) +var jsonpbMarshaller = jsonpb.Marshaler{ + EnumsAsInts: true, + EmitDefaults: true, +} + type EventSink struct { client *pubsub.Client topic *pubsub.Topic @@ -66,7 +79,55 @@ func NewEventSink(projectID, topic, chainID string) (*EventSink, error) { } func (es *EventSink) IndexBlock(h types.EventDataNewBlockHeader) error { - panic("implement me!") + buf := new(bytes.Buffer) + + // publish BeginBlock Events + if err := jsonpbMarshaller.Marshal(buf, &h.ResultBeginBlock); err != nil { + return fmt.Errorf("failed to JSON marshal ResultBeginBlock: %w", err) + } + + res := es.topic.Publish( + context.Background(), + &pubsub.Message{ + Data: buf.Bytes(), + Attributes: map[string]string{ + MsgType: MsgTypeBeginBlock, + AttrKeyChainID: es.chainID, + AttrKeyBlockHeight: strconv.Itoa(int(h.Header.Height)), + }, + }, + ) + + // TODO: Should we wait for the write to complete or just fire and forget? + if _, err := res.Get(context.Background()); err != nil { + return fmt.Errorf("failed to publish pubsub message: %w", err) + } + + // publish EndBlock Events + buf.Reset() + + if err := jsonpbMarshaller.Marshal(buf, &h.ResultEndBlock); err != nil { + return fmt.Errorf("failed to JSON marshal ResultBeginBlock: %w", err) + } + + res = es.topic.Publish( + context.Background(), + &pubsub.Message{ + Data: buf.Bytes(), + Attributes: map[string]string{ + MsgType: MsgTypeEndBlock, + AttrKeyChainID: es.chainID, + AttrKeyBlockHeight: strconv.Itoa(int(h.Header.Height)), + }, + }, + ) + + // TODO: Should we wait for the write to complete or just fire and forget? + if _, err := res.Get(context.Background()); err != nil { + return fmt.Errorf("failed to publish pubsub message: %w", err) + } + + return nil } func (es *EventSink) IndexTxs(txrs []*abci.TxResult) error {