This commit is contained in:
Aleksandr Bezobchuk
2022-06-17 15:57:00 -04:00
parent a65f74ad95
commit ac01008f34

View File

@@ -1,12 +1,16 @@
package pubsub
import (
"bytes"
"context"
"fmt"
"os"
"strconv"
"time"
"cloud.google.com/go/pubsub"
"github.com/gogo/protobuf/jsonpb"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/types"
)
@@ -14,12 +18,21 @@ import (
const (
credsEnvVar = "GOOGLE_APPLICATION_CREDENTIALS"
MsgTypeBlock = "block"
MsgTypeTxResult = "tx_result"
MsgTypeTxEvents = "tx_events"
MsgTypeBlockEvents = "block_events"
AttrKeyChainID = "chain_id"
AttrKeyBlockHeight = "block_height"
MsgType = "message_type"
MsgTypeBeginBlock = "begin_block"
MsgTypeEndBlock = "end_block"
// MsgTypeTxResult = "tx_result"
// MsgTypeTxEvents = "tx_events"
)
var jsonpbMarshaller = jsonpb.Marshaler{
EnumsAsInts: true,
EmitDefaults: true,
}
type EventSink struct {
client *pubsub.Client
topic *pubsub.Topic
@@ -66,7 +79,55 @@ func NewEventSink(projectID, topic, chainID string) (*EventSink, error) {
}
func (es *EventSink) IndexBlock(h types.EventDataNewBlockHeader) error {
panic("implement me!")
buf := new(bytes.Buffer)
// publish BeginBlock Events
if err := jsonpbMarshaller.Marshal(buf, &h.ResultBeginBlock); err != nil {
return fmt.Errorf("failed to JSON marshal ResultBeginBlock: %w", err)
}
res := es.topic.Publish(
context.Background(),
&pubsub.Message{
Data: buf.Bytes(),
Attributes: map[string]string{
MsgType: MsgTypeBeginBlock,
AttrKeyChainID: es.chainID,
AttrKeyBlockHeight: strconv.Itoa(int(h.Header.Height)),
},
},
)
// TODO: Should we wait for the write to complete or just fire and forget?
if _, err := res.Get(context.Background()); err != nil {
return fmt.Errorf("failed to publish pubsub message: %w", err)
}
// publish EndBlock Events
buf.Reset()
if err := jsonpbMarshaller.Marshal(buf, &h.ResultEndBlock); err != nil {
return fmt.Errorf("failed to JSON marshal ResultBeginBlock: %w", err)
}
res = es.topic.Publish(
context.Background(),
&pubsub.Message{
Data: buf.Bytes(),
Attributes: map[string]string{
MsgType: MsgTypeEndBlock,
AttrKeyChainID: es.chainID,
AttrKeyBlockHeight: strconv.Itoa(int(h.Header.Height)),
},
},
)
// TODO: Should we wait for the write to complete or just fire and forget?
if _, err := res.Get(context.Background()); err != nil {
return fmt.Errorf("failed to publish pubsub message: %w", err)
}
return nil
}
func (es *EventSink) IndexTxs(txrs []*abci.TxResult) error {