Merge branch 'master' into marko/int64-

This commit is contained in:
Marko Baricevic
2021-02-10 09:53:43 +01:00
31 changed files with 537 additions and 1246 deletions

20
.github/CODEOWNERS vendored
View File

@@ -1,25 +1,25 @@
# CODEOWNERS: https://help.github.com/articles/about-codeowners/
# Everything goes through the following "global owners" by default.
# Everything goes through the following "global owners" by default.
# Unless a later match takes precedence, these three will be
# requested for review when someone opens a PR.
# requested for review when someone opens a PR.
# Note that the last matching pattern takes precedence, so
# global owners are only requested if there isn't a more specific
# codeowner specified below. For this reason, the global codeowners
# codeowner specified below. For this reason, the global codeowners
# are often repeated in package-level definitions.
* @alexanderbez @ebuchman @erikgrinaker @melekes @tessr
* @alexanderbez @ebuchman @melekes @tessr
# Overrides for tooling packages
.github/ @marbar3778 @alexanderbez @ebuchman @erikgrinaker @melekes @tessr
DOCKER/ @marbar3778 @alexanderbez @ebuchman @erikgrinaker @melekes @tessr
.github/ @marbar3778 @alexanderbez @ebuchman @melekes @tessr
DOCKER/ @marbar3778 @alexanderbez @ebuchman @melekes @tessr
# Overrides for core Tendermint packages
abci/ @marbar3778 @alexanderbez @ebuchman @erikgrinaker @melekes @tessr
# Overrides for core Tendermint packages
abci/ @marbar3778 @alexanderbez @ebuchman @melekes @tessr
evidence/ @cmwaters @ebuchman @melekes @tessr
light/ @cmwaters @melekes @ebuchman @tessr
# Overrides for docs
*.md @marbar3778 @alexanderbez @ebuchman @erikgrinaker @melekes @tessr
docs/ @marbar3778 @alexanderbez @ebuchman @erikgrinaker @melekes @tessr
*.md @marbar3778 @alexanderbez @ebuchman @melekes @tessr
docs/ @marbar3778 @alexanderbez @ebuchman @melekes @tessr

View File

@@ -25,7 +25,7 @@ jobs:
make build-docs
- name: Deploy 🚀
uses: JamesIves/github-pages-deploy-action@3.7.1
uses: JamesIves/github-pages-deploy-action@4.0.0
with:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
BRANCH: gh-pages

View File

@@ -60,15 +60,15 @@ jobs:
- name: Set crashers count
working-directory: test/fuzz
run: echo "::set-output name=crashers-count::$(find . -type d -name 'crashers' | xargs -I % sh -c 'ls % | wc -l' | awk '{total += $1} END {print total}')"
run: echo "::set-output name=count::$(find . -type d -name 'crashers' | xargs -I % sh -c 'ls % | wc -l' | awk '{total += $1} END {print total}')"
id: set-crashers-count
outputs:
crashers-count: ${{ steps.set-crashers-count.outputs.crashers-count }}
crashers-count: ${{ steps.set-crashers-count.outputs.count }}
fuzz-nightly-fail:
needs: fuzz-nightly-test
if: ${{ needs.set-crashers-count.outputs.crashers-count != 0 }}
if: ${{ needs.fuzz-nightly-test.outputs.crashers-count != 0 }}
runs-on: ubuntu-latest
steps:
- name: Notify Slack if any crashers

View File

