sink/psql: json marshal instead of proto (#8637)

Storing transaction records as JSON makes it simpler for clients of the index.
This commit is contained in:
Marko
2022-06-13 19:20:54 +02:00
committed by GitHub
parent 06175129ed
commit 82907c84fa
3 changed files with 16 additions and 8 deletions

View File

@@ -32,7 +32,8 @@ Special thanks to external contributors on this release:
- [abci] \#8605 Remove info, log, events, gasUsed and mempoolError fields from ResponseCheckTx as they are not used by Tendermint. (@jmalicevic)
- [abci] \#8664 Move `app_hash` parameter from `Commit` to `FinalizeBlock`. (@sergio-mena)
- [abci] \#8656 Added cli command for `PrepareProposal`. (@jmalicevic)
- [sink/psql] \#8637 tx_results emitted from psql sink are now json encoded, previously they were protobuf encoded
- P2P Protocol
- [p2p] \#7035 Remove legacy P2P routing implementation and associated configuration options. (@tychoish)

View File

@@ -9,8 +9,7 @@ import (
"strings"
"time"
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/jsonpb"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/internal/pubsub/query"
"github.com/tendermint/tendermint/internal/state/indexer"
@@ -177,12 +176,16 @@ INSERT INTO `+tableBlocks+` (height, chain_id, created_at)
})
}
var (
jsonpbMarshaller = jsonpb.Marshaler{}
)
func (es *EventSink) IndexTxEvents(txrs []*abci.TxResult) error {
ts := time.Now().UTC()
for _, txr := range txrs {
// Encode the result message in protobuf wire format for indexing.
resultData, err := proto.Marshal(txr)
resultData, err := jsonpbMarshaller.MarshalToString(txr)
if err != nil {
return fmt.Errorf("marshaling tx_result: %w", err)
}

View File

@@ -1,6 +1,7 @@
package psql
import (
"bytes"
"context"
"database/sql"
"flag"
@@ -12,7 +13,7 @@ import (
"time"
"github.com/adlio/schema"
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/jsonpb"
"github.com/ory/dockertest"
"github.com/ory/dockertest/docker"
"github.com/stretchr/testify/assert"
@@ -151,6 +152,8 @@ func TestType(t *testing.T) {
assert.Equal(t, indexer.PSQL, psqlSink.Type())
}
var jsonpbUnmarshaller = jsonpb.Unmarshaler{}
func TestIndexing(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -278,13 +281,14 @@ func loadTxResult(hash []byte) (*abci.TxResult, error) {
hashString := fmt.Sprintf("%X", hash)
var resultData []byte
if err := testDB().QueryRow(`
SELECT tx_result FROM `+tableTxResults+` WHERE tx_hash = $1;
`, hashString).Scan(&resultData); err != nil {
SELECT tx_result FROM `+tableTxResults+` WHERE tx_hash = $1;
`, hashString).Scan(&resultData); err != nil {
return nil, fmt.Errorf("lookup transaction for hash %q failed: %v", hashString, err)
}
reader := bytes.NewBuffer(resultData)
txr := new(abci.TxResult)
if err := proto.Unmarshal(resultData, txr); err != nil {
if err := jsonpbUnmarshaller.Unmarshal(reader, txr); err != nil {
return nil, fmt.Errorf("unmarshaling txr: %w", err)
}