Merge branch 'master' into bez/p2p-refactor-blockchain-v0-reactor
@@ -49,7 +49,8 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi

- [crypto/ed25519] \#5632 Adopt zip215 `ed25519` verification. (@marbar3778)
- [privval] \#5603 Add `--key` to `init`, `gen_validator`, `testnet` & `unsafe_reset_priv_validator` for use in generating `secp256k1` keys.
- [privval] \#5725 add gRPC support to private validator.
- [privval] \#5725 Add gRPC support to private validator.
- [privval] \#5876 `tendermint show-validator` will query the remote signer if gRPC is being used (@marbar3778)
- [abci/client] \#5673 `Async` requests return an error if queue is full (@melekes)
- [mempool] \#5673 Cancel `CheckTx` requests if RPC client disconnects or times out (@melekes)
- [abci] \#5706 Added `AbciVersion` to `RequestInfo` allowing applications to check ABCI version when connecting to Tendermint. (@marbar3778)

@@ -65,3 +66,6 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi

- [privval] \#5638 Increase read/write timeout to 5s and calculate ping interval based on it (@JoeKash)
- [blockchain/v1] [\#5701](https://github.com/tendermint/tendermint/pull/5701) Handle peers without blocks (@melekes)
- [blockchain/v1] \#5711 Fix deadlock (@melekes)
- [evidence] \#5890 Add a buffer to evidence from consensus to avoid broadcasting and proposing evidence before the height at which the evidence occurred has finished (@cmwaters)
- [statesync] \#5889 Set `LastHeightConsensusParamsChanged` when bootstrapping Tendermint state (@cmwaters)
@@ -5,9 +5,12 @@ import (

"github.com/spf13/cobra"

"github.com/tendermint/tendermint/crypto"
tmjson "github.com/tendermint/tendermint/libs/json"
tmnet "github.com/tendermint/tendermint/libs/net"
tmos "github.com/tendermint/tendermint/libs/os"
"github.com/tendermint/tendermint/privval"
tmgrpc "github.com/tendermint/tendermint/privval/grpc"
)

// ShowValidatorCmd adds capabilities for showing the validator info.

@@ -20,16 +23,36 @@ var ShowValidatorCmd = &cobra.Command{
}

func showValidator(cmd *cobra.Command, args []string) error {
keyFilePath := config.PrivValidatorKeyFile()
if !tmos.FileExists(keyFilePath) {
return fmt.Errorf("private validator file %s does not exist", keyFilePath)
}
var (
pubKey crypto.PubKey
err error
)

pv := privval.LoadFilePV(keyFilePath, config.PrivValidatorStateFile())
//TODO: remove once gRPC is the only supported protocol
protocol, _ := tmnet.ProtocolAndAddress(config.PrivValidatorListenAddr)
switch protocol {
case "grpc":
pvsc, err := tmgrpc.DialRemoteSigner(config, config.ChainID(), logger)
if err != nil {
return fmt.Errorf("can't connect to remote validator %w", err)
}
pubKey, err = pvsc.GetPubKey()
if err != nil {
return fmt.Errorf("can't get pubkey: %w", err)
}
default:

pubKey, err := pv.GetPubKey()
if err != nil {
return fmt.Errorf("can't get pubkey: %w", err)
keyFilePath := config.PrivValidatorKeyFile()
if !tmos.FileExists(keyFilePath) {
return fmt.Errorf("private validator file %s does not exist", keyFilePath)
}

pv := privval.LoadFilePV(keyFilePath, config.PrivValidatorStateFile())

pubKey, err = pv.GetPubKey()
if err != nil {
return fmt.Errorf("can't get pubkey: %w", err)
}
}

bz, err := tmjson.Marshal(pubKey)
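Editor's note: the `case "grpc"` branch above is selected purely from the scheme of `PrivValidatorListenAddr`. A minimal sketch of how `tmnet.ProtocolAndAddress` splits that address (the address value below is a made-up example, not taken from this diff):

```go
package main

import (
	"fmt"

	tmnet "github.com/tendermint/tendermint/libs/net"
)

func main() {
	// A gRPC listen address selects the remote-signer branch in showValidator.
	protocol, address := tmnet.ProtocolAndAddress("grpc://127.0.0.1:26659")
	fmt.Println(protocol, address) // grpc 127.0.0.1:26659

	// With no scheme the helper defaults to "tcp", so the default branch
	// (file-based private validator) is used instead.
	protocol, address = tmnet.ProtocolAndAddress("127.0.0.1:26659")
	fmt.Println(protocol, address) // tcp 127.0.0.1:26659
}
```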
@@ -43,6 +43,10 @@ type Pool struct {
mtx sync.Mutex
// latest state
state sm.State
// evidence from consensus is buffered to this slice, awaiting until the next height
// before being flushed to the pool. This prevents broadcasting and proposing of
// evidence before the height at which the evidence occurred has finished.
consensusBuffer []types.Evidence

pruningHeight int64
pruningTime time.Time

@@ -57,12 +61,13 @@ func NewPool(logger log.Logger, evidenceDB dbm.DB, stateDB sm.Store, blockStore
}

pool := &Pool{
stateDB: stateDB,
blockStore: blockStore,
state: state,
logger: logger,
evidenceStore: evidenceDB,
evidenceList: clist.New(),
stateDB: stateDB,
blockStore: blockStore,
state: state,
logger: logger,
evidenceStore: evidenceDB,
evidenceList: clist.New(),
consensusBuffer: make([]types.Evidence, 0),
}

// If pending evidence already in db, in event of prior failure, then check

@@ -115,7 +120,14 @@ func (evpool *Pool) Update(state sm.State, ev types.EvidenceList) {
"last_block_time", state.LastBlockTime,
)

evpool.updateState(state)
evpool.mtx.Lock()
// flush awaiting evidence from consensus into pool
evpool.flushConsensusBuffer()
// update state
evpool.state = state
evpool.mtx.Unlock()

// move committed evidence out from the pending pool and into the committed pool
evpool.markEvidenceAsCommitted(ev)

// Prune pending evidence when it has expired. This also updates when the next

@@ -170,14 +182,14 @@ func (evpool *Pool) AddEvidenceFromConsensus(ev types.Evidence) error {
return nil
}

if err := evpool.addPendingEvidence(ev); err != nil {
return fmt.Errorf("failed to add evidence to pending list: %w", err)
}
// add evidence to a buffer which will pass the evidence to the pool at the following height.
// This avoids the issue of some nodes verifying and proposing evidence at a height where the
// block hasn't been committed, which can cause others to potentially fail.
evpool.mtx.Lock()
defer evpool.mtx.Unlock()
evpool.consensusBuffer = append(evpool.consensusBuffer, ev)
evpool.logger.Info("received new evidence of byzantine behavior from consensus", "evidence", ev)

// add evidence to be gossiped with peers
evpool.evidenceList.PushBack(ev)

evpool.logger.Info("verified new evidence of byzantine behavior", "evidence", ev)
return nil
}

@@ -520,10 +532,19 @@ func (evpool *Pool) removeEvidenceFromList(
}
}

func (evpool *Pool) updateState(state sm.State) {
evpool.mtx.Lock()
defer evpool.mtx.Unlock()
evpool.state = state
// flushConsensusBuffer moves the evidence produced from consensus into the evidence pool
// and list so that it can be broadcast and proposed
func (evpool *Pool) flushConsensusBuffer() {
for _, ev := range evpool.consensusBuffer {
if err := evpool.addPendingEvidence(ev); err != nil {
evpool.logger.Error("failed to flush evidence from consensus buffer to pending list: %w", err)
continue
}

evpool.evidenceList.PushBack(ev)
}
// reset consensus buffer
evpool.consensusBuffer = make([]types.Evidence, 0)
}

func bytesToEv(evBytes []byte) (types.Evidence, error) {
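Editor's note: the struct comment and `AddEvidenceFromConsensus`/`flushConsensusBuffer` above implement a buffer-then-flush pattern: evidence from consensus is only buffered at first, and the next `Update` (once the height has finished) moves it into the pending and gossip lists. A minimal standalone sketch of that pattern, using illustrative names rather than the evidence package's real API:

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a toy stand-in for evidence.Pool.
type pool struct {
	mtx     sync.Mutex
	buffer  []string // evidence received from consensus, awaiting the next height
	pending []string // evidence that may be broadcast and proposed
}

// addFromConsensus only buffers the item; it is not yet visible to peers.
func (p *pool) addFromConsensus(ev string) {
	p.mtx.Lock()
	defer p.mtx.Unlock()
	p.buffer = append(p.buffer, ev)
}

// update is called when the pool moves to the next height; it flushes the buffer.
func (p *pool) update() {
	p.mtx.Lock()
	defer p.mtx.Unlock()
	p.pending = append(p.pending, p.buffer...)
	p.buffer = p.buffer[:0]
}

func main() {
	p := &pool{}
	p.addFromConsensus("duplicate-vote@height=10")

	fmt.Println(len(p.pending)) // 0 — not yet proposable or broadcast
	p.update()
	fmt.Println(len(p.pending)) // 1 — flushed at the next height
}
```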
@@ -143,13 +143,31 @@ func TestAddEvidenceFromConsensus(t *testing.T) {
ev := types.NewMockDuplicateVoteEvidenceWithValidator(height, defaultEvidenceTime, val, evidenceChainID)

require.NoError(t, pool.AddEvidenceFromConsensus(ev))

// evidence from consensus should not be added immediately but reside in the consensus buffer
evList, evSize := pool.PendingEvidence(defaultEvidenceMaxBytes)
require.Empty(t, evList)
require.Zero(t, evSize)

next := pool.EvidenceFront()
require.Equal(t, ev, next.Value.(types.Evidence))
require.Nil(t, next)

// move to next height and update state and evidence pool
state := pool.State()
state.LastBlockHeight++
pool.Update(state, []types.Evidence{})

// should be able to retrieve evidence from pool
evList, _ = pool.PendingEvidence(defaultEvidenceMaxBytes)
require.Equal(t, []types.Evidence{ev}, evList)

// shouldn't be able to submit the same evidence twice
require.NoError(t, pool.AddEvidenceFromConsensus(ev))
evs, _ := pool.PendingEvidence(defaultEvidenceMaxBytes)
require.Equal(t, 1, len(evs))
state = pool.State()
state.LastBlockHeight++
pool.Update(state, []types.Evidence{})
evList2, _ := pool.PendingEvidence(defaultEvidenceMaxBytes)
require.Equal(t, evList, evList2)
}

func TestEvidencePoolUpdate(t *testing.T) {
node/node.go
@@ -10,11 +10,9 @@ import (
_ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port
"time"

grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/cors"
"google.golang.org/grpc"

dbm "github.com/tendermint/tm-db"

@@ -691,11 +689,11 @@ func NewNode(config *cfg.Config,
// If an address is provided, listen on the socket for a connection from an
// external signing process.
if config.PrivValidatorListenAddr != "" {
protocol, address := tmnet.ProtocolAndAddress(config.PrivValidatorListenAddr)
protocol, _ := tmnet.ProtocolAndAddress(config.PrivValidatorListenAddr)
// FIXME: we should start services inside OnStart
switch protocol {
case "grpc":
privValidator, err = createAndStartPrivValidatorGRPCClient(config, address, genDoc.ChainID, logger)
privValidator, err = createAndStartPrivValidatorGRPCClient(config, genDoc.ChainID, logger)
if err != nil {
return nil, fmt.Errorf("error with private validator grpc client: %w", err)
}

@@ -1475,33 +1473,10 @@ func createAndStartPrivValidatorSocketClient(

func createAndStartPrivValidatorGRPCClient(
config *cfg.Config,
address,
chainID string,
logger log.Logger,
) (types.PrivValidator, error) {
var transportSecurity grpc.DialOption
if config.BaseConfig.ArePrivValidatorClientSecurityOptionsPresent() {
transportSecurity = tmgrpc.GenerateTLS(config.PrivValidatorClientCertificateFile(),
config.PrivValidatorClientKeyFile(), config.PrivValidatorRootCAFile(), logger)
} else {
transportSecurity = grpc.WithInsecure()
logger.Info("Using an insecure gRPC connection!")
}
dialOptions := tmgrpc.DefaultDialOptions()
if config.Instrumentation.Prometheus {
grpcMetrics := grpc_prometheus.DefaultClientMetrics
dialOptions = append(dialOptions, grpc.WithUnaryInterceptor(grpcMetrics.UnaryClientInterceptor()))
}

dialOptions = append(dialOptions, transportSecurity)

ctx := context.Background()

conn, err := grpc.DialContext(ctx, address, dialOptions...)
if err != nil {
logger.Error("unable to connect to server", "target", address, "err", err)
}
pvsc, err := tmgrpc.NewSignerClient(conn, chainID, logger)
pvsc, err := tmgrpc.DialRemoteSigner(config, chainID, logger)
if err != nil {
return nil, fmt.Errorf("failed to start private validator: %w", err)
}
@@ -1,6 +1,7 @@
package grpc

import (
"context"
"crypto/tls"
"crypto/x509"
"io/ioutil"

@@ -8,7 +9,11 @@ import (
"time"

grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"

cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/log"
tmnet "github.com/tendermint/tendermint/libs/net"
grpc "google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"

@@ -80,3 +85,36 @@ func GenerateTLS(certPath, keyPath, ca string, log log.Logger) grpc.DialOption {

return grpc.WithTransportCredentials(transportCreds)
}

// DialRemoteSigner is a generalized function to dial the gRPC server.
func DialRemoteSigner(
config *cfg.Config,
chainID string,
logger log.Logger,
) (*SignerClient, error) {
var transportSecurity grpc.DialOption
if config.BaseConfig.ArePrivValidatorClientSecurityOptionsPresent() {
transportSecurity = GenerateTLS(config.PrivValidatorClientCertificateFile(),
config.PrivValidatorClientKeyFile(), config.PrivValidatorRootCAFile(), logger)
} else {
transportSecurity = grpc.WithInsecure()
logger.Info("Using an insecure gRPC connection!")
}

dialOptions := DefaultDialOptions()
if config.Instrumentation.Prometheus {
grpcMetrics := grpc_prometheus.DefaultClientMetrics
dialOptions = append(dialOptions, grpc.WithUnaryInterceptor(grpcMetrics.UnaryClientInterceptor()))
}

dialOptions = append(dialOptions, transportSecurity)

ctx := context.Background()
_, address := tmnet.ProtocolAndAddress(config.PrivValidatorListenAddr)
conn, err := grpc.DialContext(ctx, address, dialOptions...)
if err != nil {
logger.Error("unable to connect to server", "target", address, "err", err)
}

return NewSignerClient(conn, chainID, logger)
}
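Editor's note: a hedged sketch of how a caller could use `DialRemoteSigner` to fetch the signer's public key, mirroring the `show-validator` and node-startup call sites above. The listen address and chain ID are placeholder values, not taken from this diff:

```go
package main

import (
	"fmt"
	"os"

	cfg "github.com/tendermint/tendermint/config"
	"github.com/tendermint/tendermint/libs/log"
	tmgrpc "github.com/tendermint/tendermint/privval/grpc"
)

func main() {
	logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout))

	// Placeholder config: the dial target is derived from PrivValidatorListenAddr,
	// and TLS vs. insecure transport is chosen from the client security options.
	conf := cfg.DefaultConfig()
	conf.PrivValidatorListenAddr = "grpc://127.0.0.1:26659" // assumed address

	sc, err := tmgrpc.DialRemoteSigner(conf, "test-chain", logger) // placeholder chain ID
	if err != nil {
		logger.Error("failed to dial remote signer", "err", err)
		os.Exit(1)
	}

	pubKey, err := sc.GetPubKey()
	if err != nil {
		logger.Error("failed to fetch pubkey", "err", err)
		os.Exit(1)
	}
	fmt.Println(pubKey.Address())
}
```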
@@ -222,7 +222,8 @@ func (store dbStore) Bootstrap(state State) error {
return err
}

if err := store.saveConsensusParamsInfo(height, height, state.ConsensusParams); err != nil {
if err := store.saveConsensusParamsInfo(height,
state.LastHeightConsensusParamsChanged, state.ConsensusParams); err != nil {
return err
}
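Editor's note: `Bootstrap` is what state sync uses to seed a fresh node's store; with this change it records the height at which consensus params last changed rather than assuming the bootstrap height. A minimal, hypothetical wiring of that call (the in-memory DB and the `state` value are stand-ins):

```go
package example

import (
	dbm "github.com/tendermint/tm-db"

	sm "github.com/tendermint/tendermint/state"
)

// bootstrapStateStore seeds a state store for a node joining via state sync.
// `state` would normally come from the light-client state provider shown
// further down in this diff.
func bootstrapStateStore(state sm.State) error {
	db := dbm.NewMemDB() // stand-in for the node's real state DB
	stateStore := sm.NewStore(db)

	// Persists validators, consensus params (keyed at
	// state.LastHeightConsensusParamsChanged), and the state itself.
	return stateStore.Bootstrap(state)
}
```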
@@ -1,7 +1,6 @@
package kv

import (
"bytes"
"context"
"encoding/hex"
"fmt"

@@ -10,6 +9,7 @@ import (
"time"

"github.com/gogo/protobuf/proto"
"github.com/google/orderedcode"
dbm "github.com/tendermint/tm-db"

abci "github.com/tendermint/tendermint/abci/types"

@@ -18,13 +18,12 @@ import (
"github.com/tendermint/tendermint/types"
)

const (
tagKeySeparator = "/"
)

var _ txindex.TxIndexer = (*TxIndex)(nil)

// TxIndex is the simplest possible indexer, backed by key-value storage (levelDB).
// TxIndex is the simplest possible indexer
// It is backed by two kv stores:
// 1. txhash - result (primary key)
// 2. event - txhash (secondary key)
type TxIndex struct {
store dbm.DB
}

@@ -43,7 +42,7 @@ func (txi *TxIndex) Get(hash []byte) (*abci.TxResult, error) {
return nil, txindex.ErrorEmptyHash
}

rawBytes, err := txi.store.Get(hash)
rawBytes, err := txi.store.Get(primaryKey(hash))
if err != nil {
panic(err)
}

@@ -78,7 +77,7 @@ func (txi *TxIndex) AddBatch(b *txindex.Batch) error {
}

// index by height (always)
err = storeBatch.Set(keyForHeight(result), hash)
err = storeBatch.Set(keyFromHeight(result), hash)
if err != nil {
return err
}

@@ -88,7 +87,7 @@ func (txi *TxIndex) AddBatch(b *txindex.Batch) error {
return err
}
// index by hash (always)
err = storeBatch.Set(hash, rawBytes)
err = storeBatch.Set(primaryKey(hash), rawBytes)
if err != nil {
return err
}

@@ -114,7 +113,7 @@ func (txi *TxIndex) Index(result *abci.TxResult) error {
}

// index by height (always)
err = b.Set(keyForHeight(result), hash)
err = b.Set(keyFromHeight(result), hash)
if err != nil {
return err
}

@@ -124,7 +123,7 @@ func (txi *TxIndex) Index(result *abci.TxResult) error {
return err
}
// index by hash (always)
err = b.Set(hash, rawBytes)
err = b.Set(primaryKey(hash), rawBytes)
if err != nil {
return err
}

@@ -146,8 +145,12 @@ func (txi *TxIndex) indexEvents(result *abci.TxResult, hash []byte, store dbm.Ba

// index if `index: true` is set
compositeTag := fmt.Sprintf("%s.%s", event.Type, string(attr.Key))
// ensure event does not conflict with a reserved prefix key
if compositeTag == types.TxHashKey || compositeTag == types.TxHeightKey {
return fmt.Errorf("event type and attribute key \"%s\" is reserved. Please use a different key", compositeTag)
}
if attr.GetIndex() {
err := store.Set(keyForEvent(compositeTag, attr.Value, result), hash)
err := store.Set(keyFromEvent(compositeTag, attr.Value, result), hash)
if err != nil {
return err
}
@@ -215,7 +218,7 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul

for _, r := range ranges {
if !hashesInitialized {
filteredHashes = txi.matchRange(ctx, r, startKey(r.key), filteredHashes, true)
filteredHashes = txi.matchRange(ctx, r, prefixFromCompositeKey(r.key), filteredHashes, true)
hashesInitialized = true

// Ignore any remaining conditions if the first condition resulted

@@ -224,7 +227,7 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul
break
}
} else {
filteredHashes = txi.matchRange(ctx, r, startKey(r.key), filteredHashes, false)
filteredHashes = txi.matchRange(ctx, r, prefixFromCompositeKey(r.key), filteredHashes, false)
}
}
}

@@ -239,7 +242,7 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul
}

if !hashesInitialized {
filteredHashes = txi.match(ctx, c, startKeyForCondition(c, height), filteredHashes, true)
filteredHashes = txi.match(ctx, c, prefixForCondition(c, height), filteredHashes, true)
hashesInitialized = true

// Ignore any remaining conditions if the first condition resulted

@@ -248,7 +251,7 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul
break
}
} else {
filteredHashes = txi.match(ctx, c, startKeyForCondition(c, height), filteredHashes, false)
filteredHashes = txi.match(ctx, c, prefixForCondition(c, height), filteredHashes, false)
}
}

@@ -430,7 +433,7 @@ func (txi *TxIndex) match(
case c.Op == query.OpExists:
// XXX: can't use startKeyBz here because c.Operand is nil
// (e.g. "account.owner/<nil>/" won't match w/ a single row)
it, err := dbm.IteratePrefix(txi.store, startKey(c.CompositeKey))
it, err := dbm.IteratePrefix(txi.store, prefixFromCompositeKey(c.CompositeKey))
if err != nil {
panic(err)
}

@@ -454,18 +457,18 @@ func (txi *TxIndex) match(
// XXX: startKey does not apply here.
// For example, if startKey = "account.owner/an/" and search query = "account.owner CONTAINS an"
// we can't iterate with prefix "account.owner/an/" because we might miss keys like "account.owner/Ulan/"
it, err := dbm.IteratePrefix(txi.store, startKey(c.CompositeKey))
it, err := dbm.IteratePrefix(txi.store, prefixFromCompositeKey(c.CompositeKey))
if err != nil {
panic(err)
}
defer it.Close()

for ; it.Valid(); it.Next() {
if !isTagKey(it.Key()) {
value, err := parseValueFromKey(it.Key())
if err != nil {
continue
}

if strings.Contains(extractValueFromKey(it.Key()), c.Operand.(string)) {
if strings.Contains(value, c.Operand.(string)) {
tmpHashes[string(it.Value())] = it.Value()
}

@@ -542,12 +545,12 @@ func (txi *TxIndex) matchRange(

LOOP:
for ; it.Valid(); it.Next() {
if !isTagKey(it.Key()) {
value, err := parseValueFromKey(it.Key())
if err != nil {
continue
}

if _, ok := r.AnyBound().(int64); ok {
v, err := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64)
v, err := strconv.ParseInt(value, 10, 64)
if err != nil {
continue LOOP
}
@@ -613,46 +616,102 @@ LOOP:
return filteredHashes
}

// Keys
// ########################## Keys #############################
//
// The indexer has two types of kv stores:
// 1. txhash - result (primary key)
// 2. event - txhash (secondary key)
//
// The event key can be decomposed into 4 parts.
// 1. A composite key which can be any string.
// Usually something like "tx.height" or "account.owner"
// 2. A value. That corresponds to the key. In the above
// example the value could be "5" or "Ivan"
// 3. The height of the Tx that aligns with the key and value.
// 4. The index of the Tx that aligns with the key and value

func isTagKey(key []byte) bool {
return strings.Count(string(key), tagKeySeparator) == 3
// the hash/primary key
func primaryKey(hash []byte) []byte {
key, err := orderedcode.Append(
nil,
types.TxHashKey,
string(hash),
)
if err != nil {
panic(err)
}
return key
}

func extractValueFromKey(key []byte) string {
parts := strings.SplitN(string(key), tagKeySeparator, 3)
return parts[1]
}

func keyForEvent(key string, value []byte, result *abci.TxResult) []byte {
return []byte(fmt.Sprintf("%s/%s/%d/%d",
key,
// The event/secondary key
func secondaryKey(compositeKey, value string, height int64, index uint32) []byte {
key, err := orderedcode.Append(
nil,
compositeKey,
value,
result.Height,
result.Index,
))
height,
int64(index),
)
if err != nil {
panic(err)
}
return key
}

func keyForHeight(result *abci.TxResult) []byte {
return []byte(fmt.Sprintf("%s/%d/%d/%d",
types.TxHeightKey,
result.Height,
result.Height,
result.Index,
))
// parseValueFromKey parses an event key and extracts out the value, returning an error if one arises.
// This will also involve ensuring that the key has the correct format.
// CONTRACT: function doesn't check that the prefix is correct. This should have already been done by the iterator
func parseValueFromKey(key []byte) (string, error) {
var (
compositeKey, value string
height, index int64
)
remaining, err := orderedcode.Parse(string(key), &compositeKey, &value, &height, &index)
if err != nil {
return "", err
}
if len(remaining) != 0 {
return "", fmt.Errorf("unexpected remainder in key: %s", remaining)
}
return value, nil
}

func startKeyForCondition(c query.Condition, height int64) []byte {
func keyFromEvent(compositeKey string, value []byte, result *abci.TxResult) []byte {
return secondaryKey(compositeKey, string(value), result.Height, result.Index)
}

func keyFromHeight(result *abci.TxResult) []byte {
return secondaryKey(types.TxHeightKey, fmt.Sprintf("%d", result.Height), result.Height, result.Index)
}

// Prefixes: these represent an initial part of the key and are used by iterators to iterate over a small
// section of the kv store during searches.

func prefixFromCompositeKey(compositeKey string) []byte {
key, err := orderedcode.Append(nil, compositeKey)
if err != nil {
panic(err)
}
return key
}

func prefixFromCompositeKeyAndValue(compositeKey, value string) []byte {
key, err := orderedcode.Append(nil, compositeKey, value)
if err != nil {
panic(err)
}
return key
}

// a small utility function for getting a key's prefix based on a condition and a height
func prefixForCondition(c query.Condition, height int64) []byte {
key := prefixFromCompositeKeyAndValue(c.CompositeKey, fmt.Sprintf("%v", c.Operand))
if height > 0 {
return startKey(c.CompositeKey, c.Operand, height)
var err error
key, err = orderedcode.Append(key, height)
if err != nil {
panic(err)
}
}
return startKey(c.CompositeKey, c.Operand)
}

func startKey(fields ...interface{}) []byte {
var b bytes.Buffer
for _, f := range fields {
b.Write([]byte(fmt.Sprintf("%v", f) + tagKeySeparator))
}
return b.Bytes()
return key
}
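Editor's note: the Keys comment above describes the four-part layout of a secondary (event) key. A small standalone sketch of that layout using `orderedcode` directly; the composite key, value, height, and index below are made-up sample data:

```go
package main

import (
	"fmt"

	"github.com/google/orderedcode"
)

func main() {
	// Encode a hypothetical event key: composite key, value, height, index.
	key, err := orderedcode.Append(nil, "account.owner", "Ivan", int64(5), int64(0))
	if err != nil {
		panic(err)
	}

	// Decode it again; an empty remainder confirms the four-part layout.
	var (
		compositeKey, value string
		height, index       int64
	)
	remaining, err := orderedcode.Parse(string(key), &compositeKey, &value, &height, &index)
	if err != nil {
		panic(err)
	}
	fmt.Println(compositeKey, value, height, index, len(remaining)) // account.owner Ivan 5 0 0
}
```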
@@ -120,6 +120,12 @@ func TestTxSearch(t *testing.T) {
{"account.number EXISTS", 1},
// search using EXISTS for non existing key
{"account.date EXISTS", 0},
// search using height
{"account.number = 1 AND tx.height = 1", 1},
// search using incorrect height
{"account.number = 1 AND tx.height = 3", 0},
// search using height only
{"tx.height = 1", 1},
}

ctx := context.Background()

@@ -189,7 +195,7 @@ func TestTxSearchDeprecatedIndexing(t *testing.T) {

err = b.Set(depKey, hash2)
require.NoError(t, err)
err = b.Set(keyForHeight(txResult2), hash2)
err = b.Set(keyFromHeight(txResult2), hash2)
require.NoError(t, err)
err = b.Set(hash2, rawBytes)
require.NoError(t, err)
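Editor's note: the table entries above are query strings in the pubsub query language. A hedged sketch of how one of them could be run against the indexer; the import paths assume the pre-refactor package layout used by this diff, and the indexer is passed in rather than constructed here:

```go
package example

import (
	"context"
	"fmt"

	"github.com/tendermint/tendermint/libs/pubsub/query"
	"github.com/tendermint/tendermint/state/txindex"
)

// searchByHeight runs one of the query strings from the test table above
// against any TxIndexer implementation (e.g. the kv.TxIndex in this diff).
func searchByHeight(indexer txindex.TxIndexer) error {
	q, err := query.New("account.number = 1 AND tx.height = 1")
	if err != nil {
		return err
	}

	// Search walks the secondary (event) keys written by keyFromEvent and
	// keyFromHeight and returns the matching transaction results.
	results, err := indexer.Search(context.Background(), q)
	if err != nil {
		return err
	}
	fmt.Println("matched", len(results), "transactions")
	return nil
}
```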
@@ -150,7 +150,7 @@ func (s *lightClientStateProvider) State(ctx context.Context, height uint64) (sm
if err != nil {
return sm.State{}, err
}
curLightBlock, err := s.lc.VerifyLightBlockAtHeight(ctx, int64(height+1), time.Now())
currentLightBlock, err := s.lc.VerifyLightBlockAtHeight(ctx, int64(height+1), time.Now())
if err != nil {
return sm.State{}, err
}

@@ -162,10 +162,10 @@ func (s *lightClientStateProvider) State(ctx context.Context, height uint64) (sm
state.LastBlockHeight = lastLightBlock.Height
state.LastBlockTime = lastLightBlock.Time
state.LastBlockID = lastLightBlock.Commit.BlockID
state.AppHash = curLightBlock.AppHash
state.LastResultsHash = curLightBlock.LastResultsHash
state.AppHash = currentLightBlock.AppHash
state.LastResultsHash = currentLightBlock.LastResultsHash
state.LastValidators = lastLightBlock.ValidatorSet
state.Validators = curLightBlock.ValidatorSet
state.Validators = currentLightBlock.ValidatorSet
state.NextValidators = nextLightBlock.ValidatorSet
state.LastHeightValidatorsChanged = nextLightBlock.Height

@@ -179,12 +179,13 @@ func (s *lightClientStateProvider) State(ctx context.Context, height uint64) (sm
return sm.State{}, fmt.Errorf("unable to create RPC client: %w", err)
}
rpcclient := lightrpc.NewClient(primaryRPC, s.lc)
result, err := rpcclient.ConsensusParams(ctx, &nextLightBlock.Height)
result, err := rpcclient.ConsensusParams(ctx, &currentLightBlock.Height)
if err != nil {
return sm.State{}, fmt.Errorf("unable to fetch consensus parameters for height %v: %w",
nextLightBlock.Height, err)
}
state.ConsensusParams = result.ConsensusParams
state.LastHeightConsensusParamsChanged = currentLightBlock.Height

return state, nil
}