@@ -36,7 +36,7 @@ jobs:
- name: install
run: make install install_abci
if: "env.GIT_DIFF != ''"
- uses: actions/cache@v2.1.3
- uses: actions/cache@v2.1.4
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
@@ -44,7 +44,7 @@ jobs:
${{ runner.os }}-go-
if: env.GIT_DIFF
# Cache binaries for use by other jobs
- uses: actions/cache@v2.1.3
- uses: actions/cache@v2.1.4
with:
path: ~/go/bin
key: ${{ runner.os }}-${{ github.sha }}-tm-binary
@@ -65,14 +65,14 @@ jobs:
**/**.go
go.mod
go.sum
- uses: actions/cache@v2.1.3
- uses: actions/cache@v2.1.4
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
if: env.GIT_DIFF
- uses: actions/cache@v2.1.3
- uses: actions/cache@v2.1.4
with:
path: ~/go/bin
key: ${{ runner.os }}-${{ github.sha }}-tm-binary
@@ -97,14 +97,14 @@ jobs:
**/**.go
go.mod
go.sum
- uses: actions/cache@v2.1.3
- uses: actions/cache@v2.1.4
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
if: env.GIT_DIFF
- uses: actions/cache@v2.1.3
- uses: actions/cache@v2.1.4
with:
path: ~/go/bin
key: ${{ runner.os }}-${{ github.sha }}-tm-binary
@@ -128,14 +128,14 @@ jobs:
**/**.go
go.mod
go.sum
- uses: actions/cache@v2.1.3
- uses: actions/cache@v2.1.4
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
if: env.GIT_DIFF
- uses: actions/cache@v2.1.3
- uses: actions/cache@v2.1.4
with:
path: ~/go/bin
key: ${{ runner.os }}-${{ github.sha }}-tm-binary

View File

@@ -53,9 +53,9 @@ issues:
linters-settings:
dogsled:
max-blank-identifiers: 3
maligned:
suggest-new: true
# govet:
# check-shadowing: true
golint:
min-confidence: 0
maligned:
suggest-new: true
misspell:
locale: US

View File

@@ -28,10 +28,13 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
- [proto/p2p] Renamed `DefaultNodeInfo` and `DefaultNodeInfoOther` to `NodeInfo` and `NodeInfoOther` (@erikgrinaker)
- [proto/p2p] Rename `NodeInfo.default_node_id` to `node_id` (@erikgrinaker)
- [libs/os] Kill() and {Must,}{Read,Write}File() functions have been removed. (@alessio)
- [store] \#5848 Remove block store state in favor of using the db iterators directly (@cmwaters) - [state] \#5864 Use an iterator when pruning state (@cmwaters)
- [store] \#5848 Remove block store state in favor of using the db iterators directly (@cmwaters)
- [state] \#5864 Use an iterator when pruning state (@cmwaters)
- [types] \#6023 Remove `tm2pb.Header`, `tm2pb.BlockID`, `tm2pb.PartSetHeader` and `tm2pb.NewValidatorUpdate`.
- Each of the above types has a `ToProto` and `FromProto` method or function which replaced this logic.
- [rpc/client/http] \#6022 Change `timeout` type to `time.Duration` in `NewWithTimeout`
- [light] \#6054 Move `MaxRetryAttempt` option from client to provider.
- `NewWithOptions` now sets the max retry attempts and timeouts (@cmwaters)
- Blockchain Protocol
@@ -54,6 +57,7 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
- [cli] \#5772 `gen_node_key` output now contains node ID (`id` field) (@melekes)
- [blockchain/v2] \#5774 Send status request when new peer joins (@melekes)
- [consensus] \#5792 Deprecates the `time_iota_ms` consensus parameter, to reduce the bug surface. The parameter is no longer used. (@valardragon)
- [store] \#5888 store.SaveBlock saves using batches instead of transactions for now to improve ACID properties. This is a quick fix for underlying issues around tm-db and ACID guarantees. (@githubsands)
- [consensus] \#5987 Remove `time_iota_ms` from consensus params. Merge `tmproto.ConsensusParams` and `abci.ConsensusParams`. (@marbar3778)
- [types] \#5994 Reduce the use of protobuf types in core logic. (@marbar3778)
- `ConsensusParams`, `BlockParams`, `ValidatorParams`, `EvidenceParams`, `VersionParams`, `sm.Version` and `version.Consensus` have become native types. They still utilize protobuf when being sent over the wire or written to disk.

View File

@@ -520,7 +520,8 @@ OUTER_LOOP:
}
// If the peer is on a previous height that we have, help catch up.
if (0 < prs.Height) && (prs.Height < rs.Height) && (prs.Height >= conR.conS.blockStore.Base()) {
blockStoreBase := conR.conS.blockStore.Base()
if blockStoreBase > 0 && 0 < prs.Height && prs.Height < rs.Height && prs.Height >= blockStoreBase {
heightLogger := logger.With("height", prs.Height)
// if we never received the commit message from the peer, the block parts wont be initialized
@@ -528,7 +529,7 @@ OUTER_LOOP:
blockMeta := conR.conS.blockStore.LoadBlockMeta(prs.Height)
if blockMeta == nil {
heightLogger.Error("Failed to load block meta",
"blockstoreBase", conR.conS.blockStore.Base(), "blockstoreHeight", conR.conS.blockStore.Height())
"blockstoreBase", blockStoreBase, "blockstoreHeight", conR.conS.blockStore.Height())
time.Sleep(conR.conS.config.PeerGossipSleepDuration)
} else {
ps.InitProposalBlockParts(blockMeta.BlockID.PartSetHeader)
@@ -674,7 +675,8 @@ OUTER_LOOP:
// Catchup logic
// If peer is lagging by more than 1, send Commit.
if prs.Height != 0 && rs.Height >= prs.Height+2 && prs.Height >= conR.conS.blockStore.Base() {
blockStoreBase := conR.conS.blockStore.Base()
if blockStoreBase > 0 && prs.Height != 0 && rs.Height >= prs.Height+2 && prs.Height >= blockStoreBase {
// Load the block commit for prs.Height,
// which contains precommit signatures for prs.Height.
if commit := conR.conS.blockStore.LoadBlockCommit(prs.Height); commit != nil {

View File

@@ -8,6 +8,8 @@
- 2020-11-16: Notes on recommended reactor implementation patterns, approve ADR (@erikgrinaker)
- 2021-02-04: Update with new P2P core and Transport API changes (@erikgrinaker).
## Context
In [ADR 061](adr-061-p2p-refactor-scope.md) we decided to refactor the peer-to-peer (P2P) networking stack. The first phase is to redesign and refactor the internal P2P architecture, while retaining protocol compatibility as far as possible.
@@ -20,13 +22,15 @@ Several variations of the proposed design were considered, including e.g. callin
There were also proposals to use LibP2P instead of maintaining our own P2P stack, which were rejected (for now) in [ADR 061](adr-061-p2p-refactor-scope.md).
The initial version of this ADR had a byte-oriented multi-stream transport API, but this had to be abandoned/postponed to maintain backwards-compatibility with the existing MConnection protocol which is message-oriented. See the rejected RFC in [tendermint/spec#227](https://github.com/tendermint/spec/pull/227) for details.
## Decision
The P2P stack will be redesigned as a message-oriented architecture, primarily relying on Go channels for communication and scheduling. It will use IO stream transports to exchange raw bytes with individual peers, bidirectional peer-addressable channels to send and receive Protobuf messages, and a router to route messages between reactors and peers. Message passing is asynchronous with at-most-once delivery.
The P2P stack will be redesigned as a message-oriented architecture, primarily relying on Go channels for communication and scheduling. It will use a message-oriented transport to binary messages with individual peers, bidirectional peer-addressable channels to send and receive Protobuf messages, a router to route messages between reactors and peers, and a peer manager to manage peer lifecycle information. Message passing is asynchronous with at-most-once delivery.
## Detailed Design
This ADR is primarily concerned with the architecture and interfaces of the P2P stack, not implementation details. Separate ADRs may be submitted for individual components, since implementation may be non-trivial. The interfaces described here should therefore be considered a rough architecture outline, not a complete and final design.
This ADR is primarily concerned with the architecture and interfaces of the P2P stack, not implementation details. The interfaces described here should therefore be considered a rough architecture outline, not a complete and final design.
Primary design objectives have been:
@@ -39,10 +43,10 @@ Primary design objectives have been:
The main abstractions in the new stack are:
* `peer`: A node in the network, uniquely identified by a `PeerID` and stored in a `peerStore`.
* `Transport`: An arbitrary mechanism to exchange bytes with a peer using IO `Stream`s across a `Connection`.
* `Channel`: A bidirectional channel to asynchronously exchange Protobuf messages with peers addressed with `PeerID`.
* `Transport`: An arbitrary mechanism to exchange binary messages with a peer across a `Connection`.
* `Channel`: A bidirectional channel to asynchronously exchange Protobuf messages with peers using node ID addressing.
* `Router`: Maintains transport connections to relevant peers and routes channel messages.
* `PeerManager`: Manages peer lifecycle information, e.g. deciding which peers to dial and when, using a `peerStore` for storage.
* Reactor: A design pattern loosely defined as "something which listens on a channel and reacts to messages".
These abstractions are illustrated in the following diagram (representing the internals of node A) and described in detail below.
@@ -51,33 +55,42 @@ These abstractions are illustrated in the following diagram (representing the in
### Transports
Transports are arbitrary mechanisms for exchanging raw bytes with a peer. For example, a gRPC transport would connect to a peer over TCP/IP and send data using the gRPC protocol, while an in-memory transport might communicate with a peer running in another goroutine using internal byte buffers. Note that transports don't have a notion of a `peer` as such - instead, they communicate with an arbitrary endpoint address (e.g. IP address and port number), to decouple them from the rest of the P2P stack.
Transports are arbitrary mechanisms for exchanging binary messages with a peer. For example, a gRPC transport would connect to a peer over TCP/IP and send data using the gRPC protocol, while an in-memory transport might communicate with a peer running in another goroutine using internal Go channels. Note that transports don't have a notion of a "peer" or "node" as such - instead, they establish connections between arbitrary endpoint addresses (e.g. IP address and port number), to decouple them from the rest of the P2P stack.
Transports must satisfy the following requirements:
* Be connection-oriented, and support both listening for inbound connections and making outbound connections using endpoint addresses.
* Support multiple logical IO streams within a single connection, to take full advantage of protocols with native stream support. For example, QUIC supports multiple independent streams, while HTTP/2 and MConn multiplex logical streams onto a single TCP connection.
* Support sending binary messages with distinct channel IDs (although channels and channel IDs are a higher-level application protocol concept explained in the Router section, they are threaded through the transport layer as well for backwards compatibilty with the existing MConnection protocol).
* Provide the public key of the peer, and possibly encrypt or sign the traffic as appropriate. This should be compared with known data (e.g. the peer ID) to authenticate the peer and avoid man-in-the-middle attacks.
* Exchange the MConnection `NodeInfo` and public key via a node handshake, and possibly encrypt or sign the traffic as appropriate.
The initial transport implementation will be a port of the current MConn protocol currently used by Tendermint, and should be backwards-compatible at the wire level as far as possible. This will be followed by an in-memory transport for testing, and a QUIC transport that may eventually replace MConn.
The initial transport is a port of the current MConnection protocol currently used by Tendermint, and should be backwards-compatible at the wire level. An in-memory transport for testing has also been implemented. There are plans to explore a QUIC transport that may replace the MConnection protocol.
The `Transport` interface is:
The `Transport` interface is as follows:
```go
// Transport is an arbitrary mechanism for exchanging bytes with a peer.
// Transport is a connection-oriented mechanism for exchanging data with a peer.
type Transport interface {
// Accept waits for the next inbound connection on a listening endpoint.
Accept(context.Context) (Connection, error)
// Protocols returns the protocols supported by the transport. The Router
// uses this to pick a transport for an Endpoint.
Protocols() []Protocol
// Endpoints returns the local endpoints the transport is listening on, if any.
// How to listen is transport-dependent, e.g. MConnTransport uses Listen() while
// MemoryTransport starts listening via MemoryNetwork.CreateTransport().
Endpoints() []Endpoint
// Accept waits for the next inbound connection on a listening endpoint, blocking
// until either a connection is available or the transport is closed. On closure,
// io.EOF is returned and further Accept calls are futile.
Accept() (Connection, error)
// Dial creates an outbound connection to an endpoint.
Dial(context.Context, Endpoint) (Connection, error)
// Endpoints lists endpoints the transport is listening on. Any endpoint IP
// addresses do not need to be normalized in any way (e.g. 0.0.0.0 is
// valid), as they should be preprocessed before being advertised.
Endpoints() []Endpoint
// Close stops accepting new connections, but does not close active connections.
Close() error
}
```
@@ -91,21 +104,24 @@ The `Endpoint` struct is:
```go
// Endpoint represents a transport connection endpoint, either local or remote.
//
// Endpoints are not necessarily networked (see e.g. MemoryTransport) but all
// networked endpoints must use IP as the underlying transport protocol to allow
// e.g. IP address filtering. Either IP or Path (or both) must be set.
type Endpoint struct {
// Protocol specifies the transport protocol, used by the router to pick a
// transport for an endpoint.
// Protocol specifies the transport protocol.
Protocol Protocol
// Path is an optional, arbitrary transport-specific path or identifier.
Path string
// IP is an IP address (v4 or v6) to connect to. If set, this defines the
// endpoint as a networked endpoint.
IP net.IP
// Port is a network port (either TCP or UDP). If not set, a default port
// may be used depending on the protocol.
// Port is a network port (either TCP or UDP). If 0, a default port may be
// used depending on the protocol.
Port uint16
// Path is an optional transport-specific path or identifier.
Path string
}
// Protocol identifies a transport protocol.
@@ -114,21 +130,30 @@ type Protocol string
Endpoints are arbitrary transport-specific addresses, but if they are networked they must use IP addresses and thus rely on IP as a fundamental packet routing protocol. This enables policies for address discovery, advertisement, and exchange - for example, a private `192.168.0.0/24` IP address should only be advertised to peers on that IP network, while the public address `8.8.8.8` may be advertised to all peers. Similarly, any port numbers if given must represent TCP and/or UDP port numbers, in order to use [UPnP](https://en.wikipedia.org/wiki/Universal_Plug_and_Play) to autoconfigure e.g. NAT gateways.
Non-networked endpoints (without an IP address) are considered local, and will only be advertised to other peers connecting via the same protocol. For example, an in-memory transport used for testing might have `Endpoint{Protocol: "memory", Path: "foo"}` as an address for the node "foo", and this should only be advertised to other nodes using `Protocol: "memory"`.
Non-networked endpoints (without an IP address) are considered local, and will only be advertised to other peers connecting via the same protocol. For example, the in-memory transport used for testing uses `Endpoint{Protocol: "memory", Path: "foo"}` as an address for the node "foo", and this should only be advertised to other nodes using `Protocol: "memory"`.
#### Connections and Streams
#### Connections
A connection represents an established transport connection between two endpoints (and thus two nodes), which can be used to exchange bytes via logically distinct IO streams. Connections are set up either via `Transport.Dial()` (outbound) or `Transport.Accept()` (inbound). The caller is responsible for verifying the remote peer's public key as returned by the connection, following the current MConn protocol behavior for now.
A connection represents an established transport connection between two endpoints (i.e. two nodes), which can be used to exchange binary messages with logical channel IDs (corresponding to the higher-level channel IDs used in the router). Connections are set up either via `Transport.Dial()` (outbound) or `Transport.Accept()` (inbound).
Data is exchanged over IO streams created with `Connection.Stream()`. These implement the standard Go `io.Reader` and `io.Writer` interfaces to read and write bytes. Transports are free to choose how to implement such streams, e.g. by taking advantage of native stream support in the underlying protocol or through multiplexing.
Once a connection is esablished, `Transport.Handshake()` must be called to perform a node handshake, exchanging node info and public keys to verify node identities. Node handshakes should not really be part of the transport layer (it's an application protocol concern), this exists for backwards-compatibility with the existing MConnection protocol which conflates the two. `NodeInfo` is part of the existing MConnection protocol, but does not appear to be documented in the specification -- refer to the Go codebase for details.
`Connection` and the related `Stream` interfaces are:
The `Connection` interface is shown below. It omits certain additions that are currently implemented for backwards compatibility with the legacy P2P stack and are planned to be removed before the final release.
```go
// Connection represents an established connection between two endpoints.
type Connection interface {
// Stream creates a new logically distinct IO stream within the connection.
Stream() (Stream, error)
// Handshake executes a node handshake with the remote peer. It must be
// called once the connection is established, and returns the remote peer's
// node info and public key. The caller is responsible for validation.
Handshake(context.Context, NodeInfo, crypto.PrivKey) (NodeInfo, crypto.PubKey, error)
// ReceiveMessage returns the next message received on the connection,
// blocking until one is available. Returns io.EOF if closed.
ReceiveMessage() (ChannelID, []byte, error)
// SendMessage sends a message on the connection. Returns io.EOF if closed.
SendMessage(ChannelID, []byte) error
// LocalEndpoint returns the local endpoint for the connection.
LocalEndpoint() Endpoint
@@ -136,169 +161,181 @@ type Connection interface {
// RemoteEndpoint returns the remote endpoint for the connection.
RemoteEndpoint() Endpoint
// PubKey returns the public key of the remote peer.
PubKey() crypto.PubKey
// Close closes the connection.
Close() error
}
// Stream represents a single logical IO stream within a connection.
type Stream interface {
io.Reader // Read([]byte) (int, error)
io.Writer // Write([]byte) (int, error)
io.Closer // Close() error
}
```
### Peers
This ADR initially proposed a byte-oriented multi-stream connection API that follows more typical networking API conventions (using e.g. `io.Reader` and `io.Writer` interfaces which easily compose with other libraries). This would also allow moving the responsibility for message framing, node handshakes, and traffic scheduling to the common router instead of reimplementing this across transports, and would allow making better use of multi-stream protocols such as QUIC. However, this would require minor breaking changes to the MConnection protocol which were rejected, see [tendermint/spec#227](https://github.com/tendermint/spec/pull/227) for details. This should be revisited when starting work on a QUIC transport.
Peers are other Tendermint network nodes. Each peer is identified by a unique `PeerID`, and has a set of `PeerAddress` addresses expressed as URLs that they can be reached at. Examples of peer addresses might be e.g.:
### Peer Management
* `mconn://b10c@host.domain.com:25567/path`
* `unix:///var/run/tendermint/peer.sock`
* `memory:testpeer`
Peers are other Tendermint nodes. Each peer is identified by a unique `NodeID` (tied to the node's private key).
Addresses are resolved into one or more transport endpoints, e.g. by resolving DNS hostnames into IP addresses (which should be refreshed periodically). Peers should always be expressed as address URLs, and never as endpoints which are a lower-level construct.
#### Peer Addresses
Nodes have one or more `NodeAddress` addresses expressed as URLs that they can be reached at. Examples of node addresses might be e.g.:
* `mconn://nodeid@host.domain.com:25567/path`
* `memory:nodeid`
Addresses are resolved into one or more transport endpoints, e.g. by resolving DNS hostnames into IP addresses. Peers should always be expressed as address URLs rather than endpoints (which are a lower-level transport construct).
```go
// PeerID is a unique peer ID, generally expressed in hex form.
type PeerID []byte
// NodeID is a hex-encoded crypto.Address. It must be lowercased
// (for uniqueness) and of length 40.
type NodeID string
// PeerAddress is a peer address URL. The User field, if set, gives the
// hex-encoded remote PeerID, which should be verified with the remote peer's
// public key as returned by the connection.
type PeerAddress url.URL
// Resolve resolves a PeerAddress into a set of Endpoints, typically by
// expanding out a DNS name in Host to its IP addresses. Field mapping:
// NodeAddress is a node address URL. It differs from a transport Endpoint in
// that it contains the node's ID, and that the address hostname may be resolved
// into multiple IP addresses (and thus multiple endpoints).
//
// Scheme → Endpoint.Protocol
// Host → Endpoint.IP
// Port → Endpoint.Port
// Path+Query+Fragment,Opaque → Endpoint.Path
//
func (a PeerAddress) Resolve(ctx context.Context) []Endpoint { return nil }
```
The P2P stack needs to track a lot of internal information about peers, such as endpoints, status, priorities, and so on. This is done in an internal `peer` struct, which should not be exposed outside of the `p2p` package (e.g. to reactors) in order to avoid race conditions and lock contention - other packages should use `PeerID`.
The `peer` struct might look like the following, but is intentionally underspecified and will depend on implementation requirements (for example, it will almost certainly have to track statistics about connection failures and retries):
```go
// peer tracks internal status information about a peer.
type peer struct {
ID PeerID
Status PeerStatus
Priority PeerPriority
Endpoints map[PeerAddress][]Endpoint // Resolved endpoints by address.
// If the URL is opaque, i.e. of the form "scheme:opaque", then the opaque part
// is expected to contain a node ID.
type NodeAddress struct {
NodeID NodeID
Protocol Protocol
Hostname string
Port uint16
Path string
}
// PeerStatus specifies peer statuses.
// ParseNodeAddress parses a node address URL into a NodeAddress, normalizing
// and validating it.
func ParseNodeAddress(urlString string) (NodeAddress, error)
// Resolve resolves a NodeAddress into a set of Endpoints, e.g. by expanding
// out a DNS hostname to IP addresses.
func (a NodeAddress) Resolve(ctx context.Context) ([]Endpoint, error)
```
#### Peer Manager
The P2P stack needs to track a lot of internal state about peers, such as their addresses, connection state, priorities, availability, failures, retries, and so on. This responsibility has been separated out to a `PeerManager`, which track this state for the `Router` (but does not maintain the actual transport connections themselves, which is the router's responsibility).
The `PeerManager` is a synchronous state machine, where all state transitions are serialized (implemented as synchronous method calls holding an exclusive mutex lock). Most peer state is intentionally kept internal, stored in a `peerStore` database that persists it as appropriate, and the external interfaces pass the minimum amount of information necessary in order to avoid shared state between router goroutines. This design significantly simplifies the model, making it much easier to reason about and test than if it was baked into the asynchronous ball of concurrency that the P2P networking core must necessarily be. As peer lifecycle events are expected to be relatively infrequent, this should not significantly impact performance either.
The `Router` uses the `PeerManager` to request which peers to dial and evict, and reports in with peer lifecycle events such as connections, disconnections, and failures as they occur. The manager can reject these events (e.g. reject an inbound connection) by returning errors. This happens as follows:
* Outbound connections, via `Transport.Dial`:
* `DialNext()`: returns a peer address to dial, or blocks until one is available.
* `DialFailed()`: reports a peer dial failure.
* `Dialed()`: reports a peer dial success.
* `Ready()`: reports the peer as routed and ready.
* `Disconnected()`: reports a peer disconnection.
* Inbound connections, via `Transport.Accept`:
* `Accepted()`: reports an inbound peer connection.
* `Ready()`: reports the peer as routed and ready.
* `Disconnected()`: reports a peer disconnection.
* Evictions, via `Connection.Close`:
* `EvictNext()`: returns a peer to disconnect, or blocks until one is available.
* `Disconnected()`: reports a peer disconnection.
These calls have the following interface:
```go
// DialNext returns a peer address to dial, blocking until one is available.
func (m *PeerManager) DialNext(ctx context.Context) (NodeAddress, error)
// DialFailed reports a dial failure for the given address.
func (m *PeerManager) DialFailed(address NodeAddress) error
// Dialed reports a successful outbound connection to the given address.
func (m *PeerManager) Dialed(address NodeAddress) error
// Accepted reports a successful inbound connection from the given node.
func (m *PeerManager) Accepted(peerID NodeID) error
// Ready reports the peer as fully routed and ready for use.
func (m *PeerManager) Ready(peerID NodeID) error
// EvictNext returns a peer ID to disconnect, blocking until one is available.
func (m *PeerManager) EvictNext(ctx context.Context) (NodeID, error)
// Disconnected reports a peer disconnection.
func (m *PeerManager) Disconnected(peerID NodeID) error
```
Internally, the `PeerManager` uses a numeric peer score to prioritize peers, e.g. when deciding which peers to dial next. The scoring policy has not yet been implemented, but should take into account e.g. node configuration such a `persistent_peers`, uptime and connection failures, performance, and so on. The manager will also attempt to automatically upgrade to better-scored peers by evicting lower-scored peers when a better one becomes available (e.g. when a persistent peer comes back online after an outage).
The `PeerManager` should also have an API for reporting peer behavior from reactors that affects its score (e.g. signing a block increases the score, double-voting decreases it or even bans the peer), but this has not yet been designed and implemented.
Additionally, the `PeerManager` provides `PeerUpdates` subscriptions that will receive `PeerUpdate` events whenever significant peer state changes happen. Reactors can use these e.g. to know when peers are connected or disconnected, and take appropriate action. This is currently fairly minimal:
```go
// Subscribe subscribes to peer updates. The caller must consume the peer updates
// in a timely fashion and close the subscription when done, to avoid stalling the
// PeerManager as delivery is semi-synchronous, guaranteed, and ordered.
func (m *PeerManager) Subscribe() *PeerUpdates
// PeerUpdate is a peer update event sent via PeerUpdates.
type PeerUpdate struct {
NodeID NodeID
Status PeerStatus
}
// PeerStatus is a peer status.
type PeerStatus string
const (
PeerStatusNew = "new" // New peer which we haven't tried to contact yet.
PeerStatusUp = "up" // Peer which we have an active connection to.
PeerStatusDown = "down" // Peer which we're temporarily disconnected from.
PeerStatusRemoved = "removed" // Peer which has been removed.
PeerStatusBanned = "banned" // Peer which is banned for misbehavior.
PeerStatusUp PeerStatus = "up" // Connected and ready.
PeerStatusDown PeerStatus = "down" // Disconnected.
)
// PeerPriority specifies peer priorities.
type PeerPriority int
// PeerUpdates is a real-time peer update subscription.
type PeerUpdates struct { ... }
const (
PeerPriorityNormal PeerPriority = iota + 1
PeerPriorityValidator
PeerPriorityPersistent
)
// Updates returns a channel for consuming peer updates.
func (pu *PeerUpdates) Updates() <-chan PeerUpdate
// Close closes the peer updates subscription.
func (pu *PeerUpdates) Close()
```
Peer information is stored in a `peerStore`, which may be persisted in an underlying database, and will replace the current address book either partially or in full. It is kept internal to avoid race conditions and tight coupling, and should at the very least contain basic CRUD functionality as outlined below, but will likely need additional functionality and is intentionally underspecified:
```go
// peerStore contains information about peers, possibly persisted to disk.
type peerStore struct {
peers map[string]*peer // Entire set in memory, with PeerID.String() keys.
db dbm.DB // Database for persistence, if non-nil.
}
func (p *peerStore) Delete(id PeerID) error { return nil }
func (p *peerStore) Get(id PeerID) (peer, bool) { return peer{}, false }
func (p *peerStore) List() []peer { return nil }
func (p *peerStore) Set(peer peer) error { return nil }
```
Peer address detection, advertisement and exchange (including detection of externally-reachable addresses via e.g. NAT gateways) is out of scope for this ADR, but may be covered in a separate ADR. The current PEX reactor should probably be absorbed into the core P2P stack and protocol instead of running as a separate reactor, since this needs to mutate the core peer data structures and will thus be tightly coupled with the router.
The `PeerManager` will also be responsible for providing peer information to the PEX reactor that can be gossipped to other nodes. This requires an improved system for peer address detection and advertisement, that e.g. reliably detects peer and self addresses and only gossips private network addresses to other peers on the same network, but this system has not yet been fully designed and implemented.
### Channels
While low-level data exchange happens via transport IO streams, the high-level API is based on a bidirectional `Channel` that can send and receive Protobuf messages addressed by `PeerID`. A channel is identified by an arbitrary `ChannelID` identifier, and can exchange Protobuf messages of one specific type (since the type to unmarshal into must be known). Message delivery is asynchronous and at-most-once.
While low-level data exchange happens via the `Transport`, the high-level API is based on a bidirectional `Channel` that can send and receive Protobuf messages addressed by `NodeID`. A channel is identified by an arbitrary `ChannelID` identifier, and can exchange Protobuf messages of one specific type (since the type to unmarshal into must be predefined). Message delivery is asynchronous and at-most-once.
The channel can also be used to report peer errors, e.g. when receiving an invalid or malignant message. This may cause the peer to be disconnected or banned depending on the router's policy.
The channel can also be used to report peer errors, e.g. when receiving an invalid or malignant message. This may cause the peer to be disconnected or banned depending on `PeerManager` policy, but should probably be replaced by a broader peer behavior API that can also report good behavior.
A `Channel` has this interface:
```go
// Channel is a bidirectional channel for Protobuf message exchange with peers.
type Channel struct {
// ID contains the channel ID.
ID ChannelID
// messageType specifies the type of messages exchanged via the channel, and
// is used e.g. for automatic unmarshaling.
messageType proto.Message
// In is a channel for receiving inbound messages. Envelope.From is always
// set.
In <-chan Envelope
// Out is a channel for sending outbound messages. Envelope.To or Broadcast
// must be set, otherwise the message is discarded.
Out chan<- Envelope
// Error is a channel for reporting peer errors to the router, typically used
// when peers send an invalid or malignant message.
Error chan<- PeerError
}
// Close closes the channel, and is equivalent to close(Channel.Out). This will
// cause Channel.In to be closed when appropriate. The ID can then be reused.
func (c *Channel) Close() error { return nil }
// ChannelID is an arbitrary channel ID.
type ChannelID uint16
// Channel is a bidirectional channel to exchange Protobuf messages with peers.
type Channel struct {
ID ChannelID // Channel ID.
In <-chan Envelope // Inbound messages (peers to reactors).
Out chan<- Envelope // outbound messages (reactors to peers)
Error chan<- PeerError // Peer error reporting.
messageType proto.Message // Channel's message type, for e.g. unmarshalling.
}
// Close closes the channel, also closing Out and Error.
func (c *Channel) Close() error
// Envelope specifies the message receiver and sender.
type Envelope struct {
From PeerID // Message sender, or empty for outbound messages.
To PeerID // Message receiver, or empty for inbound messages.
Broadcast bool // Send message to all connected peers, ignoring To.
Message proto.Message // Payload.
From NodeID // Sender (empty if outbound).
To NodeID // Receiver (empty if inbound).
Broadcast bool // Send to all connected peers, ignoring To.
Message proto.Message // Message payload.
}
// PeerError is a peer error reported by a reactor via the Error channel. The
// severity may cause the peer to be disconnected or banned depending on policy.
// PeerError is a peer error reported via the Error channel.
type PeerError struct {
PeerID PeerID
NodeID NodeID
Err error
Severity PeerErrorSeverity
}
// PeerErrorSeverity determines the severity of a peer error.
type PeerErrorSeverity string
const (
PeerErrorSeverityLow PeerErrorSeverity = "low" // Mostly ignored.
PeerErrorSeverityHigh PeerErrorSeverity = "high" // May disconnect.
PeerErrorSeverityCritical PeerErrorSeverity = "critical" // Ban.
)
```
A channel can reach any connected peer, and is implemented using transport streams against each individual peer, with an initial handshake to exchange the channel ID and any other metadata. The channel will automatically (un)marshal Protobuf to byte slices and use length-prefixed framing (the de facto standard for Protobuf streams) when writing them to the stream.
Message scheduling and queueing is left as an implementation detail, and can use any number of algorithms such as FIFO, round-robin, priority queues, etc. Since message delivery is not guaranteed, both inbound and outbound messages may be dropped, buffered, or blocked as appropriate.
A channel can reach any connected peer, and will automatically (un)marshal the Protobuf messages. Message scheduling and queueing is a `Router` implementation concern, and can use any number of algorithms such as FIFO, round-robin, priority queues, etc. Since message delivery is not guaranteed, both inbound and outbound messages may be dropped, buffered, reordered, or blocked as appropriate.
Since a channel can only exchange messages of a single type, it is often useful to use a wrapper message type with e.g. a Protobuf `oneof` field that specifies a set of inner message types that it can contain. The channel can automatically perform this (un)wrapping if the outer message type implements the `Wrapper` interface (see [Reactor Example](#reactor-example) for an example):
@@ -308,6 +345,8 @@ Since a channel can only exchange messages of a single type, it is often useful
// automatically (un)wrap passed messages using the container type, such that
// the channel can transparently support multiple message types.
type Wrapper interface {
proto.Message
// Wrap will take a message and wrap it in this one.
Wrap(proto.Message) error
@@ -318,39 +357,107 @@ type Wrapper interface {
### Routers
The router manages all P2P networking for a node, and is responsible for keeping track of network peers, maintaining transport connections, and routing channel messages. As such, it must do e.g. connection retries and backoff, message QoS scheduling and backpressure, peer quality assessments, and endpoint detection and advertisement. In addition, the router provides a mechanism to subscribe to peer updates (e.g. peers connecting or disconnecting), and handles reported peer errors from reactors.
The router exeutes P2P networking for a node, taking instructions from and reporting events to the `PeerManager`, maintaining transport connections to peers, and routing messages between channels and peers.
The implementation of the router is likely to be non-trivial, and is intentionally unspecified here. A separate ADR will likely be submitted for this. It is unclear whether message routing/scheduling and peer lifecycle management can be split into two separate components, or if these need to be tightly coupled.
Practically all concurrency in the P2P stack has been moved into the router and reactors, while as many other responsibilities as possible have been moved into separate components such as the `Transport` and `PeerManager` that can remain largely synchronous. Limiting concurrency to a single core component makes it much easier to reason about since there is only a single concurrency structure, while the remaining components can be serial, simple, and easily testable.
The `Router` API is as follows:
The `Router` has a very minimal API, since it is mostly driven by `PeerManager` and `Transport` events:
```go
// Router manages connections to peers and routes Protobuf messages between them
// and local reactors. It also provides peer status updates and error reporting.
type Router struct{}
// Router maintains peer transport connections and routes messages between
// peers and channels.
type Router struct {
// Some details have been omitted below.
// NewRouter creates a new router, using the given peer store to track peers.
// Transports must be pre-initialized to listen on appropriate endpoints.
func NewRouter(peerStore *peerStore, transports map[Protocol]Transport) *Router { return nil }
logger log.Logger
options RouterOptions
nodeInfo NodeInfo
privKey crypto.PrivKey
peerManager *PeerManager
transports []Transport
// Channel opens a new channel with the given ID. messageType should be an empty
// Protobuf message of the type that will be passed through the channel. The
// message can implement Wrapper for automatic message (un)wrapping.
func (r *Router) Channel(id ChannelID, messageType proto.Message) (*Channel, error) { return nil, nil }
peerMtx sync.RWMutex
peerQueues map[NodeID]queue
// PeerUpdates returns a channel with peer updates. The caller must cancel the
// context to end the subscription, and keep consuming messages in a timely
// fashion until the channel is closed to avoid blocking updates.
func (r *Router) PeerUpdates(ctx context.Context) PeerUpdates { return nil }
// PeerUpdates is a channel for receiving peer updates.
type PeerUpdates <-chan PeerUpdate
// PeerUpdate is a peer status update for reactors.
type PeerUpdate struct {
PeerID PeerID
Status PeerStatus
channelMtx sync.RWMutex
channelQueues map[ChannelID]queue
}
// OpenChannel opens a new channel for the given message type. The caller must
// close the channel when done, before stopping the Router. messageType is the
// type of message passed through the channel.
func (r *Router) OpenChannel(id ChannelID, messageType proto.Message) (*Channel, error)
// Start starts the router, connecting to peers and routing messages.
func (r *Router) Start() error
// Stop stops the router, disconnecting from all peers and stopping message routing.
func (r *Router) Stop() error
```
All Go channel sends in the `Router` and reactors are blocking (the router also selects on signal channels for closure and shutdown). The responsibility for message scheduling, prioritization, backpressure, and load shedding is centralized in a core `queue` interface that is used at contention points (i.e. from all peers to a single channel, and from all channels to a single peer):
```go
// queue does QoS scheduling for Envelopes, enqueueing and dequeueing according
// to some policy. Queues are used at contention points, i.e.:
// - Receiving inbound messages to a single channel from all peers.
// - Sending outbound messages to a single peer from all channels.
type queue interface {
// enqueue returns a channel for submitting envelopes.
enqueue() chan<- Envelope
// dequeue returns a channel ordered according to some queueing policy.
dequeue() <-chan Envelope
// close closes the queue. After this call enqueue() will block, so the
// caller must select on closed() as well to avoid blocking forever. The
// enqueue() and dequeue() channels will not be closed.
close()
// closed returns a channel that's closed when the scheduler is closed.
closed() <-chan struct{}
}
```
The current implementation is `fifoQueue`, which is a simple unbuffered lossless queue that passes messages in the order they were received and blocks until the message is delivered (i.e. it is a Go channel). The router will need a more sophisticated queueing policy, but this has not yet been implemented.
The internal `Router` goroutine structure and design is described in the `Router` GoDoc, which is included below for reference:
```go
// On startup, three main goroutines are spawned to maintain peer connections:
//
// dialPeers(): in a loop, calls PeerManager.DialNext() to get the next peer
// address to dial and spawns a goroutine that dials the peer, handshakes
// with it, and begins to route messages if successful.
//
// acceptPeers(): in a loop, waits for an inbound connection via
// Transport.Accept() and spawns a goroutine that handshakes with it and
// begins to route messages if successful.
//
// evictPeers(): in a loop, calls PeerManager.EvictNext() to get the next
// peer to evict, and disconnects it by closing its message queue.
//
// When a peer is connected, an outbound peer message queue is registered in
// peerQueues, and routePeer() is called to spawn off two additional goroutines:
//
// sendPeer(): waits for an outbound message from the peerQueues queue,
// marshals it, and passes it to the peer transport which delivers it.
//
// receivePeer(): waits for an inbound message from the peer transport,
// unmarshals it, and passes it to the appropriate inbound channel queue
// in channelQueues.
//
// When a reactor opens a channel via OpenChannel, an inbound channel message
// queue is registered in channelQueues, and a channel goroutine is spawned:
//
// routeChannel(): waits for an outbound message from the channel, looks
// up the recipient peer's outbound message queue in peerQueues, and submits
// the message to it.
//
// All channel sends in the router are blocking. It is the responsibility of the
// queue interface in peerQueues and channelQueues to prioritize and drop
// messages as appropriate during contention to prevent stalls and ensure good
// quality of service.
```
### Reactor Example
@@ -409,21 +516,21 @@ The reactor itself would be implemented e.g. like this:
```go
// RunEchoReactor wires up an echo reactor to a router and runs it.
func RunEchoReactor(router *p2p.Router) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
channel, err := router.Channel(1, &EchoMessage{})
func RunEchoReactor(router *p2p.Router, peerManager *p2p.PeerManager) error {
channel, err := router.OpenChannel(1, &EchoMessage{})
if err != nil {
return err
}
defer channel.Close()
peerUpdates := peerManager.Subscribe()
defer peerUpdates.Close()
return EchoReactor(ctx, channel, router.PeerUpdates(ctx))
return EchoReactor(context.Background(), channel, peerUpdates)
}
// EchoReactor provides an echo service, pinging all known peers until cancelled.
func EchoReactor(ctx context.Context, channel *p2p.Channel, peerUpdates p2p.PeerUpdates) error {
// EchoReactor provides an echo service, pinging all known peers until the given
// context is cancelled.
func EchoReactor(ctx context.Context, channel *p2p.Channel, peerUpdates *p2p.PeerUpdates) error {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
@@ -451,9 +558,8 @@ func EchoReactor(ctx context.Context, channel *p2p.Channel, peerUpdates p2p.Peer
default:
channel.Error <- PeerError{
PeerID: envelope.From,
Err: fmt.Errorf("unexpected message %T", msg),
Severity: PeerErrorSeverityLow,
PeerID: envelope.From,
Err: fmt.Errorf("unexpected message %T", msg),
}
}
@@ -469,31 +575,9 @@ func EchoReactor(ctx context.Context, channel *p2p.Channel, peerUpdates p2p.Peer
}
```
### Implementation Plan
The existing P2P stack should be gradually migrated towards this design. The easiest path would likely be:
1. Implement the `Channel` and `PeerUpdates` APIs as shims on top of the current `Switch` and `Peer` APIs, and rewrite all reactors to use them instead.
2. Port the `privval` package to no longer use `SecretConnection` (e.g. by using gRPC instead), or temporarily duplicate its functionality.
3. Rewrite the current MConn connection and transport code to use the new `Transport` API, and migrate existing code to use it instead.
4. Implement the new `peer` and `peerStore` APIs, and either make the current address book a shim on top of these or replace it.
5. Replace the existing `Switch` abstraction with the new `Router`.
6. Move the PEX reactor and other address advertisement/exchange into the P2P core, possibly the `Router`.
7. Consider rewriting and/or cleaning up reactors and other P2P-related code to make better use of the new abstractions.
A note on backwards-compatibility: the current MConn protocol takes whole messages expressed as byte slices and splits them up into `PacketMsg` messages, where the final packet of a message has `PacketMsg.EOF` set. In order to maintain wire-compatibility with this protocol, the MConn transport needs to be aware of message boundaries, even though it does not care what the messages actually are. One way to handle this is to break abstraction boundaries and have the transport decode the input's length-prefixed message framing and use this to determine message boundaries, unless we accept breaking the protocol here.
Similarly, implementing channel handshakes with the current MConn protocol would require doing an initial connection handshake as today and use that information to "fake" the local channel handshake without it hitting the wire.
## Status
Accepted
Partially implemented ([#5670](https://github.com/tendermint/tendermint/issues/5670))
## Consequences
@@ -519,7 +603,7 @@ Accepted
* A complete overhaul of P2P internals is likely to cause temporary performance regressions and bugs as the implementation matures.
* Hiding peer management information inside the `p2p` package may prevent certain functionality or require additional deliberate interfaces for information exchange, as a tradeoff to simplify the design, reduce coupling, and avoid race conditions and lock contention.
* Hiding peer management information inside the `PeerManager` may prevent certain functionality or require additional deliberate interfaces for information exchange, as a tradeoff to simplify the design, reduce coupling, and avoid race conditions and lock contention.
### Neutral

File diff suppressed because one or more lines are too long

Before

Width:  |  Height:  |  Size: 370 KiB

After

Width:  |  Height:  |  Size: 344 KiB

View File

@@ -10151,9 +10151,9 @@
}
},
"watchpack": {
"version": "2.1.0",
"resolved": "https://registry.npmjs.org/watchpack/-/watchpack-2.1.0.tgz",
"integrity": "sha512-UjgD1mqjkG99+3lgG36at4wPnUXNvis2v1utwTgQ43C22c4LD71LsYMExdWXh4HZ+RmW+B0t1Vrg2GpXAkTOQw==",
"version": "2.1.1",
"resolved": "https://registry.npmjs.org/watchpack/-/watchpack-2.1.1.tgz",
"integrity": "sha512-Oo7LXCmc1eE1AjyuSBmtC3+Wy4HcV8PxWh2kP6fOl8yTlNS7r0K9l1ao2lrrUza7V39Y3D/BbJgY8VeSlc5JKw==",
"dev": true,
"requires": {
"glob-to-regexp": "^0.4.1",

View File

@@ -7,7 +7,7 @@
"vuepress-theme-cosmos": "^1.0.180"
},
"devDependencies": {
"watchpack": "^2.1.0"
"watchpack": "^2.1.1"
},
"scripts": {
"preserve": "./pre.sh",

2
go.mod
View File

@@ -32,7 +32,7 @@ require (
github.com/spf13/cobra v1.1.1
github.com/spf13/viper v1.7.1
github.com/stretchr/testify v1.7.0
github.com/tendermint/tm-db v0.6.3
github.com/tendermint/tm-db v0.6.4
golang.org/x/crypto v0.0.0-20201117144127-c1f2f97bffc9
golang.org/x/net v0.0.0-20201021035429-f5854403a974
google.golang.org/grpc v1.35.0

2
go.sum
View File

@@ -555,6 +555,8 @@ github.com/tendermint/tm-db v0.6.2 h1:DOn8jwCdjJblrCFJbtonEIPD1IuJWpbRUUdR8GWE4R
github.com/tendermint/tm-db v0.6.2/go.mod h1:GYtQ67SUvATOcoY8/+x6ylk8Qo02BQyLrAs+yAcLvGI=
github.com/tendermint/tm-db v0.6.3 h1:ZkhQcKnB8/2jr5EaZwGndN4owkPsGezW2fSisS9zGbg=
github.com/tendermint/tm-db v0.6.3/go.mod h1:lfA1dL9/Y/Y8wwyPp2NMLyn5P5Ptr/gvDFNWtrCWSf8=
github.com/tendermint/tm-db v0.6.4 h1:3N2jlnYQkXNQclQwd/eKV/NzlqPlfK21cpRRIx80XXQ=
github.com/tendermint/tm-db v0.6.4/go.mod h1:dptYhIpJ2M5kUuenLr+Yyf3zQOv1SgBZcl8/BmWlMBw=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=

View File

@@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"sort"
"time"
"github.com/tendermint/tendermint/libs/log"
@@ -21,8 +22,7 @@ const (
sequential mode = iota + 1
skipping
defaultPruningSize = 1000
defaultMaxRetryAttempts = 10
defaultPruningSize = 1000
// For verifySkipping, when using the cache of headers from the previous batch,
// they will always be at a height greater than 1/2 (normal verifySkipping) so to
// find something in between the range, 9/16 is used.
@@ -90,14 +90,6 @@ func Logger(l log.Logger) Option {
}
}
// MaxRetryAttempts option can be used to set max attempts before replacing
// primary with a witness.
func MaxRetryAttempts(max uint16) Option {
return func(c *Client) {
c.maxRetryAttempts = max
}
}
// MaxClockDrift defines how much new header's time can drift into
// the future. Default: 10s.
func MaxClockDrift(d time.Duration) Option {
@@ -116,7 +108,6 @@ type Client struct {
trustingPeriod time.Duration // see TrustOptions.Period
verificationMode mode
trustLevel tmmath.Fraction
maxRetryAttempts uint16 // see MaxRetryAttempts option
maxClockDrift time.Duration
// Mutex for locking during changes of the light clients providers
@@ -202,7 +193,6 @@ func NewClientFromTrustedStore(
trustingPeriod: trustingPeriod,
verificationMode: skipping,
trustLevel: DefaultTrustLevel,
maxRetryAttempts: defaultMaxRetryAttempts,
maxClockDrift: defaultMaxClockDrift,
primary: primary,
witnesses: witnesses,
@@ -995,6 +985,9 @@ func (c *Client) compareFirstHeaderWithWitnesses(ctx context.Context, h *types.S
compareCtx, cancel := context.WithCancel(ctx)
defer cancel()
c.providerMutex.Lock()
defer c.providerMutex.Unlock()
if len(c.witnesses) < 1 {
return ErrNoWitnesses
}
@@ -1028,8 +1021,11 @@ and remove witness. Otherwise, use the different primary`, e.WitnessIndex), "wit
}
}
for _, idx := range witnessesToRemove {
c.removeWitness(idx)
// we need to make sure that we remove witnesses by index in the reverse
// order so as to not affect the indexes themselves
sort.Ints(witnessesToRemove)
for i := len(witnessesToRemove) - 1; i >= 0; i-- {
c.removeWitness(witnessesToRemove[i])
}
return nil

View File

@@ -768,7 +768,6 @@ func TestClientReplacesPrimaryWithWitnessIfPrimaryIsUnavailable(t *testing.T) {
[]provider.Provider{fullNode, fullNode},
dbs.New(dbm.NewMemDB()),
light.Logger(log.TestingLogger()),
light.MaxRetryAttempts(1),
)
require.NoError(t, err)
@@ -946,7 +945,6 @@ func TestClientRemovesWitnessIfItSendsUsIncorrectHeader(t *testing.T) {
[]provider.Provider{badProvider1, badProvider2},
dbs.New(dbm.NewMemDB()),
light.Logger(log.TestingLogger()),
light.MaxRetryAttempts(1),
)
// witness should have behaved properly -> no error
require.NoError(t, err)
@@ -1086,7 +1084,6 @@ func TestClientEnsureValidHeadersAndValSets(t *testing.T) {
badNode,
[]provider.Provider{badNode, badNode},
dbs.New(dbm.NewMemDB()),
light.MaxRetryAttempts(1),
)
require.NoError(t, err)

View File

@@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"sort"
"time"
"github.com/tendermint/tendermint/light/provider"
@@ -127,8 +128,11 @@ func (c *Client) detectDivergence(ctx context.Context, primaryTrace []*types.Lig
}
}
for _, idx := range witnessesToRemove {
c.removeWitness(idx)
// we need to make sure that we remove witnesses by index in the reverse
// order so as to not affect the indexes themselves
sort.Ints(witnessesToRemove)
for i := len(witnessesToRemove) - 1; i >= 0; i-- {
c.removeWitness(witnessesToRemove[i])
}
// 1. If we had at least one witness that returned the same header then we

View File

@@ -59,7 +59,6 @@ func TestLightClientAttackEvidence_Lunatic(t *testing.T) {
[]provider.Provider{witness},
dbs.New(dbm.NewMemDB()),
light.Logger(log.TestingLogger()),
light.MaxRetryAttempts(1),
)
require.NoError(t, err)
@@ -141,7 +140,6 @@ func TestLightClientAttackEvidence_Equivocation(t *testing.T) {
[]provider.Provider{witness},
dbs.New(dbm.NewMemDB()),
light.Logger(log.TestingLogger()),
light.MaxRetryAttempts(1),
verificationOption,
)
require.NoError(t, err)
@@ -196,7 +194,6 @@ func TestClientDivergentTraces1(t *testing.T) {
[]provider.Provider{witness},
dbs.New(dbm.NewMemDB()),
light.Logger(log.TestingLogger()),
light.MaxRetryAttempts(1),
)
require.Error(t, err)
assert.Contains(t, err.Error(), "does not match primary")
@@ -220,7 +217,6 @@ func TestClientDivergentTraces2(t *testing.T) {
[]provider.Provider{deadNode, deadNode, primary},
dbs.New(dbm.NewMemDB()),
light.Logger(log.TestingLogger()),
light.MaxRetryAttempts(1),
)
require.NoError(t, err)
@@ -255,7 +251,6 @@ func TestClientDivergentTraces3(t *testing.T) {
[]provider.Provider{witness},
dbs.New(dbm.NewMemDB()),
light.Logger(log.TestingLogger()),
light.MaxRetryAttempts(1),
)
require.NoError(t, err)

View File

@@ -18,38 +18,60 @@ var (
// This is very brittle, see: https://github.com/tendermint/tendermint/issues/4740
regexpMissingHeight = regexp.MustCompile(`height \d+ (must be less than or equal to|is not available)`)
maxRetryAttempts = 10
timeout = 5 * time.Second
defaultOptions = Options{
MaxRetryAttempts: 10,
Timeout: 5 * time.Second,
}
)
// http provider uses an RPC client to obtain the necessary information.
type http struct {
chainID string
client rpcclient.RemoteClient
maxRetryAttempts int
}
type Options struct {
// -1 means no limit
MaxRetryAttempts int
// 0 means no timeout.
Timeout time.Duration
}
// New creates a HTTP provider, which is using the rpchttp.HTTP client under
// the hood. If no scheme is provided in the remote URL, http will be used by
// default. The 5s timeout is used for all requests.
func New(chainID, remote string) (provider.Provider, error) {
return NewWithOptions(chainID, remote, defaultOptions)
}
// NewWithOptions is an extension to creating a new http provider that allows the addition
// of a specified timeout and maxRetryAttempts
func NewWithOptions(chainID, remote string, options Options) (provider.Provider, error) {
// Ensure URL scheme is set (default HTTP) when not provided.
if !strings.Contains(remote, "://") {
remote = "http://" + remote
}
httpClient, err := rpchttp.NewWithTimeout(remote, "/websocket", timeout)
httpClient, err := rpchttp.NewWithTimeout(remote, "/websocket", options.Timeout)
if err != nil {
return nil, err
}
return NewWithClient(chainID, httpClient), nil
return NewWithClientAndOptions(chainID, httpClient, options), nil
}
func NewWithClient(chainID string, client rpcclient.RemoteClient) provider.Provider {
return NewWithClientAndOptions(chainID, client, defaultOptions)
}
// NewWithClient allows you to provide a custom client.
func NewWithClient(chainID string, client rpcclient.RemoteClient) provider.Provider {
func NewWithClientAndOptions(chainID string, client rpcclient.RemoteClient, options Options) provider.Provider {
return &http{
client: client,
chainID: chainID,
client: client,
chainID: chainID,
maxRetryAttempts: options.MaxRetryAttempts,
}
}
@@ -108,7 +130,9 @@ func (p *http) validatorSet(ctx context.Context, height *uint64) (*types.Validat
)
for len(vals) != total && page <= maxPages {
for attempt := 1; attempt <= maxRetryAttempts; attempt++ {
// create another for loop to control retries. If p.maxRetryAttempts
// is negative we will keep repeating.
for attempt := 0; attempt != p.maxRetryAttempts+1; attempt++ {
res, err := p.client.Validators(ctx, height, &page, &perPage)
if err != nil {
// TODO: standardize errors on the RPC side
@@ -116,7 +140,7 @@ func (p *http) validatorSet(ctx context.Context, height *uint64) (*types.Validat
return nil, provider.ErrLightBlockNotFound
}
// if we have exceeded retry attempts then return no response error
if attempt == maxRetryAttempts {
if attempt == p.maxRetryAttempts {
return nil, provider.ErrNoResponse
}
// else we wait and try again with exponential backoff
@@ -153,7 +177,9 @@ func (p *http) validatorSet(ctx context.Context, height *uint64) (*types.Validat
}
func (p *http) signedHeader(ctx context.Context, height *uint64) (*types.SignedHeader, error) {
for attempt := 1; attempt <= maxRetryAttempts; attempt++ {
// create a for loop to control retries. If p.maxRetryAttempts
// is negative we will keep repeating.
for attempt := 0; attempt != p.maxRetryAttempts+1; attempt++ {
commit, err := p.client.Commit(ctx, height)
if err != nil {
// TODO: standardize errors on the RPC side

View File

@@ -71,7 +71,7 @@ func NewLightClientStateProvider(
}
lc, err := light.NewClient(ctx, chainID, trustOptions, providers[0], providers[1:],
lightdb.New(dbm.NewMemDB()), light.Logger(logger), light.MaxRetryAttempts(5))
lightdb.New(dbm.NewMemDB()), light.Logger(logger))
if err != nil {
return nil, err
}

View File

@@ -434,6 +434,8 @@ func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, s
panic("BlockStore can only save a non-nil block")
}
batch := bs.db.NewBatch()
height := block.Height
hash := block.Hash()
@@ -450,27 +452,27 @@ func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, s
// complete as soon as the block meta is written.
for i := 0; i < int(blockParts.Total()); i++ {
part := blockParts.GetPart(i)
bs.saveBlockPart(height, i, part)
bs.saveBlockPart(height, i, part, batch)
}
// Save block meta
blockMeta := types.NewBlockMeta(block, blockParts)
pbm := blockMeta.ToProto()
if pbm == nil {
panic("nil blockmeta")
}
metaBytes := mustEncode(pbm)
if err := bs.db.Set(blockMetaKey(height), metaBytes); err != nil {
panic(err)
}
if err := bs.db.Set(blockHashKey(hash), []byte(fmt.Sprintf("%d", height))); err != nil {
if err := batch.Set(blockMetaKey(height), metaBytes); err != nil {
panic(err)
}
if err := batch.Set(blockHashKey(hash), []byte(fmt.Sprintf("%d", height))); err != nil {
panic(err)
}
// Save block commit (duplicate and separate from the Block)
pbc := block.LastCommit.ToProto()
blockCommitBytes := mustEncode(pbc)
if err := bs.db.Set(blockCommitKey(height-1), blockCommitBytes); err != nil {
if err := batch.Set(blockCommitKey(height-1), blockCommitBytes); err != nil {
panic(err)
}
@@ -478,19 +480,26 @@ func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, s
// NOTE: we can delete this at a later height
pbsc := seenCommit.ToProto()
seenCommitBytes := mustEncode(pbsc)
if err := bs.db.SetSync(seenCommitKey(height), seenCommitBytes); err != nil {
if err := batch.Set(seenCommitKey(height), seenCommitBytes); err != nil {
panic(err)
}
if err := batch.WriteSync(); err != nil {
panic(err)
}
if err := batch.Close(); err != nil {
panic(err)
}
}
func (bs *BlockStore) saveBlockPart(height uint64, index int, part *types.Part) {
func (bs *BlockStore) saveBlockPart(height uint64, index int, part *types.Part, batch dbm.Batch) {
pbp, err := part.ToProto()
if err != nil {
panic(fmt.Errorf("unable to make part into proto: %w", err))
}
partBytes := mustEncode(pbp)
if err := bs.db.Set(blockPartKey(height, index), partBytes); err != nil {
if err := batch.Set(blockPartKey(height, index), partBytes); err != nil {
panic(err)
}
}

View File

@@ -1,220 +0,0 @@
package consensus
import (
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/discard"
prometheus "github.com/go-kit/kit/metrics/prometheus"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)
const (
// MetricsSubsystem is a subsystem shared by all metrics exposed by this
// package.
MetricsSubsystem = "consensus"
)
// Metrics contains metrics exposed by this package.
type Metrics struct {
// Height of the chain.
Height metrics.Gauge
// ValidatorLastSignedHeight of a validator.
ValidatorLastSignedHeight metrics.Gauge
// Number of rounds.
Rounds metrics.Gauge
// Number of validators.
Validators metrics.Gauge
// Total power of all validators.
ValidatorsPower metrics.Gauge
// Power of a validator.
ValidatorPower metrics.Gauge
// Amount of blocks missed by a validator.
ValidatorMissedBlocks metrics.Gauge
// Number of validators who did not sign.
MissingValidators metrics.Gauge
// Total power of the missing validators.
MissingValidatorsPower metrics.Gauge
// Number of validators who tried to double sign.
ByzantineValidators metrics.Gauge
// Total power of the byzantine validators.
ByzantineValidatorsPower metrics.Gauge
// Time between this and the last block.
BlockIntervalSeconds metrics.Histogram
// Number of transactions.
NumTxs metrics.Gauge
// Size of the block.
BlockSizeBytes metrics.Gauge
// Total number of transactions.
TotalTxs metrics.Gauge
// The latest block height.
CommittedHeight metrics.Gauge
// Whether or not a node is fast syncing. 1 if yes, 0 if no.
FastSyncing metrics.Gauge
// Whether or not a node is state syncing. 1 if yes, 0 if no.
StateSyncing metrics.Gauge
// Number of blockparts transmitted by peer.
BlockParts metrics.Counter
}
// PrometheusMetrics returns Metrics build using Prometheus client library.
// Optionally, labels can be provided along with their values ("foo",
// "fooValue").
func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
labels := []string{}
for i := 0; i < len(labelsAndValues); i += 2 {
labels = append(labels, labelsAndValues[i])
}
return &Metrics{
Height: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "height",
Help: "Height of the chain.",
}, labels).With(labelsAndValues...),
Rounds: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "rounds",
Help: "Number of rounds.",
}, labels).With(labelsAndValues...),
Validators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "validators",
Help: "Number of validators.",
}, labels).With(labelsAndValues...),
ValidatorLastSignedHeight: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "validator_last_signed_height",
Help: "Last signed height for a validator",
}, append(labels, "validator_address")).With(labelsAndValues...),
ValidatorMissedBlocks: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "validator_missed_blocks",
Help: "Total missed blocks for a validator",
}, append(labels, "validator_address")).With(labelsAndValues...),
ValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "validators_power",
Help: "Total power of all validators.",
}, labels).With(labelsAndValues...),
ValidatorPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "validator_power",
Help: "Power of a validator",
}, append(labels, "validator_address")).With(labelsAndValues...),
MissingValidators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "missing_validators",
Help: "Number of validators who did not sign.",
}, labels).With(labelsAndValues...),
MissingValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "missing_validators_power",
Help: "Total power of the missing validators.",
}, labels).With(labelsAndValues...),
ByzantineValidators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "byzantine_validators",
Help: "Number of validators who tried to double sign.",
}, labels).With(labelsAndValues...),
ByzantineValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "byzantine_validators_power",
Help: "Total power of the byzantine validators.",
}, labels).With(labelsAndValues...),
BlockIntervalSeconds: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "block_interval_seconds",
Help: "Time between this and the last block.",
}, labels).With(labelsAndValues...),
NumTxs: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "num_txs",
Help: "Number of transactions.",
}, labels).With(labelsAndValues...),
BlockSizeBytes: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "block_size_bytes",
Help: "Size of the block.",
}, labels).With(labelsAndValues...),
TotalTxs: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "total_txs",
Help: "Total number of transactions.",
}, labels).With(labelsAndValues...),
CommittedHeight: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "latest_block_height",
Help: "The latest block height.",
}, labels).With(labelsAndValues...),
FastSyncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "fast_syncing",
Help: "Whether or not a node is fast syncing. 1 if yes, 0 if no.",
}, labels).With(labelsAndValues...),
StateSyncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "state_syncing",
Help: "Whether or not a node is state syncing. 1 if yes, 0 if no.",
}, labels).With(labelsAndValues...),
BlockParts: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "block_parts",
Help: "Number of blockparts transmitted by peer.",
}, append(labels, "peer_id")).With(labelsAndValues...),
}
}
// NopMetrics returns no-op Metrics.
func NopMetrics() *Metrics {
return &Metrics{
Height: discard.NewGauge(),
ValidatorLastSignedHeight: discard.NewGauge(),
Rounds: discard.NewGauge(),
Validators: discard.NewGauge(),
ValidatorsPower: discard.NewGauge(),
ValidatorPower: discard.NewGauge(),
ValidatorMissedBlocks: discard.NewGauge(),
MissingValidators: discard.NewGauge(),
MissingValidatorsPower: discard.NewGauge(),
ByzantineValidators: discard.NewGauge(),
ByzantineValidatorsPower: discard.NewGauge(),
BlockIntervalSeconds: discard.NewHistogram(),
NumTxs: discard.NewGauge(),
BlockSizeBytes: discard.NewGauge(),
TotalTxs: discard.NewGauge(),
CommittedHeight: discard.NewGauge(),
FastSyncing: discard.NewGauge(),
StateSyncing: discard.NewGauge(),
BlockParts: discard.NewCounter(),
}
}

View File

@@ -3,6 +3,7 @@ package consensus
import (
"fmt"
tmcon "github.com/tendermint/tendermint/consensus"
cstypes "github.com/tendermint/tendermint/consensus/types"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
"github.com/tendermint/tendermint/types"
@@ -89,16 +90,16 @@ func DoublePrevoteMisbehavior() Misbehavior {
}
// add our own vote
cs.sendInternalMessage(msgInfo{&VoteMessage{prevote}, ""})
cs.sendInternalMessage(msgInfo{&tmcon.VoteMessage{Vote: prevote}, ""})
cs.Logger.Info("Sending conflicting votes")
peers := cs.sw.Peers().List()
// there has to be at least two other peers connected else this behavior works normally
for idx, peer := range peers {
if idx%2 == 0 { // sign the proposal block
peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote}))
peer.Send(VoteChannel, tmcon.MustEncode(&tmcon.VoteMessage{Vote: prevote}))
} else { // sign a nil block
peer.Send(VoteChannel, MustEncode(&VoteMessage{nilPrevote}))
peer.Send(VoteChannel, tmcon.MustEncode(&tmcon.VoteMessage{Vote: nilPrevote}))
}
}
}

View File

@@ -4,10 +4,8 @@ import (
"errors"
"fmt"
"github.com/gogo/protobuf/proto"
tmcon "github.com/tendermint/tendermint/consensus"
cstypes "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/libs/bits"
tmmath "github.com/tendermint/tendermint/libs/math"
"github.com/tendermint/tendermint/p2p"
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
@@ -15,277 +13,7 @@ import (
"github.com/tendermint/tendermint/types"
)
// MsgToProto takes a consensus message type and returns the proto defined consensus message
func MsgToProto(msg Message) (*tmcons.Message, error) {
if msg == nil {
return nil, errors.New("consensus: message is nil")
}
var pb tmcons.Message
switch msg := msg.(type) {
case *NewRoundStepMessage:
pb = tmcons.Message{
Sum: &tmcons.Message_NewRoundStep{
NewRoundStep: &tmcons.NewRoundStep{
Height: msg.Height,
Round: msg.Round,
Step: uint32(msg.Step),
SecondsSinceStartTime: msg.SecondsSinceStartTime,
LastCommitRound: msg.LastCommitRound,
},
},
}
case *NewValidBlockMessage:
pbPartSetHeader := msg.BlockPartSetHeader.ToProto()
pbBits := msg.BlockParts.ToProto()
pb = tmcons.Message{
Sum: &tmcons.Message_NewValidBlock{
NewValidBlock: &tmcons.NewValidBlock{
Height: msg.Height,
Round: msg.Round,
BlockPartSetHeader: pbPartSetHeader,
BlockParts: pbBits,
IsCommit: msg.IsCommit,
},
},
}
case *ProposalMessage:
pbP := msg.Proposal.ToProto()
pb = tmcons.Message{
Sum: &tmcons.Message_Proposal{
Proposal: &tmcons.Proposal{
Proposal: *pbP,
},
},
}
case *ProposalPOLMessage:
pbBits := msg.ProposalPOL.ToProto()
pb = tmcons.Message{
Sum: &tmcons.Message_ProposalPol{
ProposalPol: &tmcons.ProposalPOL{
Height: msg.Height,
ProposalPolRound: msg.ProposalPOLRound,
ProposalPol: *pbBits,
},
},
}
case *BlockPartMessage:
parts, err := msg.Part.ToProto()
if err != nil {
return nil, fmt.Errorf("msg to proto error: %w", err)
}
pb = tmcons.Message{
Sum: &tmcons.Message_BlockPart{
BlockPart: &tmcons.BlockPart{
Height: msg.Height,
Round: msg.Round,
Part: *parts,
},
},
}
case *VoteMessage:
vote := msg.Vote.ToProto()
pb = tmcons.Message{
Sum: &tmcons.Message_Vote{
Vote: &tmcons.Vote{
Vote: vote,
},
},
}
case *HasVoteMessage:
pb = tmcons.Message{
Sum: &tmcons.Message_HasVote{
HasVote: &tmcons.HasVote{
Height: msg.Height,
Round: msg.Round,
Type: msg.Type,
Index: msg.Index,
},
},
}
case *VoteSetMaj23Message:
bi := msg.BlockID.ToProto()
pb = tmcons.Message{
Sum: &tmcons.Message_VoteSetMaj23{
VoteSetMaj23: &tmcons.VoteSetMaj23{
Height: msg.Height,
Round: msg.Round,
Type: msg.Type,
BlockID: bi,
},
},
}
case *VoteSetBitsMessage:
bi := msg.BlockID.ToProto()
bits := msg.Votes.ToProto()
vsb := &tmcons.Message_VoteSetBits{
VoteSetBits: &tmcons.VoteSetBits{
Height: msg.Height,
Round: msg.Round,
Type: msg.Type,
BlockID: bi,
},
}
if bits != nil {
vsb.VoteSetBits.Votes = *bits
}
pb = tmcons.Message{
Sum: vsb,
}
default:
return nil, fmt.Errorf("consensus: message not recognized: %T", msg)
}
return &pb, nil
}
// MsgFromProto takes a consensus proto message and returns the native go type
func MsgFromProto(msg *tmcons.Message) (Message, error) {
if msg == nil {
return nil, errors.New("consensus: nil message")
}
var pb Message
switch msg := msg.Sum.(type) {
case *tmcons.Message_NewRoundStep:
rs, err := tmmath.SafeConvertUint8(int64(msg.NewRoundStep.Step))
// deny message based on possible overflow
if err != nil {
return nil, fmt.Errorf("denying message due to possible overflow: %w", err)
}
pb = &NewRoundStepMessage{
Height: msg.NewRoundStep.Height,
Round: msg.NewRoundStep.Round,
Step: cstypes.RoundStepType(rs),
SecondsSinceStartTime: msg.NewRoundStep.SecondsSinceStartTime,
LastCommitRound: msg.NewRoundStep.LastCommitRound,
}
case *tmcons.Message_NewValidBlock:
pbPartSetHeader, err := types.PartSetHeaderFromProto(&msg.NewValidBlock.BlockPartSetHeader)
if err != nil {
return nil, fmt.Errorf("parts header to proto error: %w", err)
}
pbBits := new(bits.BitArray)
err = pbBits.FromProto(msg.NewValidBlock.BlockParts)
if err != nil {
return nil, fmt.Errorf("parts to proto error: %w", err)
}
pb = &NewValidBlockMessage{
Height: msg.NewValidBlock.Height,
Round: msg.NewValidBlock.Round,
BlockPartSetHeader: *pbPartSetHeader,
BlockParts: pbBits,
IsCommit: msg.NewValidBlock.IsCommit,
}
case *tmcons.Message_Proposal:
pbP, err := types.ProposalFromProto(&msg.Proposal.Proposal)
if err != nil {
return nil, fmt.Errorf("proposal msg to proto error: %w", err)
}
pb = &ProposalMessage{
Proposal: pbP,
}
case *tmcons.Message_ProposalPol:
pbBits := new(bits.BitArray)
err := pbBits.FromProto(&msg.ProposalPol.ProposalPol)
if err != nil {
return nil, fmt.Errorf("proposal PoL to proto error: %w", err)
}
pb = &ProposalPOLMessage{
Height: msg.ProposalPol.Height,
ProposalPOLRound: msg.ProposalPol.ProposalPolRound,
ProposalPOL: pbBits,
}
case *tmcons.Message_BlockPart:
parts, err := types.PartFromProto(&msg.BlockPart.Part)
if err != nil {
return nil, fmt.Errorf("blockpart msg to proto error: %w", err)
}
pb = &BlockPartMessage{
Height: msg.BlockPart.Height,
Round: msg.BlockPart.Round,
Part: parts,
}
case *tmcons.Message_Vote:
vote, err := types.VoteFromProto(msg.Vote.Vote)
if err != nil {
return nil, fmt.Errorf("vote msg to proto error: %w", err)
}
pb = &VoteMessage{
Vote: vote,
}
case *tmcons.Message_HasVote:
pb = &HasVoteMessage{
Height: msg.HasVote.Height,
Round: msg.HasVote.Round,
Type: msg.HasVote.Type,
Index: msg.HasVote.Index,
}
case *tmcons.Message_VoteSetMaj23:
bi, err := types.BlockIDFromProto(&msg.VoteSetMaj23.BlockID)
if err != nil {
return nil, fmt.Errorf("voteSetMaj23 msg to proto error: %w", err)
}
pb = &VoteSetMaj23Message{
Height: msg.VoteSetMaj23.Height,
Round: msg.VoteSetMaj23.Round,
Type: msg.VoteSetMaj23.Type,
BlockID: *bi,
}
case *tmcons.Message_VoteSetBits:
bi, err := types.BlockIDFromProto(&msg.VoteSetBits.BlockID)
if err != nil {
return nil, fmt.Errorf("block ID msg to proto error: %w", err)
}
bits := new(bits.BitArray)
err = bits.FromProto(&msg.VoteSetBits.Votes)
if err != nil {
return nil, fmt.Errorf("votes to proto error: %w", err)
}
pb = &VoteSetBitsMessage{
Height: msg.VoteSetBits.Height,
Round: msg.VoteSetBits.Round,
Type: msg.VoteSetBits.Type,
BlockID: *bi,
Votes: bits,
}
default:
return nil, fmt.Errorf("consensus: message not recognized: %T", msg)
}
if err := pb.ValidateBasic(); err != nil {
return nil, err
}
return pb, nil
}
// MustEncode takes the reactors msg, makes it proto and marshals it
// this mimics `MustMarshalBinaryBare` in that is panics on error
func MustEncode(msg Message) []byte {
pb, err := MsgToProto(msg)
if err != nil {
panic(err)
}
enc, err := proto.Marshal(pb)
if err != nil {
panic(err)
}
return enc
}
// WALToProto takes a WAL message and return a proto walMessage and error
func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) {
func WALToProto(msg tmcon.WALMessage) (*tmcons.WALMessage, error) {
var pb tmcons.WALMessage
switch msg := msg.(type) {
@@ -300,7 +28,7 @@ func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) {
},
}
case msgInfo:
consMsg, err := MsgToProto(msg.Msg)
consMsg, err := tmcon.MsgToProto(msg.Msg)
if err != nil {
return nil, err
}
@@ -323,7 +51,7 @@ func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) {
},
},
}
case EndHeightMessage:
case tmcon.EndHeightMessage:
pb = tmcons.WALMessage{
Sum: &tmcons.WALMessage_EndHeight{
EndHeight: &tmcons.EndHeight{
@@ -339,11 +67,11 @@ func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) {
}
// WALFromProto takes a proto wal message and return a consensus walMessage and error
func WALFromProto(msg *tmcons.WALMessage) (WALMessage, error) {
func WALFromProto(msg *tmcons.WALMessage) (tmcon.WALMessage, error) {
if msg == nil {
return nil, errors.New("nil WAL message")
}
var pb WALMessage
var pb tmcon.WALMessage
switch msg := msg.Sum.(type) {
case *tmcons.WALMessage_EventDataRoundState:
@@ -353,7 +81,7 @@ func WALFromProto(msg *tmcons.WALMessage) (WALMessage, error) {
Step: msg.EventDataRoundState.Step,
}
case *tmcons.WALMessage_MsgInfo:
walMsg, err := MsgFromProto(&msg.MsgInfo.Msg)
walMsg, err := tmcon.MsgFromProto(&msg.MsgInfo.Msg)
if err != nil {
return nil, fmt.Errorf("msgInfo from proto error: %w", err)
}
@@ -376,7 +104,7 @@ func WALFromProto(msg *tmcons.WALMessage) (WALMessage, error) {
}
return pb, nil
case *tmcons.WALMessage_EndHeight:
pb := EndHeightMessage{
pb := tmcon.EndHeightMessage{
Height: msg.EndHeight.Height,
}
return pb, nil

View File

@@ -9,6 +9,7 @@ import (
"github.com/gogo/protobuf/proto"
tmcon "github.com/tendermint/tendermint/consensus"
cstypes "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/libs/bits"
tmevents "github.com/tendermint/tendermint/libs/events"
@@ -47,7 +48,7 @@ type Reactor struct {
waitSync bool
eventBus *types.EventBus
Metrics *Metrics
Metrics *tmcon.Metrics
}
type ReactorOption func(*Reactor)
@@ -58,7 +59,7 @@ func NewReactor(consensusState *State, waitSync bool, options ...ReactorOption)
conR := &Reactor{
conS: consensusState,
waitSync: waitSync,
Metrics: NopMetrics(),
Metrics: tmcon.NopMetrics(),
}
conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR)
@@ -252,7 +253,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
switch chID {
case StateChannel:
switch msg := msg.(type) {
case *NewRoundStepMessage:
case *tmcon.NewRoundStepMessage:
conR.conS.mtx.Lock()
initialHeight := conR.conS.state.InitialHeight
conR.conS.mtx.Unlock()
@@ -262,11 +263,11 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
return
}
ps.ApplyNewRoundStepMessage(msg)
case *NewValidBlockMessage:
case *tmcon.NewValidBlockMessage:
ps.ApplyNewValidBlockMessage(msg)
case *HasVoteMessage:
case *tmcon.HasVoteMessage:
ps.ApplyHasVoteMessage(msg)
case *VoteSetMaj23Message:
case *tmcon.VoteSetMaj23Message:
cs := conR.conS
cs.mtx.Lock()
height, votes := cs.Height, cs.Votes
@@ -291,7 +292,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
default:
panic("Bad VoteSetBitsMessage field Type. Forgot to add a check in ValidateBasic?")
}
src.TrySend(VoteSetBitsChannel, MustEncode(&VoteSetBitsMessage{
src.TrySend(VoteSetBitsChannel, tmcon.MustEncode(&tmcon.VoteSetBitsMessage{
Height: msg.Height,
Round: msg.Round,
Type: msg.Type,
@@ -308,12 +309,12 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
return
}
switch msg := msg.(type) {
case *ProposalMessage:
case *tmcon.ProposalMessage:
ps.SetHasProposal(msg.Proposal)
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
case *ProposalPOLMessage:
case *tmcon.ProposalPOLMessage:
ps.ApplyProposalPOLMessage(msg)
case *BlockPartMessage:
case *tmcon.BlockPartMessage:
ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index))
conR.Metrics.BlockParts.With("peer_id", string(src.ID())).Add(1)
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
@@ -327,7 +328,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
return
}
switch msg := msg.(type) {
case *VoteMessage:
case *tmcon.VoteMessage:
cs := conR.conS
cs.mtx.RLock()
height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size()
@@ -349,7 +350,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
return
}
switch msg := msg.(type) {
case *VoteSetBitsMessage:
case *tmcon.VoteSetBitsMessage:
cs := conR.conS
cs.mtx.Lock()
height, votes := cs.Height, cs.Votes
@@ -429,29 +430,29 @@ func (conR *Reactor) unsubscribeFromBroadcastEvents() {
func (conR *Reactor) broadcastNewRoundStepMessage(rs *cstypes.RoundState) {
nrsMsg := makeRoundStepMessage(rs)
conR.Switch.Broadcast(StateChannel, MustEncode(nrsMsg))
conR.Switch.Broadcast(StateChannel, tmcon.MustEncode(nrsMsg))
}
func (conR *Reactor) broadcastNewValidBlockMessage(rs *cstypes.RoundState) {
csMsg := &NewValidBlockMessage{
csMsg := &tmcon.NewValidBlockMessage{
Height: rs.Height,
Round: rs.Round,
BlockPartSetHeader: rs.ProposalBlockParts.Header(),
BlockParts: rs.ProposalBlockParts.BitArray(),
IsCommit: rs.Step == cstypes.RoundStepCommit,
}
conR.Switch.Broadcast(StateChannel, MustEncode(csMsg))
conR.Switch.Broadcast(StateChannel, tmcon.MustEncode(csMsg))
}
// Broadcasts HasVoteMessage to peers that care.
func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
msg := &HasVoteMessage{
msg := &tmcon.HasVoteMessage{
Height: vote.Height,
Round: vote.Round,
Type: vote.Type,
Index: vote.ValidatorIndex,
}
conR.Switch.Broadcast(StateChannel, MustEncode(msg))
conR.Switch.Broadcast(StateChannel, tmcon.MustEncode(msg))
/*
// TODO: Make this broadcast more selective.
for _, peer := range conR.Switch.Peers().List() {
@@ -472,8 +473,8 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
*/
}
func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *NewRoundStepMessage) {
nrsMsg = &NewRoundStepMessage{
func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *tmcon.NewRoundStepMessage) {
nrsMsg = &tmcon.NewRoundStepMessage{
Height: rs.Height,
Round: rs.Round,
Step: rs.Step,
@@ -486,7 +487,7 @@ func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *NewRoundStepMessage)
func (conR *Reactor) sendNewRoundStepMessage(peer p2p.Peer) {
rs := conR.conS.GetRoundState()
nrsMsg := makeRoundStepMessage(rs)
peer.Send(StateChannel, MustEncode(nrsMsg))
peer.Send(StateChannel, tmcon.MustEncode(nrsMsg))
}
func (conR *Reactor) gossipDataRoutine(peer p2p.Peer, ps *PeerState) {
@@ -506,13 +507,13 @@ OUTER_LOOP:
if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartSetHeader) {
if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
part := rs.ProposalBlockParts.GetPart(index)
msg := &BlockPartMessage{
msg := &tmcon.BlockPartMessage{
Height: rs.Height, // This tells peer that this part applies to us.
Round: rs.Round, // This tells peer that this part applies to us.
Part: part,
}
logger.Debug("Sending block part", "height", prs.Height, "round", prs.Round)
if peer.Send(DataChannel, MustEncode(msg)) {
if peer.Send(DataChannel, tmcon.MustEncode(msg)) {
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
}
continue OUTER_LOOP
@@ -555,9 +556,9 @@ OUTER_LOOP:
if rs.Proposal != nil && !prs.Proposal {
// Proposal: share the proposal metadata with peer.
{
msg := &ProposalMessage{Proposal: rs.Proposal}
msg := &tmcon.ProposalMessage{Proposal: rs.Proposal}
logger.Debug("Sending proposal", "height", prs.Height, "round", prs.Round)
if peer.Send(DataChannel, MustEncode(msg)) {
if peer.Send(DataChannel, tmcon.MustEncode(msg)) {
// NOTE[ZM]: A peer might have received different proposal msg so this Proposal msg will be rejected!
ps.SetHasProposal(rs.Proposal)
}
@@ -567,13 +568,13 @@ OUTER_LOOP:
// rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round,
// so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
if 0 <= rs.Proposal.POLRound {
msg := &ProposalPOLMessage{
msg := &tmcon.ProposalPOLMessage{
Height: rs.Height,
ProposalPOLRound: rs.Proposal.POLRound,
ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
}
logger.Debug("Sending POL", "height", prs.Height, "round", prs.Round)
peer.Send(DataChannel, MustEncode(msg))
peer.Send(DataChannel, tmcon.MustEncode(msg))
}
continue OUTER_LOOP
}
@@ -610,13 +611,13 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt
return
}
// Send the part
msg := &BlockPartMessage{
msg := &tmcon.BlockPartMessage{
Height: prs.Height, // Not our height, so it doesn't matter.
Round: prs.Round, // Not our height, so it doesn't matter.
Part: part,
}
logger.Debug("Sending block part for catchup", "round", prs.Round, "index", index)
if peer.Send(DataChannel, MustEncode(msg)) {
if peer.Send(DataChannel, tmcon.MustEncode(msg)) {
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
} else {
logger.Debug("Sending block part for catchup failed")
@@ -773,7 +774,7 @@ OUTER_LOOP:
prs := ps.GetRoundState()
if rs.Height == prs.Height {
if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok {
peer.TrySend(StateChannel, MustEncode(&VoteSetMaj23Message{
peer.TrySend(StateChannel, tmcon.MustEncode(&tmcon.VoteSetMaj23Message{
Height: prs.Height,
Round: prs.Round,
Type: tmproto.PrevoteType,
@@ -790,7 +791,7 @@ OUTER_LOOP:
prs := ps.GetRoundState()
if rs.Height == prs.Height {
if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok {
peer.TrySend(StateChannel, MustEncode(&VoteSetMaj23Message{
peer.TrySend(StateChannel, tmcon.MustEncode(&tmcon.VoteSetMaj23Message{
Height: prs.Height,
Round: prs.Round,
Type: tmproto.PrecommitType,
@@ -807,7 +808,7 @@ OUTER_LOOP:
prs := ps.GetRoundState()
if rs.Height == prs.Height && prs.ProposalPOLRound >= 0 {
if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok {
peer.TrySend(StateChannel, MustEncode(&VoteSetMaj23Message{
peer.TrySend(StateChannel, tmcon.MustEncode(&tmcon.VoteSetMaj23Message{
Height: prs.Height,
Round: prs.ProposalPOLRound,
Type: tmproto.PrevoteType,
@@ -827,7 +828,7 @@ OUTER_LOOP:
if prs.CatchupCommitRound != -1 && prs.Height > 0 && prs.Height <= conR.conS.blockStore.Height() &&
prs.Height >= conR.conS.blockStore.Base() {
if commit := conR.conS.LoadCommit(prs.Height); commit != nil {
peer.TrySend(StateChannel, MustEncode(&VoteSetMaj23Message{
peer.TrySend(StateChannel, tmcon.MustEncode(&tmcon.VoteSetMaj23Message{
Height: prs.Height,
Round: commit.Round,
Type: tmproto.PrecommitType,
@@ -866,11 +867,11 @@ func (conR *Reactor) peerStatsRoutine() {
panic(fmt.Sprintf("Peer %v has no state", peer))
}
switch msg.Msg.(type) {
case *VoteMessage:
case *tmcon.VoteMessage:
if numVotes := ps.RecordVote(); numVotes%votesToContributeToBecomeGoodPeer == 0 {
conR.Switch.MarkPeerAsGood(peer)
}
case *BlockPartMessage:
case *tmcon.BlockPartMessage:
if numParts := ps.RecordBlockPart(); numParts%blocksToContributeToBecomeGoodPeer == 0 {
conR.Switch.MarkPeerAsGood(peer)
}
@@ -908,7 +909,7 @@ func (conR *Reactor) StringIndented(indent string) string {
}
// ReactorMetrics sets the metrics
func ReactorMetrics(metrics *Metrics) ReactorOption {
func ReactorMetrics(metrics *tmcon.Metrics) ReactorOption {
return func(conR *Reactor) { conR.Metrics = metrics }
}
@@ -1046,9 +1047,9 @@ func (ps *PeerState) SetHasProposalBlockPart(height uint64, round int32, index i
// Returns true if vote was sent.
func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool {
if vote, ok := ps.PickVoteToSend(votes); ok {
msg := &VoteMessage{vote}
msg := &tmcon.VoteMessage{Vote: vote}
ps.logger.Debug("Sending vote message", "ps", ps, "vote", vote)
if ps.peer.Send(VoteChannel, MustEncode(msg)) {
if ps.peer.Send(VoteChannel, tmcon.MustEncode(msg)) {
ps.SetHasVote(vote)
return true
}
@@ -1255,7 +1256,7 @@ func (ps *PeerState) setHasVote(height uint64, round int32, voteType tmproto.Sig
}
// ApplyNewRoundStepMessage updates the peer state for the new round.
func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) {
func (ps *PeerState) ApplyNewRoundStepMessage(msg *tmcon.NewRoundStepMessage) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
@@ -1308,7 +1309,7 @@ func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) {
}
// ApplyNewValidBlockMessage updates the peer state for the new valid block.
func (ps *PeerState) ApplyNewValidBlockMessage(msg *NewValidBlockMessage) {
func (ps *PeerState) ApplyNewValidBlockMessage(msg *tmcon.NewValidBlockMessage) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
@@ -1325,7 +1326,7 @@ func (ps *PeerState) ApplyNewValidBlockMessage(msg *NewValidBlockMessage) {
}
// ApplyProposalPOLMessage updates the peer state for the new proposal POL.
func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
func (ps *PeerState) ApplyProposalPOLMessage(msg *tmcon.ProposalPOLMessage) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
@@ -1342,7 +1343,7 @@ func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
}
// ApplyHasVoteMessage updates the peer state for the new vote.
func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
func (ps *PeerState) ApplyHasVoteMessage(msg *tmcon.HasVoteMessage) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
@@ -1358,7 +1359,7 @@ func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
// `ourVotes` is a BitArray of votes we have for msg.BlockID
// NOTE: if ourVotes is nil (e.g. msg.Height < rs.Height),
// we conservatively overwrite ps's votes w/ msg.Votes.
func (ps *PeerState) ApplyVoteSetBitsMessage(msg *VoteSetBitsMessage, ourVotes *bits.BitArray) {
func (ps *PeerState) ApplyVoteSetBitsMessage(msg *tmcon.VoteSetBitsMessage, ourVotes *bits.BitArray) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
@@ -1395,12 +1396,6 @@ func (ps *PeerState) StringIndented(indent string) string {
}
//-----------------------------------------------------------------------------
// Messages
// Message is a message that can be sent and received on the Reactor
type Message interface {
ValidateBasic() error
}
// func init() {
// tmjson.RegisterType(&NewRoundStepMessage{}, "tendermint/NewRoundStepMessage")
@@ -1414,286 +1409,11 @@ type Message interface {
// tmjson.RegisterType(&VoteSetBitsMessage{}, "tendermint/VoteSetBits")
// }
func decodeMsg(bz []byte) (msg Message, err error) {
func decodeMsg(bz []byte) (msg tmcon.Message, err error) {
pb := &tmcons.Message{}
if err = proto.Unmarshal(bz, pb); err != nil {
return msg, err
}
return MsgFromProto(pb)
return tmcon.MsgFromProto(pb)
}
//-------------------------------------
// NewRoundStepMessage is sent for every step taken in the ConsensusState.
// For every height/round/step transition
type NewRoundStepMessage struct {
Height uint64
Round int32
Step cstypes.RoundStepType
SecondsSinceStartTime int64
LastCommitRound int32
}
// ValidateBasic performs basic validation.
func (m *NewRoundStepMessage) ValidateBasic() error {
if m.Round < 0 {
return errors.New("negative Round")
}
if !m.Step.IsValid() {
return errors.New("invalid Step")
}
// NOTE: SecondsSinceStartTime may be negative
// LastCommitRound will be -1 for the initial height, but we don't know what height this is
// since it can be specified in genesis. The reactor will have to validate this via
// ValidateHeight().
if m.LastCommitRound < -1 {
return errors.New("invalid LastCommitRound (cannot be < -1)")
}
return nil
}
// ValidateHeight validates the height given the chain's initial height.
func (m *NewRoundStepMessage) ValidateHeight(initialHeight uint64) error {
if m.Height < initialHeight {
return fmt.Errorf("invalid Height %v (lower than initial height %v)",
m.Height, initialHeight)
}
if m.Height == initialHeight && m.LastCommitRound != -1 {
return fmt.Errorf("invalid LastCommitRound %v (must be -1 for initial height %v)",
m.LastCommitRound, initialHeight)
}
if m.Height > initialHeight && m.LastCommitRound < 0 {
return fmt.Errorf("LastCommitRound can only be negative for initial height %v", // nolint
initialHeight)
}
return nil
}
// String returns a string representation.
func (m *NewRoundStepMessage) String() string {
return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v LCR:%v]",
m.Height, m.Round, m.Step, m.LastCommitRound)
}
//-------------------------------------
// NewValidBlockMessage is sent when a validator observes a valid block B in some round r,
// i.e., there is a Proposal for block B and 2/3+ prevotes for the block B in the round r.
// In case the block is also committed, then IsCommit flag is set to true.
type NewValidBlockMessage struct {
Height uint64
Round int32
BlockPartSetHeader types.PartSetHeader
BlockParts *bits.BitArray
IsCommit bool
}
// ValidateBasic performs basic validation.
func (m *NewValidBlockMessage) ValidateBasic() error {
if m.Round < 0 {
return errors.New("negative Round")
}
if err := m.BlockPartSetHeader.ValidateBasic(); err != nil {
return fmt.Errorf("wrong BlockPartSetHeader: %v", err)
}
if m.BlockParts.Size() == 0 {
return errors.New("empty blockParts")
}
if m.BlockParts.Size() != int(m.BlockPartSetHeader.Total) {
return fmt.Errorf("blockParts bit array size %d not equal to BlockPartSetHeader.Total %d",
m.BlockParts.Size(),
m.BlockPartSetHeader.Total)
}
if m.BlockParts.Size() > int(types.MaxBlockPartsCount) {
return fmt.Errorf("blockParts bit array is too big: %d, max: %d", m.BlockParts.Size(), types.MaxBlockPartsCount)
}
return nil
}
// String returns a string representation.
func (m *NewValidBlockMessage) String() string {
return fmt.Sprintf("[ValidBlockMessage H:%v R:%v BP:%v BA:%v IsCommit:%v]",
m.Height, m.Round, m.BlockPartSetHeader, m.BlockParts, m.IsCommit)
}
//-------------------------------------
// ProposalMessage is sent when a new block is proposed.
type ProposalMessage struct {
Proposal *types.Proposal
}
// ValidateBasic performs basic validation.
func (m *ProposalMessage) ValidateBasic() error {
return m.Proposal.ValidateBasic()
}
// String returns a string representation.
func (m *ProposalMessage) String() string {
return fmt.Sprintf("[Proposal %v]", m.Proposal)
}
//-------------------------------------
// ProposalPOLMessage is sent when a previous proposal is re-proposed.
type ProposalPOLMessage struct {
Height uint64
ProposalPOLRound int32
ProposalPOL *bits.BitArray
}
// ValidateBasic performs basic validation.
func (m *ProposalPOLMessage) ValidateBasic() error {
if m.ProposalPOLRound < 0 {
return errors.New("negative ProposalPOLRound")
}
if m.ProposalPOL.Size() == 0 {
return errors.New("empty ProposalPOL bit array")
}
if m.ProposalPOL.Size() > types.MaxVotesCount {
return fmt.Errorf("proposalPOL bit array is too big: %d, max: %d", m.ProposalPOL.Size(), types.MaxVotesCount)
}
return nil
}
// String returns a string representation.
func (m *ProposalPOLMessage) String() string {
return fmt.Sprintf("[ProposalPOL H:%v POLR:%v POL:%v]", m.Height, m.ProposalPOLRound, m.ProposalPOL)
}
//-------------------------------------
// BlockPartMessage is sent when gossipping a piece of the proposed block.
type BlockPartMessage struct {
Height uint64
Round int32
Part *types.Part
}
// ValidateBasic performs basic validation.
func (m *BlockPartMessage) ValidateBasic() error {
if m.Round < 0 {
return errors.New("negative Round")
}
if err := m.Part.ValidateBasic(); err != nil {
return fmt.Errorf("wrong Part: %v", err)
}
return nil
}
// String returns a string representation.
func (m *BlockPartMessage) String() string {
return fmt.Sprintf("[BlockPart H:%v R:%v P:%v]", m.Height, m.Round, m.Part)
}
//-------------------------------------
// VoteMessage is sent when voting for a proposal (or lack thereof).
type VoteMessage struct {
Vote *types.Vote
}
// ValidateBasic performs basic validation.
func (m *VoteMessage) ValidateBasic() error {
return m.Vote.ValidateBasic()
}
// String returns a string representation.
func (m *VoteMessage) String() string {
return fmt.Sprintf("[Vote %v]", m.Vote)
}
//-------------------------------------
// HasVoteMessage is sent to indicate that a particular vote has been received.
type HasVoteMessage struct {
Height uint64
Round int32
Type tmproto.SignedMsgType
Index int32
}
// ValidateBasic performs basic validation.
func (m *HasVoteMessage) ValidateBasic() error {
if m.Round < 0 {
return errors.New("negative Round")
}
if !types.IsVoteTypeValid(m.Type) {
return errors.New("invalid Type")
}
if m.Index < 0 {
return errors.New("negative Index")
}
return nil
}
// String returns a string representation.
func (m *HasVoteMessage) String() string {
return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v}]", m.Index, m.Height, m.Round, m.Type)
}
//-------------------------------------
// VoteSetMaj23Message is sent to indicate that a given BlockID has seen +2/3 votes.
type VoteSetMaj23Message struct {
Height uint64
Round int32
Type tmproto.SignedMsgType
BlockID types.BlockID
}
// ValidateBasic performs basic validation.
func (m *VoteSetMaj23Message) ValidateBasic() error {
if m.Round < 0 {
return errors.New("negative Round")
}
if !types.IsVoteTypeValid(m.Type) {
return errors.New("invalid Type")
}
if err := m.BlockID.ValidateBasic(); err != nil {
return fmt.Errorf("wrong BlockID: %v", err)
}
return nil
}
// String returns a string representation.
func (m *VoteSetMaj23Message) String() string {
return fmt.Sprintf("[VSM23 %v/%02d/%v %v]", m.Height, m.Round, m.Type, m.BlockID)
}
//-------------------------------------
// VoteSetBitsMessage is sent to communicate the bit-array of votes seen for the BlockID.
type VoteSetBitsMessage struct {
Height uint64
Round int32
Type tmproto.SignedMsgType
BlockID types.BlockID
Votes *bits.BitArray
}
// ValidateBasic performs basic validation.
func (m *VoteSetBitsMessage) ValidateBasic() error {
if !types.IsVoteTypeValid(m.Type) {
return errors.New("invalid Type")
}
if err := m.BlockID.ValidateBasic(); err != nil {
return fmt.Errorf("wrong BlockID: %v", err)
}
// NOTE: Votes.Size() can be zero if the node does not have any
if m.Votes.Size() > types.MaxVotesCount {
return fmt.Errorf("votes bit array is too big: %d, max: %d", m.Votes.Size(), types.MaxVotesCount)
}
return nil
}
// String returns a string representation.
func (m *VoteSetBitsMessage) String() string {
return fmt.Sprintf("[VSB %v/%02d/%v %v %v]", m.Height, m.Round, m.Type, m.BlockID, m.Votes)
}
//-------------------------------------

View File

@@ -10,6 +10,7 @@ import (
"time"
abci "github.com/tendermint/tendermint/abci/types"
tmcon "github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/crypto/merkle"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/proxy"
@@ -36,9 +37,9 @@ var crc32c = crc32.MakeTable(crc32.Castagnoli)
// Unmarshal and apply a single message to the consensus state as if it were
// received in receiveRoutine. Lines that start with "#" are ignored.
// NOTE: receiveRoutine should not be running.
func (cs *State) readReplayMessage(msg *TimedWALMessage, newStepSub types.Subscription) error {
func (cs *State) readReplayMessage(msg *tmcon.TimedWALMessage, newStepSub types.Subscription) error {
// Skip meta messages which exist for demarcating boundaries.
if _, ok := msg.Msg.(EndHeightMessage); ok {
if _, ok := msg.Msg.(tmcon.EndHeightMessage); ok {
return nil
}
@@ -67,13 +68,13 @@ func (cs *State) readReplayMessage(msg *TimedWALMessage, newStepSub types.Subscr
peerID = "local"
}
switch msg := m.Msg.(type) {
case *ProposalMessage:
case *tmcon.ProposalMessage:
p := msg.Proposal
cs.Logger.Info("Replay: Proposal", "height", p.Height, "round", p.Round, "header",
p.BlockID.PartSetHeader, "pol", p.POLRound, "peer", peerID)
case *BlockPartMessage:
case *tmcon.BlockPartMessage:
cs.Logger.Info("Replay: BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerID)
case *VoteMessage:
case *tmcon.VoteMessage:
v := msg.Vote
cs.Logger.Info("Replay: Vote", "height", v.Height, "round", v.Round, "type", v.Type,
"blockID", v.BlockID, "peer", peerID)
@@ -103,7 +104,7 @@ func (cs *State) catchupReplay(csHeight uint64) error {
// this check (since we can crash after writing #ENDHEIGHT).
//
// Ignore data corruption errors since this is a sanity check.
gr, found, err := cs.wal.SearchForEndHeight(csHeight, &WALSearchOptions{IgnoreDataCorruptionErrors: true})
gr, found, err := cs.wal.SearchForEndHeight(csHeight, &tmcon.WALSearchOptions{IgnoreDataCorruptionErrors: true})
if err != nil {
return err
}
@@ -126,7 +127,7 @@ func (cs *State) catchupReplay(csHeight uint64) error {
if csHeight == cs.state.InitialHeight {
endHeight = 0
}
gr, found, err = cs.wal.SearchForEndHeight(endHeight, &WALSearchOptions{IgnoreDataCorruptionErrors: true})
gr, found, err = cs.wal.SearchForEndHeight(endHeight, &tmcon.WALSearchOptions{IgnoreDataCorruptionErrors: true})
if err == io.EOF {
cs.Logger.Error("Replay: wal.group.Search returned EOF", "#ENDHEIGHT", endHeight)
} else if err != nil {
@@ -139,7 +140,7 @@ func (cs *State) catchupReplay(csHeight uint64) error {
cs.Logger.Info("Catchup by replaying consensus messages", "height", csHeight)
var msg *TimedWALMessage
var msg *tmcon.TimedWALMessage
dec := WALDecoder{gr}
LOOP:

View File

@@ -13,6 +13,7 @@ import (
dbm "github.com/tendermint/tm-db"
cfg "github.com/tendermint/tendermint/config"
tmcon "github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/libs/log"
tmos "github.com/tendermint/tendermint/libs/os"
"github.com/tendermint/tendermint/proxy"
@@ -73,7 +74,7 @@ func (cs *State) ReplayFile(file string, console bool) error {
defer pb.fp.Close()
var nextN int // apply N msgs in a row
var msg *TimedWALMessage
var msg *tmcon.TimedWALMessage
for {
if nextN == 0 && console {
nextN = pb.replayConsoleLoop()
@@ -147,7 +148,7 @@ func (pb *playback) replayReset(count int, newStepSub types.Subscription) error
fmt.Printf("Reseting from %d to %d\n", pb.count, count)
pb.count = 0
pb.cs = newCS
var msg *TimedWALMessage
var msg *tmcon.TimedWALMessage
for i := 0; i < count; i++ {
msg, err = pb.dec.Decode()
if err == io.EOF {

View File

@@ -14,6 +14,7 @@ import (
"github.com/gogo/protobuf/proto"
cfg "github.com/tendermint/tendermint/config"
tmcon "github.com/tendermint/tendermint/consensus"
cstypes "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/crypto"
tmevents "github.com/tendermint/tendermint/libs/events"
@@ -78,7 +79,7 @@ type State struct {
// a Write-Ahead Log ensures we can recover from any kind of crash
// and helps us avoid signing conflicting votes
wal WAL
wal tmcon.WAL
replayMode bool // so we don't log signing errors during replay
doWALCatchup bool // determines if we even try to do the catchup
@@ -96,7 +97,7 @@ type State struct {
evsw tmevents.EventSwitch
// for reporting metrics
metrics *Metrics
metrics *tmcon.Metrics
// misbehaviors mapped for each height (can't have more than one misbehavior per height)
misbehaviors map[uint64]Misbehavior
@@ -134,7 +135,7 @@ func NewState(
wal: nilWAL{},
evpool: evpool,
evsw: tmevents.NewEventSwitch(),
metrics: NopMetrics(),
metrics: tmcon.NopMetrics(),
misbehaviors: misbehaviors,
}
// set function defaults (may be overwritten before calling Start)
@@ -173,7 +174,7 @@ func (cs *State) handleMsg(mi msgInfo) {
)
msg, peerID := mi.Msg, mi.PeerID
switch msg := msg.(type) {
case *ProposalMessage:
case *tmcon.ProposalMessage:
// will not cause transition.
// once proposal is set, we can receive block parts
// err = cs.setProposal(msg.Proposal)
@@ -182,7 +183,7 @@ func (cs *State) handleMsg(mi msgInfo) {
} else {
err = defaultReceiveProposal(cs, msg.Proposal)
}
case *BlockPartMessage:
case *tmcon.BlockPartMessage:
// if the proposal is complete, we'll enterPrevote or tryFinalizeCommit
added, err = cs.addProposalBlockPart(msg, peerID)
if added {
@@ -200,7 +201,7 @@ func (cs *State) handleMsg(mi msgInfo) {
msg.Round)
err = nil
}
case *VoteMessage:
case *tmcon.VoteMessage:
// attempt to add the vote and dupeout the validator if its a duplicate signature
// if the vote gives us a 2/3-any or 2/3-one, we transition
added, err = cs.tryAddVote(msg.Vote, peerID)
@@ -442,8 +443,8 @@ var (
// msgs from the reactor which may update the state
type msgInfo struct {
Msg Message `json:"msg"`
PeerID p2p.NodeID `json:"peer_key"`
Msg tmcon.Message `json:"msg"`
PeerID p2p.NodeID `json:"peer_key"`
}
// internally generated messages which may update the state
@@ -485,7 +486,7 @@ func (cs *State) SetEventBus(b *types.EventBus) {
}
// StateMetrics sets the metrics.
func StateMetrics(metrics *Metrics) StateOption {
func StateMetrics(metrics *tmcon.Metrics) StateOption {
return func(cs *State) { cs.metrics = metrics }
}
@@ -685,7 +686,7 @@ func (cs *State) Wait() {
// OpenWAL opens a file to log all consensus messages and timeouts for
// deterministic accountability.
func (cs *State) OpenWAL(walFile string) (WAL, error) {
func (cs *State) OpenWAL(walFile string) (tmcon.WAL, error) {
wal, err := NewWAL(walFile)
if err != nil {
cs.Logger.Error("Failed to open WAL", "file", walFile, "err", err)
@@ -709,9 +710,9 @@ func (cs *State) OpenWAL(walFile string) (WAL, error) {
// AddVote inputs a vote.
func (cs *State) AddVote(vote *types.Vote, peerID p2p.NodeID) (added bool, err error) {
if peerID == "" {
cs.internalMsgQueue <- msgInfo{&VoteMessage{vote}, ""}
cs.internalMsgQueue <- msgInfo{&tmcon.VoteMessage{Vote: vote}, ""}
} else {
cs.peerMsgQueue <- msgInfo{&VoteMessage{vote}, peerID}
cs.peerMsgQueue <- msgInfo{&tmcon.VoteMessage{Vote: vote}, peerID}
}
// TODO: wait for event?!
@@ -722,9 +723,9 @@ func (cs *State) AddVote(vote *types.Vote, peerID p2p.NodeID) (added bool, err e
func (cs *State) SetProposal(proposal *types.Proposal, peerID p2p.NodeID) error {
if peerID == "" {
cs.internalMsgQueue <- msgInfo{&ProposalMessage{proposal}, ""}
cs.internalMsgQueue <- msgInfo{&tmcon.ProposalMessage{Proposal: proposal}, ""}
} else {
cs.peerMsgQueue <- msgInfo{&ProposalMessage{proposal}, peerID}
cs.peerMsgQueue <- msgInfo{&tmcon.ProposalMessage{Proposal: proposal}, peerID}
}
// TODO: wait for event?!
@@ -735,9 +736,9 @@ func (cs *State) SetProposal(proposal *types.Proposal, peerID p2p.NodeID) error
func (cs *State) AddProposalBlockPart(height uint64, round int32, part *types.Part, peerID p2p.NodeID) error {
if peerID == "" {
cs.internalMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, ""}
cs.internalMsgQueue <- msgInfo{&tmcon.BlockPartMessage{Height: height, Round: round, Part: part}, ""}
} else {
cs.peerMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, peerID}
cs.peerMsgQueue <- msgInfo{&tmcon.BlockPartMessage{Height: height, Round: round, Part: part}, peerID}
}
// TODO: wait for event?!
@@ -998,7 +999,7 @@ func (cs *State) receiveRoutine(maxSteps int) {
panic(fmt.Sprintf("Failed to write %v msg to consensus wal due to %v. Check your FS and restart the node", mi, err))
}
if _, ok := mi.Msg.(*VoteMessage); ok {
if _, ok := mi.Msg.(*tmcon.VoteMessage); ok {
// we actually want to simulate failing during
// the previous WriteSync, but this isn't easy to do.
// Equivalent would be to fail here and manually remove
@@ -1210,10 +1211,10 @@ func (cs *State) defaultDecideProposal(height uint64, round int32) {
proposal.Signature = p.Signature
// send proposal and block parts on internal msg queue
cs.sendInternalMessage(msgInfo{&ProposalMessage{proposal}, ""})
cs.sendInternalMessage(msgInfo{&tmcon.ProposalMessage{Proposal: proposal}, ""})
for i := 0; i < int(blockParts.Total()); i++ {
part := blockParts.GetPart(i)
cs.sendInternalMessage(msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, ""})
cs.sendInternalMessage(msgInfo{&tmcon.BlockPartMessage{Height: cs.Height, Round: cs.Round, Part: part}, ""})
}
cs.Logger.Info("Signed proposal", "height", height, "round", round, "proposal", proposal)
cs.Logger.Debug(fmt.Sprintf("Signed proposal block: %v", block))
@@ -1491,7 +1492,7 @@ func (cs *State) finalizeCommit(height uint64) {
// Either way, the State should not be resumed until we
// successfully call ApplyBlock (ie. later here, or in Handshake after
// restart).
endMsg := EndHeightMessage{height}
endMsg := tmcon.EndHeightMessage{Height: height}
if err := cs.wal.WriteSync(endMsg); err != nil { // NOTE: fsync
panic(fmt.Sprintf("Failed to write %v msg to consensus wal due to %v. Check your FS and restart the node",
endMsg, err))
@@ -1660,7 +1661,7 @@ func (cs *State) recordMetrics(height uint64, block *types.Block) {
// NOTE: block is not necessarily valid.
// Asynchronously triggers either enterPrevote (before we timeout of propose) or tryFinalizeCommit,
// once we have the full block.
func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID p2p.NodeID) (added bool, err error) {
func (cs *State) addProposalBlockPart(msg *tmcon.BlockPartMessage, peerID p2p.NodeID) (added bool, err error) {
height, round, part := msg.Height, msg.Round, msg.Part
// Blocks might be reused, so round mismatch is OK
@@ -1860,7 +1861,7 @@ func (cs *State) signAddVote(msgType tmproto.SignedMsgType, hash []byte, header
// TODO: pass pubKey to signVote
vote, err := cs.signVote(msgType, hash, header)
if err == nil {
cs.sendInternalMessage(msgInfo{&VoteMessage{vote}, ""})
cs.sendInternalMessage(msgInfo{&tmcon.VoteMessage{Vote: vote}, ""})
cs.Logger.Info("Signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote)
return vote
}

View File

@@ -11,8 +11,9 @@ import (
"github.com/gogo/protobuf/proto"
auto "github.com/tendermint/tendermint/libs/autofile"
// tmjson "github.com/tendermint/tendermint/libs/json"
tmcon "github.com/tendermint/tendermint/consensus"
auto "github.com/tendermint/tendermint/libs/autofile"
"github.com/tendermint/tendermint/libs/log"
tmos "github.com/tendermint/tendermint/libs/os"
"github.com/tendermint/tendermint/libs/service"
@@ -30,44 +31,12 @@ const (
//--------------------------------------------------------
// types and functions for savings consensus messages
// TimedWALMessage wraps WALMessage and adds Time for debugging purposes.
type TimedWALMessage struct {
Time time.Time `json:"time"`
Msg WALMessage `json:"msg"`
}
// EndHeightMessage marks the end of the given height inside WAL.
// @internal used by scripts/wal2json util.
type EndHeightMessage struct {
Height uint64 `json:"height"`
}
type WALMessage interface{}
// func init() {
// tmjson.RegisterType(msgInfo{}, "tendermint/wal/MsgInfo")
// tmjson.RegisterType(timeoutInfo{}, "tendermint/wal/TimeoutInfo")
// tmjson.RegisterType(EndHeightMessage{}, "tendermint/wal/EndHeightMessage")
// tmjson.RegisterType(tmcon.EndHeightMessage {}, "tendermint/wal/EndHeightMessage ")
// }
//--------------------------------------------------------
// Simple write-ahead logger
// WAL is an interface for any write-ahead logger.
type WAL interface {
Write(WALMessage) error
WriteSync(WALMessage) error
FlushAndSync() error
SearchForEndHeight(height uint64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error)
// service methods
Start() error
Stop() error
Wait()
}
// Write ahead logger writes msgs to disk before they are processed.
// Can be used for crash-recovery and deterministic replay.
// TODO: currently the wal is overwritten during replay catchup, give it a mode
@@ -84,7 +53,7 @@ type BaseWAL struct {
flushInterval time.Duration
}
var _ WAL = &BaseWAL{}
var _ tmcon.WAL = &BaseWAL{}
// NewWAL returns a new write-ahead logger based on `baseWAL`, which implements
// WAL. It's flushed and synced to disk every 2s and once when stopped.
@@ -126,7 +95,7 @@ func (wal *BaseWAL) OnStart() error {
if err != nil {
return err
} else if size == 0 {
if err := wal.WriteSync(EndHeightMessage{0}); err != nil {
if err := wal.WriteSync(tmcon.EndHeightMessage{Height: 0}); err != nil {
return err
}
}
@@ -181,12 +150,12 @@ func (wal *BaseWAL) Wait() {
// Write is called in newStep and for each receive on the
// peerMsgQueue and the timeoutTicker.
// NOTE: does not call fsync()
func (wal *BaseWAL) Write(msg WALMessage) error {
func (wal *BaseWAL) Write(msg tmcon.WALMessage) error {
if wal == nil {
return nil
}
if err := wal.enc.Encode(&TimedWALMessage{tmtime.Now(), msg}); err != nil {
if err := wal.enc.Encode(&tmcon.TimedWALMessage{Time: tmtime.Now(), Msg: msg}); err != nil {
wal.Logger.Error("Error writing msg to consensus wal. WARNING: recover may not be possible for the current height",
"err", err, "msg", msg)
return err
@@ -198,7 +167,7 @@ func (wal *BaseWAL) Write(msg WALMessage) error {
// WriteSync is called when we receive a msg from ourselves
// so that we write to disk before sending signed messages.
// NOTE: calls fsync()
func (wal *BaseWAL) WriteSync(msg WALMessage) error {
func (wal *BaseWAL) WriteSync(msg tmcon.WALMessage) error {
if wal == nil {
return nil
}
@@ -230,9 +199,9 @@ type WALSearchOptions struct {
// CONTRACT: caller must close group reader.
func (wal *BaseWAL) SearchForEndHeight(
height uint64,
options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
options *tmcon.WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
var (
msg *TimedWALMessage
msg *tmcon.TimedWALMessage
gr *auto.GroupReader
)
lastHeightFound := uint64(0)
@@ -268,7 +237,7 @@ func (wal *BaseWAL) SearchForEndHeight(
return nil, false, err
}
if m, ok := msg.Msg.(EndHeightMessage); ok {
if m, ok := msg.Msg.(tmcon.EndHeightMessage); ok {
lastHeightFound = m.Height
if m.Height == height { // found
wal.Logger.Info("Found", "height", height, "index", index)
@@ -299,7 +268,7 @@ func NewWALEncoder(wr io.Writer) *WALEncoder {
// Encode writes the custom encoding of v to the stream. It returns an error if
// the encoded size of v is greater than 1MB. Any error encountered
// during the write is also returned.
func (enc *WALEncoder) Encode(v *TimedWALMessage) error {
func (enc *WALEncoder) Encode(v *tmcon.TimedWALMessage) error {
pbMsg, err := WALToProto(v.Msg)
if err != nil {
return err
@@ -366,7 +335,7 @@ func NewWALDecoder(rd io.Reader) *WALDecoder {
}
// Decode reads the next custom-encoded value from its reader and returns it.
func (dec *WALDecoder) Decode() (*TimedWALMessage, error) {
func (dec *WALDecoder) Decode() (*tmcon.TimedWALMessage, error) {
b := make([]byte, 4)
_, err := dec.rd.Read(b)
@@ -414,7 +383,7 @@ func (dec *WALDecoder) Decode() (*TimedWALMessage, error) {
if err != nil {
return nil, DataCorruptionError{fmt.Errorf("failed to convert from proto: %w", err)}
}
tMsgWal := &TimedWALMessage{
tMsgWal := &tmcon.TimedWALMessage{
Time: res.Time,
Msg: walMsg,
}
@@ -424,12 +393,13 @@ func (dec *WALDecoder) Decode() (*TimedWALMessage, error) {
type nilWAL struct{}
var _ WAL = nilWAL{}
var _ tmcon.WAL = nilWAL{}
func (nilWAL) Write(m WALMessage) error { return nil }
func (nilWAL) WriteSync(m WALMessage) error { return nil }
func (nilWAL) FlushAndSync() error { return nil }
func (nilWAL) SearchForEndHeight(height uint64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
func (nilWAL) Write(m tmcon.WALMessage) error { return nil }
func (nilWAL) WriteSync(m tmcon.WALMessage) error { return nil }
func (nilWAL) FlushAndSync() error { return nil }
func (nilWAL) SearchForEndHeight(height uint64,
options *tmcon.WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
return nil, false, nil
}
func (nilWAL) Start() error { return nil }

View File

@@ -1,31 +0,0 @@
// +build gofuzz
package consensus
import (
"bytes"
"io"
)
func Fuzz(data []byte) int {
dec := NewWALDecoder(bytes.NewReader(data))
for {
msg, err := dec.Decode()
if err == io.EOF {
break
}
if err != nil {
if msg != nil {
panic("msg != nil on error")
}
return 0
}
var w bytes.Buffer
enc := NewWALEncoder(&w)
err = enc.Encode(msg)
if err != nil {
panic(err)
}
}
return 1
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/tendermint/tendermint/abci/example/kvstore"
cfg "github.com/tendermint/tendermint/config"
tmcon "github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/libs/log"
tmrand "github.com/tendermint/tendermint/libs/rand"
"github.com/tendermint/tendermint/privval"
@@ -101,7 +102,7 @@ func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) {
numBlocksWritten := make(chan struct{})
wal := newByteBufferWAL(logger, NewWALEncoder(wr), uint64(numBlocks), numBlocksWritten)
// see wal.go#103
if err := wal.Write(EndHeightMessage{0}); err != nil {
if err := wal.Write(tmcon.EndHeightMessage{Height: 0}); err != nil {
t.Error(err)
}
@@ -190,13 +191,13 @@ func newByteBufferWAL(logger log.Logger, enc *WALEncoder, nBlocks uint64, signal
// Save writes message to the internal buffer except when heightToStop is
// reached, in which case it will signal the caller via signalWhenStopsTo and
// skip writing.
func (w *byteBufferWAL) Write(m WALMessage) error {
func (w *byteBufferWAL) Write(m tmcon.WALMessage) error {
if w.stopped {
w.logger.Debug("WAL already stopped. Not writing message", "msg", m)
return nil
}
if endMsg, ok := m.(EndHeightMessage); ok {
if endMsg, ok := m.(tmcon.EndHeightMessage); ok {
w.logger.Debug("WAL write end height message", "height", endMsg.Height, "stopHeight", w.heightToStop)
if endMsg.Height == w.heightToStop {
w.logger.Debug("Stopping WAL at height", "height", endMsg.Height)
@@ -207,7 +208,7 @@ func (w *byteBufferWAL) Write(m WALMessage) error {
}
w.logger.Debug("WAL Write Message", "msg", m)
err := w.enc.Encode(&TimedWALMessage{fixedTime, m})
err := w.enc.Encode(&tmcon.TimedWALMessage{Time: fixedTime, Msg: m})
if err != nil {
panic(fmt.Sprintf("failed to encode the msg %v", m))
}
@@ -215,7 +216,7 @@ func (w *byteBufferWAL) Write(m WALMessage) error {
return nil
}
func (w *byteBufferWAL) WriteSync(m WALMessage) error {
func (w *byteBufferWAL) WriteSync(m tmcon.WALMessage) error {
return w.Write(m)
}
@@ -223,7 +224,7 @@ func (w *byteBufferWAL) FlushAndSync() error { return nil }
func (w *byteBufferWAL) SearchForEndHeight(
height uint64,
options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
options *tmcon.WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
return nil, false, nil
}

View File

@@ -137,19 +137,19 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger, misbehaviors map[uint
}
// MetricsProvider returns a consensus, p2p and mempool Metrics.
type MetricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics)
type MetricsProvider func(chainID string) (*consensus.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics)
// DefaultMetricsProvider returns Metrics build using Prometheus client library
// if Prometheus is enabled. Otherwise, it returns no-op Metrics.
func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider {
return func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics) {
return func(chainID string) (*consensus.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics) {
if config.Prometheus {
return cs.PrometheusMetrics(config.Namespace, "chain_id", chainID),
return consensus.PrometheusMetrics(config.Namespace, "chain_id", chainID),
p2p.PrometheusMetrics(config.Namespace, "chain_id", chainID),
mempl.PrometheusMetrics(config.Namespace, "chain_id", chainID),
sm.PrometheusMetrics(config.Namespace, "chain_id", chainID)
}
return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics()
return consensus.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics()
}
}
@@ -472,7 +472,7 @@ func createConsensusReactor(config *cfg.Config,
mempool *mempl.CListMempool,
evidencePool *evidence.Pool,
privValidator types.PrivValidator,
csMetrics *cs.Metrics,
csMetrics *consensus.Metrics,
waitSync bool,
eventBus *types.EventBus,
consensusLogger log.Logger,