Merge branch 'master' into conr2d/doc-go-built-in

This commit is contained in:
Jeeyong Um
2021-08-25 11:05:40 +09:00
committed by GitHub
53 changed files with 2122 additions and 356 deletions

View File

@@ -50,7 +50,7 @@ jobs:
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Publish to Docker Hub
uses: docker/build-push-action@v2.6.1
uses: docker/build-push-action@v2.7.0
with:
context: .
file: ./DOCKER/Dockerfile

View File

@@ -43,7 +43,7 @@ jobs:
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Publish to Docker Hub
uses: docker/build-push-action@v2.6.1
uses: docker/build-push-action@v2.7.0
with:
context: ./tools/proto
file: ./tools/proto/Dockerfile

View File

@@ -2,7 +2,7 @@ name: "Release"
on:
push:
branches:
branches:
- "RC[0-9]/**"
tags:
- "v[0-9]+.[0-9]+.[0-9]+" # Push events to matching v*, i.e. v1.0, v20.15.10
@@ -20,9 +20,6 @@ jobs:
with:
go-version: '1.16'
- run: echo https://github.com/tendermint/tendermint/blob/${GITHUB_REF#refs/tags/}/CHANGELOG.md#${GITHUB_REF#refs/tags/} > ../release_notes.md
if: startsWith(github.ref, 'refs/tags/')
- name: Build
uses: goreleaser/goreleaser-action@v2
if: ${{ github.event_name == 'pull_request' }}
@@ -35,6 +32,6 @@ jobs:
if: startsWith(github.ref, 'refs/tags/')
with:
version: latest
args: release --rm-dist --release-notes=../release_notes.md
args: release --rm-dist
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

View File

@@ -24,8 +24,10 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi
- [fastsync/rpc] \#6620 Add TotalSyncedTime & RemainingTime to SyncInfo in /status RPC (@JayT106)
- [rpc/grpc] \#6725 Mark gRPC in the RPC layer as deprecated.
- [blockchain/v2] \#6730 Fast Sync v2 is deprecated, please use v0
- [rpc] Add genesis_chunked method to support paginated and parallel fetching of large genesis documents.
- [rpc/jsonrpc/server] \#6785 `Listen` function updated to take an `int` argument, `maxOpenConnections`, instead of an entire config object. (@williambanfield)
- [rpc] \#6820 Update RPC methods to reflect changes in the p2p layer, disabling support for `UnsafeDialPeers` and `UnsafeDialPeers` when used with the new p2p layer, and changing the response format of the peer list in `NetInfo` for all users.
- [cli] \#6854 Remove deprecated snake case commands. (@tychoish)
- Apps
- [ABCI] \#6408 Change the `key` and `value` fields from `[]byte` to `string` in the `EventAttribute` type. (@alexanderbez)
- [ABCI] \#5447 Remove `SetOption` method from `ABCI.Client` interface
@@ -105,6 +107,7 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi
- [config/indexer] \#6411 Introduce support for custom event indexing data sources, specifically PostgreSQL. (@JayT106)
- [fastsync/event] \#6619 Emit fastsync status event when switching consensus/fastsync (@JayT106)
- [statesync/event] \#6700 Emit statesync status start/end event (@JayT106)
- [inspect] \#6785 Add a new `inspect` command for introspecting the state and block store of a crashed tendermint node. (@williambanfield)
### IMPROVEMENTS
@@ -158,4 +161,4 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi
- [rpc] \#6507 Ensure RPC client can handle URLs without ports (@JayT106)
- [statesync] \#6463 Adds Reverse Sync feature to fetch historical light blocks after state sync in order to verify any evidence (@cmwaters)
- [fastsync] \#6590 Update the metrics during fast-sync (@JayT106)
- [gitignore] \#6668 Fix gitignore of abci-cli (@tanyabouman)
- [gitignore] \#6668 Fix gitignore of abci-cli (@tanyabouman)

View File

@@ -328,7 +328,6 @@ If there were no release candidates, begin by creating a backport branch, as des
- Bump TMVersionDefault version in `version.go`
- Bump P2P and block protocol versions in `version.go`, if necessary
- Bump ABCI protocol version in `version.go`, if necessary
- Add any release notes you would like to be added to the body of the release to `release_notes.md`.
4. Open a PR with these changes against the backport branch.
5. Once these changes are on the backport branch, push a tag with prepared release details.
This will trigger the actual release `v0.35.0`.
@@ -355,7 +354,6 @@ To create a minor release:
- Bump the ABCI version number, if necessary.
(Note that ABCI follows semver, and that ABCI versions are the only versions
which can change during minor releases, and only field additions are valid minor changes.)
- Add any release notes you would like to be added to the body of the release to `release_notes.md`.
4. Open a PR with these changes that will land them back on `v0.35.x`
5. Once this change has landed on the backport branch, make sure to pull it locally, then push a tag.
- `git tag -a v0.35.1 -m 'Release v0.35.1'`

View File

