From 5b698ed13bf825e8e681e6818d9ad897275c01a5 Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Tue, 12 Jan 2021 10:55:47 +0100 Subject: [PATCH 1/4] tx indexer: use different field separator for keys (#5865) --- state/txindex/kv/kv.go | 171 ++++++++++++++++++++++++------------ state/txindex/kv/kv_test.go | 8 +- 2 files changed, 122 insertions(+), 57 deletions(-) diff --git a/state/txindex/kv/kv.go b/state/txindex/kv/kv.go index b056e9dd4..bef0a2f87 100644 --- a/state/txindex/kv/kv.go +++ b/state/txindex/kv/kv.go @@ -1,7 +1,6 @@ package kv import ( - "bytes" "context" "encoding/hex" "fmt" @@ -10,6 +9,7 @@ import ( "time" "github.com/gogo/protobuf/proto" + "github.com/google/orderedcode" dbm "github.com/tendermint/tm-db" abci "github.com/tendermint/tendermint/abci/types" @@ -18,13 +18,12 @@ import ( "github.com/tendermint/tendermint/types" ) -const ( - tagKeySeparator = "/" -) - var _ txindex.TxIndexer = (*TxIndex)(nil) -// TxIndex is the simplest possible indexer, backed by key-value storage (levelDB). +// TxIndex is the simplest possible indexer +// It is backed by two kv stores: +// 1. txhash - result (primary key) +// 2. event - txhash (secondary key) type TxIndex struct { store dbm.DB } @@ -43,7 +42,7 @@ func (txi *TxIndex) Get(hash []byte) (*abci.TxResult, error) { return nil, txindex.ErrorEmptyHash } - rawBytes, err := txi.store.Get(hash) + rawBytes, err := txi.store.Get(primaryKey(hash)) if err != nil { panic(err) } @@ -78,7 +77,7 @@ func (txi *TxIndex) AddBatch(b *txindex.Batch) error { } // index by height (always) - err = storeBatch.Set(keyForHeight(result), hash) + err = storeBatch.Set(keyFromHeight(result), hash) if err != nil { return err } @@ -88,7 +87,7 @@ func (txi *TxIndex) AddBatch(b *txindex.Batch) error { return err } // index by hash (always) - err = storeBatch.Set(hash, rawBytes) + err = storeBatch.Set(primaryKey(hash), rawBytes) if err != nil { return err } @@ -114,7 +113,7 @@ func (txi *TxIndex) Index(result *abci.TxResult) error { } // index by height (always) - err = b.Set(keyForHeight(result), hash) + err = b.Set(keyFromHeight(result), hash) if err != nil { return err } @@ -124,7 +123,7 @@ func (txi *TxIndex) Index(result *abci.TxResult) error { return err } // index by hash (always) - err = b.Set(hash, rawBytes) + err = b.Set(primaryKey(hash), rawBytes) if err != nil { return err } @@ -146,8 +145,12 @@ func (txi *TxIndex) indexEvents(result *abci.TxResult, hash []byte, store dbm.Ba // index if `index: true` is set compositeTag := fmt.Sprintf("%s.%s", event.Type, string(attr.Key)) + // ensure event does not conflict with a reserved prefix key + if compositeTag == types.TxHashKey || compositeTag == types.TxHeightKey { + return fmt.Errorf("event type and attribute key \"%s\" is reserved. Please use a different key", compositeTag) + } if attr.GetIndex() { - err := store.Set(keyForEvent(compositeTag, attr.Value, result), hash) + err := store.Set(keyFromEvent(compositeTag, attr.Value, result), hash) if err != nil { return err } @@ -215,7 +218,7 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul for _, r := range ranges { if !hashesInitialized { - filteredHashes = txi.matchRange(ctx, r, startKey(r.key), filteredHashes, true) + filteredHashes = txi.matchRange(ctx, r, prefixFromCompositeKey(r.key), filteredHashes, true) hashesInitialized = true // Ignore any remaining conditions if the first condition resulted @@ -224,7 +227,7 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul break } } else { - filteredHashes = txi.matchRange(ctx, r, startKey(r.key), filteredHashes, false) + filteredHashes = txi.matchRange(ctx, r, prefixFromCompositeKey(r.key), filteredHashes, false) } } } @@ -239,7 +242,7 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul } if !hashesInitialized { - filteredHashes = txi.match(ctx, c, startKeyForCondition(c, height), filteredHashes, true) + filteredHashes = txi.match(ctx, c, prefixForCondition(c, height), filteredHashes, true) hashesInitialized = true // Ignore any remaining conditions if the first condition resulted @@ -248,7 +251,7 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul break } } else { - filteredHashes = txi.match(ctx, c, startKeyForCondition(c, height), filteredHashes, false) + filteredHashes = txi.match(ctx, c, prefixForCondition(c, height), filteredHashes, false) } } @@ -430,7 +433,7 @@ func (txi *TxIndex) match( case c.Op == query.OpExists: // XXX: can't use startKeyBz here because c.Operand is nil // (e.g. "account.owner//" won't match w/ a single row) - it, err := dbm.IteratePrefix(txi.store, startKey(c.CompositeKey)) + it, err := dbm.IteratePrefix(txi.store, prefixFromCompositeKey(c.CompositeKey)) if err != nil { panic(err) } @@ -454,18 +457,18 @@ func (txi *TxIndex) match( // XXX: startKey does not apply here. // For example, if startKey = "account.owner/an/" and search query = "account.owner CONTAINS an" // we can't iterate with prefix "account.owner/an/" because we might miss keys like "account.owner/Ulan/" - it, err := dbm.IteratePrefix(txi.store, startKey(c.CompositeKey)) + it, err := dbm.IteratePrefix(txi.store, prefixFromCompositeKey(c.CompositeKey)) if err != nil { panic(err) } defer it.Close() for ; it.Valid(); it.Next() { - if !isTagKey(it.Key()) { + value, err := parseValueFromKey(it.Key()) + if err != nil { continue } - - if strings.Contains(extractValueFromKey(it.Key()), c.Operand.(string)) { + if strings.Contains(value, c.Operand.(string)) { tmpHashes[string(it.Value())] = it.Value() } @@ -542,12 +545,12 @@ func (txi *TxIndex) matchRange( LOOP: for ; it.Valid(); it.Next() { - if !isTagKey(it.Key()) { + value, err := parseValueFromKey(it.Key()) + if err != nil { continue } - if _, ok := r.AnyBound().(int64); ok { - v, err := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64) + v, err := strconv.ParseInt(value, 10, 64) if err != nil { continue LOOP } @@ -613,46 +616,102 @@ LOOP: return filteredHashes } -// Keys +// ########################## Keys ############################# +// +// The indexer has two types of kv stores: +// 1. txhash - result (primary key) +// 2. event - txhash (secondary key) +// +// The event key can be decomposed into 4 parts. +// 1. A composite key which can be any string. +// Usually something like "tx.height" or "account.owner" +// 2. A value. That corresponds to the key. In the above +// example the value could be "5" or "Ivan" +// 3. The height of the Tx that aligns with the key and value. +// 4. The index of the Tx that aligns with the key and value -func isTagKey(key []byte) bool { - return strings.Count(string(key), tagKeySeparator) == 3 +// the hash/primary key +func primaryKey(hash []byte) []byte { + key, err := orderedcode.Append( + nil, + types.TxHashKey, + string(hash), + ) + if err != nil { + panic(err) + } + return key } -func extractValueFromKey(key []byte) string { - parts := strings.SplitN(string(key), tagKeySeparator, 3) - return parts[1] -} - -func keyForEvent(key string, value []byte, result *abci.TxResult) []byte { - return []byte(fmt.Sprintf("%s/%s/%d/%d", - key, +// The event/secondary key +func secondaryKey(compositeKey, value string, height int64, index uint32) []byte { + key, err := orderedcode.Append( + nil, + compositeKey, value, - result.Height, - result.Index, - )) + height, + int64(index), + ) + if err != nil { + panic(err) + } + return key } -func keyForHeight(result *abci.TxResult) []byte { - return []byte(fmt.Sprintf("%s/%d/%d/%d", - types.TxHeightKey, - result.Height, - result.Height, - result.Index, - )) +// parseValueFromKey parses an event key and extracts out the value, returning an error if one arises. +// This will also involve ensuring that the key has the correct format. +// CONTRACT: function doesn't check that the prefix is correct. This should have already been done by the iterator +func parseValueFromKey(key []byte) (string, error) { + var ( + compositeKey, value string + height, index int64 + ) + remaining, err := orderedcode.Parse(string(key), &compositeKey, &value, &height, &index) + if err != nil { + return "", err + } + if len(remaining) != 0 { + return "", fmt.Errorf("unexpected remainder in key: %s", remaining) + } + return value, nil } -func startKeyForCondition(c query.Condition, height int64) []byte { +func keyFromEvent(compositeKey string, value []byte, result *abci.TxResult) []byte { + return secondaryKey(compositeKey, string(value), result.Height, result.Index) +} + +func keyFromHeight(result *abci.TxResult) []byte { + return secondaryKey(types.TxHeightKey, fmt.Sprintf("%d", result.Height), result.Height, result.Index) +} + +// Prefixes: these represent an initial part of the key and are used by iterators to iterate over a small +// section of the kv store during searches. + +func prefixFromCompositeKey(compositeKey string) []byte { + key, err := orderedcode.Append(nil, compositeKey) + if err != nil { + panic(err) + } + return key +} + +func prefixFromCompositeKeyAndValue(compositeKey, value string) []byte { + key, err := orderedcode.Append(nil, compositeKey, value) + if err != nil { + panic(err) + } + return key +} + +// a small utility function for getting a keys prefix based on a condition and a height +func prefixForCondition(c query.Condition, height int64) []byte { + key := prefixFromCompositeKeyAndValue(c.CompositeKey, fmt.Sprintf("%v", c.Operand)) if height > 0 { - return startKey(c.CompositeKey, c.Operand, height) + var err error + key, err = orderedcode.Append(key, height) + if err != nil { + panic(err) + } } - return startKey(c.CompositeKey, c.Operand) -} - -func startKey(fields ...interface{}) []byte { - var b bytes.Buffer - for _, f := range fields { - b.Write([]byte(fmt.Sprintf("%v", f) + tagKeySeparator)) - } - return b.Bytes() + return key } diff --git a/state/txindex/kv/kv_test.go b/state/txindex/kv/kv_test.go index 9b15c1971..d3980c646 100644 --- a/state/txindex/kv/kv_test.go +++ b/state/txindex/kv/kv_test.go @@ -120,6 +120,12 @@ func TestTxSearch(t *testing.T) { {"account.number EXISTS", 1}, // search using EXISTS for non existing key {"account.date EXISTS", 0}, + // search using height + {"account.number = 1 AND tx.height = 1", 1}, + // search using incorrect height + {"account.number = 1 AND tx.height = 3", 0}, + // search using height only + {"tx.height = 1", 1}, } ctx := context.Background() @@ -189,7 +195,7 @@ func TestTxSearchDeprecatedIndexing(t *testing.T) { err = b.Set(depKey, hash2) require.NoError(t, err) - err = b.Set(keyForHeight(txResult2), hash2) + err = b.Set(keyFromHeight(txResult2), hash2) require.NoError(t, err) err = b.Set(hash2, rawBytes) require.NoError(t, err) From f05788e632d482d5a6009ec2252ad2d65fb00353 Mon Sep 17 00:00:00 2001 From: Marko Date: Tue, 12 Jan 2021 11:06:33 +0100 Subject: [PATCH 2/4] privval: Query validator key (#5876) ## Description - Query validator key when a remote signer is used. This is supported gRPC remote signing and filePV only. Closes: #3009 --- CHANGELOG_PENDING.md | 3 +- cmd/tendermint/commands/show_validator.go | 39 ++++++++++++++++++----- node/node.go | 31 ++---------------- privval/grpc/util.go | 38 ++++++++++++++++++++++ 4 files changed, 74 insertions(+), 37 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index d91ad4b76..7c76703f1 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -49,7 +49,8 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi - [crypto/ed25519] \#5632 Adopt zip215 `ed25519` verification. (@marbar3778) - [privval] \#5603 Add `--key` to `init`, `gen_validator`, `testnet` & `unsafe_reset_priv_validator` for use in generating `secp256k1` keys. -- [privval] \#5725 add gRPC support to private validator. +- [privval] \#5725 Add gRPC support to private validator. +- [privval] \#5876 `tendermint show-validator` will query the remote signer if gRPC is being used (@marbar3778) - [abci/client] \#5673 `Async` requests return an error if queue is full (@melekes) - [mempool] \#5673 Cancel `CheckTx` requests if RPC client disconnects or times out (@melekes) - [abci] \#5706 Added `AbciVersion` to `RequestInfo` allowing applications to check ABCI version when connecting to Tendermint. (@marbar3778) diff --git a/cmd/tendermint/commands/show_validator.go b/cmd/tendermint/commands/show_validator.go index f29976bb3..54b027961 100644 --- a/cmd/tendermint/commands/show_validator.go +++ b/cmd/tendermint/commands/show_validator.go @@ -5,9 +5,12 @@ import ( "github.com/spf13/cobra" + "github.com/tendermint/tendermint/crypto" tmjson "github.com/tendermint/tendermint/libs/json" + tmnet "github.com/tendermint/tendermint/libs/net" tmos "github.com/tendermint/tendermint/libs/os" "github.com/tendermint/tendermint/privval" + tmgrpc "github.com/tendermint/tendermint/privval/grpc" ) // ShowValidatorCmd adds capabilities for showing the validator info. @@ -20,16 +23,36 @@ var ShowValidatorCmd = &cobra.Command{ } func showValidator(cmd *cobra.Command, args []string) error { - keyFilePath := config.PrivValidatorKeyFile() - if !tmos.FileExists(keyFilePath) { - return fmt.Errorf("private validator file %s does not exist", keyFilePath) - } + var ( + pubKey crypto.PubKey + err error + ) - pv := privval.LoadFilePV(keyFilePath, config.PrivValidatorStateFile()) + //TODO: remove once gRPC is the only supported protocol + protocol, _ := tmnet.ProtocolAndAddress(config.PrivValidatorListenAddr) + switch protocol { + case "grpc": + pvsc, err := tmgrpc.DialRemoteSigner(config, config.ChainID(), logger) + if err != nil { + return fmt.Errorf("can't connect to remote validator %w", err) + } + pubKey, err = pvsc.GetPubKey() + if err != nil { + return fmt.Errorf("can't get pubkey: %w", err) + } + default: - pubKey, err := pv.GetPubKey() - if err != nil { - return fmt.Errorf("can't get pubkey: %w", err) + keyFilePath := config.PrivValidatorKeyFile() + if !tmos.FileExists(keyFilePath) { + return fmt.Errorf("private validator file %s does not exist", keyFilePath) + } + + pv := privval.LoadFilePV(keyFilePath, config.PrivValidatorStateFile()) + + pubKey, err = pv.GetPubKey() + if err != nil { + return fmt.Errorf("can't get pubkey: %w", err) + } } bz, err := tmjson.Marshal(pubKey) diff --git a/node/node.go b/node/node.go index d9e504758..06161cde7 100644 --- a/node/node.go +++ b/node/node.go @@ -10,11 +10,9 @@ import ( _ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port "time" - grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/rs/cors" - "google.golang.org/grpc" dbm "github.com/tendermint/tm-db" @@ -678,11 +676,11 @@ func NewNode(config *cfg.Config, // If an address is provided, listen on the socket for a connection from an // external signing process. if config.PrivValidatorListenAddr != "" { - protocol, address := tmnet.ProtocolAndAddress(config.PrivValidatorListenAddr) + protocol, _ := tmnet.ProtocolAndAddress(config.PrivValidatorListenAddr) // FIXME: we should start services inside OnStart switch protocol { case "grpc": - privValidator, err = createAndStartPrivValidatorGRPCClient(config, address, genDoc.ChainID, logger) + privValidator, err = createAndStartPrivValidatorGRPCClient(config, genDoc.ChainID, logger) if err != nil { return nil, fmt.Errorf("error with private validator grpc client: %w", err) } @@ -1435,33 +1433,10 @@ func createAndStartPrivValidatorSocketClient( func createAndStartPrivValidatorGRPCClient( config *cfg.Config, - address, chainID string, logger log.Logger, ) (types.PrivValidator, error) { - var transportSecurity grpc.DialOption - if config.BaseConfig.ArePrivValidatorClientSecurityOptionsPresent() { - transportSecurity = tmgrpc.GenerateTLS(config.PrivValidatorClientCertificateFile(), - config.PrivValidatorClientKeyFile(), config.PrivValidatorRootCAFile(), logger) - } else { - transportSecurity = grpc.WithInsecure() - logger.Info("Using an insecure gRPC connection!") - } - dialOptions := tmgrpc.DefaultDialOptions() - if config.Instrumentation.Prometheus { - grpcMetrics := grpc_prometheus.DefaultClientMetrics - dialOptions = append(dialOptions, grpc.WithUnaryInterceptor(grpcMetrics.UnaryClientInterceptor())) - } - - dialOptions = append(dialOptions, transportSecurity) - - ctx := context.Background() - - conn, err := grpc.DialContext(ctx, address, dialOptions...) - if err != nil { - logger.Error("unable to connect to server", "target", address, "err", err) - } - pvsc, err := tmgrpc.NewSignerClient(conn, chainID, logger) + pvsc, err := tmgrpc.DialRemoteSigner(config, chainID, logger) if err != nil { return nil, fmt.Errorf("failed to start private validator: %w", err) } diff --git a/privval/grpc/util.go b/privval/grpc/util.go index 66b5397b6..a70ab54bf 100644 --- a/privval/grpc/util.go +++ b/privval/grpc/util.go @@ -1,6 +1,7 @@ package grpc import ( + "context" "crypto/tls" "crypto/x509" "io/ioutil" @@ -8,7 +9,11 @@ import ( "time" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + + cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/libs/log" + tmnet "github.com/tendermint/tendermint/libs/net" grpc "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" @@ -80,3 +85,36 @@ func GenerateTLS(certPath, keyPath, ca string, log log.Logger) grpc.DialOption { return grpc.WithTransportCredentials(transportCreds) } + +// DialRemoteSigner is a generalized function to dial the gRPC server. +func DialRemoteSigner( + config *cfg.Config, + chainID string, + logger log.Logger, +) (*SignerClient, error) { + var transportSecurity grpc.DialOption + if config.BaseConfig.ArePrivValidatorClientSecurityOptionsPresent() { + transportSecurity = GenerateTLS(config.PrivValidatorClientCertificateFile(), + config.PrivValidatorClientKeyFile(), config.PrivValidatorRootCAFile(), logger) + } else { + transportSecurity = grpc.WithInsecure() + logger.Info("Using an insecure gRPC connection!") + } + + dialOptions := DefaultDialOptions() + if config.Instrumentation.Prometheus { + grpcMetrics := grpc_prometheus.DefaultClientMetrics + dialOptions = append(dialOptions, grpc.WithUnaryInterceptor(grpcMetrics.UnaryClientInterceptor())) + } + + dialOptions = append(dialOptions, transportSecurity) + + ctx := context.Background() + _, address := tmnet.ProtocolAndAddress(config.PrivValidatorListenAddr) + conn, err := grpc.DialContext(ctx, address, dialOptions...) + if err != nil { + logger.Error("unable to connect to server", "target", address, "err", err) + } + + return NewSignerClient(conn, chainID, logger) +} From 956b59af87b4de06e7e72d61c2fad2f6a6d20241 Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Tue, 12 Jan 2021 12:50:49 +0100 Subject: [PATCH 3/4] evidence: buffer evidence from consensus (#5890) --- CHANGELOG_PENDING.md | 2 ++ evidence/pool.go | 57 +++++++++++++++++++++++++++++-------------- evidence/pool_test.go | 24 +++++++++++++++--- 3 files changed, 62 insertions(+), 21 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 7c76703f1..1da463e37 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -66,3 +66,5 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi - [privval] \#5638 Increase read/write timeout to 5s and calculate ping interval based on it (@JoeKash) - [blockchain/v1] [\#5701](https://github.com/tendermint/tendermint/pull/5701) Handle peers without blocks (@melekes) - [blockchain/v1] \#5711 Fix deadlock (@melekes) +- [evidence] \#5890 Add a buffer to evidence from consensus to avoid broadcasting and proposing evidence before the +height of such an evidence has finished (@cmwaters) \ No newline at end of file diff --git a/evidence/pool.go b/evidence/pool.go index cf425d988..6cb47069f 100644 --- a/evidence/pool.go +++ b/evidence/pool.go @@ -43,6 +43,10 @@ type Pool struct { mtx sync.Mutex // latest state state sm.State + // evidence from consensus if buffered to this slice, awaiting until the next height + // before being flushed to the pool. This prevents broadcasting and proposing of + // evidence before the height with which the evidence happened is finished. + consensusBuffer []types.Evidence pruningHeight int64 pruningTime time.Time @@ -57,12 +61,13 @@ func NewPool(logger log.Logger, evidenceDB dbm.DB, stateDB sm.Store, blockStore } pool := &Pool{ - stateDB: stateDB, - blockStore: blockStore, - state: state, - logger: logger, - evidenceStore: evidenceDB, - evidenceList: clist.New(), + stateDB: stateDB, + blockStore: blockStore, + state: state, + logger: logger, + evidenceStore: evidenceDB, + evidenceList: clist.New(), + consensusBuffer: make([]types.Evidence, 0), } // If pending evidence already in db, in event of prior failure, then check @@ -115,7 +120,14 @@ func (evpool *Pool) Update(state sm.State, ev types.EvidenceList) { "last_block_time", state.LastBlockTime, ) - evpool.updateState(state) + evpool.mtx.Lock() + // flush awaiting evidence from consensus into pool + evpool.flushConsensusBuffer() + // update state + evpool.state = state + evpool.mtx.Unlock() + + // move committed evidence out from the pending pool and into the committed pool evpool.markEvidenceAsCommitted(ev) // Prune pending evidence when it has expired. This also updates when the next @@ -170,14 +182,14 @@ func (evpool *Pool) AddEvidenceFromConsensus(ev types.Evidence) error { return nil } - if err := evpool.addPendingEvidence(ev); err != nil { - return fmt.Errorf("failed to add evidence to pending list: %w", err) - } + // add evidence to a buffer which will pass the evidence to the pool at the following height. + // This avoids the issue of some nodes verifying and proposing evidence at a height where the + // block hasn't been committed on cause others to potentially fail. + evpool.mtx.Lock() + defer evpool.mtx.Unlock() + evpool.consensusBuffer = append(evpool.consensusBuffer, ev) + evpool.logger.Info("received new evidence of byzantine behavior from consensus", "evidence", ev) - // add evidence to be gossiped with peers - evpool.evidenceList.PushBack(ev) - - evpool.logger.Info("verified new evidence of byzantine behavior", "evidence", ev) return nil } @@ -520,10 +532,19 @@ func (evpool *Pool) removeEvidenceFromList( } } -func (evpool *Pool) updateState(state sm.State) { - evpool.mtx.Lock() - defer evpool.mtx.Unlock() - evpool.state = state +// flushConsensusBuffer moves the evidence produced from consensus into the evidence pool +// and list so that it can be broadcasted and proposed +func (evpool *Pool) flushConsensusBuffer() { + for _, ev := range evpool.consensusBuffer { + if err := evpool.addPendingEvidence(ev); err != nil { + evpool.logger.Error("failed to flush evidence from consensus buffer to pending list: %w", err) + continue + } + + evpool.evidenceList.PushBack(ev) + } + // reset consensus buffer + evpool.consensusBuffer = make([]types.Evidence, 0) } func bytesToEv(evBytes []byte) (types.Evidence, error) { diff --git a/evidence/pool_test.go b/evidence/pool_test.go index f9b540ac0..a246219d5 100644 --- a/evidence/pool_test.go +++ b/evidence/pool_test.go @@ -143,13 +143,31 @@ func TestAddEvidenceFromConsensus(t *testing.T) { ev := types.NewMockDuplicateVoteEvidenceWithValidator(height, defaultEvidenceTime, val, evidenceChainID) require.NoError(t, pool.AddEvidenceFromConsensus(ev)) + + // evidence from consensus should not be added immediately but reside in the consensus buffer + evList, evSize := pool.PendingEvidence(defaultEvidenceMaxBytes) + require.Empty(t, evList) + require.Zero(t, evSize) + next := pool.EvidenceFront() - require.Equal(t, ev, next.Value.(types.Evidence)) + require.Nil(t, next) + + // move to next height and update state and evidence pool + state := pool.State() + state.LastBlockHeight++ + pool.Update(state, []types.Evidence{}) + + // should be able to retrieve evidence from pool + evList, _ = pool.PendingEvidence(defaultEvidenceMaxBytes) + require.Equal(t, []types.Evidence{ev}, evList) // shouldn't be able to submit the same evidence twice require.NoError(t, pool.AddEvidenceFromConsensus(ev)) - evs, _ := pool.PendingEvidence(defaultEvidenceMaxBytes) - require.Equal(t, 1, len(evs)) + state = pool.State() + state.LastBlockHeight++ + pool.Update(state, []types.Evidence{}) + evList2, _ := pool.PendingEvidence(defaultEvidenceMaxBytes) + require.Equal(t, evList, evList2) } func TestEvidencePoolUpdate(t *testing.T) { From bada08c50cfc529fe6d8b6e50bfc4ff2c360e6ad Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Tue, 12 Jan 2021 14:41:16 +0100 Subject: [PATCH 4/4] state sync: last consensus params height is not set (#5889) --- CHANGELOG_PENDING.md | 3 ++- state/store.go | 3 ++- statesync/stateprovider.go | 11 ++++++----- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 1da463e37..3306fe280 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -67,4 +67,5 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi - [blockchain/v1] [\#5701](https://github.com/tendermint/tendermint/pull/5701) Handle peers without blocks (@melekes) - [blockchain/v1] \#5711 Fix deadlock (@melekes) - [evidence] \#5890 Add a buffer to evidence from consensus to avoid broadcasting and proposing evidence before the -height of such an evidence has finished (@cmwaters) \ No newline at end of file +height of such an evidence has finished (@cmwaters) +- [statesync] \#5889 Set `LastHeightConsensusParamsChanged` when bootstrapping Tendermint state (@cmwaters) \ No newline at end of file diff --git a/state/store.go b/state/store.go index 4718d8465..6c1dd5c0d 100644 --- a/state/store.go +++ b/state/store.go @@ -222,7 +222,8 @@ func (store dbStore) Bootstrap(state State) error { return err } - if err := store.saveConsensusParamsInfo(height, height, state.ConsensusParams); err != nil { + if err := store.saveConsensusParamsInfo(height, + state.LastHeightConsensusParamsChanged, state.ConsensusParams); err != nil { return err } diff --git a/statesync/stateprovider.go b/statesync/stateprovider.go index 17aeed77c..426e3361f 100644 --- a/statesync/stateprovider.go +++ b/statesync/stateprovider.go @@ -150,7 +150,7 @@ func (s *lightClientStateProvider) State(ctx context.Context, height uint64) (sm if err != nil { return sm.State{}, err } - curLightBlock, err := s.lc.VerifyLightBlockAtHeight(ctx, int64(height+1), time.Now()) + currentLightBlock, err := s.lc.VerifyLightBlockAtHeight(ctx, int64(height+1), time.Now()) if err != nil { return sm.State{}, err } @@ -162,10 +162,10 @@ func (s *lightClientStateProvider) State(ctx context.Context, height uint64) (sm state.LastBlockHeight = lastLightBlock.Height state.LastBlockTime = lastLightBlock.Time state.LastBlockID = lastLightBlock.Commit.BlockID - state.AppHash = curLightBlock.AppHash - state.LastResultsHash = curLightBlock.LastResultsHash + state.AppHash = currentLightBlock.AppHash + state.LastResultsHash = currentLightBlock.LastResultsHash state.LastValidators = lastLightBlock.ValidatorSet - state.Validators = curLightBlock.ValidatorSet + state.Validators = currentLightBlock.ValidatorSet state.NextValidators = nextLightBlock.ValidatorSet state.LastHeightValidatorsChanged = nextLightBlock.Height @@ -179,12 +179,13 @@ func (s *lightClientStateProvider) State(ctx context.Context, height uint64) (sm return sm.State{}, fmt.Errorf("unable to create RPC client: %w", err) } rpcclient := lightrpc.NewClient(primaryRPC, s.lc) - result, err := rpcclient.ConsensusParams(ctx, &nextLightBlock.Height) + result, err := rpcclient.ConsensusParams(ctx, ¤tLightBlock.Height) if err != nil { return sm.State{}, fmt.Errorf("unable to fetch consensus parameters for height %v: %w", nextLightBlock.Height, err) } state.ConsensusParams = result.ConsensusParams + state.LastHeightConsensusParamsChanged = currentLightBlock.Height return state, nil }