diff --git a/state/indexer/sink/pubsub/pubsub.go b/state/indexer/sink/pubsub/pubsub.go index 68ff836cc..44e423fa0 100644 --- a/state/indexer/sink/pubsub/pubsub.go +++ b/state/indexer/sink/pubsub/pubsub.go @@ -7,18 +7,12 @@ import ( "time" "cloud.google.com/go/pubsub" - - "github.com/tendermint/tendermint/state/indexer" - "github.com/tendermint/tendermint/state/txindex" + abci "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/types" ) const credsEnvVar = "GOOGLE_APPLICATION_CREDENTIALS" -var ( - _ indexer.BlockIndexer = (*EventSink)(nil) - _ txindex.TxIndexer = (*EventSink)(nil) -) - type EventSink struct { client *pubsub.Client chainID string @@ -42,3 +36,11 @@ func NewEventSink(connStr, chainID string) (*EventSink, error) { chainID: chainID, }, nil } + +func (es *EventSink) IndexBlock(h types.EventDataNewBlockHeader) error { + panic("implement me!") +} + +func (es *EventSink) IndexTxs(txrs []*abci.TxResult) error { + panic("implement me!") +} diff --git a/state/indexer/sink/pubsub/wrapper.go b/state/indexer/sink/pubsub/wrapper.go new file mode 100644 index 000000000..0d0f66eaf --- /dev/null +++ b/state/indexer/sink/pubsub/wrapper.go @@ -0,0 +1,57 @@ +package pubsub + +import ( + "context" + "errors" + + abci "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/libs/pubsub/query" + "github.com/tendermint/tendermint/state/indexer" + "github.com/tendermint/tendermint/state/txindex" + "github.com/tendermint/tendermint/types" +) + +var ( + _ indexer.BlockIndexer = (*BlockIndexer)(nil) + _ txindex.TxIndexer = (*TxIndexer)(nil) +) + +// BlockIndexer implements a wrapper around the Pubsub sink and supports block +// indexing by implementing the indexer.BlockIndexer interface. +type BlockIndexer struct { + sink *EventSink +} + +func (bi *BlockIndexer) Has(_ int64) (bool, error) { + return false, errors.New("the Has method is not supported for the Pubsub indexer") +} + +func (bi *BlockIndexer) Search(_ context.Context, _ *query.Query) ([]int64, error) { + return nil, errors.New("the Search method is not supported for the Pubsub indexer") +} + +func (bi *BlockIndexer) Index(block types.EventDataNewBlockHeader) error { + return bi.sink.IndexBlock(block) +} + +// TxIndexer implements a wrapper around the Pubsub sink and supports tx +// indexing by implementing the txindex.TxIndexer interface. +type TxIndexer struct { + sink *EventSink +} + +func (ti *TxIndexer) AddBatch(batch *txindex.Batch) error { + return ti.sink.IndexTxs(batch.Ops) +} + +func (ti *TxIndexer) Index(txr *abci.TxResult) error { + return ti.sink.IndexTxs([]*abci.TxResult{txr}) +} + +func (ti *TxIndexer) Get(hash []byte) (*abci.TxResult, error) { + return nil, errors.New("the Get method is not supported for the Pubsub indexer") +} + +func (ti *TxIndexer) Search(_ context.Context, _ *query.Query) ([]*abci.TxResult, error) { + return nil, errors.New("the Search method is not supported for the Pubsub indexer") +}