@@ -12,11 +12,9 @@ import (
// GenNodeKeyCmd allows the generation of a node key. It prints JSON-encoded
// NodeKey to the standard output.
var GenNodeKeyCmd = &cobra.Command{
Use: "gen-node-key",
Aliases: []string{"gen_node_key"},
Short: "Generate a new node key",
RunE: genNodeKey,
PreRun: deprecateSnakeCase,
Use: "gen-node-key",
Short: "Generate a new node key",
RunE: genNodeKey,
}
func genNodeKey(cmd *cobra.Command, args []string) error {

View File

@@ -13,11 +13,9 @@ import (
// GenValidatorCmd allows the generation of a keypair for a
// validator.
var GenValidatorCmd = &cobra.Command{
Use: "gen-validator",
Aliases: []string{"gen_validator"},
Short: "Generate new validator keypair",
RunE: genValidator,
PreRun: deprecateSnakeCase,
Use: "gen-validator",
Short: "Generate new validator keypair",
RunE: genValidator,
}
func init() {

View File

@@ -0,0 +1,87 @@
package commands
import (
"context"
"os"
"os/signal"
"syscall"
"github.com/spf13/cobra"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/inspect"
"github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/indexer/sink"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
)
// InspectCmd is the command for starting an inspect server.
var InspectCmd = &cobra.Command{
Use: "inspect",
Short: "Run an inspect server for investigating Tendermint state",
Long: `
inspect runs a subset of Tendermint's RPC endpoints that are useful for debugging
issues with Tendermint.
When the Tendermint consensus engine detects inconsistent state, it will crash the
tendermint process. Tendermint will not start up while in this inconsistent state.
The inspect command can be used to query the block and state store using Tendermint
RPC calls to debug issues of inconsistent state.
`,
RunE: runInspect,
}
func init() {
InspectCmd.Flags().
String("rpc.laddr",
config.RPC.ListenAddress, "RPC listenener address. Port required")
InspectCmd.Flags().
String("db-backend",
config.DBBackend, "database backend: goleveldb | cleveldb | boltdb | rocksdb | badgerdb")
InspectCmd.Flags().
String("db-dir", config.DBPath, "database directory")
}
func runInspect(cmd *cobra.Command, args []string) error {
ctx, cancel := context.WithCancel(cmd.Context())
defer cancel()
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
go func() {
<-c
cancel()
}()
blockStoreDB, err := cfg.DefaultDBProvider(&cfg.DBContext{ID: "blockstore", Config: config})
if err != nil {
return err
}
blockStore := store.NewBlockStore(blockStoreDB)
stateDB, err := cfg.DefaultDBProvider(&cfg.DBContext{ID: "state", Config: config})
if err != nil {
if err := blockStoreDB.Close(); err != nil {
logger.Error("error closing block store db", "error", err)
}
return err
}
genDoc, err := types.GenesisDocFromFile(config.GenesisFile())
if err != nil {
return err
}
sinks, err := sink.EventSinksFromConfig(config, cfg.DefaultDBProvider, genDoc.ChainID)
if err != nil {
return err
}
stateStore := state.NewStore(stateDB)
ins := inspect.New(config.RPC, blockStore, stateStore, sinks, logger)
logger.Info("starting inspect server")
if err := ins.Run(ctx); err != nil {
return err
}
return nil
}

View File

@@ -11,11 +11,9 @@ import (
// ProbeUpnpCmd adds capabilities to test the UPnP functionality.
var ProbeUpnpCmd = &cobra.Command{
Use: "probe-upnp",
Aliases: []string{"probe_upnp"},
Short: "Test UPnP functionality",
RunE: probeUpnp,
PreRun: deprecateSnakeCase,
Use: "probe-upnp",
Short: "Test UPnP functionality",
RunE: probeUpnp,
}
func probeUpnp(cmd *cobra.Command, args []string) error {

View File

@@ -31,7 +31,7 @@ var ReIndexEventCmd = &cobra.Command{
Long: `
reindex-event is an offline tooling to re-index block and tx events to the eventsinks,
you can run this command when the event store backend dropped/disconnected or you want to replace the backend.
The default start-height is 0, meaning the tooling will start reindex from the base block height(inclusive); and the
The default start-height is 0, meaning the tooling will start reindex from the base block height(inclusive); and the
default end-height is 0, meaning the tooling will reindex until the latest block height(inclusive). User can omits
either or both arguments.
`,
@@ -106,7 +106,7 @@ func loadEventSinks(cfg *tmcfg.Config) ([]indexer.EventSink, error) {
if conn == "" {
return nil, errors.New("the psql connection settings cannot be empty")
}
es, _, err := psql.NewEventSink(conn, chainID)
es, err := psql.NewEventSink(conn, chainID)
if err != nil {
return nil, err
}

View File

@@ -17,11 +17,9 @@ var ReplayCmd = &cobra.Command{
// ReplayConsoleCmd allows replaying of messages from the WAL in a
// console.
var ReplayConsoleCmd = &cobra.Command{
Use: "replay-console",
Aliases: []string{"replay_console"},
Short: "Replay messages from WAL in a console",
Use: "replay-console",
Short: "Replay messages from WAL in a console",
Run: func(cmd *cobra.Command, args []string) {
consensus.RunReplayFile(config.BaseConfig, config.Consensus, true)
},
PreRun: deprecateSnakeCase,
}

View File

@@ -14,11 +14,9 @@ import (
// ResetAllCmd removes the database of this Tendermint core
// instance.
var ResetAllCmd = &cobra.Command{
Use: "unsafe-reset-all",
Aliases: []string{"unsafe_reset_all"},
Short: "(unsafe) Remove all the data and WAL, reset this node's validator to genesis state",
RunE: resetAll,
PreRun: deprecateSnakeCase,
Use: "unsafe-reset-all",
Short: "(unsafe) Remove all the data and WAL, reset this node's validator to genesis state",
RunE: resetAll,
}
var keepAddrBook bool
@@ -31,11 +29,9 @@ func init() {
// ResetPrivValidatorCmd resets the private validator files.
var ResetPrivValidatorCmd = &cobra.Command{
Use: "unsafe-reset-priv-validator",
Aliases: []string{"unsafe_reset_priv_validator"},
Short: "(unsafe) Reset this node's validator to genesis state",
RunE: resetPrivValidator,
PreRun: deprecateSnakeCase,
Use: "unsafe-reset-priv-validator",
Short: "(unsafe) Reset this node's validator to genesis state",
RunE: resetPrivValidator,
}
// XXX: this is totally unsafe.

View File

@@ -2,7 +2,6 @@ package commands
import (
"fmt"
"strings"
"time"
"github.com/spf13/cobra"
@@ -65,10 +64,3 @@ var RootCmd = &cobra.Command{
return nil
},
}
// deprecateSnakeCase is a util function for 0.34.1. Should be removed in 0.35
func deprecateSnakeCase(cmd *cobra.Command, args []string) {
if strings.Contains(cmd.CalledAs(), "_") {
fmt.Println("Deprecated: snake_case commands will be replaced by hyphen-case commands in the next major release")
}
}

View File

@@ -8,11 +8,9 @@ import (
// ShowNodeIDCmd dumps node's ID to the standard output.
var ShowNodeIDCmd = &cobra.Command{
Use: "show-node-id",
Aliases: []string{"show_node_id"},
Short: "Show this node's ID",
RunE: showNodeID,
PreRun: deprecateSnakeCase,
Use: "show-node-id",
Short: "Show this node's ID",
RunE: showNodeID,
}
func showNodeID(cmd *cobra.Command, args []string) error {

View File

@@ -16,11 +16,9 @@ import (
// ShowValidatorCmd adds capabilities for showing the validator info.
var ShowValidatorCmd = &cobra.Command{
Use: "show-validator",
Aliases: []string{"show_validator"},
Short: "Show this node's validator info",
RunE: showValidator,
PreRun: deprecateSnakeCase,
Use: "show-validator",
Short: "Show this node's validator info",
RunE: showValidator,
}
func showValidator(cmd *cobra.Command, args []string) error {

View File

@@ -28,6 +28,7 @@ func main() {
cmd.ShowNodeIDCmd,
cmd.GenNodeKeyCmd,
cmd.VersionCmd,
cmd.InspectCmd,
cmd.MakeKeyMigrateCommand(),
debug.DebugCmd,
cli.NewCompletionCmd(rootCmd, true),

View File

@@ -98,3 +98,5 @@ Note the context/background should be written in the present tense.
- [ADR-045: ABCI-Evidence](./adr-045-abci-evidence.md)
- [ADR-057: RPC](./adr-057-RPC.md)
- [ADR-069: Node Initialization](./adr-069-flexible-node-initialization.md)
- [ADR-071: Proposer-Based Timestamps](adr-071-proposer-based-timestamps.md)
- [ADR-072: Restore Requests for Comments](./adr-072-request-for-comments.md)

View File

@@ -0,0 +1,445 @@
# ADR 71: Proposer-Based Timestamps
* [Changelog](#changelog)
* [Status](#status)
* [Context](#context)
* [Alternative Approaches](#alternative-approaches)
* [Remove timestamps altogether](#remove-timestamps-altogether)
* [Decision](#decision)
* [Detailed Design](#detailed-design)
* [Overview](#overview)
* [Proposal Timestamp and Block Timestamp](#proposal-timestamp-and-block-timestamp)
* [Saving the timestamp across heights](#saving-the-timestamp-across-heights)
* [Changes to `CommitSig`](#changes-to-commitsig)
* [Changes to `Commit`](#changes-to-commit)
* [Changes to `Vote` messages](#changes-to-vote-messages)
* [New consensus parameters](#new-consensus-parameters)
* [Changes to `Header`](#changes-to-header)
* [Changes to the block proposal step](#changes-to-the-block-proposal-step)
* [Proposer selects proposal timestamp](#proposer-selects-proposal-timestamp)
* [Proposer selects block timestamp](#proposer-selects-block-timestamp)
* [Proposer waits](#proposer-waits)
* [Changes to the propose step timeout](#changes-to-the-propose-step-timeout)
* [Changes to validation rules](#changes-to-validation-rules)
* [Proposal timestamp validation](#proposal-timestamp-validation)
* [Block timestamp validation](#block-timestamp-validation)
* [Changes to the prevote step](#changes-to-the-prevote-step)
* [Changes to the precommit step](#changes-to-the-precommit-step)
* [Changes to locking a block](#changes-to-locking-a-block)
* [Remove voteTime Completely](#remove-votetime-completely)
* [Future Improvements](#future-improvements)
* [Consequences](#consequences)
* [Positive](#positive)
* [Neutral](#neutral)
* [Negative](#negative)
* [References](#references)
## Changelog
- July 15 2021: Created by @williambanfield
- Aug 4 2021: Draft completed by @williambanfield
- Aug 5 2021: Draft updated to include data structure changes by @williambanfield
- Aug 20 2021: Language edits completed by @williambanfield
## Status
**Accepted**
## Context
Tendermint currently provides a monotonically increasing source of time known as [BFTTime](https://github.com/tendermint/spec/blob/master/spec/consensus/bft-time.md).
This mechanism for producing a source of time is reasonably simple.
Each correct validator adds a timestamp to each `Precommit` message it sends.
The timestamp it sends is either the validator's current known Unix time or one millisecond greater than the previous block time, depending on which value is greater.
When a block is produced, the proposer chooses the block timestamp as the weighted median of the times in all of the `Precommit` messages the proposer received.
The weighting is proportional to the amount of voting power, or stake, a validator has on the network.
This mechanism for producing timestamps is both deterministic and byzantine fault tolerant.
This current mechanism for producing timestamps has a few drawbacks.
Validators do not have to agree at all on how close the selected block timestamp is to their own currently known Unix time.
Additionally, any amount of voting power `>1/3` may directly control the block timestamp.
As a result, it is quite possible that the timestamp is not particularly meaningful.
These drawbacks present issues in the Tendermint protocol.
Timestamps are used by light clients to verify blocks.
Light clients rely on correspondence between their own currently known Unix time and the block timestamp to verify blocks they see;
However, their currently known Unix time may be greatly divergent from the block timestamp as a result of the limitations of `BFTTime`.
The proposer-based timestamps specification suggests an alternative approach for producing block timestamps that remedies these issues.
Proposer-based timestamps alter the current mechanism for producing block timestamps in two main ways:
1. The block proposer is amended to offer up its currently known Unix time as the timestamp for the next block.
1. Correct validators only approve the proposed block timestamp if it is close enough to their own currently known Unix time.
The result of these changes is a more meaningful timestamp that cannot be controlled by `<= 2/3` of the validator voting power.
This document outlines the necessary code changes in Tendermint to implement the corresponding [proposer-based timestamps specification](https://github.com/tendermint/spec/tree/master/spec/consensus/proposer-based-timestamp).
## Alternative Approaches
### Remove timestamps altogether
Computer clocks are bound to skew for a variety of reasons.
Using timestamps in our protocol means either accepting the timestamps as not reliable or impacting the protocols liveness guarantees.
This design requires impacting the protocols liveness in order to make the timestamps more reliable.
An alternate approach is to remove timestamps altogether from the block protocol.
`BFTTime` is deterministic but may be arbitrarily inaccurate.
However, having a reliable source of time is quite useful for applications and protocols built on top of a blockchain.
We therefore decided not to remove the timestamp.
Applications often wish for some transactions to occur on a certain day, on a regular period, or after some time following a different event.
All of these require some meaningful representation of agreed upon time.
The following protocols and application features require a reliable source of time:
* Tendermint Light Clients [rely on correspondence between their known time](https://github.com/tendermint/spec/blob/master/spec/light-client/verification/README.md#definitions-1) and the block time for block verification.
* Tendermint Evidence validity is determined [either in terms of heights or in terms of time](https://github.com/tendermint/spec/blob/8029cf7a0fcc89a5004e173ec065aa48ad5ba3c8/spec/consensus/evidence.md#verification).
* Unbonding of staked assets in the Cosmos Hub [occurs after a period of 21 days](https://github.com/cosmos/governance/blob/ce75de4019b0129f6efcbb0e752cd2cc9e6136d3/params-change/Staking.md#unbondingtime).
* IBC packets can use either a [timestamp or a height to timeout packet delivery](https://docs.cosmos.network/v0.43/ibc/overview.html#acknowledgements).
Finally, inflation distribution in the Cosmos Hub uses an approximation of time to calculate an annual percentage rate.
This approximation of time is calculated using [block heights with an estimated number of blocks produced in a year](https://github.com/cosmos/governance/blob/master/params-change/Mint.md#blocksperyear).
Proposer-based timestamps will allow this inflation calculation to use a more meaningful and accurate source of time.
## Decision
Implement proposer-based timestamps and remove `BFTTime`.
## Detailed Design
### Overview
Implementing proposer-based timestamps will require a few changes to Tendermints code.
These changes will be to the following components:
* The `internal/consensus/` package.
* The `state/` package.
* The `Vote`, `CommitSig`, `Commit` and `Header` types.
* The consensus parameters.
### Proposal Timestamp and Block Timestamp
This design discusses two timestamps: (1) The timestamp in the block and (2) the timestamp in the proposal message.
The existence and use of both of these timestamps can get a bit confusing, so some background is given here to clarify their uses.
The [proposal message currently has a timestamp](https://github.com/tendermint/tendermint/blob/e5312942e30331e7c42b75426da2c6c9c00ae476/types/proposal.go#L31).
This timestamp is the current Unix time known to the proposer when sending the `Proposal` message.
This timestamp is not currently used as part of consensus.
The changes in this ADR will begin using the proposal message timestamp as part of consensus.
We will refer to this as the **proposal timestamp** throughout this design.
The block has a timestamp field [in the header](https://github.com/tendermint/tendermint/blob/dc7c212c41a360bfe6eb38a6dd8c709bbc39aae7/types/block.go#L338).
This timestamp is set currently as part of Tendermints `BFTtime` algorithm.
It is set when a block is proposed and it is checked by the validators when they are deciding to prevote the block.
This field will continue to be used but the logic for creating and validating this timestamp will change.
We will refer to this as the **block timestamp** throughout this design.
At a high level, the proposal timestamp from height `H` is used as the block timestamp at height `H+1`.
The following image shows this relationship.
The rest of this document describes the code changes that will make this possible.
![](./img/pbts-message.png)
### Saving the timestamp across heights
Currently, `BFTtime` uses `LastCommit` to construct the block timestamp.
The `LastCommit` is created at height `H-1` and is saved in the state store to be included in the block at height `H`.
`BFTtime` takes the weighted median of the timestamps in `LastCommit.CommitSig` to build the timestamp for height `H`.
For proposer-based timestamps, the `LastCommit.CommitSig` timestamps will no longer be used to build the timestamps for height `H`.
Instead, the proposal timestamp from height `H-1` will become the block timestamp for height `H`.
To enable this, we will add a `Timestamp` field to the `Commit` struct.
This field will be populated at each height with the proposal timestamp decided on at the previous height.
This timestamp will also be saved with the rest of the commit in the state store [when the commit is finalized](https://github.com/tendermint/tendermint/blob/e8013281281985e3ada7819f42502b09623d24a0/internal/consensus/state.go#L1611) so that it can be recovered if Tendermint crashes.
Changes to the `CommitSig` and `Commit` struct are detailed below.
### Changes to `CommitSig`
The [CommitSig](https://github.com/tendermint/tendermint/blob/a419f4df76fe4aed668a6c74696deabb9fe73211/types/block.go#L604) struct currently contains a timestamp.
This timestamp is the current Unix time known to the validator when it issued a `Precommit` for the block.
This timestamp is no longer used and will be removed in this change.
`CommitSig` will be updated as follows:
```diff
type CommitSig struct {
BlockIDFlag BlockIDFlag `json:"block_id_flag"`
ValidatorAddress Address `json:"validator_address"`
-- Timestamp time.Time `json:"timestamp"`
Signature []byte `json:"signature"`
}
```
### Changes to `Commit`
The [Commit](https://github.com/tendermint/tendermint/blob/a419f4df76fe4aed668a6c74696deabb9fe73211/types/block.go#L746) struct does not currently contain a timestamp.
The timestamps in the `Commit.CommitSig` entries are currently used to build the block timestamp.
With these timestamps removed, the commit time will instead be stored in the `Commit` struct.
`Commit` will be updated as follows.
```diff
type Commit struct {
Height int64 `json:"height"`
Round int32 `json:"round"`
++ Timestamp time.Time `json:"timestamp"`
BlockID BlockID `json:"block_id"`
Signatures []CommitSig `json:"signatures"`
}
```
### Changes to `Vote` messages
`Precommit` and `Prevote` messages use a common [Vote struct](https://github.com/tendermint/tendermint/blob/a419f4df76fe4aed668a6c74696deabb9fe73211/types/vote.go#L50).
This struct currently contains a timestamp.
This timestamp is set using the [voteTime](https://github.com/tendermint/tendermint/blob/e8013281281985e3ada7819f42502b09623d24a0/internal/consensus/state.go#L2241) function and therefore vote times correspond to the current Unix time known to the validator.
For precommits, this timestamp is used to construct the [CommitSig that is included in the block in the LastCommit](https://github.com/tendermint/tendermint/blob/e8013281281985e3ada7819f42502b09623d24a0/types/block.go#L754) field.
For prevotes, this field is unused.
Proposer-based timestamps will use the [RoundState.Proposal](https://github.com/tendermint/tendermint/blob/c3ae6f5b58e07b29c62bfdc5715b6bf8ae5ee951/internal/consensus/types/round_state.go#L76) timestamp to construct the `signedBytes` `CommitSig`.
This timestamp is therefore no longer useful and will be dropped.
`Vote` will be updated as follows:
```diff
type Vote struct {
Type tmproto.SignedMsgType `json:"type"`
Height int64 `json:"height"`
Round int32 `json:"round"`
BlockID BlockID `json:"block_id"` // zero if vote is nil.
-- Timestamp time.Time `json:"timestamp"`
ValidatorAddress Address `json:"validator_address"`
ValidatorIndex int32 `json:"validator_index"`
Signature []byte `json:"signature"`
}
```
### New consensus parameters
The proposer-based timestamp specification includes multiple new parameters that must be the same among all validators.
These parameters are `PRECISION`, `MSGDELAY`, and `ACCURACY`.
The `PRECISION` and `MSGDELAY` parameters are used to determine if the proposed timestamp is acceptable.
A validator will only Prevote a proposal if the proposal timestamp is considered `timely`.
A proposal timestamp is considered `timely` if it is within `PRECISION` and `MSGDELAY` of the Unix time known to the validator.
More specifically, a proposal timestamp is `timely` if `validatorLocalTime - PRECISION < proposalTime < validatorLocalTime + PRECISION + MSGDELAY`.
Because the `PRECISION` and `MSGDELAY` parameters must be the same across all validators, they will be added to the [consensus parameters](https://github.com/tendermint/tendermint/blob/master/proto/tendermint/types/params.proto#L13) as [durations](https://developers.google.com/protocol-buffers/docs/reference/google.protobuf#google.protobuf.Duration).
The proposer-based timestamp specification also includes a [new ACCURACY parameter](https://github.com/tendermint/spec/blob/master/spec/consensus/proposer-based-timestamp/pbts-sysmodel_001_draft.md#pbts-clocksync-external0).
Intuitively, `ACCURACY` represents the difference between the real time and the currently known time of correct validators.
The currently known Unix time of any validator is always somewhat different from real time.
`ACCURACY` is the largest such difference between each validator's time and real time taken as an absolute value.
This is not something a computer can determine on its own and must be specified as an estimate by community running a Tendermint-based chain.
It is used in the new algorithm to [calculate a timeout for the propose step](https://github.com/tendermint/spec/blob/master/spec/consensus/proposer-based-timestamp/pbts-algorithm_001_draft.md#pbts-alg-startround0).
`ACCURACY` is assumed to be the same across all validators and therefore should be included as a consensus parameter.
The consensus will be updated to include this `Timestamp` field as follows:
```diff
type ConsensusParams struct {
Block BlockParams `json:"block"`
Evidence EvidenceParams `json:"evidence"`
Validator ValidatorParams `json:"validator"`
Version VersionParams `json:"version"`
++ Timestamp TimestampParams `json:"timestamp"`
}
```
```go
type TimestampParams struct {
Accuracy time.Duration `json:"accuracy"`
Precision time.Duration `json:"precision"`
MsgDelay time.Duration `json:"msg_delay"`
}
```
### Changes to `Header`
The [Header](https://github.com/tendermint/tendermint/blob/a419f4df76fe4aed668a6c74696deabb9fe73211/types/block.go#L338) struct currently contains a timestamp.
This timestamp is set as the `BFTtime` derived from the block's `LastCommit.CommitSig` timestamps.
This timestamp will no longer be derived from the `LastCommit.CommitSig` timestamps and will instead be included directly into the block's `LastCommit`.
This timestamp will therfore be identical in both the `Header` and the `LastCommit`.
To clarify that the timestamp in the header corresponds to the `LastCommit`'s time, we will rename this timestamp field to `last_timestamp`.
`Header` will be updated as follows:
```diff
type Header struct {
// basic block info
Version version.Consensus `json:"version"`
ChainID string `json:"chain_id"`
Height int64 `json:"height"`
-- Time time.Time `json:"time"`
++ LastTimestamp time.Time `json:"last_timestamp"`
// prev block info
LastBlockID BlockID `json:"last_block_id"`
// hashes of block data
LastCommitHash tmbytes.HexBytes `json:"last_commit_hash"`
DataHash tmbytes.HexBytes `json:"data_hash"`
// hashes from the app output from the prev block
ValidatorsHash tmbytes.HexBytes `json:"validators_hash"`
NextValidatorsHash tmbytes.HexBytes `json:"next_validators_hash"`
ConsensusHash tmbytes.HexBytes `json:"consensus_hash"`
AppHash tmbytes.HexBytes `json:"app_hash"`
// root hash of all results from the txs from the previous block
LastResultsHash tmbytes.HexBytes `json:"last_results_hash"`
// consensus info
EvidenceHash tmbytes.HexBytes `json:"evidence_hash"`
ProposerAddress Address `json:"proposer_address"`
}
```
### Changes to the block proposal step
#### Proposer selects proposal timestamp
The proposal logic already [sets the Unix time known to the validator](https://github.com/tendermint/tendermint/blob/2abfe20114ee3bb3adfee817589033529a804e4d/types/proposal.go#L44) into the `Proposal` message.
This satisfies the proposer-based timestamp specification and does not need to change.
#### Proposer selects block timestamp
The proposal timestamp that was decided in height `H-1` will be stored in the `State` struct's in the `RoundState.LastCommit` field.
The proposer will select this timestamp to use as the block timestamp at height `H`.
#### Proposer waits
Block timestamps must be monotonically increasing.
In `BFTTime`, if a validators clock was behind, the [validator added 1 millisecond to the previous blocks time and used that in its vote messages](https://github.com/tendermint/tendermint/blob/e8013281281985e3ada7819f42502b09623d24a0/internal/consensus/state.go#L2246).
A goal of adding proposer-based timestamps is to enforce some degree of clock synchronization, so having a mechanism that completely ignores the Unix time of the validator time no longer works.
Validator clocks will not be perfectly in sync.
Therefore, the proposers current known Unix time may be less than the `LastCommit.Timestamp`.
If the proposers current known Unix time is less than the `LastCommit.Timestamp`, the proposer will sleep until its known Unix time exceeds `LastCommit.Timestamp`.
This change will require amending the [defaultDecideProposal](https://github.com/tendermint/tendermint/blob/822893615564cb20b002dd5cf3b42b8d364cb7d9/internal/consensus/state.go#L1180) method.
This method should now block until the proposers time is greater than `LastCommit.Timestamp`.
#### Changes to the propose step timeout
Currently, a validator waiting for a proposal will proceed past the propose step if the configured propose timeout is reached and no proposal is seen.
Proposer-based timestamps requires changing this timeout logic.
The proposer will now wait until its current known Unix time exceeds the `LastCommit.Timestamp` to propose a block.
The validators must now take this and some other factors into account when deciding when to timeout the propose step.
Specifically, the propose step timeout must also take into account potential inaccuracy in the validators clock and in the clock of the proposer.
Additionally, there may be a delay communicating the proposal message from the proposer to the other validators.
Therefore, validators waiting for a proposal must wait until after the `LastCommit.Timestamp` before timing out.
To account for possible inaccuracy in its own clock, inaccuracy in the proposers clock, and message delay, validators waiting for a proposal will wait until `LastCommit.Timesatmp + 2*ACCURACY + MSGDELAY`.
The spec defines this as `waitingTime`.
The [propose steps timeout is set in enterPropose](https://github.com/tendermint/tendermint/blob/822893615564cb20b002dd5cf3b42b8d364cb7d9/internal/consensus/state.go#L1108) in `state.go`.
`enterPropose` will be changed to calculate waiting time using the new consensus parameters.
The timeout in `enterPropose` will then be set as the maximum of `waitingTime` and the [configured proposal step timeout](https://github.com/tendermint/tendermint/blob/dc7c212c41a360bfe6eb38a6dd8c709bbc39aae7/config/config.go#L1013).
### Changes to validation rules
The rules for validating that a proposal is valid will need slight modification to implement proposer-based timestamps.
Specifically, we will change the validation logic to ensure that the proposal timestamp is `timely` and we will modify the way the block timestamp is validated as well.
#### Proposal timestamp validation
Adding proposal timestamp validation is a reasonably straightforward change.
The current Unix time known to the proposer is already included in the [Proposal message](https://github.com/tendermint/tendermint/blob/dc7c212c41a360bfe6eb38a6dd8c709bbc39aae7/types/proposal.go#L31).
Once the proposal is received, the complete message is stored in the `RoundState.Proposal` field.
The precommit and prevote validation logic does not currently use this timestamp.
This validation logic will be updated to check that the proposal timestamp is within `PRECISION` of the current Unix time known to the validators.
If the timestamp is not within `PRECISION` of the current Unix time known to the validator, the proposal will not be considered it valid.
The validator will also check that the proposal time is greater than the block timestamp from the previous height.
If no valid proposal is received by the proposal timeout, the validator will prevote nil.
This is identical to the current logic.
#### Block timestamp validation
The [validBlock function](https://github.com/tendermint/tendermint/blob/c3ae6f5b58e07b29c62bfdc5715b6bf8ae5ee951/state/validation.go#L14) currently [validates the proposed block timestamp in three ways](https://github.com/tendermint/tendermint/blob/c3ae6f5b58e07b29c62bfdc5715b6bf8ae5ee951/state/validation.go#L118).
First, the validation logic checks that this timestamp is greater than the previous blocks timestamp.
Additionally, it validates that the block timestamp is correctly calculated as the weighted median of the timestamps in the [blocks LastCommit](https://github.com/tendermint/tendermint/blob/e8013281281985e3ada7819f42502b09623d24a0/types/block.go#L48).
Finally, the logic also authenticates the timestamps in the `LastCommit`.
The cryptographic signature in each `CommitSig` is created by signing a hash of fields in the block with the validators private key.
One of the items in this `signedBytes` hash is derived from the timestamp in the `CommitSig`.
To authenticate the `CommitSig` timestamp, the validator builds a hash of fields that includes the timestamp and checks this hash against the provided signature.
This takes place in the [VerifyCommit function](https://github.com/tendermint/tendermint/blob/e8013281281985e3ada7819f42502b09623d24a0/types/validation.go#L25).
The logic to validate that the block timestamp is greater than the previous blocks timestamp also works for proposer-based timestamps and will not change.
`BFTTime` validation is no longer applicable and will be removed.
Validators will no longer check that the block timestamp is a weighted median of `LastCommit` timestamps.
This will mean removing the call to [MedianTime in the validateBlock function](https://github.com/tendermint/tendermint/blob/4db71da68e82d5cb732b235eeb2fd69d62114b45/state/validation.go#L117).
The `MedianTime` function can be completely removed.
The `LastCommit` timestamps may also be removed.
The `signedBytes` validation logic in `VerifyCommit` will be slightly altered.
The `CommitSig`s in the blocks `LastCommit` will no longer each contain a timestamp.
The validation logic will instead include the `LastCommit.Timestamp` in the hash of fields for generating the `signedBytes`.
The cryptographic signatures included in the `CommitSig`s will then be checked against this `signedBytes` hash to authenticate the timestamp.
Specifically, the `VerifyCommit` function will be updated to use this new timestamp.
### Changes to the prevote step
Currently, a validator will prevote a proposal in one of three cases:
* Case 1: Validator has no locked block and receives a valid proposal.
* Case 2: Validator has a locked block and receives a valid proposal matching its locked block.
* Case 3: Validator has a locked block, sees a valid proposal not matching its locked block but sees +⅔ prevotes for the new proposals block.
The only change we will make to the prevote step is to what a validator considers a valid proposal as detailed above.
### Changes to the precommit step
The precommit step will not require much modification.
Its proposal validation rules will change in the same ways that validation will change in the prevote step.
### Changes to locking a block
When a validator receives a valid proposed block and +2/3 prevotes for that block, it stores the block as its locked block in the [RoundState.ValidBlock](https://github.com/tendermint/tendermint/blob/e8013281281985e3ada7819f42502b09623d24a0/internal/consensus/types/round_state.go#L85) field.
In each subsequent round it will prevote that block.
A validator will only change which block it has locked if it sees +2/3 prevotes for a different block.
This mechanism will remain largely unchanged.
The only difference is the addition of proposal timestamp validation.
A validator will prevote nil in a round if the proposal message it received is not `timely`.
Prevoting nil in this case will not cause a validator to unlock its locked block.
This difference is an incidental result of the changes to prevote validation.
It is included in this design for completeness and to clarify that no additional changes will be made to block locking.
### Remove voteTime Completely
[voteTime](https://github.com/tendermint/tendermint/blob/822893615564cb20b002dd5cf3b42b8d364cb7d9/internal/consensus/state.go#L2229) is a mechanism for calculating the next `BFTTime` given both the validator's current known Unix time and the previous block timestamp.
If the previous block timestamp is greater than the validator's current known Unix time, then voteTime returns a value one millisecond greater than the previous block timestamp.
This logic is used in multiple places and is no longer needed for proposer-based timestamps.
It should therefore be removed completely.
## Future Improvements
* Implement BLS signature aggregation.
By removing fields from the `Precommit` messages, we are able to aggregate signatures.
## Consequences
### Positive
* `<2/3` of validators can no longer influence block timestamps.
* Block timestamp will have stronger correspondence to real time.
* Improves the reliability of light client block verification.
* Enables BLS signature aggregation.
* Enables evidence handling to use time instead of height for evidence validity.
### Neutral
* Alters Tendermints liveness properties.
Liveness now requires that all correct validators have synchronized clocks within a bound.
Liveness will now also require that validators clocks move forward, which was not required under `BFTTime`.
### Negative
* May increase the length of the propose step if there is a large skew between the previous proposer and the current proposers local Unix time.
This skew will be bound by the `PRECISION` value, so it is unlikely to be too large.
* Current chains with block timestamps far in the future will either need to pause consensus until after the erroneous block timestamp or must maintain synchronized but very inaccurate clocks.
## References
* [PBTS Spec](https://github.com/tendermint/spec/tree/master/spec/consensus/proposer-based-timestamp)
* [BFTTime spec](https://github.com/tendermint/spec/blob/master/spec/consensus/bft-time.md)

View File

@@ -0,0 +1,105 @@
# ADR 72: Restore Requests for Comments
## Changelog
- 20-Aug-2021: Initial draft (@creachadair)
## Status
Proposed
## Context
In the past, we kept a collection of Request for Comments (RFC) documents in `docs/rfc`.
Prior to the creation of the ADR process, these documents were used to document
design and implementation decisions about Tendermint Core. The RFC directory
was removed in favor of ADRs, in commit 3761aa69 (PR
[\#6345](https://github.com/tendermint/tendermint/pull/6345)).
For issues where an explicit design decision or implementation change is
required, an ADR is generally preferable to an open-ended RFC: An ADR is
relatively narrowly-focused, identifies a specific design or implementation
question, and documents the consensus answer to that question.
Some discussions are more open-ended, however, or don't require a specific
decision to be made (yet). Such conversations are still valuable to document,
and several members of the Tendermint team have been doing so by writing gists
or Google docs to share them around. That works well enough in the moment, but
gists do not support any kind of collaborative editing, and both gists and docs
are hard to discover after the fact. Google docs have much better collaborative
editing, but are worse for discoverability, especially when contributors span
different Google accounts.
Discoverability is important, because these kinds of open-ended discussions are
useful to people who come later -- either as new team members or as outside
contributors seeking to use and understand the thoughts behind our designs and
the architectural decisions that arose from those discussion.
With these in mind, I propose that:
- We re-create a new, initially empty `docs/rfc` directory in the repository,
and use it to capture these kinds of open-ended discussions in supplement to
ADRs.
- Unlike in the previous RFC scheme, documents in this new directory will
_not_ be used directly for decision-making. This is the key difference
between an RFC and an ADR.
Instead, an RFC will exist to document background, articulate general
principles, and serve as a historical record of discussion and motivation.
In this system, an RFC may _only_ result in a decision indirectly, via ADR
documents created in response to the RFC.
**In short:** If a decision is required, write an ADR; otherwise if a
sufficiently broad discussion is needed, write an RFC.
Just so that there is a consistent format, I also propose that:
- RFC files are named `rfc-XXX-title.{md,rst,txt}` and are written in plain
text, Markdown, or ReStructured Text.
- Like an ADR, an RFC should include a high-level change log at the top of the
document, and sections for:
* Abstract: A brief, high-level synopsis of the topic.
* Background: Any background necessary to understand the topic.
* Discussion: Detailed discussion of the issue being considered.
- Unlike an ADR, an RFC does _not_ include sections for Decisions, Detailed
Design, or evaluation of proposed solutions. If an RFC leads to a proposal
for an actual architectural change, that must be recorded in an ADR in the
usual way, and may refer back to the RFC in its References section.
## Alternative Approaches
Leaving aside implementation details, the main alternative to this proposal is
to leave things as they are now, with ADRs as the only log of record and other
discussions being held informally in whatever medium is convenient at the time.
## Decision
(pending)
## Detailed Design
- Create a new `docs/rfc` directory in the `tendermint` repository. Note that
this proposal intentionally does _not_ pull back the previous contents of
that path from Git history, as those documents were appropriately merged into
the ADR process.
- Create a `README.md` for RFCs that explains the rules and their relationship
to ADRs.
- Create an `rfc-template.md` file for RFC files.
## Consequences
### Positive
- We will have a more discoverable place to record open-ended discussions that
do not immediately result in a design change.
### Negative
- Potentially some people could be confused about the RFC/ADR distinction.

Binary file not shown.

After

Width:  |  Height:  |  Size: 31 KiB

40
docs/rfc/README.md Normal file
View File

@@ -0,0 +1,40 @@
---
order: 1
parent:
order: false
---
# Requests for Comments
A Request for Comments (RFC) is a record of discussion on an open-ended topic
related to the design and implementation of Tendermint Core, for which no
immediate decision is required.
The purpose of an RFC is to serve as a historical record of a high-level
discussion that might otherwise only be recorded in an ad hoc way (for example,
via gists or Google docs) that are difficult to discover for someone after the
fact. An RFC _may_ give rise to more specific architectural _decisions_ for
Tendermint, but those decisions must be recorded separately in [Architecture
Decision Records (ADR)](./../architecture).
As a rule of thumb, if you can articulate a specific question that needs to be
answered, write an ADR. If you need to explore the topic and get input from
others to know what questions need to be answered, an RFC may be appropriate.
## RFC Content
An RFC should provide:
- A **changelog**, documenting when and how the RFC has changed.
- An **abstract**, briefly summarizing the topic so the reader can quickly tell
whether it is relevant to their interest.
- Any **background** a reader will need to understand and participate in the
substance of the discussion (links to other documents are fine here).
- The **discussion**, the primary content of the document.
The [rfc-template.md](./rfc-template.md) file includes placeholders for these
sections.
## Table of Contents
<!-- - [RFC-NNN: Title](./rfc-NNN.title.md) -->

35
docs/rfc/rfc-template.md Normal file
View File

@@ -0,0 +1,35 @@
# RFC {RFC-NUMBER}: {TITLE}
## Changelog
- {date}: {changelog}
## Abstract
> A brief high-level synopsis of the topic of discussion for this RFC, ideally
> just a few sentences. This should help the reader quickly decide whether the
> rest of the discussion is relevant to their interest.
## Background
> Any context or orientation needed for a reader to understand and participate
> in the substance of the Discussion. If necessary, this section may include
> links to other documentation or sources rather than restating existing
> material, but should provide enough detail that the reader can tell what they
> need to read to be up-to-date.
### References
> Links to external materials needed to follow the discussion may be added here.
>
> In addition, if the discussion in a request for comments leads to any design
> decisions, it may be helpful to add links to the ADR documents here after the
> discussion has settled.
## Discussion
> This section contains the core of the discussion.
>
> There is no fixed format for this section, but ideally changes to this
> section should be updated before merging to reflect any discussion that took
> place on the PR that made those changes.

View File

@@ -62,3 +62,30 @@ given destination directory. Each archive will contain:
Note: goroutine.out and heap.out will only be written if a profile address is
provided and is operational. This command is blocking and will log any error.
## Tendermint Inspect
Tendermint includes an `inspect` command for querying Tendermint's state store and block
store over Tendermint RPC.
When the Tendermint consensus engine detects inconsistent state, it will crash the
entire Tendermint process.
While in this inconsistent state, a node running Tendermint's consensus engine will not start up.
The `inspect` command runs only a subset of Tendermint's RPC endpoints for querying the block store
and state store.
`inspect` allows operators to query a read-only view of the stage.
`inspect` does not run the consensus engine at all and can therefore be used to debug
processes that have crashed due to inconsistent state.
To start the `inspect` process, run
```bash
tendermint inspect
```
### RPC endpoints
The list of available RPC endpoints can be found by making a request to the RPC port.
For an `inspect` process running on `127.0.0.1:26657`, navigate your browser to
`http://127.0.0.1:26657/` to retrieve the list of enabled RPC endpoints.
Additional information on the Tendermint RPC endpoints can be found in the [rpc documentation](https://docs.tendermint.com/master/rpc).

View File

@@ -64,13 +64,42 @@ It wont kill the node, but it will gather all of the above data and package i
At this point, depending on how severe the degradation is, you may want to restart the process.
## Tendermint Inspect
What if the Tendermint node will not start up due to inconsistent consensus state?
When a node running the Tendermint consensus engine detects an inconsistent state
it will crash the entire Tendermint process.
The Tendermint consensus engine cannot be run in this inconsistent state and the so node
will fail to start up as a result.
The Tendermint RPC server can provide valuable information for debugging in this situation.
The Tendermint `inspect` command will run a subset of the Tendermint RPC server
that is useful for debugging inconsistent state.
### Running inspect
Start up the `inspect` tool on the machine where Tendermint crashed using:
```bash
tendermint inspect --home=</path/to/app.d>
```
`inspect` will use the data directory specified in your Tendermint configuration file.
`inspect` will also run the RPC server at the address specified in your Tendermint configuration file.
### Using inspect
With the `inspect` server running, you can access RPC endpoints that are critically important
for debugging.
Calling the `/status`, `/consensus_state` and `/dump_consensus_state` RPC endpoint
will return useful information about the Tendermint consensus state.
## Outro
Were hoping that the `tendermint debug` subcommand will become de facto the first response to any accidents.
Were hoping that these Tendermint tools will become de facto the first response for any accidents.
Let us know what your experience has been so far! Have you had a chance to try `tendermint debug` yet?
Let us know what your experience has been so far! Have you had a chance to try `tendermint debug` or `tendermint inspect` yet?
Join our chat, where we discuss the current issues and future improvements.
Join our [discord chat](https://discord.gg/vcExX9T), where we discuss the current issues and future improvements.

1
go.mod
View File

@@ -37,6 +37,7 @@ require (
github.com/vektra/mockery/v2 v2.9.0
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
google.golang.org/grpc v1.40.0
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect
pgregory.net/rapid v0.4.7

1
go.sum
View File

@@ -1075,6 +1075,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

36
inspect/doc.go Normal file
View File

@@ -0,0 +1,36 @@
/*
Package inspect provides a tool for investigating the state of a
failed Tendermint node.
This package provides the Inspector type. The Inspector type runs a subset of the Tendermint
RPC endpoints that are useful for debugging issues with Tendermint consensus.
When a node running the Tendermint consensus engine detects an inconsistent consensus state,
the entire node will crash. The Tendermint consensus engine cannot run in this
inconsistent state so the node will not be able to start up again.
The RPC endpoints provided by the Inspector type allow for a node operator to inspect
the block store and state store to better understand what may have caused the inconsistent state.
The Inspector type's lifecycle is controlled by a context.Context
ins := inspect.NewFromConfig(rpcConfig)
ctx, cancelFunc:= context.WithCancel(context.Background())
// Run blocks until the Inspector server is shut down.
go ins.Run(ctx)
...
// calling the cancel function will stop the running inspect server
cancelFunc()
Inspector serves its RPC endpoints on the address configured in the RPC configuration
rpcConfig.ListenAddress = "tcp://127.0.0.1:26657"
ins := inspect.NewFromConfig(rpcConfig)
go ins.Run(ctx)
The list of available RPC endpoints can then be viewed by navigating to
http://127.0.0.1:26657/ in the web browser.
*/
package inspect

149
inspect/inspect.go Normal file
View File

@@ -0,0 +1,149 @@
package inspect
import (
"context"
"errors"
"fmt"
"net"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/inspect/rpc"
"github.com/tendermint/tendermint/libs/log"
tmstrings "github.com/tendermint/tendermint/libs/strings"
rpccore "github.com/tendermint/tendermint/rpc/core"
"github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/state/indexer/sink"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
"golang.org/x/sync/errgroup"
)
// Inspector manages an RPC service that exports methods to debug a failed node.
// After a node shuts down due to a consensus failure, it will no longer start
// up its state cannot easily be inspected. An Inspector value provides a similar interface
// to the node, using the underlying Tendermint data stores, without bringing up
// any other components. A caller can query the Inspector service to inspect the
// persisted state and debug the failure.
type Inspector struct {
routes rpccore.RoutesMap
config *config.RPCConfig
indexerService *indexer.Service
eventBus *types.EventBus
logger log.Logger
}
// New returns an Inspector that serves RPC on the specified BlockStore and StateStore.
// The Inspector type does not modify the state or block stores.
// The sinks are used to enable block and transaction querying via the RPC server.
// The caller is responsible for starting and stopping the Inspector service.
///
//nolint:lll
func New(cfg *config.RPCConfig, bs state.BlockStore, ss state.Store, es []indexer.EventSink, logger log.Logger) *Inspector {
routes := rpc.Routes(*cfg, ss, bs, es, logger)
eb := types.NewEventBus()
eb.SetLogger(logger.With("module", "events"))
is := indexer.NewIndexerService(es, eb)
is.SetLogger(logger.With("module", "txindex"))
return &Inspector{
routes: routes,
config: cfg,
logger: logger,
eventBus: eb,
indexerService: is,
}
}
// NewFromConfig constructs an Inspector using the values defined in the passed in config.
func NewFromConfig(cfg *config.Config) (*Inspector, error) {
bsDB, err := config.DefaultDBProvider(&config.DBContext{ID: "blockstore", Config: cfg})
if err != nil {
return nil, err
}
bs := store.NewBlockStore(bsDB)
sDB, err := config.DefaultDBProvider(&config.DBContext{ID: "state", Config: cfg})
if err != nil {
return nil, err
}
genDoc, err := types.GenesisDocFromFile(cfg.GenesisFile())
if err != nil {
return nil, err
}
sinks, err := sink.EventSinksFromConfig(cfg, config.DefaultDBProvider, genDoc.ChainID)
if err != nil {
return nil, err
}
logger := log.MustNewDefaultLogger(log.LogFormatPlain, log.LogLevelInfo, false)
ss := state.NewStore(sDB)
return New(cfg.RPC, bs, ss, sinks, logger), nil
}
// Run starts the Inspector servers and blocks until the servers shut down. The passed
// in context is used to control the lifecycle of the servers.
func (ins *Inspector) Run(ctx context.Context) error {
err := ins.eventBus.Start()
if err != nil {
return fmt.Errorf("error starting event bus: %s", err)
}
defer func() {
err := ins.eventBus.Stop()
if err != nil {
ins.logger.Error("event bus stopped with error", "err", err)
}
}()
err = ins.indexerService.Start()
if err != nil {
return fmt.Errorf("error starting indexer service: %s", err)
}
defer func() {
err := ins.indexerService.Stop()
if err != nil {
ins.logger.Error("indexer service stopped with error", "err", err)
}
}()
return startRPCServers(ctx, ins.config, ins.logger, ins.routes)
}
func startRPCServers(ctx context.Context, cfg *config.RPCConfig, logger log.Logger, routes rpccore.RoutesMap) error {
g, tctx := errgroup.WithContext(ctx)
listenAddrs := tmstrings.SplitAndTrimEmpty(cfg.ListenAddress, ",", " ")
rh := rpc.Handler(cfg, routes, logger)
for _, listenerAddr := range listenAddrs {
server := rpc.Server{
Logger: logger,
Config: cfg,
Handler: rh,
Addr: listenerAddr,
}
if cfg.IsTLSEnabled() {
keyFile := cfg.KeyFile()
certFile := cfg.CertFile()
listenerAddr := listenerAddr
g.Go(func() error {
logger.Info("RPC HTTPS server starting", "address", listenerAddr,
"certfile", certFile, "keyfile", keyFile)
err := server.ListenAndServeTLS(tctx, certFile, keyFile)
if !errors.Is(err, net.ErrClosed) {
return err
}
logger.Info("RPC HTTPS server stopped", "address", listenerAddr)
return nil
})
} else {
listenerAddr := listenerAddr
g.Go(func() error {
logger.Info("RPC HTTP server starting", "address", listenerAddr)
err := server.ListenAndServe(tctx)
if !errors.Is(err, net.ErrClosed) {
return err
}
logger.Info("RPC HTTP server stopped", "address", listenerAddr)
return nil
})
}
}
return g.Wait()
}

583
inspect/inspect_test.go Normal file
View File

@@ -0,0 +1,583 @@
package inspect_test
import (
"context"
"fmt"
"net"
"os"
"strings"
"sync"
"testing"
"time"
"github.com/fortytw2/leaktest"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
abcitypes "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/inspect"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/proto/tendermint/state"
httpclient "github.com/tendermint/tendermint/rpc/client/http"
"github.com/tendermint/tendermint/state/indexer"
indexermocks "github.com/tendermint/tendermint/state/indexer/mocks"
statemocks "github.com/tendermint/tendermint/state/mocks"
"github.com/tendermint/tendermint/types"
)
func TestInspectConstructor(t *testing.T) {
cfg := config.ResetTestRoot("test")
t.Cleanup(leaktest.Check(t))
defer func() { _ = os.RemoveAll(cfg.RootDir) }()
t.Run("from config", func(t *testing.T) {
d, err := inspect.NewFromConfig(cfg)
require.NoError(t, err)
require.NotNil(t, d)
})
}
func TestInspectRun(t *testing.T) {
cfg := config.ResetTestRoot("test")
t.Cleanup(leaktest.Check(t))
defer func() { _ = os.RemoveAll(cfg.RootDir) }()
t.Run("from config", func(t *testing.T) {
d, err := inspect.NewFromConfig(cfg)
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
stoppedWG := &sync.WaitGroup{}
stoppedWG.Add(1)
go func() {
require.NoError(t, d.Run(ctx))
stoppedWG.Done()
}()
cancel()
stoppedWG.Wait()
})
}
func TestBlock(t *testing.T) {
testHeight := int64(1)
testBlock := new(types.Block)
testBlock.Header.Height = testHeight
testBlock.Header.LastCommitHash = []byte("test hash")
stateStoreMock := &statemocks.Store{}
blockStoreMock := &statemocks.BlockStore{}
blockStoreMock.On("Height").Return(testHeight)
blockStoreMock.On("Base").Return(int64(0))
blockStoreMock.On("LoadBlockMeta", testHeight).Return(&types.BlockMeta{})
blockStoreMock.On("LoadBlock", testHeight).Return(testBlock)
eventSinkMock := &indexermocks.EventSink{}
eventSinkMock.On("Stop").Return(nil)
rpcConfig := config.TestRPCConfig()
l := log.TestingLogger()
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l)
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(1)
startedWG := &sync.WaitGroup{}
startedWG.Add(1)
go func() {
startedWG.Done()
defer wg.Done()
require.NoError(t, d.Run(ctx))
}()
// FIXME: used to induce context switch.
// Determine more deterministic method for prompting a context switch
startedWG.Wait()
requireConnect(t, rpcConfig.ListenAddress, 20)
cli, err := httpclient.New(rpcConfig.ListenAddress)
require.NoError(t, err)
resultBlock, err := cli.Block(context.Background(), &testHeight)
require.NoError(t, err)
require.Equal(t, testBlock.Height, resultBlock.Block.Height)
require.Equal(t, testBlock.LastCommitHash, resultBlock.Block.LastCommitHash)
cancel()
wg.Wait()
blockStoreMock.AssertExpectations(t)
stateStoreMock.AssertExpectations(t)
}
func TestTxSearch(t *testing.T) {
testHash := []byte("test")
testTx := []byte("tx")
testQuery := fmt.Sprintf("tx.hash='%s'", string(testHash))
testTxResult := &abcitypes.TxResult{
Height: 1,
Index: 100,
Tx: testTx,
}
stateStoreMock := &statemocks.Store{}
blockStoreMock := &statemocks.BlockStore{}
eventSinkMock := &indexermocks.EventSink{}
eventSinkMock.On("Stop").Return(nil)
eventSinkMock.On("Type").Return(indexer.KV)
eventSinkMock.On("SearchTxEvents", mock.Anything,
mock.MatchedBy(func(q *query.Query) bool { return testQuery == q.String() })).
Return([]*abcitypes.TxResult{testTxResult}, nil)
rpcConfig := config.TestRPCConfig()
l := log.TestingLogger()
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l)
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(1)
startedWG := &sync.WaitGroup{}
startedWG.Add(1)
go func() {
startedWG.Done()
defer wg.Done()
require.NoError(t, d.Run(ctx))
}()
// FIXME: used to induce context switch.
// Determine more deterministic method for prompting a context switch
startedWG.Wait()
requireConnect(t, rpcConfig.ListenAddress, 20)
cli, err := httpclient.New(rpcConfig.ListenAddress)
require.NoError(t, err)
var page = 1
resultTxSearch, err := cli.TxSearch(context.Background(), testQuery, false, &page, &page, "")
require.NoError(t, err)
require.Len(t, resultTxSearch.Txs, 1)
require.Equal(t, types.Tx(testTx), resultTxSearch.Txs[0].Tx)
cancel()
wg.Wait()
eventSinkMock.AssertExpectations(t)
stateStoreMock.AssertExpectations(t)
blockStoreMock.AssertExpectations(t)
}
func TestTx(t *testing.T) {
testHash := []byte("test")
testTx := []byte("tx")
stateStoreMock := &statemocks.Store{}
blockStoreMock := &statemocks.BlockStore{}
eventSinkMock := &indexermocks.EventSink{}
eventSinkMock.On("Stop").Return(nil)
eventSinkMock.On("Type").Return(indexer.KV)
eventSinkMock.On("GetTxByHash", testHash).Return(&abcitypes.TxResult{
Tx: testTx,
}, nil)
rpcConfig := config.TestRPCConfig()
l := log.TestingLogger()
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l)
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(1)
startedWG := &sync.WaitGroup{}
startedWG.Add(1)
go func() {
startedWG.Done()
defer wg.Done()
require.NoError(t, d.Run(ctx))
}()
// FIXME: used to induce context switch.
// Determine more deterministic method for prompting a context switch
startedWG.Wait()
requireConnect(t, rpcConfig.ListenAddress, 20)
cli, err := httpclient.New(rpcConfig.ListenAddress)
require.NoError(t, err)
res, err := cli.Tx(context.Background(), testHash, false)
require.NoError(t, err)
require.Equal(t, types.Tx(testTx), res.Tx)
cancel()
wg.Wait()
eventSinkMock.AssertExpectations(t)
stateStoreMock.AssertExpectations(t)
blockStoreMock.AssertExpectations(t)
}
func TestConsensusParams(t *testing.T) {
testHeight := int64(1)
testMaxGas := int64(55)
stateStoreMock := &statemocks.Store{}
blockStoreMock := &statemocks.BlockStore{}
blockStoreMock.On("Height").Return(testHeight)
blockStoreMock.On("Base").Return(int64(0))
stateStoreMock.On("LoadConsensusParams", testHeight).Return(types.ConsensusParams{
Block: types.BlockParams{
MaxGas: testMaxGas,
},
}, nil)
eventSinkMock := &indexermocks.EventSink{}
eventSinkMock.On("Stop").Return(nil)
rpcConfig := config.TestRPCConfig()
l := log.TestingLogger()
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l)
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(1)
startedWG := &sync.WaitGroup{}
startedWG.Add(1)
go func() {
startedWG.Done()
defer wg.Done()
require.NoError(t, d.Run(ctx))
}()
// FIXME: used to induce context switch.
// Determine more deterministic method for prompting a context switch
startedWG.Wait()
requireConnect(t, rpcConfig.ListenAddress, 20)
cli, err := httpclient.New(rpcConfig.ListenAddress)
require.NoError(t, err)
params, err := cli.ConsensusParams(context.Background(), &testHeight)
require.NoError(t, err)
require.Equal(t, params.ConsensusParams.Block.MaxGas, testMaxGas)
cancel()
wg.Wait()
blockStoreMock.AssertExpectations(t)
stateStoreMock.AssertExpectations(t)
}
func TestBlockResults(t *testing.T) {
testHeight := int64(1)
testGasUsed := int64(100)
stateStoreMock := &statemocks.Store{}
// tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
stateStoreMock.On("LoadABCIResponses", testHeight).Return(&state.ABCIResponses{
DeliverTxs: []*abcitypes.ResponseDeliverTx{
{
GasUsed: testGasUsed,
},
},
EndBlock: &abcitypes.ResponseEndBlock{},
BeginBlock: &abcitypes.ResponseBeginBlock{},
}, nil)
blockStoreMock := &statemocks.BlockStore{}
blockStoreMock.On("Base").Return(int64(0))
blockStoreMock.On("Height").Return(testHeight)
eventSinkMock := &indexermocks.EventSink{}
eventSinkMock.On("Stop").Return(nil)
rpcConfig := config.TestRPCConfig()
l := log.TestingLogger()
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l)
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(1)
startedWG := &sync.WaitGroup{}
startedWG.Add(1)
go func() {
startedWG.Done()
defer wg.Done()
require.NoError(t, d.Run(ctx))
}()
// FIXME: used to induce context switch.
// Determine more deterministic method for prompting a context switch
startedWG.Wait()
requireConnect(t, rpcConfig.ListenAddress, 20)
cli, err := httpclient.New(rpcConfig.ListenAddress)
require.NoError(t, err)
res, err := cli.BlockResults(context.Background(), &testHeight)
require.NoError(t, err)
require.Equal(t, res.TotalGasUsed, testGasUsed)
cancel()
wg.Wait()
blockStoreMock.AssertExpectations(t)
stateStoreMock.AssertExpectations(t)
}
func TestCommit(t *testing.T) {
testHeight := int64(1)
testRound := int32(101)
stateStoreMock := &statemocks.Store{}
blockStoreMock := &statemocks.BlockStore{}
blockStoreMock.On("Base").Return(int64(0))
blockStoreMock.On("Height").Return(testHeight)
blockStoreMock.On("LoadBlockMeta", testHeight).Return(&types.BlockMeta{}, nil)
blockStoreMock.On("LoadSeenCommit").Return(&types.Commit{
Height: testHeight,
Round: testRound,
}, nil)
eventSinkMock := &indexermocks.EventSink{}
eventSinkMock.On("Stop").Return(nil)
rpcConfig := config.TestRPCConfig()
l := log.TestingLogger()
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l)
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(1)
startedWG := &sync.WaitGroup{}
startedWG.Add(1)
go func() {
startedWG.Done()
defer wg.Done()
require.NoError(t, d.Run(ctx))
}()
// FIXME: used to induce context switch.
// Determine more deterministic method for prompting a context switch
startedWG.Wait()
requireConnect(t, rpcConfig.ListenAddress, 20)
cli, err := httpclient.New(rpcConfig.ListenAddress)
require.NoError(t, err)
res, err := cli.Commit(context.Background(), &testHeight)
require.NoError(t, err)
require.NotNil(t, res)
require.Equal(t, res.SignedHeader.Commit.Round, testRound)
cancel()
wg.Wait()
blockStoreMock.AssertExpectations(t)
stateStoreMock.AssertExpectations(t)
}
func TestBlockByHash(t *testing.T) {
testHeight := int64(1)
testHash := []byte("test hash")
testBlock := new(types.Block)
testBlock.Header.Height = testHeight
testBlock.Header.LastCommitHash = testHash
stateStoreMock := &statemocks.Store{}
blockStoreMock := &statemocks.BlockStore{}
blockStoreMock.On("LoadBlockMeta", testHeight).Return(&types.BlockMeta{
BlockID: types.BlockID{
Hash: testHash,
},
Header: types.Header{
Height: testHeight,
},
}, nil)
blockStoreMock.On("LoadBlockByHash", testHash).Return(testBlock, nil)
eventSinkMock := &indexermocks.EventSink{}
eventSinkMock.On("Stop").Return(nil)
rpcConfig := config.TestRPCConfig()
l := log.TestingLogger()
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l)
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(1)
startedWG := &sync.WaitGroup{}
startedWG.Add(1)
go func() {
startedWG.Done()
defer wg.Done()
require.NoError(t, d.Run(ctx))
}()
// FIXME: used to induce context switch.
// Determine more deterministic method for prompting a context switch
startedWG.Wait()
requireConnect(t, rpcConfig.ListenAddress, 20)
cli, err := httpclient.New(rpcConfig.ListenAddress)
require.NoError(t, err)
res, err := cli.BlockByHash(context.Background(), testHash)
require.NoError(t, err)
require.NotNil(t, res)
require.Equal(t, []byte(res.BlockID.Hash), testHash)
cancel()
wg.Wait()
blockStoreMock.AssertExpectations(t)
stateStoreMock.AssertExpectations(t)
}
func TestBlockchain(t *testing.T) {
testHeight := int64(1)
testBlock := new(types.Block)
testBlockHash := []byte("test hash")
testBlock.Header.Height = testHeight
testBlock.Header.LastCommitHash = testBlockHash
stateStoreMock := &statemocks.Store{}
blockStoreMock := &statemocks.BlockStore{}
blockStoreMock.On("Height").Return(testHeight)
blockStoreMock.On("Base").Return(int64(0))
blockStoreMock.On("LoadBlockMeta", testHeight).Return(&types.BlockMeta{
BlockID: types.BlockID{
Hash: testBlockHash,
},
})
eventSinkMock := &indexermocks.EventSink{}
eventSinkMock.On("Stop").Return(nil)
rpcConfig := config.TestRPCConfig()
l := log.TestingLogger()
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l)
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(1)
startedWG := &sync.WaitGroup{}
startedWG.Add(1)
go func() {
startedWG.Done()
defer wg.Done()
require.NoError(t, d.Run(ctx))
}()
// FIXME: used to induce context switch.
// Determine more deterministic method for prompting a context switch
startedWG.Wait()
requireConnect(t, rpcConfig.ListenAddress, 20)
cli, err := httpclient.New(rpcConfig.ListenAddress)
require.NoError(t, err)
res, err := cli.BlockchainInfo(context.Background(), 0, 100)
require.NoError(t, err)
require.NotNil(t, res)
require.Equal(t, testBlockHash, []byte(res.BlockMetas[0].BlockID.Hash))
cancel()
wg.Wait()
blockStoreMock.AssertExpectations(t)
stateStoreMock.AssertExpectations(t)
}
func TestValidators(t *testing.T) {
testHeight := int64(1)
testVotingPower := int64(100)
testValidators := types.ValidatorSet{
Validators: []*types.Validator{
{
VotingPower: testVotingPower,
},
},
}
stateStoreMock := &statemocks.Store{}
stateStoreMock.On("LoadValidators", testHeight).Return(&testValidators, nil)
blockStoreMock := &statemocks.BlockStore{}
blockStoreMock.On("Height").Return(testHeight)
blockStoreMock.On("Base").Return(int64(0))
eventSinkMock := &indexermocks.EventSink{}
eventSinkMock.On("Stop").Return(nil)
rpcConfig := config.TestRPCConfig()
l := log.TestingLogger()
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l)
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(1)
startedWG := &sync.WaitGroup{}
startedWG.Add(1)
go func() {
startedWG.Done()
defer wg.Done()
require.NoError(t, d.Run(ctx))
}()
// FIXME: used to induce context switch.
// Determine more deterministic method for prompting a context switch
startedWG.Wait()
requireConnect(t, rpcConfig.ListenAddress, 20)
cli, err := httpclient.New(rpcConfig.ListenAddress)
require.NoError(t, err)
testPage := 1
testPerPage := 100
res, err := cli.Validators(context.Background(), &testHeight, &testPage, &testPerPage)
require.NoError(t, err)
require.NotNil(t, res)
require.Equal(t, testVotingPower, res.Validators[0].VotingPower)
cancel()
wg.Wait()
blockStoreMock.AssertExpectations(t)
stateStoreMock.AssertExpectations(t)
}
func TestBlockSearch(t *testing.T) {
testHeight := int64(1)
testBlockHash := []byte("test hash")
testQuery := "block.height = 1"
stateStoreMock := &statemocks.Store{}
blockStoreMock := &statemocks.BlockStore{}
eventSinkMock := &indexermocks.EventSink{}
eventSinkMock.On("Stop").Return(nil)
eventSinkMock.On("Type").Return(indexer.KV)
blockStoreMock.On("LoadBlock", testHeight).Return(&types.Block{
Header: types.Header{
Height: testHeight,
},
}, nil)
blockStoreMock.On("LoadBlockMeta", testHeight).Return(&types.BlockMeta{
BlockID: types.BlockID{
Hash: testBlockHash,
},
})
eventSinkMock.On("SearchBlockEvents", mock.Anything,
mock.MatchedBy(func(q *query.Query) bool { return testQuery == q.String() })).
Return([]int64{testHeight}, nil)
rpcConfig := config.TestRPCConfig()
l := log.TestingLogger()
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l)
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(1)
startedWG := &sync.WaitGroup{}
startedWG.Add(1)
go func() {
startedWG.Done()
defer wg.Done()
require.NoError(t, d.Run(ctx))
}()
// FIXME: used to induce context switch.
// Determine more deterministic method for prompting a context switch
startedWG.Wait()
requireConnect(t, rpcConfig.ListenAddress, 20)
cli, err := httpclient.New(rpcConfig.ListenAddress)
require.NoError(t, err)
testPage := 1
testPerPage := 100
testOrderBy := "desc"
res, err := cli.BlockSearch(context.Background(), testQuery, &testPage, &testPerPage, testOrderBy)
require.NoError(t, err)
require.NotNil(t, res)
require.Equal(t, testBlockHash, []byte(res.Blocks[0].BlockID.Hash))
cancel()
wg.Wait()
blockStoreMock.AssertExpectations(t)
stateStoreMock.AssertExpectations(t)
}
func requireConnect(t testing.TB, addr string, retries int) {
parts := strings.SplitN(addr, "://", 2)
if len(parts) != 2 {
t.Fatalf("malformed address to dial: %s", addr)
}
var err error
for i := 0; i < retries; i++ {
var conn net.Conn
conn, err = net.Dial(parts[0], parts[1])
if err == nil {
conn.Close()
return
}
// FIXME attempt to yield and let the other goroutine continue execution.
time.Sleep(time.Microsecond * 100)
}
t.Fatalf("unable to connect to server %s after %d tries: %s", addr, retries, err)
}

143
inspect/rpc/rpc.go Normal file
View File

@@ -0,0 +1,143 @@
package rpc
import (
"context"
"net/http"
"time"
"github.com/rs/cors"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/internal/consensus"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/pubsub"
"github.com/tendermint/tendermint/rpc/core"
"github.com/tendermint/tendermint/rpc/jsonrpc/server"
"github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/types"
)
// Server defines parameters for running an Inspector rpc server.
type Server struct {
Addr string // TCP address to listen on, ":http" if empty
Handler http.Handler
Logger log.Logger
Config *config.RPCConfig
}
// Routes returns the set of routes used by the Inspector server.
//
//nolint: lll
func Routes(cfg config.RPCConfig, s state.Store, bs state.BlockStore, es []indexer.EventSink, logger log.Logger) core.RoutesMap {
env := &core.Environment{
Config: cfg,
EventSinks: es,
StateStore: s,
BlockStore: bs,
ConsensusReactor: waitSyncCheckerImpl{},
Logger: logger,
}
return core.RoutesMap{
"blockchain": server.NewRPCFunc(env.BlockchainInfo, "minHeight,maxHeight", true),
"consensus_params": server.NewRPCFunc(env.ConsensusParams, "height", true),
"block": server.NewRPCFunc(env.Block, "height", true),
"block_by_hash": server.NewRPCFunc(env.BlockByHash, "hash", true),
"block_results": server.NewRPCFunc(env.BlockResults, "height", true),
"commit": server.NewRPCFunc(env.Commit, "height", true),
"validators": server.NewRPCFunc(env.Validators, "height,page,per_page", true),
"tx": server.NewRPCFunc(env.Tx, "hash,prove", true),
"tx_search": server.NewRPCFunc(env.TxSearch, "query,prove,page,per_page,order_by", false),
"block_search": server.NewRPCFunc(env.BlockSearch, "query,page,per_page,order_by", false),
}
}
// Handler returns the http.Handler configured for use with an Inspector server. Handler
// registers the routes on the http.Handler and also registers the websocket handler
// and the CORS handler if specified by the configuration options.
func Handler(rpcConfig *config.RPCConfig, routes core.RoutesMap, logger log.Logger) http.Handler {
mux := http.NewServeMux()
wmLogger := logger.With("protocol", "websocket")
var eventBus types.EventBusSubscriber
websocketDisconnectFn := func(remoteAddr string) {
err := eventBus.UnsubscribeAll(context.Background(), remoteAddr)
if err != nil && err != pubsub.ErrSubscriptionNotFound {
wmLogger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err)
}
}
wm := server.NewWebsocketManager(routes,
server.OnDisconnect(websocketDisconnectFn),
server.ReadLimit(rpcConfig.MaxBodyBytes))
wm.SetLogger(wmLogger)
mux.HandleFunc("/websocket", wm.WebsocketHandler)
server.RegisterRPCFuncs(mux, routes, logger)
var rootHandler http.Handler = mux
if rpcConfig.IsCorsEnabled() {
rootHandler = addCORSHandler(rpcConfig, mux)
}
return rootHandler
}
func addCORSHandler(rpcConfig *config.RPCConfig, h http.Handler) http.Handler {
corsMiddleware := cors.New(cors.Options{
AllowedOrigins: rpcConfig.CORSAllowedOrigins,
AllowedMethods: rpcConfig.CORSAllowedMethods,
AllowedHeaders: rpcConfig.CORSAllowedHeaders,
})
h = corsMiddleware.Handler(h)
return h
}
type waitSyncCheckerImpl struct{}
func (waitSyncCheckerImpl) WaitSync() bool {
return false
}
func (waitSyncCheckerImpl) GetPeerState(peerID types.NodeID) (*consensus.PeerState, bool) {
return nil, false
}
// ListenAndServe listens on the address specified in srv.Addr and handles any
// incoming requests over HTTP using the Inspector rpc handler specified on the server.
func (srv *Server) ListenAndServe(ctx context.Context) error {
listener, err := server.Listen(srv.Addr, srv.Config.MaxOpenConnections)
if err != nil {
return err
}
go func() {
<-ctx.Done()
listener.Close()
}()
return server.Serve(listener, srv.Handler, srv.Logger, serverRPCConfig(srv.Config))
}
// ListenAndServeTLS listens on the address specified in srv.Addr. ListenAndServeTLS handles
// incoming requests over HTTPS using the Inspector rpc handler specified on the server.
func (srv *Server) ListenAndServeTLS(ctx context.Context, certFile, keyFile string) error {
listener, err := server.Listen(srv.Addr, srv.Config.MaxOpenConnections)
if err != nil {
return err
}
go func() {
<-ctx.Done()
listener.Close()
}()
return server.ServeTLS(listener, srv.Handler, certFile, keyFile, srv.Logger, serverRPCConfig(srv.Config))
}
func serverRPCConfig(r *config.RPCConfig) *server.Config {
cfg := server.DefaultConfig()
cfg.MaxBodyBytes = r.MaxBodyBytes
cfg.MaxHeaderBytes = r.MaxHeaderBytes
// If necessary adjust global WriteTimeout to ensure it's greater than
// TimeoutBroadcastTxCommit.
// See https://github.com/tendermint/tendermint/issues/3435
if cfg.WriteTimeout <= r.TimeoutBroadcastTxCommit {
cfg.WriteTimeout = r.TimeoutBroadcastTxCommit + 1*time.Second
}
return cfg
}

View File

@@ -29,10 +29,10 @@ var (
// TODO: Remove once p2p refactor is complete.
// ref: https://github.com/tendermint/tendermint/issues/5670
ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{
BlockchainChannel: {
BlockSyncChannel: {
MsgType: new(bcproto.Message),
Descriptor: &p2p.ChannelDescriptor{
ID: byte(BlockchainChannel),
ID: byte(BlockSyncChannel),
Priority: 5,
SendQueueCapacity: 1000,
RecvBufferCapacity: 1024,
@@ -44,8 +44,8 @@ var (
)
const (
// BlockchainChannel is a channel for blocks and status updates
BlockchainChannel = p2p.ChannelID(0x40)
// BlockSyncChannel is a channel for blocks and status updates
BlockSyncChannel = p2p.ChannelID(0x40)
trySyncIntervalMS = 10
@@ -60,7 +60,7 @@ const (
)
type consensusReactor interface {
// For when we switch from blockchain reactor and block sync to the consensus
// For when we switch from block sync reactor to the consensus
// machine.
SwitchToConsensus(state sm.State, skipWAL bool)
}
@@ -87,17 +87,17 @@ type Reactor struct {
consReactor consensusReactor
blockSync *tmSync.AtomicBool
blockchainCh *p2p.Channel
// blockchainOutBridgeCh defines a channel that acts as a bridge between sending Envelope
// messages that the reactor will consume in processBlockchainCh and receiving messages
blockSyncCh *p2p.Channel
// blockSyncOutBridgeCh defines a channel that acts as a bridge between sending Envelope
// messages that the reactor will consume in processBlockSyncCh and receiving messages
// from the peer updates channel and other goroutines. We do this instead of directly
// sending on blockchainCh.Out to avoid race conditions in the case where other goroutines
// send Envelopes directly to the to blockchainCh.Out channel, since processBlockchainCh
// may close the blockchainCh.Out channel at the same time that other goroutines send to
// blockchainCh.Out.
blockchainOutBridgeCh chan p2p.Envelope
peerUpdates *p2p.PeerUpdates
closeCh chan struct{}
// sending on blockSyncCh.Out to avoid race conditions in the case where other goroutines
// send Envelopes directly to the to blockSyncCh.Out channel, since processBlockSyncCh
// may close the blockSyncCh.Out channel at the same time that other goroutines send to
// blockSyncCh.Out.
blockSyncOutBridgeCh chan p2p.Envelope
peerUpdates *p2p.PeerUpdates
closeCh chan struct{}
requestsCh <-chan BlockRequest
errorsCh <-chan peerError
@@ -119,7 +119,7 @@ func NewReactor(
blockExec *sm.BlockExecutor,
store *store.BlockStore,
consReactor consensusReactor,
blockchainCh *p2p.Channel,
blockSyncCh *p2p.Channel,
peerUpdates *p2p.PeerUpdates,
blockSync bool,
metrics *cons.Metrics,
@@ -137,23 +137,23 @@ func NewReactor(
errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count.
r := &Reactor{
initialState: state,
blockExec: blockExec,
store: store,
pool: NewBlockPool(startHeight, requestsCh, errorsCh),
consReactor: consReactor,
blockSync: tmSync.NewBool(blockSync),
requestsCh: requestsCh,
errorsCh: errorsCh,
blockchainCh: blockchainCh,
blockchainOutBridgeCh: make(chan p2p.Envelope),
peerUpdates: peerUpdates,
closeCh: make(chan struct{}),
metrics: metrics,
syncStartTime: time.Time{},
initialState: state,
blockExec: blockExec,
store: store,
pool: NewBlockPool(startHeight, requestsCh, errorsCh),
consReactor: consReactor,
blockSync: tmSync.NewBool(blockSync),
requestsCh: requestsCh,
errorsCh: errorsCh,
blockSyncCh: blockSyncCh,
blockSyncOutBridgeCh: make(chan p2p.Envelope),
peerUpdates: peerUpdates,
closeCh: make(chan struct{}),
metrics: metrics,
syncStartTime: time.Time{},
}
r.BaseService = *service.NewBaseService(logger, "Blockchain", r)
r.BaseService = *service.NewBaseService(logger, "BlockSync", r)
return r, nil
}
@@ -174,7 +174,7 @@ func (r *Reactor) OnStart() error {
go r.poolRoutine(false)
}
go r.processBlockchainCh()
go r.processBlockSyncCh()
go r.processPeerUpdates()
return nil
@@ -199,7 +199,7 @@ func (r *Reactor) OnStop() {
// Wait for all p2p Channels to be closed before returning. This ensures we
// can easily reason about synchronization of all p2p Channels and ensure no
// panics will occur.
<-r.blockchainCh.Done()
<-r.blockSyncCh.Done()
<-r.peerUpdates.Done()
}
@@ -214,7 +214,7 @@ func (r *Reactor) respondToPeer(msg *bcproto.BlockRequest, peerID types.NodeID)
return
}
r.blockchainCh.Out <- p2p.Envelope{
r.blockSyncCh.Out <- p2p.Envelope{
To: peerID,
Message: &bcproto.BlockResponse{Block: blockProto},
}
@@ -223,16 +223,16 @@ func (r *Reactor) respondToPeer(msg *bcproto.BlockRequest, peerID types.NodeID)
}
r.Logger.Info("peer requesting a block we do not have", "peer", peerID, "height", msg.Height)
r.blockchainCh.Out <- p2p.Envelope{
r.blockSyncCh.Out <- p2p.Envelope{
To: peerID,
Message: &bcproto.NoBlockResponse{Height: msg.Height},
}
}
// handleBlockchainMessage handles envelopes sent from peers on the
// BlockchainChannel. It returns an error only if the Envelope.Message is unknown
// handleBlockSyncMessage handles envelopes sent from peers on the
// BlockSyncChannel. It returns an error only if the Envelope.Message is unknown
// for this channel. This should never be called outside of handleMessage.
func (r *Reactor) handleBlockchainMessage(envelope p2p.Envelope) error {
func (r *Reactor) handleBlockSyncMessage(envelope p2p.Envelope) error {
logger := r.Logger.With("peer", envelope.From)
switch msg := envelope.Message.(type) {
@@ -249,7 +249,7 @@ func (r *Reactor) handleBlockchainMessage(envelope p2p.Envelope) error {
r.pool.AddBlock(envelope.From, block, block.Size())
case *bcproto.StatusRequest:
r.blockchainCh.Out <- p2p.Envelope{
r.blockSyncCh.Out <- p2p.Envelope{
To: envelope.From,
Message: &bcproto.StatusResponse{
Height: r.store.Height(),
@@ -288,8 +288,8 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err
r.Logger.Debug("received message", "message", envelope.Message, "peer", envelope.From)
switch chID {
case BlockchainChannel:
err = r.handleBlockchainMessage(envelope)
case BlockSyncChannel:
err = r.handleBlockSyncMessage(envelope)
default:
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
@@ -298,30 +298,30 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err
return err
}
// processBlockchainCh initiates a blocking process where we listen for and handle
// envelopes on the BlockchainChannel and blockchainOutBridgeCh. Any error encountered during
// message execution will result in a PeerError being sent on the BlockchainChannel.
// processBlockSyncCh initiates a blocking process where we listen for and handle
// envelopes on the BlockSyncChannel and blockSyncOutBridgeCh. Any error encountered during
// message execution will result in a PeerError being sent on the BlockSyncChannel.
// When the reactor is stopped, we will catch the signal and close the p2p Channel
// gracefully.
func (r *Reactor) processBlockchainCh() {
defer r.blockchainCh.Close()
func (r *Reactor) processBlockSyncCh() {
defer r.blockSyncCh.Close()
for {
select {
case envelope := <-r.blockchainCh.In:
if err := r.handleMessage(r.blockchainCh.ID, envelope); err != nil {
r.Logger.Error("failed to process message", "ch_id", r.blockchainCh.ID, "envelope", envelope, "err", err)
r.blockchainCh.Error <- p2p.PeerError{
case envelope := <-r.blockSyncCh.In:
if err := r.handleMessage(r.blockSyncCh.ID, envelope); err != nil {
r.Logger.Error("failed to process message", "ch_id", r.blockSyncCh.ID, "envelope", envelope, "err", err)
r.blockSyncCh.Error <- p2p.PeerError{
NodeID: envelope.From,
Err: err,
}
}
case envelope := <-r.blockchainOutBridgeCh:
r.blockchainCh.Out <- envelope
case envelope := <-r.blockSyncOutBridgeCh:
r.blockSyncCh.Out <- envelope
case <-r.closeCh:
r.Logger.Debug("stopped listening on blockchain channel; closing...")
r.Logger.Debug("stopped listening on block sync channel; closing...")
return
}
@@ -340,7 +340,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
switch peerUpdate.Status {
case p2p.PeerStatusUp:
// send a status update the newly added peer
r.blockchainOutBridgeCh <- p2p.Envelope{
r.blockSyncOutBridgeCh <- p2p.Envelope{
To: peerUpdate.NodeID,
Message: &bcproto.StatusResponse{
Base: r.store.Base(),
@@ -406,13 +406,13 @@ func (r *Reactor) requestRoutine() {
return
case request := <-r.requestsCh:
r.blockchainOutBridgeCh <- p2p.Envelope{
r.blockSyncOutBridgeCh <- p2p.Envelope{
To: request.PeerID,
Message: &bcproto.BlockRequest{Height: request.Height},
}
case pErr := <-r.errorsCh:
r.blockchainCh.Error <- p2p.PeerError{
r.blockSyncCh.Error <- p2p.PeerError{
NodeID: pErr.peerID,
Err: pErr.err,
}
@@ -423,7 +423,7 @@ func (r *Reactor) requestRoutine() {
go func() {
defer r.poolWG.Done()
r.blockchainOutBridgeCh <- p2p.Envelope{
r.blockSyncOutBridgeCh <- p2p.Envelope{
Broadcast: true,
Message: &bcproto.StatusRequest{},
}
@@ -554,14 +554,14 @@ FOR_LOOP:
// NOTE: We've already removed the peer's request, but we still need
// to clean up the rest.
peerID := r.pool.RedoRequest(first.Height)
r.blockchainCh.Error <- p2p.PeerError{
r.blockSyncCh.Error <- p2p.PeerError{
NodeID: peerID,
Err: err,
}
peerID2 := r.pool.RedoRequest(second.Height)
if peerID2 != peerID {
r.blockchainCh.Error <- p2p.PeerError{
r.blockSyncCh.Error <- p2p.PeerError{
NodeID: peerID2,
Err: err,
}

View File

@@ -32,9 +32,9 @@ type reactorTestSuite struct {
reactors map[types.NodeID]*Reactor
app map[types.NodeID]proxy.AppConns
blockchainChannels map[types.NodeID]*p2p.Channel
peerChans map[types.NodeID]chan p2p.PeerUpdate
peerUpdates map[types.NodeID]*p2p.PeerUpdates
blockSyncChannels map[types.NodeID]*p2p.Channel
peerChans map[types.NodeID]chan p2p.PeerUpdate
peerUpdates map[types.NodeID]*p2p.PeerUpdates
blockSync bool
}
@@ -53,19 +53,19 @@ func setup(
"must specify at least one block height (nodes)")
rts := &reactorTestSuite{
logger: log.TestingLogger().With("module", "blockchain", "testCase", t.Name()),
network: p2ptest.MakeNetwork(t, p2ptest.NetworkOptions{NumNodes: numNodes}),
nodes: make([]types.NodeID, 0, numNodes),
reactors: make(map[types.NodeID]*Reactor, numNodes),
app: make(map[types.NodeID]proxy.AppConns, numNodes),
blockchainChannels: make(map[types.NodeID]*p2p.Channel, numNodes),
peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numNodes),
peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes),
blockSync: true,
logger: log.TestingLogger().With("module", "block_sync", "testCase", t.Name()),
network: p2ptest.MakeNetwork(t, p2ptest.NetworkOptions{NumNodes: numNodes}),
nodes: make([]types.NodeID, 0, numNodes),
reactors: make(map[types.NodeID]*Reactor, numNodes),
app: make(map[types.NodeID]proxy.AppConns, numNodes),
blockSyncChannels: make(map[types.NodeID]*p2p.Channel, numNodes),
peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numNodes),
peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes),
blockSync: true,
}
chDesc := p2p.ChannelDescriptor{ID: byte(BlockchainChannel)}
rts.blockchainChannels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(bcproto.Message), int(chBuf))
chDesc := p2p.ChannelDescriptor{ID: byte(BlockSyncChannel)}
rts.blockSyncChannels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(bcproto.Message), int(chBuf))
i := 0
for nodeID := range rts.network.Nodes {
@@ -161,7 +161,7 @@ func (rts *reactorTestSuite) addNode(t *testing.T,
blockExec,
blockStore,
nil,
rts.blockchainChannels[nodeID],
rts.blockSyncChannels[nodeID],
rts.peerUpdates[nodeID],
rts.blockSync,
cons.NopMetrics())
@@ -181,7 +181,7 @@ func (rts *reactorTestSuite) start(t *testing.T) {
}
func TestReactor_AbruptDisconnect(t *testing.T) {
config := cfg.ResetTestRoot("blockchain_reactor_test")
config := cfg.ResetTestRoot("block_sync_reactor_test")
defer os.RemoveAll(config.RootDir)
genDoc, privVals := factory.RandGenesisDoc(config, 1, false, 30)
@@ -216,7 +216,7 @@ func TestReactor_AbruptDisconnect(t *testing.T) {
}
func TestReactor_SyncTime(t *testing.T) {
config := cfg.ResetTestRoot("blockchain_reactor_test")
config := cfg.ResetTestRoot("block_sync_reactor_test")
defer os.RemoveAll(config.RootDir)
genDoc, privVals := factory.RandGenesisDoc(config, 1, false, 30)
@@ -239,7 +239,7 @@ func TestReactor_SyncTime(t *testing.T) {
}
func TestReactor_NoBlockResponse(t *testing.T) {
config := cfg.ResetTestRoot("blockchain_reactor_test")
config := cfg.ResetTestRoot("block_sync_reactor_test")
defer os.RemoveAll(config.RootDir)
genDoc, privVals := factory.RandGenesisDoc(config, 1, false, 30)
@@ -286,7 +286,7 @@ func TestReactor_BadBlockStopsPeer(t *testing.T) {
// See: https://github.com/tendermint/tendermint/issues/6005
t.SkipNow()
config := cfg.ResetTestRoot("blockchain_reactor_test")
config := cfg.ResetTestRoot("block_sync_reactor_test")
defer os.RemoveAll(config.RootDir)
maxBlockHeight := int64(48)

View File

@@ -507,7 +507,10 @@ func (state *state) send(msg interface{}, events []types.Event) error {
for clientID, subscription := range clientSubscriptions {
if cap(subscription.out) == 0 {
// block on unbuffered channel
subscription.out <- NewMessage(subscription.id, msg, events)
select {
case subscription.out <- NewMessage(subscription.id, msg, events):
case <-subscription.canceled:
}
} else {
// don't block on buffered channels
select {

View File

@@ -1,7 +1,6 @@
package time
import (
"sort"
"time"
)
@@ -16,43 +15,3 @@ func Now() time.Time {
func Canonical(t time.Time) time.Time {
return t.Round(0).UTC()
}
// WeightedTime for computing a median.
type WeightedTime struct {
Time time.Time
Weight int64
}
// NewWeightedTime with time and weight.
func NewWeightedTime(time time.Time, weight int64) *WeightedTime {
return &WeightedTime{
Time: time,
Weight: weight,
}
}
// WeightedMedian computes weighted median time for a given array of WeightedTime and the total voting power.
func WeightedMedian(weightedTimes []*WeightedTime, totalVotingPower int64) (res time.Time) {
median := totalVotingPower / 2
sort.Slice(weightedTimes, func(i, j int) bool {
if weightedTimes[i] == nil {
return false
}
if weightedTimes[j] == nil {
return true
}
return weightedTimes[i].Time.UnixNano() < weightedTimes[j].Time.UnixNano()
})
for _, weightedTime := range weightedTimes {
if weightedTime != nil {
if median <= weightedTime.Weight {
res = weightedTime.Time
break
}
median -= weightedTime.Weight
}
}
return
}

View File

@@ -113,7 +113,7 @@ func (p *Proxy) listen() (net.Listener, *http.ServeMux, error) {
}
// 4) Start listening for new connections.
listener, err := rpcserver.Listen(p.Addr, p.Config)
listener, err := rpcserver.Listen(p.Addr, p.Config.MaxOpenConnections)
if err != nil {
return nil, mux, err
}

View File

@@ -441,22 +441,23 @@ func makeNode(config *cfg.Config,
ProxyAppQuery: proxyApp.Query(),
ProxyAppMempool: proxyApp.Mempool(),
StateStore: stateStore,
BlockStore: blockStore,
EvidencePool: evPool,
ConsensusState: csState,
StateStore: stateStore,
BlockStore: blockStore,
EvidencePool: evPool,
ConsensusState: csState,
ConsensusReactor: csReactor,
BlockSyncReactor: bcReactor.(cs.BlockSyncReactor),
P2PPeers: sw,
PeerManager: peerManager,
GenDoc: genDoc,
EventSinks: eventSinks,
ConsensusReactor: csReactor,
EventBus: eventBus,
Mempool: mp,
Logger: logger.With("module", "rpc"),
Config: *config.RPC,
GenDoc: genDoc,
EventSinks: eventSinks,
EventBus: eventBus,
Mempool: mp,
Logger: logger.With("module", "rpc"),
Config: *config.RPC,
},
}
@@ -825,7 +826,7 @@ func (n *nodeImpl) startRPC() ([]net.Listener, error) {
rpcserver.RegisterRPCFuncs(mux, routes, rpcLogger)
listener, err := rpcserver.Listen(
listenAddr,
config,
config.MaxOpenConnections,
)
if err != nil {
return nil, err
@@ -883,7 +884,7 @@ func (n *nodeImpl) startRPC() ([]net.Listener, error) {
if config.WriteTimeout <= n.config.RPC.TimeoutBroadcastTxCommit {
config.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second
}
listener, err := rpcserver.Listen(grpcListenAddr, config)
listener, err := rpcserver.Listen(grpcListenAddr, config.MaxOpenConnections)
if err != nil {
return nil, err
}

View File

@@ -8,7 +8,6 @@ import (
"math"
"net"
_ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port
"strings"
"time"
dbm "github.com/tendermint/tm-db"
@@ -33,9 +32,7 @@ import (
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/indexer"
kv "github.com/tendermint/tendermint/state/indexer/sink/kv"
null "github.com/tendermint/tendermint/state/indexer/sink/null"
psql "github.com/tendermint/tendermint/state/indexer/sink/psql"
"github.com/tendermint/tendermint/state/indexer/sink"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/version"
@@ -78,56 +75,9 @@ func createAndStartIndexerService(
logger log.Logger,
chainID string,
) (*indexer.Service, []indexer.EventSink, error) {
eventSinks := []indexer.EventSink{}
// check for duplicated sinks
sinks := map[string]bool{}
for _, s := range config.TxIndex.Indexer {
sl := strings.ToLower(s)
if sinks[sl] {
return nil, nil, errors.New("found duplicated sinks, please check the tx-index section in the config.toml")
}
sinks[sl] = true
}
loop:
for k := range sinks {
switch k {
case string(indexer.NULL):
// When we see null in the config, the eventsinks will be reset with the
// nullEventSink.
eventSinks = []indexer.EventSink{null.NewEventSink()}
break loop
case string(indexer.KV):
store, err := dbProvider(&cfg.DBContext{ID: "tx_index", Config: config})
if err != nil {
return nil, nil, err
}
eventSinks = append(eventSinks, kv.NewEventSink(store))
case string(indexer.PSQL):
conn := config.TxIndex.PsqlConn
if conn == "" {
return nil, nil, errors.New("the psql connection settings cannot be empty")
}
es, _, err := psql.NewEventSink(conn, chainID)
if err != nil {
return nil, nil, err
}
eventSinks = append(eventSinks, es)
default:
return nil, nil, errors.New("unsupported event sink type")
}
}
if len(eventSinks) == 0 {
eventSinks = []indexer.EventSink{null.NewEventSink()}
eventSinks, err := sink.EventSinksFromConfig(config, dbProvider, chainID)
if err != nil {
return nil, nil, err
}
indexerService := indexer.NewIndexerService(eventSinks, eventBus)
@@ -362,7 +312,7 @@ func createBlockchainReactor(
reactor, err := bcv0.NewReactor(
logger, state.Copy(), blockExec, blockStore, csReactor,
channels[bcv0.BlockchainChannel], peerUpdates, blockSync,
channels[bcv0.BlockSyncChannel], peerUpdates, blockSync,
metrics,
)
if err != nil {
@@ -727,7 +677,7 @@ func makeNodeInfo(
var bcChannel byte
switch config.BlockSync.Version {
case cfg.BlockSyncV0:
bcChannel = byte(bcv0.BlockchainChannel)
bcChannel = byte(bcv0.BlockSyncChannel)
case cfg.BlockSyncV2:
bcChannel = bcv2.BlockchainChannel

View File

@@ -1 +0,0 @@
<!-- TODO: add in release notes prior to release. -->

View File

@@ -58,6 +58,11 @@ type peers interface {
Peers() p2p.IPeerSet
}
type consensusReactor interface {
WaitSync() bool
GetPeerState(peerID types.NodeID) (*consensus.PeerState, bool)
}
type peerManager interface {
Peers() []types.NodeID
Addresses(types.NodeID) []p2p.NodeAddress
@@ -72,11 +77,12 @@ type Environment struct {
ProxyAppMempool proxy.AppConnMempool
// interfaces defined in types and above
StateStore sm.Store
BlockStore sm.BlockStore
EvidencePool sm.EvidencePool
ConsensusState consensusState
P2PPeers peers
StateStore sm.Store
BlockStore sm.BlockStore
EvidencePool sm.EvidencePool
ConsensusState consensusState
ConsensusReactor consensusReactor
P2PPeers peers
// Legacy p2p stack
P2PTransport transport
@@ -88,7 +94,6 @@ type Environment struct {
PubKey crypto.PubKey
GenDoc *types.GenesisDoc // cache the genesis structure
EventSinks []indexer.EventSink
ConsensusReactor *consensus.Reactor
EventBus *types.EventBus // thread safe
Mempool mempl.Mempool
BlockSyncReactor consensus.BlockSyncReactor

View File

@@ -110,7 +110,7 @@ func setup() {
wm.SetLogger(tcpLogger)
mux.HandleFunc(websocketEndpoint, wm.WebsocketHandler)
config := server.DefaultConfig()
listener1, err := server.Listen(tcpAddr, config)
listener1, err := server.Listen(tcpAddr, config.MaxOpenConnections)
if err != nil {
panic(err)
}
@@ -126,7 +126,7 @@ func setup() {
wm = server.NewWebsocketManager(Routes)
wm.SetLogger(unixLogger)
mux2.HandleFunc(websocketEndpoint, wm.WebsocketHandler)
listener2, err := server.Listen(unixAddr, config)
listener2, err := server.Listen(unixAddr, config.MaxOpenConnections)
if err != nil {
panic(err)
}

View File

@@ -261,7 +261,7 @@ func (h maxBytesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Listen starts a new net.Listener on the given address.
// It returns an error if the address is invalid or the call to Listen() fails.
func Listen(addr string, config *Config) (listener net.Listener, err error) {
func Listen(addr string, maxOpenConnections int) (listener net.Listener, err error) {
parts := strings.SplitN(addr, "://", 2)
if len(parts) != 2 {
return nil, fmt.Errorf(
@@ -274,8 +274,8 @@ func Listen(addr string, config *Config) (listener net.Listener, err error) {
if err != nil {
return nil, fmt.Errorf("failed to listen on %v: %v", addr, err)
}
if config.MaxOpenConnections > 0 {
listener = netutil.LimitListener(listener, config.MaxOpenConnections)
if maxOpenConnections > 0 {
listener = netutil.LimitListener(listener, maxOpenConnections)
}
return listener, nil

View File

@@ -39,8 +39,7 @@ func TestMaxOpenConnections(t *testing.T) {
fmt.Fprint(w, "some body")
})
config := DefaultConfig()
config.MaxOpenConnections = max
l, err := Listen("tcp://127.0.0.1:0", config)
l, err := Listen("tcp://127.0.0.1:0", max)
require.NoError(t, err)
defer l.Close()
go Serve(l, mux, log.TestingLogger(), config) //nolint:errcheck // ignore for tests

View File

@@ -33,7 +33,7 @@ func main() {
rpcserver.RegisterRPCFuncs(mux, routes, logger)
config := rpcserver.DefaultConfig()
listener, err := rpcserver.Listen("tcp://127.0.0.1:8008", config)
listener, err := rpcserver.Listen("tcp://127.0.0.1:8008", config.MaxOpenConnections)
if err != nil {
tmos.Exit(err.Error())
}

72
state/indexer/doc.go Normal file
View File

@@ -0,0 +1,72 @@
/*
Package indexer defines Tendermint's block and transaction event indexing logic.
Tendermint supports two primary means of block and transaction event indexing:
1. A key-value sink via an embedded database with a proprietary query language.
2. A Postgres-based sink.
An ABCI application can emit events during block and transaction execution in the form
<abci.Event.Type>.<abci.EventAttributeKey>=<abci.EventAttributeValue>
for example "transfer.amount=10000".
An operator can enable one or both of the supported indexing sinks via the
'tx-index.indexer' Tendermint configuration.
Example:
[tx-index]
indexer = ["kv", "psql"]
If an operator wants to completely disable indexing, they may simply just provide
the "null" sink option in the configuration. All other sinks will be ignored if
"null" is provided.
If indexing is enabled, the indexer.Service will iterate over all enabled sinks
and invoke block and transaction indexing via the appropriate IndexBlockEvents
and IndexTxEvents methods.
Note, the "kv" sink is considered deprecated and its query functionality is very
limited, but does allow users to directly query for block and transaction events
against Tendermint's RPC. Instead, operators are encouraged to use the "psql"
indexing sink when more complex queries are required and for reliability purposes
as PostgreSQL can scale.
Prior to starting Tendermint with the "psql" indexing sink enabled, operators
must ensure the following:
1. The "psql" indexing sink is provided in Tendermint's configuration.
2. A 'tx-index.psql-conn' value is provided that contains the PostgreSQL connection URI.
3. The block and transaction event schemas have been created in the PostgreSQL database.
Tendermint provides the block and transaction event schemas in the following
path: state/indexer/sink/psql/schema.sql
To create the schema in a PostgreSQL database, perform the schema query
manually or invoke schema creation via the CLI:
$ psql <flags> -f state/indexer/sink/psql/schema.sql
The "psql" indexing sink prohibits queries via RPC. When using a PostgreSQL sink,
queries can and should be made directly against the database using SQL.
The following are some example SQL queries against the database schema:
* Query for all transaction events for a given transaction hash:
SELECT * FROM tx_events WHERE hash = '3E7D1F...';
* Query for all transaction events for a given block height:
SELECT * FROM tx_events WHERE height = 25;
* Query for transaction events that have a given type (i.e. value wildcard):
SELECT * FROM tx_events WHERE key LIKE '%transfer.recipient%';
Note that if a complete abci.TxResult is needed, you will need to join "tx_events" with
"tx_results" via a foreign key, to obtain contains the raw protobuf-encoded abci.TxResult.
*/
package indexer

View File

@@ -51,43 +51,47 @@ func (is *Service) OnStart() error {
go func() {
for {
msg := <-blockHeadersSub.Out()
select {
case <-blockHeadersSub.Canceled():
return
case msg := <-blockHeadersSub.Out():
eventDataHeader := msg.Data().(types.EventDataNewBlockHeader)
height := eventDataHeader.Header.Height
batch := NewBatch(eventDataHeader.NumTxs)
eventDataHeader := msg.Data().(types.EventDataNewBlockHeader)
height := eventDataHeader.Header.Height
batch := NewBatch(eventDataHeader.NumTxs)
for i := int64(0); i < eventDataHeader.NumTxs; i++ {
msg2 := <-txsSub.Out()
txResult := msg2.Data().(types.EventDataTx).TxResult
for i := int64(0); i < eventDataHeader.NumTxs; i++ {
msg2 := <-txsSub.Out()
txResult := msg2.Data().(types.EventDataTx).TxResult
if err = batch.Add(&txResult); err != nil {
is.Logger.Error(
"failed to add tx to batch",
"height", height,
"index", txResult.Index,
"err", err,
)
}
}
if !IndexingEnabled(is.eventSinks) {
continue
}
for _, sink := range is.eventSinks {
if err := sink.IndexBlockEvents(eventDataHeader); err != nil {
is.Logger.Error("failed to index block", "height", height, "err", err)
} else {
is.Logger.Debug("indexed block", "height", height, "sink", sink.Type())
if err = batch.Add(&txResult); err != nil {
is.Logger.Error(
"failed to add tx to batch",
"height", height,
"index", txResult.Index,
"err", err,
)
}
}
if len(batch.Ops) > 0 {
err := sink.IndexTxEvents(batch.Ops)
if err != nil {
is.Logger.Error("failed to index block txs", "height", height, "err", err)
if !IndexingEnabled(is.eventSinks) {
continue
}
for _, sink := range is.eventSinks {
if err := sink.IndexBlockEvents(eventDataHeader); err != nil {
is.Logger.Error("failed to index block", "height", height, "err", err)
} else {
is.Logger.Debug("indexed txs", "height", height, "sink", sink.Type())
is.Logger.Debug("indexed block", "height", height, "sink", sink.Type())
}
if len(batch.Ops) > 0 {
err := sink.IndexTxEvents(batch.Ops)
if err != nil {
is.Logger.Error("failed to index block txs", "height", height, "err", err)
} else {
is.Logger.Debug("indexed txs", "height", height, "sink", sink.Type())
}
}
}
}

View File

@@ -164,19 +164,16 @@ func setupDB(t *testing.T) (*dockertest.Pool, error) {
conn := fmt.Sprintf(dsn, user, password, resource.GetPort(port+"/tcp"), dbName)
if err = pool.Retry(func() error {
var err error
pSink, psqldb, err = psql.NewEventSink(conn, "test-chainID")
assert.NoError(t, pool.Retry(func() error {
sink, err := psql.NewEventSink(conn, "test-chainID")
if err != nil {
return err
}
pSink = sink
psqldb = sink.DB()
return psqldb.Ping()
}); err != nil {
assert.Error(t, err)
}
}))
resetDB(t)

View File

@@ -1,3 +1,4 @@
// Package psql implements an event sink backed by a PostgreSQL database.
package psql
import (
@@ -24,28 +25,38 @@ const (
DriverName = "postgres"
)
// EventSink is an indexer backend providing the tx/block index services.
// EventSink is an indexer backend providing the tx/block index services. This
// implementation stores records in a PostgreSQL database using the schema
// defined in state/indexer/sink/psql/schema.sql.
type EventSink struct {
store *sql.DB
chainID string
}
func NewEventSink(connStr string, chainID string) (indexer.EventSink, *sql.DB, error) {
// NewEventSink constructs an event sink associated with the PostgreSQL
// database specified by connStr. Events written to the sink are attributed to
// the specified chainID.
func NewEventSink(connStr, chainID string) (*EventSink, error) {
db, err := sql.Open(DriverName, connStr)
if err != nil {
return nil, nil, err
return nil, err
}
return &EventSink{
store: db,
chainID: chainID,
}, db, nil
}, nil
}
func (es *EventSink) Type() indexer.EventSinkType {
return indexer.PSQL
}
// DB returns the underlying Postgres connection used by the sink.
// This is exported to support testing.
func (es *EventSink) DB() *sql.DB { return es.store }
// Type returns the structure type for this sink, which is Postgres.
func (es *EventSink) Type() indexer.EventSinkType { return indexer.PSQL }
// IndexBlockEvents indexes the specified block header, part of the
// indexer.EventSink interface.
func (es *EventSink) IndexBlockEvents(h types.EventDataNewBlockHeader) error {
sqlStmt := sq.
Insert(TableEventBlock).
@@ -156,18 +167,22 @@ func (es *EventSink) IndexTxEvents(txr []*abci.TxResult) error {
return err
}
// SearchBlockEvents is not implemented by this sink, and reports an error for all queries.
func (es *EventSink) SearchBlockEvents(ctx context.Context, q *query.Query) ([]int64, error) {
return nil, errors.New("block search is not supported via the postgres event sink")
}
// SearchTxEvents is not implemented by this sink, and reports an error for all queries.
func (es *EventSink) SearchTxEvents(ctx context.Context, q *query.Query) ([]*abci.TxResult, error) {
return nil, errors.New("tx search is not supported via the postgres event sink")
}
// GetTxByHash is not implemented by this sink, and reports an error for all queries.
func (es *EventSink) GetTxByHash(hash []byte) (*abci.TxResult, error) {
return nil, errors.New("getTxByHash is not supported via the postgres event sink")
}
// HasBlock is not implemented by this sink, and reports an error for all queries.
func (es *EventSink) HasBlock(h int64) (bool, error) {
return false, errors.New("hasBlock is not supported via the postgres event sink")
}
@@ -206,6 +221,5 @@ func indexBlockEvents(
return sqlStmt, nil
}
func (es *EventSink) Stop() error {
return es.store.Close()
}
// Stop closes the underlying PostgreSQL database.
func (es *EventSink) Stop() error { return es.store.Close() }

View File

@@ -341,19 +341,14 @@ func setupDB(t *testing.T) (*dockertest.Pool, error) {
conn := fmt.Sprintf(dsn, user, password, resource.GetPort(port+"/tcp"), dbName)
if err = pool.Retry(func() error {
var err error
_, db, err = NewEventSink(conn, chainID)
require.NoError(t, pool.Retry(func() error {
sink, err := NewEventSink(conn, chainID)
if err != nil {
return err
}
db = sink.DB() // set global for test use
return db.Ping()
}); err != nil {
require.NoError(t, err)
}
}))
resetDB(t)

View File

@@ -0,0 +1,65 @@
package sink
import (
"errors"
"strings"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/state/indexer/sink/kv"
"github.com/tendermint/tendermint/state/indexer/sink/null"
"github.com/tendermint/tendermint/state/indexer/sink/psql"
)
// EventSinksFromConfig constructs a slice of indexer.EventSink using the provided
// configuration.
//
//nolint:lll
func EventSinksFromConfig(cfg *config.Config, dbProvider config.DBProvider, chainID string) ([]indexer.EventSink, error) {
if len(cfg.TxIndex.Indexer) == 0 {
return []indexer.EventSink{null.NewEventSink()}, nil
}
// check for duplicated sinks
sinks := map[string]struct{}{}
for _, s := range cfg.TxIndex.Indexer {
sl := strings.ToLower(s)
if _, ok := sinks[sl]; ok {
return nil, errors.New("found duplicated sinks, please check the tx-index section in the config.toml")
}
sinks[sl] = struct{}{}
}
eventSinks := []indexer.EventSink{}
for k := range sinks {
switch indexer.EventSinkType(k) {
case indexer.NULL:
// When we see null in the config, the eventsinks will be reset with the
// nullEventSink.
return []indexer.EventSink{null.NewEventSink()}, nil
case indexer.KV:
store, err := dbProvider(&config.DBContext{ID: "tx_index", Config: cfg})
if err != nil {
return nil, err
}
eventSinks = append(eventSinks, kv.NewEventSink(store))
case indexer.PSQL:
conn := cfg.TxIndex.PsqlConn
if conn == "" {
return nil, errors.New("the psql connection settings cannot be empty")
}
es, err := psql.NewEventSink(conn, chainID)
if err != nil {
return nil, err
}
eventSinks = append(eventSinks, es)
default:
return nil, errors.New("unsupported event sink type")
}
}
return eventSinks, nil
}

View File

@@ -9,7 +9,6 @@ import (
"github.com/gogo/protobuf/proto"
tmtime "github.com/tendermint/tendermint/libs/time"
tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
tmversion "github.com/tendermint/tendermint/proto/tendermint/version"
"github.com/tendermint/tendermint/types"
@@ -287,7 +286,7 @@ func (state State) MakeBlock(
// the votes sent by honest processes, i.e., a faulty processes can not arbitrarily increase or decrease the
// computed value.
func MedianTime(commit *types.Commit, validators *types.ValidatorSet) time.Time {
weightedTimes := make([]*tmtime.WeightedTime, len(commit.Signatures))
weightedTimes := make([]*weightedTime, len(commit.Signatures))
totalVotingPower := int64(0)
for i, commitSig := range commit.Signatures {
@@ -298,11 +297,11 @@ func MedianTime(commit *types.Commit, validators *types.ValidatorSet) time.Time
// If there's no condition, TestValidateBlockCommit panics; not needed normally.
if validator != nil {
totalVotingPower += validator.VotingPower
weightedTimes[i] = tmtime.NewWeightedTime(commitSig.Timestamp, validator.VotingPower)
weightedTimes[i] = newWeightedTime(commitSig.Timestamp, validator.VotingPower)
}
}
return tmtime.WeightedMedian(weightedTimes, totalVotingPower)
return weightedMedian(weightedTimes, totalVotingPower)
}
//------------------------------------------------------------------------

46
state/time.go Normal file
View File

@@ -0,0 +1,46 @@
package state
import (
"sort"
"time"
)
// weightedTime for computing a median.
type weightedTime struct {
Time time.Time
Weight int64
}
// newWeightedTime with time and weight.
func newWeightedTime(time time.Time, weight int64) *weightedTime {
return &weightedTime{
Time: time,
Weight: weight,
}
}
// weightedMedian computes weighted median time for a given array of WeightedTime and the total voting power.
func weightedMedian(weightedTimes []*weightedTime, totalVotingPower int64) (res time.Time) {
median := totalVotingPower / 2
sort.Slice(weightedTimes, func(i, j int) bool {
if weightedTimes[i] == nil {
return false
}
if weightedTimes[j] == nil {
return true
}
return weightedTimes[i].Time.UnixNano() < weightedTimes[j].Time.UnixNano()
})
for _, weightedTime := range weightedTimes {
if weightedTime != nil {
if median <= weightedTime.Weight {
res = weightedTime.Time
break
}
median -= weightedTime.Weight
}
}
return
}

View File

@@ -1,54 +1,55 @@
package time
package state
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
tmtime "github.com/tendermint/tendermint/libs/time"
)
func TestWeightedMedian(t *testing.T) {
m := make([]*WeightedTime, 3)
m := make([]*weightedTime, 3)
t1 := Now()
t1 := tmtime.Now()
t2 := t1.Add(5 * time.Second)
t3 := t1.Add(10 * time.Second)
m[2] = NewWeightedTime(t1, 33) // faulty processes
m[0] = NewWeightedTime(t2, 40) // correct processes
m[1] = NewWeightedTime(t3, 27) // correct processes
m[2] = newWeightedTime(t1, 33) // faulty processes
m[0] = newWeightedTime(t2, 40) // correct processes
m[1] = newWeightedTime(t3, 27) // correct processes
totalVotingPower := int64(100)
median := WeightedMedian(m, totalVotingPower)
median := weightedMedian(m, totalVotingPower)
assert.Equal(t, t2, median)
// median always returns value between values of correct processes
assert.Equal(t, true, (median.After(t1) || median.Equal(t1)) &&
(median.Before(t3) || median.Equal(t3)))
m[1] = NewWeightedTime(t1, 40) // correct processes
m[2] = NewWeightedTime(t2, 27) // correct processes
m[0] = NewWeightedTime(t3, 33) // faulty processes
m[1] = newWeightedTime(t1, 40) // correct processes
m[2] = newWeightedTime(t2, 27) // correct processes
m[0] = newWeightedTime(t3, 33) // faulty processes
totalVotingPower = int64(100)
median = WeightedMedian(m, totalVotingPower)
median = weightedMedian(m, totalVotingPower)
assert.Equal(t, t2, median)
// median always returns value between values of correct processes
assert.Equal(t, true, (median.After(t1) || median.Equal(t1)) &&
(median.Before(t2) || median.Equal(t2)))
m = make([]*WeightedTime, 8)
m = make([]*weightedTime, 8)
t4 := t1.Add(15 * time.Second)
t5 := t1.Add(60 * time.Second)
m[3] = NewWeightedTime(t1, 10) // correct processes
m[1] = NewWeightedTime(t2, 10) // correct processes
m[5] = NewWeightedTime(t2, 10) // correct processes
m[4] = NewWeightedTime(t3, 23) // faulty processes
m[0] = NewWeightedTime(t4, 20) // correct processes
m[7] = NewWeightedTime(t5, 10) // faulty processes
m[3] = newWeightedTime(t1, 10) // correct processes
m[1] = newWeightedTime(t2, 10) // correct processes
m[5] = newWeightedTime(t2, 10) // correct processes
m[4] = newWeightedTime(t3, 23) // faulty processes
m[0] = newWeightedTime(t4, 20) // correct processes
m[7] = newWeightedTime(t5, 10) // faulty processes
totalVotingPower = int64(83)
median = WeightedMedian(m, totalVotingPower)
median = weightedMedian(m, totalVotingPower)
assert.Equal(t, t3, median)
// median always returns value between values of correct processes
assert.Equal(t, true, (median.After(t1) || median.Equal(t1)) &&

View File

@@ -9,6 +9,9 @@ import (
)
func Start(testnet *e2e.Testnet) error {
if len(testnet.Nodes) == 0 {
return fmt.Errorf("no nodes in testnet")
}
// Nodes are already sorted by name. Sort them by name then startAt,
// which gives the overall order startAt, mode, name.
@@ -29,9 +32,7 @@ func Start(testnet *e2e.Testnet) error {
sort.SliceStable(nodeQueue, func(i, j int) bool {
return nodeQueue[i].StartAt < nodeQueue[j].StartAt
})
if len(nodeQueue) == 0 {
return fmt.Errorf("no nodes in testnet")
}
if nodeQueue[0].StartAt > 0 {
return fmt.Errorf("no initial nodes in testnet")
}
@@ -93,10 +94,8 @@ func Start(testnet *e2e.Testnet) error {
}
}
logger.Info(fmt.Sprintf("Starting node %v at height %v...", node.Name, node.StartAt))
if _, _, err := waitForHeight(testnet, node.StartAt); err != nil {
return err
}
logger.Info("Starting catch up node", "node", node.Name, "height", node.StartAt)
if err := execCompose(testnet.Dir, "up", "-d", node.Name); err != nil {
return err
}